#json-stream #json-parser #json #parse-json #deserialize #stream

json-walker

JSON流解析器和反序列化器。在这个包中,您可以选择解析JSON的哪些部分。

1个不稳定版本

0.1.0 2024年2月14日

2339解析器实现

MIT 许可证

92KB
2K SLoC

Rust Json-Walker

通过这个库解析JSON流或文本。主要思想是保持一个游标并将其向前移动,直到找到所需元素,因此JSON可以部分可见。

验证在解析过程中发生。如果JSON格式不正确,它将引发panic。反序列化将返回Error。

  • 请注意,不支持反向遍历。
  • 默认情况下不支持异步操作,但可以通过通道来实现。查看以下示例。
  • "null"、"true" 和 "false" 在大写模式中不受支持,如果您需要它们,请修改 parser_core.rs 文件并取消注释带有 "support_capital_word" 标记的行(两者都位于 extract_word() 函数中)。
  • 要反序列化JSON的一部分,您需要启用 "deserialize" 功能。

功能

deserialize -> 通过 current_value() 函数启用反序列化

一些提供的方法

get_current_level -> 如果我们将JSON视为一棵树,则节点可以有级别
get_path -> 从树根到节点的路径
seek_by_level_offset -> 通过级别跳转到节点
next_item -> 下一个键或值
next_key -> 只获取下一个键,无论它是在子节点、父节点还是兄弟节点中
next_key_by_name -> 解析JSON直到特定的键
next_sibling_key -> 跳转到下一个兄弟键
next_item_by_pattern -> 如果您正在寻找特定的项目,则跳转到它
current_value_content -> 游标所在位置的相关值将作为内容返回
current_value -> 游标所在位置的相关值将作为反序列化后的值返回(启用 "deserialize" 功能才能使用此功能)

示例

use json_walker::json_walker::{Item, JsonWalker, ValueType, StringReader};

fn main() {
    // we are looking for "key2"
    let json = r#"{"key1": null, "key2": true, "key3": false, "key4": 111}"#;
    let mut walker = JsonWalker::from_string(StringReader::new(json.to_string()), 0);
    let result = walker.next_key_by_name("key2");
    assert_eq!(
        Some((ValueType::Str, String::from("key2"))),
        result
    );
}

通过模式查找第二个 "key4"。模式是一组比较器。在这个示例中,提供了一些比较器,但请注意,最终性能取决于您。
use json_walker::json_walker::{CurrentState, Item, JsonWalker, ValueType};

fn main() {
    let json = r#"[{"key1":{"key4":100},"key2":10},[{"key1":{"key4":300}, "key3":100}],"key1"]"#;
    let mut walker = JsonWalker::from_string(StringReader::new(json.to_string()), 0);
    let pattern = vec![
        |cs: &CurrentState| -> bool{
            cs.level == 2.0 && cs.nth_occurrence == 0
        },
        |cs: &CurrentState| -> bool{
            cs.latest_key.eq("key1") && cs.level == 3.0
        },
        |cs: &CurrentState| -> bool{
            cs.latest_key.eq("key4")
        },
    ];

    let item = walker.next_item_by_pattern(&pattern);
    assert_eq!(item, Some((ValueType::Str, String::from("key4"))));
}

此库默认不支持异步操作。必须由Reader处理。Reader是 Box<Iterator<Item=u8>>。

以下是如何处理异步Reader的示例

use json_walker::json_walker::JsonWalker;

use crate::stream_reader::StreamReader;

#[tokio::main]
async fn main() {
  let reader = StreamReader::new("path to json file".into(), 10);
  let mut walker = JsonWalker::new(Box::new(reader), 0);
  loop {
    match walker.next_item() {
      Ok(t) => { println!("{:?}", t) }
      Err(_) => { break; }
    }
  }
}

mod stream_reader {
  use std::fs::File;
  use std::io::{BufRead, BufReader};
  use std::sync::mpsc::{Receiver, sync_channel, SyncSender};

  // read target file line by line asynchronously
  async fn read_line(reader: &mut BufReader<File>) -> Result<String, ()> {
    let mut buffer = String::new();
    return match reader.read_line(&mut buffer) {
      Ok(_) => { Ok(buffer) }
      _ => { Err(()) }
    };
  }

  pub struct StreamReader {
    buffer_slice: Vec<u8>,
    buffer_slice_len: usize,
    index: usize,
    consumer: Receiver<Vec<u8>>,
  }

  impl StreamReader {
    pub fn new(file_path: String, queue_size: usize) -> Self {
      // std::..::sync_channel makes a queue and that queue will be filled to the queue_size.
      // receiver will wait till there is some data ready to use, otherwise no blocking happens
      let (producer, consumer) = sync_channel(queue_size);
      StreamReader::start_thread(file_path, producer);
      StreamReader {
        buffer_slice: vec![],
        buffer_slice_len: 0,
        index: 0,
        consumer,
      }
    }

    // fill sender queue in this thread
    fn start_thread(file_path: String, producer: SyncSender<Vec<u8>>) {
      tokio::spawn(async move {
        let file = File::open(file_path.clone()).expect(&format!("file: {} does not exist", file_path));
        let mut reader = BufReader::new(file);
        loop {
          // read_line needs await mechanism
          match read_line(&mut reader).await {
            Ok(line) => {
              match producer.send(line.into_bytes()) {
                Err(_) => break, // maybe receiver is dismissed
                _ => {}
              }
            }
            Err(_) => {
              panic!("reading file failed");
            }
          };
        }
      });
    }
  }

  impl Iterator for StreamReader {
    type Item = u8;

    // provide bytes
    fn next(&mut self) -> Option<Self::Item> {
      // if current piece of data is finished, catch another one
      if self.index == self.buffer_slice_len {
        match self.consumer.recv() {
          Ok(d) if d.len() > 0 => {
            self.buffer_slice = d;
            self.buffer_slice_len = self.buffer_slice.len();
            self.index = 0;
          }
          _ => { return None; }
        }
      }
      let i = self.index;
      self.index += 1;
      Some(self.buffer_slice[i])
    }
  }
}
  • 其他示例提供在函数文档和测试中

依赖项

~175KB