#fifo-queue #queue #message-queue #message #fifo #io-write

app queued

快速零配置单二进制简单队列服务

14个版本 (9个破坏性版本)

0.9.0 2024年4月7日
0.7.0 2024年4月5日
0.6.0 2023年7月19日
0.3.0 2023年3月15日

数据库实现中排名229

每月下载40

SSPL-1.0

77KB
2K SLoC

queued

快速零配置单二进制简单队列服务。

  • 查询内容,并批量推送和删除。
  • 带有速率限制和暂停的程序化或临时流量控制。
  • 具有最小写入量和强大API保证的持久性的快速I/O。
  • 作为简单库直接集成到更大的程序中。

快速开始

安装

确保您已安装Rust。

cargo install queued

运行

queued --data-dir /var/lib/queued

调用

// 🌐 PUT /queue/my-q
{
  "messages": [
    { "contents": "Hello, world!", "visibility_timeout_secs": 0 }
  ]
}
// ✅ 200 OK
{}


// 🌐 POST /queue/my-q/messages/push
{
  "messages": [
    { "contents": "Hello, world!", "visibility_timeout_secs": 0 }
  ]
}
// ✅ 200 OK
{
  "id": 190234
}


// 🌐 POST /queue/my-q/messages/poll
{
  "visibility_timeout_secs": 30
}
// ✅ 200 OK
{
  "messages": [
    {
      "contents": "Hello, world!",
      "created": "2023-01-03T12:00:00Z",
      "id": 190234,
      "poll_count": 1,
      "poll_tag": 33
    }
  ]
}


// 🌐 POST /queue/my-q/messages/update
{
  "id": 190234,
  "poll_tag": 33,
  "visibility_timeout_secs": 15
}
// ✅ 200 OK
{
  "new_poll_tag": 45
}


// 🌐 POST /queue/my-q/messages/delete
{
  "messages": [
    {
      "id": 190234,
      "poll_tag": 45
    }
  ]
}
// ✅ 200 OK
{}

性能

单个节点

使用单个Intel Alder Lake CPU核心和NVMe SSD,queued每秒管理约300,000个操作(推送、轮询、更新或删除),具有4,096个并发客户端和64的批量大小。内存使用量最小;仅存储每条消息的元数据。

由于每个操作都持久化到底层存储,存储I/O性能可能会迅速成为瓶颈。请考虑使用RAID 0和调整写入延迟以获得更好的性能。

安全性

在API层,只有成功的响应(即2xx)表示请求已成功持久化到磁盘(fdatasync)。假设任何中断或失败的请求都没有安全地存储,并适当地重试。更改对其他所有调用者立即可见。

建议在生产环境中运行时使用具有错误纠正功能的持久性存储,就像其他任何有状态的工作负载一样。

可以通过停止进程并复制文件/设备的全部内容来执行备份。

管理

POST /suspend可以暂停特定的API端点,这在临时调试或紧急干预而不停止服务器时非常有用。它需要一个请求体,如

{
  "delete": true,
  "poll": false,
  "push": false,
  "update": true
}

将属性设置为true以禁用该端点,并将false设置为重新启用它。禁用的端点将返回503 服务不可用。使用GET /suspend获取当前暂停的端点。

POST /throttle 将配置轮询节流,这对于流量控制和速率限制非常有用。它接受类似以下请求体

{
  "throttle": {
    "max_polls_per_time_window": 100,
    "time_window_sec": 60
  }
}

这将使轮询请求的速率限制为每60秒100次。没有其他端点被节流。节流请求将返回 429 请求过多。使用 GET /throttle 获取当前的节流设置。要禁用节流

{
  "throttle": null
}

GET /healthz 返回当前的构建版本。

GET /metrics 返回 Prometheus 或 JSON (接受 Accept: application/json) 格式的指标

# HELP queued_empty_poll Total number of poll requests that failed due to no message being available.
# TYPE queued_empty_poll counter
queued_empty_poll 0 1678525380549

# HELP queued_invisible Amount of invisible messages currently in the queue. They may have been created, polled, or updated.
# TYPE queued_invisible gauge
queued_invisible 0 1678525380549

# HELP queued_io_sync_background_loops Total number of delayed sync background loop iterations.
# TYPE queued_io_sync_background_loops counter
queued_io_sync_background_loops 19601 1678525380549

# HELP queued_io_sync Total number of fsync and fdatasync syscalls.
# TYPE queued_io_sync counter
queued_io_sync 0 1678525380549

# HELP queued_io_sync_delayed Total number of requested syncs that were delayed until a later time.
# TYPE queued_io_sync_delayed counter
queued_io_sync_delayed 0 1678525380549

