#thread #sync #mpsc #buffer #stream #multi-threading

kik_sync_service

使用所需数据生成所需结果的同步线程工作通道

1个不稳定版本

0.7.3 2021年2月18日
0.7.2 2021年2月18日
0.7.1 2021年2月18日
0.7.0 2021年2月18日

#976并发

每月22次下载
crispy_octo_fractals 中使用

MIT 许可证

64KB
625

kik_sync_service

用于执行自定义多线程操作的同步Rust库。

这是一个仅为同步操作创建两个通道和几个工作者的crate。关键是消息是由用户构建的。创建一个结构体来作为您恢复的数据(MessageData),一个结构体来作为您需要的输入(MessageInput),并将它们包含在您将在线程之间共享的消息中(Message)。

之后,您只需要添加一个输入向量,并通过for循环获取结果。重复所需的次数。

这种方法不需要太多的内存使用,并允许用户尽可能多地使用CPU。

可以使用每个DeliveryService通道的ChannelConfig参数更好地配置线程。

如何使用

用户需要实现的特正在这里

use std::marker::{Send, Sync};

/// MessageData holds the resource type that will be returned 
/// by the worker-threads. Must implement Sync, Send, Clone 
/// and have lifetime 'static.
pub trait MessageData: Sync + Send + Clone + 'static{
    fn new() -> Self;
}

// This is the trait input that can only be applied to objects 
// with MessageData trait
/// MessageInput will have the input arguments for generating 
/// each MessageData. Must implement Sync, Send, Clone and have 
/// lifetime 'static.
pub trait MessageInput<T> : Sync + Send + Clone + 'static where T: MessageData
{
    fn new() -> Self;
}

// This is the Message Trait that holds the data and the 
// value type that changes it
/// Message has the tools to generate each MessageData T, 
/// based on each MessageInput R. Must implement Sync, 
/// Send, Clone and have lifetime 'static.
pub trait Message<T, R> : Sync + Send + Clone + 'static where
R: MessageInput<T>,
T: MessageData,
{
    /// Behavior for storing a given input MessageInput, 
    /// before a worker can use it for generating MessageData. 
    /// Used by kik_feeder.
    fn set_input(&mut self, message_input: R);

    /// Workers will call this to use the stored 
    /// MessageInput (R<T>) to generate and replace 
    /// the existing MessageData stored. Used by kik_worker.
    fn work(&mut self);

    /// This will call MessageInput::new() method. No 
    /// need to implement this. Used by kik_feeder.
    fn new_message_input() -> R{
        R::new()
    }

    /// This will call MessageData::new() method. 
    /// No need to implement this. Used by kik_feeder.
    fn new_message_data() -> T{
        T::new()
    }

    /// This method is used when retrieving MessageData 
    /// for the iterator. Clone the MessageData stored 
    /// and return it. Used by kik_feeder.
    fn clone_message_data(&self) -> T;
    
    /// Construct a new message with default values. 
    /// Used by kik_feeder.
    fn new() -> Self;

}

在实现类型之后。通过调用类似以下内容创建一个新的通道:

let delivery_service = DeliveryService::default(),

通过调用以下内容为输入向量提供数据:

delivery_service.feed_feeder(input_vec: &mut Vec)

并通过迭代&mut delivery_service来获取您实现的MessageData

设置所有类型大约需要100行代码。但一旦设置好类型,只需要5-10行代码即可工作。

大型示例

以下是使用此crate的示例

#[cfg(test)]
mod tests{
    // replace crate for kik_sync_channel
    use crate::message::{Message, MessageData, MessageInput};
    use crate::channel::{DeliveryService};
    // What type of data should be returned.
    pub struct MessageArray{
        data: [u32; 1024],
    }

    impl Clone for MessageArray{
        fn clone(&self) -> Self{
            let mut new_array: [u32; 1024] = [0; 1024];
            for i in 0..1024{
                new_array[i] = self.data[i];
            }
            MessageArray{
                data: new_array,
            }
        }
    }

    // with this trait it can be used as data for a Message.
    impl MessageData for MessageArray{
        fn new() -> Self{
            MessageArray{
                data: [0; 1024],
            }
        }
    }

    impl MessageArray{
        pub fn get(&mut self) -> &mut [u32; 1024]{
            &mut self.data
        }
    }


    // What kind of input it needs.
    pub struct Coordinates{
        pub x0: usize,
        pub y0: usize,
        pub x1: usize,
        pub y1: usize,
    }

    // Doesn't need to implement Copy, but needs Clone.
    impl Clone for Coordinates{
        fn clone(&self) -> Self{
            Coordinates{
                x0: self.x0,
                y0: self.y0,
                x1: self.x1,
                y1: self.y1,
            }
        }
    }

    // This implementation tells the compiler that this object can be 
    // used as input for the worker threads, and it can only work with MessageArray.
    impl MessageInput<MessageArray> for Coordinates{
        fn new() -> Self{
            Coordinates{
                x0: 0,
                y0: 0,
                x1: 0,
                y1: 0,
            }
        }
    }


