#println #write #io #io-stream #stream #message-queue

comfy-print

println!及其朋友的非panic版本。异步实现。

2个不稳定版本

0.3.0 2024年1月3日
0.2.1 2023年12月30日
0.1.4 2023年12月29日

调试中排名第228

Download history

每月下载量59

MIT许可MIT

660KB
951

本crate提供以下函数的非panic版本:

  • std::print!()
  • std::println!()
  • std::eprint!()
  • std::eprintln!()

Print!会panic???

惊人的是:是的:问题

这种情况非常罕见,你可能永远都不会看到它,你也可以通过使用这个crate来保证你不会看到它。


用法

第1步 - 依赖关系

comfy-print添加到项目的依赖项中。

第2步 - 替换宏调用

  • 将每个std::print!()调用替换为comfy_print::print!
  • 将每个std::println!()调用替换为comfy_print::println!
  • 将每个std::eprint!()调用替换为comfy_print::eprint!
  • 将每个std::eprintln!()调用替换为comfy_print::eprintln!

如果你熟悉正则表达式,你可以使用IDE的"在文件中替换"命令一次性完成所有操作。

默认快捷键通常是 Ctrl + Shift + RCtrl + Shift + H

以下是我使用的模式(使用Jetbrains Intellij IDE和Java的正则表达式)

  • 匹配:(?<!comfy_e?)(?<type>print!|println!|eprint!|eprintln!)
  • 替换:comfy_print::comfy_${type}

步骤 3 - 适应

步骤 4 - 实际上,您也可以通过全局变量配置此crate的行为,更多详细信息请查看配置模块。


优点

  • 无不安全代码。
  • 仍然是线程安全的。
  • 为了避免死锁,std(out/err)上的锁以快速连续的方式获取/释放。
  • 您仍然可以自由使用std::print!宏,它们与任何comfy_print宏都不冲突。
  • 弹性
    • 提供的宏在写入std(out/err)返回错误时不会“静默失败”。相反,此crate将请求的打印内容保留在线程安全的队列中,然后稍后再次尝试打印。
    • 队列确保打印将始终按请求的顺序交付。

缺点

  • 性能较差
    • 一些非常基本的合成基准测试表明,平均而言,此版本的性能与std::print!相似。
    • 此版本的性能变化比std::print!更高。

代码逻辑

存储打印的数据类型

文件:utils.rs
use std::fmt::{Display, Formatter};

#[derive(Debug, Copy, Clone)]
pub enum OutputKind {
    Stdout,
    Stderr,
}

pub struct Message {
    string: String,
    output: OutputKind,
}

impl Message {
    pub fn str(&self) -> &str {
        return self.string.as_str();
    }
    
    pub fn output_kind(&self) -> OutputKind {
        return self.output;
    }
    
    pub fn standard(string: String) -> Self {
        return Self {
            string,
            output: OutputKind::Stdout,
        };
    }
    
    pub fn error(string: String) -> Self {
        return Self {
            string,
            output: OutputKind::Stderr,
        };
    }
}

impl Display for Message {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        return write!(f, "{}", self.string);
    }
}

pub fn try_write(msg_str: &impl Display, output_kind: OutputKind) -> std::io::Result<()> {
    match output_kind {
        OutputKind::Stdout => {
            let mut stdout = std::io::stdout().lock();
            write!(stdout, "{}", msg_str)?;
            stdout.flush()?;
            Ok(())
        }
        OutputKind::Stderr => {
            let mut stderr = std::io::stderr().lock();
            write!(stderr, "{}", msg_str)?;
            stderr.flush()?;
            Ok(())
        }
    }
}

一个简单的结构体 Message,其中包含要打印的字符串和它应该打印到的地方 std(out/err)

  • get() 函数用于访问私有字段,以及一个
  • 为每个打印目标构造函数。
  • 为了方便格式化,我为 Message 实现了 Display 特性。

还有一个 pub fn try_write(msg_str: &impl Display, output_kind: OutputKind) 函数,它尝试将消息打印到所需的输出,如果失败则返回错误。


这些宏就像 std::prints 一样,作为我们实际代码的桥梁。

在文件:async_impl.rs
pub fn _print(input: String) {
    _comfy_print_async(Message::standard(input));
}

pub fn _println(mut input: String) {
    input.push('\n');
    _comfy_print_async(Message::standard(input));
}

pub fn _eprint(input: String) {
    _comfy_print_async(Message::error(input));
}

pub fn _eprintln(mut input: String) {
    input.push('\n');
    _comfy_print_async(Message::error(input));
}

#[macro_export]
macro_rules! comfy_print {
    ($($arg:tt)*) => {{
        $crate::async_impl::_print(std::format!($($arg)*));
    }};
}

#[macro_export]
macro_rules! comfy_println {
    () => {
        $crate::async_impl::_println("\n")
    };
    ($($arg:tt)*) => {{
        $crate::async_impl::_println(std::format!($($arg)*));
    }};
}

