3 个不稳定版本

0.2.1 2022年1月4日
0.2.0 2021年4月12日
0.1.0 2021年4月5日

并发中排名第490

MIT许可证

45KB
466

共享资源池构建器

一个生产者-消费者线程池构建器,最终会使用共享资源按顺序处理所有项目。

代码示例

这个库的最佳用途可以通过一个示例来描述,所以让我们直接进入正题

use shared_resource_pool_builder::SharedResourcePoolBuilder;

// Define a pool builder with a Vector as shared resource, and an action that
// should be executed sequentially over all items. Note that the second
// parameter of this function is a shared consumer function that takes two
// parameters: The shared resource and the items it will be used on. These
// items will be returned from the pool consumers.
let pool_builder = SharedResourcePoolBuilder::new(
    Vec::new(),
    |vec, i| vec.push(i)
);

// Create a producer-consumer thread pool that does some work. In this case,
// send the numbers 0 to 2 to the consumer. There, they will be multiplied by
// 10. Note that the first parameter is a producer function that takes a
// Sender like object, like [`std::sync::mpsc::Sender`]. Every item that gets
// sent will be processed by the consumer, which is the second parameter to
// this function. This function expects an object of the same type that gets
// sent by the producer. The consumer is expected to return an object of the
// type that the shared consumer takes as the second parameter and will
// ultimately used on the shared resource.
let pool_1 = pool_builder
    .create_pool(
        |tx| (0..=2).for_each(|i| tx.send(i).unwrap()),
        |i| i * 10
    );

// Create a second pool. Here, the numbers 3 to 5 will be produced, multiplied
// by 2, and being processed by the shared consumer function.
let pool_2 = pool_builder
    .create_pool(
        |tx| (3..=5).for_each(|i| tx.send(i).unwrap()),
        |i| i * 2
    );

// Wait for a specific pool to finish before continuing ...
pool_1.join().unwrap();

// ... or wait for all pools and the shared consumer to finish their work at
// once and return the shared resource. Afterwards, the pool builder can no
// longer be used, since the shared resource gets moved.
let result = {
    let mut result = pool_builder.join().unwrap();

    // By default, the pool consumers run in as many threads as there are
    // cores in the machine, so the result may not be in the same order as
    // they were programmed.
    result.sort();
    result
};

assert_eq!(result, vec![0, 6, 8, 10, 10, 20]);

动机

想象一下,你有一个需要处理的大量项目列表。每个项目都可以独立处理,所以自然的方法是使用线程池来并行化工作。但想象一下,生成的项目需要与一个不支持多线程的资源一起使用。生成的项目将必须按顺序处理,因此不需要与多个线程共享资源。

作为一个更具体的例子,想象一个程序,它使用SQLite数据库来存储计算密集型的结果。在实际工作之前,你想要从数据库中删除过时的结果。然后你需要收集所有需要处理的新数据。最后,你想要处理所有新数据。

通常,这涉及三个顺序步骤:清理、收集和处理。这些步骤必须是顺序的,因为在同一个SQLite数据库上进行的写操作一次只能由一个线程执行。你可能想实现同步,以便一个线程写入,所有其他线程都阻塞并等待它们的轮次。但如果操作不当,这可能会出错。你可能会遇到锁定或繁忙数据库的随机恐慌。此外,你会失去有效使用预处理语句的能力。

这就是shared-resource-pool-builder的作用。有了这个,你可以定义一个共享资源和一个共享消费者函数。从这个构建器中,你可以创建生产者-消费者池。然后,将按顺序将消费的项目交给共享消费者函数以应用于共享资源。

池消费者将在一个共享线程池中运行,因此你不必担心系统抖动。你甚至可以限制并行处理的项目数量。

继续以数据库为例,这意味着你可以定义一个池构建器,将数据库连接作为共享资源,并定义一个共享消费者函数,该函数将删除、插入或更新项目。你可以使用枚举来实现这一点。

然后,对于清理步骤,您创建一个池,从数据库生成所有项目,并通过返回一个对象来消耗它们,使项目要么被删除(如果已过时),要么被共享消费者忽略。

接下来,您可以创建一个池,生成所有新项目,并通过为每个项目创建插入操作来消耗它们。

根据数据库布局,这两个池甚至可以并行运行。多亏了共享消费者,同时只运行一个删除或插入操作。

同样,根据布局,您只需等待插入池完成,然后创建另一个池,该池生成所有需要处理的新项目,并通过实际工作和返回适当的更新对象来消耗它们。同时,清理池也可以继续其工作。

最后,您等待所有池完成后再终止程序。

这样,您就不必担心同步多个写线程或多个打开的数据库连接。只需将工作拆分,让计算密集型任务在池消费者中完成,让共享消费者只做最终化。

依赖项

约120KB