4个版本 (2个重大更改)
0.3.0 | 2024年6月15日 |
---|---|
0.2.1 | 2023年9月5日 |
0.2.0 | 2023年7月24日 |
0.1.0 | 2023年3月19日 |
#1716 in 异步
每月下载 28
28KB
511 行
waker-waiter
lib.rs
:
该库通过允许futures提供自己的事件轮询逻辑来帮助异步运行时支持任意futures的执行。这是实现上下文反应器钩子所述方法的尝试。
有两个集成点
- 需要在执行线程中运行自己的事件轮询逻辑的futures必须调用
get_poller
以获取TopLevelPoller
并调用TopLevelPoller::set_waiter
在其上注册一个WakerWaiter
。 - 负责轮询顶级futures(即异步运行时)的应用程序部分需要实现
TopLevelPoller
特性,并通过std::task::ContextBuilder::ext
提供。该库通过block_on
提供这样的实现。
在一个 TopLevelPoller
上只能注册一个 WakerWaiter
。如果多个futures依赖于相同的事件轮询逻辑,futures应该协调并共享同一个 WakerWaiter
。
注册 WakerWaiter
的futures示例
#
static REACTOR: Mutex<Option<Arc<Reactor>>> = Mutex::new(None);
struct Reactor {
waiter: Option<WakerWaiter>,
}
impl Reactor {
fn current() -> Arc<Reactor> {
let mut reactor = REACTOR.lock().unwrap();
if reactor.is_none() {
let r = Arc::new(Reactor { waiter: None });
let waiter = Arc::new(ReactorWaiter {
reactor: Arc::downgrade(&r),
}).into();
// SAFETY: nobody else could be borrowing right now
let r = unsafe {
let r = (Arc::into_raw(r) as *mut Reactor).as_mut().unwrap();
r.waiter = Some(waiter);
Arc::from_raw(r as *const Reactor)
};
*reactor = Some(r);
}
Arc::clone(reactor.as_ref().unwrap())
}
fn waiter<'a>(self: &'a Arc<Self>) -> &'a WakerWaiter {
self.waiter.as_ref().unwrap()
}
}
struct ReactorWaiter {
reactor: Weak<Reactor>,
}
impl WakerWait for ReactorWaiter {
fn wait(self: &Arc<Self>) {
// ... blocking poll for events ...
todo!();
}
fn canceler(self: &Arc<Self>) -> WakerWaiterCanceler {
// ... provide a way to unblock the above ...
todo!();
}
}
struct MyFuture;
impl Future for MyFuture {
#
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let p = match get_poller(cx) {
Some(p) => p,
None => panic!("MyFuture requires context to provide TopLevelPoller"),
};
if p.set_waiter(Reactor::current().waiter()).is_err() {
panic!("Incompatible waiter already assigned to TopLevelPoller");
}
// ... register waker, perform I/O, etc ...
}
}
示例:一个提供 TopLevelPoller
的执行器
#![feature(local_waker)]
#![feature(context_ext)]
struct ThreadWaker {
thread: Thread,
waiter: Arc<Mutex<Option<WakerWaiter>>>,
}
impl Wake for ThreadWaker {
fn wake(self: Arc<Self>) {
if thread::current().id() == self.thread.id() {
// if we were woken in the same thread as execution,
// then the wake was caused by the WakerWaiter which
// will return control without any signaling needed
return;
}
let waiter = self.waiter.lock().unwrap().clone();
if let Some(waiter) = waiter {
// if a waiter was configured, then the execution thread
// will be blocking on it and we'll need to unblock it
waiter.canceler().cancel();
} else {
// if a waiter was not configured, then the execution
// thread will be asleep with a standard thread park
self.thread.unpark();
}
}
}
#[derive(Clone)]
struct MyTopLevelPoller {
waiter: Arc<Mutex<Option<WakerWaiter>>>,
}
impl TopLevelPoller for MyTopLevelPoller {
fn set_waiter(&mut self, waiter: &WakerWaiter) -> Result<(), SetWaiterError> {
let self_waiter = &mut *self.waiter.lock().unwrap();
if let Some(cur) = self_waiter {
if cur == waiter {
return Ok(()); // already set to this waiter
} else {
return Err(SetWaiterError); // already set to a different waiter
}
}
*self_waiter = Some(waiter.clone());
Ok(())
}
}
let waiter = Arc::new(Mutex::new(None));
let waker = Arc::new(ThreadWaker {
thread: thread::current(),
waiter: Arc::clone(&waiter),
}).into();
let mut cx = Context::from_waker(&waker);
let mut poller = MyTopLevelPoller { waiter };
let mut fut = pin!(async { /* ... */ });
loop {
let result = {
let mut a = Anyable::new(&mut poller as &mut dyn TopLevelPoller);
let mut cx = ContextBuilder::from_waker(&waker).ext(a.as_any()).build();
fut.as_mut().poll(&mut cx)
};
match result {
Poll::Ready(res) => break res,
Poll::Pending => {
let waiter = poller.waiter.lock().unwrap().clone();
// if a waiter is configured then block on it. else do a
// standard thread park
match waiter {
Some(waiter) => waiter.wait(),
None => thread::park(),
}
}
}
}