3个版本 (破坏性更新)
0.11.0 | 2024年6月10日 |
---|---|
0.10.2 | 2023年10月25日 |
0.9.0 | 2022年11月16日 |
#910 在 网络编程
每月130次下载
480KB
12K SLoC
iSCP-rs
iSCPv2客户端库
安装
-
在Cargo.toml中添加依赖
iscp-rs = "1"
使用方法
库功能
gen
: 使用最新protobuf版本重新生成protobuf消息。
lib.rs
:
iscp库是用于访问iSCP版本2的实时API的客户端库。
使用iscp库可以实现使用iSCP的客户端应用程序。
要使用iSCP进行通信,请在建立iscp::Conn
连接后,使用流或E2E调用来发送和接收数据。
示例
连接到intdash API
此示例将建立与intdash API的连接。
use std::env;
use std::sync::Arc;
#[derive(Clone)]
struct TokenSource {
access_token: String,
}
#[async_trait::async_trait]
impl iscp::TokenSource for TokenSource {
async fn token(&self) -> iscp::Result<String> {
Ok(self.access_token.clone())
}
}
let host = env::var("EXAMPLE_HOST").unwrap_or_else(|_| "xxx.xxx.jp".to_string());
let port = env::var("EXAMPLE_PORT")
.unwrap_or_else(|_| "11443".to_string())
.parse::<i32>()
.unwrap();
let api_token = env::var("EXAMPLE_TOKEN").unwrap_or_else(|_| {
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string()
});
let node_id = env::var("EXAMPLE_NODE_ID")
.unwrap_or_else(|_| "11111111-1111-1111-1111-111111111111".to_string());
let addr = format!("{}:{}", host, port);
let token_source = Arc::new(TokenSource {
// TODO: アクセストークンはintdash-apiからoauth2で動的に取得してください。
access_token: api_token,
});
let builder = iscp::ConnBuilder::new(&addr, iscp::TransportKind::Quic)
.quic_config(Some(iscp::tr::QuicConfig {
host, // `host` は、サーバー証明書の検証に使用されます。
mtu: 1000,
..Default::default()
}))
.encoding(iscp::enc::EncodingKind::Proto)
.token_source(Some(token_source))
.node_id(node_id);
tokio::runtime::Runtime::new().unwrap().block_on(async {
let conn = builder.connect().await.unwrap();
conn.close().await.unwrap();
});
启动上游
这是上游发送的示例。在这个示例中,从连接中打开上游,并将基准时间元数据和字符串数据点发送到iSCP服务器。
use std::env;
use std::sync::Arc;
#[derive(Clone)]
struct TokenSource {
access_token: String,
}
#[async_trait::async_trait]
impl iscp::TokenSource for TokenSource {
async fn token(&self) -> iscp::Result<String> {
Ok(self.access_token.clone())
}
}
let host = env::var("EXAMPLE_HOST").unwrap_or_else(|_| "xxx.xxx.jp".to_string());
let port = env::var("EXAMPLE_PORT")
.unwrap_or_else(|_| "11443".to_string())
.parse::<i32>()
.unwrap();
let api_token = env::var("EXAMPLE_TOKEN").unwrap_or_else(|_| {
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string()
});
let node_id = env::var("EXAMPLE_NODE_ID")
.unwrap_or_else(|_| "11111111-1111-1111-1111-111111111111".to_string());
let addr = format!("{}:{}", host, port);
let token_source = Arc::new(TokenSource {
// TODO: アクセストークンはintdash-apiからoauth2で動的に取得してください。
access_token: api_token,
});
let builder = iscp::ConnBuilder::new(&addr, iscp::TransportKind::Quic)
.quic_config(Some(iscp::tr::QuicConfig {
host, // `host` は、サーバー証明書の検証に使用されます。
mtu: 1000,
..Default::default()
}))
.encoding(iscp::enc::EncodingKind::Proto)
.token_source(Some(token_source))
.node_id(node_id);
tokio::runtime::Runtime::new().unwrap().block_on(async {
let conn = builder.connect().await.unwrap();
let session_id = uuid::Uuid::new_v4().to_string(); // セッションIDを払い出します。
let base_time = chrono::Utc::now();
let up = conn
.upstream_builder(&session_id)
.flush_policy(iscp::FlushPolicy::IntervalOnly {
interval: std::time::Duration::from_millis(5),
})
.ack_interval(chrono::Duration::milliseconds(1000))
.persist(true)
.build()
.await
.unwrap();
// 基準時刻をiSCPサーバーへ送信します。
conn.send_base_time(
iscp::msg::BaseTime {
elapsed_time: chrono::Duration::zero(),
name: "edge_rtc".to_string(),
base_time,
priority: 20,
session_id,
},
iscp::SendMetadataOptions {
persist: true,
},
)
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// データポイントをiSCPサーバーへ送信します。
up.write_data_points(iscp::DataPointGroup {
id: iscp::DataId::new("greeting", "string"),
data_points: vec![iscp::DataPoint {
payload: "hello".into(),
elapsed_time: chrono::Utc::now() - base_time,
}],
})
.await
.unwrap();
up.close(Some(iscp::UpstreamCloseOptions {
close_session: true,
}))
.await
.unwrap();
conn.close().await.unwrap();
});
启动下游
这是接收上述上游发送数据的下游示例。在这个示例中,接收上游开始元数据、基准时间元数据和字符串数据点。
use std::env;
use std::sync::Arc;
#[derive(Clone)]
struct TokenSource {
access_token: String,
}
#[async_trait::async_trait]
impl iscp::TokenSource for TokenSource {
async fn token(&self) -> iscp::Result<String> {
Ok(self.access_token.clone())
}
}
let host = env::var("EXAMPLE_HOST").unwrap_or_else(|_| "xxx.xxx.jp".to_string());
let port = env::var("EXAMPLE_PORT")
.unwrap_or_else(|_| "11443".to_string())
.parse::<i32>()
.unwrap();
let api_token = env::var("EXAMPLE_TOKEN").unwrap_or_else(|_| {
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string()
});
let node_id = env::var("EXAMPLE_NODE_ID")
.unwrap_or_else(|_| "11111111-1111-1111-1111-111111111111".to_string());
let source_node_id = env::var("EXAMPLE_SRC_NODE_ID")
.unwrap_or_else(|_| "22222222-2222-2222-2222-222222222222".to_string());
let addr = format!("{}:{}", host, port);
let token_source = Arc::new(TokenSource {
// TODO: アクセストークンはintdash-apiからoauth2で動的に取得してください。
access_token: api_token,
});
let builder = iscp::ConnBuilder::new(&addr, iscp::TransportKind::Quic)
.quic_config(Some(iscp::tr::QuicConfig {
host, // `host` は、サーバー証明書の検証に使用されます。
mtu: 1000,
..Default::default()
}))
.encoding(iscp::enc::EncodingKind::Proto)
.token_source(Some(token_source))
.node_id(node_id);
tokio::runtime::Runtime::new().unwrap().block_on(async {
let conn = builder.connect().await.unwrap();
let mut disconnect_notified = conn.subscribe_disconnect();
let down = conn
.downstream_builder(
vec![iscp::DownstreamFilter {
source_node_id, // 送信元のノードIDを指定します。
data_filters: vec![iscp::DataId::new("#", "#").into()], // 受信したいデータを名称と型で指定します。この例では、ワイルドカード `#` を使用して全てのデータを取得します。
}],
)
.build()
.await
.unwrap();
// ダウンストリーム開始のメタデータの受信
let metadata = down.read_metadata().await;
println!("{:?}", metadata.unwrap());
// アップストリーム開始のメタデータの受信
let metadata = down.read_metadata().await;
println!("{:?}", metadata.unwrap());
// 基準時刻のメタデータの受信
let metadata = down.read_metadata().await;
println!("{:?}", metadata.unwrap());
// 文字列型のデータポイントの受信
let down_chunk = down.read_data_points().await.unwrap();
println!("{:?}", down_chunk);
down.close().await.unwrap();
tokio::spawn(async move { conn.close().await.unwrap() });
disconnect_notified.recv().await;
});
依赖关系
~21–37MB
~642K SLoC