# Repid

> Repid is a simple, fast, and extensible async task queue framework, with built-in AsyncAPI 3.0 schema generation.

Repid is a simple, fast, and extensible async task queue framework for Python 3.10+, featuring built-in AsyncAPI 3.0 schema generation.

# Getting Started

# Glossary

When working with Repid, you'll encounter a few core concepts and terms that form the foundation of the framework. This brief glossary will help you familiarize yourself with the terminology used throughout the documentation.

## Core Concepts

- **Message:** The fundamental unit of data sent through the queue. A message contains a payload (your data) and metadata (like a topic, queue name, and headers).
- **Actor:** An asynchronous Python function decorated with `@router.actor` that is responsible for processing a specific type of message. It is the "consumer" of your background tasks.
- **Topic:** A string label attached to a message that dictates how it should be routed. By default, an Actor listens for messages with a topic matching its function name.
- **Queue:** A named buffer within the message broker where messages wait before being processed by a worker.
- **Worker:** A background process (managed by `app.run_worker()`) that constantly pulls messages from the message broker and dispatches them to the corresponding actors.
- **Router:** An object (like `repid.Router()`) used to organize and group multiple actors together. You attach routers to your application when starting a worker.
- **Server / Broker:** The underlying infrastructure (like RabbitMQ, Redis, or an in-memory store) that physically stores and routes your messages. Repid connects to these via "Server" adapters (e.g., `AmqpServer`, `RedisServer`).

## Message States & Acknowledgments

When an actor finishes processing a message (or encounters an error), Repid must signal the broker about the outcome. This is known as acknowledgment.
The framework handles this based on the `@router.actor(confirmation_mode=...)` setting, but the core states remain the same: - **Ack (Acknowledge):** The actor successfully processed the message. The broker is instructed to safely remove the message from the queue. - **Nack (Negative Acknowledge):** The actor encountered an error and failed to process the message. The broker is instructed to negatively acknowledge it. Depending on your specific broker's configuration, this might mean the message is retried, dropped, or moved to a Dead Letter Queue (DLQ). - **Reject:** The execution is rejected, returning the message back to its original queue. This doesn't necessarily mean an error occurred; it can simply indicate that the actor or worker cannot accept the message at the moment. For example, Repid uses this state to return messages to the queue during a graceful shutdown, allowing them to be attempted again later or processed by another worker. ## AsyncAPI & Channels Repid implements the AsyncAPI 3.0 specification for schema generation. A key concept here is the "Channel." - **Channel:** In AsyncAPI, a channel is a generic pathway for transmitting messages. However, Repid seamlessly maps this generic concept to the specific routing infrastructure of your chosen message broker. Depending on the context: - **AMQP (RabbitMQ):** For a producer (sending a message), a channel can map to either a *queue* (the default behavior) or an *exchange*. For a consumer (a worker receiving messages), a channel always maps strictly to a *queue*. - **Google Cloud PubSub:** A producer's channel maps to a *Topic*, whereas a consumer's channel maps to a *Subscription*. - **Redis / In-Memory:** The channel typically maps directly to the target list/queue key name. ## Advanced Concepts - **Dependency Injection (DI):** A mechanism in Repid that allows your actor functions to declare dependencies (like a database connection, or specific message headers) in their signature. 
Repid automatically resolves and provides these dependencies when the actor is called. - **Middleware:** Functions that wrap around the sending or processing of messages. - **Producer Middleware:** Intercepts a message right before it is sent to the broker. - **Actor Middleware:** Intercepts a message right before it is executed by an actor, and can modify the result or handle exceptions. - **Health Check Server:** An optional, lightweight embedded HTTP server provided by Repid that exposes endpoints (like `/healthz` or `/livez`) to signal the worker's status to container orchestration tools (like Kubernetes). - **AsyncAPI:** An open-source specification for defining asynchronous APIs. Repid can automatically generate AsyncAPI schemas from your actors and routers, and serve them via an embedded UI. # Quickstart ## App structure Usually, Repid applications are split into two conceptual parts: - **Producer**: Enqueues or sends new messages to the queue (usually within your web server). - **Consumer (Worker)**: A background process that loops, pulls messages, and executes them via actors. The framework is intentionally designed to separate these concerns, meaning you can easily have them in different codebases. Framework & Language Agnostic Repid is highly unopinionated. It does not enforce any specific internal message format! Because you can easily override how payloads are decoded (via serializers/converters) and how messages are routed (via custom routing strategies), you can use Repid strictly as a Consumer to process jobs sent from an entirely different system, or strictly as a Producer to push jobs into queues. ## Servers (Choosing a Broker) First of all, you need to establish a connection to a message broker. Repid abstracts brokers into "Servers". 
Repid provides a few servers out-of-the-box (like AMQP, Redis, and GCP PubSub), but you will need to specify extra dependencies to use them: For AMQP / RabbitMQ, install: ``` pip install repid[amqp] ``` For Redis, install: ``` pip install repid[redis] ``` For Google Cloud PubSub, install: ``` pip install repid[pubsub] ``` Mostly for test purposes or simple local scripts, Repid provides an `InMemoryServer`. You don't need any extra dependencies for it. ## Infrastructure Topology Queue Creation Repid assumes that your underlying infrastructure (queues, topics, exchanges) **already exists**. It does not handle "topology creation" (like running `queue_declare` on RabbitMQ or `create_topic` on Google Cloud PubSub) automatically. It is up to you (or your deployment scripts) to ensure the target queues/channels exist on your broker before attempting to send or consume messages. ## The Repid Application To initialize Repid, you instantiate a `Repid` object and register your servers. ``` import os from repid import Repid, AmqpServer # 1. Create your application app = Repid(title="My Awesome App") # 2. Register a server and mark it as default server = AmqpServer("amqp://localhost") app.servers.register_server("my_rabbitmq", server, is_default=True) ``` The name you provide (e.g., `"my_rabbitmq"`) is just an identifier you can use later. Because we passed `is_default=True`, any messages sent without specifying a particular server will automatically be routed to this one. ## Sending Messages (Producer) Whenever you want to send a message to a queue, you need to ensure the connection to the server is open. You can handle connections using async context managers. 
``` import asyncio from repid import Repid, AmqpServer app = Repid() server = AmqpServer("amqp://localhost") app.servers.register_server("my_rabbitmq", server, is_default=True) async def main(): # Retrieve the default server and open a connection async with app.servers.default.connection(): # You can now safely publish messages await app.send_message_json( channel="default_queue", payload={"message": "Hello, Repid!"}, headers={"topic": "my_actor"} # Don't forget the topic if using topic routing! ) if __name__ == "__main__": asyncio.run(main()) ``` ### Integration with Web Frameworks If you are using Repid alongside an ASGI web framework (like FastAPI, Robyn, or Litestar), you should open the server connection during the application's startup phase and close it during shutdown using the framework's native `lifespan` handlers. You can find a complete, ready-to-run code example of this exact pattern in our [Integrations Guide](https://repid.aleksul.space/integrations/fastapi_and_pydantic/index.md). ## Running Workers (Consumer) To actually *process* the messages you just sent, you need to define an Actor and run a Worker. Workers should generally be run as an entirely separate script or process from your web server. It does not need to be the exact same `Repid` application instance in memory, it just needs to be connected to the same underlying server/broker! worker.py ``` import asyncio from repid import Repid, Router, AmqpServer # 1. Initialize an app for the worker to use app = Repid() server = AmqpServer("amqp://localhost") app.servers.register_server("my_rabbitmq", server, is_default=True) # 2. Create a router and an actor router = Router() @router.actor(channel="default_queue") async def my_action(message: str): print(f"Executing with payload: {message}") app.include_router(router) # 3. Start the worker loop! 
async def run_worker():
    # Like the producer, the connection must be opened first
    async with app.servers.default.connection():
        # Blocks forever, constantly pulling new messages and running actors
        await app.run_worker()

if __name__ == "__main__":
    asyncio.run(run_worker())
```

# Supported Brokers

Repid includes built-in support for several message brokers, accommodating various infrastructure requirements from local development to distributed enterprise systems. Here is a quick overview to help you choose the right one, followed by detailed descriptions of each.

| Broker | Best for... | Installation |
| --- | --- | --- |
| **[In-Memory](#in-memory-built-in)** | Local development, testing, and rapid prototyping | *Built-in* |
| **[Redis](#redis)** | Simple deployments and existing Redis stacks | `pip install "repid[redis]"` |
| **[AMQP 1.0](#amqp-10-rabbitmq)** | Enterprise architectures, robust routing, and strong guarantees | `pip install "repid[amqp]"` |
| **[Google Cloud Pub/Sub](#google-cloud-pubsub)** | Serverless capabilities and GCP-native applications | `pip install "repid[pubsub]"` |

______________________________________________________________________

## In-Memory (Built-in)

The **In-Memory** broker uses Python's standard `asyncio.Queue` to store messages directly in your application's memory.

- **When to use it:** The default choice for local development, rapid prototyping, and unit testing. While it *can* be used for single-process monolithic applications, it is **not recommended** for production environments.
- **Installation:** *Built-in (no installation required)*
- **Caveats:** All queued messages are permanently lost if your application crashes or restarts. Additionally, it cannot distribute tasks across multiple worker processes or machines.
- **Under the hood:** Relies entirely on native Python asynchronous primitives stored in memory. This ensures zero network overhead and immediate message delivery within a single process. ## Redis The **Redis** broker offers an excellent balance between the simplicity of an in-memory queue and the advanced features of a dedicated message broker. - **When to use it:** Ideal for persistent queuing and horizontal scalability, particularly if Redis is already part of your infrastructure stack. - **Installation:** `pip install "repid[redis]"` - **Caveats:** Requires Redis server version **5.0.0 or higher**, and the Python `redis` client **>=7.0.0**. While performant for many use cases, it lacks the complex routing capabilities found in brokers like RabbitMQ. - **Under the hood:** Built on top of `redis.asyncio`, Repid leverages modern **Redis Streams** (using commands like `XADD`, `XREADGROUP`, and `XACK`). It natively handles consumer group offsets, making it significantly more reliable than older Redis Pub/Sub or list-based (`BLPOP`) task queue implementations. ## AMQP 1.0 (RabbitMQ) The **AMQP** broker provides seamless integration with mature, enterprise-grade messaging systems using the AMQP 1.0 protocol, most notably RabbitMQ. - **When to use it:** Required for high reliability, granular message routing, strict acknowledgment mechanisms, or when integrating Repid into a polyglot microservice architecture. - **Installation:** `pip install "repid[amqp]"` - **Caveats:** Requires a broker that explicitly supports AMQP 1.0. For RabbitMQ versions prior to 4.0, you must enable the `rabbitmq_amqp1_0` plugin. Starting with version 4.0, AMQP 1.0 is supported natively. - **Under the hood:** Repid manages its own native connection states and protocol framing using a vendorized AMQP layer. 
**Unlike comparable libraries that typically rely on the legacy AMQP 0.9.1 protocol, Repid uses AMQP 1.0, which is significantly more optimized and reliable.** By avoiding heavy third-party async Python clients, it delivers a streamlined and highly performant experience. ## Google Cloud Pub/Sub The **Google Cloud Pub/Sub** broker is designed specifically for serverless, horizontally scaling architectures on the Google Cloud Platform. - **When to use it:** Ideal for fully managed, globally scalable queuing in GCP-exclusive environments, eliminating the maintenance overhead of self-managed clusters. - **Installation:** `pip install "repid[pubsub]"` - **Caveats:** Tightly coupled to the Google Cloud ecosystem. Not suitable for on-premise deployments or other cloud providers. - **Under the hood:** Rather than wrapping the standard `google-cloud-pubsub` client library (which is internally synchronous), Repid reimplements the connection asynchronously. By utilizing raw async gRPC calls (`grpc.aio`) directly to Pub/Sub endpoints, Repid achieves superior connection resilience, concurrency, and performance tailored to its task processing needs. ______________________________________________________________________ ## What if my broker isn't supported? Repid's core framework is completely decoupled from its network layer. If your infrastructure relies on Kafka, AWS SQS, NATS, or other systems, you can implement a custom connector. Check out the [Your own brokers](https://repid.aleksul.space/advanced_user_guide/your_own_brokers/index.md) guide for instructions on extending Repid. # Core Concepts # Dependency injection Repid includes a powerful, flexible Dependency Injection (DI) system out-of-the-box. Dependency Injection allows you to inject required components (like database connections, settings, or generic logic) directly into your actor functions without tightly coupling them or passing the same objects repeatedly. 
## Depends In your actor, you can declare a dependency as follows: ``` from typing import Annotated from repid import Repid, Router, Depends app = Repid() router = Router() def dependency_function() -> str: return "Hello!" @router.actor async def my_actor( my_dependency: Annotated[str, Depends(dependency_function)] ) -> None: print(my_dependency) # Will print `Hello!` ``` ## Sub-dependencies Your dependency can also have some dependencies of its own! ``` from typing import Annotated from repid import Depends, Router router = Router() def subdependency_function() -> str: return "world!" def dependency_function( sub: Annotated[str, Depends(subdependency_function)] ) -> str: return "Hello " + sub @router.actor async def my_actor( my_dependency: Annotated[str, Depends(dependency_function)] ) -> None: print(my_dependency) # Will print `Hello world!` ``` ## Extracting parameters from payload A dependency can even request fields that are provided via the incoming message payload or headers. If a parameter in the dependency has no `Depends` annotation, Repid will look for it in the parsed JSON payload. Dependencies can request the exact same parameters that the main actor requests, but they can also request their own unique parameters. Repid intelligently inspects the main actor and all of its dependencies, combining all requested fields into one big model that is parsed simultaneously! ``` from repid import Header from typing import Annotated async def verify_user( # These fields must exist in the payload user_id: int, verification_token: str, # This will extract a header correlation_id: Annotated[str | None, Header(alias="X-Correlation-ID")] = None ): if user_id <= 0: raise ValueError("Invalid user_id") return True @router.actor async def update_user( # user_id is requested by BOTH the actor and the dependency! user_id: int, # The dependency requires verification_token, so it becomes required in the payload! 
is_verified: Annotated[bool, Depends(verify_user)] ): # `user_id` and `verification_token` from the payload were passed to `verify_user` first! return f"User {user_id} is verified: {is_verified}" ``` ## Sync and Async dependencies Your dependencies' functions can be both synchronous and asynchronous. ### Asynchronous ``` async def dependency_function() -> str: await asyncio.sleep(0.1) # Imitates some async work return "Hello world!" Depends(dependency_function) ``` ### Synchronous ``` def dependency_function() -> str: return "Hello world!" Depends(dependency_function) ``` Sync Overhead Because synchronous dependencies are executed in a thread or process pool (to avoid blocking the main event loop), they introduce a small amount of context-switching overhead. If your dependency is extremely lightweight and doesn't actually block (e.g., simply returning a value from a dictionary or reading an environment variable), it is often more performant to define it as an `async def` function anyway, even if it contains no `await` statements. ### Synchronous (CPU-heavy) In case your function is synchronous, it will be run in a thread pool executor to avoid blocking the event loop. You can also opt to run it in a process pool executor if your function is CPU bound. ``` def dependency_function() -> int: # some CPU-heavy computation here return 123 Depends(dependency_function, run_in_process=True) ``` ## Overriding dependencies You can override a dependency globally. For Testing Purposes Only Dependency overrides are designed **primarily for testing**. Because overrides cause global state mutations across your application, it is highly recommended *not* to use them in production code. Use them exclusively in your test suites to safely mock external resources like database connections or API clients. ``` def dependency_function() -> str: return "Hello!" 
d = Depends(dependency_function) d.override(lambda: "Overridden!") @router.actor async def my_actor( my_dependency: Annotated[str, d] ) -> None: print(my_dependency) # Will print `Overridden!` ``` # Message Registry In Repid, you can send messages to a raw channel (queue or topic), but you can also decouple your application logic from physical queue names using **Operations**. The **Message Registry** is responsible for keeping track of these operations. It is accessible via the `app.messages` property. ## Registering Operations An operation is essentially a logical name for an action that targets a specific channel. You can register an operation like this: ``` from repid import Repid app = Repid() app.messages.register_operation( operation_id="send_welcome_email", channel="email_queue", title="Send Welcome Email", description="Publishes a message to send a welcome email to a newly registered user." ) ``` The `register_operation` method accepts the following arguments: - `operation_id`: A unique identifier for the operation. - `channel`: The physical channel (or `Channel` object) the message will be routed to. - *AsyncAPI metadata*: `title`, `summary`, `description`, `messages` (schemas), `security`, `tags`, `external_docs`, and `bindings`. ## Using Operations Once an operation is registered, you can use its `operation_id` instead of the raw `channel` when sending messages: ``` await app.send_message_json( operation_id="send_welcome_email", payload={"user_id": 123}, headers={"topic": "send_email_actor"} ) ``` When you do this, Repid looks up the operation in the Message Registry and routes the message to the underlying `channel` (in this case, `"email_queue"`). Note You must provide either an `operation_id` or a `channel` to `send_message_json` or `send_message`, but never both. 
## Retrieving Operations You can retrieve a registered operation to inspect its details: ``` operation = app.messages.get_operation("send_welcome_email") print(operation.channel.address) # "email_queue" print(operation.title) # "Send Welcome Email" ``` ## Integration with AsyncAPI The primary benefit of registering operations with metadata (like `title` and `description`) is that it heavily enriches your auto-generated **AsyncAPI 3.0** documentation. Repid uses the Message Registry to build the `operations` section of the AsyncAPI schema. # Messages Sending messages to a queue is the entry point for triggering background tasks in Repid. ## Sending raw data To send a message, you call `send_message` on your initialized `Repid` application. The payload for `send_message` must be raw `bytes`. ``` import asyncio from repid import Repid, InMemoryServer app = Repid() app.servers.register_server("default", InMemoryServer(), is_default=True) async def main(): # A connection must be active to publish! async with app.servers.default.connection(): await app.send_message( channel="my_channel", payload=b"Hello, world!", content_type="text/plain", headers={"topic": "my_actor"} # Routing topic ) if __name__ == "__main__": asyncio.run(main()) ``` ## Sending JSON Since JSON is the standard for most applications, Repid provides a helper method `send_message_json`. It takes any Python dictionary, list, or Pydantic model, automatically serializes it to bytes, and sets the `content-type` to `application/json`. ``` await app.send_message_json( channel="email_queue", payload={ "to": "user@example.com", "subject": "Welcome!", }, headers={"topic": "send_welcome_email"} ) ``` ## Routing: Channels vs Operation IDs When you send a message, you must specify **where** it goes so the correct worker can pick it up. Repid supports two ways to route messages, directly mirroring the AsyncAPI specification: ### 1. 
Channels (Basic Routing) The simplest way to route a message is by specifying the raw queue or topic name as the `channel`. By default, Repid uses a topic-based routing strategy on the worker side. This means that even if a worker is listening to `email_queue`, you usually need to supply a `"topic"` header matching the name of the actor function you want to execute (e.g. `headers={"topic": "send_welcome_email"}`). ``` await app.send_message_json( channel="email_queue", payload={"msg": "hi"}, headers={"topic": "send_welcome_email"} ) ``` ### 2. Operation IDs (Advanced Routing) In a complex system, you might have multiple distinct actions going over the same channel, or you might want to decouple the logical "action" from the physical queue infrastructure. You can register an `operation_id` to a specific channel in Repid's message registry. ``` # During setup, tell Repid that the "send_welcome_email" operation # happens on "email_queue" app.messages.register_operation( operation_id="send_welcome_email", channel="email_queue" ) # Later, send by operation_id instead of channel! await app.send_message_json( operation_id="send_welcome_email", payload={"user_id": 123}, headers={"topic": "send_welcome_email"} ) ``` If you specify `operation_id`, Repid looks it up in the registry to determine the underlying channel. As an added benefit, registering operations allows Repid to automatically include them in the AsyncAPI schema. Note You must provide either a `channel` or an `operation_id`, but never both. # Raw Message & Eager response When writing actors, sometimes you need to access the raw message data directly or interact with the message broker prematurely (before the actor finishes executing). Repid provides the `Message` dependency for exactly this purpose. ## The `Message` object You can access the current message being processed by injecting the `Message` object into your actor. 
Under the hood, this uses the `MessageDependency` alias in Repid's dependency injection system.

```
from repid import Message, Router

router = Router()

@router.actor
async def my_actor(message: Message) -> None:
    print(message.payload)     # Raw bytes payload
    print(message.headers)     # Dictionary of headers
    print(message.channel)     # The channel this message was received from
    print(message.message_id)  # The unique ID of the message (if supported by broker)
```

## Eager responses

By default, Repid automatically acknowledges (`ack`) a message if your actor returns successfully, and negatively acknowledges (`nack`) or rejects it if an exception is raised. However, sometimes you might want to act on the message immediately. Using the `Message` dependency, you can manually trigger these actions inside your actor.

```
from repid import Message, Router

router = Router()

@router.actor
async def my_actor(user_id: int, message: Message) -> None:
    if user_id < 0:
        # Invalid user_id. Nack the message immediately.
        await message.nack()
        return

    # Acknowledge early - fire-and-forget pattern
    await message.ack()

    # Now we can do some long-running processing...
    await do_heavy_lifting(user_id)
```

### Available Actions

The `Message` object provides the following actions:

- `await message.ack()`: Acknowledge the message (successful processing).
- `await message.nack()`: Negatively acknowledge the message (e.g. a temporary failure; usually retried or dead-lettered, depending on the broker).
- `await message.reject()`: Reject the message (it wasn't accepted for processing and is put back in the original queue).
- `await message.reply(payload=b"...")`: Atomically (if supported by the server) acknowledge the message and send a reply message.
- `await message.reply_json(payload={"status": "ok"})`: Atomically acknowledge and reply with JSON data.

If atomic reply is not supported by the broker, this is usually equivalent to calling `.ack()` followed by `.send_message()`.
You can also check if a message has already been acted upon using the `.is_acted_on` property. Note Message can only be acted on once. Any later actions are discarded. ## Sending New Messages You can also use the `Message` object to publish entirely new messages while processing the current one. This is very useful for chaining tasks or event-driven architectures. ``` from repid import Router, Message router = Router() @router.actor async def process_order(order_id: int, message: Message) -> None: # Process the order here... # Send an event to another channel await message.send_message_json( channel="notifications", payload={"event": "order_processed", "order_id": order_id} ) ``` # Server Registry Repid allows you to connect to multiple message brokers simultaneously. To manage these connections, Repid uses a **Server Registry**. The Server Registry is accessible via the `app.servers` property. ## Registering Servers When you initialize your Repid application, you typically register at least one server. ``` from repid import Repid, AmqpServer, RedisServer app = Repid() # Registering a RabbitMQ server app.servers.register_server("rabbitmq", AmqpServer("amqp://localhost"), is_default=True) # Registering a Redis server at the same time app.servers.register_server("redis", RedisServer("redis://localhost")) ``` When registering a server, you must provide a unique name (e.g., `"rabbitmq"`) and the server instance itself. If `is_default=True` is provided (or if it is the first server registered), it will be used as the default server across the framework. 
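The default-selection rule described above (first server registered wins, unless another is explicitly flagged with `is_default=True`) can be sketched as a small pure-Python model. The `ToyServerRegistry` class is invented for illustration and is not Repid's actual implementation:

```python
class ToyServerRegistry:
    """Simplified model of the server registry's default-selection rule:
    the first registered server becomes the default, unless a later server
    is explicitly registered with is_default=True."""

    def __init__(self) -> None:
        self._servers: dict[str, object] = {}
        self._default_name: str | None = None

    def register_server(self, name: str, server: object, is_default: bool = False) -> None:
        self._servers[name] = server
        # First registration, or an explicit flag, sets the default.
        if is_default or self._default_name is None:
            self._default_name = name

    @property
    def default(self) -> object:
        if self._default_name is None:
            raise RuntimeError("No servers registered.")
        return self._servers[self._default_name]


registry = ToyServerRegistry()
registry.register_server("rabbitmq", "amqp-connection")  # first -> becomes default
registry.register_server("redis", "redis-connection")    # default is unchanged
print(registry.default)  # amqp-connection
```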
## Retrieving Servers

You can access registered servers by their name:

```
# Returns the server registered as "rabbitmq"
server = app.servers.get_server("rabbitmq")
```

You can also use dictionary-style access:

```
server = app.servers["rabbitmq"]
```

## Default Server

You can get the default server by accessing the `default` property:

```
default_server = app.servers.default
```

If you need to change the default server later, you can use the `set_default_server` method:

```
app.servers.set_default_server("redis")
```

## Usage in Routing and Workers

When sending messages or starting workers, if no server name is specified, Repid will automatically fall back to the default server in the registry.

```
# Will use the default server
await app.send_message(channel="tasks", payload=b"...")

# Will explicitly use the "redis" server
await app.send_message(
    channel="tasks",
    payload=b"...",
    server_name="redis"
)
```

Likewise, when running a worker:

```
# Runs a worker consuming from the explicit "redis" server
await app.run_worker(server_name="redis")
```

# Server Specific Parameters

Repid is designed to be highly unopinionated and framework-agnostic. However, different message brokers (like RabbitMQ, Redis, or Google Cloud Pub/Sub) often have unique features that don't map cleanly to a universal abstraction. For these situations, Repid provides an "escape hatch" known as `server_specific_parameters`.

## Publishing with Parameters

When you use `send_message`, `send_message_json`, or the `message.reply()` and `message.send_message()` helpers on an in-flight message, you can pass a `server_specific_parameters` dictionary.

```
await app.send_message_json(
    channel="tasks",
    payload={"task": "process_video"},
    headers={"topic": "video_worker"},
    server_specific_parameters={
        # Your broker-specific flags go here
    }
)
```

The keys and values inside this dictionary are passed directly to the underlying connection implementation (`ServerT.publish`).
If a parameter is not recognized by the active server, it is usually safely ignored, though behavior depends on the specific connection library.

## Supported Parameters by Broker

Here is a list of parameters officially supported by Repid's built-in servers:

### RabbitMQ (AMQP)

When using the `AmqpServer`, you can customize how the internal AMQP 1.0 protocol publishes the message by providing `AmqpMessageHeader` and `AmqpMessageProperties` instances.

```
from repid.connections.amqp import AmqpMessageHeader, AmqpMessageProperties

await app.send_message_json(
    channel="tasks",
    payload={"msg": "hello"},
    server_specific_parameters={
        "header": AmqpMessageHeader(
            durable=True,
            priority=10,
            ttl=5000,  # Time to live in ms
            delivery_count=0
        ),
        "properties": AmqpMessageProperties(
            subject="task_subject",
            reply_to="reply_queue",
            group_id="group_1",
            absolute_expiry_time=1700000000
        ),
        "routing_key": "my.custom.routing.key"
    }
)
```

The supported keys include:

- `header` (`AmqpMessageHeader`): Override the AMQP header. Includes properties like `durable`, `priority`, `ttl` (time to live in ms), `first_acquirer`, and `delivery_count`.
- `properties` (`AmqpMessageProperties`): Override standard AMQP properties. Includes properties like `message_id`, `user_id`, `to`, `subject`, `reply_to`, `correlation_id`, `content_type`, `content_encoding`, `absolute_expiry_time`, `creation_time`, `group_id`, `group_sequence`, and `reply_to_group_id`.
- `delivery_annotations` (`dict[str, Any]`): Add AMQP delivery annotations.
- `message_annotations` (`dict[str, Any]`): Add AMQP message annotations.
- `footers` (`dict[str, Any]`): Add AMQP footers.
- `to` (`str`): Explicitly override the AMQP `to` address, bypassing Repid's default naming strategy.
- `routing_key` (`str`): Specify a custom routing key to use in combination with Repid's exchange naming strategy.

### Redis Streams

When using the `RedisServer`, the parameters directly translate to `XADD` command options.
- `maxlen` (`int`): Maximum stream length. This forces the stream to drop older entries if it exceeds this length. - `approximate` (`bool`): Used in conjunction with `maxlen`. If `True` (default), uses `~` for `MAXLEN` which is much more performant. - `nomkstream` (`bool`): If `True`, the message will not be added and an error will be thrown if the stream doesn't already exist. - `stream_id` (`str`): Custom stream entry ID. Defaults to `"*"` (auto-generate). ### Google Cloud Pub/Sub When using the `PubsubServer`, you can customize the gRPC `PublishRequest`. - `topic` (`str`): Override the target topic name (bypasses default prefixing). - `project` (`str`): Override the Google Cloud project ID for this specific publish. - `timeout` (`int | float | timedelta`): Override the timeout for the gRPC publish call. - `ordering_key` (`str`): A string identifying related messages for which publish order should be respected. - `attributes` (`dict[str, str]`): Extra metadata attributes to attach directly to the Pub/Sub message envelope. ### In-Memory When using the `InMemoryServer` for testing or local development: - `message_id` (`str`): Explicitly set the unique ID of the dummy message in the queue. If omitted, a random UUID is generated. # Actors Actors are the functions that actually do the work. When a worker pulls a message from the broker, it looks for a matching actor and executes it. ## Defining Actors with Routers Repid routes queue messages to functions using a `Router`. By attaching actors to a router, you can organize your tasks logically and keep your application modular. To declare an actor, create a `Router` and decorate an async function with `@router.actor()`. ``` from repid import Router router = Router() @router.actor(channel="user_tasks") async def send_welcome_email() -> None: print("Sending welcome email...") ``` Unit Testing The `@router.actor` decorator does not mutate your function. 
You can still import and call `send_welcome_email()` directly in your unit tests without needing a running Repid application. # Confirmation Modes & Error Handling When processing background tasks, exceptions and failures are inevitable. Repid gives you granular control over how errors are handled and how messages are acknowledged back to the message broker. ## Confirmation Mode and Errors Repid allows you to control exactly when a message is acknowledged (`ack`) back to the broker using the `confirmation_mode` setting on the decorator. The default mode is `"auto"`. - `"auto"`: Acknowledges the message if the actor completes successfully. If an exception is raised, it handles it based on the `on_error` policy. - `"always_ack"`: Always acknowledges the message, even if the actor raises an exception. - `"ack_first"`: Acknowledges the message *before* the actor even runs (Fire-and-forget). - `"manual"`: Never automatically acknowledges. You must inject the `Message` dependency and manually `await message.ack()`. - `"manual_explicit"`: Same as `"manual"`, but enforces explicitly returning an acknowledgment status. When using `"auto"`, the `on_error` policy defaults to `"nack"`. 
You can change this to `"reject"` to simply return the message to the queue, or `"ack"` to acknowledge it even on failure:

```
@router.actor(
    channel="tasks",
    confirmation_mode="auto",
    on_error="reject"  # Can be "nack", "reject", or "ack"
)
async def my_actor():
    pass
```

Alternatively, you can provide a callable that inspects the exception dynamically to decide the outcome:

```
from typing import Literal

def handle_error(exc: Exception) -> Literal["reject", "nack", "ack"]:
    # Reject ValueErrors so they are re-queued, Nack everything else
    if isinstance(exc, ValueError):
        return "reject"
    return "nack"

@router.actor(channel="tasks", on_error=handle_error)
async def my_smart_actor():
    pass
```

**Infinite retries:** Ensure your message broker is configured with delivery limits (max retries) to prevent infinite reprocessing of failed messages.

### `manual_explicit` confirmation mode

The `"manual_explicit"` mode works exactly like `"manual"`, but ensures that you explicitly return one of the literals `"ack"`, `"nack"`, `"reject"`, or `"no_action"`. The runner will act on the returned value immediately after execution. This helps to prevent accidentally leaving a message without a confirmation.

```
from typing import Literal

@router.actor(channel="tasks", confirmation_mode="manual_explicit")
async def my_explicit_actor() -> Literal["ack", "nack", "reject", "no_action"]:
    try:
        # Do some work
        return "ack"
    except Exception:
        # You must explicitly return a confirmation action
        return "nack"
```

### `always_ack` confirmation mode

The `"always_ack"` mode is essentially the same as setting `confirmation_mode="auto"` combined with `on_error="ack"`. Note that when using `"always_ack"` or `"ack_first"` modes, you cannot provide an `on_error` parameter to the decorator, as the behavior is already strictly defined by the mode itself.

### Manual modes and validation errors

If you are using `confirmation_mode="manual"` or `confirmation_mode="manual_explicit"`, you must handle your message's acknowledgment.
However, if a message fails to parse (e.g. `pydantic.ValidationError` when reading arguments), your actor code will never be executed. In `manual` and `manual_explicit` modes, the easiest way to avoid leaving the message unacknowledged is to use `on_error` (alternatively, you can catch the exception and act on the message in a middleware). By default, in `manual` and `manual_explicit` modes `on_error` is set to `"no_action"`. You can override this to acknowledge or reject the message directly:

```
@router.actor(
    confirmation_mode="manual_explicit",
    on_error="nack"
)
async def my_actor(payload: MyPydanticModel) -> Literal["ack", "nack", "reject", "no_action"]:
    # the actor body won't run if MyPydanticModel fails validation
    return "ack"
```

## Retries

Repid **does not provide built-in retry policies or delay mechanics out of the box**. Because retry semantics (e.g., dead-lettering, exponential backoff, delaying messages) are highly dependent on the underlying message broker and the specific needs of the user, Repid delegates this responsibility. If an actor raises an exception, the worker catches it and instructs the broker based on the actor's `on_error` configuration. If you need to implement retries, you have two main options:

1. **Broker-Level Retries:** Configure your message broker with a Dead Letter Exchange (DLX) to automatically re-route failed messages. You can either leave messages in the DLX for manual inspection and processing, or configure message TTLs to automatically re-queue them after a delay for retry attempts.
1. **Application-Level Retries:** Wrap your actor function with error handling logic (e.g., using a library like `tenacity`, or writing a custom decorator) that catches exceptions, sleeps, and re-attempts the logic internally before ever yielding an error back to Repid. Note that from the broker's perspective, processing of a single message takes much longer, which can lead to worse concurrency.
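To make the application-level option concrete, here is a minimal sketch of a retry decorator written with plain `asyncio` only; it is not a Repid API. The decorator (`with_retries` is a hypothetical name) re-attempts the wrapped coroutine with exponential backoff, and only lets the final exception propagate out, where Repid's `on_error` policy would then apply.

```python
import asyncio
import functools

def with_retries(max_attempts: int = 3, base_delay: float = 0.1):
    """Retry an async function with exponential backoff before giving up."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        # Out of attempts: let the exception reach Repid,
                        # which will then apply the actor's on_error policy.
                        raise
                    await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator

# Usage sketch: apply it *under* the actor decorator, e.g.
# @router.actor(channel="tasks")
# @with_retries(max_attempts=5)
# async def flaky_task(): ...
```

Keep in mind that, as noted above, the message stays unacknowledged for the whole retry window, so long backoffs reduce worker concurrency.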
## Poison Message Handling

A "poison message" is a message that consistently fails to process, potentially causing an infinite loop of errors and retries that clogs your queue. How Repid handles poison messages depends on *where* the failure occurs: during routing, or during execution.

### Unrouted Messages

If a message is pulled from a channel but Repid cannot find a matching actor to route it to, Repid will initially `reject` (requeue) the message. However, to prevent infinite loops, Repid maintains an internal counter of how many times it has seen the same unrouted message. If an unrouted message is encountered **10 times** (by default), Repid classifies it as a poison message. It logs an `actor.route.poison_message` error and will `nack` the message instead of requeuing it, allowing the broker to drop it or move it to a Dead Letter Queue (DLQ).

### Actor Execution Failures

Because Repid delegates execution retry logic to the underlying message broker, handling poison messages that crash *inside* your actor code largely depends on your infrastructure:

1. **Broker-Level Delivery Limits (Recommended):** Most mature brokers support configuring maximum delivery attempts. Once a message exceeds this limit, the broker automatically routes it to a Dead Letter Queue (DLQ) or drops it. This prevents the message from being infinitely re-queued and keeps your application logic clean.
1. **Application-Level Handling:** You can also handle poison messages explicitly within your actor. By catching exceptions and explicitly acknowledging (`ack`) the failing message—perhaps after logging it or saving the payload to a separate database table—you remove it from the queue and stop the retry loop.

# Execution & Timeouts

Repid gives you several options for configuring how an actor executes under the hood, depending on whether it is CPU-bound, I/O-bound, or prone to hanging.

## Timeouts

By default, an actor can run indefinitely. If a task hangs (e.g.
waiting for a database lock or an external API without a timeout), it will occupy a slot in the worker forever. You can set a `timeout` in seconds to ensure the actor is killed if it takes too long. If the timeout is reached, Repid will cancel the task and reject the message (returning it to the queue).

```
import asyncio

@router.actor(channel="api_tasks", timeout=10.0)
async def fetch_slow_api():
    # If this takes more than 10 seconds, it will be cancelled
    await asyncio.sleep(20)
```

## Thread vs Process Execution

If your actor function is purely synchronous (e.g., standard `def` instead of `async def`), Repid will automatically run it in a thread pool to avoid blocking the main async event loop. However, if your synchronous function is heavily CPU-bound (like image processing or large math computations), threads will bottleneck due to Python's Global Interpreter Lock (GIL). You can tell Repid to run the actor in a completely separate process using `run_in_process=True`.

```
import math

@router.actor(channel="heavy_math", run_in_process=True)
def calculate_prime(number: int) -> bool:
    # This CPU-heavy computation runs in an isolated process!
    if number <= 1:
        return False
    for i in range(2, int(math.sqrt(number)) + 1):
        if number % i == 0:
            return False
    return True
```

**Process Limitations:** When using `run_in_process=True`, your actor function and all of its arguments must be fully `pickle`-able. Dependencies cannot be injected if they maintain active sockets or unpicklable state.

## Custom Executors

When you run synchronous functions in threads or processes, Repid creates default `ThreadPoolExecutor` or `ProcessPoolExecutor` pools for you.
If you want finer control (like limiting the number of worker threads specifically for one actor, or reusing an existing executor), you can pass a custom `pool_executor`:

```
from concurrent.futures import ThreadPoolExecutor

# Create a dedicated thread pool for database tasks, limiting concurrent connections
db_pool = ThreadPoolExecutor(max_workers=5)

@router.actor(channel="db_tasks", pool_executor=db_pool)
def run_heavy_query():
    # Only up to 5 of these will run at the same time across the entire worker
    pass
```

**Note:** You cannot specify both `pool_executor` and `run_in_process`, as `run_in_process` implies using Repid's process pool executor.

# Payload & Headers Parsing

When a worker pulls a message from the broker, it receives a raw byte payload and a dictionary of string headers. Repid automatically parses this data and maps it to your actor function's arguments. How Repid performs this parsing depends on whether you have **Pydantic** installed in your environment.

## Parsing without Pydantic

If you run Repid in a vanilla Python environment without Pydantic, Repid uses a lightweight `BasicConverter` under the hood.

1. **Payload Decoding**: Repid uses the configured `default_serializer` (which defaults to standard `json`) to decode the incoming byte payload into a Python dictionary.
1. **Argument Matching**: Repid inspects your actor function's signature. If the keys in the decoded JSON dictionary match the names of your function arguments, the values are passed in directly.

```
# Assuming payload: {"user_id": 123, "is_active": true}

@router.actor
async def process_user(user_id, is_active):
    # user_id will be an int (123)
    # is_active will be a bool (True)
    pass
```

**No Type Validation:** Without Pydantic, Repid **does not validate or coerce types**. If the sender passes `{"user_id": "123"}` (a string), your function will receive a string, even if you typed it as `user_id: int`.
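The two steps above can be pictured with a short, purely illustrative sketch. This is *not* Repid's actual `BasicConverter` code; it just shows the idea of decoding JSON and binding keys to a function's parameters via `inspect`, with no type coercion.

```python
import inspect
import json

def match_arguments(func, raw_payload: bytes) -> dict:
    """Decode a JSON payload and keep only the keys the function accepts."""
    decoded = json.loads(raw_payload)
    params = inspect.signature(func).parameters
    # Values whose names match the function's parameters are passed through
    # as-is: no type validation or coercion happens here.
    return {name: value for name, value in decoded.items() if name in params}

async def process_user(user_id, is_active):
    return user_id, is_active

kwargs = match_arguments(process_user, b'{"user_id": 123, "is_active": true, "extra": 1}')
# kwargs == {"user_id": 123, "is_active": True} -- the unknown "extra" key is dropped
```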
## Parsing with Pydantic

If you install Pydantic (`pip install repid[pydantic]` or just `pip install pydantic>=2.0.0`), Repid automatically upgrades its internal parsing engine to use the `PydanticConverter`. This enables strict type validation, automatic type coercion, and AsyncAPI schema generation out of the box!

```
# Assuming payload: {"user_id": "123", "is_active": 1}

@router.actor
async def process_user(user_id: int, is_active: bool):
    # user_id is automatically coerced to an int (123)
    # is_active is automatically coerced to a bool (True)
    pass
```

Because Repid relies directly on Pydantic's powerful validation engine, you can use all of Pydantic's advanced typing features directly in your actor's function signature:

```
from typing import Annotated
from pydantic import BaseModel, Field
from annotated_types import Gt
import uuid

class UserAddress(BaseModel):
    city: str
    country: str

@router.actor
async def process_order(
    # Use annotated-types for simple constraints
    quantity: Annotated[int, Gt(0)],
    # Use Pydantic's Field for complex defaults (like UUIDs or timestamps)
    order_id: Annotated[str, Field(default_factory=lambda: uuid.uuid4().hex)],
    # Use nested Pydantic models for complex nested payload data
    shipping_address: UserAddress,
):
    # Repid guarantees that if the actor executes, `quantity` > 0,
    # `order_id` is auto-generated if missing,
    # and `shipping_address` is strictly formatted!
    pass
```

### The `FullPayload()` Annotation

Sometimes you don't want your JSON payload's keys scattered as individual arguments.
If you want to accept an entire Pydantic model representing the exact root JSON payload, you can use the `FullPayload` annotation:

```
from typing import Annotated
from pydantic import BaseModel
from repid import FullPayload

class UserPayload(BaseModel):
    user_id: int
    is_active: bool

@router.actor
async def process_user(user: Annotated[UserPayload, FullPayload()]):
    # `user` contains the fully validated payload model
    print(user.user_id)
```

**Tip:** `FullPayload()` is only available when Pydantic is installed.

## Extracting Headers

Headers are metadata attached to a message (like `topic`, `correlation_id`, or custom tracking tags). You can extract specific headers directly into your actor arguments using the `Header` dependency injection.

```
from typing import Annotated
from repid import Header

@router.actor
async def my_actor(
    payload_data: str,
    correlation_id: Annotated[str, Header(alias="correlation-id")],
    custom_trace: Annotated[str | None, Header()] = None
):
    print(f"Tracking: {correlation_id} / {custom_trace}")
```

If Pydantic is installed, headers extracted this way are also strongly validated and coerced! If a header is required (no default value) but is missing from the message, Pydantic will raise a validation error.

## Explicitly Specifying the Converter

By default, Repid automatically selects the converter based on whether Pydantic is available in your environment (i.e. `DefaultConverter`). However, you can explicitly specify which converter to use when creating your router or actor. To use the lightweight `BasicConverter` even when Pydantic is installed:

```
from repid import Router, BasicConverter

router = Router(converter=BasicConverter)  # (1)

@router.actor
async def my_actor(user_id: int, is_active: bool):
    # Uses BasicConverter - no type validation or coercion
    pass
```

1.
First option - override via Router

To explicitly use the `PydanticConverter`:

```
from repid import Router, PydanticConverter

router = Router()

@router.actor(converter=PydanticConverter)  # (1)
async def my_actor(user_id: int, is_active: bool):
    # Uses PydanticConverter - full type validation and coercion
    pass
```

1. Second option - override directly on the actor

This is useful when you want to ensure consistent behavior across different environments or when you have specific performance or validation requirements.

**Tip:** You can also implement custom converters to define your own parsing logic. This allows you to integrate alternative validation frameworks, add custom serialization support, or optimize for specific use cases in your application.

# Connecting & Nesting Routers

By themselves, routers do nothing. You must include them in your main Repid application instance so the worker knows they exist and can start listening for their tasks.

```
from repid import Repid
from myapp.routers import router

app = Repid()

# Register all actors defined on the router
app.include_router(router)
```

## Default Configurations

When you create a `Router`, you can pass default configurations. Any actor attached to this router will inherit these defaults unless they explicitly override them.

```
from repid import Router

# All actors on this router will default to the "background_jobs" channel
# and have a timeout of 10 seconds.
router = Router(channel="background_jobs", timeout=10.0)

@router.actor()  # Inherits "background_jobs" and 10s timeout
async def task_a():
    pass

@router.actor(timeout=30.0)  # Overrides timeout, but inherits "background_jobs"
async def task_b():
    pass
```

## Nesting Routers

Just like how you can attach routers to your `Repid` app, you can also attach routers *to other routers* using `router.include_router()`. This allows you to build deeply nested, highly modular applications. When you include a child router into a parent router, **defaults propagate downwards**.
If the child router doesn't specify a configuration (like `channel` or `timeout`), it will inherit the parent's configuration!

```
parent_router = Router(channel="main_queue", timeout=60.0)
child_router = Router()

@child_router.actor()
async def my_nested_actor():
    pass

# The child_router will now inherit `channel="main_queue"` and `timeout=60.0`
parent_router.include_router(child_router)

# Finally, connect the parent to the app
app.include_router(parent_router)
```

**Middlewares Concatenation:** When dealing with middlewares during nesting, they are **concatenated**, not overwritten. The parent router's middlewares will run *before* the child router's middlewares!

# Routing Strategies (Topics)

By default, Repid uses a **Topic-based routing strategy**. This means a worker reading from a channel (like `user_tasks`) won't just execute *any* actor. It looks for an actor whose name matches the `topic` header of the message. If your actor function is named `send_welcome_email`, the message must be sent with `headers={"topic": "send_welcome_email"}`.

## Modifying the Topic Name

If you want the actor to listen to a specific topic that differs from its function name, you can set the `name` parameter in the decorator:

```
@router.actor(channel="user_tasks", name="email_worker")
async def send_welcome_email() -> None:
    pass
```

Now, messages sent to the `user_tasks` channel must have `headers={"topic": "email_worker"}` for this actor to process them.
## Catch-all Routing Strategy

If you only have one actor per channel and want it to process *everything* on that channel regardless of the topic header, you can switch to the **Catch-all routing strategy**:

```
from repid import catch_all_routing_strategy

@router.actor(
    channel="user_tasks",
    routing_strategy=catch_all_routing_strategy,
)
async def process_everything_on_channel() -> None:
    # This will process any message arriving on the "user_tasks" channel
    pass
```

**Tip:** Catch-all routing is especially useful when integrating Repid with external systems or legacy queues where you cannot enforce specific metadata headers on the incoming messages.

## Custom Routing Strategy

If your messaging logic relies on headers other than `topic` to determine routing, you can easily define your own custom routing strategy! A routing strategy is simply a factory function that takes the `actor_name` and returns a callable. That callable takes an incoming `BaseMessageT` and returns a `bool` indicating whether this actor should process the message.

```
from typing import Callable

from repid import BaseMessageT

def my_custom_routing_strategy(*, actor_name: str, **kwargs) -> Callable[[BaseMessageT], bool]:
    def strategy(message: BaseMessageT) -> bool:
        # Example: route based on a custom "action" header
        if message.headers is None:
            return False
        return message.headers.get("action") == actor_name
    return strategy

@router.actor(
    channel="user_tasks",
    name="send_email_action",
    routing_strategy=my_custom_routing_strategy
)
async def process_something() -> None:
    pass
```

## Routing Priority & Fallbacks

When a worker pulls a message from a channel, it evaluates the routing strategies of all actors listening on that channel.

- **Multiple Matches**: If multiple actors match a single message, the message is **always routed to the first matching actor** (based on the order they were registered to the router).
A common example is defining two actors on the same channel where one actor uses `catch_all_routing_strategy`: if that catch-all actor is registered first, it will consume messages before a more specific topic-based actor can match them.
- **No Matches**: If no actor matches the message, the worker will reject it (returning it to the queue). If the same unrouted message is pulled repeatedly, Repid will eventually classify it as a "poison message" and negatively acknowledge (`Nack`) it, to prevent it from permanently blocking the queue.

# Workers

Workers are the long-running loops that continuously pull messages from the broker and pass them to your actors for processing.

## Running a Worker

To start processing messages, you call `run_worker()` on your Repid application instance. Keep in mind that it blocks for the entire duration of the worker's execution, like a `while True` loop.

worker.py

```
import asyncio
from repid import Repid, Router, AmqpServer

app = Repid()
app.servers.register_server("default", AmqpServer("amqp://localhost"), is_default=True)

router = Router()

@router.actor(channel="user_events")
async def process_user_event(event_type: str):
    print(f"Event: {event_type}")

app.include_router(router)

async def main():
    async with app.servers.default.connection():
        await app.run_worker()

if __name__ == "__main__":
    asyncio.run(main())
```

**Deployment:** You usually run your worker loop as an independent process/instance, separately from your main web server.

## Messages Limit

In distributed orchestrated systems (like Kubernetes or Docker Swarm), it is a common practice to periodically restart worker processes to prevent long-term memory leaks or zombie connections.
You can configure the worker to gracefully exit after processing a certain number of messages (by default, it runs indefinitely):

```
# Worker will exit successfully after processing exactly 1000 messages
await app.run_worker(messages_limit=1000)
```

Once the limit is reached, the worker triggers its internal shutdown sequence and returns control back to your script, allowing the process to restart naturally.

# Built-in Servers

The `run_worker` function is the entrypoint for spinning up Repid's built-in sub-servers. These servers run as background tasks alongside the main consumer loop, sharing its lifecycle.

**Internal Infrastructure Only:** These built-in servers are extremely lightweight and simple by design. They do not possess the security, performance, or robust routing features of a dedicated web framework like FastAPI or Litestar. **As a best practice, avoid exposing these built-in servers directly to the public internet.** They should be used strictly for internal infrastructure (like Kubernetes health probes) or local development.

## AsyncAPI Server

If you want to serve an interactive documentation UI alongside your worker on a specific port, pass an `asyncapi_server=AsyncAPIServerSettings(...)` object. [Read more here.](https://repid.aleksul.space/integrations/asyncapi/index.md)

## Health Check Server

In containerized environments like Kubernetes, the orchestrator needs a way to know if your worker is alive, responsive, and hasn't silently deadlocked. Repid comes with a built-in lightweight HTTP server designed specifically for liveness/readiness probes. To enable it, pass a `HealthCheckServerSettings` configuration to `run_worker`.

```
from repid import Repid, HealthCheckServerSettings

app = Repid()
...  # broker configuration is omitted

await app.run_worker(
    health_check_server=HealthCheckServerSettings(
        address="0.0.0.0",
        port=8080,
        endpoint_name="/healthz",
    )
)
```

With this configuration, your worker will spin up a small HTTP server in the background.
Kubernetes can then hit `http://<worker-host>:8080/healthz`. As long as the Repid worker loop is successfully iterating and the server connection is alive, the endpoint will return an HTTP 200 OK.

# Concurrency limit

A Repid worker runs inside a single `asyncio` event loop. It can pull multiple messages from the broker and execute them concurrently. You can configure `tasks_limit` when starting the worker:

```
# Cap the worker at processing up to 2000 messages concurrently
await app.run_worker(tasks_limit=2000)
```

By default, Repid allows up to **1000 tasks** inside a single worker.

**Concurrency vs Bottlenecks:** The `tasks_limit` is not primarily meant as a strict business concurrency restriction, but rather a way to avoid overloading the Python process. Because `asyncio` runs entirely on a single CPU core, scheduling too many concurrent tasks will cause the event loop to start throttling. This typically happens in the range of ~1000 to 4000 concurrent tasks depending on your hardware. You are highly encouraged to benchmark your specific workload and adjust this limit to find your process's optimal sweet spot.

**Scaling Out:** If you need to process more messages than a single Python event loop can handle without throttling, you should scale vertically (run multiple workers using multiprocessing) and/or horizontally (by running multiple instances of the application, e.g. docker containers or kubernetes pods) rather than endlessly increasing `tasks_limit`.

# Worker Lifecycle & Production

When moving your Repid applications from local development into a production environment (like Docker or Kubernetes), there are a few major considerations for workers, primarily how they handle OS signals and shutdown sequences.

## The Worker Loop

A Repid worker runs as a standalone, blocking event loop. It connects to your message broker and waits for new tasks (either by listening to a push-based subscription or by polling, depending on the specific broker).
Because Repid natively manages its own `asyncio` execution lifecycle, it does not require an external ASGI runner (like `uvicorn` or `gunicorn`) to operate. However, the best practice is to run this worker as a completely independent background process, separate from your main web application.

worker.py

```
import asyncio
from app import app  # your Repid app instance

async def main():
    async with app.servers.default.connection():
        # Starts the worker event loop and blocks until shutdown signal
        await app.run_worker()

if __name__ == "__main__":
    asyncio.run(main())
```

## Graceful Shutdowns

When you stop a worker (e.g., during a deployment rollout), the OS sends a termination signal (`SIGTERM` or `SIGINT`) to the process. If the process dies immediately, any messages that actors were currently executing might be lost or stuck in an ambiguous state. Repid handles these signals automatically to ensure a **graceful shutdown**. This prevents data loss and allows messages to be safely returned to the broker if they cannot finish in time.

### The Shutdown Sequence

Here is exactly what happens when a Repid worker receives a termination signal:

```
sequenceDiagram
    participant OS as Operating System
    participant Worker as Repid Worker
    participant Broker as Message Broker
    participant Actors as Running Actors
    OS->>Worker: Sends SIGTERM / SIGINT
    Note over Worker: 1. Stop consuming
    Worker->>Broker: Pause subscription (Stop receiving new messages)
    Note over Worker,Actors: 2. Wait up to `graceful_shutdown_time`
    par Some actors finish in time
        Actors-->>Worker: Execution finishes
        Worker->>Broker: Ack / Nack message
    and Some actors are still running
        Worker->>Worker: `graceful_shutdown_time` expires!
        Note over Worker: 3. Cancel remaining tasks
        Worker->>Actors: Raise `asyncio.CancelledError`
        Note over Actors: Wait `cancellation_timeout` (1s) for cleanup
        Actors-->>Worker: Task cancelled
        Worker->>Broker: Reject message (Return to queue)
    end
    Note over Worker: 4. Final Cleanup
    Worker->>Broker: Close connection
    Worker-->>OS: Process Exits
```

1. **Pause Subscription**: The worker immediately stops accepting *new* messages from the broker.
1. **Grace Period**: It enters a waiting phase (determined by `graceful_shutdown_time`), allowing currently executing actors to finish their processing naturally.
1. **Cancellation (if necessary)**: If the timeout is reached and actors are still running, Repid triggers an `asyncio.CancelledError` inside those tasks. They are given a brief moment (`cancellation_timeout`, defaulting to 1s) to run any `finally` blocks, while the worker simultaneously `Reject`s their messages, returning them to the queue so another worker can pick them up.
1. **Disconnect**: Finally, the worker safely closes its connection to the broker and shuts down the process.

### Configuring the Shutdown Timeout

By default, Repid gives actors **25 seconds** to finish after a shutdown signal is received.

**Note:** The default is 25 seconds because orchestrators like Kubernetes usually wait 30 seconds between sending `SIGTERM` and `SIGKILL`. The 5-second buffer ensures Repid can properly cancel tasks and Reject messages before the hard kill.

If your actors take longer to run, you can increase this timeout; just remember to also increase your orchestrator's termination grace period!

```
await app.run_worker(
    graceful_shutdown_time=120.0  # Wait up to 2 minutes
)
```

By ensuring that messages are always properly Acked, Nacked, or Rejected during a deployment rollout, Repid guarantees that your task queue remains consistent and reliable.

### Custom Shutdown Signals

By default, Repid listens for `SIGTERM` and `SIGINT` signals to trigger graceful shutdown. You can customize which signals the worker responds to or disable it entirely.
To specify custom signals:

```
import signal

await app.run_worker(
    register_signals=[signal.SIGUSR1]
)
```

To disable Repid listening for signals:

```
await app.run_worker(
    register_signals=[]  # No signals will trigger graceful shutdown
)
```

**Warning:** Disabling signal registration prevents graceful shutdown, risking message loss or inconsistent state during termination. Only disable this when you have alternative mechanisms to ensure message safety.

# Ecosystem & Integrations

# AsyncAPI Schema & Server

Repid has native support for generating AsyncAPI 3.0 documentation from your actors and routers, and can automatically serve it via an embedded lightweight HTTP server.

## Defining the Application Metadata

When you instantiate your `Repid` application, you can specify metadata about your background task queue:

```
from repid import Repid, Contact, License

app = Repid(
    title="My Repid Task Queue",
    version="1.0.0",
    description="Background task queue processing orders.",
    contact=Contact(name="Support", url="https://example.com", email="support@example.com"),
    license=License(name="MIT", url="https://opensource.org/licenses/MIT"),
)
```

## Generating the Schema

To output the raw AsyncAPI dictionary (which can be serialized and saved as JSON/YAML):

```
schema = app.generate_asyncapi_schema()
```

## Generating the HTML Documentation

If you want to render the AsyncAPI schema as a React UI (for instance, to host it on a custom endpoint via your own web server), you can use the `app.asyncapi_html()` method. It allows you to customize the rendered components.

```
html_content = app.asyncapi_html(
    sidebar=True,
    info=True,
    servers=True,
    operations=True,
    messages=True,
    schemas=True,
    errors=True,
    expand_message_examples=True,
)
```

## Running the Embedded Server

Repid can serve the interactive AsyncAPI React UI alongside your workers natively.
To do this, configure the `AsyncAPIServerSettings` when calling `run_worker`:

```
import asyncio
from repid import Repid, AsyncAPIServerSettings, AmqpServer

app = Repid()

# Register your broker to show up as a server in the generated docs
broker = AmqpServer("amqp://localhost")
app.servers.register_server("rabbitmq_default", broker, is_default=True)

async def main():
    async with app.servers.default.connection():
        await app.run_worker(
            asyncapi_server=AsyncAPIServerSettings(
                address="0.0.0.0",
                port=8081,
                endpoint_name="/"
            )
        )

asyncio.run(main())
```

Once started, navigate to `http://localhost:8081/` in your browser to view your AsyncAPI documentation in real-time. This server runs automatically inside a dedicated asyncio task and shares the lifecycle of the worker.

# Using Pydantic & FastAPI

Repid offers seamless integration with both Pydantic (for data validation) and FastAPI (for web serving).

## Validating with Pydantic

Repid provides Pydantic validation out-of-the-box. When you define an actor with type hints, Repid will automatically validate incoming JSON payloads against those types if Pydantic is installed.

### Prerequisite

To ensure correct installation of a supported version, you can run:

```
pip install repid[pydantic]
```

Currently, official support targets Pydantic v2 (though v1 may also work).

### Submitting Pydantic Models as Payloads

Repid's default JSON serializer natively supports Pydantic models. You can specify them directly when sending messages.

```
from repid import Repid
from pydantic import BaseModel

app = Repid()

class MyPydanticModel(BaseModel):
    user_id: int
    actions: list[str]

# Inside an async function:
await app.send_message_json(
    channel="some_channel",
    payload=MyPydanticModel(
        user_id=123,
        actions=["First action"],
    ),
)
```

You can also nest Pydantic models inside dictionaries or lists, and Repid will automatically extract them using `.model_dump(mode="json")`.
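The nested-model extraction can be approximated with a small stdlib-only sketch: a `json.dumps` fallback hook that calls `.model_dump(mode="json")` on any object exposing that method. The `FakeModel` class below is a stand-in for a real Pydantic `BaseModel`, used purely for illustration; this is not Repid's serializer code.

```python
import json

def dump_models(obj):
    """json.dumps fallback: serialize Pydantic-like objects via model_dump()."""
    if hasattr(obj, "model_dump"):
        return obj.model_dump(mode="json")
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

class FakeModel:
    """Stand-in for a Pydantic BaseModel, for illustration only."""
    def __init__(self, **data):
        self.data = data

    def model_dump(self, mode="python"):
        return self.data

# A model nested inside a plain dict is extracted transparently:
payload = {"user": FakeModel(user_id=123), "tags": ["a", "b"]}
encoded = json.dumps(payload, default=dump_models)
# encoded == '{"user": {"user_id": 123}, "tags": ["a", "b"]}'
```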
### Validating Actor Input As long as Pydantic is installed in your environment, Repid will automatically select the `PydanticConverter`. This means any type hints you use in your actor signature will be strictly validated, and you can use `pydantic.Field` for defaults and aliases. For detailed examples on defining actors with Pydantic validation and using the `FullPayload()` dependency annotation, see the [Actors guide](https://repid.aleksul.space/user_guide/actors/parsing/#parsing-with-pydantic). ## Integration with FastAPI Because Repid applications manage persistent server connections, you need to ensure the connection is opened when your web server starts, and closed when it shuts down. FastAPI's `lifespan` events are perfect for this. ``` from contextlib import asynccontextmanager from fastapi import FastAPI from repid import Repid, AmqpServer app = Repid() app.servers.register_server("default", AmqpServer("amqp://localhost"), is_default=True) @asynccontextmanager async def lifespan(fastapi_app: FastAPI): # Open the Repid connection on startup async with app.servers.default.connection(): yield # The connection automatically closes when the app shuts down fastapi_app = FastAPI(lifespan=lifespan) @fastapi_app.post("/create-job") async def create_repid_job(data: dict) -> dict: # We can safely send messages here because # the connection is kept alive by the lifespan await app.send_message_json( channel="my_background_task", payload=data ) return {"status": "ok"} ``` # Best Practices & Patterns # Testing Repid provides a built-in `TestClient` to make testing your actors and application logic simple and fast. You don't need any complex pytest plugins or real message brokers. ## Preparation In the following examples, we will assume you have created an application with the following structure: ``` . 
├── myapp │ └── app.py └── tests └── test_app.py ``` We will use a simple actor defined on our `Repid` application instance: myapp/app.py ``` from repid import Repid, Router app = Repid(title="My App") router = Router() @router.actor(channel="user_messages") async def actor_with_args(user_id: int, user_name: str, user_messages: list[str]) -> list[str]: user_message = f"Hi {user_name}! Your id is: {user_id}." user_messages.append(user_message) return user_messages app.include_router(router) ``` ## Using the TestClient The `TestClient` allows you to send messages directly to your application without needing a running server. It intercepts the messages and can either automatically process them, or let you process them manually one-by-one. tests/test_app.py ``` import pytest from repid import TestClient from myapp.app import app async def test_actor_processing() -> None: # 1. Initialize the TestClient using your app as the parameter async with TestClient(app) as client: # 2. Send a message to the channel await client.send_message_json( channel="user_messages", payload=dict(user_id=123, user_name="Alex", user_messages=[]), headers={"topic": "actor_with_args"} ) # 3. By default (auto_process=True), the client instantly processes messages. # Let's verify the processed messages. processed = client.get_processed_messages() assert len(processed) == 1 assert processed[0].success is True assert processed[0].result == ["Hi Alex! Your id is: 123."] ``` ### Manual Message Processing If you want to control exactly when messages are processed (e.g. to test queue buildup or chaining), you can set `auto_process=False` on the test client. 
tests/test_app.py ``` import pytest from repid import TestClient from myapp.app import app async def test_manual_processing() -> None: async with TestClient(app, auto_process=False) as client: await client.send_message_json( channel="user_messages", payload=dict(user_id=123, user_name="Alex", user_messages=[]), headers={"topic": "actor_with_args"} ) # The message is in the queue, but hasn't been processed yet assert len(client.get_processed_messages()) == 0 # Process the next message in the queue msg = await client.process_next() assert msg is not None assert msg.success is True assert msg.result == ["Hi Alex! Your id is: 123."] ``` ### Inspecting Messages The `TestMessage` object returned by the `TestClient` tracks the full lifecycle of the message. You can assert against various properties to make sure your actors behave correctly: ``` msg = await client.process_next() assert msg.acked is True # Was it acknowledged? assert msg.nacked is False # Was it NACKed? assert msg.rejected is False # Was it rejected? assert msg.exception is None # Did it raise an exception? assert msg.result == ["..."] # The return value of the actor ``` You can retrieve all sent or processed messages using: - `client.get_sent_messages()` - `client.get_processed_messages()` Both functions optionally accept a `channel` or `operation_id` argument to filter the list of messages. ## Unit Testing Actors Remember that Repid doesn't modify your actor functions in any way. You can always write simple, isolated unit tests by importing your function and calling it directly: tests/test_app.py ``` from myapp.app import actor_with_args async def test_actor_with_args() -> None: expected = ["Hi Alex! Your id is: 123."] actual = await actor_with_args(user_id=123, user_name="Alex", user_messages=[]) assert actual == expected ``` # Chaining jobs Sometimes you want to break a complex task into multiple smaller steps, where the output of one task triggers the next. 
In Repid, you can easily chain tasks by using the `Message` dependency to send a new message directly from within your actor. Because you are using the same server connection, this is extremely fast. ## Example Let's imagine you are designing a user registration pipeline. The first job creates a user ID, and the second job sends a welcome email using that ID. ``` import asyncio from repid import Repid, Router, InMemoryServer, Message app = Repid() app.servers.register_server("default", InMemoryServer(), is_default=True) router = Router() @router.actor(channel="registration") async def create_user(username: str, message: Message) -> None: # 1. Pretend we save to the database and generate an ID user_id = 123 print(f"Created user {username} with ID {user_id}") # 2. Chain the next job! # We send a message to the email queue, passing along the generated user_id await message.send_message_json( channel="email", payload={"user_id": user_id, "username": username}, headers={"topic": "send_welcome_email"} ) # The current message is automatically acknowledged when this actor finishes successfully. @router.actor(channel="email") async def send_welcome_email(user_id: int, username: str) -> None: print(f"Sending welcome email to User #{user_id} ({username})!") app.include_router(router) async def main() -> None: async with app.servers.default.connection(): # Kick off the chain by sending the first message await app.send_message_json( channel="registration", payload={"username": "Alex"}, headers={"topic": "create_user"} ) # Process the queue. # It will first process the 'create_user' message, # which will then enqueue the 'send_welcome_email' message, # which will then be processed in the next iteration! await app.run_worker(messages_limit=2) if __name__ == "__main__": asyncio.run(main()) ``` When you run this script, the worker will process both jobs in sequence! ``` Created user Alex with ID 123 Sending welcome email to User #123 (Alex)! 
```

### The `reply_json` helper

If you are designing a strict request-response or RPC-like architecture, you can use the `message.reply_json()` method. It automatically publishes a message to a reply queue (if supported and specified by your broker) while simultaneously acknowledging the current message.

# Middlewares

Middlewares in Repid allow you to hook into the lifecycle of messages. You can use middlewares to add logging, telemetry, or tracking, or to alter messages on the fly. There are two distinct types of middlewares, depending on the part of the lifecycle you want to intercept: **Producer Middlewares** and **Actor Middlewares**.

## Actor Middleware

An actor middleware intercepts the execution of an actor when it receives a message from the queue. This is useful for metrics, tracing, error handling, or performance monitoring. To create an actor middleware, you implement the `ActorMiddlewareT` protocol.

On Python 3.12 and newer, you can use the built-in generic syntax:

```
from typing import Coroutine, Callable
from repid import ReceivedMessageT, ActorData

async def logging_middleware[T](
    call_next: Callable[[ReceivedMessageT, ActorData], Coroutine[None, None, T]],
    message: ReceivedMessageT,
    actor: ActorData,
) -> T:
    print(f"Starting execution for channel: {message.channel}")
    try:
        # call_next invokes the next middleware, and eventually the actor itself
        result = await call_next(message, actor)
        print(f"Finished execution with result: {result}")
        return result
    except Exception as e:
        print(f"Actor raised an exception: {e}")
        raise e
```

On older Python versions (3.10/3.11), use a `TypeVar` instead:

```
from typing import Coroutine, Callable, TypeVar
from repid import ReceivedMessageT, ActorData

T = TypeVar("T")

async def logging_middleware(
    call_next: Callable[[ReceivedMessageT, ActorData], Coroutine[None, None, T]],
    message: ReceivedMessageT,
    actor: ActorData,
) -> T:
    print(f"Starting execution for channel: {message.channel}")
    try:
        # call_next invokes the next middleware, and eventually the actor itself
        result = await call_next(message, actor)
        print(f"Finished execution with result: {result}")
        return result
    except Exception as e:
        print(f"Actor raised an exception: {e}")
        raise e
```

## Producer Middleware

A producer middleware intercepts messages being sent to the message broker. This is perfect for appending distributed tracing IDs, adding default headers, or validating payloads before they are serialized and published. To create a producer middleware, you implement the `ProducerMiddlewareT` protocol.

On Python 3.12 and newer:

```
import uuid
from typing import Coroutine, Callable
from repid import MessageData

async def producer_tracing_middleware[T](
    call_next: Callable[
        [str, MessageData, dict[str, object] | None],
        Coroutine[None, None, T],
    ],
    channel: str,
    message: MessageData,
    server_specific_parameters: dict[str, object] | None,
) -> T:
    # Add a default tracking header if not present
    headers = message.headers or {}
    if "X-Trace-Id" not in headers:
        headers["X-Trace-Id"] = str(uuid.uuid4())

    # Create a new message object with updated headers
    message = MessageData(
        payload=message.payload,
        headers=headers,
        content_type=message.content_type,
    )

    print(f"Publishing to '{channel}' with Trace ID: {headers['X-Trace-Id']}")

    # Proceed with the actual publish
    return await call_next(channel, message, server_specific_parameters)
```

On older Python versions (3.10/3.11):

```
import uuid
from typing import Coroutine, Callable, TypeVar
from repid import MessageData

T = TypeVar("T")

async def producer_tracing_middleware(
    call_next: Callable[
        [str, MessageData, dict[str, object] | None],
        Coroutine[None, None, T],
    ],
    channel: str,
    message: MessageData,
    server_specific_parameters: dict[str, object] | None,
) -> T:
    # Add a default tracking header if not present
    headers = message.headers or {}
    if "X-Trace-Id" not in headers:
        headers["X-Trace-Id"] = str(uuid.uuid4())

    # Create a new message object with updated headers
    message = MessageData(
        payload=message.payload,
        headers=headers,
        content_type=message.content_type,
    )

    print(f"Publishing to '{channel}' with Trace ID: {headers['X-Trace-Id']}")

    # Proceed with the actual publish
    return await call_next(channel, message, server_specific_parameters)
```

## Registering Middlewares

Middlewares are registered when you instantiate your components. You can attach actor middlewares to the main `Repid` application, a specific `Router`, or even an individual actor. Producer middlewares are attached to the main `Repid` application. Middlewares execute in an onion-like chain: the first one added wraps everything, so it starts first and finishes last.

```
from repid import Repid, Router

# Register on the main app
app = Repid(
    actor_middlewares=[MetricsMiddleware(), LoggingActorMiddleware()],
    producer_middlewares=[TraceIdProducerMiddleware(), ValidationMiddleware()],
)

# Or register specifically on a router
router = Router(middlewares=[SpecificRouterMiddleware()])

# Or even a single actor!
@router.actor(middlewares=[SingleActorMiddleware()])
async def my_job() -> None:
    pass
```

# Cookbook

# Sentry Middleware

[Sentry](https://sentry.io/) is an industry-standard error tracking platform. This cookbook explains how to write a custom Repid Middleware to capture unhandled exceptions in your actors and automatically report them to your Sentry dashboard.

## The Actor Middleware

When a worker picks up a message, we want to add context (like the channel and message ID) and capture any exceptions that bubble up from the actor's execution. We can achieve this by implementing an **Actor Middleware** that wraps the execution of our tasks. By using `sentry_sdk.new_scope()`, we ensure that the tags and context are isolated to this specific task execution and don't bleed into other asynchronous tasks running concurrently.
```
import sentry_sdk
from typing import Callable, Coroutine, TypeVar
from repid.connections.abc import ReceivedMessageT
from repid.data import ActorData

T = TypeVar("T")

async def sentry_actor_middleware(
    call_next: Callable[[ReceivedMessageT, ActorData], Coroutine[None, None, T]],
    message: ReceivedMessageT,
    actor: ActorData,
) -> T:
    with sentry_sdk.new_scope() as scope:  # (1)!
        scope.set_tag("repid.channel", message.channel)  # (2)!
        scope.set_tag("repid.actor", actor.name)
        if message.message_id:
            scope.set_tag("repid.message_id", message.message_id)
        try:
            return await call_next(message, actor)  # (3)!
        except Exception as e:
            sentry_sdk.capture_exception(e)  # (4)!
            raise
```

1. Isolate the context for this specific task execution so tags don't bleed into other concurrent tasks.
1. Add useful contextual tags that will appear in your Sentry dashboard.
1. Execute the actor (and any subsequent middlewares).
1. Capture the exception before letting it bubble up to Repid's internal error handler.

## Registering the Middleware

Once defined, you apply the middleware to your Repid application, router, or specific actor. Registering it on the main `Repid` app ensures all actors automatically report errors.

```
import sentry_sdk
from repid import Repid, Router

sentry_sdk.init()  # (1)!

app = Repid(actor_middlewares=[sentry_actor_middleware])  # (2)!
router = Router()

@router.actor(channel="my_queue")  # (3)!
async def process_task(data: dict) -> None:
    print("Processing task...")
    raise ValueError("Something went wrong!")  # (4)!
```

1. Initialize Sentry.
1. Register the middleware globally on the Repid app so all routers inherit it.
1. Any actor on this router will now automatically report errors.
1. Any exceptions here will immediately show up in your Sentry dashboard! They will include the channel, actor name, and message ID tags.
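When stacking the Sentry middleware with others, keep in mind the onion-like execution order described in the Middlewares section: the first middleware added wraps everything, starting first and finishing last. A minimal pure-Python sketch of how such a chain composes (the names and `compose` helper here are illustrative, not Repid internals):

```python
import asyncio

async def outer(call_next, msg):
    # First-added middleware: wraps the whole chain.
    return ["outer-in", *await call_next(msg), "outer-out"]

async def inner(call_next, msg):
    # Second-added middleware: wrapped by `outer`, wraps the actor.
    return ["inner-in", *await call_next(msg), "inner-out"]

async def actor(msg):
    return [f"actor({msg})"]

def compose(middlewares, actor):
    # Wrap the actor from the inside out, so the first-added
    # middleware ends up as the outermost layer.
    handler = actor
    for mw in reversed(middlewares):
        handler = (lambda mw, nxt: lambda msg: mw(nxt, msg))(mw, handler)
    return handler

result = asyncio.run(compose([outer, inner], actor)("hello"))
print(result)
# ['outer-in', 'inner-in', 'actor(hello)', 'inner-out', 'outer-out']
```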
# RabbitMQ Retries with Exponential Backoff

When a task fails, retrying it with increasing delays (exponential backoff) prevents your system from being overwhelmed by failing dependencies. This cookbook explains how to implement exponential backoff using **RabbitMQ 4.x** (or 3.13+, with the AMQP 1.0 plugin enabled via `rabbitmq-plugins enable rabbitmq_amqp1_0`). It uses **Quorum Queues** to prevent poison messages and a smart **Topic Exchange** topology that shares a single set of delay queues across all your work queues.

*(Inspired by [Brian Storti's article](https://www.brianstorti.com/rabbitmq-exponential-backoff/))*

## The Architecture & Message Flow

RabbitMQ lacks native per-message delays. We simulate delays using queues with a Time-To-Live (`x-message-ttl`). When the TTL expires, messages are dead-lettered back to an exchange. Creating a dedicated delay queue for *every* work queue and delay interval is inefficient. Instead, we use **Topic Exchanges** to share delay queues.

Here is the lifecycle of a retried message:

```
graph TD
    WQ[(my_work_queue)] -->|Consumes| W[Worker]
    W -->|"1st Fail<br>Routing Key:<br>delay.10000.my_work_queue"| RX{{retry_exchange}}
    W -->|"2nd Fail<br>Routing Key:<br>delay.60000.my_work_queue"| RX
    RX -->|"Binding:<br>delay.10000.#"| DQ1[("delay_10s<br>TTL: 10s")]
    RX -->|"Binding:<br>delay.60000.#"| DQ2[("delay_60s<br>TTL: 60s")]
    DQ1 -.->|"Dead-letter<br>Routing Key preserved"| RQX{{requeue_exchange}}
    DQ2 -.->|"Dead-letter<br>Routing Key preserved"| RQX
    RQX -->|"Binding:<br>*.*.my_work_queue"| WQ
```

1. **Fail**: A worker fails to process a message. The application calculates the delay, increments the retry counter, and publishes it to `retry_exchange` with a routing key like `delay.10000.my_work_queue` (or `delay.60000.my_work_queue` for the next attempt).
1. **Wait**: The exchange routes it to the corresponding shared delay queue (e.g., `delay_10s` or `delay_60s`) via the wildcard bindings. These queues have no consumers and a matching `x-message-ttl`.
1. **Expire**: The TTL expires. RabbitMQ dead-letters the message to `requeue_exchange`, preserving the original routing key (e.g., `delay.10000.my_work_queue`).
1. **Requeue**: `requeue_exchange` routes the message back to `my_work_queue` via the wildcard binding `*.*.my_work_queue`.

## Setting Up the Topology

Configure this topology using your preferred administration tool. Below is an example using RabbitMQ's native Definitions JSON format. Create a `definitions.json` file:

```
{
  "vhosts": [{ "name": "/" }],
  "exchanges": [
    { "name": "retry_exchange", "vhost": "/", "type": "topic", "durable": true, "auto_delete": false },
    { "name": "requeue_exchange", "vhost": "/", "type": "topic", "durable": true, "auto_delete": false }
  ],
  "queues": [
    { "name": "delay_10s", "vhost": "/", "durable": true, "auto_delete": false,
      "arguments": { "x-queue-type": "quorum", "x-message-ttl": 10000, "x-dead-letter-exchange": "requeue_exchange" } },
    { "name": "delay_60s", "vhost": "/", "durable": true, "auto_delete": false,
      "arguments": { "x-queue-type": "quorum", "x-message-ttl": 60000, "x-dead-letter-exchange": "requeue_exchange" } },
    { "name": "my_work_queue", "vhost": "/", "durable": true, "auto_delete": false,
      "arguments": { "x-queue-type": "quorum", "x-delivery-limit": 5 } }
  ],
  "bindings": [
    { "source": "retry_exchange", "vhost": "/", "destination": "delay_10s", "destination_type": "queue", "routing_key": "delay.10000.#" },
    { "source": "retry_exchange", "vhost": "/", "destination": "delay_60s",
      "destination_type": "queue", "routing_key": "delay.60000.#" },
    { "source": "requeue_exchange", "vhost": "/", "destination": "my_work_queue", "destination_type": "queue", "routing_key": "*.*.my_work_queue" }
  ]
}
```

You can upload this configuration using the RabbitMQ Management HTTP API via `curl` (assuming the management plugin `rabbitmq_management` is enabled):

```
curl -i -u guest:guest -H "content-type:application/json" -X POST \
  -d @definitions.json http://localhost:15672/api/definitions # (1)!
```

1. Or specify your proper credentials instead of `guest:guest`.

*(Alternatively, you can load these on server startup by pointing the `load_definitions` setting in `rabbitmq.conf` to this JSON file.)*

## The Decorator Implementation

To keep our actors clean and reusable, we can encapsulate the entire retry, delay-calculation, and republishing logic into a single Python decorator. This decorator dynamically merges the `Message` dependency into your actor's signature so Repid can inject it at runtime. It leverages Repid's default `confirmation_mode="auto"` to elegantly handle successes (auto-ack) and exhausted retries (re-raise for `on_error` handling).

```
import inspect
from collections.abc import Awaitable, Callable
from functools import wraps
from typing import Any, Concatenate, ParamSpec, TypeVar, cast

from repid import Message, Router

P = ParamSpec("P")
R = TypeVar("R")


def with_rabbitmq_retries(
    max_retries: int = 5,
    backoff_delays: list[int] | None = None,
    retry_exchange: str = "retry_exchange",
    retry_exceptions: type[Exception] | tuple[type[Exception], ...] = Exception,
) -> Callable[[Callable[P, Awaitable[R]]], Callable[Concatenate[Message, P], Awaitable[R]]]:
    """
    Wraps a Repid actor with RabbitMQ exponential backoff logic.
    Assumes the actor uses the default `confirmation_mode="auto"`.
    """
    if backoff_delays is None:
        # Default delays: 10s, 1m, 5m, 10m, 30m
        backoff_delays = [10_000, 60_000, 300_000, 600_000, 1_800_000]

    def decorator(
        func: Callable[P, Awaitable[R]],
    ) -> Callable[Concatenate[Message, P], Awaitable[R]]:
        sig = inspect.signature(func)  # (1)!
        if "message" not in sig.parameters:
            new_params = [
                *list(sig.parameters.values()),
                inspect.Parameter("message", inspect.Parameter.KEYWORD_ONLY, annotation=Message),
            ]
            new_sig = sig.replace(parameters=new_params)
        else:
            new_sig = sig

        @wraps(func)
        async def wrapper(message: Message, *args: P.args, **kwargs: P.kwargs) -> R:
            try:
                if "message" in sig.parameters:
                    kw = cast(dict[str, Any], kwargs)
                    kw["message"] = message
                    return await func(*args, **kw)  # (2)!
                else:
                    return await func(*args, **kwargs)
            except retry_exceptions as e:  # (3)!
                headers = message.headers or {}
                retry_count = int(headers.get("x-retry-count", 0))

                if retry_count >= max_retries:
                    print(f"Max retries ({max_retries}) reached for message {message.message_id}")
                    raise  # (4)!

                delay_index = min(retry_count, len(backoff_delays) - 1)
                delay_ms = backoff_delays[delay_index]

                print(f"Task failed: {e}. Retrying in {delay_ms}ms (Attempt {retry_count + 1})")

                new_headers = headers.copy()
                new_headers["x-retry-count"] = str(retry_count + 1)

                amqp_to_address = (
                    f"/exchanges/{retry_exchange}/delay.{delay_ms}.{message.channel}"  # (5)!
                )

                await message.send_message(
                    channel=message.channel,
                    payload=message.payload,
                    content_type=message.content_type,
                    headers=new_headers,
                    server_specific_parameters={"to": amqp_to_address},
                )
                return None  # (6)!

        wrapper.__signature__ = new_sig  # (7)!
        return wrapper

    return decorator
```

1. We must dynamically merge the signature so Repid injects BOTH your custom payload arguments AND the `Message` dependency. Without this, `@wraps` hides `message`, or removing `@wraps` hides your payload args!
1. Execute the original actor code, safely omitting `message` if they didn't explicitly request it in their signature.
1. 
Handle the failure and calculate backoff. Any exception NOT in `retry_exceptions` will bypass this block and be immediately handled by Repid's `on_error` policy (no retries).
1. Re-raise the exception so Repid's auto mode catches it and naturally nacks/rejects it based on your actor settings.
1. Explicitly construct the AMQP 1.0 address for a RabbitMQ exchange (format: `/exchanges/<exchange-name>/<routing-key>`).
1. Return normally to suppress the exception. In auto mode, this causes Repid to automatically `ack` the original message!
1. Apply the merged signature to the wrapper so Repid's DI parser knows `message` needs to be provided.

### Using the Decorator

Now, your actual actor implementation becomes incredibly clean. You only need to focus on your business logic, and the decorator handles the rest:

```
router = Router()

@router.actor(channel="my_work_queue")
@with_rabbitmq_retries(max_retries=5, retry_exceptions=(ConnectionError, TimeoutError))
async def process_task(data: dict) -> None:
    print(f"Processing data: {data}")
    if data.get("bad_payload"):
        raise ValueError("Invalid data format")  # (1)!
    raise ConnectionError("Temporary API failure")  # (2)!
```

1. This will NOT be retried. It bypasses our decorator's exception block and immediately propagates to Repid to be handled (e.g. NACK-ed or dead-lettered).
1. This WILL be caught by the decorator and retried with exponential backoff!

# Handling Large Payloads

Generally, message brokers are optimized for routing millions of tiny messages quickly. They are not designed to store or transfer massive binary blobs. If your application needs to process large datasets asynchronously - such as parsing a big CSV upload or performing analytics on millions of records - sending that data directly through your message queue causes memory spikes, high serialization overhead, and hurts performance.
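The difference is easy to quantify: compare the serialized size of an inline payload against a tiny reference to data stored elsewhere (the records and keys below are hypothetical, purely for illustration):

```python
import json

# A hypothetical inline payload: 10,000 customer records.
inline_payload = {"rows": [{"email": f"user{i}@example.com"} for i in range(10_000)]}

# The pass-by-reference alternative: only a pointer to data stored elsewhere.
reference_payload = {"job_id": 123, "s3_key": "uploads/123.csv"}

print(len(json.dumps(inline_payload)))     # hundreds of kilobytes
print(len(json.dumps(reference_payload)))  # a few dozen bytes
```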
This cookbook explains how to safely handle large payloads in Repid by passing data by reference using S3-compatible storage and the [Polars](https://pola.rs/) DataFrame library. ## The Scenario: Bulk Audience Processing Imagine a marketing platform where users upload massive CSV files containing millions of customer records to generate a "Custom Audience Summary Report". Uploading and parsing this file synchronously in a web server (like FastAPI) would tie up the worker and time out the HTTP request. Instead, we use **Presigned URLs**: 1. The client requests a Presigned URL from the web server. 1. The client uploads the large CSV directly to an S3-compatible storage bucket. 1. The client notifies the web server that the upload is complete, providing the S3 key. 1. The web server enqueues a Repid task containing only a lightweight reference: `{"job_id": 123, "s3_key": "uploads/123.csv"}`. This completely bypasses the web server for the heavy network I/O and keeps the message queue blazing fast. ## Prerequisites For this example, we will use `aioboto3` for asynchronous S3 interactions and `polars` for fast data processing. ``` pip install repid aioboto3 polars ``` ## The Publisher: Enqueuing the Reference The publisher code (e.g., your FastAPI endpoint) doesn't handle the file at all. It simply receives the confirmation from the client and enqueues the Repid task with the S3 reference. ``` from repid import Repid # Assuming `app` is your initialized and configured Repid instance async def enqueue_audience_processing(job_id: int, s3_key: str) -> None: await app.send_message_json( channel="process_audience", payload={ "job_id": job_id, "s3_key": s3_key, }, ) # (1)! ``` 1. We enqueue a tiny JSON payload containing just the reference (the `s3_key`). The actual 50MB file safely resides in S3-compatible storage. ## The Worker: Downloading, Processing, and Uploading The Repid actor receives the lightweight payload and performs the computation. 
``` import io import asyncio import polars as pl import aioboto3 from repid import Router router = Router() def _heavy_computation(file_data: bytes) -> bytes: # (1)! df = pl.read_csv(io.BytesIO(file_data)) # Perform heavy lifting: e.g., aggregations and deduplication summary_df = ( df.unique(subset=["email"]) .group_by("country") .agg(pl.len().alias("user_count")) ) buffer = io.BytesIO() summary_df.write_csv(buffer) return buffer.getvalue() @router.actor async def process_audience(job_id: int, s3_key: str) -> None: session = aioboto3.Session() # Step 1: Download the raw data async with session.client("s3") as s3: response = await s3.get_object(Bucket="my-marketing-bucket", Key=s3_key) file_data = await response["Body"].read() # Step 2: Offload CPU-bound work to a separate thread summary_bytes = await asyncio.to_thread(_heavy_computation, file_data) # (2)! # Step 3: Upload the result and clean up summary_key = f"reports/summary_{job_id}.csv" async with session.client("s3") as s3: await s3.put_object( Bucket="my-marketing-bucket", Key=summary_key, Body=summary_bytes, ) # (3)! await s3.delete_object(Bucket="my-marketing-bucket", Key=s3_key) # (4)! ``` 1. This function runs synchronously in a separate thread. It takes the raw bytes, parses them, performs heavy analytical aggregations, and returns the serialized result bytes. 1. `asyncio.to_thread` offloads the work to avoid blocking the event loop. 1. We upload the result (summary report) back to S3-compatible storage so that the client can download it later. 1. We delete the temporary raw upload to minimize storage costs now that the job is done. # Extending Repid # Your own brokers Coming soon! Help us write documentation by submitting a [pull request](https://github.com/aleksul/repid/pulls)! Also check out [contributing guide](https://repid.aleksul.space/latest/contributing/)! # Your own converter Coming soon! Help us write documentation by submitting a [pull request](https://github.com/aleksul/repid/pulls)! 
Also check out [contributing guide](https://repid.aleksul.space/latest/contributing/)! # Your own serializer Coming soon! Help us write documentation by submitting a [pull request](https://github.com/aleksul/repid/pulls)! Also check out [contributing guide](https://repid.aleksul.space/latest/contributing/)!