#actor-framework #processing #order #message #sqlite #ingestion #received

bin+lib arrows

A Rust actor framework with message durability and ingestion-order processing of messages

17 releases

0.1.16 Jan 25, 2022
0.1.15 Jan 24, 2022
0.1.0 Oct 16, 2021

#801 in Concurrency


51 downloads per month

AGPL-3.0-or-later

130KB
3K SLoC

Arrows

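An actor framework in Rust with message durability and ingestion-order processing of messages. Message persistence is implemented with an embedded sqlite instance. Message content can be text or binary. The messages themselves are stored in binary form in the backing store.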
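The durability and ordering described above can be pictured with a small, standard-library-only model. This is a sketch under stated assumptions, not the arrows implementation: `Payload`, `Store`, `ingest`, and `replay` are hypothetical names, and an in-memory `Vec` stands in for the embedded sqlite table. It shows text and binary payloads both being stored as bytes and read back in the order they were ingested.

```rust
// Hypothetical model, not the arrows API: payloads are text or binary,
// but everything is stored as bytes in an append-only store.
#[derive(Debug, Clone, PartialEq)]
enum Payload {
    Text(String),
    Binary(Vec<u8>),
}

impl Payload {
    // Encode with a one-byte tag so the original kind can be recovered.
    fn to_bytes(&self) -> Vec<u8> {
        match self {
            Payload::Text(s) => {
                let mut out = vec![0u8];
                out.extend_from_slice(s.as_bytes());
                out
            }
            Payload::Binary(b) => {
                let mut out = vec![1u8];
                out.extend_from_slice(b);
                out
            }
        }
    }

    // Decode a stored blob; returns None for an empty or malformed blob.
    fn from_bytes(bytes: &[u8]) -> Option<Payload> {
        let (tag, body) = bytes.split_first()?;
        match *tag {
            0 => String::from_utf8(body.to_vec()).ok().map(Payload::Text),
            1 => Some(Payload::Binary(body.to_vec())),
            _ => None,
        }
    }
}

// Stand-in for the embedded sqlite table: rows of (sequence number, blob).
#[derive(Default)]
struct Store {
    rows: Vec<(u64, Vec<u8>)>,
    next_seq: u64,
}

impl Store {
    // Persist a message and return the sequence number it was ingested under.
    fn ingest(&mut self, payload: &Payload) -> u64 {
        let seq = self.next_seq;
        self.rows.push((seq, payload.to_bytes()));
        self.next_seq += 1;
        seq
    }

    // Decode stored rows in ingestion order, for example after a restart.
    // Rows are only ever appended, so iteration order is ingestion order.
    fn replay(&self) -> Vec<Payload> {
        self.rows
            .iter()
            .filter_map(|(_, blob)| Payload::from_bytes(blob))
            .collect()
    }
}

fn main() {
    let mut store = Store::default();
    store.ingest(&Payload::Text("hello".to_string()));
    store.ingest(&Payload::Binary(vec![0xde, 0xad]));
    for payload in store.replay() {
        println!("{:?}", payload);
    }
}
```

A real backing store would write each row to disk before acknowledging the message; the sequence number plays the role that an auto-incrementing row id might play in a database table.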

use arrows::{Actor, Mail, Msg, Producer};
use serde::{Deserialize, Serialize};

//A sample actor
pub struct DemoActor;
impl Actor for DemoActor {
    fn receive(&mut self, incoming: Mail) -> Option<Mail> {
        match incoming {
            Mail::Trade(msg) => println!("Received: {}", msg),
            bulk @ Mail::Bulk(_) => println!("Received bulk msg: {}", bulk),
            Mail::Blank => println!("DemoActor received blank"),
        }
        Some(Msg::from_text("Message from DemoActor").into())
    }
}

//Producer implementations are called to produce actor instances.

//Produces DemoActor instances
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct ActorProducer;

//Producer implementations need to be tagged with `typetag` marker.

#[typetag::serde]
impl Producer for ActorProducer {
    fn produce(&mut self) -> Box<dyn Actor> {
        Box::new(DemoActor)
    }
}

//The `define_actor` macro defines a new actor instance in the system. The actor
//instance and its producer are persisted in the backing store; the instance is then
//activated, receives a startup signal, and becomes ready to process incoming
//messages. The producer definition is used to restart/restore the actor as
//required.

use arrows::define_actor;

let producer = ActorProducer::default();
define_actor!("demo_actor", producer);

//At this stage - the actor instance `demo_actor` is ready for incoming messages. It
//should have already received the startup signal.

use arrows::send;

let m1 = Msg::from_text("Message to demo actor");
let m2 = Msg::from_text("Message to demo actor");
let m3 = Msg::from_text("Message to demo actor");

send!("demo_actor", (m1, m2, m3));

//Create another actor instance - demo_actor1

define_actor!("demo_actor1", ActorProducer::default());

let m4 = Msg::from_text("Message to demo actor1");
let m5 = Msg::from_text("Message to demo actor1");

let m6 = Msg::from_text("Message to demo actor");
let m7 = Msg::from_text("Message to demo actor");

//Send out multiple messages to multiple actors at one go

send!("demo_actor1", (m4, m5), "demo_actor", (m6, m7));


//Actors running in remote systems need to be identified by the `Addr` construct:

use arrows::Addr;

let remote_actor = Addr::remote("remote_actor", "11.11.11.11:8181");

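let m1 = Msg::from_text("Message to remote actor");
let m2 = Msg::from_text("Message to remote actor");

//Pass the `Addr` rather than a name so the messages are routed to the remote system
//(this assumes `send!` accepts an `Addr` expression in place of a string literal)
send!(remote_actor, m1, m2);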

//When sending to a single actor, it is not necessary to group messages within parentheses.
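The comments above say the producer is kept alongside the actor so that the actor can be restarted or restored. A minimal sketch of that idea, assuming nothing about the crate's internals: the `Actor` and `Producer` traits below are simplified stand-ins that only borrow the README's names, and `Counter`, `CounterProducer`, `Entry`, and `System` are hypothetical. Messages are plain strings here, and nothing is persisted.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins that mirror the README's trait names; these are not
// the arrows definitions.
trait Actor {
    fn receive(&mut self, incoming: &str) -> Option<String>;
}

trait Producer {
    fn produce(&mut self) -> Box<dyn Actor>;
}

// Counts the messages it has seen, so a restart is observable.
struct Counter {
    seen: usize,
}

impl Actor for Counter {
    fn receive(&mut self, incoming: &str) -> Option<String> {
        self.seen += 1;
        Some(format!("#{} {}", self.seen, incoming))
    }
}

struct CounterProducer;

impl Producer for CounterProducer {
    fn produce(&mut self) -> Box<dyn Actor> {
        Box::new(Counter { seen: 0 })
    }
}

// Keeps the producer next to the live instance so the instance can be rebuilt.
struct Entry {
    producer: Box<dyn Producer>,
    actor: Box<dyn Actor>,
}

#[derive(Default)]
struct System {
    actors: HashMap<String, Entry>,
}

impl System {
    // Register a producer under a name and create the first instance from it.
    fn define(&mut self, name: &str, mut producer: Box<dyn Producer>) {
        let actor = producer.produce();
        self.actors.insert(name.to_string(), Entry { producer, actor });
    }

    // Deliver messages one by one, in the order given, and collect the replies.
    // Unknown actor names yield no replies.
    fn send(&mut self, name: &str, msgs: &[&str]) -> Vec<String> {
        match self.actors.get_mut(name) {
            Some(entry) => msgs.iter().filter_map(|m| entry.actor.receive(m)).collect(),
            None => Vec::new(),
        }
    }

    // Replace the live instance with a fresh one from the stored producer.
    fn restart(&mut self, name: &str) -> bool {
        match self.actors.get_mut(name) {
            Some(entry) => {
                entry.actor = entry.producer.produce();
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut system = System::default();
    system.define("demo_actor", Box::new(CounterProducer));
    println!("{:?}", system.send("demo_actor", &["a", "b"]));
    system.restart("demo_actor");
    println!("{:?}", system.send("demo_actor", &["c"]));
}
```

In this model the counter starts again from zero after `restart`, because the new instance comes from the producer rather than from the old instance's state. Serializing the producer, as the `typetag` requirement above suggests the framework does, is what would let such a restart survive a process exit.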

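How to get started

  • Clone the GitHub repository.
  • Run cargo build in the project directory.
  • Open an additional terminal.
  • Run the register.sh script in the project directory.
  • In the other terminal, run the server.sh script from the same directory.
  • Run the send.sh script from the previous terminal; the actors should start receiving messages.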

Contribution

This project is still evolving. Contributions are welcome.

Dependencies

~27–40MB
~623K SLoC