#metrics #observability #logging-tracing #tracing #logging

emit_otlp

向OpenTelemetry兼容的收集器发送诊断事件

9个版本

0.11.0-alpha.9 2024年8月21日
0.11.0-alpha.82024年8月15日
0.11.0-alpha.72024年7月31日
0.11.0-alpha.62024年6月28日

#245 in 调试

Download history 402/week @ 2024-06-03 302/week @ 2024-06-10 7/week @ 2024-06-17 166/week @ 2024-06-24 86/week @ 2024-07-01 125/week @ 2024-07-29 1/week @ 2024-08-05 121/week @ 2024-08-12

247 每月下载量

MIT/Apache

485KB
9K SLoC

emit_otlp

otlp

当前文档

通过OpenTelemetry协议(OTLP)发送诊断事件。

此库直接向某些远程OTLP接收器发送导出请求。如果您需要将emit与OpenTelemetry SDK集成,请参阅emit-opentelemetry


lib.rs:

通过OpenTelemetry协议(OTLP)发送诊断事件。

此库提供Otlp,一个发送导出请求直接到某些远程OTLP接收器的emit::Emitter。如果您需要将emit与OpenTelemetry SDK集成,请参阅emit-opentelemetry

工作原理

┌────────────────────────────────────────┐  ┌─────────────┐    ┌─────────────────────────────┐
│                caller                  │  │   channel   │    │     background worker       │
│                                        │  │             │    │                             │
│ emit::Event─┬─*─►is trace?──►Span──────┼──┼──►Trace─────┼─┐  │ ExportTraceServiceRequest   │
│             │                          │  │             │ │  │                             │
│             ├─*─►is metric?─►Metric────┼──┼──►Metrics───┼─┼──► ExportMetricsServiceRequest │
│             │                          │  │             │ │  │                             │
│             └─*─────────────►LogRecord─┼──┼──►Logs──────┼─┘  │ ExportLogsServiceRequest    │
└────────────────────────────────────────┘  └─────────────┘    └─────────────────────────────┘
* Only if the logs/trace/metrics signal is configured

发射器基于异步、批量通道。诊断事件通过以下关键步骤从emit::emit!传递到远程OTLP接收器

  1. 确定事件属于哪种信号
  • 如果事件携带emit::Kind::Span,并且已配置跟踪信号,则将其视为跨度。
  • 如果事件携带emit::Kind::Metric,并且已配置了度量信号,则将其视为度量。
  • 在其他任何情况下,如果已配置了日志信号,则将其视为日志记录。
  1. 将事件序列化为目标格式(JSON/protobuf)的OTLP数据结构。
  2. 将序列化的事件放入通道。每个信号在通道中都有自己的内部队列。
  3. 在后台工作线程中,通过将事件组合成OTLP导出请求并使用目标协议(HTTP/gRPC)发送它们来处理通道中的事件。

此库基于hyper的HTTP和rustls的TLS以及ring。这些依赖项不可配置,也不能替换为替代实现。

入门指南

emitemit_otlp添加到您的Cargo.toml

[dependencies.emit]
version = "0.11.0-alpha.9"

[dependencies.emit_otlp]
version = "0.11.0-alpha.9"

main.rs的开始处使用OTLP发射器初始化emit

fn main() {
let rt = emit::setup()
.emit_to(emit_otlp::new()
// Add required resource properties for OTLP
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
// Configure endpoints for logs/traces/metrics using gRPC + protobuf
.logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
.traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
.metrics(emit_otlp::metrics_grpc_proto("http://localhost:4319"))
.spawn()
.unwrap())
.init();

// Your app code goes here

rt.blocking_flush(std::time::Duration::from_secs(30));
}

[new] 方法返回一个OtlpBuilder,可以通过其OtlpBuilder::logsOtlpBuilder::tracesOtlpBuilder::metrics方法进行配置。

您不需要配置所有信号,但至少应配置OtlpBuilder::logs

配置好构建器后,调用OtlpBuilder::spawn并将生成的Otlp传递给emit::Setup::emit_to

后台工作线程的生成位置

Otlp发射器不会直接执行任何工作。所有这些都由通过OtlpBuilder::spawn创建的后台工作线程处理。而OtlpBuilder::spawn实际上在何处生成后台工作线程取决于其调用位置。

如果OtlpBuilder::spawntokio运行时内调用,则工作线程将生成到该运行时

// This will spawn in the active tokio runtime because of #[tokio::main]

