#async-io #non-blocking #async #net #future #no-std

no-std hirun

基于事件驱动和非阻塞 I/O 机制的异步编程并发框架

9 个版本

新版本 0.1.8 2024 年 8 月 12 日
0.1.7 2023 年 12 月 25 日
0.1.6 2023 年 11 月 20 日
0.1.5 2023 年 10 月 15 日
0.1.4 2023 年 8 月 25 日

#155 in 异步

Download history 1/week @ 2024-05-22 6/week @ 2024-07-03 107/week @ 2024-08-07

每月 107 次下载

MIT/Apache

215KB
6K SLoC

hirun

提供 Rust 异步并发框架,底层基于非阻塞的 IO 操作和事件驱动的机制来实现。

基于事件驱动和非阻塞 I/O 机制的 Rust 编程语言异步应用程序运行时。

在之前的工作中,我们对已有的 C/Rust 并发框架的并发性能进行了深入对比。C 版本的私有实现在所有场景下的测试数据都比 tokio 的好,但 Rust 版本在编码效率和难度上都优于 C 版本。此外,C 版本本身在嵌入式环境中使用,内存和磁盘资源非常有限,因此 Rust 并发框架支持 no_std 是非常有必要的。

我们在工作中对比了现有的 C/Rust 并发框架的并发性能。C 版本的私有实现在所有场景下的测试数据都比 Tokio 版本好,但 Rust 版本在编码效率和难度上都优于 C 版本。此外,C 版本本身在嵌入式环境中使用,内存和磁盘资源非常有限。因此,Rust 并发框架支持 no_std 是很有必要的。

版本更新说明

  1. 0.1.8: 与 hierr 0.2 版本兼容,与 hipool 保持一致。

后续计划

  1. 支持 Windows
  2. 支持 HTTP 代理
  3. 支持 Socket 代理

no_std

no_std 环境也需要一个异步并发框架,现在广泛使用的 tokio 等并不支持。本 crate 基于 Linux libc 的能力构建。因为内部使用了 Linux 的 eventfd/epoll,当前还仅支持 Linux。

同步异步混合编程

spawn 系列接口基本上与 tokio 的定义相同,只要在运行时创建之后,可以在同步和异步环境的任何时候调用它,没有调用上下文的约束。此外,其返回的 JoinHandle 提供了 join 接口,供同步环境中等待异步任务的结束。注意,在异步函数中,不能调用 join,否则会阻塞当前工作线程。

阻塞等待异步任务结束,通常用于业务层的异步任务的入口函数的调用。这类似于标准库 thread::scope 的使用方式。

use hirun::runtime::{Builder, block_on};

fn main() {
    Builder::new().build().unwrap();
    let val = block_on(async_main(100)).unwrap();
    println!("async_main return {val}");
}

async fn async_main(val: i32) -> i32 {
    val + 100
}

在没有统一的异步任务入口的情况下,只是在同步流程的某些环节利用异步并发机制提高并发度,那么可以利用 spawn 接口,在需要的时候调用 join 等待异步任务返回,这样异步任务和同步环境可以并发执行。这类似于标准库 thread::spawn 的使用方式。

use hirun::runtime::{Builder, spawn};

fn main() {
    Builder::new().build().unwrap();
    let val = spawn(foo(100)).join().unwrap();
    println!("async foo return: {val}");
}

async fn foo(val: i32) -> i32 {
    val + 100
}

支持运行时多实例

包含阻塞操作的异步任务最好在单独的运行时实例中调度,以避免对其他异步任务的影响。可以在 spawn_with 接口中按需指定运行时实例。

use hirun::runtime::{Builder, spawn, spawn_with, Attr};

const BLOCK_RUNTIME_ID: u8 = 1;

fn main() {
    Builder::new().build().unwrap();
    Builder::new().id(BLOCK_RUNTIME_ID).build().unwrap();
    let h1 = spawn(foo(100));
    let h2 = spawn_with(bar(200), Attr::new().id(BLOCK_RUNTIME_ID));
    println!("default runtime: foo return {}", h1.join().unwrap());
    println!("runtime_1: bar return {}", h2.join().unwrap());
}

