#substreams #ethereum #firehose #thegraph #helper #streamingfast #api-bindings

substreams-ethereum-core

以太坊链的Substreams开发套件,包含Firehose块模型和辅助工具,以及以太坊ABI编码/解码的实用工具

32个版本

0.9.13 2024年8月20日
0.9.12 2024年6月20日
0.9.11 2024年4月22日
0.9.10 2024年3月25日
0.8.0 2022年11月28日

10 in #streamingfast

Download history 391/week @ 2024-04-29 336/week @ 2024-05-06 323/week @ 2024-05-13 428/week @ 2024-05-20 491/week @ 2024-05-27 427/week @ 2024-06-03 326/week @ 2024-06-10 413/week @ 2024-06-17 300/week @ 2024-06-24 67/week @ 2024-07-01 146/week @ 2024-07-08 302/week @ 2024-07-15 280/week @ 2024-07-22 312/week @ 2024-07-29 288/week @ 2024-08-05 325/week @ 2024-08-12

1,232 每月下载次数
4 个crate中使用 (2 直接)

Apache-2.0

105KB
1K SLoC

Substreams Ethereum

以太坊链的Substreams开发套件,包含Rust Firehose块模型和辅助工具,以及以太坊ABI编码/解码的实用工具。

用法

[package]
name = "substreams-acme"
version = 0.1.2

[lib]
crate-type = ["cdylib"]

[dependencies]
substreams-ethereum = "0.6.0"

开发

我们手动保持Rust Firehose块模型与在 sf-ethereum 中找到的实际Protocol Buffer定义文件同步,并将它们提交到Git。

这意味着对Protobuf文件的所有更改都必须手动重新生成和提交,下面将说明如何进行。

从Protobuf重新生成Rust Firehose块

./gen.sh

注意事项

带元组的ABI

现在支持为ABI生成代码的元组。它确实为使用它们的函数/事件生成了Rust无命名元组。如果您希望在事件中为元组生成“结构”,则可以使用以下说明。您可以展开下面的 Instructions 部分以获取如何“手动”生成代码的详细说明。

说明

第一步将是稍微修改ABI,以便正确生成解码代码。然后,将生成的初始代码复制并修改为正确解码元组。

我们的想法是将内部 tuple 在 ABI 中“爆炸”成它自己的“事件”,这将生成表示元组的 struct 的代码以及解码该 struct 的代码。然后我们将调整这段生成的代码,将一切连接起来。

来自

[
  {
    "anonymous": false,
    "inputs": [
      {
        "indexed": false,
        "internalType": "bytes32",
        "name": "orderHash",
        "type": "bytes32"
      },
      {
        "components": [
          {
            "internalType": "enum ItemType",
            "name": "itemType",
            "type": "uint8"
          },
          {
            "internalType": "uint256",
            "name": "amount",
            "type": "uint256"
          }
        ],
        "indexed": false,
        "internalType": "struct SpentItem[]",
        "name": "offer",
        "type": "tuple[]"
      }
    ],
    "name": "OrderFulfilled",
    "type": "event"
  }
]

我们将 SpentItem 爆炸成它自己的类型,并将 offer 类型 tuple[] 替换为 address[],以便正确编译

[
  {
    "anonymous": false,
    "inputs": [
      {
        "indexed": false,
        "internalType": "bytes32",
        "name": "orderHash",
        "type": "bytes32"
      },
      {
        "components": [
          {
            "internalType": "enum ItemType",
            "name": "itemType",
            "type": "uint8"
          },
          {
            "internalType": "uint256",
            "name": "amount",
            "type": "uint256"
          }
        ],
        "indexed": false,
        "internalType": "struct SpentItem[]",
        "name": "offer",
        "type": "address[]"
      }
    ],
    "name": "OrderFulfilled",
    "type": "event"
  },
  {
    "anonymous": false,
    "name": "SpentItem",
    "type": "event",
    "inputs": [
      {
        "internalType": "enum ItemType",
        "name": "itemType",
        "type": "uint8"
      },
      {
        "internalType": "uint256",
        "name": "amount",
        "type": "uint256"
      }
    ]
  }
]

注意 不需要删除 components 或更改 internalType 值,它们将被忽略。