# HELP queued_io_sync_longest_delay_us Total number of microseconds spent waiting for a sync by one or more delayed syncs.
# TYPE queued_io_sync_longest_delay_us counter
queued_io_sync_longest_delay_us 0 1678525380549

# HELP queued_io_sync_shortest_delay_us Total number of microseconds spent waiting after a final delayed sync before the actual sync.
# TYPE queued_io_sync_shortest_delay_us counter
queued_io_sync_shortest_delay_us 0 1678525380549

# HELP queued_io_sync_us Total number of microseconds spent in fsync and fdatasync syscalls.
# TYPE queued_io_sync_us counter
queued_io_sync_us 0 1678525380549

# HELP queued_io_write_bytes Total number of bytes written.
# TYPE queued_io_write_bytes counter
queued_io_write_bytes 0 1678525380549

# HELP queued_io_write Total number of write syscalls.
# TYPE queued_io_write counter
queued_io_write 0 1678525380549

# HELP queued_io_write_us Total number of microseconds spent in write syscalls.
# TYPE queued_io_write_us counter
queued_io_write_us 0 1678525380549

# HELP queued_missing_delete Total number of delete requests that failed due to the requested message not being found.
# TYPE queued_missing_delete counter
queued_missing_delete 0 1678525380549

# HELP queued_missing_update Total number of update requests that failed due to the requested message not being found.
# TYPE queued_missing_update counter
queued_missing_update 0 1678525380549

# HELP queued_successful_delete Total number of delete requests that did delete a message successfully.
# TYPE queued_successful_delete counter
queued_successful_delete 0 1678525380549

# HELP queued_successful_poll Total number of poll requests that did poll a message successfully.
# TYPE queued_successful_poll counter
queued_successful_poll 0 1678525380549

# HELP queued_successful_push Total number of push requests that did push a message successfully.
# TYPE queued_successful_push counter
queued_successful_push 0 1678525380549

# HELP queued_successful_update Total number of update requests that did update a message successfully.
# TYPE queued_successful_update counter
queued_successful_update 0 1678525380549

# HELP queued_suspended_delete Total number of delete requests while the endpoint was suspended.
# TYPE queued_suspended_delete counter
queued_suspended_delete 0 1678525380549

# HELP queued_suspended_poll Total number of poll requests while the endpoint was suspended.
# TYPE queued_suspended_poll counter
queued_suspended_poll 0 1678525380549

# HELP queued_suspended_push Total number of push requests while the endpoint was suspended.
# TYPE queued_suspended_push counter
queued_suspended_push 0 1678525380549

# HELP queued_suspended_update Total number of update requests while the endpoint was suspended.
# TYPE queued_suspended_update counter
queued_suspended_update 0 1678525380549

# HELP queued_throttled_poll Total number of poll requests that were throttled.
# TYPE queued_throttled_poll counter
queued_throttled_poll 0 1678525380549

# HELP queued_vacant How many more messages that can currently be pushed into the queue.
# TYPE queued_vacant gauge
queued_vacant 0 1678525380549

# HELP queued_visible Amount of visible messages currently in the queue, which can be polled. This may be delayed by a few seconds.
# TYPE queued_visible gauge
queued_visible 4000000 1678525380549

重要细节

  • 消息按其可见时间顺序交付。同一时间可见的消息可以以任何顺序交付。消息将永远不会在可见时间之前交付,但可能在几秒后交付。轮询消息可能在可见时间后几秒被更新或删除,原因相同。
  • ID和轮询标签值是唯一的且不透明的。
  • 消息的大小没有限制。HTTP API对请求体的限制为每个请求128 MiB。
  • 非2xx响应仅包含文本,通常包含错误消息,因此在解析为JSON之前请检查状态。
  • 当磁盘空间耗尽时,进程将退出。

开发

example-client 中的客户端可以帮助进行压力测试、性能调整和配置文件生成。

随着I/O成为优化的主要关注点,请记住

  • 我们假设 powersafe overwrites 即一个 write 不会影响目标范围之外的数据。
  • write 系统调用数据立即对所有线程和进程中的所有 read 系统调用可见。
  • write 系统调用 可以 重新排序,除非使用了 fdatasync/fsync,它既是一个屏障也是一个缓存清除器。这意味着一系列快速的 write (1: 创建) -> read (2: 检查) -> write (3: 更新) 实际上可能导致1覆盖3。理想情况下,会有两个不同的API用于创建屏障和清除缓存。

依赖项

~61MB
~1M SLoC