#grpc-api #database-client #google-api #google #client #grpc-client #json-path

firestore

库提供了一个简单的 API,用于 Google Firestore 和基于高效 gRPC API 的自研 Serde 序列化器

77 个版本 (35 个破坏性更新)

0.43.0 2024年7月12日
0.41.0 2024年4月12日
0.39.3 2024年3月19日
0.37.5 2023年12月21日
0.6.0 2022年7月31日

#39 in 网页编程

Download history 1790/week @ 2024-05-02 968/week @ 2024-05-09 1408/week @ 2024-05-16 1198/week @ 2024-05-23 1167/week @ 2024-05-30 1626/week @ 2024-06-06 1850/week @ 2024-06-13 1350/week @ 2024-06-20 922/week @ 2024-06-27 680/week @ 2024-07-04 1118/week @ 2024-07-11 833/week @ 2024-07-18 1682/week @ 2024-07-25 2175/week @ 2024-08-01 2175/week @ 2024-08-08 2737/week @ 2024-08-15

8,898 每月下载量
用于 2 crate

Apache-2.0

520KB
13K SLoC

Cargo tests and formatting security audit

Firestore for Rust

库提供了一个简单的 API,用于基于官方 gRPC API 的 Google Firestore

  • 使用 Rust 结构和 Serde 创建或更新文档;
  • 支持
    • 查询/流式传输文档/对象;
    • 列出文档/对象(并支持自动分页滚动);
    • 监听 Firestore 的变化;
    • 事务;
    • 聚合查询;
    • 自动节流以避免 Firestore 的时间限制的流式批量写入;
    • 最近邻(KNN)向量搜索;
    • 解释查询;
  • 流畅的高级和强类型 API;
  • 基于 Tokio 运行的完全异步;
  • 宏,帮助您将 JSON 路径作为对结构字段引用;
  • 实现自己的 Serde 序列化器以支持 Firestore protobuf 值;
  • 支持多个数据库 ID;
  • 支持扩展数据类型;
    • Firestore 时间戳支持 #[serde)] 和专用结构
    • 经纬度
    • 参考
  • 支持集合和文档的缓存
    • 内存缓存;
    • 持久缓存;
  • 基于 gcloud-sdk 库 的 Google 客户端,该库自动检测 GCE 环境或本地开发的应用程序默认帐户;

快速入门

Cargo.toml

[dependencies]
firestore = "0.43"

示例

所有示例均可在 示例目录 中找到。

要使用环境变量运行示例

PROJECT_ID=<your-google-project-id> cargo run --example crud

Firestore 数据库客户端实例和生命周期

要创建一个新的 Firestore 客户端实例,您至少需要提供一个 GCP 项目 ID。不建议为每个请求创建一个新的客户端,因此建议一次性创建客户端并在可能的情况下重用它。克隆实例比创建新实例便宜得多。

客户端是通过使用 Firestore::new 方法创建的

use firestore::*;

// Create an instance
let db = FirestoreDb::new( & config_env_var("PROJECT_ID") ? ).await?;

这是创建客户端实例的推荐方法,因为它可以自动检测环境并使用凭证、服务账户、GCP 上的 Workload Identity 等。有关详细信息,请参阅下面的 Google 身份验证 部分。

如果您需要显式指定密钥文件来创建新的实例,可以使用

FirestoreDb::with_options_service_account_key_file(
  FirestoreDbOptions::new(config_env_var("PROJECT_ID") ?.to_string()),
  "/tmp/key.json".into()
).await?

或者如果您需要更大的灵活性,可以使用预配置的令牌源和作用域

FirestoreDb::with_options_token_source(
  FirestoreDbOptions::new(config_env_var("PROJECT_ID") ?.to_string()),
  gcloud_sdk::GCP_DEFAULT_SCOPES.clone(),
  gcloud_sdk::TokenSourceType::File("/tmp/key.json".into())
).await?

