1 个不稳定版本
0.1.0 | 2023年10月15日 |
---|
#957 在 异步
34KB
463 行
todc-net
消息传递(HTTP)分布式系统算法。
示例
在下面的示例中,我们创建了一个单例的注册,该注册将以HTTP请求的形式公开读取和写入操作到 /register
。对于这个示例,我们的注册将持有 String
类型的数据。
我们可以使用 hyper
来运行注册的单例,如下所示
use std::net::SocketAddr;
use http_body_util::{BodyExt, Full};
use hyper::{Method, Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::server::conn::http1;
use hyper::service::{Service, service_fn};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use todc_net::register::AtomicRegister;
// The contents of the register
type Contents = String;
// The main router for our server
async fn router(
register: AtomicRegister<Contents>,
req: Request<Incoming>
) -> Result<Response<Full<Bytes>>, Box<dyn std::error::Error + Send + Sync>> {
match (req.method(), req.uri().path()) {
// Allow the register to be read with GET requests
(&Method::GET, "/register") => {
let value: String = register.read().await.unwrap();
Ok(Response::new(Full::new(Bytes::from(value))))
},
// Allow the register to be written to with POST requests
(&Method::POST, "/register") => {
let body = req.collect().await?.to_bytes();
let value = String::from_utf8(body.to_vec()).unwrap();
register.write(value).await.unwrap();
Ok(Response::new(Full::new(Bytes::new())))
},
// Allow the register to handle all other requests, such as
// internal requests made to /register/local.
_ => register.call(req).await
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create a register for this instance.
let register: AtomicRegister<Contents> = AtomicRegister::default();
// Create a new server with Hyper.
let addr: SocketAddr = ([0, 0, 0, 0], 3000).into();
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let register = register.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
// Handle requests by passing them to the router
.serve_connection(io, service_fn(move |req| router(register.clone(), req)))
.await
{
println!("Error serving connection: {:?}", err)
}
});
}
}
与注册交互
尽管这个注册目前还不是容错的,但我们仍然可以尝试它。请参阅可运行的示例 todc-net/examples/atomic-register-hyper
。
通过多个实例增加容错性
为了使我们的注册具有容错性,我们需要添加更多实例。假设我们想要 3
个实例,这样即使一个实例失败,注册仍然可用。
如果我们已经配置了基础设施,对于每个 i
在 [1, 2, 3]
之间,托管实例 i
的服务器将位于 https://my-register-{i}.com
,并且我们已经将 i
暴露为环境变量 INSTANCE_ORDINAL
,那么我们可以如下实例化 AtomicRegister
use std::env;
// Replacement for `let register = AtomicRegister::default();`
let instance_ordinal: u32 = env::var("INSTANCE_ORDINAL").unwrap().parse().unwrap();
let neighbor_urls: Vec<Uri> = (1..4)
.filter(|&i| i != instance_ordinal)
.map(|i| format!("https://my-register-{i}.com").parse().unwrap())
.collect();
let register: AtomicRegister<Contents> = AtomicRegister::new(neighbor_urls);
与容错性注册交互
要与由多个实例支持的容错寄存器交互,请参阅可运行的示例:todc-net/examples/atomic-register-docker-minikube
。
开发
一些测试使用 turmoil 来模拟网络中的延迟和故障。要运行需要此功能的测试,请执行以下操作:
cargo test --features turmoil --test MODULE
依赖项
~5–13MB
~151K SLoC