#tor #dispatcher #requests #queue #async #network-programming

concurrent_tor

An asynchronous multi-tor-client implementation with a dispatcher that feeds requests from a user-defined queue

1 unstable release

0.1.0 Feb 17, 2023

#1780 in Asynchronous

MIT/Apache

31KB
439 lines

Concurrent tor

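A library designed to run multiple tor instances concurrently.

The user-defined task dispatcher allows for different queue designs, e.g. priority queues, binary heaps, etc.

See /examples/basic.rs for template code that uses this library.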
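To picture the overall design, the sketch below models it with std threads only: each worker stands in for one tor client and repeatedly pulls the next task from a shared, user-defined queue until the queue is empty. This is an illustration under stated assumptions, not the crate's runtime (which is async and drives real tor instances); `run_workers` and the `u32` task ids are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical model: `workers` threads each act like one tor client, draining a
/// shared stack of task ids. Returns (worker id, task id) pairs sorted by task id.
pub fn run_workers(tasks: Vec<u32>, workers: usize) -> Vec<(usize, u32)> {
    // The user-defined queue: here a plain Vec used as a stack.
    let queue = Arc::new(Mutex::new(tasks));
    // Completed work, recorded as (worker id, task id).
    let done = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..workers)
        .map(|worker_id| {
            let queue = Arc::clone(&queue);
            let done = Arc::clone(&done);
            thread::spawn(move || loop {
                // Hold the lock only long enough to pop one task.
                let next = queue.lock().unwrap().pop();
                match next {
                    // Stand-in for sending the request over this worker's tor client.
                    Some(task) => done.lock().unwrap().push((worker_id, task)),
                    // Queue drained: this worker exits.
                    None => break,
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let mut results = done.lock().unwrap().clone();
    results.sort_by_key(|&(_, task)| task);
    results
}
```

Swapping the `Vec` for another structure changes only the order in which workers receive tasks, which is the kind of flexibility the user-defined dispatcher is meant to provide.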

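Task

Your task implementation. This library uses enum_delegate to allow polymorphism. There is a request::Task trait that must be implemented, and further traits can be added for extra functionality. For example:

Note: if you are defining your own traits (like MyExt in the following example), add enum_delegate = { package = "temporary_enum_delegate_0_3_0" } to the dependencies in your Cargo.toml.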

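use async_trait::async_trait;
// `delegate`, `Task`, `request` and `errors` must also be in scope;
// see /examples/basic.rs for the exact import paths.

#[delegate]  // <-- Additional traits must be annotated with `#[delegate]`
pub trait MyExt {
    fn get_priority(&self) -> usize;
}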

#[delegate(derive(Task, MyExt))]  // <-- Task enum must implement `Task`
pub enum ExampleTaskEnum {
    TaskOne(ExampleTaskOne),
    // TaskTwo(ExampleTaskTwo),
}

#[derive(Debug, Default)]
pub struct ExampleTaskOne {
    priority: usize,
    request: request::Request,
}

impl ExampleTaskOne {
    fn new(priority: usize, request: request::Request) -> ExampleTaskOne {
        ExampleTaskOne { priority, request }
    }
}

#[async_trait]
impl Task for ExampleTaskOne {
    // The request can either be built on each call to this function or stored in the
    // struct. Storing it in the struct means that, if a retry is necessary, the original
    // Task object can be reused, and that errors (e.g. URI parsing) can be handled
    // elsewhere.
    fn get_request(&mut self) -> &mut request::Request {
        &mut self.request
    }

    async fn request_completed(
        &mut self,
        response: errors::RequestResult,
    ) -> Result<(), anyhow::Error> {
        println!("{:?}", self.request.get_next_attempt());
        println!("{:?}", response?.body_bytes().await?);
        // Logic to decide whether the task succeeded (e.g. insert the result into a database),
        //
        // or whether it failed due to a network error (e.g. check that can_try() is true; if
        // not, you may want to increase max_tries; then schedule a retry),
        //
        // or whether the page is no longer online (e.g. check can_try() as above, then fall
        // back to the Web Archive, as the next line does).
        self.request.next_attempt(request::RequestType::WebArchive);
        Ok(())
    }
}
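For readers unfamiliar with enum_delegate, the hand-written Rust below shows roughly what delegation means here: the enum implements the trait by matching on its variant and forwarding the call to the wrapped task. It is a sketch of the concept, not the macro's actual expansion; `ExampleTaskTwo` and its priority rule are hypothetical. Wrapping every task type in one enum gives the generic dispatcher a single concrete `T` without boxing trait objects.

```rust
/// Same role as the README's `MyExt` trait.
pub trait MyExt {
    fn get_priority(&self) -> usize;
}

pub struct ExampleTaskOne {
    pub priority: usize,
}

/// Hypothetical second task type.
pub struct ExampleTaskTwo {
    pub priority: usize,
}

impl MyExt for ExampleTaskOne {
    fn get_priority(&self) -> usize {
        self.priority
    }
}

impl MyExt for ExampleTaskTwo {
    // Hypothetical rule: this task type weights its priority more heavily.
    fn get_priority(&self) -> usize {
        self.priority * 10
    }
}

pub enum ExampleTaskEnum {
    TaskOne(ExampleTaskOne),
    TaskTwo(ExampleTaskTwo),
}

// Delegation by hand: match on the variant and forward to the inner value.
impl MyExt for ExampleTaskEnum {
    fn get_priority(&self) -> usize {
        match self {
            ExampleTaskEnum::TaskOne(t) => t.get_priority(),
            ExampleTaskEnum::TaskTwo(t) => t.get_priority(),
        }
    }
}
```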

Task dispatcher

Your queue / basic dispatcher implementation.

use std::sync::{Arc, Mutex};

pub struct ExampleTaskDispatcher<T> {
    tasks: Arc<Mutex<Vec<T>>>,
}

impl<T> ExampleTaskDispatcher<T>
where
    T: Task + MyExt,
{
    fn new() -> ExampleTaskDispatcher<T> {
        // A shared, lock-protected Vec serves as a simple stack of pending tasks.
        let tasks = Arc::new(Mutex::new(Vec::new()));
        ExampleTaskDispatcher { tasks }
    }
}
}

impl<T> dispatcher::Dispatcher<T> for ExampleTaskDispatcher<T>
where
    T: Task + MyExt,
{
    fn get_task(&self) -> Option<T> {
        // Vec::pop returns the most recently added task, so this queue is LIFO.
        self.tasks.lock().unwrap().pop()
    }

    fn add_task(&self, task: T) {
        let mut guard = self.tasks.lock().unwrap();
        dbg!(task.get_priority()); // debug output only; this Vec queue does not order by priority
        guard.push(task);
    }
}
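Because `Vec::pop` hands out the newest task first, the dispatcher above uses `get_priority()` only for debug output. A sketch of a priority-ordered alternative built on `std::collections::BinaryHeap` follows. The `Dispatcher` trait here is a local stand-in declaring only the two methods shown above, so the block compiles without the crate; `PriorityDispatcher`, `Entry` and `State` are hypothetical names.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Mutex;

/// Local stand-in with the two methods the README's `dispatcher::Dispatcher<T>` shows.
pub trait Dispatcher<T> {
    fn get_task(&self) -> Option<T>;
    fn add_task(&self, task: T);
}

/// Same role as the README's `MyExt` trait.
pub trait MyExt {
    fn get_priority(&self) -> usize;
}

/// Hypothetical heap entry, ordered by priority and then by insertion order,
/// so the task type itself does not need to implement `Ord`.
struct Entry<T> {
    priority: usize,
    seq: u64,
    task: T,
}

impl<T> Ord for Entry<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap: higher priority wins; on a tie, the smaller
        // (older) sequence number compares as greater, so it is popped first.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}
impl<T> PartialOrd for Entry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> PartialEq for Entry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl<T> Eq for Entry<T> {}

struct State<T> {
    heap: BinaryHeap<Entry<T>>,
    next_seq: u64,
}

/// Hypothetical priority-ordered dispatcher.
pub struct PriorityDispatcher<T> {
    state: Mutex<State<T>>,
}

impl<T: MyExt> PriorityDispatcher<T> {
    pub fn new() -> Self {
        PriorityDispatcher {
            state: Mutex::new(State { heap: BinaryHeap::new(), next_seq: 0 }),
        }
    }
}

impl<T: MyExt> Dispatcher<T> for PriorityDispatcher<T> {
    fn get_task(&self) -> Option<T> {
        // Highest-priority (then oldest) task, or None when the heap is empty.
        self.state.lock().unwrap().heap.pop().map(|entry| entry.task)
    }

    fn add_task(&self, task: T) {
        let mut state = self.state.lock().unwrap();
        let seq = state.next_seq;
        state.next_seq += 1;
        state.heap.push(Entry { priority: task.get_priority(), seq, task });
    }
}
```

Ties on priority are broken by an insertion counter, so equal-priority tasks come out first-in, first-out; the counter can be dropped if that ordering does not matter.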

Request

Stores the request's:

  • Uri
  • Headers
  • Method
  • Allow redirects
  • Max tries
  • Next attempt -> Standard/WebArchive/Ignore

along with some basic logic/functions.
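The retry bookkeeping listed above (a maximum number of tries plus a next-attempt decision of Standard, WebArchive or Ignore) can be modelled as a small state machine. The sketch below is hypothetical: `RequestState`, `NextAttempt` and `record_attempt` are illustrative names, not the crate's `request::Request` API, and the policy (retry on network errors, fall back to the Web Archive when the page is gone, otherwise stop) simply follows the comments in the Task example.

```rust
/// Hypothetical mirror of the "Next attempt" options listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NextAttempt {
    Standard,
    WebArchive,
    Ignore,
}

/// Hypothetical retry state; not the crate's `request::Request`.
pub struct RequestState {
    pub max_tries: u32,
    pub tries: u32,
    pub next_attempt: NextAttempt,
}

impl RequestState {
    pub fn new(max_tries: u32) -> Self {
        RequestState { max_tries, tries: 0, next_attempt: NextAttempt::Standard }
    }

    /// True while the retry budget is not used up.
    pub fn can_try(&self) -> bool {
        self.tries < self.max_tries
    }

    /// Records one finished attempt and decides what to do next.
    /// `network_error`: the attempt failed and might succeed on a retry.
    /// `page_gone`: the live page is offline, so fall back to a web archive.
    pub fn record_attempt(&mut self, network_error: bool, page_gone: bool) -> NextAttempt {
        self.tries += 1;
        self.next_attempt = if page_gone && self.can_try() {
            NextAttempt::WebArchive
        } else if network_error && self.can_try() {
            NextAttempt::Standard
        } else {
            // Success, or no tries left: stop scheduling this request.
            NextAttempt::Ignore
        };
        self.next_attempt
    }
}
```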

Dependencies

~76MB
~1.5M SLoC