#metrics #statsd #unix-socket #udp-socket #client #cadence #sink

cadence-with-flush

Cadence的分支,支持客户端刷新功能

2个版本

0.29.1 2023年5月30日
0.29.0 2023年1月21日

#4 in #cadence

Apache-2.0 OR MIT

210KB
3K SLoC

Cadence

build status docs.rs crates.io Rust 1.36+

Cadence文档

宏文档

一个可扩展的Rust Statsd客户端!

Cadence是一种快速灵活的方式,可以将Statsd指标从应用程序中发出。

特性

  • 支持 通过UDP(或可选的Unix套接字)将计数器、计时器、直方图、分布、仪表、计量器和集合等指标发送到Statsd。
  • 支持通过MetricSink特质使用其他后端。
  • 支持Datadog风格的指标标签。
  • 用于简化常见的指标发出调用
  • 一个简单而灵活的指标发送API。

安装

要在项目中使用cadence,请将其添加到您的Cargo.toml文件中的依赖项中。

[dependencies]
cadence = "x.y.z"

这就足够了!

用法

以下展示了Cadence的一些用法示例。这些示例从简单开始,逐渐过渡到如何在生产应用程序中使用Cadence。

简单使用

以下展示了Cadence的简单使用。在这个例子中,我们只导入了客户端,创建了一个将写入某个假想的指标服务器的实例,并发送了一些指标。

use std::net::UdpSocket;
use cadence::prelude::*;
use cadence::{StatsdClient, UdpMetricSink, DEFAULT_PORT};
// Create client that will write to the given host over UDP.
//
// Note that you'll probably want to actually handle any errors creating
// the client when you use it for real in your application. We're just
// using .unwrap() here since this is an example!
let host = ("metrics.example.com", DEFAULT_PORT);
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let sink = UdpMetricSink::from(host, socket).unwrap();
let client = StatsdClient::from_sink("my.metrics", sink);

// Emit metrics!
client.incr("some.counter");
client.time("some.methodCall", 42);
client.gauge("some.thing", 7);
client.meter("some.value", 5);

缓冲UDP接收器

虽然通过UDP发送指标非常快,但频繁的网络调用开销可能会开始累积。如果您正在编写一个高性能应用程序,该应用程序发出大量指标,这一点尤为重要。

为了确保指标不会干扰您应用程序的性能,您可能希望使用一个MetricSink实现,该实现会在发送单个网络操作之前缓冲多个指标。为此,我们有BufferedUdpMetricSink。以下是如何使用此接收器的示例。

use std::net::UdpSocket;
use cadence::prelude::*;
use cadence::{StatsdClient, BufferedUdpMetricSink, DEFAULT_PORT};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();

let host = ("metrics.example.com", DEFAULT_PORT);
let sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let client = StatsdClient::from_sink("my.prefix", sink);

client.count("my.counter.thing", 29);
client.time("my.service.call", 214);
client.incr("some.event");

如您所见,使用此缓冲UDP接收器与使用常规的、非缓冲的UDP接收器一样简单。

这个漏斗的唯一缺点是,只有在缓冲区满时才会将指标写入Statsd服务器。如果你有一个持续发出指标的繁忙应用程序,这应该不是问题。然而,如果你的应用程序只是偶尔发出指标,这个漏斗可能会导致指标延迟一段时间,直到缓冲区填满。在这种情况下,使用UdpMetricSink可能更合适,因为它不做任何缓冲。

异步指标队列

为了保证发出指标不会干扰应用程序的性能(尽管发出指标通常非常快),最好确保指标在不同于应用程序线程的线程中发出。

为此,有QueuingMetricSink。这个漏斗允许你将任何其他指标漏斗包装起来,并通过队列将其发送,因为它在另一个线程中异步地发出指标,从应用程序流程中分离出来。

包装的指标漏斗的要求是它是线程安全的,这意味着它实现了SendSync特性。如果你使用的是来自Cadence的另一个漏斗,你不需要担心:它们都是线程安全的。

以下是一个使用QueuingMetricSink来包装缓冲UDP指标漏斗的示例。这是在生产中使用Cadence的首选方式。

use std::net::UdpSocket;
use cadence::prelude::*;
use cadence::{StatsdClient, QueuingMetricSink, BufferedUdpMetricSink, DEFAULT_PORT};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();

let host = ("metrics.example.com", DEFAULT_PORT);
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::from(udp_sink);
let client = StatsdClient::from_sink("my.prefix", queuing_sink);

client.count("my.counter.thing", 29);
client.time("my.service.call", 214);

在上面的示例中,我们使用了队列漏斗的默认构造函数,它创建了一个无界队列,没有最大大小,将主线程(客户端发送指标)连接到后台线程(包装的漏斗运行)。如果你想创建一个有界队列,可以具有最大大小,可以使用with_capacity构造函数。以下是一个示例。