使用此修改后的 ABI 执行 cargo build,以便在 src/abi/<file>.rs 中生成代码,它现在不正确,但我们将将其复制到其他地方并使其工作。

src/abi/<file>.rs 中找到 OrderFulfilled 事件的生成代码,并将其复制到新文件 src/events.rs 中。你应该复制 pub struct OrderFulfilled 块、impl OrderFulfilled 块和 impl substreams_ethereum::Event for OrderFulfilled 块,以及 SpentItem 结构和 impl SpentItem 块。

#[derive(Debug, Clone, PartialEq)]
pub struct OrderFulfilled {
    pub order_hash: [u8; 32usize],
    pub offer: Vec<Vec<u8>>,
}
impl OrderFulfilled {
    const TOPIC_ID: [u8; 32] = [
        227u8,
        56u8,
        222u8,
        32u8,
        39u8,
        120u8,
        0u8,
        226u8,
        120u8,
        84u8,
        168u8,
        160u8,
        171u8,
        38u8,
        80u8,
        66u8,
        198u8,
        237u8,
        193u8,
        186u8,
        154u8,
        14u8,
        209u8,
        73u8,
        102u8,
        185u8,
        47u8,
        163u8,
        179u8,
        98u8,
        194u8,
        244u8,
    ];
    pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        if log.topics.len() != 1usize {
            return false;
        }
        if log.data.len() < 96usize {
            return false;
        }
        return log.topics.get(0).expect("bounds already checked").as_ref()
            == Self::TOPIC_ID;
    }
    pub fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        let mut values = ethabi::decode(
                &[
                    ethabi::ParamType::FixedBytes(32usize),
                    ethabi::ParamType::Array(
                        Box::new(ethabi::ParamType::Address),
                    ),
                ],
                log.data.as_ref(),
            )
            .map_err(|e| format!("unable to decode log.data: {:?}", e))?;
        values.reverse();
        Ok(Self {
            order_hash: {
                let mut result = [0u8; 32];
                let v = values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_fixed_bytes()
                    .expect(INTERNAL_ERR);
                result.copy_from_slice(&v);
                result
            },
            offer: values
                .pop()
                .expect(INTERNAL_ERR)
                .into_array()
                .expect(INTERNAL_ERR)
                .into_iter()
                .map(|inner| {
                    inner.into_address().expect(INTERNAL_ERR).as_bytes().to_vec()
                })
                .collect(),
        })
    }
}
impl substreams_ethereum::Event for OrderFulfilled {
    const NAME: &'static str = "OrderFulfilled";
    fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        Self::match_log(log)
    }
    fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        Self::decode(log)
    }
}
#[derive(Debug, Clone, PartialEq)]
pub struct SpentItem {
    pub item_type: substreams::scalar::BigInt,
    pub amount: substreams::scalar::BigInt,
}
impl SpentItem {
    const TOPIC_ID: [u8; 32] = [
        18u8,
        7u8,
        103u8,
        62u8,
        30u8,
        101u8,
        94u8,
        85u8,
        209u8,
        209u8,
        166u8,
        82u8,
        139u8,
        137u8,
        197u8,
        45u8,
        11u8,
        224u8,
        230u8,
        74u8,
        27u8,
        234u8,
        238u8,
        52u8,
        150u8,
        245u8,
        214u8,
        202u8,
        230u8,
        104u8,
        138u8,
        22u8,
    ];
    pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        if log.topics.len() != 1usize {
            return false;
        }
        if log.data.len() != 64usize {
            return false;
        }
        return log.topics.get(0).expect("bounds already checked").as_ref()
            == Self::TOPIC_ID;
    }
    pub fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        let mut values = ethabi::decode(
                &[
                    ethabi::ParamType::Uint(8usize),
                    ethabi::ParamType::Uint(256usize),
                ],
                log.data.as_ref(),
            )
            .map_err(|e| format!("unable to decode log.data: {:?}", e))?;
        values.reverse();
        Ok(Self {
            item_type: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
            amount: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
        })
    }
}
impl substreams_ethereum::Event for SpentItem {
    const NAME: &'static str = "SpentItem";
    fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        Self::match_log(log)
    }
    fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        Self::decode(log)
    }
}

目前,此 src/events.rs 文件没有编译/包含在项目中,因为在 Rust 中,只有当模块在某处“定义”时才会包含模块,所以让我们定义这个模块。在 src/lib.rs 文件的顶部,添加以下内容:

