#queue #coroutine #hook #open-coroutine #work-stealing #generic

work-steal-queue

基于 st3 和 crossbeam-deque 实现的并发工作窃取队列

3 个版本

0.1.2 2023 年 2 月 12 日
0.1.1 2023 年 2 月 7 日
0.1.0 2023 年 2 月 6 日

#1128并发

Download history 29/week @ 2024-03-28 15/week @ 2024-04-04 2/week @ 2024-04-18 26/week @ 2024-04-25

每月 155 次下载

LGPL-3.0 OR Apache-2.0

17KB
319

open-coroutine

open-coroutine 是一个简单、高效且通用的堆栈式协程库。

我有故事,你有酒吗?

状态

仍在开发中,请勿在生产环境中使用此库!

如何使用此库?

步骤1:在 Cargo.toml 中添加依赖

[dependencies]
# check https://crates.io/crates/open-coroutine
open-coroutine = "x.y.z"

步骤2:添加宏

#[open_coroutine::main]
fn main() {
    //......
}

步骤3:享受 open-coroutine 带来的性能提升!

示例

惊人的抢占式调度

注意:不支持 Windows

#[open_coroutine::main]
fn main() -> std::io::Result<()> {
    cfg_if::cfg_if! {
        if #[cfg(all(unix, feature = "preemptive-schedule"))] {
            use open_coroutine_core::scheduler::Scheduler;
            use std::sync::{Arc, Condvar, Mutex};
            use std::time::Duration;

            static mut TEST_FLAG1: bool = true;
            static mut TEST_FLAG2: bool = true;
            let pair = Arc::new((Mutex::new(true), Condvar::new()));
            let pair2 = Arc::clone(&pair);
            let handler = std::thread::Builder::new()
                .name("preemptive".to_string())
                .spawn(move || {
                    let scheduler = Scheduler::new();
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine1 launched");
                            while unsafe { TEST_FLAG1 } {
                                println!("loop1");
                                _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop1 end");
                            1
                        },
                        None,
                    );
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine2 launched");
                            while unsafe { TEST_FLAG2 } {
                                println!("loop2");
                                _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop2 end");
                            unsafe { TEST_FLAG1 = false };
                            2
                        },
                        None,
                    );
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine3 launched");
                            unsafe { TEST_FLAG2 = false };
                            3
                        },
                        None,
                    );
                    scheduler.try_schedule();

                    let (lock, cvar) = &*pair2;
                    let mut pending = lock.lock().unwrap();
                    *pending = false;
                    // notify the condvar that the value has changed.
                    cvar.notify_one();
                })
                .expect("failed to spawn thread");

            // wait for the thread to start up
            let (lock, cvar) = &*pair;
            let result = cvar
                .wait_timeout_while(
                    lock.lock().unwrap(),
                    Duration::from_millis(3000),
                    |&mut pending| pending,
                )
                .unwrap();
            if result.1.timed_out() {
                Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "preemptive schedule failed",
                ))
            } else {
                unsafe {
                    handler.join().unwrap();
                    assert!(!TEST_FLAG1);
                }
                Ok(())
            }
        } else {
            println!("please enable preemptive-schedule feature");
            Ok(())
        }
    }
}

输出

coroutine1 launched
loop1
coroutine2 launched
loop2
coroutine3 launched
loop1
loop2 end
loop1 end

任意使用阻塞系统调用

#[open_coroutine::main]
fn main() {
    std::thread::sleep(std::time::Duration::from_secs(1));
}

输出

nanosleep hooked

功能

待办事项

  • 支持可伸缩的堆栈

  • 支持 AF_XDP 套接字兼容性

  • hook 可能被信号中断的其他系统调用

    系统调用
    • open
    • chdir
    • chroot
    • mkdir
    • rmdir
    • link
    • unlink
    • readlink
    • stat
    • dup
    • dup2
    • umask
    • mount
    • umount
    • mknod
    • fcntl
    • truncate
    • ftruncate
    • setjmp
    • longjmp
    • chown
    • lchown
    • fchown
    • chmod
    • fchmod
    • fchmodat
    • semop
    • ppoll
    • pselect
    • io_getevents
    • semop
    • semtimedop
    • msgrcv
    • msgsnd
  • 支持 #[open_coroutine::join] 宏以等待协程

0.5.x

  • 重构系统调用状态,区分状态和内部状态

0.4.x

  • 支持并与本地文件 IO 的 io_uring 兼容
  • 优雅的关闭
  • 使用 log 代替 println
  • 增强 #[open_coroutine::main]
  • 重构 hook 实现,现在无需发布动态库
  • Monitor 遵循 thread-per-core 指南
  • EventLoop 遵循 thread-per-core 指南

0.3.x

  • 支持 genawaiter 作为低级无堆栈协程(由于 hook 无法支持它)
  • 使用 corosensei 作为低级协程
  • 支持回溯
  • 支持 #[open_coroutine::co]
  • 重构 WorkStealQueue

0.2.x

  • 使用正确的 epoll_event 结构体

  • 使用 rayon 进行并行计算

  • 支持 #[open_coroutine::main]

  • 挂钩几乎所有 read 系统调用

    读取系统调用
    • 接收
    • 读取向量
    • 预读取
    • 预读取向量
    • 接收从
    • 接收消息
  • 挂钩几乎所有 write 系统调用

    写入系统调用
    • 发送
    • 写入向量
    • 发送到
    • 发送消息
    • 预写入
    • 预写入向量
  • 挂钩其他系统调用

    其他系统调用
    • 睡眠
    • 微睡眠
    • 纳秒睡眠
    • 连接
    • 监听
    • 接受
    • 关闭
    • 轮询
    • 选择

0.1.x

  • 基本挂起/恢复支持
  • 使用 jemalloc 作为内存池
  • 支持高级协程抽象
  • 支持抢占式调度
  • 支持工作窃取
  • 支持睡眠系统调用挂钩

依赖关系

~0.4–26MB
~334K SLoC