#[macro_export]
macro_rules! comfy_eprint {
    ($($arg:tt)*) => {{
        $crate::async_impl::_eprint(std::format!($($arg)*));
    }};
}

#[macro_export]
macro_rules! comfy_eprintln {
    () => {
        $crate::async_impl::_eprintln("\n")
    };
    ($($arg:tt)*) => {{
        $crate::async_impl::_eprintln(std::format!($($arg)*));
    }};
}

存储队列打印的收集类型

use parking_lot::FairMutex;
static QUEUE: FairMutex<Vec<Message>> = FairMutex::new(Vec::new());

队列由一个普通的 Vec<Message> 组成,它被 parking_lotFairMutex 包装。

普通的 Mutex 在解锁后,会无条件地将锁交给随后执行的线程,不管最初是哪个线程请求了锁。这种锁在我们的用例中存在问题,我们希望确保打印顺序与请求顺序完全一致。如果有多个线程正在等待锁,普通的 Mutex 不会关心哪个线程先请求,这可能导致打印顺序混乱。

另一方面,FairMutex 会让线程在请求锁时形成一个队列,确保第一个请求锁的线程先获得机会。


在尝试打印之前,我们需要检查队列中是否已有其他打印任务。如果有,我们不能立即打印 msg,因为这会破坏“请求打印 -> 交付打印”的顺序。

如果打印任务未能写入目标输出,它们将加入队列。

最后,所有 4 个宏最终都会调用 comfy_print::async_impl::_comfy_print_async(msg)

pub fn _comfy_print_async(msg: Message)
pub fn _comfy_print_async(msg: Message) {
    let mut queue_guard = QUEUE.lock();
    
    if queue_guard.len() == 0 {
        drop(queue_guard);
        write_first_in_line(msg);
    } else {
        queue_guard.push(msg);
        drop(queue_guard);
        check_thread();
    }
}

我们锁定队列并检查它是否为空。


当队列空时
if queue_guard.len() == 0 {
    drop(queue_guard);
    write_first_in_line(msg);
}

我们不需要等待其他线程,可以直接尝试打印。这在大多数情况下都会发生。

由于我们不再需要队列,我们立即释放它。避免同时拥有两个锁有助于避免某些死锁情况。

fn write_first_in_line(msg: Message)
fn write_first_in_line(msg: Message) {
    let msg_str: &str = msg.str();
    
    if let Err(err) = utils::try_write(&msg_str, msg.output_kind()) {
        let mut queue_guard = QUEUE.lock();
        queue_guard.insert(0, Message::error(format!(
            "comfy_print::blocking_write_first_in_line(): Failed to print first message in queue, it was pushed to the front again.\n\
            Error: {err}\n\
            Message: {msg_str}")));
        
        queue_guard.insert(1, msg);
    }
}

在这里,我们尝试写入目标输出。如果失败,我们将错误信息插入队列前面,然后是原始信息。

再次尝试不太可能产生结果,所以我们不应该做任何事情。

下次调用 comfy_print! 时我们会再次尝试。


当队列有元素时
} else {
    queue_guard.push(msg);
    drop(queue_guard);
    check_thread();
}

我们加入队列,然后检查是否有线程正在打印。

如果没有,我们将承担这个责任。

static ACTIVE_THREAD: FairMutex<Option<JoinHandle<()>>> = FairMutex::new(None);

我们使用 ACTIVE_THREAD 处理器来跟踪负责的线程。

fn check_thread()
fn check_thread() {
    let Some(mut thread_guard) = ACTIVE_THREAD.try_lock()
        else { return; };
    
    let is_printing = thread_guard.as_ref().is_some_and(|handle| !handle.is_finished());
    if is_printing { // We already pushed our msg to the queue and there's already a thread printing it, so we can return.
        return;
    }
    
    match thread::Builder::new().spawn(print_until_empty) {
        Ok(ok) => {
            *thread_guard = Some(ok);
            drop(thread_guard);
        },
        Err(err) => { // We couldn't create a thread, we'll have to block this one
            drop(thread_guard);
            
            let mut queue_guard = QUEUE.lock();
            queue_guard.insert(0, Message::error(format!(
                "comfy_print::queue_then_check_thread(): Failed to create a thread to print the queue.\n\
                Error: {err}.")));
            
            drop(queue_guard);
            print_until_empty();
        }
    }
}

这里有很多事情在进行,所以让我们将其分解为更小的步骤

let Some(mut thread_guard) = ACTIVE_THREAD.try_lock()
    else { return; };

let is_printing = thread_guard.as_ref().is_some_and(|handle| !handle.is_finished());
if is_printing { // We already pushed our msg to the queue and there's already a thread printing it, so we can return.
    return;
}

