84 releases (32 breaking)
Version | Date |
---|---|
0.33.0 | March 12, 2020 |
0.32.5 | March 9, 2020 |
0.28.4 | December 3, 2019 |
0.28.3 | November 21, 2019 |
0.6.0 | March 30, 2017 |
#1991 in Database interfaces
151 downloads per month
Used in 13 crates (7 directly)
350KB
9K SLoC
Deprecated: use lapin instead
lib.rs:
lapin-futures
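This library offers a futures-0.1 based API on top of the lapin library. Because it builds on the futures-0.1 crate, you can use it with tokio, futures-cpupool, or any other executor.
Publishing a message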
use futures::future::Future;
use lapin_futures as lapin;
use crate::lapin::{BasicProperties, Client, ConnectionProperties};
use crate::lapin::options::{BasicPublishOptions, QueueDeclareOptions};
use crate::lapin::types::FieldTable;
use log::info;

fn main() {
    env_logger::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

    futures::executor::spawn(
        Client::connect(&addr, ConnectionProperties::default()).and_then(|client| {
            // create_channel returns a future that is resolved
            // once the channel is successfully created
            client.create_channel()
        }).and_then(|channel| {
            let id = channel.id();
            info!("created channel with id: {}", id);

            // we are using a "move" closure to reuse the channel
            // once the queue is declared. We could also clone
            // the channel
            channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |_| {
                info!("channel {} declared queue {}", id, "hello");

                channel.basic_publish("", "hello", b"hello from tokio".to_vec(), BasicPublishOptions::default(), BasicProperties::default())
            })
        })
    ).wait_future().expect("runtime failure");
}
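The example reads the broker address from the AMQP_ADDR environment variable and falls back to a local default whose path, %2f, is the percent-encoded default virtual host "/". The sketch below illustrates that fallback and the decoding using only the standard library. The helpers addr_or_default and vhost are hypothetical names introduced here, not part of lapin-futures, and the decoding deliberately handles only the %2f sequence rather than general percent-encoding.

```rust
use std::env;

/// Hypothetical helper: returns the configured address, or the local default
/// used in the example above when no value is provided.
fn addr_or_default(configured: Option<String>) -> String {
    configured.unwrap_or_else(|| "amqp://127.0.0.1:5672/%2f".into())
}

/// Hypothetical helper: extracts the virtual host from an `amqp://` URI.
/// Returns None when the URI has no path component at all.
/// Assumption: only `%2f` / `%2F` is decoded, not arbitrary percent-encoding.
fn vhost(uri: &str) -> Option<String> {
    let rest = uri.strip_prefix("amqp://")?;
    let (_authority, path) = rest.split_once('/')?;
    Some(path.replace("%2f", "/").replace("%2F", "/"))
}

fn main() {
    // Same lookup as in the example: environment variable first, default second.
    let addr = addr_or_default(env::var("AMQP_ADDR").ok());
    println!("connecting to {} (vhost: {:?})", addr, vhost(&addr));
}
```

Taking the configured value as an Option keeps the fallback logic independent of the process environment, which makes it easy to check in isolation.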
Creating a consumer
use futures::{Future, Stream};
use lapin_futures as lapin;
use crate::lapin::{Client, ConnectionProperties};
use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions};
use crate::lapin::types::FieldTable;
use log::{debug, info};

fn main() {
    env_logger::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

    futures::executor::spawn(
        Client::connect(&addr, ConnectionProperties::default()).and_then(|client| {
            // create_channel returns a future that is resolved
            // once the channel is successfully created
            client.create_channel()
        }).and_then(|channel| {
            let id = channel.id();
            info!("created channel with id: {}", id);

            // keep a clone of the channel for acknowledging messages later
            let ch = channel.clone();
            channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| {
                info!("channel {} declared queue {:?}", id, queue);

                // basic_consume returns a future of a message
                // stream. Any time a message arrives for this consumer,
                // the closure passed to for_each is called
                channel.basic_consume("hello", "my_consumer", BasicConsumeOptions::default(), FieldTable::default())
            }).and_then(|stream| {
                info!("got consumer stream");

                stream.for_each(move |message| {
                    debug!("got message: {:?}", message);
                    info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
                    ch.basic_ack(message.delivery_tag, false)
                })
            })
        })
    ).wait_future().expect("runtime failure");
}
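The consumer example decodes each payload with std::str::from_utf8(...).unwrap(), which panics if a message body is not valid UTF-8. As a minimal sketch of a more tolerant approach, the hypothetical helper below (describe_payload is a name introduced here, not a lapin-futures API) returns the text when it is valid and a short diagnostic otherwise. It uses only the standard library and takes a plain byte slice, on the assumption that the payload is available as bytes, as message.data is in the example.

```rust
/// Hypothetical helper: renders a message payload for logging without panicking.
/// Valid UTF-8 is returned as-is; anything else yields a short diagnostic.
fn describe_payload(data: &[u8]) -> String {
    match std::str::from_utf8(data) {
        Ok(text) => text.to_owned(),
        Err(err) => format!(
            "<{} bytes, valid UTF-8 up to offset {}>",
            data.len(),
            err.valid_up_to()
        ),
    }
}

fn main() {
    // A textual payload, like the one sent by the publisher example.
    println!("{}", describe_payload(b"hello from tokio"));
    // A payload ending in a byte that is never valid in UTF-8.
    println!("{}", describe_payload(&[0x68, 0x69, 0xff]));
}
```

Inside the for_each closure, such a helper could replace the unwrap call so that a single malformed message does not take down the consumer.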
Dependencies
~5–19MB
~300K SLoC