5个版本 (破坏性更新)

0.10.0 2021年1月9日
0.9.0 2020年11月19日
0.8.0 2020年10月5日
0.7.0 2020年9月23日
0.6.0 2020年9月16日

#9 in #block-store

每月下载量29次
ipfs-embed-cli中使用

MIT/Apache

35KB
865

ipfs-embed

为嵌入到复杂p2p应用而设计的小型、快速、可靠的ipfs实现。

  • 通过mdns进行节点发现
  • 通过kademlia进行提供者发现
  • 通过bitswap交换块
  • lru驱逐策略
  • 别名,递归命名的引用的抽象
  • 用于构建dag的临时递归引用,防止与垃圾回收器的竞争
  • 高效同步大型dag块

可以通过compat功能标志启用一些与go-ipfs的兼容性。

入门

use ipfs_embed::{Config, DefaultParams, Ipfs};
use libipld::DagCbor;
use libipld::store::Store;

#[derive(Clone, DagCbor, Debug, Eq, PartialEq)]
struct Identity {
    id: u64,
    name: String,
    age: u8,
}

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cache_size = 10;
    let ipfs = Ipfs::<DefaultParams>::new(Config::new(None, cache_size)).await?;
    ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?).await?;

    let identity = Identity {
        id: 0,
        name: "David Craven".into(),
        age: 26,
    };
    let cid = ipfs.insert(&identity)?;
    let identity2 = ipfs.get(&cid)?;
    assert_eq!(identity, identity2);
    println!("identity cid is {}", cid);

    Ok(())
}

以下是一些关于ipfs-embed历史的笔记。这些信息对于当前实现不再准确。

什么是ipfs?

Ipfs是一个p2p网络,用于定位和提供称为块的以内容地址标记的数据块。

内容地址意味着数据是通过其哈希值而不是位置地址来定位的。

不出所料,这是使用分布式哈希表完成的。为了避免在dht中存储大量数据,dht存储了哪些对等方拥有块。在确定提供块的节点后,将从这些节点请求块。

为了验证对等方正在发送请求的块而不是无限垃圾流,块需要有限的大小。在实践中,我们将假设最大块大小为1MB。

将任意数据编码到1MB块中对编解码器施加了两个要求。它需要有规范表示,以确保相同的数据产生相同的哈希值,并且它需要支持链接到其他内容地址块。具有这两个属性的编解码器称为ipld编解码器。

从内容地址(将边表示为节点的哈希值)推导出的一个属性是,不可能有任意块的图。块的图保证是有向且无环的。

{"a":3}
{
  "a": 3,
}
{"/":"QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u"}

块存储

让我们从一个简单的持久化块存储模型开始。

trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8]) -> Result<()>;
    fn remove(&mut self, cid: &Cid) -> Result<()>;
}

由于内容寻址块形成有向无环图,因此不能简单地删除块。一个块可能被多个节点引用,因此需要某种形式的引用计数和垃圾回收来确定何时可以安全地删除块。为了在p2p网络中成为一个好的对等节点,我们可能想要保留其他对等节点可能需要的旧块。因此,将其视为引用计数的缓存可能是一个更合适的模型。最后我们得到如下内容

trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
    fn evict(&mut self) -> Result<()>;
    fn pin(&mut self, cid: &Cid) -> Result<()>;
    fn unpin(&mut self, cid: &Cid) -> Result<()>;
}

要修改一个块,我们需要执行三个步骤:获取块、修改并插入修改后的块,最后删除旧块。我们还需要一个从键到cids的映射,因此还需要更多的步骤。任何这些步骤都可能失败,导致块存储处于不一致的状态,从而导致数据泄露。为了防止数据泄露,每个API消费者都必须实现写前日志。为了解决这些问题,我们通过具有别名的命名引脚扩展了存储。

trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
    fn evict(&mut self) -> Result<()>;
    fn alias(&mut self, alias: &[u8], cid: Option<&Cid>) -> Result<()>;
    fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>>;
}

假设每个操作都是原子性和持久的,我们得到存储内容寻址块dags所需的最小操作集。

网络化块存储 - ipfs-embed api

