Puff - 深度堆栈 Python 运行时和 GraphQL 库

0.1.8 2022 年 11 月 13 日
0.1.7 2022 年 11 月 5 日
0.1.1 2022 年 10 月 24 日

☁ Puff ☁

Puff 是一个内置 Rust 异步运行时的 Python,支持 GraphQL、ASGI、WSGI、Postgres、PubSub、Redis、分布式任务和 HTTP2 客户端。

什么是 Puff?

Puff 是一个为 Python 提供所有功能的“深度堆栈”。它是将 Python 和 Rust 之间的障碍最小化以释放高级语言全部潜能的实验。使用标准 CPython 构建自己的运行时,并用 Rust 扩展它。想象一下,如果 GraphQL、Postgres、Redis 和 PubSub、分布式任务都是标准库的一部分。那就是 Puff。

以前将 Rust 集成到 Python 的方法是通过创建一个使用 rust 的 Python 包并将其从 Python 中导入。这种方法存在一些缺陷,因为 rust 包无法协作。Puff 为 Rust 提供了自己的层,因此您可以在 Rust 中构建一组协同工作的工具,而无需再次进入 Python。

高层次的概述是 Puff 为 Python

  • 在 Rust 的 Tokio 上提供 Greenlets。
  • 高性能 HTTP 服务器 - 将 Axum 与 Python WSGI 应用程序(Flask、Django 等)结合使用
  • Rust / Python 在同一进程中本地化,无需套接字或序列化。
  • 与 Rust 集成的 AsyncIO / uvloop / ASGI
  • 易于使用的 GraphQL 服务
  • 多节点 Pub/Sub
  • Rust 级别 Redis 池
  • Rust 级别 Postgres 池
  • WebSocket
  • HTTP 客户端
  • 分布式、至少一次、优先级和计划任务队列
  • 与 Psycopg2 半兼容(希望足够好,适用于大多数 Django)
  • 一种安全方便的方式进入 rust 以获得最大性能


Python Rust
✅ 高级 ✅ 低级
✅ 工具和包众多 ✅ 工具和包众多
✅ 入门容易 ✅ 入门容易
🟡 解释型(牺牲速度换取生产力) 🟡 编译型(牺牲生产力换取速度)
✅ 容易掌握 ❌ 学习曲线陡峭
✅ 快速迭代原型 ❌ 需要计划以确保正确性
✅ 搜索问题,复制粘贴,就能工作。 ❌ 示例较少
❌ 弱类型系统 ✅ 优秀的类型系统
❌ GIL阻止多线程 ✅ 高性能
❌ 不够安全 ✅ 安全






curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh



cargo install puff-rs

Puff需要Python >= 3.10。Python的Poetry是可选的。



poetry new my_puff_proj_py
cd my_puff_proj_py
poetry add puff-py

现在从my_puff_proj_py中,您可以使用poetry run puff运行您的项目,以从Poetry中访问Cargo并将虚拟环境暴露给Puff。


Puff ♥ Python




function = "my_puff_proj_py.hello_world"
command_name = "hello_world"


import puff

# Standard python functions run on Puff greenlets. You can only use special Puff async functions without pausing other greenlets.
def hello_world():
    fn = "my_file.zip"
    result_bytes = puff.read_file_bytes(fn) # Puff async function that runs in Tokio.
    result_py_bytes = do_some_blocking_work(fn) # Python blocking that spawns a thread to prevent pausing the greenlet thread.
    print(f"Hello from python!! Zip is {len(result_bytes)} bytes long from rust and {len(result_py_bytes)} bytes from Python.")

# 100% of python packages are compatible by wrapping them in blocking decorator.
def do_some_blocking_work(fn):
    with open(fn, "rb") as f:
        return f.read()

Puff ♥ Django


用几行代码将您的同步Django项目转换成一个高度并发的Puff程序。Puff包装了管理命令,所以迁移等命令都按预期工作。简单地运行poetry run run_cargo django [command],而不是使用./manage.py [command]。例如,poetry run run_cargo django migrate。不要使用Django的开发服务器,而要使用Puff的poetry run run_cargo serve


django = true
wsgi = "my_django_application.wsgi.application"

name = "default"

name = "default"



Puff ♥ Graphql

