#datafusion #ethereum #sql #blockchain

datafusion-ethers

Apache Datafusion 的 Ethereum RPC 桥

7 个稳定版本 (3 个主要版本)

41.0.0 2024 年 8 月 15 日
40.1.0 2024 年 7 月 16 日
39.1.0 2024 年 7 月 16 日
38.1.1 2024 年 6 月 5 日
38.0.0 2024 年 5 月 28 日

#745 in 魔法豆

Download history 111/week @ 2024-05-24 198/week @ 2024-05-31 49/week @ 2024-06-07 4/week @ 2024-06-14 2/week @ 2024-06-28 439/week @ 2024-07-12 278/week @ 2024-07-19 284/week @ 2024-07-26 107/week @ 2024-08-02 316/week @ 2024-08-09

每月下载量 1,060

Apache-2.0

81KB
1.5K SLoC

Apache Datafusion 的 Ethereum RPC 桥

Release CI Dependencies Chat

关于

本适配器允许使用 SQL 查询与 Ethereum 兼容的区块链节点中的数据。

SQL 查询由 Apache Datafusion 引擎进行分析和优化,并使用自定义物理计划节点将它们转换为对节点的 Ethereum JSON-RPC 调用,最大程度地利用谓词下推。节点通信通过 alloy 库完成。

该库还提供用于 ABI 解码的 UDFs(一组自定义 SQL 函数)。

该库目前设计为嵌入式,但目标也是提供支持 FlightSQL 协议和流数据查询的应用程序。

关于 crates.io 发布的警告

