#reactive #frp #signal

nightly mini-rx

使用中心数据依赖图实现的简易“响应式编程”(变更传播)

2 个版本

0.1.1 2022 年 7 月 21 日
0.1.0 2022 年 7 月 18 日

#702算法

Apache-2.0

70KB
1K SLoC

mini-rx:类似 scala.rx 的微型 响应式编程 变更传播

Cargo 文档

示例

use mini_rx::*;

fn example() {
	// setup
	let side_effect = Cell::new(0);
	let side_effect2 = RefCell::new(String::new());

	// The centralized data dependency graph
	let mut g = RxDAG::new();

	// Create variables which you can set
	let var1 = g.new_var(1);
	let var2 = g.new_var("hello");
	assert_eq!(var1.get(g.now()), &1);
	assert_eq!(var2.get(g.now()), &"hello");
	var1.set(&g, 2);
	var2.set(&g, "world");
	assert_eq!(var1.get(g.now()), &2);
	assert_eq!(var2.get(g.now()), &"world");

	// Create computed values which depend on these variables...
	let crx1 = g.new_crx(move |g| var1.get(g) * 2);
	// ...and other Rxs
	let crx2 = g.new_crx(move |g| format!("{}-{}", var2.get(g), crx1.get(g) * 2));
	// ...and create multiple values which are computed from a single function
	let (crx3, crx4) = g.new_crx2(move |g| var2.get(g).split_at(3));
	assert_eq!(crx1.get(g.now()), &4);
	assert_eq!(crx2.get(g.now()), &"world-8");
	assert_eq!(crx3.get(g.now()), &"wor");
	assert_eq!(crx4.get(g.now()), &"ld");
	var1.set(&g, 3);
	var2.set(&g, &"rust");
	assert_eq!(crx1.get(g.now()), &6);
	assert_eq!(crx2.get(g.now()), &"rust-12");
	assert_eq!(crx3.get(g.now()), &"rus");
	assert_eq!(crx4.get(g.now()), &"t");

	// Run side effects when a value is recomputed
	let var3 = g.new_var(Vec::from("abc"));
	let side_effect_ref = &side_effect;
	let side_effect_ref2 = &side_effect2;
	// borrowed values must outlive g but don't have to be static
	g.run_crx(move |g| {
		side_effect_ref.set(side_effect_ref.get() + var1.get(g));
		side_effect_ref2.borrow_mut().push_str(&String::from_utf8_lossy(var3.get(g)));
	});
	assert_eq!(side_effect.get(), 3);
	assert_eq!(&*side_effect2.borrow(), &"abc");
	var1.set(&g, 4);
	g.recompute();

	assert_eq!(side_effect.get(), 7);
	assert_eq!(&*side_effect2.borrow(), &"abcabc");

	// Note that the dependencies aren't updated until .recompute or .now is called...
	var3.set(&g, Vec::from("xyz"));
	assert_eq!(side_effect.get(), 7);
	assert_eq!(&*side_effect2.borrow(), &"abcabc");
	g.recompute();
	assert_eq!(side_effect.get(), 11);
	assert_eq!(&*side_effect2.borrow(), &"abcabcxyz");

	// the side-effect also doesn't trigger when none of its dependencies change
	var2.set(&g, "rust-lang");
	g.recompute();
	assert_eq!(side_effect.get(), 11);
	assert_eq!(&*side_effect2.borrow(), &"abcabcxyz");
	assert_eq!(crx2.get(g.now()), &"rust-lang-16");

	// lastly we can create derived values which will access or mutate part of the base value
	// which are useful to pass to children
	let dvar = var3.derive_using_clone(|x| &x[0], |x, char| {
		x[0] = char;
	});
	assert_eq!(dvar.get(g.now()), &b'x');
	dvar.set(&g, b'b');
	assert_eq!(dvar.get(g.now()), &b'b');
	assert_eq!(var3.get(g.now()), &b"byz");
	dvar.set(&g, b'f');
	assert_eq!(dvar.get(g.now()), &b'f');
	assert_eq!(var3.get(g.now()), &b"fyz");
	assert_eq!(&*side_effect2.borrow(), &"abcabcxyzbyzfyz");
}

概览

mini-rx 是 Rust 中“响应式编程”的简易实现,仅有一个依赖。它使用手动轮询,并通过将所有值存储在中心数据依赖图 RxDAG 中,与借用检查器良好集成。

