2 个版本

0.1.1 2024 年 2 月 5 日
0.1.0 2023 年 11 月 23 日

#325网页编程

每月 26 次下载
用于 2 个 crates(通过 atm0s-media-server-transp…

MIT/Apache

1.5MB
30K SLoC

包含(ZIP 文件,51KB) logo/str0m logo.graffle

str0m

str0m logo

Rust 中的同步无 I/O WebRTC 实现。

这是一个 Sans I/O 实现,这意味着 Rtc 实例本身不执行任何网络通信。此外,它没有内部线程或异步任务。所有操作都是从公共 API 的调用中同步发生的。

这故意不是一个标准的 RTCPeerConnection API,因为这并不非常适合 Rust。更多详细信息请参阅下文。

加入我们

我们在 Zulip 上讨论 str0m 的事情。使用此 邀请链接 加入我们。或者匿名浏览讨论 str0m.zulipchat.com

silly clip showing video playing

用法

chat 示例展示了如何连接多个浏览器并充当 SFU(选择转发单元)。示例将所有流量多路复用到服务器的一个 UDP 套接字,并使用两个线程(一个用于网页服务器,另一个用于 SFU 循环)。

TLS

为了浏览器执行 WebRTC,所有流量都必须在 TLS 下。该项目附带了一个自签名的证书,用于示例。证书用于主机名 str0m.test,因为 TLD .test 永远不会解析为真实的 DNS 名称。

cargo run --example chat

日志应提示您连接浏览器到 https://10.0.0.103:3000 – 这很可能会引起安全警告,您必须让浏览器接受。

示例 http-post 大概说明了如何从浏览器客户端接收媒体数据。该示例是单线程的,比聊天示例简单一些,是理解API的一个良好起点。

cargo run --example http-post

被动

对于被动连接,即媒体和初始OFFER由远程对端发起,我们需要以下步骤来建立连接。

// Instantiate a new Rtc instance.
let mut rtc = Rtc::new();

//  Add some ICE candidate such as a locally bound UDP port.
let addr = "1.2.3.4:5000".parse().unwrap();
let candidate = Candidate::host(addr, "udp").unwrap();
rtc.add_local_candidate(candidate);

// Accept an incoming offer from the remote peer
// and get the corresponding answer.
let offer = todo!();
let answer = rtc.sdp_api().accept_offer(offer).unwrap();

// Forward the answer to the remote peer.

// Go to _run loop_

主动

主动连接意味着我们正在发起初始OFFER并等待远程ANSWER以开始连接。

#
// Instantiate a new Rtc instance.
let mut rtc = Rtc::new();

// Add some ICE candidate such as a locally bound UDP port.
let addr = "1.2.3.4:5000".parse().unwrap();
let candidate = Candidate::host(addr, "udp").unwrap();
rtc.add_local_candidate(candidate);

// Create a `SdpApi`. The change lets us make multiple changes
// before sending the offer.
let mut change = rtc.sdp_api();

// Do some change. A valid OFFER needs at least one "m-line" (media).
let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None);

// Get the offer.
let (offer, pending) = change.apply().unwrap();

// Forward the offer to the remote peer and await the answer.
// How to transfer this is outside the scope for this library.
let answer = todo!();

// Apply answer.
rtc.sdp_api().accept_answer(pending, answer).unwrap();

// Go to _run loop_

运行循环

驱动 Rtc 状态向前推进的是运行循环,无论同步或异步,其外观如下。

#
// Buffer for reading incoming UDP packets.
let mut buf = vec![0; 2000];

// A UdpSocket we obtained _somehow_.
let socket: UdpSocket = todo!();

