#trading #backtesting #investment #stock #financial-data #data-transformation #data-model

易货集成

适用于构建灵活的Web集成的低级别框架,尤其是在与金融交易所集成时

38个版本

0.7.3 2024年7月8日
0.7.1 2024年4月23日
0.6.2 2024年3月20日
0.5.3 2023年4月16日
0.2.4 2022年5月15日

4 in 财务

Download history 472/week @ 2024-05-03 528/week @ 2024-05-10 424/week @ 2024-05-17 496/week @ 2024-05-24 403/week @ 2024-05-31 509/week @ 2024-06-07 367/week @ 2024-06-14 295/week @ 2024-06-21 338/week @ 2024-06-28 604/week @ 2024-07-05 377/week @ 2024-07-12 288/week @ 2024-07-19 265/week @ 2024-07-26 270/week @ 2024-08-02 263/week @ 2024-08-09 413/week @ 2024-08-16

1,307每月下载量
用于 3 crates

MIT 许可证

69KB
1K SLoC

易货集成

高性能、低级别的框架,用于构建灵活的Web集成。

由其他 Barter 交易生态系统crates使用,以构建强大的金融交易所集成,主要用于公共数据收集和交易执行。它是

  • 低级别的:将通过网络传输的原始数据流转换为任意数据模型,使用任意数据转换。
  • 灵活的:兼容任何协议(WebSocket、FIX、Http等)、任何输入/输出模型以及任何用户定义的转换。

核心抽象包括

  • RestClient 提供客户端与服务器之间可配置的签名Http通信。
  • ExchangeStream 提供任何异步流协议(WebSocket、FIX等)上的可配置通信。

这两个核心抽象都提供了您需要的强大粘合剂,方便地在服务器和客户端数据模型之间进行转换。

参见: BarterBarter-DataBarter-Execution

Crates.io MIT licensed Build Status Discord chat

API文档 | 聊天

概述

易货集成是一个高性能、低级别、可配置的框架,用于构建灵活的Web集成。

RestClient

(同步私有和公共Http通信)

在较高层次上,一个 RestClient 具有几个主要组件,允许它执行 RestRequests

  • 在目标API上具有可配置签名逻辑的 RequestSigner
  • 将API特定响应转换为所需输出类型的 HttpParser

ExchangeStream

(使用WebSocket和FIX等流式传输协议的异步通信)

从高层次来看,一个 ExchangeStream 由几个主要组件组成

  • 内部流/接收套接字(例如/ WebSocket、FIX等)。
  • StreamParser,能够将输入协议消息(例如/ WebSocket、FIX等)解析为交易所特定消息。
  • 转换器,将交易所特定消息转换为所需输出类型的迭代器。

示例

使用带签名的GET请求获取Ftx账户余额

use std::borrow::Cow;

use barter_integration::{
    error::SocketError,
    metric::Tag,
    model::Symbol,
    protocol::http::{
        private::{encoder::HexEncoder, RequestSigner, Signer},
        rest::{client::RestClient, RestRequest},
        HttpParser,
    },
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use reqwest::{RequestBuilder, StatusCode};
use serde::Deserialize;
use thiserror::Error;
use tokio::sync::mpsc;

struct FtxSigner {
    api_key: String,
}

// Configuration required to sign every Ftx `RestRequest`
struct FtxSignConfig<'a> {
    api_key: &'a str,
    time: DateTime<Utc>,
    method: reqwest::Method,
    path: Cow<'static, str>,
}

impl Signer for FtxSigner {
    type Config<'a> = FtxSignConfig<'a> where Self: 'a;