    // This is the message that holds both the data and input. 
    // Feel free to add anything else you might need to work with it.
    pub struct ThreadMessage{
        pub array: MessageArray,
        pub current_input: Coordinates,
    }

    impl Clone for ThreadMessage{
        fn clone(&self) -> Self{
            ThreadMessage{
                array: self.array.clone(),
                current_input: self.current_input.clone(),
            }
        }
    }

    // ThreadMessage uses MessageArray as data,
    // ThreadMessage uses Coordinates as input to change the data.
    impl Message<MessageArray, Coordinates> for ThreadMessage {

        fn set_input(&mut self, message_input: Coordinates){
            self.current_input = message_input.clone();
        }

        fn work (&mut self){
            let (x0, y0, x1, y1) = (
                self.current_input.x0,
                self.current_input.y0,
                self.current_input.x1,
                self.current_input.y1,
            );

            let array = self.array.get();
            let mut counter: usize = 0;

            // Not very creative right now, each operation will count from 0 to 1024.
            // This is just to show that the results are being made and returned. I'll use it to generate fractals in the next project.
            // I'm grateful for anyone offering a better example.
            // Counting in the first line will go like 0 1 2 3 ... 30 31 1 2 3 ...
            // Counting in the second line will go like 32 33 34 35 ... 59 60 61 62 63 ...
            // And so forth until 1023 in the last line.
            for _y in (y0)..(y1){
                for _x in (x0)..(x1){
                    let value = counter;
                    array[counter] = value as u32;

                    counter = counter + 1;
                }
            }
        }

        fn clone_message_data(&self) -> MessageArray{
            self.array.clone()
        }

        fn new() -> Self{
            let new_data = MessageArray::new();
            let new_input = Coordinates::new();

            ThreadMessage{
                current_input: new_input,
                array: new_data,
            }
        }
        
    }

    // Finally, Now that all the data structure is set, time to use the channel.
    #[test]
    fn test(){
        let width: usize = 1024;
        let height: usize = 768;
        let mut coordinates: Vec<Coordinates> = Vec::with_capacity((height as f32/32.0 * width as f32/32.0)as usize);
        assert_eq!(width % 32, 0);
        assert_eq!(height % 32, 0);

        // Creating a vec of coordinates to use as input.
        // for y in 0..24
        for y in 0..(((height as f32)/32.0) as usize){
            // for x in 0..32
            for x in 0..(((width as f32)/32.0) as usize){
                let (x0, y0) = (32 * x, 32 * y);
                coordinates.push(Coordinates{x0: x0, y0: y0, x1: x0 + 32, y1: y0 + 32});
            }
        }
        // Personal Note:
        // create a vec of inputs
        // create channel
        // send the vec of inputs
        // iterate through the channel
        // print the resulting values

        //data is MessageArray
        //input is Coordinates
        //message is ThreadMessage

        // Creating a channel that uses MessageArray as MessageData, Coordinates as MessageInput, ThreadMessage as Message. Default config values have been used.
        let mut kiki_channel: DeliveryService<MessageArray,Coordinates,ThreadMessage> = DeliveryService::default();
        kiki_channel.feed_feeder(&mut coordinates);

        let mut counter = 0;
        // Need to iterate through a mutable reference of kiki_channel to maintain ownership of it.
        for mut i in &mut kiki_channel{
            let mut highest: u32 = 0;
            let message_array = i.get();
            for j in message_array{
                if highest < *j {
                    highest = *j;
                }
            }
            // All the highest values for each line will be 31, 63, n * 32 -1, ...
            assert_eq!(highest % 32, 31);
            println!("Total line {}: {}", counter, highest);
            counter += 1;
        }

        // Creating another vec to feed the structure again.
        // for y in 0..24
        for y in 0..(((height as f32)/32.0) as usize){
            // for x in 0..32
            for x in 0..(((width as f32)/32.0) as usize){
                let (x0, y0) = (32 * x, 32 * y);
                coordinates.push(Coordinates{x0: x0, y0: y0, x1: x0 + 32, y1: y0 + 32});
            }
        }
        
        // You can feed more input values after emptying the results from last run.
        kiki_channel.feed_feeder(&mut coordinates);

        let mut counter = 0;
        // The worker threads and feeder will only be closed when channel goes out of scope (unless they panic).
        // Need to iterate through a mutable reference of kiki_channel to maintain ownership of it.
        for mut i in &mut kiki_channel{
            let mut highest: u32 = 0;
            let message_array = i.get();
            for j in message_array{
                if highest < *j {
                    highest = *j;
                }
            }
            // Used this when I was testing as fn main
            // if counter % 13 == 0{
            //     println!("Total linha {}: {}", counter, total);
            // }

            // All the highest values for each line will be 31, 63, n * 32 -1, ...
            assert_eq!(highest % 32, 31);
            println!("Total line {}: {}", counter, highest);
            counter += 1;
        }
    }
}

贡献

有一些部分请求帮助贡献。此crate的主功能在我的测试中运行正常,但还有许多需要改进的地方。详细信息请参考网络文档: https://on0n0k1.github.io/Projects/Rust%20crates/kik_sync_service/doc/kik_sync_service/index.html

没有运行时依赖项