impl Ipfs {
    pub fn new(storage: Arc<S>, network: Arc<N>) -> Self { .. }
    pub fn local_peer_id(&self) -> &PeerId { .. }
    pub async fn listeners(&self) -> Vec<Multiaddr> { .. }
    pub async fn external_addresses(&self) -> Vec<Multiaddr> { .. }
    pub async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> { .. }
    pub async fn get(&self, cid: &Cid) -> Result<Block> {
        if let Some(block) = self.storage.get(cid)? {
            return Ok(block);
        }
        self.network.get(cid).await?;
        if let Some(block) = self.storage.get(cid)? {
            return Ok(block);
        }
        log::error!("block evicted too soon");
        Err(BlockNotFound(*cid))
    }
    pub async fn insert(&self, cid: &Cid) -> Result<()> {
        self.storage.insert(cid)?;
        self.network.provide(cid)?;
        Ok(())
    }
    pub async fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        if let Some(cid) = cid {
            self.network.sync(cid).await?;
        }
        self.storage.alias(alias, cid).await?;
        Ok(())
    }
    pub async fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
        self.storage.resolve(alias)?;
        Ok(())
    }
}

设计模式 - ipfs-embed的实际应用

我们将查看在chain示例中使用的某些模式。该chain示例使用ipfs-embed来存储一系列块。块被定义为

#[derive(Debug, Default, DagCbor)]
pub struct Block {
    prev: Option<Cid>,
    id: u32,
    loopback: Option<Cid>,
    payload: Vec<u8>,
}

原子性

在这个例子中,我们有两种不同的数据库。一种是ipfs-embed管理的,用于存储块和别名;另一种是特定于示例的,它将块索引映射到块cid,这样我们就可以快速查找块,而无需遍历整个链。为了确保原子性,我们定义了两个别名,并在两步中执行同步。这确保了同步的链始终对其块进行索引。

const TMP_ROOT: &str = alias!(tmp_root);
const ROOT: &str = alias!(root);

ipfs.alias(TMP_ROOT, Some(new_root)).await?;
for _ in prev_root_id..new_root_id {
    // index block may error for various reasons
}
ipfs.alias(ROOT, Some(new_root)).await?;

Dagification

递归同步算法在同步链时表现最差,因为每个块都需要一个接一个地同步,无法利用任何并行性。为了解决这个问题,我们通过包含回环来增加链的链接,从而增加dag的分支。

为此,@rklaehn提出了一种算法

fn loopback(block: usize) -> Option<usize> {
    let x = block.trailing_zeros();
    if x > 1 && block > 0 {
        Some(block - (1 << (x - 1)))
    } else {
        None
    }
}

选择器

同步可能需要很长时间,并且不允许选择所需的数据子集。为此,有一个实验性的alias_with_syncer api,允许自定义同步行为。在链示例中,它用于提供块验证,以确保块的有效性。尽管这个api可能在将来发生变化。

pub struct ChainSyncer<S: StoreParams, T: Storage<S>> {
    index: sled::Db,
    storage: BitswapStorage<S, T>,
}

impl<S: StoreParams, T: Storage<S>> BitswapSync for ChainSyncer<S, T>
where
    S::Codecs: Into<DagCborCodec>,
{
    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
        if let Some(data) = self.storage.get(cid) {
            let ipld_block = libipld::Block::<S>::new_unchecked(*cid, data);
            if let Ok(block) = ipld_block.decode::<DagCborCodec, Block>() {
                return Box::new(block.prev.into_iter().chain(block.loopback.into_iter()));
            }
        }
        Box::new(std::iter::empty())
    }

    fn contains(&self, cid: &Cid) -> bool {
        self.storage.contains(cid)
    }
}

高效的块存储实现 - ipfs-embed内部

ipfs embed使用SQLite来实现块存储,它是一个高性能的可嵌入SQL持久层/数据库。

type Id = u64;
type Atime = u64;

#[derive(Clone)]
struct BlockCache {
    // Cid -> Id
    lookup: Tree,
    // Id -> Cid
    cid: Tree,
    // Id -> Vec<u8>
    data: Tree,
    // Id -> Vec<Id>
    refs: Tree,
    // Id -> Atime
    atime: Tree,
    // Atime -> Id
    lru: Tree,
}

impl BlockCache {
    // Updates the atime and lru trees and returns the data from the data tree.
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { .. }
    // Returns an iterator of blocks sorted by least recently used.
    fn lru(&self) -> impl Iterator<Item = Result<Id>> { self.lru.iter().values() }
    // Inserts into all trees.
    fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { ... }
    // Removes from all trees.
    fn remove(&self, id: &Id) -> Result<()> { ... }
    // Returns the recursive set of references.
    fn closure(&self, cid: &Cid) -> Result<Vec<Id>> { ... }
    // A stream of insert/remove events, useful for plugging in a network layer.
    fn subscribe(&self) -> impl Stream<Item = StorageEvent> { ... }
}

