#ipfs #hash-table #content #distributed #addressing #storage #ipfs-embed

app ipfs-embed-cli

ipfs-embed的命令行工具

4 个版本 (重大更改)

0.6.0 2020年9月16日
0.4.0 2020年8月25日
0.3.0 2020年7月27日
0.2.0 2020年7月12日

#2 in #addressing

MIT/Apache

46KB
821 行代码(不包括注释)

ipfs-embed

一个专为嵌入到复杂p2p应用程序而设计的小型、快速且可靠的ipfs实现。

  • 通过mdns进行节点发现
  • 通过kademlia进行提供者发现
  • 通过bitswap交换块
  • lru淘汰策略
  • 别名,递归命名固定点的抽象
  • 为构建dag提供临时的递归固定点,防止与垃圾收集器竞争
  • 有效地同步大型dag块

可以通过启用compat功能标志来启用与go-ipfs的一些兼容性。

入门

use ipfs_embed::{Config, DefaultParams, Ipfs};
use libipld::DagCbor;
use libipld::store::Store;

#[derive(Clone, DagCbor, Debug, Eq, PartialEq)]
struct Identity {
    id: u64,
    name: String,
    age: u8,
}

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cache_size = 10;
    let ipfs = Ipfs::<DefaultParams>::new(Config::new(None, cache_size)).await?;
    ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?).await?;

    let identity = Identity {
        id: 0,
        name: "David Craven".into(),
        age: 26,
    };
    let cid = ipfs.insert(&identity)?;
    let identity2 = ipfs.get(&cid)?;
    assert_eq!(identity, identity2);
    println!("identity cid is {}", cid);

    Ok(())
}

以下是一些关于ipfs-embed历史的笔记。这些信息对于当前实现不再准确。

什么是ipfs?

Ipfs是一个p2p网络,用于定位和提供称为块的以内容地址数据块。

内容地址意味着数据通过其哈希值定位,而不是位置地址。

不出所料,这是通过分布式哈希表完成的。为了避免在dht中存储大量数据,dht存储哪些对等节点拥有一个块。在确定提供块的节点后,从这些节点请求块。

为了验证对等节点正在发送请求的块而不是垃圾流,块需要有限的大小。在实践中,我们将假设最大块大小为1MB。

将任意数据编码到1MB块中,对编解码器提出了两个要求。它需要一个规范表示形式,以确保相同的数据产生相同的哈希值,并且它需要支持链接到其他内容地址块。具有这两个属性的编解码器称为ipld编解码器。

从内容地址(将边表示为节点的哈希值)得出的一个性质是,任意块的图是不可能的。块图保证是有向且无环的。

{"a":3}
{
  "a": 3,
}
{"/":"QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u"}

块存储

让我们从一个简单的持久块存储模型开始。

trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8]) -> Result<()>;
    fn remove(&mut self, cid: &Cid) -> Result<()>;
}

由于内容地址块形成一个有向无环图,因此不能简单地删除块。一个块可能被多个节点引用,因此需要某种形式的引用计数和垃圾回收来确定何时可以安全地删除块。为了在p2p网络上成为一个好节点,我们可能想要保留其他节点可能需要的旧块。因此,将其视为引用计数缓存可能是一个更合适的模型。我们最终得到如下内容:

trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
    fn evict(&mut self) -> Result<()>;
    fn pin(&mut self, cid: &Cid) -> Result<()>;
    fn unpin(&mut self, cid: &Cid) -> Result<()>;
}

要修改一个块,我们需要执行三个步骤。获取块,修改并插入修改后的块,最后删除旧的一个。我们还需要一个从键到cids的映射,因此需要更多的步骤。这些步骤中的任何一个都可能失败,导致块存储处于不一致的状态,从而引起数据泄露。为了防止数据泄露,每个API消费者都必须实现一个写前日志。为了解决这些问题,我们通过名为别名的命名引脚扩展了存储。

trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
    fn evict(&mut self) -> Result<()>;
    fn alias(&mut self, alias: &[u8], cid: Option<&Cid>) -> Result<()>;
    fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>>;
}

假设每个操作都是原子性和持久的,我们拥有存储内容寻址块dags所需的最小操作集。

网络化块存储 - ipfs-embed API

