#运行时 #非阻塞

nblock

非阻塞运行时

4个版本 (2个破坏性版本)

0.3.0 2024年8月1日
0.2.0 2024年6月1日
0.1.1 2024年5月28日
0.1.0 2024年5月25日

#264 in 并发

Download history 247/week @ 2024-05-24 198/week @ 2024-05-31 15/week @ 2024-06-07 3/week @ 2024-06-14 1/week @ 2024-07-05 83/week @ 2024-07-26 45/week @ 2024-08-02

每月128次下载
用于 xrm

MIT/Apache

61KB
1K SLoC

nblock

描述

nblock是Rust的非阻塞运行时。它在一组管理的线程集合上执行非阻塞任务。

任务

使用TaskRuntime上启动,使用Runtime::spawn。任务类似于std::future::Future,但它们是可变的,保证从单个线程运行,并在驱动到完成的过程中区分IdleActive状态。像Future一样,Task有一个Output,可以通过JoinHandle获得,这是Runtime::spawn返回的。

线程

任务是在由运行时管理的共享线程集合上启动的。任务根据提供的ThreadSelector绑定到特定线程。

示例

循环启动

以下示例将使用循环ThreadSelector在两个线程之间交替运行任务,为每个任务打印"hello, world"和线程名称。

代码

use nblock::{
    idle::{Backoff, NoOp},
    selector::RoundRobinSelector,
    task::{Nonblock, Task},
    Runtime,
};
use std::thread::current;

let runtime = Runtime::builder()
   .with_thread_selector(
       RoundRobinSelector::builder()
           .with_thread_ids(vec![1, 2])
           .with_idle(Backoff::default())
           .build()
           .unwrap(),
   )
   .build()
   .unwrap();

struct HelloWorldTask;
impl Task for HelloWorldTask {
    type Output = ();
    fn drive(&mut self) -> nblock::task::Nonblock<Self::Output> {
        println!("hello, world! from: {:?}", current().name().unwrap());
        Nonblock::Complete(())
    }
}

runtime.spawn("t1", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t2", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t3", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t4", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t5", HelloWorldTask).join(NoOp).unwrap();

输出

hello, world! from: "nblock thread-1"
hello, world! from: "nblock thread-2"
hello, world! from: "nblock thread-1"
hello, world! from: "nblock thread-2"
hello, world! from: "nblock thread-1"

专用启动

以下示例将使用一个ThreadSelector,它将每个任务启动到专用线程上,为每个任务打印"hello, world"和线程名称。

代码

use nblock::{
    idle::{Backoff, NoOp},
    task::{Nonblock, Task},
    selector::DedicatedThreadSelector,
    Runtime,
};
use std::thread::current;

let runtime = Runtime::builder()
    .with_thread_selector(DedicatedThreadSelector::new(Backoff::default()))
    .build()
    .unwrap();

struct HelloWorldTask;
impl Task for HelloWorldTask {
    type Output = ();
    fn drive(&mut self) -> nblock::task::Nonblock<Self::Output> {
        println!("hello, world! from: {:?}", current().name().unwrap());
        Nonblock::Complete(())
    }
}

runtime.spawn("t1", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t2", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t3", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t4", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t5", HelloWorldTask).join(NoOp).unwrap();

输出

hello, world! from: "nblock task t1"
hello, world! from: "nblock task t2"
hello, world! from: "nblock task t3"
hello, world! from: "nblock task t4"
hello, world! from: "nblock task t5"

在完成时启动

以下示例展示了如何使用JoinHandle在另一个任务完成后自动启动一个新的任务。这与异步代码中的await非常相似,但它在支持任务可变性的同时是无锁的。注意,在任务内部,你可以使用Runtime::get()来获取当前的Runtime。

use nblock::{
    idle::{Backoff, NoOp},
    selector::RoundRobinSelector,
    task::{Nonblock, Task},
    Runtime,
};
use std::{thread, time::Duration};

let runtime = Runtime::builder()
    .with_thread_selector(
        RoundRobinSelector::builder()
            .with_thread_ids(vec![1, 2])
            .with_idle(Backoff::default())
            .build()
            .unwrap(),
    )
    .build()
    .unwrap();

struct HelloWorldTask {
    input: u64,
}
impl HelloWorldTask {
    fn new(input: u64) -> Self {
        Self { input }
    }
}
impl Task for HelloWorldTask {
    type Output = u64;
    fn drive(&mut self) -> nblock::task::Nonblock<Self::Output> {
        println!(
            "hello, world! from: {:?}! The input was {}.",
            thread::current().name().unwrap(),
            self.input
        );
        Nonblock::Complete(self.input + 1)
    }
}

runtime
    .spawn("t1", HelloWorldTask::new(1))
    .on_complete(|output| {
        Runtime::get().spawn("t2", HelloWorldTask::new(output));
    });

thread::sleep(Duration::from_millis(100));

runtime
    .shutdown(NoOp, Some(Duration::from_secs(1)))
    .unwrap();

输出

hello, world! from: "nblock thread-1"! The input was 1.
hello, world! from: "nblock thread-2"! The input was 2.

许可证:MIT OR Apache-2.0

依赖项

~2.4–3.5MB
~58K SLoC