4 个版本
0.1.2 | 2024年3月4日 |
---|---|
0.1.1 | 2024年2月26日 |
0.1.0 | 2024年2月20日 |
0.0.0 | 2023年11月13日 |
1475 在 开发工具 中
每月 下载 30 次
在 2 个crate中使用(通过 silx-types)
355KB
4K SLoC
这是 Silx 项目的一部分
silx-core
包含实现 silx 应用程序的核心组件
注意
关于版本 0.1.2
- 实现新的 HashedTypeDef 功能
- 依赖项已更新
目的
Silx 旨在使
- 在一台或多台机器上构建作为异步服务网络的应用程序
- 构建这些服务而不必担心服务之间交换通道的详细实现
- 使用简单的参数化交换通道连接这些服务
- 借助类型哈希码控制通道数据类型的相干性
- 在交换通道上实现具有零拷贝反序列化的序列化(rkyv)
- 将应用程序的整个网络定义序列化为可编辑的文本格式,然后重新加载并执行它
Silx 仍然是一个处于开发中的项目。
以下是一个简化的示例概述。项目github上也有其他示例。
最小化示例(你好)
Cargo.toml
[package]
name = "silx_hello"
version = "0.1.2"
edition = "2021"
[dependencies]
tokio = "^1.36.0"
serde = "^1.0.197"
typetag = "^0.2.16"
silx-core = "0.1.2"
silx-types = "0.1.2"
main.rs
use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, time::Duration };
use serde::{Deserialize, Serialize};
use tokio::{spawn, time::sleep};
use silx_core::{
id_tools::IdBuilder, servants::shutdown::ShutdownBuilder,
utils::{
produce_emit, produce_future, produce_query, produce_read, produce_reply2,
Filable, MsgFromServant, ProcessInstance, ProcessProducer, SendToMaster,
ServantBuilder, ServantBuilderParameters, Starter, StarterProducer
},
};
use silx_types::{ ArchSized, WakeSlx };
// ///////////////////////////
// Servants implementations
/// Servant replying greetings by completing queryied full name with Hello
#[derive(Serialize, Deserialize, Clone,)]
struct Hello(String);
#[typetag::serde] impl ServantBuilder for Hello { }
impl ServantBuilderParameters for Hello {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let hello = self.0.clone();
let query_channel = "QueryHello".to_string();
// build reply process
produce_reply2!([hello], producer, String => String, query_channel, data, {
// get full name
let full_name: &str = data.archive_ref().unwrap();
// build an return greeting
let greeting = format!("{hello} {full_name}");
greeting.arch_sized().unwrap()
}).unwrap();
producer.named_process()
}
}
/// Servant sending first name
#[derive(Serialize, Deserialize, Clone,)]
struct FirstName(String);
#[typetag::serde] impl ServantBuilder for FirstName { }
impl ServantBuilderParameters for FirstName {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let first_name = self.0.clone();
// build channels
let emit_channel = "FirstName".to_string();
let sender = produce_emit!(producer, String, emit_channel, None,).unwrap();
// build process
produce_future!(producer, {
sleep(Duration::from_millis(100)).await; // Wait a little bit for receiver to be ready
sender.send(first_name.arch_sized().unwrap()).await.unwrap();
})
}
}
/// Servant doing:
/// * receive first name
/// * build full name
/// * query for greeting
/// * print greeting
/// * shutdown
#[derive(Serialize, Deserialize, Clone,)]
struct LastName(String);
#[typetag::serde] impl ServantBuilder for LastName { }
impl ServantBuilderParameters for LastName {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let last_name = self.0.clone();
// build channels
let recv_channel = "FirstName".to_string();
let receiver = produce_read!(producer,String,recv_channel,None,).unwrap();
let query_channel = "QueryHello".to_string();
let (query_sender,reply_receiver) = produce_query!(producer,String => String,query_channel, None).unwrap();
let emit_death = "Shutdown".to_string();
let death_sender = produce_emit!(producer, WakeSlx, emit_death, None,).unwrap();
// build process
produce_future!(producer, {
// receive first name
let arc_first_name = receiver.recv().await.unwrap();
// build full name
let full_name = format!("{} {last_name}", arc_first_name.archive_ref().unwrap());
// query for greeting
let arc_full_name = full_name.arch_sized().unwrap();
query_sender.send(arc_full_name).await.unwrap();
let reply = reply_receiver.recv().await.unwrap();
// print greeting
println!("{}",reply.archive_ref().unwrap());
// shutdown
death_sender.send(WakeSlx.arch_sized().unwrap()).await.unwrap();
let tid = task_id.lock().await.generate();
MsgFromServant::Shutdown(tid).send(&send_to_master).await.unwrap();
})
}
}
// ///////////////////////////
// Network implementation
/// Given main and slave socket addresses, build main and slave starters
/// * main cluster implements servants `last_name` and `hello`
/// * slave cluster implements servants `first_name` and `shutdown` (which will shutdown the slave)
/// * actions of `last_name`:
/// * receive first name from `first_name`
/// * build full name and query greeting from `hello`
/// * print greeting
/// * send shutdown signal to `shutdown` and shutdown main cluster
/// * `main_addr: SocketAddr` : main socket address
/// * `slave_addr: SocketAddr` : slave socket address
/// * `save_dir: &PathBuf` : directory where to save the network
/// * Output: main and slave starters
pub fn build_network (main_addr: SocketAddr, slave_addr: SocketAddr, save_dir: &PathBuf) -> (Starter,Starter) {
let max_ping = Duration::from_millis(100);
// set two clusters within the network
let start_prod = StarterProducer::new(
main_addr, "starter=main.yaml", "builder=main.yaml", None, 16
).add_cluster(
slave_addr, "starter=slave.yaml", "builder=slave.yaml", None, 16
).unwrap().done();
// add named servants
let start_prod = start_prod.add_process(
&main_addr, "last_name".to_string(), "servant=last_name.yaml", LastName("Doe".to_string())
).unwrap().add_process(
&main_addr, "hello".to_string(), "servant=hello.yaml", Hello("Welcome".to_string())
).unwrap().add_process(
&slave_addr, "first_name".to_string(),"servant=first_name.yaml", FirstName("John".to_string())
).unwrap().add_process(
&slave_addr, "shutdown".to_string(),"servant=shutdown.yaml", ShutdownBuilder::new("Shutdown".to_string())
).unwrap().done();
// add channels connecting the servants and produce the starter for each cluster
let mut starters = start_prod.add_query(
"channel=QueryHello.yaml", "QueryHello".to_string(), main_addr, ["last_name".to_string()], ["hello".to_string()], max_ping, None
).unwrap().add_net_broadcast(
"channel=FirstName.yaml", "FirstName".to_string(), slave_addr, [format!("first_name"),], main_addr, [format!("last_name"),], max_ping, 16
).unwrap().add_net_broadcast(
"channel=Shutdown.yaml", "Shutdown".to_string(), main_addr, ["last_name".to_string()], slave_addr, ["shutdown".to_string()], max_ping, 16,
).unwrap().done();
// save, get and return starters of the clusters
let main_starter = starters.remove(&main_addr).unwrap().unload(Some(save_dir)).unwrap();
let slave_starter = starters.remove(&slave_addr).unwrap().unload(Some(save_dir)).unwrap();
(main_starter,slave_starter)
}
// //////////////////////////
// Run the network
/// Main performs:
/// * build network and save it in files
/// * network execution
/// * network loading from files
/// * execute the loaded network
#[tokio::main]
pub async fn main() {
// build network and save it in files
let main_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8180);
let slave_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8181);
let save_dir = PathBuf::from("./saved");
let (main_starter,slave_starter) = build_network(main_addr, slave_addr, &save_dir);
// network execution
println!("==== first run -------------\n");
let handle_slave = spawn(async move {
// NOTA: main starter should be launched FIRST
sleep(Duration::from_millis(100)).await; // So wait a little bit
slave_starter.run().await.unwrap();
});
main_starter.run().await.unwrap();
handle_slave.await.unwrap();
sleep(Duration::from_millis(300)).await;
// network loading from files
println!("\n==== second run (loadind network) -------------\n");
let main_starter = Starter::load("starter=main.yaml", &save_dir).unwrap();
let slave_starter = Starter::load("starter=slave.yaml", &save_dir).unwrap();
// execute the loaded network
let handle_slave = spawn(async move {
sleep(Duration::from_millis(100)).await;
slave_starter.run().await.unwrap();
});
main_starter.run().await.unwrap();
handle_slave.await.unwrap();
}
典型输出
==== first run -------------
127.0.0.1:8181: try to connect 127.0.0.1:8180
127.0.0.1:8181: Listening connection established
cluster 127.0.0.1:8181 has been built
cluster 127.0.0.1:8180 has been built
Welcome John Doe
cluster 127.0.0.1:8181 is ended
cluster 127.0.0.1:8180 is ended
==== second run (loadind network) -------------
127.0.0.1:8181: try to connect 127.0.0.1:8180
127.0.0.1:8181: Listening connection established
cluster 127.0.0.1:8181 has been built
cluster 127.0.0.1:8180 has been built
Welcome John Doe
cluster 127.0.0.1:8180 is ended
cluster 127.0.0.1:8181 is ended
服务定义
服务是通过使用宏 #[typetag::serde]
实现 ServantBuilderParameters
特性和 ServantBuilder
特性来构建的。该宏 #[typetag::serde]
用于序列化 ServantBuilder
实现,因此对于通过配置文件(见下文)描述网络是必要的。下面,我们将详细查看构建 LastName
和 Hello
服务员的步骤
LastName
对应于服务的主要类型,包括传入和传出通道以及处理代码Hello
是一个响应查询的仆从,其形式为一个简单的函数。仆从构建是唯一需要严格使用 Rust 实现的阶段。否则,构建计算网络所需的所有内容只是配置文件的定义。
仆从 LastName
所有仆从都必须实现 ServantBuilderParameters
特性和 ServantBuilder
特性。ServantBuilder
的实现是空的但必需的。实现 ServantBuilderParameters
需要定义 max_cycle_time
和 build_process
方法。max_cycle_time
方法指定了允许从集群主节点响应请求的最大时间。在此时间之后,仆从被认为无效并被终止,因此此功能的重要性很小。
#[derive(Serialize, Deserialize, Clone,)]
struct LastName(String);
#[typetag::serde] impl ServantBuilder for LastName { }
impl ServantBuilderParameters for LastName {
fn max_cycle_time(&self) -> Duration { Duration::from_millis(100) }
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
}
}
相比之下,build_process
方法的实现涉及仆从功能行为的本质方面。
初始化生产者和检索仆从数据
首先,必须使用发送给主节点的发送通道初始化一个新的生产者,其次,仆从数据可以被克隆(对于可复制数据,此任务不是必需的)。生产者将是构建所有仆从组件的基本助手。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let last_name = self.0.clone();
[...]
}
设置连接仆从的通道
通道是通过宏 produce_read
、QueryHello
和 Shutdown
构建的。这些宏在处理生产者。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
let recv_channel = "FirstName".to_string();
let receiver = produce_read!(producer,String,recv_channel,None,).unwrap();
let query_channel = "QueryHello".to_string();
let (query_sender,reply_receiver) = produce_query!(producer,String => String,query_channel, None).unwrap();
let emit_death = "Shutdown".to_string();
let death_sender = produce_emit!(producer, WakeSlx, emit_death, None,).unwrap();
[...]
}
在这段代码中,我们依次定义了到 FirstName
、QueryHello
和 Shutdown
通道的连接。
- 宏
produce_read
将仆从注册为FirstName
通道的读取者。生成接收器receiver
以访问通道输出。 - 宏
produce_query
将仆从注册为QueryHello
通道的查询者。生成发送器query_sender
和接收器reply_receiver
以发送查询和接收回复。 - 宏
produce_emit
将仆从注册为Shutdown
通道的发射者。生成发送器death_sender
以访问通道输入。
构建仆从进程
仆从依次执行以下操作
- 接收名字并构建全名
- 请求问候消息并打印问候消息
- 关闭
进程通过宏定义
produce_future!(producer, { ... })
接收名字并构建全名
仆从等待来自 FirstName
通道的消息。此消息被存档,可以使用 archive_ref
方法作为引用(零拷贝反序列化)访问。然后使用 format
宏构建全名。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_future!(producer, {
let arc_first_name = receiver.recv().await.unwrap();
let full_name = format!("{} {last_name}", arc_first_name.archive_ref().unwrap());
[...]
})
}
注意 1:有两种从存档引用的方法,即 archive_ref
和 arch_deref
。archive_ref
方法引用 rkyv 存档,而 arch_deref
提供了更大的灵活性。然而,arch_deref
的实现较少。
注意 2:archive_mut
和 arch_deref_mut
是 archive_ref
和 arch_deref
的固定可变对应物。
请求问候消息并打印问候消息
仆从通过方法 arch_sized
从全名构建存档,将其作为查询发送,等待回复,并打印此回复。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_future!(producer, {
[...]
let arc_full_name = full_name.arch_sized().unwrap();
query_sender.send(arc_full_name).await.unwrap();
let reply = reply_receiver.recv().await.unwrap();
println!("{}",reply.archive_ref().unwrap());
[...]
})
}
关闭
仆从通过向其他集群的 shutdown
仆从发送唤醒消息并将关闭任务发送给其主节点来关闭网络。
fn build_process(&self, task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_future!(producer, {
[...]
death_sender.send(WakeSlx.arch_sized().unwrap()).await.unwrap();
let tid = task_id.lock().await.generate();
MsgFromServant::Shutdown(tid).send(&send_to_master).await.unwrap();
})
}
仆从 Hello
这个仆人是响应者,因此build_process
的定义不同。首先,使用发送通道初始化一个新的生产者,复制仆人数据,并定义查询通道名称
[...]
impl ServantBuilderParameters for Hello {
[...]
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
let mut producer = ProcessProducer::new(&send_to_master);
let hello = self.0.clone();
let query_channel = "QueryHello".to_string();
[...]
}
}
然后,通过宏将响应代码注册到生产者中
produce_reply2!([hello], producer, String => String, query_channel, data, { ... })
[hello]
通知宏非可复制的变量hello
将被移动到闭包中String => String
表示查询的类型是String
,响应的类型也是String
query_channel
是查询通道的名称data
是包含查询的变量的名称
在其过程中,仆人从存档data
检索完整的名称引用,然后在前面加上问候语,最后返回结果的存档
[...]
impl ServantBuilderParameters for Hello {
[...]
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
produce_reply2!([hello], producer, String => String, query_channel, data, {
let full_name: &str = data.archive_ref().unwrap();
let greeting = format!("{hello} {full_name}");
greeting.arch_sized().unwrap()
}).unwrap();
[...]
}
}
最后,从producer
恢复进程实例并返回
[...]
impl ServantBuilderParameters for Hello {
[...]
fn build_process(&self, _task_id: IdBuilder, send_to_master: SendToMaster,) -> ProcessInstance {
[...]
producer.named_process()
}
}
网络定义
可以使用StarterProducer
及其衍生品来构建网络。另一种方法是编辑配置文件,这些文件用于通过反序列化构建网络的集群启动器。这些配置文件可以使用示例中的build_network
方法中的StarterProducer
自动生成。示例按以下步骤进行
- 初始化具有主和从集群特征的生产者。结果是,为每个启动器和每个构建器(每个集群一个)提供了序列化文件。集群也通过它们的套接字地址进行标识
let start_prod = StarterProducer::new(main_addr, "starter=main.yaml", "builder=main.yaml", None, 16)
.add_cluster(slave_addr, "starter=slave.yaml", "builder=slave.yaml", None, 16).unwrap().done()
- 将仆人添加到集群中。仆人
last_name
和hello
添加到主集群,而仆人first_name
和shutdown
添加到从集群。提供了每个仆人的名称、序列化文件和值
let start_prod = start_prod
.add_process(&main_addr, "last_name".to_string(), "servant=last_name.yaml", LastName("Doe".to_string())).unwrap()
.add_process(&main_addr, "hello".to_string(), "servant=hello.yaml", Hello("Welcome".to_string())).unwrap()
.add_process(&slave_addr, "first_name".to_string(),"servant=first_name.yaml", FirstName("John".to_string())).unwrap()
.add_process(&slave_addr, "shutdown".to_string(),"servant=shutdown.yaml", ShutdownBuilder::new("Shutdown".to_string())).unwrap().done();
- 将通道添加到集群中,并检索可序列化的启动器。提供了序列化文件、名称和随后的输出仆人的输入仆人。实际上,通道可以将几个仆人与几个仆人连接起来。此外,在集群内通道的情况下,提供了集群地址;在两个集群之间的通道的情况下,提供了输入集群地址和输出集群地址。通道的性质由使用的方法确定,这里使用
add_net_broadcast
和add_query
let mut starters = start_prod.add_query(
"channel=QueryHello.yaml", "QueryHello".to_string(), main_addr, ["last_name".to_string()], ["hello".to_string()], max_ping, None
).unwrap().add_net_broadcast(
"channel=FirstName.yaml", "FirstName".to_string(), slave_addr, [format!("first_name"),], main_addr, [format!("last_name"),], max_ping, 16
).unwrap().add_net_broadcast(
"channel=Shutdown.yaml", "Shutdown".to_string(), main_addr, ["last_name".to_string()], slave_addr, ["shutdown".to_string()], max_ping, 16,
).unwrap().done();
- 在此阶段,启动器是可序列化的,但不可执行。我们可以使用
unload
命令生成序列化文件,并检索集群的可执行启动器。在此阶段,我们具有可序列化但不可执行的启动器。我们可以使用unload
命令生成序列化文件并检索集群的可执行启动器
let main_starter = starters.remove(&main_addr).unwrap().unload(Some(save_dir)).unwrap();
let slave_starter = starters.remove(&slave_addr).unwrap().unload(Some(save_dir)).unwrap();
(main_starter,slave_starter)
请注意,可以使用unload
命令而不产生任何序列化,通过提供None
作为序列化目录;您还可以使用unwrap
命令,它可以达到相同的结果。
集群加载和执行
可以使用load
方法加载启动器,这是更基本命令序列的快捷方式。执行启动器只需使用run
方法。
let save_dir = PathBuf::from("./saved");
[...]
let main_starter = Starter::load("starter=main.yaml", &save_dir).unwrap();
main_starter.run().await.unwrap();
网络序列化保存的文件
运行后,在项目目录 saved
中从网络序列化生成了 11 个文件。
│ Cargo.toml
│
├───saved
│ ├───main
│ │ builder=main.yaml
│ │ builder=slave.yaml
│ │ channel=FirstName.yaml
│ │ channel=QueryHello.yaml
│ │ channel=Shutdown.yaml
│ │ servant=first_name.yaml
│ │ servant=hello.yaml
│ │ servant=last_name.yaml
│ │ servant=shutdown.yaml
│ │ starter=main.yaml
│ │
│ └───slave
│ starter=slave.yaml
│
└───src
main.rs
目录 saved/main
包含了主启动器的完整定义,而目录 saved/slave
包含了从启动器的完整定义。
一个重要的点是,所有网络架构的方面都通过这些可编辑的文件进行参数化。 唯一不能参数化且需要在 Rust 中实现的是仆人的定义,通过实现 ServantBuilder
和 ServantBuilderParameters
特性。
从启动器保存文件
目录 saved/slave
包含唯一的文件,starter=slave.yaml
。
从启动器文件 starter=slave.yaml
!Listener
main: 127.0.0.1:8180
this: 127.0.0.1:8181
该文件解释了一切
- slave 是一个
!Listener
- 它的套接字地址是
this: 127.0.0.1:8181
- 它等待从主套接字地址
main: 127.0.0.1:8180
发来的所有指令和定义
主启动器保存文件
目录 saved/main
包含所有其他文件,包括 builder=slave.yaml
。
主启动器文件 starter=main.yaml
!Main
builders:
127.0.0.1:8180: !unloaded
path: builder=main.yaml
127.0.0.1:8181: !unloaded
path: builder=slave.yaml
flow:
FirstName: !unloaded
path: channel=FirstName.yaml
QueryHello: !unloaded
path: channel=QueryHello.yaml
Shutdown: !unloaded
path: channel=Shutdown.yaml
main: 127.0.0.1:8180
该文件包含网络的所有结构
- main 是一个
!Main
- 它的套接字地址是
main: 127.0.0.1:8180
- 它控制两个集群,包括自身,其构建器列在
builders:
之后- 地址为
127.0.0.1:8180
的集群在文件builder=main.yaml
中定义 - 地址为
127.0.0.1:8181
的集群在文件builder=slave.yaml
中定义
- 地址为
- 它包含所有通道的定义,列在
flow:
之后- 通道
FirstName
、QueryHello
和Shutdown
分别在channel=FirstName.yaml
、channel=QueryHello.yaml
和channel=Shutdown.yaml
中定义
- 通道
构建器
构建器文件 builder=main.yaml
net_size: null
named_servants:
hello: !unloaded
path: servant=hello.yaml
last_name: !unloaded
path: servant=last_name.yaml
ctrl_ch_capacity: 16
本文件通知主集群包含以下从属服务:hello
和last_name
,它们分别定义在文件servant=hello.yaml
和servant=last_name.yaml
构建文件builder=slave.yaml
net_size: null
named_servants:
first_name: !unloaded
path: servant=first_name.yaml
shutdown: !unloaded
path: servant=shutdown.yaml
ctrl_ch_capacity: 16
本文件通知从属集群包含以下从属服务:first_name
和shutdown
,它们分别定义在文件servant=first_name.yaml
和servant=shutdown.yaml
从属服务与通道文件
从属服务文件通过序列化直接继承自类型定义
从属服务文件servant=hello.yaml
servant: Hello
value: Welcome
通道以相同的方式进行序列化,但包含通道类型、集群地址、数据类型哈希码以及输入/输出从属服务列表
通道文件channel=QueryHello.yaml
!Query
cluster: 127.0.0.1:8180
max_ping:
secs: 0
nanos: 100000000
query_type: 31758449-bc37-9d2d-7a6d-5463554081ac
reply_type: 31758449-bc37-9d2d-7a6d-5463554081ac
size: null
input:
- last_name
output:
- hello
通道文件channel=FirstName.yaml
!NetBroadcast
max_ping:
secs: 0
nanos: 100000000
data_type: 31758449-bc37-9d2d-7a6d-5463554081ac
size: 16
input:
- 127.0.0.1:8181
- - first_name
output:
- 127.0.0.1:8180
- - last_name
依赖项
~8–15MB
~194K SLoC