2 个不稳定版本
| 0.2.0 | 2020年6月27日 | 
|---|---|
| 0.1.0 | 2020年4月27日 | 
#2693 在 数据库接口
在  2 crate 中使用
67KB
 1.5K  SLoC
tang_rs
轻量级连接池。
主要目的是在任何工作量下保持每个连接的公平和平衡使用。
需求
rustc1.42.0
- 一些示例可能需要依赖nightly版本。
功能
tokio-postgres
redis
mongodb(实验性)
lib.rs:
异步连接池。
功能
- default- 多线程池,其中所有future都需要- Send约束。
- no-send- 单线程池,接受- !Sendfutures。
已知限制
不能用于嵌套运行时。
示例
// This example shows how to implement the pool on async_std runtime.
// Most of the xxx-tang crates are implemented with tokio runtime so they can be seen as examples on that matter.
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use async_std::task;
use smol::Timer;
use tang_rs::{Builder, Manager, ManagerFuture, ManagerTimeout};
// our test pool would just generate usize from 0 as connections.
struct TestPoolManager(AtomicUsize);
impl TestPoolManager {
    fn new() -> Self {
        TestPoolManager(AtomicUsize::new(0))
    }
}
// dummy error type
struct TestPoolError;
impl Debug for TestPoolError {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.debug_struct("TestPoolError")
            .field("source", &"Unknown")
            .finish()
    }
}
// convert instant as timeout error to our pool error.
impl From<Instant> for TestPoolError {
    fn from(_: Instant) -> Self {
        TestPoolError
    }
}
impl Manager for TestPoolManager {
    type Connection = usize;
    type Error = TestPoolError;
    type Timeout = Timer;
    type TimeoutError = Instant;
    fn connect(&self) -> ManagerFuture<'_, Result<Self::Connection, Self::Error>> {
        // how we generate new connections and put them into pool.
        Box::pin(async move { Ok(self.0.fetch_add(1, Ordering::SeqCst)) })
    }
    fn is_valid<'a>(
        &self,
        _conn: &'a mut Self::Connection,
    ) -> ManagerFuture<'a, Result<(), Self::Error>> {
        Box::pin(async {
            // when the connection is pulled from the pool we can check if it's valid.
            Ok(())
        })
    }
    fn is_closed(&self, _conn: &mut Self::Connection) -> bool {
        // return true if you check the connection and want it to be dropped from the pool because it's closed.
        false
    }
    fn spawn<Fut>(&self, fut: Fut)
        where
            Fut: Future<Output = ()> + Send + 'static,
    {
        // some pool inner functions would want to spawn on your executor.
        // you can use the handler to further manage them if you want.
        // normally we just spawn the task and forget about it.
        let _handler = task::spawn(fut);
    }
    // Boilerplate implement for runtime specific timeout future.
    fn timeout<Fut: Future>(&self,fut: Fut, dur: Duration) -> ManagerTimeout<Fut, Self::Timeout> {
        ManagerTimeout::new(fut, Timer::after(dur))
    }
}
#[async_std::main]
async fn main() {
    let mgr = TestPoolManager::new();
    let builder = Builder::new()
        .always_check(false)
        .idle_timeout(None)
        .max_lifetime(None)
        .min_idle(24)
        .max_size(24)
        .build(mgr);
    let pool = builder.await.expect("fail to build pool");
    // spawn 24 futures and pull connections from pool at the same time.
    let (tx, rx) = async_std::sync::channel(100);
    for _i in 0..24 {
        let pool = pool.clone();
        let tx = tx.clone();
        task::spawn(async move {
            let mut pool_ref = pool.get().await.expect("fail to get PoolRef");
            let conn_ref = &*pool_ref;
            println!("we have the reference of a connection : {:?}", conn_ref);
            // we can also get a mut reference from pool_ref
            let conn_ref = &mut *pool_ref;
            let _ = tx.send(*conn_ref);
        });
    }
    drop(tx);
    while let Ok(_connection) = rx.recv().await {
        // We just wait until all connections are pulled out once
    }
}