7 个稳定版本 (3 个主要版本)
新 41.0.0 | 2024 年 8 月 15 日 |
---|---|
40.1.0 | 2024 年 7 月 16 日 |
39.1.0 | 2024 年 7 月 16 日 |
38.1.1 | 2024 年 6 月 5 日 |
38.0.0 | 2024 年 5 月 28 日 |
#745 in 魔法豆
每月下载量 1,060
81KB
1.5K SLoC
Apache Datafusion 的 Ethereum RPC 桥
关于
本适配器允许使用 SQL 查询与 Ethereum 兼容的区块链节点中的数据。
SQL 查询由 Apache Datafusion 引擎进行分析和优化,并使用自定义物理计划节点将它们转换为对节点的 Ethereum JSON-RPC 调用,最大程度地利用谓词下推。节点通信通过 alloy
库完成。
该库还提供用于 ABI 解码的 UDFs(一组自定义 SQL 函数)。
该库目前设计为嵌入式,但目标也是提供支持 FlightSQL 协议和流数据查询的应用程序。
关于 crates.io 发布的警告
我们目前暂停了将此库发布到 crates.io
,在从 ethers
迁移到 alloy
之后。这是因为 alloy
团队尚未发布他们的库(见 alloy-rs/alloy#791)。请现在使用 git
依赖项。
快速入门
设置依赖项
# Note: Alloy is still under active development and needs to be used via git
alloy = { git = "https://github.com/alloy-rs/alloy", branch = "main", features = [
"provider-http",
"provider-ws",
] }
datafusion = { version = "*" }
datafusion-ethers = { git = "https://github.com/kamu-data/datafusion-ethers", branch = "master" }
初始化库并运行查询
// Create `alloy` RPC client
let rpc_client = ProviderBuilder::new()
.on_builtin("https://127.0.0.1:8545")
.await
.unwrap();
// (Optional) Add config extension
let mut cfg = SessionConfig::new()
.with_option_extension(datafusion_ethers::config::EthProviderConfig::default());
// Create `datafusion` session
let mut ctx = SessionContext::new_with_config(cfg);
// Register all UDFs
datafusion_ethers::udf::register_all(&mut ctx).unwrap();
// (Optional) Register JSON UDFs
datafusion_functions_json::register_all(&mut ctx).unwrap();
// Register catalog provider
ctx.register_catalog(
"eth",
Arc::new(datafusion_ethers::provider::EthCatalog::new(rpc_client)),
);
let df = ctx.sql("select * from eth.eth.logs limit 5").await.unwrap();
df.show().await.unwrap();
示例查询
获取原始日志
select *
from eth.eth.logs
where block_number between 0 and 5
limit 5
+--------------+------------------------------------------------------------------+----------------------+-------------------+------------------------------------------------------------------+-----------+------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| block_number | block_hash | block_timestamp | transaction_index | transaction_hash | log_index | address | topic0 | topic1 | topic2 | topic3 | data |

| 3 | e5a51ad7ea21386e46be1a9f94ff96be0b5b9d4fd79a898a3aaa759d1dff6ae4 | 2024-06-07T08:14:44Z | 0 | 944d0ecfa3e3d226b5af093570ba50d743313c0485f236a1414d4781777b5b00 | 0 | 5fbdb2315678afecb367f032d93f642f64180aa3 | d9e93ef3ac030ca8925f1725575c96d8a49bd825c0843a168225c1bb686bba67 | 000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266 | 000000000000000000000000000000000000000000000000000000000000007b | | |
| 3 | e5a51ad7ea21386e46be1a9f94ff96be0b5b9d4fd79a898a3aaa759d1dff6ae4 | 2024-06-07T08:14:44Z | 0 | 944d0ecfa3e3d226b5af093570ba50d743313c0485f236a1414d4781777b5b00 | 1 | 5fbdb2315678afecb367f032d93f642f64180aa3 | da343a831f3915a0c465305afdd6b0f1c8a3c85635bb14272bf16b6de3664a51 | 0000000000000000000000005fbdb2315678afecb367f032d93f642f64180aa3 | | | 00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000005612d626172000000000000000000000000000000000000000000000000000000 |
| 4 | f379253db8ae7ce55c559fc8603b399caa163f95b7e4ef785f3fef50762cc9f2 | 2024-06-07T08:14:45Z | 0 | 4da3936c231342e2855bc879c4c3a77724142c249bc15065e0c2fc0af28e8072 | 0 | e7f1725e7734ce288f8367e1bb143e90bb3f0512 | d9e93ef3ac030ca8925f1725575c96d8a49bd825c0843a168225c1bb686bba67 | 000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266 | 000000000000000000000000000000000000000000000000000000000000007b | | |
| 4 | f379253db8ae7ce55c559fc8603b399caa163f95b7e4ef785f3fef50762cc9f2 | 2024-06-07T08:14:45Z | 0 | 4da3936c231342e2855bc879c4c3a77724142c249bc15065e0c2fc0af28e8072 | 1 | e7f1725e7734ce288f8367e1bb143e90bb3f0512 | da343a831f3915a0c465305afdd6b0f1c8a3c85635bb14272bf16b6de3664a51 | 000000000000000000000000e7f1725e7734ce288f8367e1bb143e90bb3f0512 | | | 00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000005612d626172000000000000000000000000000000000000000000000000000000 |

注意:如果节点实现支持,则
block_time
列将填充,请参阅 ethereum/execution-apis#295。
从特定合约地址获取原始日志
select *
from eth.eth.logs
where address = X'5fbdb2315678afecb367f032d93f642f64180aa3'
获取与特定签名匹配的原始日志
select *
from eth.eth.logs
where topic0 = eth_event_selector('MyLog(address indexed addr, uint64 indexed id)')
将原始日志数据解码为 JSON
select
eth_decode_event(
'SendRequest(uint64 indexed requestId, address indexed consumerAddr, bytes request)',
topic0,
topic1,
topic2,
topic3,
data
) as event
from eth.eth.logs
+-----------------------------------------------------------------------------------------------------------------------+
| event |
+-----------------------------------------------------------------------------------------------------------------------+
| {"consumerAddr":"aabbccddaabbccddaabbccddaabbccddaabbccdd","name":"SendRequest","request":"ff00bbaa","requestId":123} |
+-----------------------------------------------------------------------------------------------------------------------+
将解码的事件提取到具有良好类型的表中(使用 datafusion-functions-json
库)
select
json_get_str(event, 'name') as name,
json_get_int(event, 'requestId') as request_id,
decode(json_get_str(event, 'consumerAddr'), 'hex') as consumer_addr,
decode(json_get_str(event, 'request'), 'hex') as request
from (
select
eth_decode_event(
'SendRequest(uint64 indexed requestId, address indexed consumerAddr, bytes request)',
topic0,
topic1,
topic2,
topic3,
data
) as event
from eth.eth.logs
)
+-------------+------------+------------------------------------------+----------+
| name | request_id | consumer_addr | request |
+-------------+------------+------------------------------------------+----------+
| SendRequest | 123 | aabbccddaabbccddaabbccddaabbccddaabbccdd | ff00bbaa |
+-------------+------------+------------------------------------------+----------+
注意:当前DataFusion不允许UDF根据参数产生不同的结果数据类型。因此,我们无法分析事件签名字面量并从UDF返回相应的嵌套结构。因此,我们必须使用JSON作为返回值。如果您想跳过JSON并直接使用嵌套结构,请查看
EthDecodedLogsToArrow
和EthRawAndDecodedLogsToArrow
转换器。
支持的功能
- 表:
logs
- 可用于访问事务日志事件,其模式与
eth_getLogs
RPC方法紧密对应 - 支持对
block_number
、block_hash
、address
、topic[0-3]
的谓词下推 - 支持
limit
- 可用于访问事务日志事件,其模式与
- 配置选项
block_range_from
(默认earliest
)- 在节点下推谓词时,对区块范围的下界(包含)限制block_range_to
(默认latest
)- 在节点下推谓词时,对区块范围的上界(包含)限制",
- UDF:
eth_event_selector(<solidity log signature>): hash
- 将Solidity日志签名转换为哈希
- 对于通过
topic0
过滤事件非常有用
- UDF:
eth_decode_event(<solidity log signature>), topic0, topic1, topic2, topic3, data): json string
- 将原始事件数据解码为JSON字符串
- 然后可以使用
datafusion-functions-json
crate进一步处理JSON
- 转换器
EthRawLogsToArrow
- 将原始日志数据转换为Arrow记录批处理EthDecodedLogsToArrow
- 将解码的日志数据转换为Arrow记录批处理EthRawAndDecodedLogsToArrow
- 给定原始日志数据和事件类型,生成包含原始数据列和以嵌套结构作为解码事件列的Arrow记录批处理
- 实用工具
RawLogsStream
- 使用提供的过滤器实现高效的、可恢复的分页,以处理eth_getLogs
相关项目
kamu-cli
- 可验证的数据处理工具集,可以从区块链中获取数据,并将其作为新一代预言机提供给智能合约。
依赖项
~71MB
~1.5M SLoC