55次发布

0.3.5 2021年8月21日
0.3.4 2021年8月21日
0.2.8 2021年8月12日
0.1.40 2021年7月20日
0.1.36 2018年7月24日

#250 in Rust模式

Download history 229/week @ 2024-03-14 142/week @ 2024-03-21 700/week @ 2024-03-28 521/week @ 2024-04-04 543/week @ 2024-04-11 452/week @ 2024-04-18 663/week @ 2024-04-25 511/week @ 2024-05-02 732/week @ 2024-05-09 705/week @ 2024-05-16 475/week @ 2024-05-23 509/week @ 2024-05-30 602/week @ 2024-06-06 636/week @ 2024-06-13 362/week @ 2024-06-20 9/week @ 2024-06-27

1,715 每月下载量
被 6 个 crate 使用

MIT 许可证

100KB
2.5K SLoC

fpRust

tag Crates.io Travis CI Build Status docs

license stars forks

为Rust提供Monad和函数式编程特性

为什么

我喜欢函数式编程和Rx风格的编码。

但是在Rust中实现它们很困难,而且很少有库可以完成它们的一部分。

因此,我实现了fpRust。希望你会喜欢它 :)

特性

  • MonadIO,类似Rx的 (fp_rust::monadio::MonadIO)

    • map/fmap/subscribe
    • 异步/同步
    • 支持 Future(需启用 feature: for_futures 特性)
  • 发布者 (fp_rust::publisher::Publisher)

    • 支持 Stream 实现(需启用 feature: for_futures 特性)
  • Fp函数 (fp_rust::fp)

    • compose!(), pipe!()
    • map!(), reduce!(), filter!(), foldl!(), foldr!()
    • contains!(), reverse!()
  • 异步 (fp_rust::sync & fp_rust::handler::HandlerThread)

    • 简单的BlockingQueue(受Java BlockingQueue启发,由内置 std::sync::mpsc::channel 实现)
    • HandlerThread(受Android Handler启发,由内置 std::thread 实现)
    • WillAsync(受Java Future启发)
      • 支持作为 Future 使用(需启用 feature: for_futures 特性)
    • CountDownLatch(受Java CountDownLatch启发,由内置 std::sync::Mutex 实现)
      • 支持作为 Future 使用(需启用 feature: for_futures 特性)
  • Cor (fp_rust::cor::Cor)

    • 类似PythonicGenerator的Coroutine
    • yield/yieldFrom
    • 异步/同步
  • Actor (fp_rust::actor::ActorAsync)

    • 纯净简单的 Actor 模型(receive/send/spawn)
    • Context 用于保持内部状态
    • 能够与父/子 Actor 通信
  • DoNotation (fp_rust::cor::Cor)

    • 类似 Haskell 的 DoNotation

* 模式匹配

用法

MonadIO(类似于 RxObserver)

示例


// MonadIO example: first a synchronous map/fmap chain, then an asynchronous
// pipeline that is driven by two handler threads and released by a latch.
extern crate fp_rust;

use std::{
    thread,
    time,
    sync::{
        Arc,
        Mutex,
        Condvar,
    }
};

use fp_rust::handler::{
    Handler,
    HandlerThread,
};
// `RawFunc` is required by the `post(RawFunc::new(..))` calls further down;
// the original snippet used it without importing it.
use fp_rust::common::{RawFunc, SubscriptionFunc};
use fp_rust::monadio::{
    MonadIO,
    of,
};
use fp_rust::sync::CountDownLatch;

// fmap & map (sync)
// The subscriber checks the final value of the chain: 1 * 4 * 3 * 3 == 36.
let mut _subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<u16>| {
    println!("monadio_sync {:?}", x); // monadio_sync 36
    assert_eq!(36, *Arc::make_mut(&mut x.clone()));
}));
let subscription = _subscription.clone();
let monadio_sync = MonadIO::just(1)
    .fmap(|x| MonadIO::new(move || x * 4))
    .map(|x| x * 3)
    .map(|x| x * 3);
monadio_sync.subscribe(subscription);

// fmap & map (async)
// Two handler threads are created and handed to the MonadIO; they are only
// started later, inside the scoped block below.
let mut _handler_observe_on = HandlerThread::new_with_mutex();
let mut _handler_subscribe_on = HandlerThread::new_with_mutex();
let monadio_async = MonadIO::new_with_handlers(
    || {
        println!("In string");
        String::from("ok")
    },
    Some(_handler_observe_on.clone()),
    Some(_handler_subscribe_on.clone()),
);

// The latch starts at 1; the first async subscriber counts it down.
let latch = CountDownLatch::new(1);
let latch2 = latch.clone();

thread::sleep(time::Duration::from_millis(1));

let subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
    println!("monadio_async {:?}", x); // monadio_async ok

    latch2.countdown(); // Unlock here
}));
monadio_async.subscribe(subscription);
monadio_async.subscribe(Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
    println!("monadio_async sub2 {:?}", x); // monadio_async sub2 ok
})));
{
    // Both mutex guards live only for this block and are released at its end.
    let mut handler_observe_on = _handler_observe_on.lock().unwrap();
    let mut handler_subscribe_on = _handler_subscribe_on.lock().unwrap();

    println!("hh2");
    handler_observe_on.start();
    handler_subscribe_on.start();
    println!("hh2 running");

    // Five no-op jobs are posted to the observe-on handler.
    // NOTE(review): presumably these just exercise the queue — confirm intent.
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
}
thread::sleep(time::Duration::from_millis(1));

// Waiting for being unlocked
latch.clone().wait();

发布者(类似于 PubSub)

示例


// Publisher example: one publisher that is used directly, and a second one
// constructed with a handler thread, with a latch released by a subscriber.
extern crate fp_rust;

use fp_rust::common::{SubscriptionFunc, RawFunc};
use fp_rust::handler::{Handler, HandlerThread};
use fp_rust::publisher::Publisher;
use std::sync::Arc;

use fp_rust::sync::CountDownLatch;

// pub1: built without a handler; the subscriber asserts the published value 9.
let mut pub1 = Publisher::new();
pub1.subscribe_fn(|x: Arc<u16>| {
    println!("pub1 {:?}", x);
    assert_eq!(9, *Arc::make_mut(&mut x.clone()));
});
pub1.publish(9);

// pub2: built with a handler thread that is started in the scoped block below.
let mut _h = HandlerThread::new_with_mutex();

let mut pub2 = Publisher::new_with_handlers(Some(_h.clone()));

// The latch starts at 1; subscriber `s` counts it down when it is invoked.
let latch = CountDownLatch::new(1);
let latch2 = latch.clone();

let s = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
    println!("pub2-s1 I got {:?}", x);

    latch2.countdown();
}));
pub2.subscribe(s.clone());
// NOTE(review): `map` is used here to attach a second callback; presumably it
// registers a subscription like `subscribe_fn` — confirm against the crate docs.
pub2.map(move |x: Arc<String>| {
    println!("pub2-s2 I got {:?}", x);
});

{
    // The mutex guard is scoped so the lock is released before publishing.
    let h = &mut _h.lock().unwrap();

    println!("hh2");
    h.start();
    println!("hh2 running");

    // Five no-op jobs are posted to the handler.
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
}

pub2.publish(String::from("OKOK"));
pub2.publish(String::from("OKOK2"));

// `s` is unsubscribed before the third publish.
pub2.unsubscribe(s.clone());

pub2.publish(String::from("OKOK3"));

// Block until subscriber `s` has counted the latch down.
latch.clone().wait();

Cor(类似于 PythonicGenerator)

示例


// Cor example: two coroutines, where cor2 yields from cor1.
#[macro_use]
extern crate fp_rust;

use std::time;
use std::thread;

use fp_rust::cor::Cor;

println!("test_cor_new");

// cor1: yields Some("given_to_outside") and prints whatever the yield returns.
// The two trailing type arguments are `String` and `i16`.
let _cor1 = cor_newmutex!(
    |this| {
        println!("cor1 started");

        let s = cor_yield!(this, Some(String::from("given_to_outside")));
        println!("cor1 {:?}", s);
    },
    String,
    i16
);
let cor1 = _cor1.clone();

// cor2: passes Some(3) to cor1 via `cor_yield_from!` and prints the result.
// The clone `cor1` is moved into this closure.
let _cor2 = cor_newmutex!(
    move |this| {
        println!("cor2 started");

        println!("cor2 yield_from before");

        let s = cor_yield_from!(this, cor1, Some(3));
        println!("cor2 {:?}", s);
    },
    i16,
    i16
);

{
    let cor1 = _cor1.clone();
    cor1.lock().unwrap().set_async(true); // NOTE Cor default async
                                          // NOTE cor1 should keep async to avoid deadlock waiting.(waiting for each other)
}
{
    let cor2 = _cor2.clone();
    cor2.lock().unwrap().set_async(false);
    // NOTE cor2 is the entry point, so it could be sync without any deadlock.
}
// cor1 is started before cor2.
cor_start!(_cor1);
cor_start!(_cor2);

thread::sleep(time::Duration::from_millis(1));

Do Notation(类似于 Haskell DoNotation)

示例


// Do Notation example: `do_m!` runs a block that yields from three inner
// coroutines in order and joins their results into one string.
#[macro_use]
extern crate fp_rust;

use std::time;
use std::thread;
// `Arc` and `Mutex` are needed for the shared result string below; the
// original snippet used them without importing them.
use std::sync::{Arc, Mutex};