loop {
    // Poll output until we get a timeout. The timeout means we
    // are either awaiting UDP socket input or the timeout to happen.
    let timeout = match rtc.poll_output().unwrap() {
        // Stop polling when we get the timeout.
        Output::Timeout(v) => v,

        // Transmit this data to the remote peer. Typically via
        // a UDP socket. The destination IP comes from the ICE
        // agent. It might change during the session.
        Output::Transmit(v) => {
            socket.send_to(&v.contents, v.destination).unwrap();
            continue;
        }

        // Events are mainly incoming media data from the remote
        // peer, but also data channel data and statistics.
        Output::Event(v) => {

            // Abort if we disconnect.
            if v == Event::IceConnectionStateChange(IceConnectionState::Disconnected) {
                return;
            }

            // TODO: handle more cases of v here, such as incoming media data.

            continue;
        }
    };

    // Duration until timeout.
    let duration = timeout - Instant::now();

    // socket.set_read_timeout(Some(0)) is not ok
    if duration.is_zero() {
        // Drive time forwards in rtc straight away.
        rtc.handle_input(Input::Timeout(Instant::now())).unwrap();
        continue;
    }

    socket.set_read_timeout(Some(duration)).unwrap();

    // Scale up buffer to receive an entire UDP packet.
    buf.resize(2000, 0);

    // Try to receive. Because we have a timeout on the socket,
    // we will either receive a packet, or timeout.
    // This is where having an async loop shines. We can await multiple things to
    // happen such as outgoing media data, the timeout and incoming network traffic.
    // When using async there is no need to set timeout on the socket.
    let input = match socket.recv_from(&mut buf) {
        Ok((n, source)) => {
            // UDP data received.
            buf.truncate(n);
            Input::Receive(
                Instant::now(),
                Receive {
                    proto: Protocol::Udp,
                    source,
                    destination: socket.local_addr().unwrap(),
                    contents: buf.as_slice().try_into().unwrap(),
                },
            )
        }

        Err(e) => match e.kind() {
            // Expected error for set_read_timeout().
            // One for windows, one for the rest.
            ErrorKind::WouldBlock
                | ErrorKind::TimedOut => Input::Timeout(Instant::now()),

            e => {
                eprintln!("Error: {:?}", e);
                return; // abort
            }
        },
    };

    // Input is either a Timeout or Receive of data. Both drive the state forward.
    rtc.handle_input(input).unwrap();
}

发送媒体数据

在创建媒体时,我们可以决定支持哪些编解码器,并且它们与远程端进行协商。每个编解码器对应一个“负载类型”(PT)。要发送媒体数据,我们需要确定在发送时使用哪个PT。

#
// Obtain mid from Event::MediaAdded
let mid: Mid = todo!();

// Create a media writer for the mid.
let writer = rtc.writer(mid).unwrap();

// Get the payload type (pt) for the wanted codec.
let pt = writer.payload_params().nth(0).unwrap().pt();

// Write the data
let wallclock = todo!();   // Absolute time of the data
let media_time = todo!();  // Media time, in RTP time
let data: &[u8] = todo!(); // Actual data
writer.write(pt, wallclock, media_time, data).unwrap();

媒体时间、系统时间和本地时间

str0m 有三个主要的时间概念:“now”、“媒体时间”和系统时间。

Now

str0m 中的一些调用,例如 Rtc::handle_input,接受一个 now 参数,该参数是 std::time::Instant。这些调用“推动内部状态中的时间前进”。这用于决定何时向远程对端发送各种反馈报告(RTCP)、带宽估计(BWE)和统计信息等。

str0m 没有内部时钟调用。也就是说,str0m 从不调用 Instant::now()。所有时间都是外部输入。这意味着可以构造测试用例来驱动 Rtc 实例,其速度比实时更快(参见 集成测试)。

媒体时间

每个RTP头都有一个32位数字,str0m 称之为“媒体时间”。媒体时间依赖于某些时间基,但是str0m中的所有编解码器都使用90_000Hz进行视频和48_000Hz进行音频。

对于视频,MediaTime 类型是 <timestamp>/90_000,str0m 将RTP头中的32位数字扩展到64位,并考虑了“回滚”。64位是一个非常大的数字,用户不需要担心回滚。

Wallclock

使用 wallclock,str0m 指的是媒体样本在源头产生的时刻。例如,如果我们对着麦克风说话,wallclock就是声音被采样的NTP时间。

由于不是每个设备都与NTP同步,所以我们无法知道远程对端媒体的确切wallclock。每个发送器都会定期产生一个发送者报告(SR),其中包含对端对其wallclock的看法,但是这个数字可能与“真实”的NTP时间相差很大。

此外,并非所有远程设备都会有与本地时间完全一致的时间流逝的线性观念。远程对端的一分钟可能不等于本地的精确一分钟。

这些时间戳在处理来自多个对端的同步音频时变得很重要。

在编写媒体时,我们需要向str0m提供估计的wallclock。最简单的策略是只信任本地时间并使用传入UDP数据包的到达时间。另一个简单的策略是在第一个UDP数据包中锁定某个时间T,然后使用 MediaTime 对每个wallclock进行偏移,例如,对于视频,我们可能有 T + <media time>/90_000

一个值得生产的SFU可能需要一个更复杂的策略,权衡所有可能的时间来源,以获得数据包远程wallclock的准确估计。

项目状态

Str0m最初由Lookback的Martin Algesten开发。Lookback。我们使用str0m用于特定用例:str0m作为服务器SFU(与对等-2-对等相反)。这意味着我们对适用于我们用例的部件进行了大量测试和开发。Str0m旨在成为一个通用的WebRTC库,这意味着它也应该适用于对等-2-对等(主要考虑ICE代理),但这些领域没有得到足够的关注和测试。