我们目前暂停了将此库发布到 crates.io,在从 ethers 迁移到 alloy 之后。这是因为 alloy 团队尚未发布他们的库(见 alloy-rs/alloy#791)。请现在使用 git 依赖项。

快速入门

设置依赖项

# Note: Alloy is still under active development and needs to be used via git
alloy = { git = "https://github.com/alloy-rs/alloy", branch = "main", features = [
    "provider-http",
    "provider-ws",
] }
datafusion = { version = "*" }
datafusion-ethers = { git = "https://github.com/kamu-data/datafusion-ethers", branch = "master" }

初始化库并运行查询

// Create `alloy` RPC client
let rpc_client = ProviderBuilder::new()
    .on_builtin("https://127.0.0.1:8545")
    .await
    .unwrap();

// (Optional) Add config extension
let mut cfg = SessionConfig::new()
    .with_option_extension(datafusion_ethers::config::EthProviderConfig::default());

// Create `datafusion` session
let mut ctx = SessionContext::new_with_config(cfg);

// Register all UDFs
datafusion_ethers::udf::register_all(&mut ctx).unwrap();

// (Optional) Register JSON UDFs
datafusion_functions_json::register_all(&mut ctx).unwrap();

// Register catalog provider
ctx.register_catalog(
    "eth",
    Arc::new(datafusion_ethers::provider::EthCatalog::new(rpc_client)),
);

let df = ctx.sql("select * from eth.eth.logs limit 5").await.unwrap();
df.show().await.unwrap();

示例查询

获取原始日志

select *
from eth.eth.logs
where block_number between 0 and 5
limit 5

| block_number | block_hash                                                       | block_timestamp      | transaction_index | transaction_hash                                                 | log_index | address                                  | topic0                                                           | topic1                                                           | topic2                                                           | topic3 | data                                                                                                                                                                                             |

| 3            | e5a51ad7ea21386e46be1a9f94ff96be0b5b9d4fd79a898a3aaa759d1dff6ae4 | 2024-06-07T08:14:44Z | 0                 | 944d0ecfa3e3d226b5af093570ba50d743313c0485f236a1414d4781777b5b00 | 0         | 5fbdb2315678afecb367f032d93f642f64180aa3 | d9e93ef3ac030ca8925f1725575c96d8a49bd825c0843a168225c1bb686bba67 | 000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266 | 000000000000000000000000000000000000000000000000000000000000007b |        |                                                                                                                                                                                                  |
| 3            | e5a51ad7ea21386e46be1a9f94ff96be0b5b9d4fd79a898a3aaa759d1dff6ae4 | 2024-06-07T08:14:44Z | 0                 | 944d0ecfa3e3d226b5af093570ba50d743313c0485f236a1414d4781777b5b00 | 1         | 5fbdb2315678afecb367f032d93f642f64180aa3 | da343a831f3915a0c465305afdd6b0f1c8a3c85635bb14272bf16b6de3664a51 | 0000000000000000000000005fbdb2315678afecb367f032d93f642f64180aa3 |                                                                  |        | 00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000005612d626172000000000000000000000000000000000000000000000000000000 |
| 4            | f379253db8ae7ce55c559fc8603b399caa163f95b7e4ef785f3fef50762cc9f2 | 2024-06-07T08:14:45Z | 0                 | 4da3936c231342e2855bc879c4c3a77724142c249bc15065e0c2fc0af28e8072 | 0         | e7f1725e7734ce288f8367e1bb143e90bb3f0512 | d9e93ef3ac030ca8925f1725575c96d8a49bd825c0843a168225c1bb686bba67 | 000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266 | 000000000000000000000000000000000000000000000000000000000000007b |        |                                                                                                                                                                                                  |
| 4            | f379253db8ae7ce55c559fc8603b399caa163f95b7e4ef785f3fef50762cc9f2 | 2024-06-07T08:14:45Z | 0                 | 4da3936c231342e2855bc879c4c3a77724142c249bc15065e0c2fc0af28e8072 | 1         | e7f1725e7734ce288f8367e1bb143e90bb3f0512 | da343a831f3915a0c465305afdd6b0f1c8a3c85635bb14272bf16b6de3664a51 | 000000000000000000000000e7f1725e7734ce288f8367e1bb143e90bb3f0512 |                                                                  |        | 00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000005612d626172000000000000000000000000000000000000000000000000000000 |


注意:如果节点实现支持,则 block_time 列将填充,请参阅 ethereum/execution-apis#295

从特定合约地址获取原始日志

select *
from eth.eth.logs
where address = X'5fbdb2315678afecb367f032d93f642f64180aa3'

获取与特定签名匹配的原始日志

select *
from eth.eth.logs
where topic0 = eth_event_selector('MyLog(address indexed addr, uint64 indexed id)')

将原始日志数据解码为 JSON

select
  eth_decode_event(
    'SendRequest(uint64 indexed requestId, address indexed consumerAddr, bytes request)',
    topic0,
    topic1,
    topic2,
    topic3,
    data
  ) as event
from eth.eth.logs
+-----------------------------------------------------------------------------------------------------------------------+
| event                                                                                                                 |
+-----------------------------------------------------------------------------------------------------------------------+
| {"consumerAddr":"aabbccddaabbccddaabbccddaabbccddaabbccdd","name":"SendRequest","request":"ff00bbaa","requestId":123} |
+-----------------------------------------------------------------------------------------------------------------------+

将解码的事件提取到具有良好类型的表中(使用 datafusion-functions-json 库)

select
  json_get_str(event, 'name') as name,
  json_get_int(event, 'requestId') as request_id,
  decode(json_get_str(event, 'consumerAddr'), 'hex') as consumer_addr,
  decode(json_get_str(event, 'request'), 'hex') as request
from (
  select
    eth_decode_event(
      'SendRequest(uint64 indexed requestId, address indexed consumerAddr, bytes request)',
      topic0,
      topic1,
      topic2,
      topic3,
      data
    ) as event
  from eth.eth.logs
)
+-------------+------------+------------------------------------------+----------+
| name        | request_id | consumer_addr                            | request  |
+-------------+------------+------------------------------------------+----------+
| SendRequest | 123        | aabbccddaabbccddaabbccddaabbccddaabbccdd | ff00bbaa |
+-------------+------------+------------------------------------------+----------+

注意:当前DataFusion不允许UDF根据参数产生不同的结果数据类型。因此,我们无法分析事件签名字面量并从UDF返回相应的嵌套结构。因此,我们必须使用JSON作为返回值。如果您想跳过JSON并直接使用嵌套结构,请查看EthDecodedLogsToArrowEthRawAndDecodedLogsToArrow转换器。

支持的功能

  • 表:logs
    • 可用于访问事务日志事件,其模式与eth_getLogs RPC方法紧密对应
    • 支持对block_numberblock_hashaddresstopic[0-3]的谓词下推
    • 支持limit
  • 配置选项
    • block_range_from(默认earliest)- 在节点下推谓词时,对区块范围的下界(包含)限制
    • block_range_to(默认latest)- 在节点下推谓词时,对区块范围的上界(包含)限制",
  • UDF:eth_event_selector(<solidity log signature>): hash
    • 将Solidity日志签名转换为哈希
    • 对于通过topic0过滤事件非常有用
  • UDF:eth_decode_event(<solidity log signature>), topic0, topic1, topic2, topic3, data): json string
  • 转换器
    • EthRawLogsToArrow - 将原始日志数据转换为Arrow记录批处理
    • EthDecodedLogsToArrow - 将解码的日志数据转换为Arrow记录批处理
    • EthRawAndDecodedLogsToArrow - 给定原始日志数据和事件类型,生成包含原始数据列和以嵌套结构作为解码事件列的Arrow记录批处理
  • 实用工具
    • RawLogsStream - 使用提供的过滤器实现高效的、可恢复的分页,以处理eth_getLogs
  • kamu-cli - 可验证的数据处理工具集,可以从区块链中获取数据,并将其作为新一代预言机提供给智能合约。

依赖项

~71MB
~1.5M SLoC