3 个不稳定版本
0.2.1 | 2022年1月4日 |
---|---|
0.2.0 | 2021年4月12日 |
0.1.0 | 2021年4月5日 |
在并发中排名第490
45KB
466 行
共享资源池构建器
一个生产者-消费者线程池构建器,最终会使用共享资源按顺序处理所有项目。
代码示例
这个库的最佳用途可以通过一个示例来描述,所以让我们直接进入正题
use shared_resource_pool_builder::SharedResourcePoolBuilder;
// Define a pool builder with a Vector as shared resource, and an action that
// should be executed sequentially over all items. Note that the second
// parameter of this function is a shared consumer function that takes two
// parameters: The shared resource and the items it will be used on. These
// items will be returned from the pool consumers.
let pool_builder = SharedResourcePoolBuilder::new(
Vec::new(),
|vec, i| vec.push(i)
);
// Create a producer-consumer thread pool that does some work. In this case,
// send the numbers 0 to 2 to the consumer. There, they will be multiplied by
// 10. Note that the first parameter is a producer function that takes a
// Sender like object, like [`std::sync::mpsc::Sender`]. Every item that gets
// sent will be processed by the consumer, which is the second parameter to
// this function. This function expects an object of the same type that gets
// sent by the producer. The consumer is expected to return an object of the
// type that the shared consumer takes as the second parameter and will
// ultimately used on the shared resource.
let pool_1 = pool_builder
.create_pool(
|tx| (0..=2).for_each(|i| tx.send(i).unwrap()),
|i| i * 10
);
// Create a second pool. Here, the numbers 3 to 5 will be produced, multiplied
// by 2, and being processed by the shared consumer function.
let pool_2 = pool_builder
.create_pool(
|tx| (3..=5).for_each(|i| tx.send(i).unwrap()),
|i| i * 2
);
// Wait for a specific pool to finish before continuing ...
pool_1.join().unwrap();
// ... or wait for all pools and the shared consumer to finish their work at
// once and return the shared resource. Afterwards, the pool builder can no
// longer be used, since the shared resource gets moved.
let result = {
let mut result = pool_builder.join().unwrap();
// By default, the pool consumers run in as many threads as there are
// cores in the machine, so the result may not be in the same order as
// they were programmed.
result.sort();
result
};
assert_eq!(result, vec![0, 6, 8, 10, 10, 20]);
动机
想象一下,你有一个需要处理的大量项目列表。每个项目都可以独立处理,所以自然的方法是使用线程池来并行化工作。但想象一下,生成的项目需要与一个不支持多线程的资源一起使用。生成的项目将必须按顺序处理,因此不需要与多个线程共享资源。
作为一个更具体的例子,想象一个程序,它使用SQLite数据库来存储计算密集型的结果。在实际工作之前,你想要从数据库中删除过时的结果。然后你需要收集所有需要处理的新数据。最后,你想要处理所有新数据。
通常,这涉及三个顺序步骤:清理、收集和处理。这些步骤必须是顺序的,因为在同一个SQLite数据库上进行的写操作一次只能由一个线程执行。你可能想实现同步,以便一个线程写入,所有其他线程都阻塞并等待它们的轮次。但如果操作不当,这可能会出错。你可能会遇到锁定或繁忙数据库的随机恐慌。此外,你会失去有效使用预处理语句的能力。
这就是shared-resource-pool-builder
的作用。有了这个,你可以定义一个共享资源和一个共享消费者函数。从这个构建器中,你可以创建生产者-消费者池。然后,将按顺序将消费的项目交给共享消费者函数以应用于共享资源。
池消费者将在一个共享线程池中运行,因此你不必担心系统抖动。你甚至可以限制并行处理的项目数量。
继续以数据库为例,这意味着你可以定义一个池构建器,将数据库连接作为共享资源,并定义一个共享消费者函数,该函数将删除、插入或更新项目。你可以使用枚举来实现这一点。
然后,对于清理步骤,您创建一个池,从数据库生成所有项目,并通过返回一个对象来消耗它们,使项目要么被删除(如果已过时),要么被共享消费者忽略。
接下来,您可以创建一个池,生成所有新项目,并通过为每个项目创建插入操作来消耗它们。
根据数据库布局,这两个池甚至可以并行运行。多亏了共享消费者,同时只运行一个删除或插入操作。
同样,根据布局,您只需等待插入池完成,然后创建另一个池,该池生成所有需要处理的新项目,并通过实际工作和返回适当的更新对象来消耗它们。同时,清理池也可以继续其工作。
最后,您等待所有池完成后再终止程序。
这样,您就不必担心同步多个写线程或多个打开的数据库连接。只需将工作拆分,让计算密集型任务在池消费者中完成,让共享消费者只做最终化。
依赖项
约120KB