    fn config<'a, Request>(
        &'a self,
        request: Request,
        _: &RequestBuilder,
    ) -> Result<Self::Config<'a>, SocketError>
    where
        Request: RestRequest,
    {
        Ok(FtxSignConfig {
            api_key: self.api_key.as_str(),
            time: Utc::now(),
            method: Request::method(),
            path: request.path(),
        })
    }

    fn add_bytes_to_sign<M>(mac: &mut M, config: &Self::Config<'a>) -> Bytes
    where
        M: Mac
    {
        mac.update(config.time.to_string().as_bytes());
        mac.update(config.method.as_str().as_bytes());
        mac.update(config.path.as_bytes());
    }

    fn build_signed_request<'a>(
        config: Self::Config<'a>,
        builder: RequestBuilder,
        signature: String,
    ) -> Result<reqwest::Request, SocketError> {
        // Add Ftx required Headers & build reqwest::Request
        builder
            .header("FTX-KEY", config.api_key)
            .header("FTX-TS", &config.time.timestamp_millis().to_string())
            .header("FTX-SIGN", &signature)
            .build()
            .map_err(SocketError::from)
    }
}

struct FtxParser;

impl HttpParser for FtxParser {
    type ApiError = serde_json::Value;
    type OutputError = ExecutionError;

    fn parse_api_error(&self, status: StatusCode, api_error: Self::ApiError) -> Self::OutputError {
        // For simplicity, use serde_json::Value as Error and extract raw String for parsing
        let error = api_error.to_string();

        // Parse Ftx error message to determine custom ExecutionError variant
        match error.as_str() {
            message if message.contains("Invalid login credentials") => {
                ExecutionError::Unauthorised(error)
            }
            _ => ExecutionError::Socket(SocketError::HttpResponse(status, error)),
        }
    }
}

#[derive(Debug, Error)]
enum ExecutionError {
    #[error("request authorisation invalid: {0}")]
    Unauthorised(String),

    #[error("SocketError: {0}")]
    Socket(#[from] SocketError),
}

struct FetchBalancesRequest;

impl RestRequest for FetchBalancesRequest {
    type Response = FetchBalancesResponse; // Define Response type
    type QueryParams = (); // FetchBalances does not require any QueryParams
    type Body = (); // FetchBalances does not require any Body

    fn path(&self) -> Cow<'static, str> {
        Cow::Borrowed("/api/wallet/balances")
    }

