#google-cloud #pubsub #gcp

google-cloud-pubsub

Google Cloud Platform pubsub client library

38 releases (breaking changes)

0.28.1 Aug 1, 2024
0.28.0 Jul 12, 2024
0.27.0 Jun 27, 2024
0.23.0 Feb 27, 2024
0.1.2 Mar 19, 2022

#374 in Web programming

34,091 downloads per month
Used in 7 crates

MIT license

8MB
141K SLoC

Bazel 112K SLoC // 0.1% comments Rust 29K SLoC // 0.0% comments Shell 181 SLoC // 0.3% comments Go 148 SLoC // 0.2% comments Forge Config 1 SLoC // 0.8% comments

google-cloud-pubsub

Google Cloud Platform pub/sub library.

Installation

[dependencies]
google-cloud-pubsub = "version"

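Quickstart

Authentication

There are two ways to create a client that is authenticated against Google Cloud.

Automatically

The function with_auth() will try to read the credentials from the environment variables GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_APPLICATION_CREDENTIALS_JSON, or from the metadata server.

This is also described in google-cloud-auth.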

use google_cloud_pubsub::client::{ClientConfig, Client};

async fn run() {
    let config = ClientConfig::default().with_auth().await.unwrap();
    let client = Client::new(config).await.unwrap();
}
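
When with_auth() fails, it can help to check which of the environment variables named above are actually set. The following is a minimal sketch using only the standard library. The helper names `configured_sources` and `configured_sources_from_env` are hypothetical and not part of google-cloud-pubsub, and the order of the list is simply the order in which the variables are mentioned above, not a statement about the library's precedence.

```rust
use std::env;

/// Variable names taken from the text above, in the order they are listed there.
const CREDENTIAL_VARS: [&str; 2] = [
    "GOOGLE_APPLICATION_CREDENTIALS",
    "GOOGLE_APPLICATION_CREDENTIALS_JSON",
];

/// Hypothetical helper: returns the variable names for which `lookup`
/// yields a non-empty (non-whitespace) value.
fn configured_sources<F>(lookup: F) -> Vec<&'static str>
where
    F: Fn(&str) -> Option<String>,
{
    CREDENTIAL_VARS
        .iter()
        .copied()
        .filter(|name| lookup(*name).map_or(false, |value| !value.trim().is_empty()))
        .collect()
}

/// Same check against the real process environment.
fn configured_sources_from_env() -> Vec<&'static str> {
    configured_sources(|name| env::var(name).ok())
}
```

If `configured_sources_from_env()` returns an empty list, neither variable is set, which by the description above leaves the metadata server as the remaining source.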

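Manually

When you cannot use the gcloud authentication but you have a different way to get your credentials (e.g. a different environment variable), you can parse your own version of the 'credentials-file' and use it like this: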

use google_cloud_auth::credentials::CredentialsFile;
// or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
use google_cloud_pubsub::client::{ClientConfig, Client};

async fn run(cred: CredentialsFile) {
    let config = ClientConfig::default().with_credentials(cred).await.unwrap();
    let client = Client::new(config).await.unwrap();
}

Emulator

For tests, you can use the emulator option. Before running the program, specify the address of the emulator in the following environment variable.

export PUBSUB_EMULATOR_HOST=localhost:8681
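
A test harness may want to confirm that the variable is set and well formed before it creates a client. The sketch below uses only the standard library and assumes the value has the `host:port` shape shown above. `parse_emulator_host` and `emulator_from_env` are hypothetical helpers, not functions of google-cloud-pubsub.

```rust
use std::env;

/// Hypothetical helper: splits a `host:port` value such as `localhost:8681`.
/// Returns `None` when the host part is empty or the port is not a valid u16.
fn parse_emulator_host(value: &str) -> Option<(String, u16)> {
    // Split on the last ':' so that only the trailing segment is read as the port.
    let (host, port) = value.trim().rsplit_once(':')?;
    if host.is_empty() {
        return None;
    }
    let port = port.parse::<u16>().ok()?;
    Some((host.to_string(), port))
}

/// Reads PUBSUB_EMULATOR_HOST from the process environment and parses it.
fn emulator_from_env() -> Option<(String, u16)> {
    env::var("PUBSUB_EMULATOR_HOST")
        .ok()
        .and_then(|value| parse_emulator_host(&value))
}
```

A test setup could call `emulator_from_env()` first and skip or fail early when it returns `None`, rather than letting the test connect to a real project by accident.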

Publish Message

use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_gax::grpc::Status;
use tokio::task::JoinHandle;

async fn run(config: ClientConfig) -> Result<(), Status> {

    // Create pubsub client.
    let client = Client::new(config).await.unwrap();

    // Create topic.
    let topic = client.topic("test-topic");
    if !topic.exists(None).await? {
        topic.create(None, None).await?;
    }

    // Start publisher.
    let publisher = topic.new_publisher(None);

    // Publish message.
    let tasks: Vec<JoinHandle<Result<String, Status>>> = (0..10).map(|_| {
        let publisher = publisher.clone();
        tokio::spawn(async move {
            let msg = PubsubMessage {
                data: "abc".into(),
                // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
                ordering_key: "order".into(),
                ..Default::default()
            };

            // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
            let mut awaiter = publisher.publish(msg).await;

            // The get method blocks until a server-generated ID or an error is returned for the published message.
            awaiter.get().await
        })
    }).collect();

    // Wait for all publish tasks to finish.
    for task in tasks {
        let _message_id = task.await.unwrap()?;
    }

    // Wait for the publisher to finish.
    let mut publisher = publisher;
    publisher.shutdown();

    Ok(())
}

Subscribe Message

use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn run(config: ClientConfig) -> Result<(), Status> {

    // Create pubsub client.
    let client = Client::new(config).await.unwrap();

    // Get the topic to subscribe to.
    let topic = client.topic("test-topic");

    // Configure the subscription.
    // If the subscription name does not contain a "/", the project is taken from the client above.
    // Otherwise, the name is treated as a fully qualified resource name.
    let config = SubscriptionConfig {
        // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
        enable_message_ordering: true,
        ..Default::default()
    };

    // Create subscription
    let subscription = client.subscription("test-subscription");
    if !subscription.exists(None).await? {
        subscription.create(topic.fully_qualified_name(), config, None).await?;
    }

    // Token for cancel.
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    tokio::spawn(async move {
        // Cancel after 10 seconds.
        tokio::time::sleep(Duration::from_secs(10)).await;
        cancel2.cancel();
    });

    // `receive` blocks until the cancellation token is cancelled or an error occurs.
    // Alternatively, use the `subscription.subscribe` method.
    subscription.receive(|mut message, cancel| async move {
        // Handle data.
        println!("Got Message: {:?}", message.message.data);

        // Ack or Nack message.
        let _ = message.ack().await;
    }, cancel.clone(), None).await?;

    // Delete subscription if needed.
    subscription.delete(None).await?;

    Ok(())
}
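
The naming rule in the comment of the example above (a name without "/" is resolved against the client's project, anything else is used as a fully qualified resource name) can be sketched as a small standalone function. `qualify_subscription` is a hypothetical helper written for illustration and is not the library's implementation. It assumes the standard Pub/Sub resource name format `projects/{project}/subscriptions/{subscription}`.

```rust
/// Hypothetical helper illustrating the naming rule described above.
/// A short name is expanded with the given project; a name that already
/// contains '/' is treated as fully qualified and returned unchanged.
fn qualify_subscription(project_id: &str, name: &str) -> String {
    if name.contains('/') {
        name.to_string()
    } else {
        format!("projects/{}/subscriptions/{}", project_id, name)
    }
}
```

With a client project of `my-project`, the short name `test-subscription` would resolve to `projects/my-project/subscriptions/test-subscription`, while a name that points into another project is passed through as written.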

Subscribe Message (Alternative Way)

After cancellation, wait for all pulled messages to be processed.

use std::time::Duration;
use futures_util::StreamExt;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_gax::grpc::Status;

async fn run(config: ClientConfig) -> Result<(), Status> {
    // Creating Client, Topic and Subscription...
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");

    // Read the messages as a stream.
    let mut stream = subscription.subscribe(None).await.unwrap();
    let cancellable = stream.cancellable();
    let task = tokio::spawn(async move {
        // `next` returns None once the stream is cancelled.
        while let Some(message) = stream.next().await {
            message.ack().await.unwrap();
        }
    });
    tokio::time::sleep(Duration::from_secs(60)).await;
    cancellable.cancel();
    let _ = task.await;
    Ok(())
}

After cancellation, unprocessed messages are nacked.

use std::time::Duration;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_gax::grpc::Status;

async fn run(config: ClientConfig) -> Result<(), Status> {
    // Creating Client, Topic and Subscription...
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");

    // Read the messages as a stream.
    let mut stream = subscription.subscribe(None).await.unwrap();
    let cancellable = stream.cancellable();
    let task = tokio::spawn(async move {
        // `read` returns None once the stream is cancelled.
        while let Some(message) = stream.read().await {
            message.ack().await.unwrap();
        }
    });
    tokio::time::sleep(Duration::from_secs(60)).await;
    cancellable.cancel();
    let _ = task.await;
    Ok(())
}

Dependencies

~14–28MB
~520K SLoC