use fp_rust::cor::Cor;


// Shared result slot written inside the do-block and read afterwards.
let v = Arc::new(Mutex::new(String::from("")));

let _v = v.clone();
do_m!(move |this| {
    println!("test_cor_do_m started");

    // Each inner coroutine yields one string: "1", "2" and "3" respectively.
    let cor_inner1 = cor_newmutex_and_start!(
        |this| {
            let s = cor_yield!(this, Some(String::from("1")));
            println!("cor_inner1 {:?}", s);
        },
        String,
        i16
    );
    let cor_inner2 = cor_newmutex_and_start!(
        |this| {
            let s = cor_yield!(this, Some(String::from("2")));
            println!("cor_inner2 {:?}", s);
        },
        String,
        i16
    );
    let cor_inner3 = cor_newmutex_and_start!(
        |this| {
            let s = cor_yield!(this, Some(String::from("3")));
            println!("cor_inner3 {:?}", s);
        },
        String,
        i16
    );

    {
        // Collect the three yielded strings in order and store "123".
        (*_v.lock().unwrap()) = [
            cor_yield_from!(this, cor_inner1, Some(1)).unwrap(),
            cor_yield_from!(this, cor_inner2, Some(2)).unwrap(),
            cor_yield_from!(this, cor_inner3, Some(3)).unwrap(),
        ].join("");
    }
});

// The first `_v` was moved into the closure, so take a fresh clone to read.
let _v = v.clone();

{
    assert_eq!("123", *_v.lock().unwrap());
}

Fp 函数(Compose,Pipe,Map,Reduce,Filter)

示例

#[macro_use]
extern crate fp_rust

use fp_rust::fp::{
  compose_two,
  map, reduce, filter,
};

let add = |x| x + 2;
let multiply = |x| x * 3;
let divide = |x| x / 2;

let result = (compose!(add, multiply, divide))(10);
assert_eq!(17, result);
println!("Composed FnOnce Result is {}", result);

let result = (pipe!(add, multiply, divide))(10);
assert_eq!(18, result);
println!("Piped FnOnce Result is {}", result);

let result = (compose!(reduce!(|a, b| a * b), filter!(|x| *x < 6), map!(|x| x * 2)))(vec![1, 2, 3, 4]);
assert_eq!(Some(8), result);
println!("test_map_reduce_filter Result is {:?}", result);

Actor

Actor 常见用法(send/receive/spawn/状态)

示例

// Actor example: a root actor spawns children, forwards Int messages to every
// child, and on Shutdown reports the child ids and stops itself and them.
//
// `HashMap`, `thread` and `ActorAsync` are used below; the original snippet
// did not import them.
// NOTE(review): if `send`/`start`/`stop`/`spawn_with_handle` etc. are trait
// methods in fp_rust::actor, those traits must be imported too — confirm
// against the crate docs.
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

use fp_rust::actor::ActorAsync;
use fp_rust::common::LinkedListAsync;

// Message type, also used as the value type of the actor context map.
#[derive(Clone, Debug)]
enum Value {
    // Str(String),
    Int(i32),
    VecStr(Vec<String>),
    Spawn,
    Shutdown,
}

// Result sinks: children push `v * 10` into `result_i32`; the root pushes the
// list of child ids into `result_string` on shutdown.
let result_i32 = LinkedListAsync::<i32>::new();
let result_i32_thread = result_i32.clone();
let result_string = LinkedListAsync::<Vec<String>>::new();
let result_string_thread = result_string.clone();
let mut root = ActorAsync::new(
    move |this: &mut ActorAsync<_, _>, msg: Value, context: &mut HashMap<String, Value>| {
        match msg {
            Value::Spawn => {
                println!("Actor Spawn");
                let result_i32_thread = result_i32_thread.clone();
                // Child behaviour: record Int * 10, stop on Shutdown.
                let spawned = this.spawn_with_handle(Box::new(
                    move |this: &mut ActorAsync<_, _>, msg: Value, _| {
                        match msg {
                            Value::Int(v) => {
                                println!("Actor Child Int");
                                result_i32_thread.push_back(v * 10);
                            }
                            Value::Shutdown => {
                                println!("Actor Child Shutdown");
                                this.stop();
                            }
                            _ => {}
                        };
                    },
                ));
                // Append the new child's id to the "children_ids" context entry,
                // starting from an empty list when the entry is absent.
                let list = context.get("children_ids").cloned();
                let mut list = match list {
                    Some(Value::VecStr(list)) => list,
                    _ => Vec::new(),
                };
                list.push(spawned.get_id());
                context.insert("children_ids".into(), Value::VecStr(list));
            }
            Value::Shutdown => {
                println!("Actor Shutdown");
                // Report the recorded child ids before shutting down.
                if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                    result_string_thread.push_back(ids.clone());
                }

                this.for_each_child(move |id, handle| {
                    println!("Actor Shutdown id {:?}", id);
                    handle.send(Value::Shutdown);
                });
                this.stop();
            }
            Value::Int(v) => {
                println!("Actor Int");
                // Forward the Int to every child recorded in the context.
                if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                    for id in ids {
                        println!("Actor Int id {:?}", id);
                        if let Some(mut handle) = this.get_handle_child(id) {
                            handle.send(Value::Int(v));
                        }
                    }
                }
            }
            _ => {}
        }
    },
);

