1 个不稳定版本
0.1.0 | 2020年3月23日 |
---|
#1068 在 并发
12KB
105 代码行
Instrumented Mpsc
包装 futures::channel::mpsc::unbounded
以通过 Prometheus 计数器统计无界通道发送和接收的消息。
限制通道对于反向压力和适当的调度是必要的。在无界通道中,没有方法告诉生产者端让消费者端赶上进度。换句话说,调度器无法知道何时在拥挤的通道上优先处理消费者任务,反之亦然。
应始终避免无界通道。尽管如此,一些项目大量使用它们。此crate允许用户以最不侵入的方式获取无界通道的队列长度可见性。
注意:虽然这应该具有合理的性能,因为它归结为每次发送和接收的单个原子操作,但它不建议在生产环境中运行。
注意:请记住,这是使用全局初始化的计数器。虽然这不是任何编程最佳实践,但使用全局计数器可以以最不侵入的方式对无界通道进行监控。无需初始化计数器,也无需将其注册到注册表中。
use futures::StreamExt;
use instrumented_mpsc::{register_metrics, unbounded};
use prometheus::{Counter, Encoder, Registry, TextEncoder};
let registry = Registry::new();
register_metrics(®istry);
let (tx, mut rx) = unbounded();
tx.unbounded_send(()).unwrap();
futures::executor::block_on(async {
rx.next().await.unwrap();
});
drop(rx);
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
assert_eq!(String::from_utf8(buffer).unwrap(), "# HELP instrumented_mpsc_channels_created_total Channels created total.\
\n# TYPE instrumented_mpsc_channels_created_total counter\
\ninstrumented_mpsc_channels_created_total 1\
\n# HELP instrumented_mpsc_channels_dropped_total Channels dropped total.\
\n# TYPE instrumented_mpsc_channels_dropped_total counter\
\ninstrumented_mpsc_channels_dropped_total 1\
\n# HELP instrumented_mpsc_msgs_received_total Messages received total.\
\n# TYPE instrumented_mpsc_msgs_received_total counter\
\ninstrumented_mpsc_msgs_received_total 1\
\n# HELP instrumented_mpsc_msgs_send_total Messages send total.\
\n# TYPE instrumented_mpsc_msgs_send_total counter\
\ninstrumented_mpsc_msgs_send_total 1\n");
依赖关系
~2.5MB
~55K SLoC