3个版本 (破坏性更新)

0.11.0 2024年6月10日
0.10.2 2023年10月25日
0.9.0 2022年11月16日

#910网络编程

Download history 150/week @ 2024-06-05 35/week @ 2024-06-12 5/week @ 2024-06-19

每月130次下载

Apache-2.0

480KB
12K SLoC

iSCP-rs

iSCPv2客户端库

安装

  • 在Cargo.toml中添加依赖

    iscp-rs = "1"
    

使用方法

库功能

  • gen: 使用最新protobuf版本重新生成protobuf消息。

lib.rs:

iscp库是用于访问iSCP版本2的实时API的客户端库。

使用iscp库可以实现使用iSCP的客户端应用程序。

要使用iSCP进行通信,请在建立iscp::Conn连接后,使用流或E2E调用来发送和接收数据。

示例

连接到intdash API

此示例将建立与intdash API的连接。

use std::env;
use std::sync::Arc;

#[derive(Clone)]
struct TokenSource {
    access_token: String,
}

#[async_trait::async_trait]
impl iscp::TokenSource for TokenSource {
    async fn token(&self) -> iscp::Result<String> {
        Ok(self.access_token.clone())
    }
}

let host = env::var("EXAMPLE_HOST").unwrap_or_else(|_| "xxx.xxx.jp".to_string());
let port = env::var("EXAMPLE_PORT")
    .unwrap_or_else(|_| "11443".to_string())
    .parse::<i32>()
    .unwrap();
let api_token = env::var("EXAMPLE_TOKEN").unwrap_or_else(|_| {
    "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string()
});
let node_id = env::var("EXAMPLE_NODE_ID")
    .unwrap_or_else(|_| "11111111-1111-1111-1111-111111111111".to_string());

let addr = format!("{}:{}", host, port);

let token_source = Arc::new(TokenSource {
    // TODO: アクセストークンはintdash-apiからoauth2で動的に取得してください。
    access_token: api_token,
});

let builder = iscp::ConnBuilder::new(&addr, iscp::TransportKind::Quic)
    .quic_config(Some(iscp::tr::QuicConfig {
        host, // `host` は、サーバー証明書の検証に使用されます。
        mtu: 1000,
        ..Default::default()
    }))
    .encoding(iscp::enc::EncodingKind::Proto)
    .token_source(Some(token_source))
    .node_id(node_id);

tokio::runtime::Runtime::new().unwrap().block_on(async {
    let conn = builder.connect().await.unwrap();
    conn.close().await.unwrap();
});

启动上游

这是上游发送的示例。在这个示例中,从连接中打开上游,并将基准时间元数据和字符串数据点发送到iSCP服务器。

use std::env;
use std::sync::Arc;

#[derive(Clone)]
struct TokenSource {
    access_token: String,
}

#[async_trait::async_trait]
impl iscp::TokenSource for TokenSource {
    async fn token(&self) -> iscp::Result<String> {
        Ok(self.access_token.clone())
    }
}

let host = env::var("EXAMPLE_HOST").unwrap_or_else(|_| "xxx.xxx.jp".to_string());
let port = env::var("EXAMPLE_PORT")
    .unwrap_or_else(|_| "11443".to_string())
    .parse::<i32>()
    .unwrap();
let api_token = env::var("EXAMPLE_TOKEN").unwrap_or_else(|_| {
    "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string()
});
let node_id = env::var("EXAMPLE_NODE_ID")
    .unwrap_or_else(|_| "11111111-1111-1111-1111-111111111111".to_string());

let addr = format!("{}:{}", host, port);

let token_source = Arc::new(TokenSource {
    // TODO: アクセストークンはintdash-apiからoauth2で動的に取得してください。
    access_token: api_token,
});

let builder = iscp::ConnBuilder::new(&addr, iscp::TransportKind::Quic)
    .quic_config(Some(iscp::tr::QuicConfig {
        host, // `host` は、サーバー証明書の検証に使用されます。
        mtu: 1000,
        ..Default::default()
    }))
    .encoding(iscp::enc::EncodingKind::Proto)
    .token_source(Some(token_source))
    .node_id(node_id);