impl Ipfs {
    pub fn new(storage: Arc<S>, network: Arc<N>) -> Self { .. }
    pub fn local_peer_id(&self) -> &PeerId { .. }
    pub async fn listeners(&self) -> Vec<Multiaddr> { .. }
    pub async fn external_addresses(&self) -> Vec<Multiaddr> { .. }
    pub async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> { .. }
    pub async fn get(&self, cid: &Cid) -> Result<Block> {
        if let Some(block) = self.storage.get(cid)? {
            return Ok(block);
        }
        self.network.get(cid).await?;
        if let Some(block) = self.storage.get(cid)? {
            return Ok(block);
        }
        log::error!("block evicted too soon");
        Err(BlockNotFound(*cid))
    }
    pub async fn insert(&self, cid: &Cid) -> Result<()> {
        self.storage.insert(cid)?;
        self.network.provide(cid)?;
        Ok(())
    }
    pub async fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        if let Some(cid) = cid {
            self.network.sync(cid).await?;
        }
        self.storage.alias(alias, cid).await?;
        Ok(())
    }
    pub async fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
        self.storage.resolve(alias)?;
        Ok(())
    }
}

设计模式 - ipfs-embed的实际应用

我们将探讨在chain示例中使用的一些模式。该chain示例使用ipfs-embed来存储一系列块。一个块被定义为

#[derive(Debug, Default, DagCbor)]
pub struct Block {
    prev: Option<Cid>,
    id: u32,
    loopback: Option<Cid>,
    payload: Vec<u8>,
}

原子性

在这个例子中,我们有不同的数据库。由ipfs-embed管理的数据库负责存储块和别名,另一个特定于示例的数据库将块索引映射到块cid,以便我们可以快速查找块,而不必遍历整个链。为了保证原子性,我们定义了两个别名,并在两个步骤中执行同步。这确保了同步的链始终具有其块的索引。

const TMP_ROOT: &str = alias!(tmp_root);
const ROOT: &str = alias!(root);

ipfs.alias(TMP_ROOT, Some(new_root)).await?;
for _ in prev_root_id..new_root_id {
    // index block may error for various reasons
}
ipfs.alias(ROOT, Some(new_root)).await?;

Dagification

递归同步算法在同步链时表现最差,因为每个块都需要一个接一个地同步,无法利用任何并行性。为了解决这个问题,我们通过包括回环来增加链的链接,从而增加dag的分支。

为此,@rklaehn提出了一个算法

fn loopback(block: usize) -> Option<usize> {
    let x = block.trailing_zeros();
    if x > 1 && block > 0 {
        Some(block - (1 << (x - 1)))
    } else {
        None
    }
}

选择器

同步可能需要很长时间,并且不允许选择所需的数据子集。为此,有一个实验性的alias_with_syncer API,允许自定义同步行为。在链示例中,它用于提供块验证,以确保块是有效的。尽管这个API未来可能会改变。

pub struct ChainSyncer<S: StoreParams, T: Storage<S>> {
    index: sled::Db,
    storage: BitswapStorage<S, T>,
}

impl<S: StoreParams, T: Storage<S>> BitswapSync for ChainSyncer<S, T>
where
    S::Codecs: Into<DagCborCodec>,
{
    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
        if let Some(data) = self.storage.get(cid) {
            let ipld_block = libipld::Block::<S>::new_unchecked(*cid, data);
            if let Ok(block) = ipld_block.decode::<DagCborCodec, Block>() {
                return Box::new(block.prev.into_iter().chain(block.loopback.into_iter()));
            }
        }
        Box::new(std::iter::empty())
    }

    fn contains(&self, cid: &Cid) -> bool {
        self.storage.contains(cid)
    }
}

高效的块存储实现 - ipfs-embed内部

Ipcfs embed使用SQLite实现块存储,它是一个性能优良的嵌入式SQL持久层/数据库。

type Id = u64;
type Atime = u64;

#[derive(Clone)]
struct BlockCache {
    // Cid -> Id
    lookup: Tree,
    // Id -> Cid
    cid: Tree,
    // Id -> Vec<u8>
    data: Tree,
    // Id -> Vec<Id>
    refs: Tree,
    // Id -> Atime
    atime: Tree,
    // Atime -> Id
    lru: Tree,
}

impl BlockCache {
    // Updates the atime and lru trees and returns the data from the data tree.
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { .. }
    // Returns an iterator of blocks sorted by least recently used.
    fn lru(&self) -> impl Iterator<Item = Result<Id>> { self.lru.iter().values() }
    // Inserts into all trees.
    fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { ... }
    // Removes from all trees.
    fn remove(&self, id: &Id) -> Result<()> { ... }
    // Returns the recursive set of references.
    fn closure(&self, cid: &Cid) -> Result<Vec<Id>> { ... }
    // A stream of insert/remove events, useful for plugging in a network layer.
    fn subscribe(&self) -> impl Stream<Item = StorageEvent> { ... }
}

