15个版本 (稳定)
1.1.7 | 2021年12月23日 |
---|---|
1.1.4 | 2021年1月3日 |
1.1.2 | 2020年12月31日 |
0.5.0 | 2020年4月29日 |
0.2.0 | 2019年9月28日 |
#203 in 异步
每月34次下载
27KB
544 行
timeout-iterator
timeout_iterator::TimeoutIterator
是任何迭代器的包装器,它添加了两个额外的函数
- peek_timeout()
- next_timeout()
标准用法是解析多行自由格式记录(如跟踪日志文件),在这种情况下,希望消费最后一行,并查看记录是否在下次继续,不无限期地阻塞在 peek 上。
例如,这是用于从 /dev/kmsg
解析内核日志的。一个内核日志记录可能看起来像这样
6,361,518496,-;ahci 0000:00:05.0: AHCI 0001.0300 32 slots 6 ports 6 Gbps 0x1 impl SATA mode
SUBSYSTEM=pci
DEVICE=+pci:0000:00:05.0
这种记录的结束只有在下一行开始新的记录时才知道。然而,如果这是最后一条记录,则它可能永远不会被刷新/解析,因为没有下一个记录来触发它。
这就是一个具有超时功能的迭代器如何帮助打破僵局的地方。
同步迭代
当使用功能 sync
时。
迭代器明显且直观
use timeout_iterator::synchronous::TimeoutIterator;
let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];
let mut ti = TimeoutIterator::with_iter(numbers.into_iter()).unwrap();
这将遍历 Vector 中的所有整数。
但是,如果底层迭代器是突发的,则可以在一定超时时间后退出迭代。这在跟踪日志文件时非常有用 - 您可以等待超时以发布或处理日志行组。
use timeout_iterator::synchronous::TimeoutIterator;
use timeout_iterator::error::TimeoutIteratorError;
use std::io::{BufReader, BufRead};
use std::fs::File;
use std::time::Duration;
let file = File::open("log.txt").unwrap();
let lines = BufReader::new(file).lines();
let mut ti = TimeoutIterator::with_iter(lines).unwrap();
loop {
match ti.next_timeout(Duration::from_secs(1)) {
Ok(Ok(next_line)) => {
println!("{}", next_line);
},
Ok(Err(_)) => {
// TimeoutIterator succeeded, underlying iterator provided an error
},
Err(TimeoutIteratorError::TimedOut) => {
// timed out waiting for underlying iterator to provide something
},
Err(_) => {
// other TimeoutIterator error
}
}
}
还有一个类似的 peek_timeout
函数可以查看下一个条目(或超时执行此操作),因此您可以看到是否有任何东西在下面,而无需消费它。
异步迭代
当使用功能 async
时。
流包装器直观
let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];
let numbers_stream = iter(numbers.into_iter());
let mut ti = TimeoutStream::with_stream(numbers_stream).await?;
您可以使用它就像任何其他 Stream 一样
assert_eq!(ti.next().await?, 1);
assert_eq!(ti.next().await?, 2);
assert_eq!(ti.next().await?, 3);
assert_eq!(ti.next().await?, 4);
assert_eq!(ti.next().await?, 5);
您可以查看下一个值而不消费它
// Can peek many times
assert_eq!(ti.peek().await.unwrap(), 1);
assert_eq!(ti.peek().await.unwrap(), 1);
assert_eq!(ti.peek().await.unwrap(), 1);
assert_eq!(ti.peek().await.unwrap(), 1);
// And then consume with 'next`
assert_eq!(ti.next().await.unwrap(), 3);
您可以使用超时查看或消费,并捕获 Error::TimedOut 错误
let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];
// Slow down the numbers to stream
let throttled_numbers = iter(numbers.into_iter())
.throttle(Duration::from_secs(1));
let mut ti = TimeoutStream::with_stream(throttled_numbers).await.unwrap();
// First number is always available
assert_eq!(ti.next().await.unwrap(), 1);
// 2nd number will timeout at half a second in
assert_matches!(ti.next_timeout(Duration::from_millis(500)).await.unwrap_err(), Error::TimedOut);
// Will consume it if called blocking
assert_eq!(ti.next().await.unwrap(), 2);
// Peek with timeout will... timeout
assert_matches!(ti.peek_timeout(Duration::from_millis(500)).await.unwrap_err(), Error::TimedOut);
// a blocking peek will eventually succeed
// we dereference the peek because it's a reference (not move)
assert_eq!(*ti.peek().await.unwrap(), 3);
// The number will be consumed
assert_eq!(ti.next().await.unwrap(), 3);
// As proven by the next number
assert_eq!(ti.next().await.unwrap(), 4);
依赖关系
~0–1.5MB
~26K SLoC