38 releases (breaking changes)
0.28.1 | Aug 1, 2024 |
---|---|
0.28.0 | Jul 12, 2024 |
0.27.0 | Jun 27, 2024 |
0.23.0 | Feb 27, 2024 |
0.1.2 | Mar 19, 2022 |
#374 in Web programming
34,091 downloads per month
Used in 7 crates
8MB
141K SLoC
google-cloud-pubsub
Google Cloud Platform pub/sub library.
Installation
[dependencies]
google-cloud-pubsub = "version"
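Quick Start
Authentication
There are two ways to create a client that is authenticated against Google Cloud.
Automatically
The function with_auth() will try to read the credentials from the environment variables GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_APPLICATION_CREDENTIALS_JSON, or from the metadata server.
This is also described in google-cloud-auth.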
use google_cloud_pubsub::client::{Client, ClientConfig};

async fn run() {
    // Resolve credentials from the environment or the metadata server.
    let config = ClientConfig::default().with_auth().await.unwrap();
    let client = Client::new(config).await.unwrap();
}
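Before calling with_auth(), it can help to check which of the sources named above are actually visible to the process. The following is a minimal sketch using only the standard library. `CredentialHint` and `credential_hints` are hypothetical names and not part of google-cloud-pubsub or google-cloud-auth. The text above does not state the precedence among the sources, so the sketch reports every variable that is set instead of picking one.

```rust
/// A hint about where credentials could come from, based only on the
/// variable names mentioned in this README (hypothetical helper type).
#[derive(Debug, PartialEq)]
pub enum CredentialHint {
    /// GOOGLE_APPLICATION_CREDENTIALS is set; the value is a file path.
    FilePath(String),
    /// GOOGLE_APPLICATION_CREDENTIALS_JSON is set; the value is not echoed.
    InlineJson,
    /// Neither variable is set, so only the metadata server remains.
    MetadataServerFallback,
}

/// Reports which credential sources are visible.
/// `lookup` abstracts over `std::env::var` so the function can be tested
/// without touching the real process environment.
pub fn credential_hints(lookup: impl Fn(&str) -> Option<String>) -> Vec<CredentialHint> {
    let mut hints = Vec::new();
    if let Some(path) = lookup("GOOGLE_APPLICATION_CREDENTIALS").filter(|v| !v.is_empty()) {
        hints.push(CredentialHint::FilePath(path));
    }
    if lookup("GOOGLE_APPLICATION_CREDENTIALS_JSON")
        .filter(|v| !v.is_empty())
        .is_some()
    {
        hints.push(CredentialHint::InlineJson);
    }
    if hints.is_empty() {
        hints.push(CredentialHint::MetadataServerFallback);
    }
    hints
}
```

In a real program the helper would be called as `credential_hints(|key| std::env::var(key).ok())` and the result logged at startup.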
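Manually
When you cannot use gcloud for authentication but have a different way to obtain your credentials (for example, a different environment variable), you can parse your own 'credentials-file' and use it like this: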
use google_cloud_auth::credentials::CredentialsFile;
// or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
use google_cloud_pubsub::client::{Client, ClientConfig};

async fn run(cred: CredentialsFile) {
    // Use the credentials that were parsed by the caller.
    let config = ClientConfig::default().with_credentials(cred).await.unwrap();
    let client = Client::new(config).await.unwrap();
}
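As an illustration of "a different way to obtain credentials", the sketch below fetches the raw credentials JSON either inline or from a file path, using only the standard library. The function name `load_credentials_json` and the environment variable names in the usage note are hypothetical. Turning the returned JSON into a `CredentialsFile` is left to google-cloud-auth and is deliberately not shown here.

```rust
use std::fs;
use std::io;

/// Returns the raw credentials JSON (hypothetical helper).
/// `inline_json` wins when it is present and non-blank; otherwise the
/// file at `path` is read. With neither input, a NotFound error is returned.
pub fn load_credentials_json(
    inline_json: Option<String>,
    path: Option<String>,
) -> io::Result<String> {
    if let Some(json) = inline_json.filter(|j| !j.trim().is_empty()) {
        return Ok(json);
    }
    match path {
        Some(p) => fs::read_to_string(p),
        None => Err(io::Error::new(
            io::ErrorKind::NotFound,
            "no credentials configured",
        )),
    }
}
```

A caller might pass `std::env::var("MY_APP_GCP_CREDENTIALS_JSON").ok()` and `std::env::var("MY_APP_GCP_CREDENTIALS_FILE").ok()` as the two arguments; both variable names are made up for this example.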
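Emulator
For tests, you can use the emulator option as follows. Before running the program, specify the emulator's address in the following environment variable.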
export PUBSUB_EMULATOR_HOST=localhost:8681
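A test harness may want to fail fast when the variable is malformed. The following standard-library sketch validates a `host:port` value such as the one exported above. `parse_emulator_host` is a hypothetical helper, and the crate itself may interpret the variable differently.

```rust
/// Splits a `host:port` string into its parts (hypothetical helper).
/// The split happens at the last ':' so the port is always the final segment.
pub fn parse_emulator_host(value: &str) -> Result<(String, u16), String> {
    let (host, port) = value
        .trim()
        .rsplit_once(':')
        .ok_or_else(|| format!("expected host:port, got {value:?}"))?;
    if host.is_empty() {
        return Err("host part is empty".to_string());
    }
    let port = port
        .parse::<u16>()
        .map_err(|e| format!("invalid port {port:?}: {e}"))?;
    Ok((host.to_string(), port))
}
```

In a test setup, the input would typically come from `std::env::var("PUBSUB_EMULATOR_HOST")`.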
Publish Message
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_gax::grpc::Status;
use tokio::task::JoinHandle;

async fn run(config: ClientConfig) -> Result<(), Status> {
    // Create pubsub client.
    let client = Client::new(config).await.unwrap();

    // Create the topic if it does not exist yet.
    let topic = client.topic("test-topic");
    if !topic.exists(None).await? {
        topic.create(None, None).await?;
    }

    // Start publisher.
    let publisher = topic.new_publisher(None);

    // Publish 10 messages, one spawned task per message.
    let tasks: Vec<JoinHandle<Result<String, Status>>> = (0..10)
        .map(|_i| {
            let publisher = publisher.clone();
            tokio::spawn(async move {
                let msg = PubsubMessage {
                    data: "abc".into(),
                    // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
                    ordering_key: "order".into(),
                    ..Default::default()
                };

                // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
                let mut awaiter = publisher.publish(msg).await;

                // The get method blocks until a server-generated ID or an error is returned for the published message.
                awaiter.get().await
            })
        })
        .collect();

    // Wait for all publish tasks to finish.
    for task in tasks {
        let _message_id = task.await.unwrap()?;
    }

    // Wait for the publishers of the topic to finish.
    let mut publisher = publisher;
    publisher.shutdown();

    Ok(())
}
Subscribe Message
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn run(config: ClientConfig) -> Result<(), Status> {
    // Create pubsub client.
    let client = Client::new(config).await.unwrap();

    // Get the topic to subscribe to.
    let topic = client.topic("test-topic");

    // Configure the subscription.
    let config = SubscriptionConfig {
        // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
        enable_message_ordering: true,
        ..Default::default()
    };

    // Create subscription.
    // If subscription name does not contain a "/", then the project is taken from client above. Otherwise, the
    // name will be treated as a fully qualified resource name.
    let subscription = client.subscription("test-subscription");
    if !subscription.exists(None).await? {
        subscription.create(topic.fully_qualified_name(), config, None).await?;
    }

    // Token for cancel.
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    tokio::spawn(async move {
        // Cancel after 10 seconds.
        tokio::time::sleep(Duration::from_secs(10)).await;
        cancel2.cancel();
    });

    // Receive blocks until the token is cancelled or an error occurs.
    // Or simply use the `subscription.subscribe` method.
    subscription.receive(|message, _cancel| async move {
        // Handle data.
        println!("Got Message: {:?}", message.message.data);

        // Ack or Nack message.
        let _ = message.ack().await;
    }, cancel.clone(), None).await?;

    // Delete subscription if needed.
    subscription.delete(None).await?;

    Ok(())
}
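The naming rule mentioned in the comments of the example above (a name without "/" is resolved against the client's project, anything else is taken as a fully qualified resource name) can be sketched as a pure function. `subscription_resource_name` is a hypothetical helper written for illustration; it only mirrors the rule as described and is not the crate's internal code.

```rust
/// Resolves a subscription name the way the comments above describe
/// (hypothetical helper).
/// - "my-sub"                          -> "projects/<project_id>/subscriptions/my-sub"
/// - "projects/p/subscriptions/my-sub" -> returned unchanged
pub fn subscription_resource_name(project_id: &str, name: &str) -> String {
    if name.contains('/') {
        // Already fully qualified: use it as given.
        name.to_string()
    } else {
        // Short name: qualify it with the client's project.
        format!("projects/{project_id}/subscriptions/{name}")
    }
}
```

This makes it explicit that passing a fully qualified name allows subscribing in a project other than the one the client was created for.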
Subscribe Message (Alternative Way)
After cancellation, wait until all pulled messages have been processed.
use std::time::Duration;
use futures_util::StreamExt;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_gax::grpc::Status;

async fn run(config: ClientConfig) -> Result<(), Status> {
    // Creating Client, Topic and Subscription...
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");

    // Read the messages as a stream.
    let mut stream = subscription.subscribe(None).await.unwrap();
    let cancellable = stream.cancellable();
    let task = tokio::spawn(async move {
        // None if the stream is cancelled.
        while let Some(message) = stream.next().await {
            message.ack().await.unwrap();
        }
    });

    // Cancel after 60 seconds, then wait for the consumer task to finish.
    tokio::time::sleep(Duration::from_secs(60)).await;
    cancellable.cancel();
    let _ = task.await;

    Ok(())
}
After cancellation, unprocessed messages are nacked.
use std::time::Duration;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_gax::grpc::Status;

async fn run(config: ClientConfig) -> Result<(), Status> {
    // Creating Client, Topic and Subscription...
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");

    // Read the messages as a stream.
    let mut stream = subscription.subscribe(None).await.unwrap();
    let cancellable = stream.cancellable();
    let task = tokio::spawn(async move {
        // None if the stream is cancelled.
        while let Some(message) = stream.read().await {
            message.ack().await.unwrap();
        }
    });

    // Cancel after 60 seconds, then wait for the consumer task to finish.
    tokio::time::sleep(Duration::from_secs(60)).await;
    cancellable.cancel();
    let _ = task.await;

    Ok(())
}
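The two stream examples differ only in what happens to messages that were already pulled when cancellation arrives. The toy model below, built on the standard library alone, contrasts the two behaviors as the text describes them. It is not how the crate is implemented; `Outcome`, `OnCancel`, and `simulate_cancel` are hypothetical names, and message IDs are plain integers.

```rust
use std::collections::VecDeque;

/// What happened to a single message in the simulation.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Acked(u32),
    Nacked(u32),
}

/// The two cancellation behaviors described above.
#[derive(Clone, Copy)]
pub enum OnCancel {
    /// Keep handling everything that was already pulled.
    Drain,
    /// Nack everything that was not handled yet.
    NackRemaining,
}

/// `buffered` holds the IDs of messages that were already pulled.
/// `handled_before_cancel` is how many of them were acked before cancel.
pub fn simulate_cancel(
    mut buffered: VecDeque<u32>,
    handled_before_cancel: usize,
    mode: OnCancel,
) -> Vec<Outcome> {
    let mut outcomes = Vec::new();
    // Phase 1: normal processing until cancellation is requested.
    for _ in 0..handled_before_cancel {
        match buffered.pop_front() {
            Some(id) => outcomes.push(Outcome::Acked(id)),
            None => return outcomes,
        }
    }
    // Phase 2: cancellation has been requested; settle the remainder.
    for id in buffered {
        outcomes.push(match mode {
            OnCancel::Drain => Outcome::Acked(id),
            OnCancel::NackRemaining => Outcome::Nacked(id),
        });
    }
    outcomes
}
```

With three buffered messages and one handled before cancellation, `Drain` acks all three, while `NackRemaining` acks the first and nacks the other two so that they can be redelivered.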
Dependencies
~14–28MB
~520K SLoC