#datafusion #query #sql #arrow #aws-sdk

datafusion-objectstore-s3

S3 作为 Datafusion 的对象存储

3 个不稳定版本

0.2.1 2022年5月28日
0.2.0 2022年5月28日
0.1.0 2022年2月19日

#22 in #datafusion


datafusion-tui 中使用

Apache-2.0

785KB
568

DataFusion-ObjectStore-S3

S3 作为 Datafusion 的对象存储。

使用 Datafusion 查询 S3 上的文件

这个包实现了 DataFusion 的 ObjectStore 特性,在 AWS S3 和 S3 标准的实现者上。我们利用官方的 AWS Rust SDK 与 S3 交互。虽然我们理解我们使用的 AWS API 相对稳定,但我们不能保证 AWS 或本包内部 API 的稳定性。这个包的 API 与 DataFusion 密切相关,DataFusion 是一个快速发展的项目,因此我们将根据上游更改进行更改。

示例

以下展示了查询 AWS 和其他实现者(如 MinIO)的示例。

从默认的 AWS 凭据提供者(如环境或 ~/.aws/credentials)加载凭据

let s3_file_system = Arc::new(S3FileSystem::default().await);

S3FileSystem::default()S3FileSystem::new(None, None, None, None, None, None) 的便捷包装。

使用访问密钥和密钥连接到 S3 API 的实现者(在这个例子中是 MinIO)。

// Example credentials provided by MinIO
const ACCESS_KEY_ID: &str = "AKIAIOSFODNN7EXAMPLE";
const SECRET_ACCESS_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
const PROVIDER_NAME: &str = "Static";
const MINIO_ENDPOINT: &str = "https://127.0.0.1:9000";

let s3_file_system = S3FileSystem::new(
    Some(SharedCredentialsProvider::new(Credentials::new(
        MINIO_ACCESS_KEY_ID,
        MINIO_SECRET_ACCESS_KEY,
        None,
        None,
        PROVIDER_NAME,
    ))), // Credentials provider
    None, // Region
    Some(Endpoint::immutable(Uri::from_static(MINIO_ENDPOINT))), // Endpoint
    None, // RetryConfig
    None, // AsyncSleep
    None, // TimeoutConfig
)
.await;

使用 DataFusion 的 ListingTableConfig 将表注册到 DataFusion 的 ExecutionContext 中,以便可以查询。

let filename = "data/alltypes_plain.snappy.parquet";

let config = ListingTableConfig::new(s3_file_system, filename).infer().await?;

let table = ListingTable::try_new(config)?;

let mut ctx = ExecutionContext::new();

ctx.register_table("tbl", Arc::new(table))?;

let df = ctx.sql("SELECT * FROM tbl").await?;
df.show()

我们还可以将 S3FileSystem 直接注册为 ObjectStoreExecutionContext 上。这提供了一种创建可查询的 TableProviders 的语法糖。

execution_ctx.register_object_store(
    "s3",
    Arc::new(S3FileSystem::default().await),
);

let input_uri = "s3://parquet-testing/data/alltypes_plain.snappy.parquet";

let (object_store, _) = ctx.object_store(input_uri)?;

let config = ListingTableConfig::new(s3_file_system, filename).infer().await?;

let mut table_provider: Arc<dyn TableProvider + Send + Sync> = Arc::new(ListingTable::try_new(config)?);

测试

测试使用 MinIO 运行,它提供了一个亚马逊 S3 API 的容器化实现。

首先克隆测试数据仓库

git submodule update --init --recursive

然后启动 MinIO 容器

docker run \
--detach \
--rm \
--publish 9000:9000 \
--publish 9001:9001 \
--name minio \
--volume "$(pwd)/parquet-testing:/data" \
--env "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" \
--env "MINIO_ROOT_PASSWORD=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" \
quay.io/minio/minio server /data \
--console-address ":9001"

启动后,按常规方式运行测试

cargo test

依赖关系

~31-46MB
~751K SLoC