5 个稳定版本
1.1.0 | 2020年3月25日 |
---|---|
1.0.6 | 2020年3月25日 |
1.0.2 | 2020年1月9日 |
1.0.1 | 2019年12月20日 |
1369 在 异步 中
每月下载量 26 次
51KB
877 行
async-mq
lapin 客户端 crate 的零成本 async-await 抽象
模块
- client:`Client` 和 `Connection` 结构体
- consume:`Consumer` 和 `ConsumerBuilder` 结构体
- produce:`Producer` 和 `ProducerBuilder` 结构体
- message:`Message` 结构体,以及 `MessagePeek` 和 `MessageProcess` 异步特质
示例
目前,mqctl.rs 通过 Rust 1.39 的 async-await 功能演示了 RabbitMQ RPC 模式。它使用 FlatBuffers 进行消息编码/解码。
以下为 tokio 的线程调度器(threaded scheduler)示例,如 Makefile 中所示。
/// Runs the producer/consumer example on tokio's threaded scheduler.
///
/// Builds a runtime with the threaded scheduler and the time driver enabled.
/// Inside `block_on` it opens one connection and spawns `cfg.producers`
/// producer tasks from it, then opens a second connection and spawns
/// `cfg.consumers` consumer tasks. The spawned tasks' join handles are
/// discarded; each task prints its own failure to stderr.
///
/// # Errors
///
/// Returns an error if the runtime cannot be built or if either `connect`
/// call fails. Otherwise the function never returns, because the main future
/// ends in an endless sleep loop.
fn tokio_threaded(cfg: crate::cfg::Config) -> Result<(), Box<dyn std::error::Error>> {
    let mut runtime = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_time()
        .build()?;
    let client = Client::new();
    runtime.block_on(async move {
        // A single connection backs every producer.
        let producer_conn = client.connect(&cfg.uri).await?;
        let mut producer_builder = producer_conn.producer_builder();
        producer_builder.exchange(&cfg.exchange).queue(&cfg.queue);
        for _ in 0..cfg.producers {
            // Each task owns its own copy of the configured builder.
            let task_builder = producer_builder.clone();
            tokio::spawn(async move {
                match task_builder.build().await {
                    Ok(producer) => {
                        let mut generator = ASCIIGenerator(producer);
                        if let Err(err) = generator.run().await {
                            eprintln!("{}", err);
                        }
                    }
                    Err(e) => eprintln!("{}", e),
                }
            });
        }

        // A single connection backs every consumer.
        let consumer_conn = client.connect(&cfg.uri).await?;
        let mut consumer_builder = consumer_conn.consumer_builder();
        consumer_builder.exchange(&cfg.exchange).queue(&cfg.queue);
        for _ in 0..cfg.consumers {
            let task_builder = consumer_builder.clone();
            tokio::spawn(async move {
                match task_builder.build().await {
                    Ok(consumer) => {
                        let mut echo = EchoConsumer(consumer);
                        if let Err(err) = echo.run().await {
                            eprintln!("{}", err);
                        }
                    }
                    Err(err) => eprintln!("{}", err),
                }
            });
        }

        // Keep the main future alive by sleeping one second at a time, forever.
        let idle_period = std::time::Duration::from_millis(1000);
        loop {
            tokio::time::delay_for(idle_period).await;
        }
    })
}
以下是示例 ASCIIGenerator 生产者:
struct ASCIIGenerator(Producer);
impl ASCIIGenerator {
async fn run(&mut self) -> Result<(), Error> {
let mut builder = FlatBufferBuilder::new();
loop {
// Generate ASCII character FlatBuffer messages
// and print the received message to stderr.
for data in { b'!'..=b'~' } {
let req = Self::make_buf(&mut builder, vec![data]);
let resp = self.0.rpc(req).await?;
Self::print_buf(resp);
}
}
}
fn make_buf(builder: &mut FlatBufferBuilder, data: Vec<u8>) -> Vec<u8> {
let data = builder.create_string(&String::from_utf8(data).unwrap());
let mut mb = crate::msg::MessageBuilder::new(builder);
mb.add_msg(data);
let msg = mb.finish();
builder.finish(msg, None);
let req = builder.finished_data().to_vec();
builder.reset();
req
}
fn print_buf(resp: Vec<u8>) {
if resp.is_empty() {
return;
}
let msg = crate::msg::get_root_as_message(&resp);
if let Some(data) = msg.msg() {
eprint!("{}", data);
}
}
}
以下是示例 EchoConsumer 消费者:
/// Example consumer: a newtype around [`Consumer`] that answers every request
/// with the request's own payload.
struct EchoConsumer(Consumer);