鉴于操作的描述以及它在树结构中的组织方式,这些操作易于实现。

#[derive(Clone)]
struct BlockStorage {
    cache: BlockCache,
    // Vec<u8> -> Id
    alias: Tree,
    // Bag of live ids
    filter: Arc<Mutex<CuockooFilter>>,
    // Id -> Vec<Id>
    closure: Tree,
}

impl BlockStorage {
    // get from cache
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { self.cache.get(cid) }
    // insert to cache
    fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { self.cache.insert(cid, data) }
    // returns the value of the alias tree
    fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> { ... }
    // remove the lru block that is not in the bag of live ids and remove it's closure from
    // the closure tree
    fn evict(&self) -> Result<()> { ... }
    // aliasing is an expensive operation, the implementation is sketched in pseudo code
    fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        // precompute the closure
        let prev_id = self.alias.get(alias)?;
        let prev_closure = self.closure.get(&prev_id)?;
        let new_id = self.cache.lookup(&cid);
        let new_closure = self.cache.closure(&cid);

        // lock the filter preventing evictions
        let mut filter = self.filter.lock().unwrap();
        // make sure that new closure wasn't evicted in the mean time
        for id in &new_closure {
            if !self.cache.contains_id(&id) {
                return Err("cannot alias, missing references");
            }
        }
        // update the live set
        for id in &new_closure {
            filter.add(id);
        }
        for id in &prev_closure {
            filter.delete(id);
        }
        // perform transaction
        let res = (&self.alias, &self.closure).transaction(|(talias, tclosure)| {
            if let Some(id) = prev_id.as_ref() {
                talias.remove(alias)?;
            }
            if let Some(id) = id.as_ref() {
                talias.insert(alias, id)?;
                tclosure.insert(id, &closure)?;
            }
            Ok(())
        });
        // if transaction failed revert live set to previous state
        if res.is_err() {
            for id in &prev_closure {
                filter.add(id);
            }
            for id in &closure {
                filter.delete(id)
            }
        }
        res
    }
}

高效同步块dags - libp2p-bitswap内部

Bitswap是一个非常简单的协议。它被适应和简化用于ipfs-embed。消息格式可以表示为以下枚举。

pub enum BitswapRequest {
    Have(Cid),
    Block(Cid),
}

pub enum BitswapResponse {
    Have(bool),
    Block(Vec<u8>),
}

定位提供者的机制可以被抽象化。可以插入dht或集中式数据库查询。bitswap api看起来如下

pub enum Query {
    Get(Cid),
    Sync(Cid),
}

pub enum BitswapEvent {
    GetProviders(Cid),
    QueryComplete(Query, Result<()>),
}

impl Bitswap {
    pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { .. }
    pub fn get(&mut self, cid: Cid) { .. }
    pub fn cancel_get(&mut self, cid: Cid) { .. }
    pub fn add_provider(&mut self, cid: Cid, peer_id: PeerId) { .. }
    pub fn complete_get_providers(&mut self, cid: Cid) { .. }
    pub fn poll(&mut self, cx: &mut Context) -> BitswapEvent { .. }
}

创建GET请求时会发生什么?首先,使用have请求查询初始集合中的所有提供者。为了优化,在每次查询批次中发送一个块请求。如果GET查询找到一个块,则返回查询完成。如果在初始集合中没有找到该块,则发出GetProviders(Cid)事件。这就是bitswap消费者尝试通过例如执行dht查找来定位提供者的地方。通过调用add_provider方法注册这些提供者。定位提供者完成后,通过调用complete_get_providers来发出信号。然后查询管理器使用新提供者集执行bitswap请求,结果是找到块或块未找到错误。

我们通常希望同步整个dag的块。我们可以通过添加一个同步查询,该查询并行运行所有块的引用的GET查询来有效地同步dag的块。用作引用查询的初始集合是拥有块的提供者集。为此,我们通过以下调用扩展了我们的API。

/// Bitswap sync trait for customizing the syncing behaviour.
pub trait BitswapSync {
    /// Returns the list of blocks that need to be synced.
    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>>;
    /// Returns if a cid needs to be synced.
    fn contains(&self, cid: &Cid) -> bool;
}

impl Bitswap {
    pub fn sync(&mut self, cid: Cid, syncer: Arc<dyn BitswapSync>) { .. }
    pub fn cancel_sync(&mut self, cid: Cid) { .. }
}

请注意,我们可以通过选择要同步的块子集来任意定制同步行为。有关更多信息,请参阅设计模式。

许可证

MIT OR Apache-2.0

依赖项

~16–30MB
~504K SLoC