1 个不稳定版本
使用旧Rust 2015
0.0.0 | 2017年10月14日 |
---|
#35 在 #consume
46KB
1K SLoC
tokio by hand
我一直试图弄清楚future,但这真是一次相当艰难的经历。
我开始工作的项目是消费加密货币交易所市场数据websocket馈送并将其写入数据库。
我开始使用 https://crates.io/crates/websocket 并决定将其异步化。虽然我可以用线程、锁和通道完成它,但我之前从未深入使用过tokio和future,所以我用它作为学习的借口。
我的体验基本上是,提供的组合器足够简单易用,但当我需要超出它们时,事情很快变得困难起来。学习曲线非常不平稳,简单的事情是可控的,但突然之间,当不能使用基本组合器时,我就陷入了困境。
有时组合器不足的原因有几个
-
程序组织。我有一大堆组合器调用,可以完成我想做的事情,但它们都在一个函数中,难以阅读和调试。我从websocket读取,将控制消息写回websocket,将websocket中的文本反序列化为我的本地结构,并将高级控制消息序列化为文本并写回websocket。
我的想法是将反序列化、序列化和低级websocket管理抽象到另一个Sink + Stream中,但这意味着我自己实现一个future,这比成功使用组合器要困难得多。
-
疯狂的类型。我发现组合器通常会产生难以理解的类型。如果我在具体的类型上实现自己的具体future,程序将会简单得多。装箱基本可行,但有时类型推理以令人困惑的方式失败,我需要大量注释来引导事物到正确的类型。
尝试实现,作为我的第一个future,一个包含反序列化、序列化和websocket连接管理的非常复杂的问题,如你所想象的那样,非常困难。
因此,我退了一步,开始实现一系列更简单的future,这样我就可以理解它们是如何工作的。
我实现了以下future
standard::sleeper::Sleeper
,这是一个在通过在子线程上休眠后经过超时产生()
的future。standard::instant::{Producer,Consumer}
产生和消费u8
的期货。消费u8
的期货有些不寻常,但我将其用作实现汇的依据。standard::delayed::{Producer,Consumer}
与上述相同,但操作在超时后执行。standard::instant_series::{Producer,Consumer}
一个流和一个汇,产生无限序列的随机u8
。standard::instant_series::{Producer,Consumer}
与上述相同,但每个项目之后延迟一秒。standard::buffered::Consumer
与上述消费者相同,但在接收项目比处理速度快时包含固定大小的缓冲区。
在实现它们时,我在理解期货模型中遇到的主要困难如下
-
我发现缺乏显式的任务参数非常令人困惑。对我来说,任务是什么,如何创建它,它代表什么,以及如何通知任务并不明显。
我考虑在 https://github.com/alexcrichton/futures-rs/issues/129 上发表评论,但我觉得它可能主要是一个“我也是”的评论,所以我暂时搁置了。
-
理解何时期货“足够完成”以确保它会在将来取得进展。不止一次,我会实现一个期货并期望它工作,但实际上并没有直接或间接地安排任务被通知,所以一切都会挂起。
在弄清楚我的困难所在后,我创建了一个修改后的期货 API,这将使避免这些相同的错误变得困难。
您可以在 extended
模块中看到 API。我用 extended
API 重新实现了 standard
模块中的所有内容,发现它更容易理解为什么会发生这一切。
它在 poll
、start_send
和 poll_complete
中添加了显式的 TaskHandle
。这是为了帮助 #1。为了帮助 #2,我扩展了 Async
和 AsyncSink
类型,以便在不准备就绪时也包含一个 AgreementToNotify
值。
AgreementToNotify
值通过调用 TaskHandle::i_will_notify()
获得返回 (Task, AgreementToNotify)
。 (内部,Task
从 task::current()
获取。)该方法名称显然有些直白,但目的是在您的期货实现中标记您实际获取当前任务的地方,以便您可以在将来通知它。
在实践中,我在代码库中只这样做过一次,就是在实现 extended::sleeper::Sleeper
时。然而,这仍然非常有用,因为它清楚地表明了某个 future 是否是 future 链中的最后一个,因此负责通知任务,或者它只是委托给另一个 future。
如果委托给另一个 future,它必须从另一个 future 的 NotReady
获取 AgreementToNotify
,因此不可能在不直接或间接安排通知任务的情况下返回 NotReady
。
扩展 API 让我对此有了更清晰的了解。我明白它是一个非常冗长的 API,但我认为它极大地帮助了理解 futures 模型,使 traits 的契约变得非常清晰,并将许多错误移至编译时。至少,对我来说是这样的。
我能够相当容易地实现 extended::adapter::Adapter
,这基本上是为原始程序编写的 future 的一个简单版本,我相当有信心现在能够做到这一点。
依赖关系
约 6.5MB
约 100K SLoC