Firebase 现在支持每个项目中的多个数据库,因此您可以在选项中指定数据库 ID

FirestoreDb::with_options(
  FirestoreDbOptions::new("your-project-id".to_string())
    .with_database_id("your-database-id".to_string())
  )
.await?

Fluent API

该库提供了两个 API

  • Fluent API:为了简化开发和开发者体验,从 v0.12.x 版本开始,该库提供了更多高级 API。这是所有应用程序推荐使用的 API。
  • 经典和低级 API:在 0.12 版本之前存在的 API 仍然可用且未弃用,因此当需要时可以继续使用。此外,Fluent API 基于经典的 API,通常类似于智能且方便的构造函数。API 可能会因为引入不兼容的更改而更改,因此不建议长期使用。
use firestore::*;

const TEST_COLLECTION_NAME: &'static str = "test";

let my_struct = MyTestStructure {
  some_id: "test-1".to_string(),
  some_string: "Test".to_string(),
  one_more_string: "Test2".to_string(),
  some_num: 42,
};

// Create
let object_returned: MyTestStructure = db.fluent()
  .insert()
  .into(TEST_COLLECTION_NAME)
  .document_id( & my_struct.some_id)
  .object( & my_struct)
  .execute()
  .await?;

// Update or Create 
// (Firestore supports creating documents with update if you provide the document ID).
let object_updated: MyTestStructure = db.fluent()
  .update()
  .fields(paths!(MyTestStructure::{some_num, one_more_string}))
  .in_col(TEST_COLLECTION_NAME)
  .document_id( & my_struct.some_id)
  .object( & MyTestStructure {
      some_num: my_struct.some_num + 1,
      one_more_string: "updated-value".to_string(),
        ..my_struct.clone()
   })
  .execute()
  .await?;

// Get object by id
let find_it_again: Option<MyTestStructure> = db.fluent()
  .select()
  .by_id_in(TEST_COLLECTION_NAME)
  .obj()
  .one( & my_struct.some_id)
  .await?;

// Delete data
db.fluent()
  .delete()
  .from(TEST_COLLECTION_NAME)
  .document_id( & my_struct.some_id)
  .execute()
  .await?;

查询

该库支持丰富的查询 API,包括筛选、排序、分页等。

// Query as a stream our data
let object_stream: BoxStream<FirestoreResult<MyTestStructure> > = db.fluent()
  .select()
  .fields(paths!(MyTestStructure::{some_id, some_num, some_string, one_more_string, created_at})) // Optionally select the fields needed
  .from(TEST_COLLECTION_NAME)
  .filter( | q| { // Fluent filter API example
      q.for_all([
        q.field(path!(MyTestStructure::some_num)).is_not_null(),
        q.field(path!(MyTestStructure::some_string)).eq("Test"),
        // Sometimes you have optional filters
        Some("Test2")
          .and_then( | value | q.field(path ! (MyTestStructure::one_more_string)).eq(value)),        
      ])
  })
  .order_by([(
    path!(MyTestStructure::some_num),
    FirestoreQueryDirection::Descending,
  )])
  .obj() // Reading documents as structures using Serde gRPC deserializer
  .stream_query_with_errors()
  .await?;

let as_vec: Vec<MyTestStructure> = object_stream.try_collect().await?;
println!("{:?}", as_vec);

使用

  • q.for_all 用于 AND 条件
  • q.for_any 用于 OR 条件(Firestore 最近刚刚增加了对 OR 条件的支持)

您可以嵌套 q.for_all/q.for_any

获取和批量获取支持


let find_it_again: Option<MyTestStructure> = db.fluent()
  .select()
  .by_id_in(TEST_COLLECTION_NAME)
  .obj()
  .one( & my_struct.some_id)
  .await?;

let object_stream: BoxStream<(String, Option<MyTestStructure>) > = db.fluent()
  .select()
  .by_id_in(TEST_COLLECTION_NAME)
  .obj()
  .batch(vec!["test-0", "test-5"])
  .await?;

