6 个版本
使用旧的 Rust 2015
0.3.2 | 2019 年 2 月 17 日 |
---|---|
0.3.1 | 2019 年 2 月 16 日 |
0.3.0 | 2019 年 1 月 20 日 |
0.2.1 | 2018 年 11 月 18 日 |
0.1.0 | 2018 年 11 月 16 日 |
#777 在 并发
在 smartpool-spatial 中使用
110KB
2K SLoC
Smartpool
Smartpool 是一个具有未来意识的线程池,旨在提供对池行为的极大控制。虽然需要一点代码来启动,但在许多高背压情况下或池必须以特定方式关闭的情况下,它可以保持反应性。如果池中缺少某些特定的优先级方案,则此库可以很容易地扩展以实现所需的行为。
功能
- 高性能并发
- 未来意识
- 多个优先级级别
- 可定制任务优先级行为
- 按池关闭时的行为对任务通道进行分区
- 范围操作
- 计划操作
- 空间优先级,使用
smartpool-spatial
crate - 在稳定通道上使用
- 循环调度
- 最早截止日期优先
- 可扩展
- 数据驱动
- BPA-free
- 比我们的竞争对手多 20% 的 gucci,并且有 70% 的更多术语
概念
smartpool 架构有几个基础概念
- 通道
- 级别
- 调度器
- 池
通道
通道是一些可以轮询的任务源。通常,通道是一种任务队列,程序员可以将其插入任务。这些通道可以具有任何奇特的优先级方法,例如 最早截止日期优先 或 空间优先级。
从抽象的角度来看,通道实际上不必是可以插入的队列;它只需要是可轮询的。例如,通道可以用来挖掘比特币,模拟蛋白质折叠或异步从网络下载任务。
级别
级别是一组具有相同 优先级类别 的通道。这听起来有些模糊,因为级别的确切含义取决于调度器。
调度器
调度器确定池如何优先考虑不同的级别。虽然通道可以完全扩展,但不幸的是,调度器不能;它们目前需要修改池的核心逻辑。就本项目的初始版本而言,存在两个调度器
- 从高级开始
- 池将始终从可用的最高级别任务中进行轮询。
- 轮询
- 池将在不同级别之间循环交替,并为每个级别分配特定的实际时间长度。
池
池是实际线程和其他数据的集合,用于管理它。池分为两部分
- 一个
OwnedPool
,表示对池的所有权,并可用于关闭池并等待其正确关闭 - 一个
Arc<Pool>
,一个可共享的句柄,可以用来访问其通道。
后续
Smartpool是一个感知未来的线程池。这意味着它可以有效地以惯用的方式处理可以暂停的协程。如果任务暂停,它必须被重新插入某个通道。在任务从通道取出并暂停后,它会被重新插入某个后续通道。具体决定哪个通道成为后续通道是由用户配置的。
它可以提高通道的后续通道的平均完成时间,使其优先级高于原始通道。实质上,这是因为这导致线程池在开始任务后“承诺”完成该任务,而不是被其他任务“分心”。
关闭
可以使用OwnedPool
来正确关闭其对应的线程池。关闭方法产生一个future,它可以被丢弃或等待。
每个通道都可以配置是否是complete_on_close
。当线程池关闭时,如果它仍然包含任务,所有在complete_on_close
通道中的任务都将运行到完成,而其他通道中的任务将被完全忽略(并最终丢弃)。
即使来自complete_on_close
通道的协程当前正在某个外部条件下阻塞,线程池也会等待协程唤醒,然后驱动它完成,然后再关闭。
示例代码
extern crate smartpool;
extern crate atomic;
extern crate num_cpus;
extern crate futures;
use futures::prelude::*;
use smartpool::prelude::*;
use std::sync::Arc;
// we will use a submodule to define our pool behavior
mod my_pool {
// the prelude::setup module is a special prelude for defining pool behavior
use smartpool::prelude::setup::*;
use num_cpus;
// enum for a channel in the pool
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum MyPoolChannel {
Realtime,
Responsive,
Backlog,
BacklogDeadline,
}
// the pool behavior
pub struct MyPool {
// we can only submit time-critical tasks here
// the multi channel creates an array of the inner channel type, and alternates
// between them round-robin using an atomic integer.
// this reduces thread contention.
pub realtime: MultiChannel<VecDequeChannel>,
// we can submit tasks here which should be executed soonish, but try to keep backpressure low
pub responsive: MultiChannel<VecDequeChannel>,
// the backlog can be a lot of extra stuff that needs to get run
pub backlog: MultiChannel<VecDequeChannel>,
// another backlog channel with a SDF scheduler
pub backlog_deadline: MultiChannel<ShortestDeadlineFirst>,
}
impl MyPool {
// constructor is useful
pub fn new() -> Self {
MyPool {
realtime: MultiChannel::new(4, VecDequeChannel::new),
responsive: MultiChannel::new(4, VecDequeChannel::new),
backlog: MultiChannel::new(4, VecDequeChannel::new),
backlog_deadline: MultiChannel::new(4, ShortestDeadlineFirst::new),
}
}
}
// implementing pool behavior allows it to initialize a threadpool
impl PoolBehavior for MyPool {
type ChannelKey = MyPoolChannel;
// configuration data, queried once by the pool
fn config(&mut self) -> PoolConfig<Self> {
PoolConfig {
// as many threads as we have CPU
threads: num_cpus::get() as u32,
// use the highest first scheduler
schedule: ScheduleAlgorithm::HighestFirst,
// levels, highest to lowest
levels: vec![
// level 0 is just the realtime channel
// it will complete on close
vec![ChannelParams {
key: MyPoolChannel::Realtime,
complete_on_close: true,
}],
// level 1 is just the responsive channel
// it will complete on close
vec![ChannelParams {
key: MyPoolChannel::Responsive,
complete_on_close: true,
}],
// level 2 is both backlog channels, together
// they will not complete on close
vec![
ChannelParams {
key: MyPoolChannel::Backlog,
complete_on_close: false,
},
ChannelParams {
key: MyPoolChannel::BacklogDeadline,
complete_on_close: false,
}
]
]
}
}
// boilerplate for the engine to access the channels with no dynamic dispatch
fn touch_channel<O>(&self, key: MyPoolChannel, mut toucher: impl ChannelToucher<O>) -> O {
match key {
MyPoolChannel::Realtime => toucher.touch(&self.realtime),
MyPoolChannel::Responsive => toucher.touch(&self.responsive),
MyPoolChannel::Backlog => toucher.touch(&self.backlog),
MyPoolChannel::BacklogDeadline => toucher.touch(&self.backlog_deadline),
}
}
// boilerplate for the engine to access the channels with no dynamic dispatch
fn touch_channel_mut<O>(&mut self, key: MyPoolChannel, mut toucher: impl ChannelToucherMut<O>) -> O {
match key {
MyPoolChannel::Realtime => toucher.touch_mut(&mut self.realtime),
MyPoolChannel::Responsive => toucher.touch_mut(&mut self.responsive),
MyPoolChannel::Backlog => toucher.touch_mut(&mut self.backlog),
MyPoolChannel::BacklogDeadline => toucher.touch_mut(&mut self.backlog_deadline),
}
}
// given a running task which has been produced by a particular channel, then yielded,
// then become available once again, re-insert it in some channel
fn followup(&self, from: MyPoolChannel, task: RunningTask) {
match from {
MyPoolChannel::Realtime => self.realtime.submit(task),
MyPoolChannel::Responsive => self.responsive.submit(task),
MyPoolChannel::Backlog => self.responsive.submit(task),
MyPoolChannel::BacklogDeadline => self.responsive.submit(task),
}
}
}
}
use self::my_pool::MyPool;
fn main() {
// create the threadpool
let owned: OwnedPool<MyPool> = OwnedPool::new(MyPool::new()).unwrap();
let pool: Arc<Pool<MyPool>> = owned.pool.clone();
// spawn a task (use the run function to create a task of pure work)
pool.realtime.exec(run(|| {
let mut a: u128 = 0;
let mut b: u128 = 1;
for _ in 0..100 {
let sum = a + b;
a = b;
b = sum;
}
println!("{}", a);
}));
// close the pool, and wait for it to finish closing
owned.close().wait().unwrap();
}
范围操作
scoped
模块允许以安全的方式执行可以引用局部调用栈的批量操作。这有助于避免跨多线程任务共享数据所带来的性能和优雅性的惩罚。
计划操作
timescheduler
模块允许创建在未来的某个时刻变得可用或以定期间隔流数据的futures。这允许调度未来发生的操作。
可扩展性
可以通过实现Channel
接口以及可选的Exec
或ExecParam
,轻松地通过第三方库扩展池的行为。
算法
内部算法相对简单,经过抽象成几个级别并分离成模块。没有线程与任何其他线程协作,每个线程的行为都相同。每个线程只是尝试找到具有可用任务的最高优先级级别,并从该级别的某个通道中轮询一个任务,然后运行它。当同一级别的通道有多个时,它只是通过一个原子整数在这些通道之间轮询,以轮询的方式交替。每个通道内部使用一个数据结构,用互斥锁包裹,并通过使用内部可变性来推送和轮询。
然而,此算法仍然存在一些需要解决的问题
- 池如何跟踪哪些通道和级别中包含任务?
- 当没有任务时,池是如何正确地进入睡眠状态,而当任务在某处可用时又如何醒来,同时不引入全局争用呢?
- 如何以最小成本的方式将可以异步产生和通知的协程概念集成到池中?
状态字段
前两个问题通过位于其自己的crate中的atomicmonitor
抽象来解决。概括来说,原子监视器就像是一个对原子数据的监视器,这些数据是原子性修改的,并且通常比传统监视器更不具争议性。
每个互斥锁保护的数据结构,这些数据结构可能对应于特定通道中任务的可用性(通常每个通道1个),对应于共享的AtomicMonitor<u64>
中的位。这个原子u64
是一个位域,其中每个位表示特定互斥锁保护结构中是否有任务。通道只需在向其队列数据结构添加或删除元素时维护该位的有效性。当特定位的值发生变化时,使用原子位运算AND和OR指令更新原子监视器。
当线程池正在检查特定级别中是否有任务时,它可以直接加载位域,用该级别中所有通道对应的位掩码进行AND运算,并检查是否等于0x0
。
然而,更重要的是,为了使线程池以最小成本的方式正确处理睡眠,它只需在位域不等于0x0
的条件下在原子监视器上阻塞即可。
原子责任转移
第三个问题,关于有效地处理与Rust的所有权方案相关的协程,通过为这个线程池发明的一种算法来解决。
当smartpool在协程上运行poll_future_notify
时,它会尝试将协程驱动到完成。在此点,协程可以返回两个可能的结果
- 完成(成功或失败)
- 阻塞
如果未来返回阻塞,则在未来的某个时刻,协程将变得可用,我们传递给poll_future_notify
的回调将被激活。
这似乎表明了一种处理通知的简单方法:让通知处理程序回调简单地将任务重新提交到后续队列。然而,有一个架构细节使这个问题复杂化了
为了正确关闭池,包括在最终驱动它们完成之前等待在complete_on_close
的通道上产生的已产生的协程,必须维护一个原子监视器,该监视器统计所有来自complete_on_close
的通道的外部阻塞协程。这个原子整数必须按顺序递增和递减,即在外部任务阻止池关闭的时间内。这个细节使处理运行任务的所有权变得异常复杂。
为了解决这个问题,我们引入了两个原子数据,每个任务都有一个
Atomic<bool>
spawn_counted- 与
RunningTask
绑定
- 与
Atomic<RunStatus>
status- 在运行的任务和通知回调之间共享
- 3种可能的状态
未请求但将被处理
已请求但将被处理
未请求且不会被处理
当一个RunningTask
开始运行时,它会被移动到堆上,并转换为原始指针。我们的算法负责处理其借用和生命周期语义。
这些原子数据的确切使用方式最好通过代码来表示,而为什么需要这段代码的含义最好通过修改代码并尝试运行自动化测试来表示。这段代码位于pool
模块文件中的run
子模块内。
性能
这个crate的单元测试包含了各种池设置的性能测试,其结果可以通过在运行测试时将rust日志级别设置为INFO
或更高来查看。
其中一项非常不科学的测试执行了1,000批次的1,000个原子作用域操作(以消除引用计数成本)。这个在发布模式下、在Windows 10超薄笔记本电脑上运行的测试,平均操作时间为
824纳秒
依赖项
~1–1.5MB
~23K SLoC