4个版本

0.2.0 2023年9月6日
0.1.2 2023年8月24日
0.1.1 2023年8月23日
0.1.0 2023年8月23日

#603算法 分类中

每月48次下载
用于 物化视图

MIT/Apache

4MB
40K SLoC

DBSP

数据库流处理器(DBSP)是一个用于持续分析变化数据的计算引擎。使用DBSP,程序员以完整数据集上的计算方式编写代码,但DBSP以增量方式实现,这意味着数据集的变化运行时间与变化的大小成比例,而不是数据集的大小。这对于处理频繁以小幅度变化的大型数据集的应用程序来说是一个主要优势。

DBSP计算引擎是 Feldera连续分析平台 的一个组件。

资源

学习材料

文档

反馈

您可以给我们提供反馈的一些方式

示例

use dbsp::{operator::FilterMap, IndexedZSet, OrdIndexedZSet, OutputHandle, Runtime};

type Node = usize;
type Weight = isize;

// Creates a series of graph edges, then counts the number of times each source
// node appears, then counts the number of times each count appears.
fn main() {
    let edges = 100; // Number of initial edges in the graph.
    let sources = 13; // Number of source nodes in the graph.
    let extra = 5; // Number of extra edges added later to the graph.
    let threads = 2; // Number of threads.

    let (mut dbsp, (hedges, degrees, distribution)) = Runtime::init_circuit(threads, |circuit| {
        let (edges, hedges) = circuit.add_input_zset::<(Node, Node), Weight>();

        // Count the number of edges with each node as its source (each node's
        // out-degree).
        let degrees = edges.map(|(src, _dst)| *src).weighted_count();

        // Count the number of nodes with each out-degree.
        let distribution = degrees.map(|(_src, count)| *count).weighted_count();

        Ok((hedges, degrees.output(), distribution.output()))
    })
    .unwrap();

    // Add some initial edges and print the results.
    for i in 0..edges {
        hedges.push((i % sources, i % 7), 1);
    }
    dbsp.step().unwrap();
    println!("Initialization:");
    print_changes(&degrees, &distribution);

    // Add a few more nodes and print the changes.
    for i in 0..extra {
        hedges.push((i % sources, i % 9), 1);
    }
    dbsp.step().unwrap();
    println!("Changes:");
    print_changes(&degrees, &distribution);

    dbsp.kill().unwrap();
}

fn print_changes(
    degrees: &OutputHandle<OrdIndexedZSet<Node, isize, Weight>>,
    distribution: &OutputHandle<OrdIndexedZSet<isize, isize, Weight>>,
) {
    for (src, outdegree, weight) in degrees.consolidate().iter() {
        println!("    {weight:+}: Node {src} has out-degree {outdegree}");
    }
    for (outdegree, count, weight) in distribution.consolidate().iter() {
        println!("    {weight:+}: {count} nodes have out-degree {outdegree}");
    }
}

依赖项

~15–27MB
~391K SLoC