鉴于操作的描述以及它们在树结构中的结构,这些操作很容易实现。

#[derive(Clone)]
struct BlockStorage {
    cache: BlockCache,
    // Vec<u8> -> Id
    alias: Tree,
    // Bag of live ids
    filter: Arc<Mutex<CuockooFilter>>,
    // Id -> Vec<Id>
    closure: Tree,
}

impl BlockStorage {
    // get from cache
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { self.cache.get(cid) }
    // insert to cache
    fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { self.cache.insert(cid, data) }
    // returns the value of the alias tree
    fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> { ... }
    // remove the lru block that is not in the bag of live ids and remove it's closure from
    // the closure tree
    fn evict(&self) -> Result<()> { ... }
    // aliasing is an expensive operation, the implementation is sketched in pseudo code
    fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        // precompute the closure
        let prev_id = self.alias.get(alias)?;
        let prev_closure = self.closure.get(&prev_id)?;
        let new_id = self.cache.lookup(&cid);
        let new_closure = self.cache.closure(&cid);

        // lock the filter preventing evictions
        let mut filter = self.filter.lock().unwrap();
        // make sure that new closure wasn't evicted in the mean time
        for id in &new_closure {
            if !self.cache.contains_id(&id) {
                return Err("cannot alias, missing references");
            }
        }
        // update the live set
        for id in &new_closure {
            filter.add(id);
        }
        for id in &prev_closure {
            filter.delete(id);
        }
        // perform transaction
        let res = (&self.alias, &self.closure).transaction(|(talias, tclosure)| {
            if let Some(id) = prev_id.as_ref() {
                talias.remove(alias)?;
            }
            if let Some(id) = id.as_ref() {
                talias.insert(alias, id)?;
                tclosure.insert(id, &closure)?;
            }
            Ok(())
        });
        // if transaction failed revert live set to previous state
        if res.is_err() {
            for id in &prev_closure {
                filter.add(id);
            }
            for id in &closure {
                filter.delete(id)
            }
        }
        res
    }
}

高效同步块dag - libp2p-bitswap内部

Bitswap是一个非常简单的协议。它被改编和简化用于ipfs-embed。消息格式可以用以下枚举表示。

pub enum BitswapRequest {
    Have(Cid),
    Block(Cid),
}

pub enum BitswapResponse {
    Have(bool),
    Block(Vec<u8>),
}

定位提供者的机制可以被抽象化。可以插入dht或集中式数据库查询。Bitswap API如下所示

pub enum Query {
    Get(Cid),
    Sync(Cid),
}

pub enum BitswapEvent {
    GetProviders(Cid),
    QueryComplete(Query, Result<()>),
}

impl Bitswap {
    pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { .. }
    pub fn get(&mut self, cid: Cid) { .. }
    pub fn cancel_get(&mut self, cid: Cid) { .. }
    pub fn add_provider(&mut self, cid: Cid, peer_id: PeerId) { .. }
    pub fn complete_get_providers(&mut self, cid: Cid) { .. }
    pub fn poll(&mut self, cx: &mut Context) -> BitswapEvent { .. }
}

那么当你创建一个get请求会发生什么?首先,通过have请求查询初始集中的所有提供者。作为一个优化,在每次查询批次中发送一个块请求。如果get查询找到一个块,它返回一个查询完成。如果块没有在初始集中找到,则发出一个GetProviders(Cid)事件。这就是bitswap消费者尝试通过例如执行dht查找来定位提供者的地方。通过调用add_provider方法注册这些提供者。在定位提供者完成后,通过调用complete_get_providers来发出信号。然后,查询管理器使用新的提供者集执行bitswap请求,结果找到块或找不到块错误。

我们通常希望同步整个区块图。我们可以通过添加一个同步查询,并行运行对区块所有引用的获取查询,来有效地同步区块图。拥有该区块的提供者集合用作引用查询的初始集合。为此,我们通过以下调用扩展了我们的API。

/// Bitswap sync trait for customizing the syncing behaviour.
pub trait BitswapSync {
    /// Returns the list of blocks that need to be synced.
    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>>;
    /// Returns if a cid needs to be synced.
    fn contains(&self, cid: &Cid) -> bool;
}

impl Bitswap {
    pub fn sync(&mut self, cid: Cid, syncer: Arc<dyn BitswapSync>) { .. }
    pub fn cancel_sync(&mut self, cid: Cid) { .. }
}

请注意,我们可以通过选择要同步的区块子集来任意定制同步行为。有关更多信息,请参阅设计模式。

许可证

MIT 或 Apache-2.0

依赖关系

~19–36MB
~562K SLoC