use std::net::UdpSocket;
use cadence::prelude::*;
use cadence::{StatsdClient, QueuingMetricSink, BufferedUdpMetricSink,
              DEFAULT_PORT};

// Queue with a maximum capacity of 128K elements
const QUEUE_SIZE: usize = 128 * 1024;

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();

let host = ("metrics.example.com", DEFAULT_PORT);
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, QUEUE_SIZE);
let client = StatsdClient::from_sink("my.prefix", queuing_sink);

client.count("my.counter.thing", 29);
client.time("my.service.call", 214);
client.incr("some.event");

使用具有容量的QueuingMetricSink意味着当队列满时,尝试通过StatsdClient发出指标将失败。虽然这不好,但如果你使用的是无界队列,未发送的指标会逐渐占用更多内存,直到你的应用程序耗尽所有内存。

使用无界队列意味着指标的发送可以吸收发送指标时的减慢,直到你的应用程序耗尽内存。使用有界队列将限制你的应用程序中发送指标将使用的内存量。这是Cadence用户必须自己决定的权衡。

使用标签

通过使用Cadence StatsdClient结构体中包含的每个_with_tags方法来为指标添加标签。以下是如何使用这些方法的示例。注意,标签是Statsd协议的扩展,因此可能不被所有服务器支持。

有关更多信息,请参阅Datadog文档

use cadence::prelude::*;
use cadence::{Metric, StatsdClient, NopMetricSink};

let client = StatsdClient::from_sink("my.prefix", NopMetricSink);

let res = client.count_with_tags("my.counter", 29)
    .with_tag("host", "web03.example.com")
    .with_tag_value("beta-test")
    .try_send();

assert_eq!(
    concat!(
        "my.prefix.my.counter:29|c|#",
        "host:web03.example.com,",
        "beta-test"
    ),
    res.unwrap().as_metric_str()
);

默认标签

可以在构建时使用构建器向StatsdClient添加默认标签。默认标签会添加到StatsdClient发出的每个指标中,无需在构建客户端后进行任何额外工作。注意,标签是Statsd协议的扩展,因此可能不被所有服务器支持。

有关更多信息,请参阅Datadog文档

use cadence::prelude::*;
use cadence::{Metric, StatsdClient, NopMetricSink};

let client = StatsdClient::builder("my.prefix", NopMetricSink)
    .with_tag("env", "prod")
    .with_tag("app", "auth")
    .build();

let res = client.count_with_tags("my.counter", 29)
    .with_tag("host", "web03.example.com")
    .with_tag_value("beta-test")
    .try_send();

assert_eq!(
    concat!(
        "my.prefix.my.counter:29|c|#",
        "env:prod,",
        "app:auth,",
        "host:web03.example.com,",
        "beta-test"
    ),
    res.unwrap().as_metric_str()
);

值打包

值打包允许将多个值作为一个单一的指标发送给直方图、分布和计时器类型。Cadence客户端接受直方图、分布和计时器方法的Vec<T>,并将多个值格式化如下。请注意,此功能是Datadog的扩展功能,因此您的服务器可能不支持。它由Datadog代理的版本>=v6.25.0 && <v7.0.0>=v7.25.0支持。

打包指标具有以下格式

<METRIC_NAME>:<VALUE1>:<VALUE2>:<VALUE3>|<TYPE>|#<TAG_KEY_1>:<TAG_VALUE_1>,<TAG_2>`

有关更多信息,请参阅Datadog文档

use cadence::prelude::*;
use cadence::{Metric, StatsdClient, NopMetricSink};

let client = StatsdClient::from_sink("my.prefix", NopMetricSink);

let res = client.distribution_with_tags("my.distribution", vec![29, 30, 31, 32])
    .with_tag("host", "web03.example.com")
    .with_tag_value("beta-test")
    .try_send();

assert_eq!(
    concat!(
        "my.prefix.my.distribution:29:30:31:32|d|#",
        "host:web03.example.com,",
        "beta-test"
    ),
    res.unwrap().as_metric_str()
);

实现的特质

Cadence StatsdClient结构体使用的每个用于发送指标的函数都实现为一个特质。还有一个将所有这些特质组合在一起的特质。如果我们愿意,我们可以只使用其中一个特质类型来引用客户端实例。如果您想在单元测试代码时用模拟版本替换实际Cadence客户端,或者想通过特质和指针抽象出客户端的实现细节,这可能对您有用。

这些特质都导出在预定义模块中。它们也存在于主模块中,但通常不会像那样使用。

use cadence::prelude::*;
use cadence::{StatsdClient, UdpMetricSink, DEFAULT_PORT};

pub struct User {
    id: u64,
    username: String,
    email: String
}

// Here's a simple DAO (Data Access Object) that doesn't do anything but
// uses a metric client to keep track of the number of times the
// 'getUserById' method gets called.
pub struct MyUserDao {
    metrics: Box<dyn MetricClient>
}

