#queue #open-coroutine #work-steal

open-coroutine-queue

使用 st3 和 crossbeam-deque 实现的并发工作窃取队列

3 个版本 (破坏性更新)

0.5.0 2024年1月21日
0.4.0 2023年7月23日
0.1.2 2023年7月23日

#589并发


3 个 Crates 中使用 (通过 open-coroutine-core)

LGPL-3.0 OR Apache-2.0

27KB
348

open-coroutine

《open-coroutine》是一个简单、高效且通用的堆栈协程库。

我有故事,你有酒吗?

状态

仍在开发中,请勿在 do not 生产环境中使用此库 !

如何使用此库 ?

步骤1:将依赖项添加到您的 Cargo.toml

[dependencies]
# check https://crates.io/crates/open-coroutine
open-coroutine = "x.y.z"

步骤2:添加宏

#[open_coroutine::main]
fn main() {
    //......
}

步骤3:享受 open-coroutine 带来的性能提升!

示例

惊人的抢占式调度

注意:不支持 windows

#[open_coroutine::main]
fn main() -> std::io::Result<()> {
    cfg_if::cfg_if! {
        if #[cfg(all(unix, feature = "preemptive-schedule"))] {
            use open_coroutine_core::scheduler::Scheduler;
            use std::sync::{Arc, Condvar, Mutex};
            use std::time::Duration;

            static mut TEST_FLAG1: bool = true;
            static mut TEST_FLAG2: bool = true;
            let pair = Arc::new((Mutex::new(true), Condvar::new()));
            let pair2 = Arc::clone(&pair);
            let handler = std::thread::Builder::new()
                .name("preemptive".to_string())
                .spawn(move || {
                    let scheduler = Scheduler::new();
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine1 launched");
                            while unsafe { TEST_FLAG1 } {
                                println!("loop1");
                                _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop1 end");
                            1
                        },
                        None,
                    );
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine2 launched");
                            while unsafe { TEST_FLAG2 } {
                                println!("loop2");
                                _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop2 end");
                            unsafe { TEST_FLAG1 = false };
                            2
                        },
                        None,
                    );
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine3 launched");
                            unsafe { TEST_FLAG2 = false };
                            3
                        },
                        None,
                    );
                    scheduler.try_schedule();

                    let (lock, cvar) = &*pair2;
                    let mut pending = lock.lock().unwrap();
                    *pending = false;
                    // notify the condvar that the value has changed.
                    cvar.notify_one();
                })
                .expect("failed to spawn thread");

            // wait for the thread to start up
            let (lock, cvar) = &*pair;
            let result = cvar
                .wait_timeout_while(
                    lock.lock().unwrap(),
                    Duration::from_millis(3000),
                    |&mut pending| pending,
                )
                .unwrap();
            if result.1.timed_out() {
                Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "preemptive schedule failed",
                ))
            } else {
                unsafe {
                    handler.join().unwrap();
                    assert!(!TEST_FLAG1);
                }
                Ok(())
            }
        } else {
            println!("please enable preemptive-schedule feature");
            Ok(())
        }
    }
}

输出

coroutine1 launched
loop1
coroutine2 launched
loop2
coroutine3 launched
loop1
loop2 end
loop1 end

任意使用阻塞系统调用

#[open_coroutine::main]
fn main() {
    std::thread::sleep(std::time::Duration::from_secs(1));
}

输出

nanosleep hooked

特性

待办事项

  • 支持可伸缩堆栈

  • 支持 AF_XDP 套接字的兼容性

  • 通过信号可能中断的其他系统调用钩子

    系统调用
    • open
    • chdir
    • chroot
    • mkdir
    • rmdir
    • link
    • unlink
    • readlink
    • stat
    • dup
    • dup2
    • umask
    • mount
    • umount
    • mknod
    • fcntl
    • truncate
    • ftruncate
    • setjmp
    • longjmp
    • chown
    • lchown
    • fchown
    • chmod
    • fchmod
    • fchmodat
    • semop
    • ppoll
    • pselect
    • io_getevents
    • semop
    • semtimedop
    • msgrcv
    • msgsnd
  • 支持 #[open_coroutine::join] 宏以等待协程

0.5.x

  • 重构系统调用状态,区分状态和内部状态

0.4.x

  • 在本地文件 IO 方面支持并兼容 io_uring
  • 优雅的关闭
  • 使用 log 代替 println
  • 增强 #[open_coroutine::main]
  • 重构钩子实现,现在不需要发布动态库
  • Monitor 遵循 thread-per-core 指南
  • EventLoop 遵循 thread-per-core 指南

0.3.x

  • 支持 genawaiter 作为低级无堆栈协程(由于钩子无法支持它)
  • 使用 corosensei 作为低级协程
  • 支持回溯
  • 支持 #[open_coroutine::co]
  • 重构 WorkStealQueue

0.2.x

  • 使用正确的 epoll_event 结构体

  • 使用 rayon 进行并行计算

  • 支持 #[open_coroutine::main]

  • 挂钩几乎所有 read 系统调用

    读取系统调用
    • 接收
    • readv
    • pread
    • preadv
    • recvfrom
    • recvmsg
  • 挂钩几乎所有 write 系统调用

    写入系统调用
    • 发送
    • writev
    • sendto
    • sendmsg
    • pwrite
    • pwritev
  • 挂钩其他系统调用

    其他系统调用
    • 休眠
    • usleep
    • nanosleep
    • 连接
    • 监听
    • 接受
    • 关闭
    • poll
    • select

0.1.x

  • 基本挂起/恢复支持
  • 使用 jemalloc 作为内存池
  • 支持高级协程抽象
  • 支持抢占式调度
  • 支持工作窃取
  • 支持休眠系统调用挂钩

依赖

~0.8–25MB
~346K SLoC