Puff通过Python类定义暴露GraphQL的Mutations、Queries和Subscriptions。Puff GraphQL引擎的核心“杀手级特性”是它在“层基础”上工作,而不是在Node基础上。这允许GraphQL的每一步都能一次性收集查询所需的所有完整数据。这避免了传统上与GraphQL相关联的令人头疼的n+1和数据加载器开销。

GraphQL Python函数可以将纯SQL查询传递给Puff,Puff将渲染和转换查询,无需返回到Python。这允许Python GraphQL接口在很大程度上是I/O自由的,但在需要时仍然可以灵活地访问Puff资源。

from dataclasses import dataclass
from typing import Optional, Tuple, List, Any
from puff.pubsub import global_pubsub

pubsub = global_pubsub
CHANNEL = "my_puff_chat_channel"

class SomeInputObject:
    some_count: int
    some_string: str

class SomeObject:
    field1: int
    field2: str
class DbObject:
    was_input: int
    title: str
    def child_sub_query(cls, context, /) -> Tuple[DbObject, str, List[Any], List[str], List[str]]:
        # Extract column values from the previous layer to use in this one.
        parent_values = [r[0] for r in context.parent_values(["field1"])]
        sql_q = "SELECT a::int as was_input, $2 as title FROM unnest($1::int[]) a"
        # returning a sql query along with 2 lists allow you to correlate and join the parent with the child.
        return ..., sql_q, [parent_values, "from child"], ["field1"], ["was_input"]

class Query:

    def hello_world(cls, parents, context, /, my_input: int) -> Tuple[List[DbObject], str, List[Any]]:
        # Return a Raw query for Puff to execute in Postgres.
        # The ellipsis is a placeholder allowing the Python type system to know which Field type it should transform into.
        return ..., "SELECT $1::int as was_input, \'hi from pg\'::TEXT as title", [my_input]

    def hello_world_object(cls, parents, context, /, my_input: List[SomeInputObject]) -> Tuple[List[SomeObject], List[SomeObject]]:
        objs = [SomeObject(field1=0, field2="Python object")]
        if my_input:
            for inp in my_input:
                objs.append(SomeObject(field1=inp.some_count, field2=inp.some_string))
        # Return some normal Python objects.
        return ..., objs
    def new_connection_id(cls, context, /) -> str:
        # Get a new connection identifier for pubsub.
        return pubsub.new_connection_id()

class Mutation:
    def send_message_to_channel(cls, context, /, connection_id: str, message: str) -> bool:
        print(context.auth_token) #  Authoritzation bearer token passed in the context
        return pubsub.publish_as(connection_id, CHANNEL, message)

class MessageObject:
    message_text: str
    from_connection_id: str
    num_processed: int

class Subscription:
    def read_messages_from_channel(cls, context, /, connection_id: Optional[str] = None) -> Iterable[MessageObject]:
        if connection_id is not None:
            conn = pubsub.connection_with_id(connection_id)
            conn = pubsub.connection()
        num_processed = 0
        while msg := conn.receive():
            from_connection_id = msg.from_connection_id
            # Filter out messages from yourself.
            if connection_id != from_connection_id:
                yield MessageObject(message_text=msg.text, from_connection_id=from_connection_id, num_processed=num_processed)
                num_processed += 1

class Schema:
    query: Query
    mutation: Mutation
    subscription: Subscription


django = true
pytest = true

enable = true

enable = true

enable = true

schema = "my_python_gql_app.Schema"
url = "/graphql/"
subscriptions_url = "/subscriptions/"
playground_url = "/playground/"

function = "my_puff_proj_py.hello_world"
command_name = "hello_world"




Puff ♥ Pytest

通过将PytestCommand添加到您的程序中,与pytest轻松集成以测试您的GraphQL和Puff应用程序。只需像往常一样编写测试,只需用puff pytest运行它们即可。

pytest = true
from hello_world_py_app import __version__
from puff.graphql import global_graphql

gql = global_graphql

def test_version():
    assert __version__ == '0.1.0'

