3 个不稳定版本
0.2.1 | 2022年5月28日 |
---|---|
0.2.0 | 2022年5月28日 |
0.1.0 | 2022年2月19日 |
#22 in #datafusion
在 datafusion-tui 中使用
785KB
568 行
DataFusion-ObjectStore-S3
S3 作为 Datafusion 的对象存储。
使用 Datafusion 查询 S3 上的文件
这个包实现了 DataFusion 的 ObjectStore
特性,在 AWS S3 和 S3 标准的实现者上。我们利用官方的 AWS Rust SDK 与 S3 交互。虽然我们理解我们使用的 AWS API 相对稳定,但我们不能保证 AWS 或本包内部 API 的稳定性。这个包的 API 与 DataFusion 密切相关,DataFusion 是一个快速发展的项目,因此我们将根据上游更改进行更改。
示例
以下展示了查询 AWS 和其他实现者(如 MinIO)的示例。
从默认的 AWS 凭据提供者(如环境或 ~/.aws/credentials)加载凭据
let s3_file_system = Arc::new(S3FileSystem::default().await);
S3FileSystem::default()
是 S3FileSystem::new(None, None, None, None, None, None)
的便捷包装。
使用访问密钥和密钥连接到 S3 API 的实现者(在这个例子中是 MinIO)。
// Example credentials provided by MinIO
const ACCESS_KEY_ID: &str = "AKIAIOSFODNN7EXAMPLE";
const SECRET_ACCESS_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
const PROVIDER_NAME: &str = "Static";
const MINIO_ENDPOINT: &str = "https://127.0.0.1:9000";
let s3_file_system = S3FileSystem::new(
Some(SharedCredentialsProvider::new(Credentials::new(
MINIO_ACCESS_KEY_ID,
MINIO_SECRET_ACCESS_KEY,
None,
None,
PROVIDER_NAME,
))), // Credentials provider
None, // Region
Some(Endpoint::immutable(Uri::from_static(MINIO_ENDPOINT))), // Endpoint
None, // RetryConfig
None, // AsyncSleep
None, // TimeoutConfig
)
.await;
使用 DataFusion 的 ListingTableConfig
将表注册到 DataFusion 的 ExecutionContext
中,以便可以查询。
let filename = "data/alltypes_plain.snappy.parquet";
let config = ListingTableConfig::new(s3_file_system, filename).infer().await?;
let table = ListingTable::try_new(config)?;
let mut ctx = ExecutionContext::new();
ctx.register_table("tbl", Arc::new(table))?;
let df = ctx.sql("SELECT * FROM tbl").await?;
df.show()
我们还可以将 S3FileSystem
直接注册为 ObjectStore
在 ExecutionContext
上。这提供了一种创建可查询的 TableProviders
的语法糖。
execution_ctx.register_object_store(
"s3",
Arc::new(S3FileSystem::default().await),
);
let input_uri = "s3://parquet-testing/data/alltypes_plain.snappy.parquet";
let (object_store, _) = ctx.object_store(input_uri)?;
let config = ListingTableConfig::new(s3_file_system, filename).infer().await?;
let mut table_provider: Arc<dyn TableProvider + Send + Sync> = Arc::new(ListingTable::try_new(config)?);
测试
测试使用 MinIO 运行,它提供了一个亚马逊 S3 API 的容器化实现。
首先克隆测试数据仓库
git submodule update --init --recursive
然后启动 MinIO 容器
docker run \
--detach \
--rm \
--publish 9000:9000 \
--publish 9001:9001 \
--name minio \
--volume "$(pwd)/parquet-testing:/data" \
--env "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" \
--env "MINIO_ROOT_PASSWORD=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" \
quay.io/minio/minio server /data \
--console-address ":9001"
启动后,按常规方式运行测试
cargo test
依赖关系
~31-46MB
~751K SLoC