5个版本 (3个重大变更)
0.4.0 | 2022年4月30日 |
---|---|
0.3.0 | 2021年8月18日 |
0.2.0 | 2021年7月27日 |
0.1.1 | 2020年4月21日 |
0.1.0 | 2020年4月18日 |
#933 在 异步
每月51次下载
23KB
238 代码行
actix-interop
允许使用异步/await语法实现actix演员,并提供了一种方便的方式来控制对演员状态的访问。
# Cargo.toml
[dependencies]
actix-interop = "0.3"
示例
此示例演示了如何实现一个通用的“管道适配器”,它可以将任何Sink
/Stream
对(形成请求/响应管道)转换为actix Actor。
响应将与它们发送的顺序相匹配(因此第一个响应对应于发送到Sink
的第一个请求等)。这要求我们的“发送”操作是严格有序的,在actix中由于异步操作通常允许交错,所以这是很难实现的。
此外,虽然发送必须是原子的,但我们还希望能够在任何给定时间拥有大量正在进行的请求,因此消息处理程序接收部分在等待时不需要独占访问演员。因此,像AtomicResponse
这样的抽象过于简单,无法提供帮助。
为了解决这个问题,我们使用critical_section
函数来允许我们的消息处理程序中的特定部分是原子的。
use std::collections::VecDeque;
use std::pin::Pin;
use futures::{Sink, SinkExt, Stream, channel::oneshot};
use actix::prelude::*;
use actix_interop::{FutureInterop, with_ctx, critical_section};
// Define our actor
pub struct PipelineAdapter<Req, Res, Err> {
sink: Option<Pin<Box<dyn Sink<Req, Error=Err>>>>,
in_flight_reqs: VecDeque<oneshot::Sender<Result<Res, Err>>>,
}
// Implement a constructor
impl<Req, Res, Err> PipelineAdapter<Req, Res, Err>
where
Req: 'static,
Res: 'static,
Err: 'static,
{
pub fn new<Si, St>(sink: Si, stream: St) -> Addr<Self>
where
Si: Sink<Req, Error=Err> + 'static,
St: Stream<Item=Res> + 'static,
{
// Convert to a boxed trait object
let sink: Box<dyn Sink<Req, Error=Err>> = Box::new(sink);
Self::create(|ctx| {
ctx.add_stream(stream);
Self {
sink: Some(sink.into()),
in_flight_reqs: VecDeque::new(),
}
})
}
}
// Tell actix this is an actor using the default Context type
impl<Req, Res, Err> Actor for PipelineAdapter<Req, Res, Err>
where
Req: 'static,
Res: 'static,
Err: 'static,
{
type Context = Context<Self>;
}
// Transform actix messages into the pipelines request/response protocol
impl<Req, Res, Err> Handler<Req> for PipelineAdapter<Req, Res, Err>
where
Req: Message<Result=Result<Res, Err>> + 'static,
Res: 'static,
Err: 'static,
{
type Result = ResponseActFuture<Self, Result<Res, Err>>; // <- Message response type
fn handle(&mut self, msg: Req, _ctx: &mut Context<Self>) -> Self::Result {
async move {
let (tx, rx) = oneshot::channel();
// Perform sends in a critical section so they are strictly ordered
critical_section::<Self, _>(async {
// Take the sink from the actor state
let mut sink = with_ctx(|actor: &mut Self, _| actor.sink.take())
.expect("Sink to be present");
// Send the request
let res = sink.send(msg).await;
// Put the sink back, and if the send was successful,
// record the in-flight request.
with_ctx(|actor: &mut Self, _| {
actor.sink = Some(sink);
match res {
Ok(()) => actor.in_flight_reqs.push_back(tx),
Err(e) => {
// Don't care if the receiver has gone away
let _ = tx.send(Err(e));
}
}
});
})
.await;
// Wait for the result concurrently, so many requests can
// be pipelined at the same time.
rx.await.expect("Sender should not be dropped")
}
.interop_actor_boxed(self)
}
}
// Process responses
impl<Req, Res, Err> StreamHandler<Res> for PipelineAdapter<Req, Res, Err>
where
Req: 'static,
Res: 'static,
Err: 'static,
{
fn handle(&mut self, msg: Res, _ctx: &mut Context<Self>) {
// When we receive a response, just pull the first in-flight
// request and forward on the result.
let _ = self.in_flight_reqs
.pop_front()
.expect("There to be an in-flight request")
.send(Ok(msg));
}
}
许可证
该项目可在以下任一许可证下使用:
- Apache许可证,版本2.0,(LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT许可证 (LICENSE-MIT 或 http://opensource.org/licenses/MIT)
任选其一。
贡献
除非您明确声明,否则根据Apache-2.0许可证定义,您提交的任何旨在包含在actix-interop中的贡献,都将按上述方式双重许可,不附加任何额外条款或条件。
依赖项
~5-14MB
~160K SLoC