时间戳支持

默认情况下,DateTime 等类型将序列化为字符串到 Firestore(而反序列化则从 Timestamps 和 Strings 读取)。

要更改此行为并在数据库级别支持 Firestore 时间戳,有两种选项

  • #[serde(with)] 和属性
#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyTestStructure {
    #[serde(with = "firestore::serialize_as_timestamp")]
    created_at: DateTime<Utc>,

    #[serde(default)]
    #[serde(with = "firestore::serialize_as_optional_timestamp")]
    updated_at: Option<DateTime<Utc>>,
}
  • 使用类型 FirestoreTimestamp
#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyTestStructure {
    created_at: firestore::FirestoreTimestamp,
    updated_at: Option<firestore::FirestoreTimestamp>
}

这将只更改 Firestore 序列化,但它仍然将字符串序列化到 JSON(因此您可以重用相同的模型用于 JSON 和 Firestore)。

在您的查询中,您需要使用包装类 firestore::FirestoreTimestamp,例如

   q.field(path!(MyTestStructure::created_at)).less_than_or_equal(firestore::FirestoreTimestamp(Utc::now()))

嵌套集合

您可以通过指定文档的父级路径/位置与嵌套集合一起工作


// Creating a parent doc
db.fluent()
  .insert()
  .into(TEST_PARENT_COLLECTION_NAME)
  .document_id(&parent_struct.some_id)
  .object(&parent_struct)
  .execute()
  .await?;

// The doc path where we store our children
let parent_path = db.parent_path(TEST_PARENT_COLLECTION_NAME, parent_struct.some_id)?;

// Create a child doc
db.fluent()
  .insert()
  .into(TEST_CHILD_COLLECTION_NAME)
  .document_id(&child_struct.some_id)
  .parent(&parent_path)
  .object(&child_struct)
  .execute()
  .await?;

// Listing children
println!("Listing all children");

let objs_stream: BoxStream<MyChildStructure> = db.fluent()
  .list()
  .from(TEST_CHILD_COLLECTION_NAME)
  .parent( & parent_path)
  .obj()
  .stream_all()
  .await?;

完整的示例请参见 此处

您可以使用 at() 使用嵌套多个级别的集合

let parent_path =
db.parent_path(TEST_PARENT_COLLECTION_NAME, "parent-id")?
  .at(TEST_CHILD_COLLECTION_NAME, "child-id")?
  .at(TEST_GRANDCHILD_COLLECTION_NAME, "grand-child-id")?;

事务

要手动管理事务,可以使用 db.begin_transaction(),然后使用 Fluent API 添加事务中需要的操作。

let mut transaction = db.begin_transaction().await?;

db.fluent()
  .update()
  .fields(paths!(MyTestStructure::{
       some_string
     }))
  .in_col(TEST_COLLECTION_NAME)
  .document_id("test-0")
  .object( & MyTestStructure {
    some_id: format!("test-0"),
    some_string: "UpdatedTest".to_string(),
  })
  .add_to_transaction( & mut transaction) ?;

db.fluent()
  .delete()
  .from(TEST_COLLECTION_NAME)
  .document_id("test-5")
  .add_to_transaction( & mut transaction) ?;

transaction.commit().await?;

您还可以使用 run_transaction 执行自动以指数退避重试的事务。

    db.run_transaction( | db, transaction| {
      Box::pin(async move {
      let mut test_structure: MyTestStructure = db
        .fluent()
        .select()
        .by_id_in(TEST_COLLECTION_NAME)
        .obj()
        .one(TEST_DOCUMENT_ID)
        .await?
        .expect("Missing document");

      // Perform some kind of operation that depends on the state of the document
      test_structure.test_string += "a";

      db.fluent()
        .update()
        .fields(paths!(MyTestStructure::{
          test_string
         }))
        .in_col(TEST_COLLECTION_NAME)
        .document_id(TEST_DOCUMENT_ID)
        .object(&test_structure)
        .add_to_transaction(transaction) ?;

        Ok(())
      })
})
  .await?;

