1 个不稳定版本
| 0.1.0 | 2024年5月12日 |
|---|---|
在“数据库接口”类别中排名第 862
每月21次下载
44KB
1K SLoC
RikkaMQ
简单的 Rust 消息队列库
Redis 实现
依赖项
- deadpool-redis
- redis
- tokio
- tracing(可选)
rikka_mq = { version = "0.1.0", features = ["redis", "tracing"] }
use std::sync::Arc;

use deadpool_redis::{Config, CreatePoolError, Pool, Runtime};
use rikka_mq::config::MQConfig;
use rikka_mq::define::redis::mq::RedisMessageQueue;
use rikka_mq::error::ErrorOperation;
use rikka_mq::info::QueueInfo;
use rikka_mq::mq::MessageQueue;
use serde::{Deserialize, Serialize};
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uuid::Uuid;
/// Payload carried by each queued message in this example.
///
/// Derives the serde traits so the value can be serialized and deserialized.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct QueueData {
    /// Free-form message text; `main` fills it with `message:{i}`.
    a: String,
}
/// Shared state handed to the message handler on every invocation.
#[derive(Debug)]
struct Module {
    /// Redis connection pool the handler draws connections from.
    pool: Pool,
}
#[tokio::main]
async fn main() -> Result<(), rikka_mq::error::Error> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let url = dotenvy::var(REDIS_URL).unwrap();
let cfg = Config::from_url(url);
let redis_pool = cfg.create_pool(Some(Runtime::Tokio1))?;
let name = "mq_name".to_string();
let module = Arc::new(redis_pool.clone());
let mq = RedisMessageQueue::new(
redis_pool,
module,
name,
MQConfig::default(),
UUID::new_v4,
|module: Arc<Module>, data: TestData| async move {
let pool = &module.pool;
let con = pool.get().await
.map_err(|e| ErrorOperation::Delay(format!("{e:?}")))?;
// Any transactions between db
info!("{:?}, {:?}", module, data);
Ok(())
},
);
mq.start_workers();
for i in 0..1000 {
let data = QueueData {
a: format!("message:{i}"),
};
let data = QueueInfo::from(data);
// Queue
mq.queue(data).await?;
}
tokio::signal::ctrl_c().await?;
Ok(())
}
开发笔记
KeyDB(用于测试 Redis 实现)
podman run --rm --name rikka-mq -p 6379:6379 docker.io/eqalpha/keydb
依赖项
~0.7–10MB
~107K SLoC