#[tokio::main]
async fn main() {
let rt = emit::setup()
.emit_to(emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
.spawn()
.unwrap())
.init();

rt.blocking_flush(std::time::Duration::from_secs(30));
}

如果在外部 tokio 运行时调用 OtlpBuilder::spawn,则工作将在具有单线程执行器的后台线程上启动

// This will spawn on a background thread because there's no active tokio runtime

fn main() {
let rt = emit::setup()
.emit_to(emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
.spawn()
.unwrap())
.init();

rt.blocking_flush(std::time::Duration::from_secs(30));
}

为 gRPC+protobuf 配置

函数 logs_grpc_prototraces_grpc_protometrics_grpc_proto 生成 gRPC+protobuf 的构建器

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
.traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
.metrics(emit_otlp::metrics_grpc_proto("http://localhost:4319"))
.spawn()
.unwrap()

gRPC 基于 HTTP,并使用已知的 URI 路径来路由 RPC 请求。这些路径将自动附加到端点,因此您在配置时无需指定它们。

为 HTTP+JSON 配置

函数 logs_http_jsontraces_http_jsonmetrics_http_json 生成 HTTP+JSON 的构建器

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_http_json("http://localhost:4318/v1/logs"))
.traces(emit_otlp::traces_http_json("http://localhost:4318/v1/traces"))
.metrics(emit_otlp::metrics_http_json("http://localhost:4318/v1/metrics"))
.spawn()
.unwrap()

为 HTTP+protobuf 配置

函数 logs_http_prototraces_http_protometrics_http_proto 生成 HTTP+protobuf 的构建器

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_http_proto("http://localhost:4318/v1/logs"))
.traces(emit_otlp::traces_http_proto("http://localhost:4318/v1/traces"))
.metrics(emit_otlp::metrics_http_proto("http://localhost:4318/v1/metrics"))
.spawn()
.unwrap()

配置 TLS

如果启用了 tls Cargo 功能,并且端点方案的协议使用 https:// 方案,则它将使用 rustlsrustls-native-certs 的 TLS。

配置压缩

如果启用了 gzip Cargo 功能,则将自动对所有导出请求应用 gzip 压缩。

您可以通过 OtlpTransportBuilder 禁用任何压缩

emit_otlp::new()
.logs(emit_otlp::logs_proto(emit_otlp::http("http://localhost:4318/v1/logs")
.allow_compression(false))
)

自定义 HTTP 标头

您可以通过 OtlpTransportBuilder 指定用于 HTTP 或 gRPC 请求的自定义标头

emit_otlp::new()
.logs(emit_otlp::logs_proto(emit_otlp::http("http://localhost:4318/v1/logs")
.headers([
("X-ApiKey", "abcd"),
]))
)

配置资源

方法 OtlpBuilder::resource 配置要随每个导出请求发送的 OTLP 资源。一些 OTLP 接收器接受没有资源的数据,但 OpenTelemetry 规范本身要求使用资源。

至少,您应该添加 service.name 属性

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})

您还应考虑设置其他已知资源属性

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
#[emit::key("telemetry.sdk.language")]
language: emit_otlp::telemetry_sdk_language(),
#[emit::key("telemetry.sdk.name")]
sdk: emit_otlp::telemetry_sdk_name(),
#[emit::key("telemetry.sdk.version")]
version: emit_otlp::telemetry_sdk_version(),
})

日志

所有 emit::Event 都可以表示为 OTLP 日志记录。您至少应配置日志信号以确保以某种方式捕获所有诊断。gRPC+Protobuf 的最小日志配置如下

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_grpc_proto("http://localhost:4318"))
.spawn()
.unwrap()

以下诊断

emit::info!("Hello, OTLP!");

将生成以下 HTTP+JSON 导出请求

http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716804019165847000,
"observedTimeUnixNano": 1716804019165847000,
"body": {
"stringValue": "Hello, OTLP!"
},
"attributes": [],
"severityNumber": 9,
"severityText": "info"
}
]
}
]
}
]
}

当未配置跟踪信号时,跨度的事件诊断将表示为常规的 OTLP 日志记录。以下诊断

#[emit::span("Compute {a} + {b}")]
fn add(a: i32, b: i32) -> i32 {
let r = a + b;

emit::info!("Produced {r}", r);

r
}

add(1, 3);

将生成以下 HTTP+JSON 导出请求

