#iterator #timeout #peek #wrapper #buffer

timeout-iterator

TimeoutIterator 是任何迭代器的包装器,它添加了 peek_timeout 和 next_timeout 函数。标准用法是解析多行自由格式记录(如跟踪日志文件),在这种情况下,希望消费最后一行,并查看记录是否在下次继续,但不无限期地阻塞在 peek 上。

15个版本 (稳定)

1.1.7 2021年12月23日
1.1.4 2021年1月3日
1.1.2 2020年12月31日
0.5.0 2020年4月29日
0.2.0 2019年9月28日

#203 in 异步

每月34次下载

Apache-2.0

27KB
544

Build Status

timeout-iterator

timeout_iterator::TimeoutIterator 是任何迭代器的包装器,它添加了两个额外的函数

  • peek_timeout()
  • next_timeout()

标准用法是解析多行自由格式记录(如跟踪日志文件),在这种情况下,希望消费最后一行,并查看记录是否在下次继续,不无限期地阻塞在 peek 上。

例如,这是用于从 /dev/kmsg 解析内核日志的。一个内核日志记录可能看起来像这样

6,361,518496,-;ahci 0000:00:05.0: AHCI 0001.0300 32 slots 6 ports 6 Gbps 0x1 impl SATA mode
 SUBSYSTEM=pci
 DEVICE=+pci:0000:00:05.0

这种记录的结束只有在下一行开始新的记录时才知道。然而,如果这是最后一条记录,则它可能永远不会被刷新/解析,因为没有下一个记录来触发它。

这就是一个具有超时功能的迭代器如何帮助打破僵局的地方。

同步迭代

当使用功能 sync 时。

迭代器明显且直观

use timeout_iterator::synchronous::TimeoutIterator;

let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];
let mut ti = TimeoutIterator::with_iter(numbers.into_iter()).unwrap();

这将遍历 Vector 中的所有整数。

但是,如果底层迭代器是突发的,则可以在一定超时时间后退出迭代。这在跟踪日志文件时非常有用 - 您可以等待超时以发布或处理日志行组。

use timeout_iterator::synchronous::TimeoutIterator;
use timeout_iterator::error::TimeoutIteratorError;
use std::io::{BufReader, BufRead};
use std::fs::File;
use std::time::Duration;

let file = File::open("log.txt").unwrap();
let lines = BufReader::new(file).lines();
let mut ti = TimeoutIterator::with_iter(lines).unwrap();

loop {
   match ti.next_timeout(Duration::from_secs(1)) {
       Ok(Ok(next_line)) => {
           println!("{}", next_line);
       },
       Ok(Err(_)) => {
           // TimeoutIterator succeeded, underlying iterator provided an error
       },
       Err(TimeoutIteratorError::TimedOut) => {
           // timed out waiting for underlying iterator to provide something
       },
       Err(_) => {
           // other TimeoutIterator error
       }
   }
}

还有一个类似的 peek_timeout 函数可以查看下一个条目(或超时执行此操作),因此您可以看到是否有任何东西在下面,而无需消费它。

异步迭代

当使用功能 async 时。

流包装器直观

    let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];
    let numbers_stream = iter(numbers.into_iter());

    let mut ti = TimeoutStream::with_stream(numbers_stream).await?;

您可以使用它就像任何其他 Stream 一样

            assert_eq!(ti.next().await?, 1);
            assert_eq!(ti.next().await?, 2);
            assert_eq!(ti.next().await?, 3);
            assert_eq!(ti.next().await?, 4);
            assert_eq!(ti.next().await?, 5);

您可以查看下一个值而不消费它

    // Can peek many times
    assert_eq!(ti.peek().await.unwrap(), 1);
    assert_eq!(ti.peek().await.unwrap(), 1);
    assert_eq!(ti.peek().await.unwrap(), 1);
    assert_eq!(ti.peek().await.unwrap(), 1);

    // And then consume with 'next`
    assert_eq!(ti.next().await.unwrap(), 3);

您可以使用超时查看或消费,并捕获 Error::TimedOut 错误


    let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];

    // Slow down the numbers to stream
    let throttled_numbers = iter(numbers.into_iter())
        .throttle(Duration::from_secs(1));

    let mut ti = TimeoutStream::with_stream(throttled_numbers).await.unwrap();

    // First number is always available
    assert_eq!(ti.next().await.unwrap(), 1);

    // 2nd number will timeout at half a second in
    assert_matches!(ti.next_timeout(Duration::from_millis(500)).await.unwrap_err(), Error::TimedOut);

    // Will consume it if called blocking
    assert_eq!(ti.next().await.unwrap(), 2);

    // Peek with timeout will... timeout
    assert_matches!(ti.peek_timeout(Duration::from_millis(500)).await.unwrap_err(), Error::TimedOut);

    // a blocking peek will eventually succeed
    // we dereference the peek because it's a reference (not move)
    assert_eq!(*ti.peek().await.unwrap(), 3);

    // The number will be consumed
    assert_eq!(ti.next().await.unwrap(), 3);

    // As proven by the next number
    assert_eq!(ti.next().await.unwrap(), 4);

依赖关系

~0–1.5MB
~26K SLoC