1个不稳定版本
0.11.0 | 2021年2月6日 |
---|
#361在内存管理中
20KB
420 行
ipfs-embed
一个小型、快速且可靠的ipfs实现,专为嵌入到复杂的p2p应用而设计。
- 通过mdns进行节点发现
- 通过kademlia进行提供者发现
- 通过bitswap交换块
- LRU驱逐策略
- 别名,递归命名引用的抽象
- 用于构建dag的临时递归引用,防止与垃圾回收器的竞争
- 高效同步大型dag的块
可以通过启用compat
功能标志来启用与go-ipfs的一些兼容性。
入门
use ipfs_embed::{Config, DefaultParams, Ipfs};
use libipld::DagCbor;
use libipld::store::Store;
#[derive(Clone, DagCbor, Debug, Eq, PartialEq)]
struct Identity {
id: u64,
name: String,
age: u8,
}
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cache_size = 10;
let ipfs = Ipfs::<DefaultParams>::new(Config::new(None, cache_size)).await?;
ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?).await?;
let identity = Identity {
id: 0,
name: "David Craven".into(),
age: 26,
};
let cid = ipfs.insert(&identity)?;
let identity2 = ipfs.get(&cid)?;
assert_eq!(identity, identity2);
println!("identity cid is {}", cid);
Ok(())
}
以下是一些关于ipfs-embed历史的说明。这些信息对于当前实现可能不再准确。
什么是ipfs?
Ipfs是一个p2p网络,用于定位和提供称为块的地址内容数据块。
内容寻址意味着数据通过其哈希值定位,而不是位置寻址。
不出所料,这是通过分布式哈希表来完成的。为了避免在DHT中存储大量数据,DHT存储了拥有块的对等节点。在确定提供块的对等节点后,将从这些节点请求块。
为了验证对等节点正在发送请求的块而不是无限垃圾流,块需要有限的大小。在实践中,我们将假设最大块大小为1MB。
将任意数据编码到1MB块中对编解码器提出了两个要求。它需要有一个规范表示,以确保相同的数据产生相同的哈希值,并且它需要支持链接到其他地址内容块。具有这两个属性的编解码器称为ipld编解码器。
从内容寻址(将边表示为节点的哈希值)得出的一个属性是,无法实现任意块的图。块图保证是定向和无环的。
{"a":3}
{
"a": 3,
}
{"/":"QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u"}
块存储
让我们从一个简单的持久块存储模型开始。
trait BlockStorage {
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
fn insert(&mut self, cid: &Cid, data: &[u8]) -> Result<()>;
fn remove(&mut self, cid: &Cid) -> Result<()>;
}
由于内容寻址块构成了一个有向无环图,因此不能简单删除块。一个块可能被多个节点引用,因此需要某种形式的引用计数和垃圾回收来确定何时可以安全地删除块。为了在P2P网络中成为一个好的对等节点,我们可能希望保留其他节点可能需要的旧块。因此,将其视为引用计数的缓存可能是一个更合适的模型。最终我们得到如下所示的内容。
trait BlockStorage {
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
fn evict(&mut self) -> Result<()>;
fn pin(&mut self, cid: &Cid) -> Result<()>;
fn unpin(&mut self, cid: &Cid) -> Result<()>;
}
要修改一个块,我们需要执行三个步骤:获取块、修改并插入修改后的块,最后删除旧块。我们还需要从键到CID的映射,因此需要更多的步骤。这些步骤中的任何一步都可能失败,导致块存储处于不一致的状态,从而引发数据泄露。为了防止数据泄露,每个API消费者都必须实现预写日志。为了解决这些问题,我们在存储中扩展了名为别名(aliases)的命名引脚。
trait BlockStorage {
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
fn evict(&mut self) -> Result<()>;
fn alias(&mut self, alias: &[u8], cid: Option<&Cid>) -> Result<()>;
fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>>;
}
假设每个操作都是原子性和持久的,我们就有了存储内容寻址块DAG所需的最小操作集。
网络化块存储 - ipfs-embed API
impl Ipfs {
pub fn new(storage: Arc<S>, network: Arc<N>) -> Self { .. }
pub fn local_peer_id(&self) -> &PeerId { .. }
pub async fn listeners(&self) -> Vec<Multiaddr> { .. }
pub async fn external_addresses(&self) -> Vec<Multiaddr> { .. }
pub async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> { .. }
pub async fn get(&self, cid: &Cid) -> Result<Block> {
if let Some(block) = self.storage.get(cid)? {
return Ok(block);
}
self.network.get(cid).await?;
if let Some(block) = self.storage.get(cid)? {
return Ok(block);
}
log::error!("block evicted too soon");
Err(BlockNotFound(*cid))
}
pub async fn insert(&self, cid: &Cid) -> Result<()> {
self.storage.insert(cid)?;
self.network.provide(cid)?;
Ok(())
}
pub async fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
if let Some(cid) = cid {
self.network.sync(cid).await?;
}
self.storage.alias(alias, cid).await?;
Ok(())
}
pub async fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
self.storage.resolve(alias)?;
Ok(())
}
}
设计模式 - ipfs-embed的实际应用
我们将探讨在chain
示例中使用的某些模式。该chain
示例使用ipfs-embed
存储一系列块。一个块被定义为
#[derive(Debug, Default, DagCbor)]
pub struct Block {
prev: Option<Cid>,
id: u32,
loopback: Option<Cid>,
payload: Vec<u8>,
}
原子性
在这个例子中,我们有两个数据库。一个是ipfs-embed
管理的,用于存储块和别名;另一个是针对示例的特定数据库,它将块索引映射到块CID,这样我们就可以快速查找块,而无需遍历整个链。为了保证原子性,我们定义了两个别名,并在两个步骤中执行同步。这确保了同步的链始终对其块进行索引。
const TMP_ROOT: &str = alias!(tmp_root);
const ROOT: &str = alias!(root);
ipfs.alias(TMP_ROOT, Some(new_root)).await?;
for _ in prev_root_id..new_root_id {
// index block may error for various reasons
}
ipfs.alias(ROOT, Some(new_root)).await?;
dag化
递归同步算法在同步链时性能最差,因为每个块都需要依次同步,不能利用任何并行性。为了解决这个问题,我们通过包括循环回环来增加链的链接,以增加dag的分支。
为此目的,@rklaehn提出了一个算法
fn loopback(block: usize) -> Option<usize> {
let x = block.trailing_zeros();
if x > 1 && block > 0 {
Some(block - (1 << (x - 1)))
} else {
None
}
}
选择器
同步可能需要很长时间,并且不允许选择所需数据的子集。为此,有一个实验性的alias_with_syncer
API,允许自定义同步行为。在链示例中,它用于提供块验证,以确保块的有效性。尽管这个API将来可能会改变。
pub struct ChainSyncer<S: StoreParams, T: Storage<S>> {
index: sled::Db,
storage: BitswapStorage<S, T>,
}
impl<S: StoreParams, T: Storage<S>> BitswapSync for ChainSyncer<S, T>
where
S::Codecs: Into<DagCborCodec>,
{
fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
if let Some(data) = self.storage.get(cid) {
let ipld_block = libipld::Block::<S>::new_unchecked(*cid, data);
if let Ok(block) = ipld_block.decode::<DagCborCodec, Block>() {
return Box::new(block.prev.into_iter().chain(block.loopback.into_iter()));
}
}
Box::new(std::iter::empty())
}
fn contains(&self, cid: &Cid) -> bool {
self.storage.contains(cid)
}
}
高效的块存储实现 - ipfs-embed内部机制
Ipfs embed使用SQLite实现块存储,这是一个性能良好的可嵌入的SQL持久层/数据库。
type Id = u64;
type Atime = u64;
#[derive(Clone)]
struct BlockCache {
// Cid -> Id
lookup: Tree,
// Id -> Cid
cid: Tree,
// Id -> Vec<u8>
data: Tree,
// Id -> Vec<Id>
refs: Tree,
// Id -> Atime
atime: Tree,
// Atime -> Id
lru: Tree,
}
impl BlockCache {
// Updates the atime and lru trees and returns the data from the data tree.
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { .. }
// Returns an iterator of blocks sorted by least recently used.
fn lru(&self) -> impl Iterator<Item = Result<Id>> { self.lru.iter().values() }
// Inserts into all trees.
fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { ... }
// Removes from all trees.
fn remove(&self, id: &Id) -> Result<()> { ... }
// Returns the recursive set of references.
fn closure(&self, cid: &Cid) -> Result<Vec<Id>> { ... }
// A stream of insert/remove events, useful for plugging in a network layer.
fn subscribe(&self) -> impl Stream<Item = StorageEvent> { ... }
}
鉴于操作的描述以及它在树结构中的组织方式,这些操作的实施是直接的。
#[derive(Clone)]
struct BlockStorage {
cache: BlockCache,
// Vec<u8> -> Id
alias: Tree,
// Bag of live ids
filter: Arc<Mutex<CuockooFilter>>,
// Id -> Vec<Id>
closure: Tree,
}
impl BlockStorage {
// get from cache
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { self.cache.get(cid) }
// insert to cache
fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { self.cache.insert(cid, data) }
// returns the value of the alias tree
fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> { ... }
// remove the lru block that is not in the bag of live ids and remove it's closure from
// the closure tree
fn evict(&self) -> Result<()> { ... }
// aliasing is an expensive operation, the implementation is sketched in pseudo code
fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
// precompute the closure
let prev_id = self.alias.get(alias)?;
let prev_closure = self.closure.get(&prev_id)?;
let new_id = self.cache.lookup(&cid);
let new_closure = self.cache.closure(&cid);
// lock the filter preventing evictions
let mut filter = self.filter.lock().unwrap();
// make sure that new closure wasn't evicted in the mean time
for id in &new_closure {
if !self.cache.contains_id(&id) {
return Err("cannot alias, missing references");
}
}
// update the live set
for id in &new_closure {
filter.add(id);
}
for id in &prev_closure {
filter.delete(id);
}
// perform transaction
let res = (&self.alias, &self.closure).transaction(|(talias, tclosure)| {
if let Some(id) = prev_id.as_ref() {
talias.remove(alias)?;
}
if let Some(id) = id.as_ref() {
talias.insert(alias, id)?;
tclosure.insert(id, &closure)?;
}
Ok(())
});
// if transaction failed revert live set to previous state
if res.is_err() {
for id in &prev_closure {
filter.add(id);
}
for id in &closure {
filter.delete(id)
}
}
res
}
}
高效同步块DAG - libp2p-bitswap内部机制
Bitswap是一个非常简单的协议。它被修改并简化为ipfs-embed。消息格式可以用以下枚举表示。
pub enum BitswapRequest {
Have(Cid),
Block(Cid),
}
pub enum BitswapResponse {
Have(bool),
Block(Vec<u8>),
}
定位提供者的机制可以抽象化。可以插入dht或集中式数据库查询。bitswap API如下所示
pub enum Query {
Get(Cid),
Sync(Cid),
}
pub enum BitswapEvent {
GetProviders(Cid),
QueryComplete(Query, Result<()>),
}
impl Bitswap {
pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { .. }
pub fn get(&mut self, cid: Cid) { .. }
pub fn cancel_get(&mut self, cid: Cid) { .. }
pub fn add_provider(&mut self, cid: Cid, peer_id: PeerId) { .. }
pub fn complete_get_providers(&mut self, cid: Cid) { .. }
pub fn poll(&mut self, cx: &mut Context) -> BitswapEvent { .. }
}
创建GET请求时会发生什么?首先,使用have请求查询初始集中的所有提供者。作为一个优化,在每批查询中发送一个块请求。如果GET查询找到一个块,则返回查询完成。如果块在初始集中未找到,则发出一个GetProviders(Cid)
事件。这是位交换消费者尝试通过例如执行dht查找来定位提供者的地方。通过调用add_provider
方法注册这些提供者。在定位提供者完成之后,通过调用complete_get_providers
来发出信号。然后,查询管理器使用新的提供者集执行bitswap请求,这导致找到块或未找到块错误。
通常我们想要同步整个dag的块。我们可以通过添加一个同步查询来有效地同步块的dag,该查询为块的引用并行运行GET查询。已拥有块的提供者集用作参考查询的初始集。为此,我们通过以下调用扩展了我们的API。
/// Bitswap sync trait for customizing the syncing behaviour.
pub trait BitswapSync {
/// Returns the list of blocks that need to be synced.
fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>>;
/// Returns if a cid needs to be synced.
fn contains(&self, cid: &Cid) -> bool;
}
impl Bitswap {
pub fn sync(&mut self, cid: Cid, syncer: Arc<dyn BitswapSync>) { .. }
pub fn cancel_sync(&mut self, cid: Cid) { .. }
}
注意,我们可以通过选择要同步的块子集来任意地自定义同步行为。有关更多信息,请参阅设计模式。
许可证
MIT OR Apache-2.0
依赖项
~32–44MB
~750K SLoC