4个版本 (2个破坏性版本)
0.3.0 | 2024年8月1日 |
---|---|
0.2.0 | 2024年6月1日 |
0.1.1 | 2024年5月28日 |
0.1.0 | 2024年5月25日 |
#264 in 并发
每月128次下载
用于 xrm
61KB
1K SLoC
nblock
描述
nblock是Rust的非阻塞运行时。它在一组管理的线程集合上执行非阻塞任务。
任务
使用Task
在Runtime
上启动,使用Runtime::spawn
。任务类似于std::future::Future
,但它们是可变的,保证从单个线程运行,并在驱动到完成的过程中区分Idle
和Active
状态。像Future
一样,Task
有一个Output
,可以通过JoinHandle
获得,这是Runtime::spawn
返回的。
线程
任务是在由运行时管理的共享线程集合上启动的。任务根据提供的ThreadSelector
绑定到特定线程。
示例
循环启动
以下示例将使用循环ThreadSelector
在两个线程之间交替运行任务,为每个任务打印"hello, world"和线程名称。
代码
use nblock::{
idle::{Backoff, NoOp},
selector::RoundRobinSelector,
task::{Nonblock, Task},
Runtime,
};
use std::thread::current;
let runtime = Runtime::builder()
.with_thread_selector(
RoundRobinSelector::builder()
.with_thread_ids(vec![1, 2])
.with_idle(Backoff::default())
.build()
.unwrap(),
)
.build()
.unwrap();
struct HelloWorldTask;
impl Task for HelloWorldTask {
type Output = ();
fn drive(&mut self) -> nblock::task::Nonblock<Self::Output> {
println!("hello, world! from: {:?}", current().name().unwrap());
Nonblock::Complete(())
}
}
runtime.spawn("t1", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t2", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t3", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t4", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t5", HelloWorldTask).join(NoOp).unwrap();
输出
hello, world! from: "nblock thread-1"
hello, world! from: "nblock thread-2"
hello, world! from: "nblock thread-1"
hello, world! from: "nblock thread-2"
hello, world! from: "nblock thread-1"
专用启动
以下示例将使用一个ThreadSelector
,它将每个任务启动到专用线程上,为每个任务打印"hello, world"和线程名称。
代码
use nblock::{
idle::{Backoff, NoOp},
task::{Nonblock, Task},
selector::DedicatedThreadSelector,
Runtime,
};
use std::thread::current;
let runtime = Runtime::builder()
.with_thread_selector(DedicatedThreadSelector::new(Backoff::default()))
.build()
.unwrap();
struct HelloWorldTask;
impl Task for HelloWorldTask {
type Output = ();
fn drive(&mut self) -> nblock::task::Nonblock<Self::Output> {
println!("hello, world! from: {:?}", current().name().unwrap());
Nonblock::Complete(())
}
}
runtime.spawn("t1", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t2", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t3", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t4", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t5", HelloWorldTask).join(NoOp).unwrap();
输出
hello, world! from: "nblock task t1"
hello, world! from: "nblock task t2"
hello, world! from: "nblock task t3"
hello, world! from: "nblock task t4"
hello, world! from: "nblock task t5"
在完成时启动
以下示例展示了如何使用JoinHandle
在另一个任务完成后自动启动一个新的任务。这与异步代码中的await
非常相似,但它在支持任务可变性的同时是无锁的。注意,在任务内部,你可以使用Runtime::get()
来获取当前的Runtime。
use nblock::{
idle::{Backoff, NoOp},
selector::RoundRobinSelector,
task::{Nonblock, Task},
Runtime,
};
use std::{thread, time::Duration};
let runtime = Runtime::builder()
.with_thread_selector(
RoundRobinSelector::builder()
.with_thread_ids(vec![1, 2])
.with_idle(Backoff::default())
.build()
.unwrap(),
)
.build()
.unwrap();
struct HelloWorldTask {
input: u64,
}
impl HelloWorldTask {
fn new(input: u64) -> Self {
Self { input }
}
}
impl Task for HelloWorldTask {
type Output = u64;
fn drive(&mut self) -> nblock::task::Nonblock<Self::Output> {
println!(
"hello, world! from: {:?}! The input was {}.",
thread::current().name().unwrap(),
self.input
);
Nonblock::Complete(self.input + 1)
}
}
runtime
.spawn("t1", HelloWorldTask::new(1))
.on_complete(|output| {
Runtime::get().spawn("t2", HelloWorldTask::new(output));
});
thread::sleep(Duration::from_millis(100));
runtime
.shutdown(NoOp, Some(Duration::from_secs(1)))
.unwrap();
输出
hello, world! from: "nblock thread-1"! The input was 1.
hello, world! from: "nblock thread-2"! The input was 2.
许可证:MIT OR Apache-2.0
依赖项
~2.4–3.5MB
~58K SLoC