请参阅可用的完整示例 此处

请注意,Firestore不支持在事务中创建文档(自动生成文档ID),因此您需要使用update()来隐式创建文档并指定您自己的ID。

将Firestore文档元数据作为结构字段读取

Firestore为每个创建的文档提供额外的生成字段

  • _firestore_id:生成的文档ID(当客户端未指定时);
  • _firestore_created:文档创建的时间;
  • _firestore_updated:文档最后更改的时间;

为了能够读取它们,库将它们作为系统字段提供给Serde反序列化器,具有保留名称,因此您可以在您的结构中指定它们

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyTestStructure {
    #[serde(alias = "_firestore_id")]
    id: Option<String>,
    #[serde(alias = "_firestore_created")]
    created_at: Option<DateTime<Utc>>,
    #[serde(alias = "_firestore_updated")]
    updated_at: Option<DateTime<Utc>>,
    some_string: String,
    one_more_string: String,
    some_num: u64,
}

完整示例请见此处

在动态/文档级别上工作

有时静态结构可能会限制您处理动态数据,因此可以使用Fluent API来处理文档,而无需引入任何结构。

let object_returned = db
.fluent()
.insert()
.into(TEST_COLLECTION_NAME)
.document_id("test-1")
.document(FirestoreDb::serialize_map_to_doc("",
    [
      ("some_id", "test-id".into()),
      ("some_string", "test-value".into()),
      ("some_num", 42.into()),
      (
      "embedded_obj",
        FirestoreValue::from_map([
          ("inner_some_id", "inner-id-value".into()),
          ("inner_some_string", "inner-some-value".into()),
        ]),
      ),
      ("created_at", FirestoreTimestamp(Utc::now()).into()),
    ])?
)
.execute()
.await?;

完整示例请见此处

文档转换

库支持事务和批量写入中的服务器端文档转换


// Only transformation
db.fluent()
.update()
.in_col(TEST_COLLECTION_NAME)
.document_id("test-4")
.transforms(|t| { // Transformations
    t.fields([
      t.field(path!(MyTestStructure::some_num)).increment(10),
      t.field(path!(MyTestStructure::some_array)).append_missing_elements([4, 5]),
      t.field(path!(MyTestStructure::some_array)).remove_all_from_array([3]),
    ])
})
.only_transform()
.add_to_transaction( & mut transaction) ?; // or add_to_batch

// Update and transform (in this order and atomically):
db.fluent()
.update()
.in_col(TEST_COLLECTION_NAME)
.document_id("test-5")
.object(&my_obj) // Updating the objects with the fields here
.transforms(|t| { // Transformations after the update
    t.fields([
      t.field(path!(MyTestStructure::some_num)).increment(10),
    ])
})
.add_to_transaction(&mut transaction) ?; // or add_to_batch

在Firestore上监听文档更改

为了帮助处理异步事件监听器,库支持高级API,以在单独的线程上监听Firestore的事件

监听器实现需要提供用于存储指定目标的最后接收令牌的存储,以便能够从最后处理的令牌恢复监听更改,并避免接收所有之前的变化。

库提供了基本的令牌存储实现,但如果需要,您可以实现自己的更复杂的存储

  • FirestoreTempFilesListenStateStorage - 将令牌作为本地文件系统上的临时文件存储;
  • FirestoreMemListenStateStorage - 由HashMap支持的内存存储(使用此实现,如果您重新启动应用程序,您将再次收到所有通知);

let mut listener = db.create_listener(
    FirestoreTempFilesListenStateStorage::new() // or FirestoreMemListenStateStorage or your own implementation 
).await?;

// Adding query listener
db.fluent()
.select()
.from(TEST_COLLECTION_NAME)
.listen()
.add_target(TEST_TARGET_ID_BY_QUERY, &mut listener) ?;