首先,通过尝试获取锁,我们执行一个非阻塞操作,这会告诉我们是否已有其他线程正在使用它。如果是这样,我们可以假设其他线程也即将开始打印队列。我们可以在这里停止。

如果我们成功获取了锁,我们可以检查那里是否有东西,并且如果里面的处理器属于已经完成的线程。

如果处理器存在且未完成,这意味着其他线程正在积极打印队列,因此我们可以返回。

match thread::Builder::new().spawn(print_until_empty) {

在这里,我们尝试创建一个新的线程,请求它执行函数 fn print_until_empty().

Ok(handle) => {
    *thread_guard = Some(handle);
    drop(thread_guard);
},

如果创建线程成功,我们将处理器插入我们的静态 Mutex 中,其他线程将检查它以确定它们是否可以承担打印责任。

像往常一样,我们立即释放我们持有的锁。

Err(err) => {
    drop(thread_guard);
    
    let mut queue_guard = QUEUE.lock();
    queue_guard.insert(0, Message::error(format!(
        "comfy_print::queue_then_check_thread(): Failed to create a thread to print the queue.\n\
        Error: {err}.")));
    
    drop(queue_guard);
}

如果由于任何原因创建线程失败,我们有一个新的错误信息要打印。

再次提醒,在获取队列的锁之前,我们释放引用处理器的锁,然后将在队列前面插入错误信息。

现在我们返回,希望用户再次调用打印,这将读取队列并尝试打印所有存储的消息。

fn print_until_empty() {
    const MAX_RETRIES: u8 = 50;
    let mut retries = 0;
    
    loop {
        let mut queue_guard = QUEUE.lock();
        
        if queue_guard.len() <= 0 {
            drop(queue_guard);
            break;
        }
        
        let msg = queue_guard.remove(0);
        let msg_str: &str = msg.str();
        let output_kind = msg.output_kind();
        drop(queue_guard); // unlock the queue before blocking stdout/err

        let write_result = utils::try_write(&msg_str, output_kind);
        
        if let Err(err) = write_result {
            let mut queue_guard = QUEUE.lock();
            queue_guard.insert(0, Message::error(format!(
                "comfy_print::write_until_empty(): Failed to print first message in queue.\n\
                Error: {err}\n\
                Message: {msg_str}\n\
                Target output: {output_kind:?}")));
            
            queue_guard.insert(1, msg);
            drop(queue_guard);
            
            retries += 1;
            if retries >= MAX_RETRIES {
                break;
            }
        }

        thread::yield_now();
    }
}

这是实际打印队列的函数,让我们将其分解为更小的步骤。

const MAX_RETRIES: u8 = 50;
let mut retries = 0;

首先,我们有一个任意整数定义了在打印操作失败时最大重试次数,以及用于计数尝试的局部整数 retries

let mut queue_guard = QUEUE.lock();

if queue_guard.len() <= 0 {
    drop(queue_guard);
    break;
}

在循环内部,我们锁定队列,然后如果队列为空则停止,因为这表示我们的任务已完成。

let msg = queue_guard.remove(0);
let msg_str: &str = msg.str();
let output_kind = msg.output_kind();
drop(queue_guard); // unlock the queue before blocking stdout/err

我们从队列中弹出前端的元素,然后立即释放锁。

在这里释放锁也确保我们不会同时持有两个锁,因为我们即将锁定输出流。

let write_result = utils::try_write(&msg_str, output_kind);

if let Err(err) = write_result {
    let mut queue_guard = QUEUE.lock();
    queue_guard.insert(0, Message::error(format!(
        "comfy_print::write_until_empty(): Failed to print first message in queue.\n\
        Error: {err}\n\
        Message: {msg_str}\n\
        Target output: {output_kind:?}")));
    
    queue_guard.insert(1, msg);
    drop(queue_guard);
    
    retries += 1;
    if retries >= MAX_RETRIES {
        break;
    }
}

thread::yield_now();

如果写入输出失败,我们将在队列前插入错误消息,然后是原始消息。

然而,由于我们保证不在主线程中,我们可以稍微延长打印责任,我们将尝试打印最多 MAX_RETRIES 次。

无论打印结果如何,在每个迭代结束时,我们都调用 thread::yield_now(); 这将给其他线程一个机会,希望它们能够解决输出流的错误,同时允许更多消息加入队列。


QA

为什么在那些会自动隐式调用而无需显式调用 drop(guard) 的实例上显式调用它?

A: 为了照顾我未来的自己:通过让它隐式执行,我依靠我未来的大脑阅读代码并找出锁定的守卫的精确顺序。

通过显式写入 drop(guard),我清楚地表明了锁的释放位置,因此我未来的大脑犯错误的机会更少。

这也使其他阅读代码的程序员更容易理解我的意图。


这有点过度设计了

A: 是的,我写这个crate是为了学习/练习Rust中的线程/并发。

我也非常讨厌看到 print! 调用引发panic。

依赖项

~0.4–5.5MB
~11K SLoC