响应式编程的类型是 基于信号 的,类似于 scala.rx,但与大多数库(它们是 基于流 的)不同,也许技术上不是 FRP。您不是操作值的流,而是操作变量,这些变量会触发其他计算值的重新计算,这又反过来触发其他值的重新计算和副作用。

关键概念/类型

  • RxDAG:存储所有 Rx。生命周期规则保证在您有活动引用时它们不会改变(见 生命周期
    • new_var:创建一个 Var
    • new_crx:创建一个 CRx
    • run_crx:运行副作用,当访问的任何 VarCRx 发生变化时将重新运行
    • new_crx[n]:创建 nCRx,它们来自单个计算
    • recompute:更新所有 VarCRx 值,但需要一个可变引用,这确保没有对旧值的活跃共享引用
    • now:重新计算然后获取一个 RxContext,这样您就可以获取值。它必须重新计算,因此需要一个可变引用。
    • stale:不会重新计算,但除非调用 recompute,否则不会返回最近设置的值。
  • Var:无依赖的值,您可以显式设置,这会触发更新
  • CRx:从依赖计算得出的值
  • RxContext:允许您读取 VarCRx 值。通过 RxDAG::now 或在计算(RxDAG::new_crx)和副作用(RxDAG::run_crx)中访问。
  • MutRxContext:允许您向Var写入。一个&RxDAG是一个MutRxContext。由于它们是输入,您不能在CRx计算中设置值。

基于信号

响应式编程的类型是基于信号的,这类似于scala.rx,但与大多数库(它们是基于流的)不同。您不是操作值的流,而是操作变量,这些变量会触发其他计算值的重新计算,这又反过来触发其他值的重新计算和副作用。

您可以通过添加一个在触发时推送值的副作用来模拟基于流的响应式性,如下所示

use mini_rx::*;

fn stream_like() {
	let stream = RefCell::new(Vec::new());
	let stream_ref = &stream;
	let input1 = vec![1, 2, 3];
	let input2 = vec![0.5, 0.4, 0.8];
	
	let mut g = RxDAG::new();
	let var1 = g.new_var(0);
	let var2 = g.new_var(0.0);
	let crx = g.new_crx(move |g| *var1.get(g) as f64 + *var2.get(g));
	
	g.run_crx(move |g| {
		stream_ref.borrow_mut().push(*crx.get(g));
	});
	
	assert_eq!(&*stream.borrow(), &vec![0.0]);
	for (a, b) in input1.iter().zip(input2.iter()) {
		var1.set(&g, *a);
		var2.set(&g, *b);
		g.recompute();
	}
	assert_eq!(&*stream.borrow(), &vec![0.0, 1.5, 2.4, 3.8]);
}

对于更传统的基于流的响应式编程,我推荐reactive-rs

生命周期

您不能获得存储在Var内的值的可变引用。相反,您需要使用完全新的值调用Var::setVar::modify。这是因为可能存在对旧Var的活跃引用。当您调用Var::set时,它不会立即更改旧值,因此这些引用不会改变。

为了实际更新响应式值并运行副作用,您必须调用RxDAG::recompute或内部调用recompute的函数,如RxDAG::now。为此,您需要一个对RxDAG的 mutable 引用,您只能在没有任何活跃引用到任何响应式值的情况下获得它。

此外,RxDAG中的任何计算函数都必须比RxDAG本身存在更长时间。这是因为该函数可能在RxDAG存活时被调用,当它被重新计算时。因此,如果您有在CRx计算或副作用中引用的值,您必须要么在RxDAG之前声明它们,要么使用类似于Weak引用的东西来确保在它们被使用时仍然存活。

为什么?基于信号的响应式编程 101

这里是一个在编程中经常遇到的情况:您有一个值a,它始终等于b + c。您不希望a是一个函数,但每当bc更改时,a必须被重新计算。

或者这里还有另一种情况:您有一个动作,它在值更改时始终运行,并将更新的值发送到服务器。

您可以将这些串联起来。也许您在更新时要发送到服务器的值是a。也许bc是从其他值计算出来的,例如d, e, f等。对于理论家来说,您最终有一个值和依赖关系的DAG(有向无环图)。修改其中一个根节点,它将触发一系列的计算和副作用。

依赖关系

~1.5MB
~36K SLoC