let mut root_handle = root.get_handle();
root.start();

// One child
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(10));
// Two children
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(20));
// Three children
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(30));

// Send Shutdown
root_handle.send(Value::Shutdown);

thread::sleep(Duration::from_millis(1));
// 3 children Actors
assert_eq!(3, result_string.pop_front().unwrap().len());

// Six results are expected: 10 reached one child, 20 reached two, 30 reached
// three. They are sorted before comparing.
let mut v = Vec::<Option<i32>>::new();
for _ in 1..7 {
    let i = result_i32.pop_front();
    println!("Actor {:?}", i);
    v.push(i);
}
v.sort();
assert_eq!(
    [
        Some(100),
        Some(200),
        Some(200),
        Some(300),
        Some(300),
        Some(300)
    ],
    v.as_slice()
);

Actor Ask(受 Akka/Erlang 启发)

示例

// Actor "ask" example: each message carries a reply channel (a LinkedListAsync
// or a BlockingQueue) into which the actor writes `value * 10`.
//
// `HashMap`, `thread`, `ActorAsync` and `BlockingQueue` are used below; the
// original snippet did not import them.
// NOTE(review): `BlockingQueue` is assumed to live in fp_rust::sync, as the
// feature list suggests. If `offer`/`take`/`send`/`start` are trait methods,
// those traits must be imported too — confirm against the crate docs.
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

use fp_rust::actor::ActorAsync;
use fp_rust::common::LinkedListAsync;
use fp_rust::sync::BlockingQueue;

// Each variant pairs the request value with the channel to reply on.
#[derive(Clone, Debug)]
enum Value {
    AskIntByLinkedListAsync((i32, LinkedListAsync<i32>)),
    AskIntByBlockingQueue((i32, BlockingQueue<i32>)),
}

let mut root = ActorAsync::new(
    move |_: &mut ActorAsync<_, _>, msg: Value, _: &mut HashMap<String, Value>| match msg {
        Value::AskIntByLinkedListAsync(v) => {
            println!("Actor AskIntByLinkedListAsync");
            v.1.push_back(v.0 * 10);
        }
        Value::AskIntByBlockingQueue(mut v) => {
            println!("Actor AskIntByBlockingQueue");

            // NOTE If negative, hanging for testing timeout
            if v.0 < 0 {
                return;
            }

            // NOTE General Cases
            v.1.offer(v.0 * 10);
        } // _ => {}
    },
);

let mut root_handle = root.get_handle();
root.start();

// LinkedListAsync<i32>
// Replies are expected in send order: 10, 20, 30.
let result_i32 = LinkedListAsync::<i32>::new();
root_handle.send(Value::AskIntByLinkedListAsync((1, result_i32.clone())));
root_handle.send(Value::AskIntByLinkedListAsync((2, result_i32.clone())));
root_handle.send(Value::AskIntByLinkedListAsync((3, result_i32.clone())));
thread::sleep(Duration::from_millis(1));
let i = result_i32.pop_front();
assert_eq!(Some(10), i);
let i = result_i32.pop_front();
assert_eq!(Some(20), i);
let i = result_i32.pop_front();
assert_eq!(Some(30), i);

// BlockingQueue<i32>
// A 1 ms timeout is set on the queue before it is cloned into the messages.
let mut result_i32 = BlockingQueue::<i32>::new();
result_i32.timeout = Some(Duration::from_millis(1));
root_handle.send(Value::AskIntByBlockingQueue((4, result_i32.clone())));
root_handle.send(Value::AskIntByBlockingQueue((5, result_i32.clone())));
root_handle.send(Value::AskIntByBlockingQueue((6, result_i32.clone())));
thread::sleep(Duration::from_millis(1));
let i = result_i32.take();
assert_eq!(Some(40), i);
let i = result_i32.take();
assert_eq!(Some(50), i);
let i = result_i32.take();
assert_eq!(Some(60), i);

// Timeout case:
// A negative request makes the actor return without replying, so `take()`
// is expected to yield None.
root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));
let i = result_i32.take();
assert_eq!(None, i);

依赖关系

~0–285KB