
Guide

This guide covers all the features and usage patterns of django-vtasks.

Defining Tasks

Create a tasks.py in your Django app and use the @task decorator:

# myapp/tasks.py
from django_vtasks import task

@task
def send_email(to: str, subject: str, body: str):
    """Send an email."""
    # Your email sending logic
    pass

@task
async def process_image(image_id: int):
    """Process an image asynchronously."""
    # Async tasks are processed natively without thread overhead
    pass

Note

You can also use from django.tasks import task, but tasks defined that way can't use vtasks-specific features such as unique.

Enqueueing Tasks

Basic Enqueue

Both sync and async contexts are supported; the async path performs better because it runs directly on the asyncio event loop.

from myapp.tasks import send_email

# Synchronous
result = send_email.enqueue("user@example.com", "Hello", "Welcome!")

# Asynchronous (preferred)
result = await send_email.aenqueue("user@example.com", "Hello", "Welcome!")

Using the .using() API

The .using() method lets you configure task execution options:

# Route to a specific queue
await send_email.using(queue_name="emails").aenqueue(
    "user@example.com", "Hello", "Welcome!"
)

# Set priority
await send_email.using(priority=10).aenqueue(
    "admin@example.com", "Urgent", "Server down!"
)

# Combine multiple options
await send_email.using(
    queue_name="high-priority",
    priority=10,
    unique=True,
).aenqueue("admin@example.com", "Alert", "Important message")

Bulk Enqueueing

For high-performance scenarios where you need to dispatch many tasks at once, use enqueue_many or aenqueue_many:

from django.tasks import task_backends
from myapp.tasks import process_user, cleanup_job

tasks_to_send = [
    # (task, args, kwargs)
    (process_user, (user1.id,), {}),
    (process_user, (user2.id,), {}),
    (cleanup_job, (), {"queue_name": "low_priority"}),
]

# Async context
await task_backends["default"].aenqueue_many(tasks_to_send)

# Sync context (usually within a transaction)
task_backends["default"].enqueue_many(tasks_to_send)

Tasks are automatically grouped by queue name and sent in optimized batches, using Valkey's variadic LPUSH or the ORM's bulk_create for database backends.
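Conceptually, the grouping step looks something like this (an illustrative sketch, not the library's actual implementation; treating a queue_name entry in the kwargs as the routing hint is an assumption based on the example above):

```python
from collections import defaultdict

def group_by_queue(tasks_to_send):
    """Group (task, args, kwargs) triples by destination queue (sketch)."""
    groups = defaultdict(list)
    for task_fn, args, kwargs in tasks_to_send:
        queue = kwargs.get("queue_name", "default")
        groups[queue].append((task_fn, args, kwargs))
    return groups

# Each group can then be flushed in a single round trip:
# one variadic LPUSH per queue on Valkey, one bulk_create per queue on a database.
```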

Unique Tasks

Prevent duplicate tasks from being enqueued using the unique parameter. This is useful for tasks that should have at most one instance queued or running at a time.

Mutex Mode (Default)

The lock is released as soon as the task finishes:

# Auto-generated unique key from task name + args
result = await send_email.using(unique=True).aenqueue(
    "user@example.com", "Welcome", "..."
)

# If the task is already queued or running, this returns None
result2 = await send_email.using(unique=True).aenqueue(
    "user@example.com", "Welcome", "..."
)
# result2 is None (rejected as duplicate)

Custom Unique Keys

Use a custom key for finer control over uniqueness:

result = await send_email.using(
    unique=True,
    unique_key=f"email-user-{user_id}"
).aenqueue("user@example.com", "Welcome", "...")

Throttling Mode

Keep the lock for a specified TTL even after the task completes. This is useful for rate-limiting:

# Only allow 1 API call per 60 seconds
await fetch_data.using(
    unique=True,
    unique_key=f"api-fetch-{user_id}",
    ttl=60,
    remove_unique_on_complete=False,  # Throttle mode
).aenqueue(user_id)
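The difference between the two modes can be pictured with an in-memory lock table (a toy model only; the real backends use Valkey keys or database rows, and all names here are hypothetical):

```python
import time

class UniqueLock:
    """Toy model of the unique-task lock: mutex vs. throttle release."""

    def __init__(self):
        self.locks = {}  # unique_key -> expiry timestamp

    def try_acquire(self, key, ttl):
        now = time.monotonic()
        if key in self.locks and self.locks[key] > now:
            return False  # duplicate: the enqueue call would return None
        self.locks[key] = now + ttl
        return True

    def on_complete(self, key, remove_unique_on_complete):
        if remove_unique_on_complete:
            self.locks.pop(key, None)  # mutex mode: next enqueue succeeds immediately
        # throttle mode: keep the key until its TTL expires
```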

Handling Rejections

Always check for None when using unique tasks:

result = await send_email.using(unique=True).aenqueue(user.email, "Welcome", "...")
if result is None:
    # Task was rejected (duplicate found)
    return JsonResponse({"error": "Email already being sent"}, status=429)
else:
    # Task was enqueued successfully
    return JsonResponse({"task_id": result.id})

Backend Support for Unique Tasks

Mode         Database (PostgreSQL)   Database (SQLite)   Database (MySQL)   Valkey
Mutex        Yes                     Yes                 No                 Yes
Throttling   No                      No                  No                 Yes

Priority

Tasks can be assigned a priority to influence execution order:

# High priority task (processed first)
await send_email.using(priority=10).aenqueue("admin@example.com", "Urgent", "Server down!")

# Low priority task
await send_email.using(priority=-10).aenqueue("user@example.com", "Digest", "Weekly summary")

Database Backend: Supports full integer sorting between -100 and 100 (default 0). Tasks are ordered by priority (descending), then by creation time.

Valkey Backend: Supports a two-level ("binary") priority scheme:

  • High Priority (> 0): Tasks are pushed to the front of the queue ("Express Lane"). Multiple high-priority tasks are processed LIFO.
  • Normal Priority (<= 0): Tasks are pushed to the back and processed FIFO.
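The resulting ordering can be demonstrated with an in-memory stand-in for a Valkey list (a sketch of the behaviour described above, not the backend's actual code):

```python
from collections import deque

class FakeValkeyList:
    """In-memory stand-in for a Valkey list; the worker pops from the right ("front")."""

    def __init__(self):
        self.items = deque()

    def push(self, item, priority=0):
        if priority > 0:
            self.items.append(item)      # express lane: front of the queue
        else:
            self.items.appendleft(item)  # normal: back of the queue

    def pop(self):
        return self.items.pop()

q = FakeValkeyList()
for name, prio in [("a", 0), ("b", 0), ("urgent-1", 5), ("urgent-2", 5), ("c", 0)]:
    q.push(name, prio)

order = [q.pop() for _ in range(5)]
# order == ['urgent-2', 'urgent-1', 'a', 'b', 'c']
# High-priority tasks come out first (LIFO among themselves),
# then normal tasks in FIFO order.
```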

Periodic Tasks

Schedule tasks to run on a cron-like schedule:

# settings.py
from django_vtasks.scheduler import crontab

VTASKS_SCHEDULE = {
    "daily_report": {
        "task": "myapp.tasks.report",
        "schedule": crontab(hour=5, minute=0),  # Runs at 5:00 AM
    },
    "cleanup": {
        "task": "myapp.tasks.cleanup",
        "schedule": 3600,  # Runs every hour (in seconds)
    },
}

Run the worker with the scheduler enabled:

python manage.py runworker --scheduler

Scheduler Safety

For deployments running multiple scheduler instances, you must use one of these backends:

  • ValkeyTaskBackend
  • DatabaseTaskBackend with PostgreSQL or MySQL

Using SQLite with multiple schedulers may result in duplicate task runs.

Batch Processing

For high-throughput scenarios, batch processing allows the worker to fetch multiple tasks at once and process them in a single function call.

1. Configure a Batch Queue

# settings.py
VTASKS_BATCH_QUEUES = {
    "batch_queue": {
        "count": 100,    # Max tasks to fetch at once
        "timeout": 5.0,  # Max seconds to wait for tasks
    }
}

2. Create a Batch Processing Task

Your task must accept a list of task dictionaries:

# myapp/tasks.py
from django_vtasks import task

from myapp.models import Widget

@task
def process_widgets_batch(tasks: list[dict]):
    """Process a batch of widgets in a single call."""
    widget_ids = [t["kwargs"]["widget_id"] for t in tasks]
    Widget.objects.filter(id__in=widget_ids).update(processed=True)
    print(f"Processed {len(widget_ids)} widgets")

3. Enqueue to the Batch Queue

for i in range(10):
    process_widgets_batch.using(queue_name="batch_queue").enqueue(widget_id=i)

The worker collects tasks (up to count or until timeout), groups them by task type, and calls your function with the batch.
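The collection loop can be sketched like this (an illustration using a plain queue.Queue as the task source; the real worker reads from the configured backend):

```python
import queue
import time

def collect_batch(source: "queue.Queue", count: int, timeout: float) -> list:
    """Fetch up to `count` items, waiting at most `timeout` seconds in total."""
    batch = []
    deadline = time.monotonic() + timeout
    while len(batch) < count:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout reached: dispatch whatever we have
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # source drained before the deadline
    return batch
```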

Metrics & Observability

django-vtasks provides optional Prometheus metrics.

Installation

pip install "django-vtasks[metrics]"

Available Metrics

All metrics are prefixed with vtasks_.

Metric                   Type       Labels                     Description
tasks_submitted_total    Counter    task_name, queue           Tasks enqueued
tasks_processed_total    Counter    task_name, queue, status   Tasks processed
task_duration_seconds    Histogram  task_name, queue           Execution time
active_tasks             Gauge      queue                      Currently processing
queue_depth              Gauge      queue                      Tasks waiting

Enabling Metrics

For standalone workers:

python manage.py runworker --metrics-port 9100

For embedded workers, metrics are exposed via your application's /metrics endpoint (e.g., using django-prometheus).
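With django-prometheus installed, the endpoint can be wired up in your URLconf like this (a sketch; adjust the path to your project layout):

```python
# urls.py
from django.urls import include, path

urlpatterns = [
    path("", include("django_prometheus.urls")),  # exposes /metrics
]
```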

Queue Management

Clearing Queues

For debugging or emergencies, clear tasks from queues:

# Clear a specific queue
python manage.py clear_queue --backend-alias=default --queue=default

# Clear all queues
python manage.py clear_queue --backend-alias=default --all-queues --force

# Clear failed tasks (DLQ)
python manage.py clear_queue --backend-alias=default --failed --force

Testing

For unit tests, use the immediate backend to execute tasks synchronously:

from django.test import TestCase, override_settings

from myapp.tasks import my_task

@override_settings(
    TASKS={
        "default": {
            "BACKEND": "django_vtasks.backends.immediate.ImmediateBackend",
        }
    }
)
class MyTaskTests(TestCase):
    def test_something(self):
        # Tasks run immediately and synchronously
        my_task.enqueue()
        # ... assertions

Testing Batch Tasks

The ImmediateBackend stores batch queue tasks in memory. Use flush_batch() to process them:

from django.tasks import task_backends
from django.test import TestCase, override_settings

from myapp.tasks import process_batch

@override_settings(
    TASKS={"default": {"BACKEND": "django_vtasks.backends.immediate.ImmediateBackend"}},
    VTASKS_BATCH_QUEUES={"batch_queue": {"count": 100, "timeout": 5.0}}
)
class MyBatchingTest(TestCase):
    def test_batching(self):
        backend = task_backends["default"]

        for i in range(5):
            process_batch.using(queue_name="batch_queue").enqueue(item_id=i)

        # Tasks are stored, not executed
        self.assertEqual(len(backend.pending_batches["batch_queue"]), 5)

        # Manually trigger batch processing
        backend.flush_batch("batch_queue")

        # Now verify your batch logic ran
        self.assertEqual(len(backend.pending_batches["batch_queue"]), 0)