    fn method() -> reqwest::Method {
        reqwest::Method::GET
    }
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct FetchBalancesResponse {
    success: bool,
    result: Vec<FtxBalance>,
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct FtxBalance {
    #[serde(rename = "coin")]
    symbol: Symbol,
    total: f64,
}

/// See Barter-Execution for a comprehensive real-life example, as well as code you can use out of the
/// box to execute trades on many exchanges.
#[tokio::main]
async fn main() {
    // HMAC-SHA256 encoded account API secret used for signing private http requests
    let mac: Hmac<sha2::Sha256> = Hmac::new_from_slice("api_secret".as_bytes()).unwrap();

    // Build Ftx configured RequestSigner for signing http requests with hex encoding
    let request_signer = RequestSigner::new(
        FtxSigner {
            api_key: "api_key".to_string(),
        },
        mac,
        HexEncoder,
    );

    // Build RestClient with Ftx configuration
    let rest_client = RestClient::new("https://ftx.com", request_signer, FtxParser);

    // Fetch Result<FetchBalancesResponse, ExecutionError>
    let _response = rest_client.execute(FetchBalancesRequest).await;
}

消费Binance期货逐笔交易并计算成交量的滚动总和

use barter_integration::{
    error::SocketError,
    protocol::websocket::{WebSocket, WebSocketParser, WsMessage},
    ExchangeStream, Transformer,
};
use futures::{SinkExt, StreamExt};
use serde::{de, Deserialize};
use serde_json::json;
use std::str::FromStr;
use tokio_tungstenite::connect_async;
use tracing::debug;

// Convenient type alias for an `ExchangeStream` utilising a tungstenite `WebSocket`
type ExchangeWsStream<Exchange> = ExchangeStream<WebSocketParser, WebSocket, Exchange, VolumeSum>;

// Communicative type alias for what the VolumeSum the Transformer is generating
type VolumeSum = f64;

#[derive(Deserialize)]
#[serde(untagged, rename_all = "camelCase")]
enum BinanceMessage {
    SubResponse {
        result: Option<Vec<String>>,
        id: u32,
    },
    Trade {
        #[serde(rename = "q", deserialize_with = "de_str")]
        quantity: f64,
    },
}

struct StatefulTransformer {
    sum_of_volume: VolumeSum,
}

impl Transformer<VolumeSum> for StatefulTransformer {
    type Input = BinanceMessage;
    type OutputIter = Vec<Result<VolumeSum, SocketError>>;

    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        // Add new input Trade quantity to sum
        match input {
            BinanceMessage::SubResponse { result, id } => {
                debug!("Received SubResponse for {}: {:?}", id, result);
                // Don't care about this for the example
            }
            BinanceMessage::Trade { quantity, .. } => {
                // Add new Trade volume to internal state VolumeSum
                self.sum_of_volume += quantity;
            }
        };

        // Return IntoIterator of length 1 containing the running sum of volume
        vec![Ok(self.sum_of_volume)]
    }
}

/// See Barter-Data for a comprehensive real-life example, as well as code you can use out of the
/// box to collect real-time public market data from many exchanges.
#[tokio::main]
async fn main() {
    // Establish Sink/Stream communication with desired WebSocket server
    let mut binance_conn = connect_async("wss://fstream.binance.com/ws/")
        .await
        .map(|(ws_conn, _)| ws_conn)
        .expect("failed to connect");

    // Send something over the socket (eg/ Binance trades subscription)
    binance_conn
        .send(WsMessage::Text(
            json!({"method": "SUBSCRIBE","params": ["btcusdt@aggTrade"],"id": 1}).to_string(),
        ))
        .await
        .expect("failed to send WsMessage over socket");

    // Instantiate some arbitrary Transformer to apply to data parsed from the WebSocket protocol
    let transformer = StatefulTransformer { sum_of_volume: 0.0 };

    // ExchangeWsStream includes pre-defined WebSocket Sink/Stream & WebSocket StreamParser
    let mut ws_stream = ExchangeWsStream::new(binance_conn, transformer);

    // Receive a stream of your desired Output data model from the ExchangeStream
    while let Some(volume_result) = ws_stream.next().await {
        match volume_result {
            Ok(cumulative_volume) => {
                // Do something with your data
                println!("{cumulative_volume:?}");
            }
            Err(error) => {
                // React to any errors produced by the internal transformation
                eprintln!("{error}")
            }
        }
    }
}

/// Deserialize a `String` as the desired type.
fn de_str<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: de::Deserializer<'de>,
    T: FromStr,
    T::Err: std::fmt::Display,
{
    let data: String = Deserialize::deserialize(deserializer)?;
    data.parse::<T>().map_err(de::Error::custom)
}

对于更大型、“现实世界”的示例,请参阅Barter-Data仓库。

获取帮助

首先,查看您的问题是否可以在API文档中找到。如果答案不在那里,我很乐意通过聊天并尝试通过Discord回答您的问题。

贡献

感谢您帮助改进Barter生态系统!请在Discord上联系我,讨论开发、新功能和未来路线图。

除了Barter-Integration包之外,Barter项目还维护

  • Barter:具有电池包的高性能、可扩展和模块化交易组件。包含一个预构建的交易引擎,可以作为实时交易或回测系统。
  • Barter-Data:一个高性能WebSocket集成库,用于从领先的加密货币交易所流式传输公共数据。
  • Barter-Execution:金融交易所集成,用于交易执行——尚未发布!

路线图

  • 添加新的默认StreamParser实现,以实现与其他流行系统(如Kafka)的集成。

许可

此项目受MIT许可的许可。

贡献

除非您明确声明,否则您提交给Barter-Integration的任何贡献都将按MIT许可,不附加任何额外条款或条件。

依赖关系

~9–21MB
~305K SLoC