33个版本 (破坏性)
0.26.1 | 2023年7月24日 |
---|---|
0.26.0 | 2022年12月1日 |
0.25.1 | 2022年11月15日 |
0.23.0 | 2022年5月17日 |
0.3.0 | 2020年7月27日 |
#172 在 网络编程
在 5 个crate中(4个直接)使用
220KB
5K SLoC
ipfs-embed
为嵌入到复杂p2p应用而设计的轻量级、快速和可靠的ipfs实现。
- 通过mdns进行节点发现
- 通过kademlia进行提供者发现
- 通过bitswap交换块
- lru淘汰策略
- 别名,递归命名桩的抽象
- 用于构建dag的临时递归桩,防止与垃圾回收器的竞争
- 高效同步大型块dag
可以通过compat
功能标志启用与go-ipfs的一些兼容性。
入门指南
use ipfs_embed::{Config, DefaultParams, Ipfs};
use libipld::DagCbor;
use libipld::store::Store;
#[derive(Clone, DagCbor, Debug, Eq, PartialEq)]
struct Identity {
id: u64,
name: String,
age: u8,
}
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cache_size = 10;
let ipfs = Ipfs::<DefaultParams>::new(Config::new(None, cache_size)).await?;
ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?).await?;
let identity = Identity {
id: 0,
name: "David Craven".into(),
age: 26,
};
let cid = ipfs.insert(&identity)?;
let identity2 = ipfs.get(&cid)?;
assert_eq!(identity, identity2);
println!("identity cid is {}", cid);
Ok(())
}
以下是一些关于ipfs-embed历史的说明。这些信息对于当前实现不再准确。
什么是ipfs?
Ipfs是一个p2p网络,用于定位和提供称为块的内容寻址数据。
内容寻址意味着数据是通过其哈希值而不是位置地址进行定位的。
不出所料,这是通过分布式哈希表来完成的。为了避免在dht中存储大量数据,dht存储了具有块的节点。确定提供块的节点后,将从这些节点请求块。
为了验证对等方正在发送请求的块而不是垃圾无限流,块需要有限的大小。在实践中,我们将假设最大块大小为1MB。
将任意数据编码到1MB块对编解码器提出两个要求。它需要一个规范表示来确保相同的数据产生相同的哈希,并且它需要支持链接到其他内容寻址块。具有这两个属性的编解码器称为ipld编解码器。
从内容寻址(将边表示为节点的哈希值)得出的一个属性是,不可能有任意的块图。块图保证是定向的和无环的。
{"a":3}
{
"a": 3,
}
{"/":"QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u"}
块存储
让我们从一个简单的持久化块存储模型开始。
trait BlockStorage {
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
fn insert(&mut self, cid: &Cid, data: &[u8]) -> Result<()>;
fn remove(&mut self, cid: &Cid) -> Result<()>;
}
由于内容寻址块构成了一个有向无环图,因此不能简单地删除块。一个块可能被多个节点引用,因此需要某种形式的引用计数和垃圾收集来决定何时可以安全地删除块。为了在P2P网络中成为好的节点,我们可能想要保留其他节点可能需要的旧块。因此,将其视为一个引用计数的缓存可能是一个更合适的模型。我们最终得到以下内容:
trait BlockStorage {
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
fn evict(&mut self) -> Result<()>;
fn pin(&mut self, cid: &Cid) -> Result<()>;
fn unpin(&mut self, cid: &Cid) -> Result<()>;
}
要修改一个块,我们需要执行三个步骤。获取块,修改并插入修改后的块,最后删除旧块。我们还需要一个从键到cids的映射,因此需要更多的步骤。这些步骤中的任何一步都可能失败,导致块存储处于不一致状态,导致数据泄露。为了防止数据泄露,每个API消费者都必须实现一个预写日志。为了解决这些问题,我们通过名为别名(aliases)的命名pins扩展了存储。
trait BlockStorage {
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
fn evict(&mut self) -> Result<()>;
fn alias(&mut self, alias: &[u8], cid: Option<&Cid>) -> Result<()>;
fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>>;
}
假设每个操作都是原子性和持久的,我们有存储内容寻址块DAG所需的最小操作集。
网络化块存储 - ipfs-embed API
impl Ipfs {
pub fn new(storage: Arc<S>, network: Arc<N>) -> Self { .. }
pub fn local_peer_id(&self) -> &PeerId { .. }
pub async fn listeners(&self) -> Vec<Multiaddr> { .. }
pub async fn external_addresses(&self) -> Vec<Multiaddr> { .. }
pub async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> { .. }
pub async fn get(&self, cid: &Cid) -> Result<Block> {
if let Some(block) = self.storage.get(cid)? {
return Ok(block);
}
self.network.get(cid).await?;
if let Some(block) = self.storage.get(cid)? {
return Ok(block);
}
log::error!("block evicted too soon");
Err(BlockNotFound(*cid))
}
pub async fn insert(&self, cid: &Cid) -> Result<()> {
self.storage.insert(cid)?;
self.network.provide(cid)?;
Ok(())
}
pub async fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
if let Some(cid) = cid {
self.network.sync(cid).await?;
}
self.storage.alias(alias, cid).await?;
Ok(())
}
pub async fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
self.storage.resolve(alias)?;
Ok(())
}
}
设计模式 - ipfs-embed实践
我们将研究在chain
示例中使用的某些模式。该chain
示例使用ipfs-embed
来存储一系列块。块被定义为:
#[derive(Debug, Default, DagCbor)]
pub struct Block {
prev: Option<Cid>,
id: u32,
loopback: Option<Cid>,
payload: Vec<u8>,
}
原子性
在这个例子中,我们有不同的数据库。一个是ipfs-embed
管理的,用于存储块和别名;另一个特定于示例,将块索引映射到块cid,这样我们就可以快速查找块,而无需遍历整个链。为了保证原子性,我们定义了两个别名,并在两步中执行同步。这确保了同步的链始终具有其块索引。
const TMP_ROOT: &str = alias!(tmp_root);
const ROOT: &str = alias!(root);
ipfs.alias(TMP_ROOT, Some(new_root)).await?;
for _ in prev_root_id..new_root_id {
// index block may error for various reasons
}
ipfs.alias(ROOT, Some(new_root)).await?;
DAG化
递归同步算法在同步链时表现最差,因为每个块都需要依次同步,无法利用任何并行性。为了解决这个问题,我们通过包括回环来增加链的链接,以增加DAG的分支。
为此目的,@rklaehn提出了一个算法
fn loopback(block: usize) -> Option<usize> {
let x = block.trailing_zeros();
if x > 1 && block > 0 {
Some(block - (1 << (x - 1)))
} else {
None
}
}
选择器
同步可能需要很长时间,并且不允许选择所需数据的子集。为此,有一个实验性的alias_with_syncer
API,允许自定义同步行为。在链示例中,它用于提供块验证,以确保块是有效的。尽管这个API在未来可能会发生变化。
pub struct ChainSyncer<S: StoreParams, T: Storage<S>> {
index: sled::Db,
storage: BitswapStorage<S, T>,
}
impl<S: StoreParams, T: Storage<S>> BitswapSync for ChainSyncer<S, T>
where
S::Codecs: Into<DagCborCodec>,
{
fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
if let Some(data) = self.storage.get(cid) {
let ipld_block = libipld::Block::<S>::new_unchecked(*cid, data);
if let Ok(block) = ipld_block.decode::<DagCborCodec, Block>() {
return Box::new(block.prev.into_iter().chain(block.loopback.into_iter()));
}
}
Box::new(std::iter::empty())
}
fn contains(&self, cid: &Cid) -> bool {
self.storage.contains(cid)
}
}
高效的块存储实现 - ipfs-embed内部
ipfs-embed使用SQLite来实现块存储,这是一个高性能的嵌入式SQL持久层/数据库。
type Id = u64;
type Atime = u64;
#[derive(Clone)]
struct BlockCache {
// Cid -> Id
lookup: Tree,
// Id -> Cid
cid: Tree,
// Id -> Vec<u8>
data: Tree,
// Id -> Vec<Id>
refs: Tree,
// Id -> Atime
atime: Tree,
// Atime -> Id
lru: Tree,
}
impl BlockCache {
// Updates the atime and lru trees and returns the data from the data tree.
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { .. }
// Returns an iterator of blocks sorted by least recently used.
fn lru(&self) -> impl Iterator<Item = Result<Id>> { self.lru.iter().values() }
// Inserts into all trees.
fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { ... }
// Removes from all trees.
fn remove(&self, id: &Id) -> Result<()> { ... }
// Returns the recursive set of references.
fn closure(&self, cid: &Cid) -> Result<Vec<Id>> { ... }
// A stream of insert/remove events, useful for plugging in a network layer.
fn subscribe(&self) -> impl Stream<Item = StorageEvent> { ... }
}
给定操作的描述以及它在树结构中的组织方式,这些操作的实施很简单。
#[derive(Clone)]
struct BlockStorage {
cache: BlockCache,
// Vec<u8> -> Id
alias: Tree,
// Bag of live ids
filter: Arc<Mutex<CuockooFilter>>,
// Id -> Vec<Id>
closure: Tree,
}
impl BlockStorage {
// get from cache
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { self.cache.get(cid) }
// insert to cache
fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { self.cache.insert(cid, data) }
// returns the value of the alias tree
fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> { ... }
// remove the lru block that is not in the bag of live ids and remove it's closure from
// the closure tree
fn evict(&self) -> Result<()> { ... }
// aliasing is an expensive operation, the implementation is sketched in pseudo code
fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
// precompute the closure
let prev_id = self.alias.get(alias)?;
let prev_closure = self.closure.get(&prev_id)?;
let new_id = self.cache.lookup(&cid);
let new_closure = self.cache.closure(&cid);
// lock the filter preventing evictions
let mut filter = self.filter.lock().unwrap();
// make sure that new closure wasn't evicted in the mean time
for id in &new_closure {
if !self.cache.contains_id(&id) {
return Err("cannot alias, missing references");
}
}
// update the live set
for id in &new_closure {
filter.add(id);
}
for id in &prev_closure {
filter.delete(id);
}
// perform transaction
let res = (&self.alias, &self.closure).transaction(|(talias, tclosure)| {
if let Some(id) = prev_id.as_ref() {
talias.remove(alias)?;
}
if let Some(id) = id.as_ref() {
talias.insert(alias, id)?;
tclosure.insert(id, &closure)?;
}
Ok(())
});
// if transaction failed revert live set to previous state
if res.is_err() {
for id in &prev_closure {
filter.add(id);
}
for id in &closure {
filter.delete(id)
}
}
res
}
}
高效同步块DAG - libp2p-bitswap内部
Bitswap是一个非常简单的协议。它已被适配和简化用于ipfs-embed。消息格式可以用以下枚举表示。
pub enum BitswapRequest {
Have(Cid),
Block(Cid),
}
pub enum BitswapResponse {
Have(bool),
Block(Vec<u8>),
}
定位提供者的机制可以被抽象化。可以插入一个dht或集中式数据库查询。Bitswap API如下所示:
pub enum Query {
Get(Cid),
Sync(Cid),
}
pub enum BitswapEvent {
GetProviders(Cid),
QueryComplete(Query, Result<()>),
}
impl Bitswap {
pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { .. }
pub fn get(&mut self, cid: Cid) { .. }
pub fn cancel_get(&mut self, cid: Cid) { .. }
pub fn add_provider(&mut self, cid: Cid, peer_id: PeerId) { .. }
pub fn complete_get_providers(&mut self, cid: Cid) { .. }
pub fn poll(&mut self, cx: &mut Context) -> BitswapEvent { .. }
}
当您创建一个GET请求时会发生什么?首先,使用have请求查询初始集的所有提供者。作为一种优化,在每次查询批次中发送一个块请求。如果GET查询找到一个块,它将返回查询完成。如果在初始集中没有找到该块,将发出一个GetProviders(Cid)
事件。这就是bitswap消费者尝试通过例如执行dht查找来定位提供者的地方。这些提供者通过调用add_provider
方法进行注册。在定位提供者完成后,通过调用complete_get_providers
来发出信号。然后查询管理器使用新的提供者集执行bitswap请求,从而找到或找不到块。
我们通常想要同步整个块的dag。我们可以通过添加一个并行运行所有块引用的GET查询的同步查询来有效地同步块的dag。在引用查询中使用拥有块的提供者集作为初始集。为此,我们通过以下调用扩展了我们的API。
/// Bitswap sync trait for customizing the syncing behaviour.
pub trait BitswapSync {
/// Returns the list of blocks that need to be synced.
fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>>;
/// Returns if a cid needs to be synced.
fn contains(&self, cid: &Cid) -> bool;
}
impl Bitswap {
pub fn sync(&mut self, cid: Cid, syncer: Arc<dyn BitswapSync>) { .. }
pub fn cancel_sync(&mut self, cid: Cid) { .. }
}
请注意,我们可以通过选择要同步的块子集来任意地自定义同步行为。有关更多信息,请参阅设计模式。
许可协议
MIT OR Apache-2.0
依赖项
~43–82MB
~1.5M SLoC