def test_gql():
    QUERY = """
    query {
        hello_world(my_input: 3) {
    result = gql.query(QUERY, {})
    assert 'data' in result
    assert 'errors' not in result
    assert result['data']["hello_world"][0]["title"] == "hi from pg"
    assert result['data']["hello_world"][0]["was_input"] == 3

Puff ♥ AsyncIO


asgiref.sync.async_to_syncasgiref.sync.sync_to_async都已被修补,因此您可以轻松地从异步调用puff greenlets或从puff greenlets调用异步。

from fastapi import FastAPI
from puff import global_state, wrap_async

state = global_state

app = FastAPI()

async def read_root():
    result = await wrap_async(lambda r: state.hello_from_rust_async(r, "hello from asyncio"))
    return {"Hello": "World", "from": "Fast API", "rust_value": result}


asyncio = true
asgi = "my_python_app.app"

Puff ♥ Django + GraphQL

Puff GraphQL与Django无缝集成。将Django查询集转换为SQL,将所有计算卸载到Rust。或者用borrow_db_context装饰,并让Django能够访问GraphQL连接,允许您在复杂的查找中回退到Django的稳健性。

from dataclasses import dataclass
from puff import graphql
from polls.models import Question, Choice
from django.utils import timezone
from  puff.contrib.django import query_and_params

class ChoiceObject:
    id: int
    question_id: int
    choice_text: str
    votes: int

class QuestionObject:
    id: int
    pub_date: str
    question_text: str

    def choices(cls, context, /) -> Tuple[List[ChoiceObject], str, List[Any], List[str], List[str]]:
        # Extract column values from the previous layer to use in this one.
        parent_values = [r[0] for r in context.parent_values(["id"])]
        # Convert a Django queryset to sql and params to pass off to Puff. This function does 0 IO in Python.
        qs = Choice.objects.filter(question_id__in=parent_values)
        sql_q, params = query_and_params(qs)
        return ..., sql_q, params, ["id"], ["question_id"]

class Query:

    def questions(cls, context, /) -> Tuple[List[QuestionObject], str, List[Any]]:
        # Convert a Django queryset to sql and params to pass off to Puff. This function does 0 IO in Python.
        qs = Question.objects.all()
        sql_q, params = query_and_params(qs)
        return ..., sql_q, params

    @graphql.borrow_db_context  # Decorate with borrow_db_context to use same DB connection in Django as the rest of GQL
    def question_objs(cls, context, /) -> Tuple[List[QuestionObject], List[Any]]:
        # You can also compute the python values with Django and hand them off to Puff.
        # This version of the same `questions` field, is slower since Django is constructing the objects.
        objs = list(Question.objects.all())
        return ..., objs

class Mutation:
    @graphql.borrow_db_context  # Decorate with borrow_db_context to use same DB connection in Django as the rest of GQL
    def create_question(cls, context, /, question_text: str) -> QuestionObject:
        question = Question.objects.create(question_text=question_text, pub_date=timezone.now())
        return question

class Subscription:

class Schema:
    query: Query
    mutation: Mutation
    subscription: Subscription

Puff ♥ 分布式任务






通过使用 scheduled_time_unix_ms 来实现优先级。工作者将所有任务按此值排序,并执行到当前时间为止的第一个任务。因此,如果您调度 scheduled_time_unix_ms=1,则该函数将在第一个可用时执行。使用 scheduled_time_unix_ms=1scheduled_time_unix_ms=2scheduled_time_unix_ms=3 等等,为不同类型的高优先级任务。请注意,如果您不够快地处理这些高优先级任务,可能会导致其他任务饿死。默认情况下,Puff 使用当前 Unix 时间调度新任务以实现“公平”并提供“先进先出”的顺序感。您还可以将此值设置为将来的 Unix 时间戳以延迟任务执行。

您可以运行任意数量的任务(使用 set_task_queue_concurrent_tasks),但是通过增加此值,在监控和查找新任务方面会有一定的开销。默认值为 num_cpu x 4

请参阅 使用 Puff 构建RPC 中的其他设计模式。

from puff.task_queue import global_task_queue

task_queue = global_task_queue

def run_main():
    all_tasks = []
    for x in range(100):
        #  Schedule some tasks on any coroutine thread of any Puff instance connected through Redis.
        task1 = task_queue.schedule_function(my_awesome_task, {"type": "coroutine", "x": [x]}, timeout_ms=100, keep_results_for_ms=5 * 1000)
        #  Override `scheduled_time_unix_ms` so that async tasks execute with priority over the coroutine tasks.
        #  Notice that since all of these tasks have the same priority, they may be executed out of the order they were scheduled.
        task2 = task_queue.schedule_function(my_awesome_task_async, {"type": "async", "x": [x]}, scheduled_time_unix_ms=1)
        #  These tasks will keep their order since their priorities as defined by `scheduled_time_unix_ms` match the order scheduled.
        task3 = task_queue.schedule_function(my_awesome_task_async, {"type": "async-ordered", "x": [x]}, scheduled_time_unix_ms=x)
        print(f"Put tasks {task1}, {task2}, {task3} in queue")

    for task in all_tasks:
        result = task_queue.wait_for_task_result(task, 100, 1000)
        print(f"{task} returned {result}")

def my_awesome_task(payload):
    print(f"In task {payload}")
    return payload["x"][0]

async def my_awesome_task_async(payload):
    print(f"In async task {payload}")
    return payload["x"][0]


asyncio = true

enable = true

Puff ♥ HTTP

Puff 内置了一个基于 reqwests 的异步 HTTP 客户端,它可以处理 HTTP2(由 Puff WSGI/ASGI 集成提供)并重用连接。它使用 Rust 超快速地编码和解码 JSON。

from puff.http import global_http_client

http_client = global_http_client

async def do_http_request():
    this_response = await http_client.post("", json={"my_data": ["some", "json_data"]})
    return await this_response.json()

def do_http_request_greenlet():
    """greenlets can use the same async functions. Puff will automatically handle awaiting and context switching."""
    this_response = http_client.post("", json={"my_data": ["some", "json_data"]})
    return this_response.json()

您可以通过 RuntimeConfig 设置 HTTP 客户端选项。如果您的程序只与其他 Puff 实例或 HTTP2 服务通信,则仅启用 HTTP2 是有意义的。您还可以通过此方法配置用户代理以及许多其他 HTTP 选项。

asyncio = true

http2_prior_knowledge = true


Puff 支持多个服务池。

name = "default"

name = "readonly"

name = "audit"

name = "default"

name = "other"

name = "default"

name = "internal"
http2_prior_knowledge = true

name = "default"

name = "otherpubsub"

schema = "my_python_gql_app.Schema"
url = "/graphql/"
subscriptions_url = "/subscriptions/"
playground_url = "/playground/"
database = "readonly"

name = "audit"
schema = "my_python_gql_app.AuditSchema"
url = "/audit/graphql/"
subscriptions_url = "/audit/subscriptions/"
playground_url = "/audit/playground/"
database = "audit"


      --default-postgres-url <DEFAULT_POSTGRES_URL>
          Postgres pool configuration for 'default'. [env: PUFF_DEFAULT_POSTGRES_URL=] [default: postgres://postgres:password@localhost:5432/postgres]
      --audit-postgres-url <AUDIT_POSTGRES_URL>
          Postgres pool configuration for 'audit'. [env: PUFF_AUDIT_POSTGRES_URL=] [default: postgres://postgres:password@localhost:5432/postgres]
      --readonly-postgres-url <READONLY_POSTGRES_URL>
          Postgres pool configuration for 'readonly'. [env: PUFF_READONLY_POSTGRES_URL=] [default: postgres://postgres:password@localhost:5432/postgres]
      --default-redis-url <DEFAULT_REDIS_URL>
          Redis pool configuration for 'default'. [env: PUFF_DEFAULT_REDIS_URL=] [default: redis://]
      --other-redis-url <OTHER_REDIS_URL>
          Redis pool configuration for 'other'. [env: PUFF_OTHER_REDIS_URL=] [default: redis://]
      --default-pubsub-url <DEFAULT_PUBSUB_URL>
          PubSub configuration for 'default'. [env: PUFF_DEFAULT_PUBSUB_URL=] [default: redis://]
      --otherpubsub-pubsub-url <OTHERPUBSUB_PUBSUB_URL>
          PubSub configuration for 'otherpubsub'. [env: PUFF_OTHERPUBSUB_PUBSUB_URL=] [default: redis://]


Puff 设计得如此之好,以至于您可以将其作为库来构建自己的版本。如果您的项目需要,这允许进行令人难以置信的性能优化。


Puff 由多线程的 Tokio 运行时和一个单线程组成,该线程在 Greenlets 上运行所有 Python 计算。Python 将 IO 卸载到 Tokio,它调度 IO 并在必要时将其返回。

Untitled Diagram-2



Puff 的最终目标可能是像 gevent 的 monkeypatch 一样,自动使现有项目与 Puff 兼容。


