#actix-actor #async-await #actix #await #atomic #context #async

actix-interop

使用异步/await语法与actix演员配合使用

5个版本 (3个重大变更)

0.4.0 2022年4月30日
0.3.0 2021年8月18日
0.2.0 2021年7月27日
0.1.1 2020年4月21日
0.1.0 2020年4月18日

#933异步

Download history 2/week @ 2024-04-22 7/week @ 2024-05-20 5/week @ 2024-06-03 4/week @ 2024-06-10 8/week @ 2024-06-24 24/week @ 2024-07-01 27/week @ 2024-07-22

每月51次下载

MIT/Apache

23KB
238 代码行

actix-interop

CI

文档

允许使用异步/await语法实现actix演员,并提供了一种方便的方式来控制对演员状态的访问。

# Cargo.toml
[dependencies]
actix-interop = "0.3"

示例

此示例演示了如何实现一个通用的“管道适配器”,它可以将任何Sink/Stream对(形成请求/响应管道)转换为actix Actor。

响应将与它们发送的顺序相匹配(因此第一个响应对应于发送到Sink的第一个请求等)。这要求我们的“发送”操作是严格有序的,在actix中由于异步操作通常允许交错,所以这是很难实现的。

此外,虽然发送必须是原子的,但我们还希望能够在任何给定时间拥有大量正在进行的请求,因此消息处理程序接收部分在等待时不需要独占访问演员。因此,像AtomicResponse这样的抽象过于简单,无法提供帮助。

为了解决这个问题,我们使用critical_section函数来允许我们的消息处理程序中的特定部分是原子的。

use std::collections::VecDeque;
use std::pin::Pin;

use futures::{Sink, SinkExt, Stream, channel::oneshot};
use actix::prelude::*;

use actix_interop::{FutureInterop, with_ctx, critical_section};

// Define our actor
pub struct PipelineAdapter<Req, Res, Err> {
    sink: Option<Pin<Box<dyn Sink<Req, Error=Err>>>>,
    in_flight_reqs: VecDeque<oneshot::Sender<Result<Res, Err>>>,
}

// Implement a constructor
impl<Req, Res, Err> PipelineAdapter<Req, Res, Err>
where
    Req: 'static,
    Res: 'static,
    Err: 'static,
{
    pub fn new<Si, St>(sink: Si, stream: St) -> Addr<Self>
    where
        Si: Sink<Req, Error=Err> + 'static,
        St: Stream<Item=Res> + 'static,
    {
        // Convert to a boxed trait object
        let sink: Box<dyn Sink<Req, Error=Err>> = Box::new(sink);

        Self::create(|ctx| {
            ctx.add_stream(stream);
            Self {
                sink: Some(sink.into()),
                in_flight_reqs: VecDeque::new(),
            }
        })
    }
}

// Tell actix this is an actor using the default Context type
impl<Req, Res, Err> Actor for PipelineAdapter<Req, Res, Err>
where
    Req: 'static,
    Res: 'static,
    Err: 'static,
{
    type Context = Context<Self>;
}

// Transform actix messages into the pipelines request/response protocol
impl<Req, Res, Err> Handler<Req> for PipelineAdapter<Req, Res, Err>
where
    Req: Message<Result=Result<Res, Err>> + 'static,
    Res: 'static,
    Err: 'static,
{
    type Result = ResponseActFuture<Self, Result<Res, Err>>; // <- Message response type

    fn handle(&mut self, msg: Req, _ctx: &mut Context<Self>) -> Self::Result {
        async move {
            let (tx, rx) = oneshot::channel();

            // Perform sends in a critical section so they are strictly ordered
            critical_section::<Self, _>(async {
                // Take the sink from the actor state
                let mut sink = with_ctx(|actor: &mut Self, _| actor.sink.take())
                    .expect("Sink to be present");
                
                // Send the request
                let res = sink.send(msg).await;

                // Put the sink back, and if the send was successful,
                // record the in-flight request.
                with_ctx(|actor: &mut Self, _| {
                    actor.sink = Some(sink);
                    match res {
                        Ok(()) => actor.in_flight_reqs.push_back(tx),
                        Err(e) => {
                            // Don't care if the receiver has gone away
                            let _ = tx.send(Err(e));
                        }
                    }
                });
            })
            .await;

            // Wait for the result concurrently, so many requests can
            // be pipelined at the same time.
            rx.await.expect("Sender should not be dropped")
        }
        .interop_actor_boxed(self)
    }
}

// Process responses
impl<Req, Res, Err> StreamHandler<Res> for PipelineAdapter<Req, Res, Err>
where
    Req: 'static,
    Res: 'static,
    Err: 'static,
{
    fn handle(&mut self, msg: Res, _ctx: &mut Context<Self>) {
        // When we receive a response, just pull the first in-flight
        // request and forward on the result.
        let _ = self.in_flight_reqs
            .pop_front()
            .expect("There to be an in-flight request")
            .send(Ok(msg));
    }
}

许可证

该项目可在以下任一许可证下使用:

任选其一。

贡献

除非您明确声明,否则根据Apache-2.0许可证定义,您提交的任何旨在包含在actix-interop中的贡献,都将按上述方式双重许可,不附加任何额外条款或条件。

依赖项

~5-14MB
~160K SLoC