#future #async #runtimes #async-task #waiter #top-level #points

nightly waker-waiter

帮助异步运行时与任意futures互操作

4个版本 (2个重大更改)

0.3.0 2024年6月15日
0.2.1 2023年9月5日
0.2.0 2023年7月24日
0.1.0 2023年3月19日

#1716 in 异步

每月下载 28

MIT 许可证

28KB
511

waker-waiter


lib.rs:

该库允许 future 提供自己的事件轮询逻辑,从而帮助异步运行时执行任意 future。它是对"上下文反应器钩子"(Context reactor hook)提案所述方法的一次实现尝试。

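有两个集成点:

- 需要在执行线程中运行自己的事件轮询逻辑的 future,应调用 get_poller 从 Context 中取得 TopLevelPoller,再调用 set_waiter 在其上注册一个 WakerWaiter。
- 负责轮询顶层 future 的一方(即执行器/异步运行时)需要实现 TopLevelPoller trait,并通过 ContextBuilder 的 ext 将其提供给 future。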

在一个 TopLevelPoller 上只能注册一个 WakerWaiter。如果多个 future 依赖于相同的事件轮询逻辑,它们应当相互协调并共享同一个 WakerWaiter。

示例:一个注册 WakerWaiter 的 future

// (lines hidden by rustdoc, presumably the `use` declarations, are omitted here)
/// Process-wide slot for the shared `Reactor`. It starts out empty and is
/// filled in by `Reactor::current` the first time that function runs.
static REACTOR: Mutex<Option<Arc<Reactor>>> = Mutex::new(None);

/// Shared reactor whose `WakerWaiter` is the one registered on the
/// `TopLevelPoller` by futures that depend on it.
struct Reactor {
    /// Waiter built from a `ReactorWaiter`. Populated by `Reactor::current`
    /// before the reactor is stored in `REACTOR`; `Reactor::waiter` relies on
    /// it being `Some`.
    waiter: Option<WakerWaiter>,
}

impl Reactor {
    /// Returns the process-wide reactor, creating it on first use.
    ///
    /// The reactor owns a `WakerWaiter` built from a `ReactorWaiter`, and that
    /// `ReactorWaiter` holds a `Weak` pointing back at the reactor. The pair is
    /// therefore constructed with `Arc::new_cyclic`, which hands out the `Weak`
    /// before the `Arc` is complete. This lets `waiter` be initialised in place
    /// instead of being patched in afterwards through a raw pointer, so no
    /// `unsafe` is needed.
    ///
    /// # Panics
    ///
    /// Panics if the `REACTOR` mutex is poisoned.
    fn current() -> Arc<Reactor> {
        let mut slot = REACTOR.lock().unwrap();

        // Build the reactor only if the slot is still empty; either way we end
        // up with a reference to the stored `Arc`.
        let shared = slot.get_or_insert_with(|| {
            Arc::new_cyclic(|weak| {
                let waiter = Arc::new(ReactorWaiter {
                    reactor: Weak::clone(weak),
                });

                Reactor {
                    waiter: Some(waiter.into()),
                }
            })
        });

        Arc::clone(shared)
    }

    /// Borrows the `WakerWaiter` owned by this reactor.
    ///
    /// # Panics
    ///
    /// Panics if the waiter is unset. Reactors obtained from `current` always
    /// have it set, so this indicates a reactor that was constructed by hand.
    fn waiter<'a>(self: &'a Arc<Self>) -> &'a WakerWaiter {
        self.waiter
            .as_ref()
            .expect("Reactor::current always initialises the waiter")
    }
}

/// `WakerWait` implementation that backs the reactor's `WakerWaiter`.
struct ReactorWaiter {
    /// Non-owning back-reference to the reactor this waiter belongs to.
    // NOTE(review): presumably `Weak` so that the reactor and its waiter do
    // not keep each other alive; confirm against how `WakerWaiter` holds the
    // `Arc<ReactorWaiter>`.
    reactor: Weak<Reactor>,
}

/// Skeleton `WakerWait` implementation; both methods are left as stubs for the
/// reader to fill in with real event-polling logic.
impl WakerWait for ReactorWaiter {
    /// Blocks the calling thread polling for events (stub).
    fn wait(self: &Arc<Self>) {
        // ... blocking poll for events ...
        todo!();
    }

    /// Returns a handle that can unblock a call to `wait` (stub).
    fn canceler(self: &Arc<Self>) -> WakerWaiterCanceler {
        // ... provide a way to unblock the above ...
        todo!();
    }
}

/// Example future that registers the shared reactor's `WakerWaiter` on the
/// `TopLevelPoller` each time it is polled.
struct MyFuture;

impl Future for MyFuture {
#
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let p = match get_poller(cx) {
            Some(p) => p,
            None => panic!("MyFuture requires context to provide TopLevelPoller"),
        };

        if p.set_waiter(Reactor::current().waiter()).is_err() {
            panic!("Incompatible waiter already assigned to TopLevelPoller");
        }

        // ... register waker, perform I/O, etc ...
    }
}

示例:一个提供 TopLevelPoller 的执行器

#![feature(local_waker)]
#![feature(context_ext)]
/// Waker used by the example executor. On wake it either cancels the
/// configured `WakerWaiter` or unparks the stored thread.
struct ThreadWaker {
    /// Handle of the thread to unpark; compared against the waking thread to
    /// detect wakes that originate from the execution thread itself.
    thread: Thread,
    /// Slot for the waiter currently registered by a future, shared with the
    /// executor's `MyTopLevelPoller`. `None` until a waiter has been set.
    waiter: Arc<Mutex<Option<WakerWaiter>>>,
}

impl Wake for ThreadWaker {
    /// Wakes the execution thread, choosing the mechanism that matches how
    /// that thread is currently sleeping.
    fn wake(self: Arc<Self>) {
        // A wake issued from the execution thread itself was triggered by the
        // WakerWaiter, which hands control back on its own; only wakes coming
        // from another thread need any signaling.
        let from_other_thread = thread::current().id() != self.thread.id();

        if from_other_thread {
            // Take a snapshot so the mutex is released before signaling.
            let registered = self.waiter.lock().unwrap().clone();

            match registered {
                // With a waiter configured, the execution thread is blocked
                // inside it and has to be unblocked through its canceler.
                Some(w) => {
                    w.canceler().cancel();
                }
                // Without one, the execution thread is asleep in a standard
                // thread park.
                None => {
                    self.thread.unpark();
                }
            }
        }
    }
}

/// `TopLevelPoller` implementation for the example executor. It records the
/// waiter a future registers so the executor loop can block on it.
#[derive(Clone)]
struct MyTopLevelPoller {
    /// Slot for the registered waiter, shared with the `ThreadWaker`.
    /// `None` until `set_waiter` stores one.
    waiter: Arc<Mutex<Option<WakerWaiter>>>,
}

impl TopLevelPoller for MyTopLevelPoller {
    /// Installs `waiter` as this poller's waiter.
    ///
    /// Succeeds when no waiter is installed yet (a clone of `waiter` is
    /// stored) or when the installed waiter compares equal to `waiter`.
    /// Returns `SetWaiterError` when a different waiter is already installed.
    fn set_waiter(&mut self, waiter: &WakerWaiter) -> Result<(), SetWaiterError> {
        let mut slot = self.waiter.lock().unwrap();

        match slot.as_ref() {
            // Same waiter registered again: nothing to do.
            Some(installed) if installed == waiter => Ok(()),
            // A different waiter already occupies the slot.
            Some(_) => Err(SetWaiterError),
            // First registration.
            None => {
                *slot = Some(waiter.clone());
                Ok(())
            }
        }
    }
}

// The waiter slot is shared: the poller writes it when a future registers a
// waiter, and the waker reads it to decide how to wake the execution thread.
let waiter = Arc::new(Mutex::new(None));
let waker: std::task::Waker = Arc::new(ThreadWaker {
    thread: thread::current(),
    waiter: Arc::clone(&waiter),
}).into();
let mut poller = MyTopLevelPoller { waiter };

let mut fut = pin!(async { /* ... */ });

loop {
    // Build a fresh context for each poll so the borrow of `poller` held by
    // the context extension ends before the waiter slot is inspected below.
    let result = {
        let mut a = Anyable::new(&mut poller as &mut dyn TopLevelPoller);
        let mut cx = ContextBuilder::from_waker(&waker).ext(a.as_any()).build();

        fut.as_mut().poll(&mut cx)
    };

    match result {
        Poll::Ready(res) => break res,
        Poll::Pending => {
            let waiter = poller.waiter.lock().unwrap().clone();

            // if a waiter is configured then block on it. else do a
            // standard thread park
            match waiter {
                Some(waiter) => waiter.wait(),
                None => thread::park(),
            }
        }
    }
}

无运行时依赖