1个不稳定版本
0.7.3 | 2021年2月18日 |
---|---|
0.7.2 |
|
0.7.1 |
|
0.7.0 |
|
#976 在 并发
每月22次下载
在 crispy_octo_fractals 中使用
64KB
625 行
kik_sync_service
用于执行自定义多线程操作的同步Rust库。
这是一个仅为同步操作创建两个通道和几个工作者的crate。关键是消息是由用户构建的。创建一个结构体来作为您恢复的数据(MessageData),一个结构体来作为您需要的输入(MessageInput),并将它们包含在您将在线程之间共享的消息中(Message)。
之后,您只需要添加一个输入向量,并通过for循环获取结果。重复所需的次数。
这种方法不需要太多的内存使用,并允许用户尽可能多地使用CPU。
可以使用每个DeliveryService通道的ChannelConfig参数更好地配置线程。
如何使用
用户需要实现的特正在这里
use std::marker::{Send, Sync};
/// MessageData holds the resource type that will be returned
/// by the worker-threads. Must implement Sync, Send, Clone
/// and have lifetime 'static.
pub trait MessageData: Sync + Send + Clone + 'static{
fn new() -> Self;
}
// This is the trait input that can only be applied to objects
// with MessageData trait
/// MessageInput will have the input arguments for generating
/// each MessageData. Must implement Sync, Send, Clone and have
/// lifetime 'static.
pub trait MessageInput<T> : Sync + Send + Clone + 'static where T: MessageData
{
fn new() -> Self;
}
// This is the Message Trait that holds the data and the
// value type that changes it
/// Message has the tools to generate each MessageData T,
/// based on each MessageInput R. Must implement Sync,
/// Send, Clone and have lifetime 'static.
pub trait Message<T, R> : Sync + Send + Clone + 'static where
R: MessageInput<T>,
T: MessageData,
{
/// Behavior for storing a given input MessageInput,
/// before a worker can use it for generating MessageData.
/// Used by kik_feeder.
fn set_input(&mut self, message_input: R);
/// Workers will call this to use the stored
/// MessageInput (R<T>) to generate and replace
/// the existing MessageData stored. Used by kik_worker.
fn work(&mut self);
/// This will call MessageInput::new() method. No
/// need to implement this. Used by kik_feeder.
fn new_message_input() -> R{
R::new()
}
/// This will call MessageData::new() method.
/// No need to implement this. Used by kik_feeder.
fn new_message_data() -> T{
T::new()
}
/// This method is used when retrieving MessageData
/// for the iterator. Clone the MessageData stored
/// and return it. Used by kik_feeder.
fn clone_message_data(&self) -> T;
/// Construct a new message with default values.
/// Used by kik_feeder.
fn new() -> Self;
}
在实现类型之后。通过调用类似以下内容创建一个新的通道:
let delivery_service = DeliveryService::default(),
通过调用以下内容为输入向量提供数据:
delivery_service.feed_feeder(input_vec: &mut Vec)
并通过迭代&mut delivery_service来获取您实现的MessageData。
设置所有类型大约需要100行代码。但一旦设置好类型,只需要5-10行代码即可工作。
大型示例
以下是使用此crate的示例
#[cfg(test)]
mod tests{
// replace crate for kik_sync_channel
use crate::message::{Message, MessageData, MessageInput};
use crate::channel::{DeliveryService};
// What type of data should be returned.
pub struct MessageArray{
data: [u32; 1024],
}
impl Clone for MessageArray{
fn clone(&self) -> Self{
let mut new_array: [u32; 1024] = [0; 1024];
for i in 0..1024{
new_array[i] = self.data[i];
}
MessageArray{
data: new_array,
}
}
}
// with this trait it can be used as data for a Message.
impl MessageData for MessageArray{
fn new() -> Self{
MessageArray{
data: [0; 1024],
}
}
}
impl MessageArray{
pub fn get(&mut self) -> &mut [u32; 1024]{
&mut self.data
}
}
// What kind of input it needs.
pub struct Coordinates{
pub x0: usize,
pub y0: usize,
pub x1: usize,
pub y1: usize,
}
// Doesn't need to implement Copy, but needs Clone.
impl Clone for Coordinates{
fn clone(&self) -> Self{
Coordinates{
x0: self.x0,
y0: self.y0,
x1: self.x1,
y1: self.y1,
}
}
}
// This implementation tells the compiler that this object can be
// used as input for the worker threads, and it can only work with MessageArray.
impl MessageInput<MessageArray> for Coordinates{
fn new() -> Self{
Coordinates{
x0: 0,
y0: 0,
x1: 0,
y1: 0,
}
}
}
// This is the message that holds both the data and input.
// Feel free to add anything else you might need to work with it.
pub struct ThreadMessage{
pub array: MessageArray,
pub current_input: Coordinates,
}
impl Clone for ThreadMessage{
fn clone(&self) -> Self{
ThreadMessage{
array: self.array.clone(),
current_input: self.current_input.clone(),
}
}
}
// ThreadMessage uses MessageArray as data,
// ThreadMessage uses Coordinates as input to change the data.
impl Message<MessageArray, Coordinates> for ThreadMessage {
fn set_input(&mut self, message_input: Coordinates){
self.current_input = message_input.clone();
}
fn work (&mut self){
let (x0, y0, x1, y1) = (
self.current_input.x0,
self.current_input.y0,
self.current_input.x1,
self.current_input.y1,
);
let array = self.array.get();
let mut counter: usize = 0;
// Not very creative right now, each operation will count from 0 to 1024.
// This is just to show that the results are being made and returned. I'll use it to generate fractals in the next project.
// I'm grateful for anyone offering a better example.
// Counting in the first line will go like 0 1 2 3 ... 30 31 1 2 3 ...
// Counting in the second line will go like 32 33 34 35 ... 59 60 61 62 63 ...
// And so forth until 1023 in the last line.
for _y in (y0)..(y1){
for _x in (x0)..(x1){
let value = counter;
array[counter] = value as u32;
counter = counter + 1;
}
}
}
fn clone_message_data(&self) -> MessageArray{
self.array.clone()
}
fn new() -> Self{
let new_data = MessageArray::new();
let new_input = Coordinates::new();
ThreadMessage{
current_input: new_input,
array: new_data,
}
}
}
// Finally, Now that all the data structure is set, time to use the channel.
#[test]
fn test(){
let width: usize = 1024;
let height: usize = 768;
let mut coordinates: Vec<Coordinates> = Vec::with_capacity((height as f32/32.0 * width as f32/32.0)as usize);
assert_eq!(width % 32, 0);
assert_eq!(height % 32, 0);
// Creating a vec of coordinates to use as input.
// for y in 0..24
for y in 0..(((height as f32)/32.0) as usize){
// for x in 0..32
for x in 0..(((width as f32)/32.0) as usize){
let (x0, y0) = (32 * x, 32 * y);
coordinates.push(Coordinates{x0: x0, y0: y0, x1: x0 + 32, y1: y0 + 32});
}
}
// Personal Note:
// create a vec of inputs
// create channel
// send the vec of inputs
// iterate through the channel
// print the resulting values
//data is MessageArray
//input is Coordinates
//message is ThreadMessage
// Creating a channel that uses MessageArray as MessageData, Coordinates as MessageInput, ThreadMessage as Message. Default config values have been used.
let mut kiki_channel: DeliveryService<MessageArray,Coordinates,ThreadMessage> = DeliveryService::default();
kiki_channel.feed_feeder(&mut coordinates);
let mut counter = 0;
// Need to iterate through a mutable reference of kiki_channel to maintain ownership of it.
for mut i in &mut kiki_channel{
let mut highest: u32 = 0;
let message_array = i.get();
for j in message_array{
if highest < *j {
highest = *j;
}
}
// All the highest values for each line will be 31, 63, n * 32 -1, ...
assert_eq!(highest % 32, 31);
println!("Total line {}: {}", counter, highest);
counter += 1;
}
// Creating another vec to feed the structure again.
// for y in 0..24
for y in 0..(((height as f32)/32.0) as usize){
// for x in 0..32
for x in 0..(((width as f32)/32.0) as usize){
let (x0, y0) = (32 * x, 32 * y);
coordinates.push(Coordinates{x0: x0, y0: y0, x1: x0 + 32, y1: y0 + 32});
}
}
// You can feed more input values after emptying the results from last run.
kiki_channel.feed_feeder(&mut coordinates);
let mut counter = 0;
// The worker threads and feeder will only be closed when channel goes out of scope (unless they panic).
// Need to iterate through a mutable reference of kiki_channel to maintain ownership of it.
for mut i in &mut kiki_channel{
let mut highest: u32 = 0;
let message_array = i.get();
for j in message_array{
if highest < *j {
highest = *j;
}
}
// Used this when I was testing as fn main
// if counter % 13 == 0{
// println!("Total linha {}: {}", counter, total);
// }
// All the highest values for each line will be 31, 63, n * 32 -1, ...
assert_eq!(highest % 32, 31);
println!("Total line {}: {}", counter, highest);
counter += 1;
}
}
}
贡献
有一些部分请求帮助贡献。此crate的主功能在我的测试中运行正常,但还有许多需要改进的地方。详细信息请参考网络文档: https://on0n0k1.github.io/Projects/Rust%20crates/kik_sync_service/doc/kik_sync_service/index.html