mod events

让我们开始修改我们的错误生成代码,使其正确。首先定义 INTERNAL_ERR 常量,因为它通常被使用,将其放在 src/events.rs 文件的顶部。

const INTERNAL_ERR: &str = "decode event internal error";

现在是一个棘手的部分,我们需要更新 TOPIC_ID 常量,因为它现在是不正确的。如果您已经知道事件 ID(它是 topics #0),那就太好了。如果您不知道,您可以通过对事件定义进行 keccak256 哈希来计算它。您需要事件名称及其类型,在我们的例子中是 OrderFulfilled(bytes32,(uint8,uint256)[])

警告 事件定义必须没有空格或额外的标点符号,任何一个错误的字符都会使整个事件 ID 错误。

您可以使用 jq -'.[] | select(.name == "OrderFulfilled") | .inputs[].type' | tr "\n" ","jq -'.[] | select(.name == "SpentItem") | .inputs[].type' | tr "\n" "," 命令在刚刚修改的 "modified" ABI 文件上,以获取正确的有序类型。然后正确地汇编它,将 OrderFulfilled 的第二个字段更改为元组定义 (uint8,uint256)[](由于我们的修改,它返回为 address[])。

现在我们已经有了事件定义,我们可以计算 keccak256 哈希。我们使用 CLI 工具 keccak-256sum 来完成此操作(《安装说明》)

$ printf 'OrderFulfilled(bytes32,(uint8,uint256)[])' | keccak-256sum
e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41  -

因此,我们的事件主题 0 是 e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41。现在我们只需将 TOPIC_ID 常量定义在 impl OrderFulfilled 定义中更改为以下内容

const TOPIC_ID: [u8; 32] = hex_literal::hex!("e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41");

现在,在 pub struct OrderFulfilled 中,将 offer 字段(当前定义为 offer: Vec<Vec<u8>>),通常包含一个元组的数组,更改为它的正确值 offer: Vec<SpentItem>

pub struct OrderFulfilled {
    pub order_hash: [u8; 32usize],
    pub offer: Vec<SpentItem>,
}

然后在 pub fn decode(log: <type>) 中找到事件 "token" 定义,该定义当前为

let mut values = ethabi::decode(
    &[
        ethabi::ParamType::FixedBytes(32usize),
        ethabi::ParamType::Array(Box::new(ethabi::ParamType::Address)),
    ],
    log.data.as_ref(),
)

确定字段是元组,在我们的情况下是第二个字段,将其定义为 ethabi::ParamType::Tuple 类型。

let mut values = ethabi::decode(
    &[
        ethabi::ParamType::FixedBytes(32usize),
        ethabi::ParamType::Array(Box::new(ethabi::ParamType::Tuple(vec![
            ethabi::ParamType::Uint(8usize),
            ethabi::ParamType::Uint(256usize),
        ]))),
    ],
    log.data.as_ref(),

信息 函数 SpentItem::decode 在函数开头有一个 let mut values 定义变量,用于列出要放入元组的正确元素,无需手动定义列表,只需复制即可。

现在仍在 OrderFulfilled 函数中 pub fn decode(log: <type>),找到执行实际解码工作的代码片段,目前看起来是这样的:

 ...
 },
 offer: values
            .pop()
            .expect(INTERNAL_ERR)
            .into_array()
            .expect(INTERNAL_ERR)
            .into_iter()
            .map(|inner| {
                inner
                    .into_address()
                    .expect(INTERNAL_ERR)
                    .as_bytes()
                    .to_vec()
            })
            .collect(),

将其修改为将其解码工作转发到 SpentItem 结构。

 ...
 },
 offer: values
            .pop()
            .expect(INTERNAL_ERR)
            .into_array()
            .expect(INTERNAL_ERR)
            .into_iter()
            .map(|inner| {
                let fields = inner.into_tuple().expect(INTERNAL_ERR);
                SpentItem::decode(fields).expect(INTERNAL_ERR);
            })
            .collect(),

impl OrderFulfilled 结构中的最终修改是略微修改 pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool 定义。它包含一段代码,确保 log.data 有一定数量的字节,我们的修改后的ABI将为 log.data 生成错误的验证代码,所以让我们将其删除。

pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
    if log.topics.len() != 1usize {
        return false;
    }
    if log.data.len() < 96usize {
        return false;
    }
    return log.topics.get(0).expect("bounds already checked").as_ref() == Self::TOPIC_ID;
}

应变为:

pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
    if log.topics.len() != 1usize {
        return false;
    }
    return log.topics.get(0).expect("bounds already checked").as_ref() == Self::TOPIC_ID;
}

注意 计算过程过于繁琐,无法手动完成,主题计数和验证主题0的值就足够匹配所需的仅事件。

现在,让我们将注意力转向 SpentItem 的实现。在 impl SpentItem 块中,删除:

  • TOPIC_ID 常量
  • match_log 函数

并完全删除 impl substreams_ethereum::Event for SpentItem 块。最后要做的就是调整 SpentItem::decode 函数,更改其当前的签名

pub fn decode(log: &substreams_ethereum::pb::eth::v2::Log) -> Result<Self, String>

使其接受一个 Vec<ethabi::Token> 并将变量重命名为 values,以及将其设置为可变的。

pub fn decode(mut values: Vec<ethabi::Token>) -> Result<Self, String>

最后,删除函数体中现在已有的以前的 let mut values 定义,它看起来像这样:

let mut values = ethabi::decode(
    &[
        ethabi::ParamType::Uint(8usize),
        ethabi::ParamType::Uint(256usize),
    ],
    log.data.as_ref(),
)
.map_err(|e| format!("unable to decode log.data: {:?}",

注意 您必须保留下面的 values.reverse() 部分,这对于解码代码的正常运行非常重要。

现在一切都已经完成,您可以使用 OrderFulfilled 来解码包含 tuple 的事件。最终代码看起来像这样

const INTERNAL_ERR: &str = "decode event internal error";
#[derive(Debug, Clone, PartialEq)]
pub struct OrderFulfilled {
    pub order_hash: [u8; 32usize],
    pub offer: Vec<SpentItem>,
}
impl OrderFulfilled {
    const TOPIC_ID: [u8; 32] =
        hex_literal::hex!("e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41");

    pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        if log.topics.len() != 1usize {
            return false;
        }
        return log.topics.get(0).expect("bounds already checked").as_ref() == Self::TOPIC_ID;
    }

    pub fn decode(log: &substreams_ethereum::pb::eth::v2::Log) -> Result<Self, String> {
        let mut values = ethabi::decode(
            &[
                ethabi::ParamType::FixedBytes(32usize),
                ethabi::ParamType::Array(Box::new(ethabi::ParamType::Tuple(vec![
                    ethabi::ParamType::Uint(8usize),
                    ethabi::ParamType::Uint(256usize),
                ]))),
            ],
            log.data.as_ref(),
        )
        .map_err(|e| format!("unable to decode log.data: {:?}", e))?;
        values.reverse();
        Ok(Self {
            order_hash: {
                let mut result = [0u8; 32];
                let v = values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_fixed_bytes()
                    .expect(INTERNAL_ERR);
                result.copy_from_slice(&v);
                result
            },
            offer: values
                .pop()
                .expect(INTERNAL_ERR)
                .into_array()
                .expect(INTERNAL_ERR)
                .into_iter()
                .map(|inner| {
                    let fields = inner.into_tuple().expect(INTERNAL_ERR);
                    SpentItem::decode(fields).unwrap()
                })
                .collect(),
        })
    }
}
impl substreams_ethereum::Event for OrderFulfilled {
    const NAME: &'static str = "OrderFulfilled";
    fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        Self::match_log(log)
    }
    fn decode(log: &substreams_ethereum::pb::eth::v2::Log) -> Result<Self, String> {
        Self::decode(log)
    }
}
#[derive(Debug, Clone, PartialEq)]
pub struct SpentItem {
    pub item_type: substreams::scalar::BigInt,
    pub amount: substreams::scalar::BigInt,
}
impl SpentItem {
    pub fn decode(mut values: Vec<ethabi::Token>) -> Result<Self, String> {
        values.reverse();
        Ok(Self {
            item_type: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
            amount: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
        })
    }
}

如果您遇到困难,请在 Discord 上联系我们,我们将帮助您解决问题。

依赖项

~7–9.5MB
~160K SLoC