http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716804240222377000,
"observedTimeUnixNano": 1716804240222377000,
"body": {
"stringValue": "Produced 4"
},
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "r",
"value": {
"intValue": 4
}
}
],
"severityNumber": 9,
"severityText": "info",
"traceId": "489571cc6b94414ceb4a32ccc2c7df09",
"spanId": "a93239061c12aa4c"
},
{
"timeUnixNano": 1716804240222675000,
"observedTimeUnixNano": 1716804240222675000,
"body": {
"stringValue": "Compute 1 + 3"
},
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "evt_kind",
"value": {
"stringValue": "span"
}
},
{
"key": "span_name",
"value": {
"stringValue": "Compute {a} + {b}"
}
}
],
"severityNumber": 9,
"severityText": "info",
"traceId": "489571cc6b94414ceb4a32ccc2c7df09",
"spanId": "a93239061c12aa4c"
}
]
}
]
}
]
}

当未配置度量信号时,度量样本的事件诊断将表示为常规的 OTLP 日志记录。以下诊断

emit::emit!(
evt: emit::Metric::new(
emit::mdl!(),
"my_metric",
"count",
emit::Empty,
42,
emit::Empty,
)
);

将生成以下 HTTP+JSON 导出请求

http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716876516012074000,
"observedTimeUnixNano": 1716876516012074000,
"body": {
"stringValue": "count of my_metric is 42"
},
"attributes": [
{
"key": "evt_kind",
"value": {
"stringValue": "metric"
}
},
{
"key": "metric_agg",
"value": {
"stringValue": "count"
}
},
{
"key": "metric_name",
"value": {
"stringValue": "my_metric"
}
},
{
"key": "metric_value",
"value": {
"intValue": 42
}
}
],
"severityNumber": 9,
"severityText": "info"
}
]
}
]
}
]
}

跟踪

当配置了跟踪信号时,只要满足以下条件,`emit::Event` 可以表示为 OTLP span:

如果任何条件未满足,事件将表示为 OTLP 日志记录。如果未配置日志信号,则将其丢弃。

gRPC+Protobuf 的最小日志配置

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.traces(emit_otlp::traces_grpc_proto("http://localhost:4318"))
.logs(emit_otlp::logs_grpc_proto("http://localhost:4318"))
.spawn()
.unwrap()

以下诊断

#[emit::span("Compute {a} + {b}")]
fn add(a: i32, b: i32) -> i32 {
let r = a + b;

emit::info!("Produced {r}", r);

r
}

add(1, 3);

将生成以下 HTTP+JSON 导出请求

http://localhost:4318/v1/traces
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "my_app"
},
"spans": [
{
"name": "Compute {a} + {b}",
"kind": 0,
"startTimeUnixNano": 1716888416629816000,
"endTimeUnixNano": 1716888416630814000,
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
}
],
"traceId": "0a85ccaf666e11aaca6bd5d469e2850d",
"spanId": "2b9caa35eaefed3a"
}
]
}
]
}
]
}
http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716888416630507000,
"observedTimeUnixNano": 1716888416630507000,
"body": {
"stringValue": "Produced 4"
},
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "r",
"value": {
"intValue": 4
}
}
],
"severityNumber": 9,
"severityText": "info",
"traceId": "0a85ccaf666e11aaca6bd5d469e2850d",
"spanId": "2b9caa35eaefed3a"
}
]
}
]
}
]
}

如果设置了 emit::well_known::KEY_ERR 属性,则生成的 OTLP span 将携带语义异常事件

#[emit::span(guard: span, "Compute {a} + {b}")]
fn add(a: i32, b: i32) -> i32 {
let r = a + b;

if r == 4 {
span.complete_with(|evt| {
emit::error!(
evt,
"Compute {a} + {b} failed",
a,
b,
r,
err: "Invalid result",
);
});
}

r
}

add(1, 3);
http://localhost:4318/v1/traces
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "my_app"
},
"spans": [
{
"name": "Compute {a} + {b}",
"kind": 0,
"startTimeUnixNano": 1716936430882852000,
"endTimeUnixNano": 1716936430883250000,
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "r",
"value": {
"intValue": 4
}
}
],
"traceId": "6499bc190add060dad8822600ba65226",
"spanId": "b72c5152c32cc432",
"events": [
{
"name": "exception",
"timeUnixNano": 1716936430883250000,
"attributes": [
{
"key": "exception.message",
"value": {
"stringValue": "Invalid result"
}
}
]
}
],
"status": {
"message": "Invalid result",
"code": 2
}
}
]
}
]
}
]
}