tokio::runtime::Runtime::new().unwrap().block_on(async {
    let conn = builder.connect().await.unwrap();

    let session_id = uuid::Uuid::new_v4().to_string(); // セッションIDを払い出します。
    let base_time = chrono::Utc::now();

    let up = conn
        .upstream_builder(&session_id)
        .flush_policy(iscp::FlushPolicy::IntervalOnly {
            interval: std::time::Duration::from_millis(5),
        })
        .ack_interval(chrono::Duration::milliseconds(1000))
        .persist(true)
        .build()
        .await
        .unwrap();

    // 基準時刻をiSCPサーバーへ送信します。
    conn.send_base_time(
        iscp::msg::BaseTime {
            elapsed_time: chrono::Duration::zero(),
            name: "edge_rtc".to_string(),
            base_time,
            priority: 20,
            session_id,
        },
        iscp::SendMetadataOptions {
            persist: true,
        },
    )
    .await
    .unwrap();

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    // データポイントをiSCPサーバーへ送信します。
    up.write_data_points(iscp::DataPointGroup {
        id: iscp::DataId::new("greeting", "string"),
        data_points: vec![iscp::DataPoint {
            payload: "hello".into(),
            elapsed_time: chrono::Utc::now() - base_time,
        }],
    })
    .await
    .unwrap();

    up.close(Some(iscp::UpstreamCloseOptions {
        close_session: true,
    }))
    .await
    .unwrap();
    conn.close().await.unwrap();
});

启动下游

这是接收上述上游发送数据的下游示例。在这个示例中,接收上游开始元数据、基准时间元数据和字符串数据点。

use std::env;
use std::sync::Arc;

#[derive(Clone)]
struct TokenSource {
    access_token: String,
}

#[async_trait::async_trait]
impl iscp::TokenSource for TokenSource {
    async fn token(&self) -> iscp::Result<String> {
        Ok(self.access_token.clone())
    }
}

let host = env::var("EXAMPLE_HOST").unwrap_or_else(|_| "xxx.xxx.jp".to_string());
let port = env::var("EXAMPLE_PORT")
    .unwrap_or_else(|_| "11443".to_string())
    .parse::<i32>()
    .unwrap();
let api_token = env::var("EXAMPLE_TOKEN").unwrap_or_else(|_| {
    "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string()
});
let node_id = env::var("EXAMPLE_NODE_ID")
    .unwrap_or_else(|_| "11111111-1111-1111-1111-111111111111".to_string());
let source_node_id = env::var("EXAMPLE_SRC_NODE_ID")
    .unwrap_or_else(|_| "22222222-2222-2222-2222-222222222222".to_string());

let addr = format!("{}:{}", host, port);

let token_source = Arc::new(TokenSource {
    // TODO: アクセストークンはintdash-apiからoauth2で動的に取得してください。
    access_token: api_token,
});

let builder = iscp::ConnBuilder::new(&addr, iscp::TransportKind::Quic)
    .quic_config(Some(iscp::tr::QuicConfig {
        host, // `host` は、サーバー証明書の検証に使用されます。
        mtu: 1000,
        ..Default::default()
    }))
    .encoding(iscp::enc::EncodingKind::Proto)
    .token_source(Some(token_source))
    .node_id(node_id);

tokio::runtime::Runtime::new().unwrap().block_on(async {
    let conn = builder.connect().await.unwrap();

    let mut disconnect_notified = conn.subscribe_disconnect();

    let down = conn
        .downstream_builder(
            vec![iscp::DownstreamFilter {
                source_node_id, // 送信元のノードIDを指定します。
                data_filters: vec![iscp::DataId::new("#", "#").into()], // 受信したいデータを名称と型で指定します。この例では、ワイルドカード `#` を使用して全てのデータを取得します。
            }],
        )
        .build()
        .await
        .unwrap();

    // ダウンストリーム開始のメタデータの受信
    let metadata = down.read_metadata().await;
    println!("{:?}", metadata.unwrap());

    // アップストリーム開始のメタデータの受信
    let metadata = down.read_metadata().await;
    println!("{:?}", metadata.unwrap());

    // 基準時刻のメタデータの受信
    let metadata = down.read_metadata().await;
    println!("{:?}", metadata.unwrap());

    // 文字列型のデータポイントの受信
    let down_chunk = down.read_data_points().await.unwrap();
    println!("{:?}", down_chunk);

    down.close().await.unwrap();
    tokio::spawn(async move { conn.close().await.unwrap() });

    disconnect_notified.recv().await;
});

依赖关系

~21–37MB
~642K SLoC