#redis #stream #tokio #api-bindings

redis-asio

基于 tokio 的异步 Redis 驱动器

1 个不稳定发布

0.1.0-alpha2020 年 2 月 21 日

#1631 in 异步

MIT/Apache

94KB
1.5K SLoC

redis-asio

redis-asio 是一个基于异步 tokio 库的纯 Rust 编写的 Redis 客户端库。

该库提供了一个 base 模块,用于低级请求发送和响应处理,以及一个 stream 模块,该模块包含用于与 Redis-Stream "https://redis.ac.cn/topics/streams-intro" 交互的特定接口。

该库使用二进制安全的字符串,允许用户序列化其消息结构并通过 RESP 协议 "https://redis.ac.cn/topics/protocol" 发送。

在项目中使用

通过 Cargo 在项目中依赖 redis-asio

[dependencies]
redis-async = "0.1"

解析 crate 接口

extern crate redis_asio;

激励示例

设置、从缓存获取值

use std::net::SocketAddr;
use futures::Future;
use redis_asio::{RedisCoreConnection, RedisValue, from_redis_value};

let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();

let set_req = command("SET").arg("foo").arg(123);
let get_req = command("GET").arg("foo");

let future = RedisCoreConnection::connect(address)
    .and_then(move |con| {
        // send "SET foo 123" request
        con.send(set_req)
    })
    .and_then(|(con, response)| {
        // check if the value has been set
        assert_eq!(RedisValue::Ok, response);
        // send "GET foo" request
        con.send(get_req)
    })
    .map(move |(_, response)|
        // check if the received value is the same
        assert_eq!(123, from_redis_value(&response).unwrap()))
    .map_err(|_| unreachable!());
// start the Tokio runtime using the `future`
tokio::run(future);

订阅 Redis 流

订阅 Redis 流并处理所有传入条目。Redis Streams 需要每次客户端在之前接收到响应时发送 XREAD/XREADGROUP 请求,换句话说,Redis Streams 不提供订阅 Redis 流的接口。

在 Crate 中,通过 Crate 引擎内部的隐藏请求发送来实现订阅。

以下示例中将发送的请求以获取新条目的请求:"XREADGROUP GROUP mygroup Bob BLOCK 0 STREAMS mystream <"

use std::net::SocketAddr;
use futures::{Future, Stream};
use redis_asio::stream::{RedisStream, SubscribeOptions, StreamEntry,
                         RedisGroup};

let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
// create options to pass it into RedisStream::subscribe()
let group_info = RedisGroup::new("mygroup".to_string(), "Bob".to_string());
let subscribe_options =
    SubscribeOptions::with_group(vec!["mystream".to_string()], group_info);

let future = RedisStream::connect(address)
    .and_then(move |stream: RedisStream| {
        stream.subscribe(subscribe_options)
    })
    .and_then(|subscribe| /*:Subscribe*/ {
        // pass the closure that will be called on each incoming entries
        subscribe.for_each(|entries: Vec<StreamEntry>| {
            for entry in entries.into_iter() {
                println!("Received: {:?}", entry);
            }
            Ok(())
        })
    })
    .map_err(|err| eprintln!("something went wrong: {}", err));
// start the Tokio runtime using the `future`
tokio::run(future);

向 Redis 流发送条目

向 Redis 流发送条目。以下示例中将发送的请求以推送指定条目:"XADD mystream * type 3 data "Hello, world""

use std::net::SocketAddr;
use std::collections::HashMap;
use futures::Future;
use redis_asio::{RedisArgument, IntoRedisArgument};
use redis_asio::stream::{RedisStream, SendEntryOptions, EntryId};

let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
// create options to pass it into RedisStream::send_entry()
let send_options = SendEntryOptions::new("mystream".to_string());

// create key-value pairs
let mut request: HashMap<String, RedisArgument> = HashMap::new();
request.insert("type".to_string(), 3i32.into_redis_argument());
request.insert("data".to_string(), "Hello, world!".into_redis_argument());

let future = RedisStream::connect(address)
    .and_then(move |stream: RedisStream| {
        // HashMap<String, RedisArgument> satisfies the
        // HashMap<String, ToRedisArgument>
        stream.send_entry(send_options, request)
    })
    .map(|(_, inserted_entry_id): (RedisStream, EntryId)| {
        println!("{:?} has sent", inserted_entry_id.to_string());
    })
    .map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);

依赖关系

~6.5MB
~103K SLoC