性能非常好,已经有一些工作来发现和优化瓶颈。当然,这样的努力永远不会结束,回报也会减少。虽然没有明显的性能瓶颈,但总是欢迎进行更多的工作——无论是算法上还是在热点路径中的分配/克隆等。

设计

Rtc实例的输出可以分为三种。

  1. 事件(例如接收媒体或数据通道数据)。
  2. 网络输出。要发送的数据,通常来自UDP套接字。
  3. 超时。指示实例何时期望下一个时间输入。

Rtc实例的输入是

  1. 用户操作(例如发送媒体或数据通道数据)。
  2. 网络输入。通常从UDP套接字读取。
  3. 超时。如上所述获得。

正确的使用方法可以在上面的运行循环或示例中看到。

Sans I/O是一种模式,其中我们将网络输入/输出以及时间流逝都转换为API的外部输入。这意味着str0m没有内部线程,只有一个由不同类型的输入驱动的巨大状态机。

样本或RTP级别?

Str0m默认为“样本级别”,将RTP视为内部细节。因此,用户将主要与

  1. Event::MediaData交互,以接收完整的“样本”(音频帧或视频帧)。
  2. Writer::write写入完整的样本。
  3. Writer::request_keyframe请求关键帧。

样本级别

所有编解码器,如h264、vp8、vp9和opus,都输出我们所说的“样本”。对于音频,样本具有非常特定的含义,但本项目在更广泛的意义上使用它,其中样本是视频或音频的编码数据的时间戳块,通常表示音频块,或视频的单帧

样本不适合直接在UDP(RTP)数据包中使用——其中一个原因是它们太大。因此,样本被编解码器特定的有效载荷器进一步分割成RTP数据包。

RTP级别

Str0m还提供了一个RTP级别的API。这将与其他许多RTP库类似,其中RTP数据包本身是API的用户界面(当构建SFU时,人们经常谈论“转发RTP数据包”,而与str0m一起,我们也可以“转发样本”)。

RTP模式

str0m有一个更底层的API,允许用户直接写入/接收RTP数据包。使用此API需要更深入地了解RTP和WebRTC。

要启用RTP模式

let rtc = Rtc::builder()
    // Enable RTP mode for this Rtc instance.
    // This disables `MediaEvent` and the `Writer::write` API.
    .set_rtp_mode(true)
    .build();

RTP模式为我们提供了新的API点。

  1. 每当接收到 RTP 数据包时,都会发出 Event::RtpPacket 事件。用于带宽估计的空数据包会被静默丢弃。
  2. 用于写入出站 RTP 数据包的 StreamTx::write_rtp
  3. 用于从远程请求关键帧的 StreamRx::request_keyframe

NIC 列表和 TURN(以及 STUN)

ICE RFC(RFC 8445)中提到了“收集 ICE 候选者”。这意味着检查本地网络接口,并可能在每个可用的接口上绑定 UDP 套接字。由于 str0m 是无 I/O 的,这部分不在 str0m 的功能范围内。用户如何通过配置或查找本地 NIC 来确定本地 IP 地址不是 str0m 所关心的。

TURN 是一种获取 IP 地址的方法,当直接连接失败时可以作为后备使用。我们将 TURN 视为与列举本地网络接口类似——它是一种获取套接字的方法。

所有发现的候选者,无论是本地(NIC)还是远程套接字(TURN),都会添加到 str0m,str0m 将执行 ICE 代理的任务,形成“候选者对”并确定最佳连接,而发送网络流量的实际任务则留给用户。

&mut self 的重要性

Rust 在我们可以避免使用锁并大量依赖 &mut 进行数据写入访问时表现出色。由于 str0m 没有内部线程,我们永远不需要处理共享数据。此外,库的内部结构组织方式使得我们不需要对同一实体有多个引用。在 str0m 中没有 RcMutexmpscArc(* *) 或其他锁。

这意味着所有输入到库中的数据都可以建模为 handle_something(&mut self, something)

( *) 好吧。如果你使用 Windows 并需要 openssl,那么我们只有一个 Arc

不是标准的 WebRTC "Peer Connection" API

该库故意远离在 JavaScript 和/或 webrtc-rs(或 Go 中的 Pion)中看到的“标准”WebRTC API。这样做有几个原因。