// Adding docs listener by IDs
db.fluent()
.select()
.by_id_in(TEST_COLLECTION_NAME)
.batch_listen([doc_id1, doc_id2])
.add_target(TEST_TARGET_ID_BY_DOC_IDS, &mut listener) ?;

listener
.start( | event| async move {
    match event {
        FirestoreListenEvent::DocumentChange( ref doc_change) => {
            println ! ("Doc changed: {:?}", doc_change);
            
            if let Some(doc) = & doc_change.document {
              let obj: MyTestStructure =
              FirestoreDb::deserialize_doc_to::<MyTestStructure > (doc)
              .expect("Deserialized object");
              println ! ("As object: {:?}", obj);
            }
        }
        _ => {
          println ! ("Received a listen response event to handle: {:?}", event);
        }
    }

  Ok(())
})
.await?;

// Wait some events like Ctrl-C, signals, etc
// <put-your-implementation-for-wait-here>

// and then shutdown
listener.shutdown().await?;

在示例目录中查看完整示例。

显式空值序列化

默认情况下,所有Option<>都序列化为空字段,这在许多情况下很方便。然而,有时您需要显式空值。

为此,为serde(with)实现了附加属性

  • 对于任何类型
#[serde(default)]
#[serde(with = "firestore::serialize_as_null")]
test_null: Option<String>,
  • 对于Firestore时间戳属性
#[serde(default)]
#[serde(with = "firestore::serialize_as_null_timestamp")]
test_null: Option<DateTime<Utc> >,

选择聚合函数

库支持查询的聚合函数

db.fluent()
  .select()
  .from(TEST_COLLECTION_NAME)
  .aggregate(|a| a.fields([a.field(path!(MyAggTestStructure::counter)).count()]))
  .obj()
  .query()
  .await?;

更新/删除预条件

库支持预条件

  .precondition(FirestoreWritePrecondition::Exists(true))

解释查询

库支持查询解释

db.fluent()
  .select()
  .from(TEST_COLLECTION_NAME)
  .explain()
  // or use explain_with_options if you want to provide additional options like analyze which run query to gather additional statistics 
  // .explain_with_options(FirestoreExplainOptions::new().with_analyze(true))
  .stream_query_with_metadata()
  .await?;

Google身份验证

在以下位置查找凭据,首选找到的第一个位置

  • 一个路径由GOOGLE_APPLICATION_CREDENTIALS环境变量指定的JSON文件。
  • 在gcloud命令行工具已知的位置的JSON文件,使用gcloud auth application-default login
  • 在Google Compute Engine上,它从元数据服务器获取凭据。

本地开发

不要混淆gcloud auth logingcloud auth application-default login用于本地开发,因为第一个授权仅授权gcloud工具访问云平台。

后者通过Web流程获取用户访问凭证,并将它们放在应用程序默认凭证(ADC)的已知位置。此命令在您正在开发通常使用服务账户但需要在本地开发环境中运行代码的代码时很有用,在这种环境中提供用户凭证更容易。因此,为了在本地开发中使用,您需要使用 gcloud auth application-default login

与Docker镜像一起工作

在设计Dockerfile时,请确保您已安装根CA证书或使用已包含它们的基镜像。如果没有安装证书,您通常会观察到以下错误:

SystemError(FirestoreSystemError { public: FirestoreErrorPublicGenericDetails { code: "GrpcStatus(tonic::transport::Error(Transport, hyper::Error(Connect, Custom { kind: InvalidData, error: InvalidCertificateData(\"invalid peer certificate: UnknownIssuer\") })))" }, message: "GCloud system error: Tonic/gRPC error: transport error" })

例如,对于基于Debian的镜像,这通常可以使用此包修复:

RUN apt-get install -y ca-certificates

此外,我建议考虑使用Google Distroless镜像,因为它们是安全的,已包含根CA证书,并且已针对大小优化。

