6 个版本
0.0.7 | 2020 年 4 月 14 日 |
---|---|
0.0.6 | 2020 年 4 月 14 日 |
0.0.1 |
|
#404 在 异步
16KB
175 行
streamline
Rust 的可逆 Stream-based 状态机库
示例
以下是如何使用 Streamline
创建 GitHub 仓库、发布推文,并在出错时撤销整个过程的示例。
// recommended, but not required
use async_trait::async_trait;
// some example clients for communicating through third-party APIs
use clients::{Github, Twitter};
use futures::StreamExt;
use streamline::{State, Streamline};
const MY_USERNAME: &'static str = "my-github-username";
// clients are stored in context for shared access throughout the life of the streamline
#[derive(Debug)]
struct Context {
github: Github,
twitter: Twitter,
}
#[derive(Debug, PartialEq)]
// you should create better errors than this
type MyError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Clone, Debug, PartialEq)]
enum MyState {
Github { repo_name: String },
Twitter { repo_id: i32, repo_name: String }
Done { repo_id: i32, repo_name: String, tweet_id: i32 }
}
#[async_trait(?Send)]
impl State for MyState {
type Context = Context;
type Error = MyError;
// every state needs to be mapped to the next
async fn next(&self, context: Option<&mut Self::Context>) -> Result<Option<Self>, Self::Error> {
let context = context.ok_or_else(|_| Box::new("No context supplied!"))?;
let next_state = match self {
MyState::Github { repo_name } => {
context
.github
.add_repo(&repo_name)
.await?
.map(|response| Some(MyState::Twitter { repo_id: &response.repo_id, repo_url: repo_name }))
},
MyState::Twitter { repo_name, .. } => {
context
.twitter
.tweet(&format("Look at my new Github repo at https://github.com/{}/{}!", &repo_name))
.await?
.map(|response| Some(MyState::Done { tweet_id: response.tweet_id }))
},
MyState::Done { .. } => None // returning Ok(None) stops the stream!
};
Ok(next_state)
}
// optionally, old states can be cleaned up if something goes wrong
async fn revert(&self, context: Option<&mut Self::Context>) -> Result<Option<Self>, Self::Error> {
let context = context.ok_or_else(|_| Box::new("No context supplied!"))?;
let next_state = match self {
MyState::Done { tweet_id, repo_id, repo_name } => {
context
.twitter
.delete_tweet(tweet_id)
.await?;
Some(MyState::Twitter { repo_id, repo_name })
},
MyState::Twitter { repo_id, repo_name } => {
context
.github
.delete_repo(repo_id)
.await?;
Some(MyState::Github { repo_id, repo_name })
},
MyState::Github { .. } => None
};
Ok(next_state)
}
}
async fn handle_tweeting_repo(repo_name: String) {
let context = Context {
github: Github::new(),
twitter: Twitter::new(),
};
Streamline::build(MyState { repo_name })
.context(context)
.run()
.for_each(|state| println!("Next state {:?}", &state))
.await;
}
动机
如果想在过程中从一个状态移动到下一个状态,那么在 Rust 中,通过类型系统查找一些可用的状态机模式是有意义的。特别是,枚举(enum)是一种很好的方法,可以在不排除可能状态的情况下模拟过程的进展。但对于以下场景处理状态时,确定性较低:
- 非阻塞状态更新:通常,我们最关心状态机的最终状态,但我们也希望在向最终状态过渡的过程中更新状态。在状态机中,这通常作为副作用实现(例如,通过通道或事件代理)。
- 超级状态性:虽然将状态携带在枚举的各个变体(variant)中很常见,但确定何时处理不直接附加到状态机单个变体的状态更新则要复杂得多。例如,如何处理状态机进展过程中更新数据库中的值?与第三方服务交互呢?何时应该处理这些状态部分?
- 可逆性:大多数过程都需要知道如何进行清理。建模这些有缺陷的过程以及它们返回到原始状态(或失败)的路径是一个复杂且模板密集的过程。
- 取消:中断
Stream
的执行很容易……只需丢弃Stream
!但清理表示某些进行中状态的流则要困难得多。
streamline
通过以下方式解决了这些问题:
futures::Stream
-兼容性:在状态机执行期间,而不是使用副作用,该库将状态机的每个更新建模为Item
,位于std::futures::Stream
中。- 一致的
上下文
:所有超变状态都可以通过一致的上下文
访问。 - 自动回滚:每当一个过程返回一个
Err
时,streamline
将(可选地)回滚到该点之前的所有状态,返回原始错误。 - 手动取消:
run_preemptible
方法返回一个Stream
和一个Cancel
处理程序,可以用来触发工作流的回滚过程。
依赖项
~1–1.6MB
~33K SLoC