1 个不稳定版本
0.1.0 | 2024年4月25日 |
---|
#557 在 并发
17KB
306 行
类似于 Atomic<Option<Box<dyn FnOnce + Send + 'static>>>
.
这是一个基本的并发工具,可用于构建更高级的抽象。例如,seg_queue
示例展示了如何将 crossbeam 的 SegQueue
(一个不支持阻塞,只支持轮询的并发队列)转换为一个既支持阻塞也支持异步/await 的 mpsc 队列。
pub struct Sender<T>(Arc<State<T>>);
pub struct Receiver<T>(Arc<State<T>>);
struct State<T> {
queue: SegQueue<T>,
callback_cell: CallbackCell,
}
fn new_queue<T>() -> (Sender<T>, Receiver<T>) {
let state_1 = Arc::new(State {
queue: SegQueue::new(),
callback_cell: CallbackCell::new(),
});
let state_2 = Arc::clone(&state_1);
(Sender(state_1), Receiver(state_2))
}
impl<T> Sender<T> {
fn send(&self, item: T) {
self.0.queue.push(item);
self.0.callback_cell.take_call();
}
}
impl<T> Receiver<T> {
fn recv_blocking(&mut self) -> T {
if let Some(item) = self.0.queue.pop() {
return item;
}
let parker = Parker::new();
loop {
let unparker = parker.unparker().clone();
self.0.callback_cell.put(move || unparker.unpark());
if let Some(item) = self.0.queue.pop() {
return item;
}
parker.park();
}
}
async fn recv_async(&mut self) -> T {
if let Some(item) = self.0.queue.pop() {
return item;
}
let notify_1 = Arc::new(Notify::new());
loop {
let notify_2 = Arc::clone(¬ify_1);
self.0.callback_cell.put(move || notify_2.notify_one());
if let Some(item) = self.0.queue.pop() {
return item;
}
notify_1.notified().await;
}
}
}
实现此功能的一种简单方法是涉及两层间接引用
- 首先,
FnOnce
可以被装箱成一个Box<dyn FnOnce>
,实现动态分发。 - 然后,这可以被装箱成一个
Box<Box<dyn FnOnce>>
,使其成为一个普通指针而不是胖指针。 - 外部的
Box
可以转换成一个原始指针,然后转换成一个usize
并存储在AtomicUsize
中。
然而,这个工具通过稍微巧妙地使用单态化和 std::alloc
API,只在一次堆分配中完成,而不是两次。