Firestore模拟器

要使用Google Firestore模拟器,您可以使用环境变量

export FIRESTORE_EMULATOR_HOST="localhost:8080"

或者使用以下方式指定它:FirestoreDb::with_options()

缓存

该库支持集合和文档的缓存。缓存是利用Firestore监听器在文档更改时更新缓存,这意味着更新将自动跨分布式实例传播。

这有助于避免多次从Firestore读取并付费相同的文档。特别是对于一些数据,如字典、配置和其他不经常更改的信息。实际上,这可以真正帮助降低应用程序的成本和延迟。

缓存在文档级别上工作。缓存将用于以下操作:

  • 通过ID读取文档(获取和批量获取);
  • 列出集合中的所有文档;
  • 对集合中的文档查询的部分支持
    • 过滤;
    • 排序;
    • 分页/游标;

(未来可能会扩展其他操作的缓存)。

该库提供了两种缓存实现

  • 内存缓存,使用moka缓存库实现;
  • 持久缓存,使用redb和protobuf实现;

缓存是可选的,您需要在需要时使用cargo功能启用它

  • caching-memory用于内存缓存;
  • caching-persistent用于持久/磁盘后缓存的缓存;

加载模式

缓存支持不同的初始化/加载模式

  • PreloadNone:不预加载任何内容,只需在工作时填充缓存;
  • PreloadAllDocs:将集合中的所有文档预加载到缓存中;
  • PreloadAllIfEmpty:仅在缓存为空时将集合中的所有文档预加载到缓存中(这对于持久缓存很有用,对于内存缓存它和PreloadAllDocs相同);

如何更新缓存

以下情况下将更新缓存:

  • 当您通过缓存按ID读取文档但未在缓存中找到时,它将从Firestore加载并缓存;
  • Firestore监听器在接收到有关文档更改的通知(外部或来自您的应用程序)时将更新缓存;
  • 在启动时使用预加载;

使用方法

// Create an instance
let db = FirestoreDb::new( &config_env_var("PROJECT_ID") ? ).await?;

const TEST_COLLECTION_NAME: &'static str = "test-caching";

// Create a cache instance that also creates an internal Firestore listener
let mut cache = FirestoreCache::new(
"example-mem-cache".into(),
&db,
FirestoreMemoryCacheBackend::new(
  FirestoreCacheConfiguration::new().add_collection_config(
    &db,
    FirestoreCacheCollectionConfiguration::new(
      TEST_COLLECTION_NAME,
      FirestoreListenerTarget::new(1000),
      FirestoreCacheCollectionLoadMode::PreloadNone,
    )
  ),
)?,
  FirestoreMemListenStateStorage::new(),
)
.await?;

// Load and init cache
cache.load().await?; // Required even if you don't preload anything

// Read a document through the cache. If it is not found in the cache, it will be loaded from Firestore and cached.
let my_struct0: Option<MyTestStructure> = db.read_through_cache(&cache)
  .fluent()
  .select()
  .by_id_in(TEST_COLLECTION_NAME)
  .obj()
  .one("test-1")
  .await?;

// Read a document only from the cache. If it is not found in the cache, it will return None.
let my_struct0: Option<MyTestStructure> = db.read_cached_only(&cache)
  .fluent()
  .select()
  .by_id_in(TEST_COLLECTION_NAME)
  .obj()
  .one("test-1")
  .await?;

完整的示例在此处可用:这里这里

Cargo为依赖项提供了不同TLS功能的支持

  • tls-roots:默认功能以支持本机TLS根
  • tls-webpki-roots:切换到webpki crate根的功能

如何测试此库

测试目录中有集成测试,每次提交都会运行一次,针对为测试目的分配的真实Firestore实例。请注意,不要引入大量的文档读取/更新以及与其他测试的集合隔离。

许可证

Apache软件许可证(ASL)

作者

Abdulla Abdurakhmanov

依赖项

~113MB
~2M SLoC