首先,在标准 API 中,事件是回调,这并不适合 Rust。回调需要某种形式的引用(所有权?)来控制事件被分发的实体。例如,如果我们想在 Rust 中使用 pc.addEventListener(x),则 x 必须完全由 pc 所有,或者有某种共享引用(如 Arc)。共享引用意味着共享数据,要获取可变共享数据,我们需要某种形式的锁。例如 Arc<Mutex<EventListener>> 或类似。

作为替代,我们可以将所有事件转换为 mpsc 通道,但如果没有异步处理,同时监听多个通道会很麻烦。

其次,在标准API中,像RTCPeerConnectionRTCRtpTransceiver这样的实体很容易被克隆,并且/或者具有长生命周期的引用。也就是说,pc.getTranscievers()返回的对象可以被调用者保留和拥有。这种模式对于垃圾回收或引用计数语言来说很好,但对于Rust来说则不是很好。

恐慌、错误和展开

Str0m遵循fail-fast原则。这意味着我们不会将状态错误掩盖起来,而是会导致恐慌。我们将错误和bug区分开来。

  • 错误是由于用户输入不正确或难以理解而产生的。
  • bug是内部不变性(假设)的破坏。

如果您查看str0m代码,会发现一些unwrap()(或expect())。这些将(应该)总是伴随着一个代码注释,解释为什么unwrap是安全的。这是一个内部不变性,是str0m负责维护的状态假设。

我们认为将每个unwrap()/expect()都改为unwrap_or_else()if let Some(x) = x { ... }等,是不正确的,因为这会将一个实际的问题(一个不正确的假设)掩盖起来。试图在错误的状态下继续运行,最多会导致行为破坏,最坏的情况下是安全风险!

恐慌是我们的朋友:恐慌意味着bug

还有:str0m 决不应在任何用户输入上恐慌。如果您遇到恐慌,请报告它!

捕获恐慌

恐慌应该非常罕见,否则我们作为项目就会有严重的问题。对于一个SFU来说,如果str0m遇到bug并将整个服务器拖垮,那可能不是理想的情况。

对于那些想要额外一层安全的人,我们建议查看catch_unwind来安全地丢弃一个有缺陷的Rtc实例。由于Rtc没有内部线程、锁或异步任务,因此丢弃实例永远不会造成锁定或其他在捕获恐慌时可能发生的问题。

常见问题解答

特性

以下是对libWebRTC和str0m特性之间简要比较的列表,以帮助您确定str0m是否适合您的项目。

特性 str0m libWebRTC
Peer Connection API
SDP
ICE
数据通道
发送/接收报告
传输范围CC
带宽估计
Simulcast
NACK
打包
固定解包缓冲区
自适应抖动缓冲区
视频/音频捕获
视频/音频编码
视频/音频解码
音频渲染
Turn
网络接口枚举

平台支持

str0m编译和测试的平台

平台 编译 测试
x86_64-pc-windows-msvc
x86_64-unknown-linux-gnu
x86_64-apple-darwin

如果您的平台未列出来,但由Rust支持,我们非常希望您尝试使用str0m并与我们分享您的体验。我们非常感谢您的反馈!

str0m支持IPv4、IPv6、UDP和TCP吗?

当然!str0m完全支持IPv4、IPv6、UDP和TCP协议。

我可以用str0m与任何Rust异步运行时一起使用吗?

绝对可以!str0m是完全同步的,确保它能与您选择的任何Rust异步运行时无缝集成。

我可以用str0m创建客户端吗?

当然可以!您可以使用str0m创建客户端。但是请注意,str0m不包括一些常见客户端功能,如媒体编码、解码和捕获。但这不会阻止您构建出色的应用程序!

我可以在媒体服务器中使用str0m吗?

是的!str0m作为服务器组件表现出色,支持RTP API和Sample API。您可以用Rust轻松构建您梦寐以求的录制服务器或SFU!

我可以将聊天示例部署到生产环境中吗?

虽然聊天示例展示了如何使用str0m的API,但它不是为生产使用或重负载设计的。编写一个功能齐全的SFU或MCU(多点控制单元)是一项重大任务,需要根据生产需求做出各种设计决策。

发现了一个错误?以下是如何与我们分享的方法

我们非常乐意听取您的意见!请提交一个问题,并考虑加入我们的Zulip社区进行进一步讨论。为了获得流畅的报告体验,请参考这个典型的错误报告:https://github.com/algesten/str0m/issues/382。我们感谢您为使str0m变得更好所做的贡献!

我对SDP过敏,你能帮我吗?

是的,使用直接API!


str0m的聊天运行在Zulip上,并由Zulip为开源项目提供赞助。

Zulip logo

许可证:MIT OR Apache-2.0

依赖项

~4.5–9MB
~196K SLoC