指标

当配置了指标信号时,只要满足以下条件,`emit::Event` 可以表示为 OTLP 指标:

如果任何条件未满足,事件将表示为 OTLP 日志记录。如果未配置日志信号,则将其丢弃。

gRPC+Protobuf 的最小日志配置

emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.metrics(emit_otlp::metrics_grpc_proto("http://localhost:4318"))
.logs(emit_otlp::logs_grpc_proto("http://localhost:4318"))
.spawn()
.unwrap()

如果度量聚合是 "count",则生成的 OTLP 度量是一个单调的总和

emit::emit!(
evt: emit::Metric::new(
emit::mdl!(),
"my_metric",
"count",
emit::Empty,
42,
emit::props! {
a: true
},
)
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716889540249854000,
"timeUnixNano": 1716889540249854000,
"value": 42
}
],
"aggregationTemporality": 2,
"isMonotonic": true
}
}
]
}
]
}
]
}

如果度量聚合是 "sum",则生成的 OTLP 度量是一个非单调的总和

emit::emit!(
evt: emit::Metric::new(
emit::mdl!(),
"my_metric",
"sum",
emit::Empty,
-8,
emit::props! {
a: true
},
)
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716889891391075000,
"timeUnixNano": 1716889891391075000,
"value": -8
}
],
"aggregationTemporality": 2,
"isMonotonic": false
}
}
]
}
]
}
]
}

任何其他聚合都将表示为 OTLP 仪表

emit::emit!(
evt: emit::Metric::new(
emit::mdl!(),
"my_metric",
"last",
emit::Empty,
615,
emit::props! {
a: true
},
)
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"gauge": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716890230856380000,
"timeUnixNano": 1716890230856380000,
"value": 615
}
]
}
}
]
}
]
}
]
}

如果度量聚合是 "count""sum",并且值是一个序列,那么每个值都将被求和以产生一个数据点

let start = emit::Timestamp::from_unix(std::time::Duration::from_secs(1716890420));
let end = emit::Timestamp::from_unix(std::time::Duration::from_secs(1716890425));

emit::emit!(
evt: emit::Metric::new(
emit::mdl!(),
"my_metric",
"count",
start..end,
&[
1.0,
1.0,
1.0,
1.0,
1.0,
],
emit::props! {
a: true
},
)
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716890420000000000,
"timeUnixNano": 1716890425000000000,
"value": 5
}
],
"aggregationTemporality": 1,
"isMonotonic": true
}
}
]
}
]
}
]
}

限制

这个库不是 OpenTelemetry SDK 的替代品。它专门针对向 OTLP 兼容服务发射诊断事件。它有一些故意的限制

  • 无传播。 这是由应用程序负责管理的。
  • 无直方图度量。 与 OpenTelemetry 相比,`emit` 的度量数据模型很简单,因此不支持直方图或指数直方图。
  • 无跨度事件。 仅支持传统的异常事件。独立的日志事件不会转换为跨度事件。它们通过日志端点发送。
  • 无 tracestate。 `emit` 的跨度数据模型不包括 W3C tracestate。

故障排除

如果您在 OTLP 接收器中看不到诊断出现,您可以通过配置 `emit` 的内部日志来排除 emit_otlp 中的配置问题,并从它那里收集指标

use emit::metric::Source;

fn main() {
// 1. Initialize the internal logger
//    Diagnostics produced by `emit_otlp` itself will go here
let internal = emit::setup()
.emit_to(emit_term::stdout())
.init_internal();

let mut reporter = emit::metric::Reporter::new();

let rt = emit::setup()
.emit_to({
let otlp = emit_otlp::new()
.resource(emit::props! {
#[emit::key("service.name")]
service_name: emit::pkg!(),
})
.logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
.traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
.metrics(emit_otlp::metrics_grpc_proto("http://localhost:4319"))
.spawn()
.unwrap();

// 2. Add `emit_otlp`'s metrics to a reporter so we can see what it's up to
//    You can do this independently of the internal emitter
reporter.add_source(otlp.metric_source());

otlp
})
.init();

// Your app code goes here

rt.blocking_flush(std::time::Duration::from_secs(30));

// 3. Report metrics after attempting to flush
//    You could also do this periodically as your application runs
reporter.emit_metrics(&internal.emitter());
}

诊断包括批次的发射以及观察到的任何失败。

依赖关系

~8–19MB
~269K SLoC