4 个版本
0.1.2 | 2024年3月4日 |
---|---|
0.1.1 | 2024年2月26日 |
0.1.0 | 2024年2月20日 |
0.0.0 | 2023年11月13日 |
#612 in 开发工具
每月 34 次下载
用于 furtif-core
465KB
4.5K SLoC
这是 Silx 项目的一部分
silx-types
包含用于实现 silx 应用的类型定义
注意
关于版本 0.1.2
- 依赖项已更新
目的
Silx 旨在使
- 在一台或多台机器上构建一个作为异步服务网络的应用程序
- 构建这些服务而不必担心服务之间交换通道的详细实现
- 使用简单的参数化交换通道连接这些服务
- 通过类型哈希码控制通道数据类型的完整性
- 在交换通道上实现具有零拷贝反序列化的序列化(rkyv)
- 以可编辑文本格式序列化应用程序的整个网络定义,然后重新加载并执行它
Silx 仍然是一个处于开发中的项目。
以下是一个简单的示例,以提供一个简约的概述。项目 GitHub 上也有其他示例。
简约示例(Hello)
Cargo.toml
[package]
name = "silx_hello"
version = "0.1.2"
edition = "2021"
[dependencies]
tokio = "^1.36.0"
serde = "^1.0.197"
typetag = "^0.2.16"
silx-core = "0.1.2"
silx-types = "0.1.2"
main.rs
use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, time::Duration };
use serde::{Deserialize, Serialize};
use tokio::{spawn, time::sleep};
use silx_core::{
id_tools::IdBuilder, servants::shutdown::ShutdownBuilder,
utils::{
produce_emit, produce_future, produce_query, produce_read, produce_reply2,
Filable, MsgFromServant, ProcessInstance, ProcessProducer, SendToMaster,
ServantBuilder, ServantBuilderParameters, Starter, StarterProducer
},
};
use silx_types::{ ArchSized, WakeSlx };
// ///////////////////////////
// Servants implementations
/// Servant replying greetings by completing queryied full name with Hello
#[derive(Serialize, Deserialize, Clone,)]
struct Hello(String);
#[typetag::serde] impl ServantBuilder for Hello { }
impl ServantBuilderParameters for Hello {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let hello = self.0.clone();
let query_channel = "QueryHello".to_string();
// build reply process
produce_reply2!([hello], producer, String => String, query_channel, data, {
// get full name
let full_name: &str = data.archive_ref().unwrap();
// build an return greeting
let greeting = format!("{hello} {full_name}");
greeting.arch_sized().unwrap()
}).unwrap();
producer.named_process()
}
}
/// Servant sending first name
#[derive(Serialize, Deserialize, Clone,)]
struct FirstName(String);
#[typetag::serde] impl ServantBuilder for FirstName { }
impl ServantBuilderParameters for FirstName {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let first_name = self.0.clone();
// build channels
let emit_channel = "FirstName".to_string();
let sender = produce_emit!(producer, String, emit_channel, None,).unwrap();
// build process
produce_future!(producer, {
sleep(Duration::from_millis(100)).await; // Wait a little bit for receiver to be ready
sender.send(first_name.arch_sized().unwrap()).await.unwrap();
})
}
}
/// Servant doing:
/// * receive first name
/// * build full name
/// * query for greeting
/// * print greeting
/// * shutdown
#[derive(Serialize, Deserialize, Clone,)]
struct LastName(String);
#[typetag::serde] impl ServantBuilder for LastName { }
impl ServantBuilderParameters for LastName {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let last_name = self.0.clone();
// build channels
let recv_channel = "FirstName".to_string();
let receiver = produce_read!(producer,String,recv_channel,None,).unwrap();
let query_channel = "QueryHello".to_string();
let (query_sender,reply_receiver) = produce_query!(producer,String => String,query_channel, None).unwrap();
let emit_death = "Shutdown".to_string();
let death_sender = produce_emit!(producer, WakeSlx, emit_death, None,).unwrap();
// build process
produce_future!(producer, {
// receive first name
let arc_first_name = receiver.recv().await.unwrap();
// build full name
let full_name = format!("{} {last_name}", arc_first_name.archive_ref().unwrap());
// query for greeting
let arc_full_name = full_name.arch_sized().unwrap();
query_sender.send(arc_full_name).await.unwrap();
let reply = reply_receiver.recv().await.unwrap();
// print greeting
println!("{}",reply.archive_ref().unwrap());
// shutdown
death_sender.send(WakeSlx.arch_sized().unwrap()).await.unwrap();
let tid = task_id.lock().await.generate();
MsgFromServant::Shutdown(tid).send(&send_to_master).await.unwrap();
})
}
}
// ///////////////////////////
// Network implementation
/// Given main and slave socket addresses, build main and slave starters
/// * main cluster implements servants `last_name` and `hello`
/// * slave cluster implements servants `first_name` and `shutdown` (which will shutdown the slave)
/// * actions of `last_name`:
/// * receive first name from `first_name`
/// * build full name and query greeting from `hello`
/// * print greeting
/// * send shutdown signal to `shutdown` and shutdown main cluster
/// * `main_addr: SocketAddr` : main socket address
/// * `slave_addr: SocketAddr` : slave socket address
/// * `save_dir: &PathBuf` : directory where to save the network
/// * Output: main and slave starters
pub fn build_network (main_addr: SocketAddr, slave_addr: SocketAddr, save_dir: &PathBuf) -> (Starter,Starter) {
let max_ping = Duration::from_millis(100);
// set two clusters within the network
let start_prod = StarterProducer::new(
main_addr, "starter=main.yaml", "builder=main.yaml", None, 16
).add_cluster(
slave_addr, "starter=slave.yaml", "builder=slave.yaml", None, 16
).unwrap().done();
// add named servants
let start_prod = start_prod.add_process(
&main_addr, "last_name".to_string(), "servant=last_name.yaml", LastName("Doe".to_string())
).unwrap().add_process(
&main_addr, "hello".to_string(), "servant=hello.yaml", Hello("Welcome".to_string())
).unwrap().add_process(
&slave_addr, "first_name".to_string(),"servant=first_name.yaml", FirstName("John".to_string())
).unwrap().add_process(
&slave_addr, "shutdown".to_string(),"servant=shutdown.yaml", ShutdownBuilder::new("Shutdown".to_string())
).unwrap().done();
// add channels connecting the servants and produce the starter for each cluster
let mut starters = start_prod.add_query(
"channel=QueryHello.yaml", "QueryHello".to_string(), main_addr, ["last_name".to_string()], ["hello".to_string()], max_ping, None
).unwrap().add_net_broadcast(
"channel=FirstName.yaml", "FirstName".to_string(), slave_addr, [format!("first_name"),], main_addr, [format!("last_name"),], max_ping, 16
).unwrap().add_net_broadcast(
"channel=Shutdown.yaml", "Shutdown".to_string(), main_addr, ["last_name".to_string()], slave_addr, ["shutdown".to_string()], max_ping, 16,
).unwrap().done();
// save, get and return starters of the clusters
let main_starter = starters.remove(&main_addr).unwrap().unload(Some(save_dir)).unwrap();
let slave_starter = starters.remove(&slave_addr).unwrap().unload(Some(save_dir)).unwrap();
(main_starter,slave_starter)
}
// //////////////////////////
// Run the network
/// Main performs:
/// * build network and save it in files
/// * network execution
/// * network loading from files
/// * execute the loaded network
#[tokio::main]
pub async fn main() {
// build network and save it in files
let main_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8180);
let slave_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8181);
let save_dir = PathBuf::from("./saved");
let (main_starter,slave_starter) = build_network(main_addr, slave_addr, &save_dir);
// network execution
println!("==== first run -------------\n");
let handle_slave = spawn(async move {
// NOTA: main starter should be launched FIRST
sleep(Duration::from_millis(100)).await; // So wait a little bit
slave_starter.run().await.unwrap();
});
main_starter.run().await.unwrap();
handle_slave.await.unwrap();
sleep(Duration::from_millis(300)).await;
// network loading from files
println!("\n==== second run (loadind network) -------------\n");
let main_starter = Starter::load("starter=main.yaml", &save_dir).unwrap();
let slave_starter = Starter::load("starter=slave.yaml", &save_dir).unwrap();
// execute the loaded network
let handle_slave = spawn(async move {
sleep(Duration::from_millis(100)).await;
slave_starter.run().await.unwrap();
});
main_starter.run().await.unwrap();
handle_slave.await.unwrap();
}
典型输出
==== first run -------------
127.0.0.1:8181: try to connect 127.0.0.1:8180
127.0.0.1:8181: Listening connection established
cluster 127.0.0.1:8181 has been built
cluster 127.0.0.1:8180 has been built
Welcome John Doe
cluster 127.0.0.1:8181 is ended
cluster 127.0.0.1:8180 is ended
==== second run (loadind network) -------------
127.0.0.1:8181: try to connect 127.0.0.1:8180
127.0.0.1:8181: Listening connection established
cluster 127.0.0.1:8181 has been built
cluster 127.0.0.1:8180 has been built
Welcome John Doe
cluster 127.0.0.1:8180 is ended
cluster 127.0.0.1:8181 is ended
服务定义
服务是通过实现 ServantBuilderParameters
特性和使用宏 #[typetag::serde]
的 ServantBuilder
特性来构建的。该宏 #[typetag::serde]
用于序列化 ServantBuilder
实现者,因此是必要的,可以通过配置文件来描述网络(见下文)。下面,我们将详细查看 LastName
和 Hello
服务员的构建
LastName
对应于服务员的主体类型,包括输入和输出通道以及处理代码Hello
是一个响应查询的仆从,其形式是一个简单的函数。仆从构建阶段是 Rust 实现严格必要的唯一阶段。否则,构建计算网络只需要定义配置文件。
仆从 LastName
所有仆从都必须实现 ServantBuilderParameters
特性和 ServantBuilder
特性。ServantBuilder
的实现为空但必须存在。实现 ServantBuilderParameters
需要定义 max_cycle_time
和 build_process
方法。max_cycle_time
方法指定允许的最大时间来响应来自集群主机的请求。在此时间之后,仆从被视为无效并被终止,因此此功能的重要性不大。
#[derive(Serialize, Deserialize, Clone,)]
struct LastName(String);
#[typetag::serde] impl ServantBuilder for LastName { }
impl ServantBuilderParameters for LastName {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
}
}
相比之下,实现 build_process
方法涉及仆从功能行为的本质方面。
初始化生产者和检索仆从数据
首先,必须使用发送到主机的发送通道初始化一个新的生产者,其次,仆从数据可以被克隆(对于可复制数据,此任务不是必需的)。生产者将是构建所有仆从组件的必要助手。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let last_name = self.0.clone();
[...]
}
设置连接仆从的通道
通道是通过宏 produce_read
、QueryHello
和 Shutdown
构建的。这些宏在处理生产者。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
let recv_channel = "FirstName".to_string();
let receiver = produce_read!(producer,String,recv_channel,None,).unwrap();
let query_channel = "QueryHello".to_string();
let (query_sender,reply_receiver) = produce_query!(producer,String => String,query_channel, None).unwrap();
let emit_death = "Shutdown".to_string();
let death_sender = produce_emit!(producer, WakeSlx, emit_death, None,).unwrap();
[...]
}
在此代码中,我们依次定义了到 FirstName
、QueryHello
和 Shutdown
通道的连接。
- 宏
produce_read
将仆从注册为FirstName
通道的读取者。生成了接收器receiver
来访问通道输出。 - 宏
produce_query
将仆从注册为QueryHello
通道的查询者。生成了发送器query_sender
和接收器reply_receiver
来发送查询和接收回复。 - 宏
produce_emit
将仆从注册为Shutdown
通道的发射者。生成了发送器death_sender
来访问通道输入。
构建仆从进程
仆从按以下顺序执行以下操作
- 接收名字并构建全名
- 请求问候消息并打印问候消息
- 关闭
过程是通过宏定义的
produce_future!(producer, { ... })
接收名字并构建全名
仆从等待来自 FirstName
通道的消息。此消息被存档,可以使用 archive_ref
方法作为引用(零拷贝反序列化)访问。然后使用 format
宏构建全名。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_future!(producer, {
let arc_first_name = receiver.recv().await.unwrap();
let full_name = format!("{} {last_name}", arc_first_name.archive_ref().unwrap());
[...]
})
}
注意 1:有两个从存档引用的方法,archive_ref
和 arch_deref
。archive_ref
方法引用 rkyv 存档,而 arch_deref
提供了更大的灵活性。然而,arch_deref
的实现较少。
注意 2:archive_mut
和 arch_deref_mut
是 archive_ref
和 arch_deref
的固定可变对应物。
请求问候消息并打印问候消息
仆从通过 arch_sized
方法从全名构建存档,将其作为查询发送,等待回复,并打印此回复。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_future!(producer, {
[...]
let arc_full_name = full_name.arch_sized().unwrap();
query_sender.send(arc_full_name).await.unwrap();
let reply = reply_receiver.recv().await.unwrap();
println!("{}",reply.archive_ref().unwrap());
[...]
})
}
关闭
服务通过向其他集群的 shutdown
服务发送唤醒消息,并向其主服务发送关闭任务来关闭网络。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_future!(producer, {
[...]
death_sender.send(WakeSlx.arch_sized().unwrap()).await.unwrap();
let tid = task_id.lock().await.generate();
MsgFromServant::Shutdown(tid).send(&send_to_master).await.unwrap();
})
}
服务 Hello
此服务是一个响应者,因此 build_process
的定义不同。首先,使用发送到主服务的通道初始化一个新的生产者,复制服务数据并定义查询通道名称。
[...]
impl ServantBuilderParameters for Hello {
[...]
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let hello = self.0.clone();
let query_channel = "QueryHello".to_string();
[...]
}
}
然后,通过宏将响应代码注册到生产者。
produce_reply2!([hello], producer, String => String, query_channel, data, { ... })
[hello]
通知宏不可复制的变量hello
将被移动到闭包中。String => String
表示查询的类型为String
,回复的类型也是String
。query_channel
是查询通道的名称。data
是包含查询的变量的名称。
在其过程中,服务从存档 data
中检索全名引用,然后在其前面添加问候信息,最后返回结果的存档。
[...]
impl ServantBuilderParameters for Hello {
[...]
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_reply2!([hello], producer, String => String, query_channel, data, {
let full_name: &str = data.archive_ref().unwrap();
let greeting = format!("{hello} {full_name}");
greeting.arch_sized().unwrap()
}).unwrap();
[...]
}
}
最后,从 producer
中恢复进程实例并返回。
[...]
impl ServantBuilderParameters for Hello {
[...]
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
producer.named_process()
}
}
网络定义
可以使用 StarterProducer
及其衍生品来构建网络。另一种方法是编辑配置文件,这些文件用于通过反序列化构建网络的集群启动器。这些配置文件可以使用 StarterProducer
自动生成,如示例中的 build_network
方法所示。示例步骤如下:
- 使用主和从集群的特性初始化生产者。每个启动器和每个构建器(每个集群一个)都提供了序列化文件。集群也通过它们的套接字地址进行识别。
let start_prod = StarterProducer::new(main_addr, "starter=main.yaml", "builder=main.yaml", None, 16)
.add_cluster(slave_addr, "starter=slave.yaml", "builder=slave.yaml", None, 16).unwrap().done()
- 将服务添加到集群。服务
last_name
和hello
被添加到主集群,而服务first_name
和shutdown
被添加到从集群。提供了每个服务的名称、序列化文件和值。
let start_prod = start_prod
.add_process(&main_addr, "last_name".to_string(), "servant=last_name.yaml", LastName("Doe".to_string())).unwrap()
.add_process(&main_addr, "hello".to_string(), "servant=hello.yaml", Hello("Welcome".to_string())).unwrap()
.add_process(&slave_addr, "first_name".to_string(),"servant=first_name.yaml", FirstName("John".to_string())).unwrap()
.add_process(&slave_addr, "shutdown".to_string(),"servant=shutdown.yaml", ShutdownBuilder::new("Shutdown".to_string())).unwrap().done();
- 将通道添加到集群,并检索可序列化的启动器。提供了序列化文件、名称以及随后的输出服务器的输入服务器。实际上,通道可以将多个服务器连接到多个服务器。此外,如果通道在集群内,提供了集群地址;如果是两个集群之间的通道,提供了输入集群地址和输出集群地址。通道的性质由使用的方法确定,此处为
add_net_broadcast
和add_query
。
let mut starters = start_prod.add_query(
"channel=QueryHello.yaml", "QueryHello".to_string(), main_addr, ["last_name".to_string()], ["hello".to_string()], max_ping, None
).unwrap().add_net_broadcast(
"channel=FirstName.yaml", "FirstName".to_string(), slave_addr, [format!("first_name"),], main_addr, [format!("last_name"),], max_ping, 16
).unwrap().add_net_broadcast(
"channel=Shutdown.yaml", "Shutdown".to_string(), main_addr, ["last_name".to_string()], slave_addr, ["shutdown".to_string()], max_ping, 16,
).unwrap().done();
- 在这个阶段,启动器是可序列化的,但不能执行。我们可以生成序列化文件,并使用
unload
命令检索集群的可执行启动器。在这个阶段,我们有可序列化的,但不能执行的启动器。我们可以生成序列化文件,并使用unload
命令检索集群的可执行启动器。
let main_starter = starters.remove(&main_addr).unwrap().unload(Some(save_dir)).unwrap();
let slave_starter = starters.remove(&slave_addr).unwrap().unload(Some(save_dir)).unwrap();
(main_starter,slave_starter)
请注意,可以使用 unload
命令不产生任何序列化,通过提供 None
作为序列化目录;也可以使用 unwrap
命令,它可以达到相同的结果。
集群加载和执行
启动器可以使用 load
方法进行加载,这是对一系列更基本命令的快捷方式。执行启动器只需使用 run
方法。
let save_dir = PathBuf::from("./saved");
[...]
let main_starter = Starter::load("starter=main.yaml", &save_dir).unwrap();
main_starter.run().await.unwrap();
来自网络序列化的已保存文件
运行后,项目 saved
目录中从网络序列化生成了11个文件。
│ Cargo.toml
│
├───saved
│ ├───main
│ │ builder=main.yaml
│ │ builder=slave.yaml
│ │ channel=FirstName.yaml
│ │ channel=QueryHello.yaml
│ │ channel=Shutdown.yaml
│ │ servant=first_name.yaml
│ │ servant=hello.yaml
│ │ servant=last_name.yaml
│ │ servant=shutdown.yaml
│ │ starter=main.yaml
│ │
│ └───slave
│ starter=slave.yaml
│
└───src
main.rs
目录 saved/main
包含主启动器的完整定义,而目录 saved/slave
包含从启动器的完整定义。
一个重要的点是,网络的各个方面都由这些可编辑的文件进行参数化。 唯一不能参数化并需要在 Rust 中实现的是服务员的定义,通过实现 ServantBuilder
和 ServantBuilderParameters
特性。
从启动器已保存文件
目录 saved/slave
包含唯一文件,starter=slave.yaml
。
从启动器文件 starter=slave.yaml
!Listener
main: 127.0.0.1:8180
this: 127.0.0.1:8181
文件解释一切
- slave 是一个
!Listener
- 它的套接字地址是
this: 127.0.0.1:8181
- 它等待从主套接字地址
main: 127.0.0.1:8180
发来的所有指令和定义
主启动器已保存文件
目录 saved/main
包含所有其他文件,包括 builder=slave.yaml
。
主启动器文件 starter=main.yaml
!Main
builders:
127.0.0.1:8180: !unloaded
path: builder=main.yaml
127.0.0.1:8181: !unloaded
path: builder=slave.yaml
flow:
FirstName: !unloaded
path: channel=FirstName.yaml
QueryHello: !unloaded
path: channel=QueryHello.yaml
Shutdown: !unloaded
path: channel=Shutdown.yaml
main: 127.0.0.1:8180
该文件包含网络的全部结构
- main 是一个
!Main
- 它的套接字地址是
main: 127.0.0.1:8180
- 它控制两个集群,包括自身,其构建器列在
builders:
之后- 地址为
127.0.0.1:8180
的集群在文件builder=main.yaml
中定义 - 地址为
127.0.0.1:8181
的集群在文件builder=slave.yaml
中定义
- 地址为
- 它包含所有通道的定义,列在
flow:
之后- 通道
FirstName
、QueryHello
和Shutdown
分别在channel=FirstName.yaml
、channel=QueryHello.yaml
和channel=Shutdown.yaml
中定义
- 通道
构建器
构建文件 builder=main.yaml
net_size: null
named_servants:
hello: !unloaded
path: servant=hello.yaml
last_name: !unloaded
path: servant=last_name.yaml
ctrl_ch_capacity: 16
该文件表明主集群包含服务 hello
和 last_name
,分别定义在文件 servant=hello.yaml
和 servant=last_name.yaml
构建文件 builder=slave.yaml
net_size: null
named_servants:
first_name: !unloaded
path: servant=first_name.yaml
shutdown: !unloaded
path: servant=shutdown.yaml
ctrl_ch_capacity: 16
该文件表明从属集群包含服务 first_name
和 shutdown
,分别定义在文件 servant=first_name.yaml
和 servant=shutdown.yaml
服务和通道文件
服务文件通过序列化直接继承自类型定义
服务文件 servant=hello.yaml
servant: Hello
value: Welcome
通道以相同的方式进行序列化,但包含通道类型、集群地址、数据类型哈希码和输入/输出服务列表
通道文件 channel=QueryHello.yaml
!Query
cluster: 127.0.0.1:8180
max_ping:
secs: 0
nanos: 100000000
query_type: 31758449-bc37-9d2d-7a6d-5463554081ac
reply_type: 31758449-bc37-9d2d-7a6d-5463554081ac
size: null
input:
- last_name
output:
- hello
通道文件 channel=FirstName.yaml
!NetBroadcast
max_ping:
secs: 0
nanos: 100000000
data_type: 31758449-bc37-9d2d-7a6d-5463554081ac
size: 16
input:
- 127.0.0.1:8181
- - first_name
output:
- 127.0.0.1:8180
- - last_name
依赖项
~8–15MB
~201K SLoC