Metadata-Version: 2.4
Name: acelery
Version: 0.1.0
Summary: Asyncio plugin for Celery
Project-URL: Homepage, https://github.com/omer9564/acelery
Project-URL: Repository, https://github.com/omer9564/acelery
Project-URL: Issues, https://github.com/omer9564/acelery/issues
Project-URL: Changelog, https://github.com/omer9564/acelery/releases
Author-email: Omer Zuarets <42326891+omer9564@users.noreply.github.com>
Maintainer-email: Omer Zuarets <42326891+omer9564@users.noreply.github.com>
License-Expression: MIT
License-File: LICENSE
Keywords: async,asyncio,celery,pubsub,rabbitmq,redis,task-queue
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: AsyncIO
Classifier: Framework :: Celery
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: amqp<6,>=5.2.0
Requires-Dist: celery<6,>=5.4.0
Requires-Dist: pydantic<3
Requires-Dist: redis
Requires-Dist: tenacity>=9.1.2
Requires-Dist: typing-extensions>=4.15.0
Provides-Extra: dev
Requires-Dist: celery-types; extra == 'dev'
Requires-Dist: pre-commit>=4.3.0; extra == 'dev'
Requires-Dist: typing-extensions; extra == 'dev'
Provides-Extra: test
Requires-Dist: coverage; extra == 'test'
Requires-Dist: pytest-asyncio; extra == 'test'
Requires-Dist: pytest-mock; extra == 'test'
Requires-Dist: pytest-timeout; extra == 'test'
Requires-Dist: pytest>=7.1.2; extra == 'test'
Description-Content-Type: text/markdown

# acelery

A Celery plugin that enables seamless execution of `asyncio` coroutines as Celery tasks with proper event-loop management and asyncio-native result handling.

It lets you write your tasks as `async def` functions and consume their results from any asyncio caller without blocking the event loop.

## Why?

Celery is synchronous by design: tasks are executed inside worker processes that don't know about `asyncio`, and `AsyncResult.get()` blocks the calling thread. That's painful when:

- Your task logic naturally `await`s I/O (HTTP, DB, queues, gRPC).
- Your caller is an `asyncio` application (e.g. FastAPI / aiohttp) and can't afford a blocking `.get()`.
- You want to deduplicate "the same task with the same args" across concurrent callers so you only run the work once.

`acelery` addresses all three: a worker-side `Runner` that hosts a persistent event loop, an `AIOResult` that awaits results over Redis Pub/Sub instead of polling, and a `trigger_or_join_task` primitive that coalesces duplicate invocations.

## Features

- **Async task decorator** — write `async def` and decorate with `@async_task()` to run it inside a Celery worker.
- **Worker event-loop management** — a shared `asyncio` runner is set up on `worker_process_init` and torn down on `worker_process_shutdown`.
- **Dedicated runners** — opt into a per-task `asyncio.run` for stronger isolation when you need it.
- **Asyncio-native result handling** — `AIOResult.get()` is a coroutine, backed by Redis Pub/Sub with periodic ticks to avoid races.
- **Task deduplication** — `AIOResult.trigger_or_join_task(...)` either starts a new task or joins an existing in-flight one keyed by `(name, args, kwargs)`.
- **Leftover-coroutine cleanup** — orphan tasks left behind by your coroutine are detected, logged, and cancelled.
- **Retry on timeout** — built on `tenacity`, configurable per call.
- **Pydantic-typed results** — declare `return_type=...` and the raw backend payload is validated/parsed for you (Pydantic v1 and v2 supported).
- **Backends**
  - [x] **Redis** — full support (Pub/Sub-driven awaiting).
  - [ ] Other backends not currently supported.

## Requirements

- Python **3.10+**
- A Celery broker (RabbitMQ, Redis, etc. — Celery's defaults apply)
- **Redis** as the result backend (required by `AIOResult`)

## Installation

Using uv:
```bash
uv add acelery
```
Using poetry:
```bash
poetry add acelery
```
Using pip:
```bash
pip install acelery
```

## Quick Start

### Configure Celery

`AIOResult` reads task metadata over Redis Pub/Sub, so configure Celery with a Redis result backend:

```python
from celery import Celery

app = Celery(
    "myapp",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0",
)
```

### Define an async task

```python
import asyncio
from acelery import async_task

@app.task()
@async_task()
async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"
```

The decorator order matters: `@app.task()` must wrap `@async_task()`. The inner decorator turns the coroutine function into a synchronous callable that Celery can run; the outer one registers it as a Celery task.
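
To illustrate what "turns the coroutine function into a synchronous callable" means in general, here is a minimal, self-contained sketch. The `to_sync` decorator and the module-level `_loop` are hypothetical names invented for this example and are not acelery's implementation; they only show the shape of wrapping an `async def` so that a synchronous framework can call it.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")

# Hypothetical stand-in for a long-lived worker loop (an assumption for illustration).
_loop = asyncio.new_event_loop()


def to_sync(func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    """Wrap a coroutine function in a plain function that blocks until it finishes."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # Drive the coroutine to completion on the long-lived loop.
        return _loop.run_until_complete(func(*args, **kwargs))

    return wrapper


@to_sync
async def greet_sketch(name: str) -> str:
    await asyncio.sleep(0)
    return f"Hello, {name}!"


# Called like a normal function, with no await at the call site.
sync_result = greet_sketch("World")
```

Because the wrapped object is an ordinary function, an outer decorator that expects a synchronous callable can register it without knowing about `asyncio`.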

### Dedicated runner (per-task isolation)

By default, tasks share the worker's global asyncio runner. Pass `dedicated_runner=True` for tasks that need their own `asyncio.run` invocation, at the cost of roughly 10µs of overhead per call:

```python
@app.task()
@async_task(dedicated_runner=True)
async def isolated_task() -> str:
    await asyncio.sleep(1)
    return "isolated"
```
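
To make the shared-versus-dedicated distinction concrete, the sketch below uses only the standard library and does not touch acelery itself. It assumes that "shared" means reusing one long-lived event loop and "dedicated" means a fresh `asyncio.run` per call; the variable names are illustrative.

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    # Report which event loop is driving this coroutine.
    return asyncio.get_running_loop()


# Shared style: one long-lived loop is reused for every call.
shared_loop = asyncio.new_event_loop()
shared_first = shared_loop.run_until_complete(current_loop())
shared_second = shared_loop.run_until_complete(current_loop())
shared_loop.close()

# Dedicated style: asyncio.run() creates a fresh loop per call and closes it afterwards.
dedicated_first = asyncio.run(current_loop())
dedicated_second = asyncio.run(current_loop())

same_shared_loop = shared_first is shared_second
fresh_dedicated_loop = dedicated_first is not dedicated_second
```

State bound to a loop (open connections, background tasks) survives between calls in the shared style and is discarded after every call in the dedicated style, which is the isolation trade-off described above.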

### Awaiting results from asyncio

Wrap the standard Celery `AsyncResult` in `AIOResult` to await it without blocking:

```python
import asyncio
from acelery import AIOResult

async def main() -> None:
    result = AIOResult(greet.delay("World"), return_type=str)
    value = await result.get(timeout=10)
    print(value)  # "Hello, World!"

if __name__ == "__main__":
    asyncio.run(main())
```

`get()` accepts:

| Argument | Default | Purpose |
| --- | --- | --- |
| `timeout` | `60` | Hard upper bound for the whole wait. |
| `propagate` | `True` | Re-raise exceptions stored in the backend instead of returning them. |
| `forget` | `True` | After a successful read, set the result key's TTL to `forget_cooldown` seconds so late readers can still see it before it expires. |
| `forget_cooldown` | `90` | Seconds to retain the result before Redis evicts it. |
| `interrupt_every` | `5.0` | How often to fall back from Pub/Sub to a direct backend read (defends against missed messages). |
| `max_pending_interrupts` | `3` | Maximum consecutive "still pending" ticks before raising `MaxPendingInterruptsError`. |
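
The interplay of `interrupt_every` and `max_pending_interrupts` can be sketched with the standard library alone. This is a simplified model under stated assumptions, not acelery's code: an `asyncio.Event` stands in for the Pub/Sub notification, a plain dict stands in for the result backend, and `wait_for_result` and `TooManyPendingTicks` are hypothetical names.

```python
import asyncio
from typing import Any, Dict


class TooManyPendingTicks(Exception):
    """Hypothetical error raised after too many ticks with no result."""


async def wait_for_result(
    backend: Dict[str, Any],
    key: str,
    notified: asyncio.Event,
    interrupt_every: float,
    max_pending_interrupts: int,
) -> Any:
    pending_ticks = 0
    while True:
        try:
            # Fast path: wake up as soon as a notification arrives.
            await asyncio.wait_for(notified.wait(), timeout=interrupt_every)
            notified.clear()
        except asyncio.TimeoutError:
            # Tick: no notification in time, fall through to a direct read.
            pass
        if key in backend:
            return backend[key]
        pending_ticks += 1
        if pending_ticks >= max_pending_interrupts:
            raise TooManyPendingTicks(key)


async def demo() -> tuple:
    backend: Dict[str, Any] = {}
    notified = asyncio.Event()

    async def finish_without_notifying() -> None:
        # Simulate a missed message: the result is stored, but no event fires.
        await asyncio.sleep(0.01)
        backend["task-1"] = "done"

    writer = asyncio.create_task(finish_without_notifying())
    value = await wait_for_result(backend, "task-1", notified, 0.05, 3)
    await writer

    # A key that never appears exhausts the tick budget.
    try:
        await wait_for_result(backend, "task-2", notified, 0.01, 3)
        gave_up = False
    except TooManyPendingTicks:
        gave_up = True
    return value, gave_up


tick_value, tick_gave_up = asyncio.run(demo())
```

In this model the first wait succeeds even though no notification was ever delivered, because the periodic direct read picks up the stored value; the second wait gives up after three empty ticks.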

### Trigger-or-join: deduplicate concurrent calls

If multiple callers fire the same task with the same arguments, you usually want one execution and many readers. `trigger_or_join_task` does exactly that, keyed by `(name, args, kwargs)`:

```python
import asyncio
from acelery import AIOResult

@app.task(track_started=True)
@async_task()
async def sleeping_task(sleep_time: float) -> str:
    await asyncio.sleep(sleep_time)
    return "OK"

async def main() -> None:
    aio_result, value = await AIOResult.trigger_or_join_task(
        sleeping_task.s(2.0),
        timeout=30,
        return_type=str,
        track_started=True,  # set this whenever the task itself sets track_started=True
    )
    print(value)  # "OK"

if __name__ == "__main__":
    asyncio.run(main())
```

Notes:
- The first caller publishes the task and stores `task_id` under the dedupe key.
- Subsequent callers find the existing `task_id` and `await` its result instead of re-publishing.
- Retries on timeout are on by default (`retry_on_timeout=True`, `max_retries_on_timeout=3`).
- If the task declares `track_started=True`, pass `track_started=True` here too so a `PENDING` state isn't misread as "still alive".

### Calling Celery primitives from asyncio

`AIOExecutable` wraps a `Signature` / `Task` and exposes async `delay` / `apply_async` / `apply`, executed in a thread pool so they don't block your event loop:

```python
import asyncio
from acelery import AIOExecutable

async def main() -> None:
    aio_result = await AIOExecutable(greet.s("World"), return_type=str).apply_async()
    print(await aio_result.get(timeout=10))

if __name__ == "__main__":
    asyncio.run(main())
```
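
The general technique of running a blocking call in a thread so the event loop stays responsive looks like this with the standard library. It is a sketch under assumptions: `blocking_publish` is a hypothetical stand-in for a synchronous broker call, and `asyncio.to_thread` is used here for brevity, which may differ from the executor acelery uses internally.

```python
import asyncio
import time


def blocking_publish(payload: str) -> str:
    # Stand-in for a synchronous call such as publishing a message to a broker.
    time.sleep(0.05)
    return f"published:{payload}"


async def heartbeat(ticks: list) -> None:
    # Makes progress only while the event loop is not blocked.
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def demo() -> tuple:
    ticks: list = []
    published, _ = await asyncio.gather(
        asyncio.to_thread(blocking_publish, "World"),
        heartbeat(ticks),
    )
    return published, ticks


offload_result, offload_ticks = asyncio.run(demo())
```

The heartbeat coroutine keeps ticking while the blocking call sleeps in a worker thread, which is the property an asyncio caller needs from `delay` and `apply_async`.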

## How it works

- On `worker_process_init`, a module-level `Runner` (`asyncio.Runner` on 3.11+, polyfilled on 3.10) is started; on `worker_process_shutdown` it is closed cleanly.
- `@async_task` runs the coroutine inside that shared runner, copies the current `contextvars` context for proper propagation, and after the coroutine returns it cancels any `asyncio.Task`s left behind by the user code (with a warning log).
- `AIOResult.get()` opens a Redis Pub/Sub subscription on the result key and races it against a periodic "tick" that reads the backend directly — this closes the race where the task finishes before the subscriber is fully attached.
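
The leftover-task cleanup described above can be approximated with `asyncio.all_tasks()`. This sketch is not acelery's implementation: `run_and_clean` and `leaky` are hypothetical names, and the real plugin also logs a warning and propagates `contextvars`, both omitted here.

```python
import asyncio
from typing import Any, Coroutine


async def run_and_clean(coro: Coroutine[Any, Any, Any]) -> tuple:
    """Await `coro`, then cancel any tasks it started but never finished."""
    before = asyncio.all_tasks()
    result = await coro
    current = asyncio.current_task()
    leftovers = [
        task
        for task in asyncio.all_tasks() - before
        if task is not current and not task.done()
    ]
    for task in leftovers:
        task.cancel()
    # Let the cancellations be processed; exceptions are collected, not raised.
    await asyncio.gather(*leftovers, return_exceptions=True)
    return result, leftovers


async def leaky() -> str:
    # Fire-and-forget task that outlives the coroutine that created it.
    asyncio.create_task(asyncio.sleep(60), name="orphan")
    return "done"


cleanup_result, cancelled = asyncio.run(run_and_clean(leaky()))
cancelled_names = [task.get_name() for task in cancelled]
```

Without this step, an orphan such as the 60-second sleep would keep running on a shared loop after the task that created it had already returned.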

## License

Released under the MIT License. See [LICENSE](LICENSE).

## Contributing

Issues, bug reports, and PRs are welcome. See [CONTRIBUTING.md](CONTRIBUTING.md) for development setup, testing, and submission guidelines.
