38个版本
| 0.7.3 | 2024年7月8日 | 
|---|---|
| 0.7.1 | 2024年4月23日 | 
| 0.6.2 | 2024年3月20日 | 
| 0.5.3 | 2023年4月16日 | 
| 0.2.4 | 2022年5月15日 | 
4 in 财务
1,307每月下载量
用于  3 crates
69KB
 1K  SLoC
易货集成
高性能、低级别的框架,用于构建灵活的Web集成。
由其他 Barter 交易生态系统crates使用,以构建强大的金融交易所集成,主要用于公共数据收集和交易执行。它是
- 低级别的:将通过网络传输的原始数据流转换为任意数据模型,使用任意数据转换。
- 灵活的:兼容任何协议(WebSocket、FIX、Http等)、任何输入/输出模型以及任何用户定义的转换。
核心抽象包括
- RestClient 提供客户端与服务器之间可配置的签名Http通信。
- ExchangeStream 提供任何异步流协议(WebSocket、FIX等)上的可配置通信。
这两个核心抽象都提供了您需要的强大粘合剂,方便地在服务器和客户端数据模型之间进行转换。
参见: Barter、Barter-Data 和 Barter-Execution
概述
易货集成是一个高性能、低级别、可配置的框架,用于构建灵活的Web集成。
RestClient
(同步私有和公共Http通信)
在较高层次上,一个 RestClient 具有几个主要组件,允许它执行 RestRequests
- 在目标API上具有可配置签名逻辑的 RequestSigner。
- 将API特定响应转换为所需输出类型的 HttpParser。
ExchangeStream
(使用WebSocket和FIX等流式传输协议的异步通信)
从高层次来看,一个 ExchangeStream 由几个主要组件组成
- 内部流/接收套接字(例如/ WebSocket、FIX等)。
- StreamParser,能够将输入协议消息(例如/ WebSocket、FIX等)解析为交易所特定消息。
- 转换器,将交易所特定消息转换为所需输出类型的迭代器。
示例
使用带签名的GET请求获取Ftx账户余额
use std::borrow::Cow;
use barter_integration::{
    error::SocketError,
    metric::Tag,
    model::Symbol,
    protocol::http::{
        private::{encoder::HexEncoder, RequestSigner, Signer},
        rest::{client::RestClient, RestRequest},
        HttpParser,
    },
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use reqwest::{RequestBuilder, StatusCode};
use serde::Deserialize;
use thiserror::Error;
use tokio::sync::mpsc;
struct FtxSigner {
    api_key: String,
}
// Configuration required to sign every Ftx `RestRequest`
struct FtxSignConfig<'a> {
    api_key: &'a str,
    time: DateTime<Utc>,
    method: reqwest::Method,
    path: Cow<'static, str>,
}
impl Signer for FtxSigner {
    type Config<'a> = FtxSignConfig<'a> where Self: 'a;
    fn config<'a, Request>(
        &'a self,
        request: Request,
        _: &RequestBuilder,
    ) -> Result<Self::Config<'a>, SocketError>
    where
        Request: RestRequest,
    {
        Ok(FtxSignConfig {
            api_key: self.api_key.as_str(),
            time: Utc::now(),
            method: Request::method(),
            path: request.path(),
        })
    }
    fn add_bytes_to_sign<M>(mac: &mut M, config: &Self::Config<'a>) -> Bytes
    where
        M: Mac
    {
        mac.update(config.time.to_string().as_bytes());
        mac.update(config.method.as_str().as_bytes());
        mac.update(config.path.as_bytes());
    }
    fn build_signed_request<'a>(
        config: Self::Config<'a>,
        builder: RequestBuilder,
        signature: String,
    ) -> Result<reqwest::Request, SocketError> {
        // Add Ftx required Headers & build reqwest::Request
        builder
            .header("FTX-KEY", config.api_key)
            .header("FTX-TS", &config.time.timestamp_millis().to_string())
            .header("FTX-SIGN", &signature)
            .build()
            .map_err(SocketError::from)
    }
}
struct FtxParser;
impl HttpParser for FtxParser {
    type ApiError = serde_json::Value;
    type OutputError = ExecutionError;
    fn parse_api_error(&self, status: StatusCode, api_error: Self::ApiError) -> Self::OutputError {
        // For simplicity, use serde_json::Value as Error and extract raw String for parsing
        let error = api_error.to_string();
        // Parse Ftx error message to determine custom ExecutionError variant
        match error.as_str() {
            message if message.contains("Invalid login credentials") => {
                ExecutionError::Unauthorised(error)
            }
            _ => ExecutionError::Socket(SocketError::HttpResponse(status, error)),
        }
    }
}
#[derive(Debug, Error)]
enum ExecutionError {
    #[error("request authorisation invalid: {0}")]
    Unauthorised(String),
    #[error("SocketError: {0}")]
    Socket(#[from] SocketError),
}
struct FetchBalancesRequest;
impl RestRequest for FetchBalancesRequest {
    type Response = FetchBalancesResponse; // Define Response type
    type QueryParams = (); // FetchBalances does not require any QueryParams
    type Body = (); // FetchBalances does not require any Body
    fn path(&self) -> Cow<'static, str> {
        Cow::Borrowed("/api/wallet/balances")
    }
    fn method() -> reqwest::Method {
        reqwest::Method::GET
    }
}
#[derive(Deserialize)]
#[allow(dead_code)]
struct FetchBalancesResponse {
    success: bool,
    result: Vec<FtxBalance>,
}
#[derive(Deserialize)]
#[allow(dead_code)]
struct FtxBalance {
    #[serde(rename = "coin")]
    symbol: Symbol,
    total: f64,
}
/// See Barter-Execution for a comprehensive real-life example, as well as code you can use out of the
/// box to execute trades on many exchanges.
#[tokio::main]
async fn main() {
    // HMAC-SHA256 encoded account API secret used for signing private http requests
    let mac: Hmac<sha2::Sha256> = Hmac::new_from_slice("api_secret".as_bytes()).unwrap();
    // Build Ftx configured RequestSigner for signing http requests with hex encoding
    let request_signer = RequestSigner::new(
        FtxSigner {
            api_key: "api_key".to_string(),
        },
        mac,
        HexEncoder,
    );
    // Build RestClient with Ftx configuration
    let rest_client = RestClient::new("https://ftx.com", request_signer, FtxParser);
    // Fetch Result<FetchBalancesResponse, ExecutionError>
    let _response = rest_client.execute(FetchBalancesRequest).await;
}
消费Binance期货逐笔交易并计算成交量的滚动总和
use barter_integration::{
    error::SocketError,
    protocol::websocket::{WebSocket, WebSocketParser, WsMessage},
    ExchangeStream, Transformer,
};
use futures::{SinkExt, StreamExt};
use serde::{de, Deserialize};
use serde_json::json;
use std::str::FromStr;
use tokio_tungstenite::connect_async;
use tracing::debug;
// Convenient type alias for an `ExchangeStream` utilising a tungstenite `WebSocket`
type ExchangeWsStream<Exchange> = ExchangeStream<WebSocketParser, WebSocket, Exchange, VolumeSum>;
// Communicative type alias for what the VolumeSum the Transformer is generating
type VolumeSum = f64;
#[derive(Deserialize)]
#[serde(untagged, rename_all = "camelCase")]
enum BinanceMessage {
    SubResponse {
        result: Option<Vec<String>>,
        id: u32,
    },
    Trade {
        #[serde(rename = "q", deserialize_with = "de_str")]
        quantity: f64,
    },
}
struct StatefulTransformer {
    sum_of_volume: VolumeSum,
}
impl Transformer<VolumeSum> for StatefulTransformer {
    type Input = BinanceMessage;
    type OutputIter = Vec<Result<VolumeSum, SocketError>>;
    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        // Add new input Trade quantity to sum
        match input {
            BinanceMessage::SubResponse { result, id } => {
                debug!("Received SubResponse for {}: {:?}", id, result);
                // Don't care about this for the example
            }
            BinanceMessage::Trade { quantity, .. } => {
                // Add new Trade volume to internal state VolumeSum
                self.sum_of_volume += quantity;
            }
        };
        // Return IntoIterator of length 1 containing the running sum of volume
        vec![Ok(self.sum_of_volume)]
    }
}
/// See Barter-Data for a comprehensive real-life example, as well as code you can use out of the
/// box to collect real-time public market data from many exchanges.
#[tokio::main]
async fn main() {
    // Establish Sink/Stream communication with desired WebSocket server
    let mut binance_conn = connect_async("wss://fstream.binance.com/ws/")
        .await
        .map(|(ws_conn, _)| ws_conn)
        .expect("failed to connect");
    // Send something over the socket (eg/ Binance trades subscription)
    binance_conn
        .send(WsMessage::Text(
            json!({"method": "SUBSCRIBE","params": ["btcusdt@aggTrade"],"id": 1}).to_string(),
        ))
        .await
        .expect("failed to send WsMessage over socket");
    // Instantiate some arbitrary Transformer to apply to data parsed from the WebSocket protocol
    let transformer = StatefulTransformer { sum_of_volume: 0.0 };
    // ExchangeWsStream includes pre-defined WebSocket Sink/Stream & WebSocket StreamParser
    let mut ws_stream = ExchangeWsStream::new(binance_conn, transformer);
    // Receive a stream of your desired Output data model from the ExchangeStream
    while let Some(volume_result) = ws_stream.next().await {
        match volume_result {
            Ok(cumulative_volume) => {
                // Do something with your data
                println!("{cumulative_volume:?}");
            }
            Err(error) => {
                // React to any errors produced by the internal transformation
                eprintln!("{error}")
            }
        }
    }
}
/// Deserialize a `String` as the desired type.
fn de_str<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: de::Deserializer<'de>,
    T: FromStr,
    T::Err: std::fmt::Display,
{
    let data: String = Deserialize::deserialize(deserializer)?;
    data.parse::<T>().map_err(de::Error::custom)
}
对于更大型、“现实世界”的示例,请参阅Barter-Data仓库。
获取帮助
首先,查看您的问题是否可以在API文档中找到。如果答案不在那里,我很乐意通过聊天并尝试通过Discord回答您的问题。
贡献
感谢您帮助改进Barter生态系统!请在Discord上联系我,讨论开发、新功能和未来路线图。
相关项目
除了Barter-Integration包之外,Barter项目还维护
- Barter:具有电池包的高性能、可扩展和模块化交易组件。包含一个预构建的交易引擎,可以作为实时交易或回测系统。
- Barter-Data:一个高性能WebSocket集成库,用于从领先的加密货币交易所流式传输公共数据。
- Barter-Execution:金融交易所集成,用于交易执行——尚未发布!
路线图
- 添加新的默认StreamParser实现,以实现与其他流行系统(如Kafka)的集成。
许可
此项目受MIT许可的许可。
贡献
除非您明确声明,否则您提交给Barter-Integration的任何贡献都将按MIT许可,不附加任何额外条款或条件。
依赖关系
~9–21MB
~305K SLoC