impl MyUserDao {
    // Create a new instance that will use the StatsdClient
    pub fn new<T: MetricClient + 'static>(metrics: T) -> MyUserDao {
        MyUserDao { metrics: Box::new(metrics) }
    }

    /// Get a new user by their ID
    pub fn get_user_by_id(&self, id: u64) -> Option<User> {
        self.metrics.incr("getUserById");
        None
    }
}

// Create a new Statsd client that writes to "metrics.example.com"
let host = ("metrics.example.com", DEFAULT_PORT);
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let sink = UdpMetricSink::from(host, socket).unwrap();
let metrics = StatsdClient::from_sink("counter.example", sink);

// Create a new instance of the DAO that will use the client
let dao = MyUserDao::new(metrics);

// Try to lookup a user by ID!
match dao.get_user_by_id(123) {
    Some(u) => println!("Found a user!"),
    None => println!("No user!")
};

静默指标发送和错误处理

在发送指标时,有时您并不真正关心尝试发送它的结果,或者您可能只是不想与代码的其他部分一起处理它。为了处理这种情况,Cadence允许您设置默认的错误处理器。当发送指标出现错误时,将调用此处理器,这样调用代码就不必处理它们。

以下是一个配置错误处理器以及可能调用它的示例。

use cadence::prelude::*;
use cadence::{MetricError, StatsdClient, NopMetricSink};

fn my_error_handler(err: MetricError) {
    println!("Metric error! {}", err);
}

let client = StatsdClient::builder("prefix", NopMetricSink)
    .with_error_handler(my_error_handler)
    .build();

// When sending metrics via the `MetricBuilder` used for assembling tags,
// callers may opt into sending metrics quietly via the `.send()` method
// as opposed to the `.try_send()` method
client.count_with_tags("some.counter", 42)
    .with_tag("region", "us-east-2")
    .send();

自定义指标接收器

Cadence StatsdClient使用MetricSink特质的实现将指标发送到指标服务器。Cadence库的大多数用户可能希望使用包装BufferedMetricSink实例的QueuingMetricSink

但是,您可能想做一些现有接收器无法覆盖的事情。以下是一个创建自定义接收器的示例。

use std::io;
use cadence::prelude::*;
use cadence::{StatsdClient, MetricSink, DEFAULT_PORT};

pub struct MyMetricSink;

impl MetricSink for MyMetricSink {
    fn emit(&self, metric: &str) -> io::Result<usize> {
        // Your custom metric sink implementation goes here!
        Ok(0)
    }
}

let sink = MyMetricSink;
let client = StatsdClient::from_sink("my.prefix", sink);

client.count("my.counter.thing", 42);
client.time("my.method.time", 25);
client.incr("some.other.counter");

自定义UDP套接字

大多数Cadence StatsdClient用户会使用它通过UDP套接字发送指标。如果您需要自定义套接字,例如您想在阻塞模式下使用套接字但设置写入超时,您可以像下面演示的那样做。

use std::net::UdpSocket;
use std::time::Duration;
use cadence::prelude::*;
use cadence::{StatsdClient, UdpMetricSink, DEFAULT_PORT};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_write_timeout(Some(Duration::from_millis(1))).unwrap();

let host = ("metrics.example.com", DEFAULT_PORT);
let sink = UdpMetricSink::from(host, socket).unwrap();
let client = StatsdClient::from_sink("my.prefix", sink);

client.count("my.counter.thing", 29);
client.time("my.service.call", 214);
client.incr("some.event");
client.set("users.uniques", 42);

Unix套接字

Cadence还支持使用Unix数据报套接字,通过UnixMetricSinkBufferedUnixMetricSink。Unix套接字可以用于将指标发送到与应用程序在同一台机器(物理机器、VM、Pod中的容器)上运行的服务器或代理。Unix套接字与UDP套接字类似,但有几个重要区别

  • 在不存在或未被监听的套接字上发送指标会导致错误。
  • 连接套接字上发送的指标保证能够被送达(即,它们是可靠的,与UDP套接字不同)。然而,由于各种环境和服务器特定的原因,指标仍可能不会被服务器读取。

以下是一个使用sinks的示例。

use std::os::unix::net::UnixDatagram;
use cadence::prelude::*;
use cadence::{StatsdClient, BufferedUnixMetricSink};

let socket = UnixDatagram::unbound().unwrap();
socket.set_nonblocking(true).unwrap();
let sink = BufferedUnixMetricSink::from("/run/statsd.sock", socket);
let client = StatsdClient::from_sink("my.prefix", sink);

client.count("my.counter.thing", 29);
client.time("my.service.call", 214);
client.incr("some.event");
client.set("users.uniques", 42);

注意:此功能仅在Unix平台(Linux、BSD、MacOS)上可用。

其他

有关Cadence的更多信息,请参阅存储库根目录下的README

依赖项

~345KB