impl EchoConsumer {
    /// Pulls items from the wrapped consumer until `next()` yields `None`,
    /// replying to each request with its own data.
    ///
    /// # Errors
    ///
    /// Stops and returns the error as soon as a received item is an `Err` or
    /// sending a response fails.
    async fn run(&mut self) -> Result<(), Error> {
        while let Some(delivery) = self.0.next().await {
            let request = delivery?;
            // Echo the payload straight back to the sender.
            self.0.response(&request, request.data()).await?;
        }
        Ok(())
    }
}
执行
以下是 `make run` 的输出;`make run` 是 `cargo run --example` 的别名(见 Makefile)。它将可打印的 ASCII 字符输出到 stderr。
$ make run
Compiling async-mq v0.3.0 (/home/kei/git/async-mq)
Finished dev [unoptimized + debuginfo] target(s) in 1.63s
Running `target/debug/examples/async-mq`
!!!!!!!!!!!"!!"!!!!!!!!!"!!""!!!!!!!"!""""#"""#""""#""#"""""#""#"#"##""#"$##"$#######$######$$###$##$$$$$%$$%#$#$$$$$$%$$$
$$$%%$$$%$%%%$%%&%&%%$%%%%%%%&%%%%&%&%&%&&%&'&%&%%&&&&&&&&''&'&&&&&'&&'''(&&&'''('''''('&'''''(''(&'(''('()'()('(()((()(((
()'()()(()((()*'((*()*())))*)**))))))))(+)*))*))(+*)*+*)*+*+*****+*,****+)**+*+**+,)*+,*,+++,+,++++-,+++*++++,+*-,,++,-,,-
,+,,-,,-,,-,,+,-,,,-,..+--.,---.------,.,,-..---,.--/.-//..-......-.//.--./...-//...0.///0////././/00////100.//0/.00//0.01
/010/00020110/0000100/101121100/100121112111120131112012223122221122231223022323343131322332223423534244332442344313234333
3343465444535445543345434444555425475556665455466555666447554568365766556766755676766579787876767458875677666667768788789:
978788987577898677887688798899:99;989:8:869988899:99998:98:77;:::::;7:9;9<99;9;::9:;;;9;:<::;8:8:9=:;;<;8::<<::;<:;:=<9:<9
9<<;<=;;><<;<;;;;=;=:<;=;>=;<;==:<<?=<=<><>><<<=<=:=;=><><<@=<==>?>;?===>>;?<?===>===?>?>>A=>?<?=?=?@>>><@>>@@?B=?>>?@>>>@
?@>?>>@>@??=@AA?CAA??@?@A?AA?@?@@A??@??A>?B@>AB@D@B@@BA@BABBA@B@C@AAEAA@CAB@C?@A@A?BAACBCCAACBBADCBDABBBAEB@BAFBDCDBCDDBBC
BDCADB@CCEBCCBCCFGEEAACDDCBCFEECCBEDDCDDDCDECCDFHGDBFGDDFEFBEFEDIHDDEDDCEGEGDEEECEFCDHEEGFCFIHFFDEEDEEFEFFHJEEFFDFGFGIFHGD
GGJIGEFFEHGFIHEHJHFGGGFKGIHFGFGJGKGEHFGGFGIJIKHIJIHHHFIGHHGKGHIHGFHHLGLHGHJKJIJJILGIHJHIKGLJMJHIKHIIKILHIHIKIIMKJMHJKNJHKJ
LLJKMIKJIIJJKILNNIMLLJJJIILLKJMIKKKJMMOKKLMKMJNLJKMNJOJOLJMLKNKJLLNKLNKLONPNOKPKLMLKPNLMMMNMKLOKLMOMLLOOMPLMPLLNMONNNMQLMP
MNPLMNQOOQPNMPNOMNMMNPQQOOOMMPRNPQROONNNQONOQPRONNQONPPRNPRQNSQPORQPOPOSORSQQSQPORPQOOSOPORORRTQPSRSTPSQQURTPQPPRTRPPTQRPQ
SRPSTQSSPUTSRQVRTSSUTQQQRQTQTTUUUTRSQQRSSVURQWRUSSTRTVUVXUVTRRRURSRTSTVVRRVWUUSSTTUSVWTSSVWSUTVSWVTXUTVVUSWSWWUTUSYWUXUTWT
UWVXWTUTVWXYTVXUXXUWUXTVVYVTVZXUVWXXUYWVUXYZYZUWVXYYYW[UWWWVVUYYVXWYVZVZWZX[YV[XXWX\VYZWXWZ[YVWXXWZZWZZ[ZW[Y\YXY\X[XXZ\X]W
XZY[[WZ\Y]X[\[Y[[YXZ]YYY\Z][Y^X[YZY]Z\X]\Z[ZZ^\_[\YY^Z\Z[\]^\Z[YZZ\[^]Y\][^[]Z[Z_]`\_\[[Z]]_\][[Z\^_]^][\^\[[^\`_a^^[]^\\]
^^_]\[\^]\`\`]__]`\_]ba`_\_^`_]_`c_^a\ba]]a^`^`a]_]^]^]`^]`a``^_dc^__^bab`_^a`]ab^^aaa^___^bab_`ed__``cbab_`^cb_`b`e_ab_cb
bfc`a`_c``aadcb`cd_c`a`cdcagacdeafaabecdba`c``bddabdabedhbabdedadedbabcfbcbacgeeecbffbceiebdchdbeegeccfgfcffcccfbdgdfbfdjc
cdefefcichhdeddedegfddggfdgckddghefggjggdieeeifledeehehgehhghehehfhgfiffkfhjfjhiifimiggiikffgfffiigjfhkiiegjhglgjgjnhjgggj
jgkjmkjijlkhhkhlghghihjhofkihhhkhkkkmilniglijlpjlliikhilhikiimjiillmlhmommjmijjjjqknjnljmmjkjjimnjkmlinnnnpmkknnkrkjklomkn
ookojkoknkoklljoqnllllskloplpokollpkpplmpolmnplrmommmplmlmmqplqqmqmpnstomqpnqmmqnnpnnqmnmnqnmrnnonrrotonrnrurqqoronpoqonro
onrsoooppsoussspptpsrppvooppsoprporssqqpqtpvtqsttqttptpwqptqqpqqsrurqqqsruqwuuurqrvurrttuuvsurrrsvqqxrsvtxrrqrvswrsvssuvuv
sswwwrtsyvsxswvrrttwtytsussttvwwtxtxuuswuzwxsuuvxywutxtx
^C
您可以使用 `rabbitmqctl list_queues` 检查队列状态,如下所示:
$ sudo rabbitmqctl list_queues --vhost your_vhost
Timeout: 60.0 seconds ...
Listing queues for vhost mx ...
name messages
amq.gen-3CNgzxmjJGoTIjAcy2zhHQ 0
amq.gen-6kHFOKiqnJIltqeH-1I4WQ 0
amq.gen-kWjXOMz0MDX9Rwo6F8sPCA 0
amq.gen-tFNZuCMpdn6so9WnNLAS4w 1
amq.gen-ScKfpco30LHj1feWIIsFXg 1
amq.gen-7uGgXCiiExxrAXZEcrbypg 0
amq.gen-vkFA83xnTHHhR6c_zUG34A 0
amq.gen-48HpeKkKhmQLAa4Q1beQvg 0
amq.gen-FAYzlhiy9liKuMEUVUm2Uw 0
amq.gen-de6sYdZX8cT8yYkb_Y-mPw 0
hello 23
amq.gen-gu_TDgPgWcestqlWNtFSoA 0
amq.gen-advkCsBJKod22vexRSBV6A 0
amq.gen-T6jLPqqOKsL9jk04CFJwtA 0
amq.gen--wRWW5hI-rpdds9goMAYdg 1
amq.gen-f9EYMuzwQoCAhNyIg6SiUQ 0
amq.gen-DxmcpUxHYGOxCD9Q7QM1xg 1
amq.gen-EeeJAmHFOT3GPIMhfXDi8Q 0
amq.gen-zhqOTQag0rM7MDU17pCdXA 0
amq.gen-vyX6_lAv2Pnmcm1a_tBUKA 0
amq.gen-enXP4BCXZhB0tg4C1an4sw 0
amq.gen-nIpKSKokUzU_pCoGTiSBCQ 0
amq.gen-k1p4udDFoA0xhqSXIpPo-Q 0
amq.gen-XrNQZ0cqHgSgUZK_CP0g6w 0
amq.gen-9WZJ4Jw02Dbhhl7sJIQKAQ 0
amq.gen-1TMw_E8g09Xt9UgCoMz-ig 0
amq.gen-rVUkIh-85ims0IiabvF7GA 1
amq.gen-9NeGc4C9qmfX-PLjHkXVDA 0
amq.gen-swJrMKZmNnLhtI0Djz--ag 0
amq.gen-5rEAFqYpG4cp8lBEDG6_gQ 1
amq.gen-ZDNy2Ggt4Dqbvj6cnS-c8A 0
amq.gen-D8id7SF143eN-k7tmHratw 1
amq.gen-AiKQaNiz73V9du8EtgKfMg 0
参考
- 异步编程指南:Rust 中的异步编程
- 《风格指南》:Rust 风格指南
- RabbitMQ:最广泛部署的开源消息代理
- Crate lapin:基于 AMQP 0.9.1 规范的 RabbitMQ Crate
- Crate futures 0.3:异步编程的抽象
- crate async-std 1.0
- crate tokio 0.2
- 原始 futures 设计:由 Aaron Turon 提出的原始 futures 设计
快乐编程!
依赖关系
~5–7.5MB
~167K SLoC