async fn foo(val: i32) -> i32 {
    val + 100
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

支持基于哈希的调度策略

在业务上需要约束某些任务必须在一个线程或不同线程中调度时,业务层可以为任务指定哈希值以实现此功能。使用此功能时,应了解运行时的工作线程数量,才能利用哈希值达到控制目标。

以下代码强制异步任务一定在同一个工作线程运行。

use hirun::runtime::{Builder, spawn_with, Attr};
use libc::pthread_self;

fn main() {
    Builder::new().nth(2).build().unwrap();
    let h1 = spawn_with(foo(200), Attr::new().hash(1));
    let h2 = spawn_with(bar(200), Attr::new().hash(1));
    println!("foo return {}", h1.join().unwrap());
    println!("bar return {}", h2.join().unwrap());
}

async fn foo(val: i32) -> i32 {
    println!("pthread_id: {}", unsafe { pthread_self() });
    val + 100
}

async fn bar(val: i32) -> i32 {
    println!("pthread_id: {}", unsafe { pthread_self() });
    val + 1000
}

JoinSet

批量分发异步任务后,可能需要等待所有任务执行完毕后返回,也可能等待最先完成的任务返回,可以利用 JoinSet 实现。

以下等待所有任务完成后再返回。

use hirun::runtime::{Builder, block_on, spawn, JoinSet};

fn main() {
    Builder::new().nth(2).build().unwrap();
    block_on(async {
        let mut set = JoinSet::new();
        let _ = set.spawn(foo(100));
        let _ = set.spawn(bar(200));
        for (seqno, val) in set.wait_all().await {
            println!("{seqno}, return {}", val.unwrap());
        }
    });
}

async fn foo(val: i32) -> i32 {
    val + 100
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

也可以基于任务完成的先后顺序进行处理。

use hirun::runtime::{Builder, block_on, spawn, JoinSet, sleep};
use core::time::Duration;

fn main() {
    Builder::new().nth(2).build().unwrap();
    block_on(async {
        let mut set = JoinSet::new();
        let _ = set.spawn(foo(100));
        let _ = set.spawn(bar(200));
        while let Some((seqno, val)) = set.wait_any().await {
            println!("{seqno}, return {}", val.unwrap());
        }
    });
}

async fn foo(val: i32) -> i32 {
    sleep(Duration::new(1, 0)).await;
    val + 100
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

Fd 和 AioFd

需要支持自定义的 IPC 通信机制,这些机制都是基于 Linux 文件系统实现的,使用方式相同:创建文件句柄,利用 poll 机制获取异步 IO 事件,调用 read/write 读写数据。Linux 新的 io_uring 也可基于 poll 机制获取提交任务的完成情况。

本 crate 未提供 TcpListener/TcpStream 等高级封装,仅封装 fd,即 Fd,同时提供 AioFd,支持异步读写和获取异步 IO 事件通知的功能,具有最大的通用性。只封装了最基础的功能,更多的功能需要业务层基于 libc crate 的 api 来完成。

use hirun::runtime::{Builder, block_on};
use hirun::net::{Fd, AioFd, SocketAddr};
use hirun::event::POLLIN;

fn main() {
    Builder::new().nth(2).build().unwrap();
    let _ = block_on(async {
        let server_addr = SocketAddr::inet("127.0.0.1", 2000).unwrap();

        let fd = Fd::tcp_client(libc::AF_INET, None).unwrap();
        let mut aiofd = AioFd::new(&fd);

        aiofd.connect(&server_addr).await.unwrap();
        aiofd.wait(POLLIN).await.unwrap();

        let mut buf = [0_u8; 100];
        if let Ok(size) = aiofd.try_read(&mut buf) {
            println!("recv {size} bytes from server");
        }
    }).unwrap();
}

也可以直接使用异步读取接口

use hirun::runtime::{Builder, block_on};
use hirun::net::{Fd, AioFd, SocketAddr};

fn main() {
    Builder::new().nth(2).build().unwrap();
    let _ = block_on(async {
        let server_addr = SocketAddr::inet("127.0.0.1", 2000).unwrap();

        let fd = Fd::tcp_client(libc::AF_INET, None).unwrap();
        let mut aiofd = AioFd::new(&fd);

        aiofd.connect(&server_addr).await.unwrap();

        let mut buf = [0_u8; 100];
        if let Ok(size) = aiofd.read(&mut buf).await {
            println!("recv {size} bytes from server");
        }
    }).unwrap();
}

异步函数的参数一定是异步任务的内置数据成员,而并发框架创建的异步任务都会占用堆内存空间。如果大量任务使用异步读取接口,因为缓冲器在堆上分配,可能导致占用的堆内存空间比较大。如果内存资源有限,推荐使用 async wait + try_read 这种组合使用方式。

#[future]

现有 Rust 自动判断异步函数是否支持 Send 的规则存在一定的局限性。一个异步函数内部仅仅是直接调用异步子函数,不会通过 spawn 类接口创建并发的异步任务,那么这个异步函数内部实际上是可以安全地使用 Rc 等类型。

以下代码如果async fn foo不使用#[future]修饰,则会报告因为Future不支持Send无法通过编译.

注意: #[future]生成unsafe代码,将异步函数的函数体转换为支持Send,如果是异步函数的入参不支持Send,则这类异步函数只能使用spawn_local在当前线程调度.

use hirun::runtime::{Builder, spawn};
use hirun::future;
use std::rc::Rc;

fn main() {
    Builder::new().nth(2).build().unwrap();
    let h = spawn(foo(100));
    println!("async foo return {}", h.join().unwrap());
}

#[future]
async fn foo(val: i32) -> i32 {
    let rc = Rc::new(100);
    val + bar(*rc).await
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

性能

examples/httpserver和examples/tokioserver是本crate和tokio实现的完全相同的一个测试用http server,可以利用httperf测试其性能.

启动httpserver, 服务监听端口2000

# export http_body_size=102400
# cargo run --release --example httpserver

启动tokioserver, 服务监听端口2001

# export http_body_size=102400
# cargo run --release --example tokioserver

启动httperf测试, 具体测试参数参见httperf的帮助说明.

# httperf --num-calls 10 --num-conns 1000 --port 2000 # 测试httpserver的能力
# httperf --num-calls 10 --num-conns 1000 --port 2001 # 测试httpserver的能力

目前已有的数据看,不弱于tokio,不少场景下(变化因素: http_body_size, --num-calls, --num-conns)比tokio更优.

在用户的使用环境上进行对比验证获取的数据最真实.

依赖项

~2MB
~44K SLoC