1 个不稳定版本

0.1.0 2024年4月25日

#557并发

MIT 许可证

17KB
306

类似于 Atomic<Option<Box<dyn FnOnce + Send + 'static>>>.

这是一个基本的并发工具,可用于构建更高级的抽象。例如,seg_queue 示例展示了如何将 crossbeam 的 SegQueue(一个不支持阻塞,只支持轮询的并发队列)转换为一个既支持阻塞也支持异步/await 的 mpsc 队列。

pub struct Sender<T>(Arc<State<T>>);

pub struct Receiver<T>(Arc<State<T>>);

struct State<T> {
    queue: SegQueue<T>,
    callback_cell: CallbackCell,
}

fn new_queue<T>() -> (Sender<T>, Receiver<T>) {
    let state_1 = Arc::new(State {
        queue: SegQueue::new(),
        callback_cell: CallbackCell::new(),
    });
    let state_2 = Arc::clone(&state_1);
    (Sender(state_1), Receiver(state_2))
}

impl<T> Sender<T> {
    fn send(&self, item: T) {
        self.0.queue.push(item);
        self.0.callback_cell.take_call();
    }
}

impl<T> Receiver<T> {
    fn recv_blocking(&mut self) -> T {
        if let Some(item) = self.0.queue.pop() {
            return item;
        }
        let parker = Parker::new();
        loop {
            let unparker = parker.unparker().clone();
            self.0.callback_cell.put(move || unparker.unpark());
            if let Some(item) = self.0.queue.pop() {
                return item;
            }
            parker.park();
        }
    }

    async fn recv_async(&mut self) -> T {
        if let Some(item) = self.0.queue.pop() {
            return item;
        }
        let notify_1 = Arc::new(Notify::new());
        loop {
            let notify_2 = Arc::clone(&notify_1);
            self.0.callback_cell.put(move || notify_2.notify_one());
            if let Some(item) = self.0.queue.pop() {
                return item;
            }
            notify_1.notified().await;
        }
    }
}

实现此功能的一种简单方法是涉及两层间接引用

  • 首先,FnOnce 可以被装箱成一个 Box<dyn FnOnce>,实现动态分发。
  • 然后,这可以被装箱成一个 Box<Box<dyn FnOnce>>,使其成为一个普通指针而不是胖指针。
  • 外部的 Box 可以转换成一个原始指针,然后转换成一个 usize 并存储在 AtomicUsize 中。

然而,这个工具通过稍微巧妙地使用单态化和 std::alloc API,只在一次堆分配中完成,而不是两次。

无运行时依赖