Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/frappe/frappe/llms.txt

Use this file to discover all available pages before exploring further.

Frappe Framework uses RQ (Redis Queue) for background job processing, allowing you to execute long-running tasks asynchronously without blocking the main application.

Enqueuing jobs

Enqueue a function to run in the background:
import frappe

# Enqueue a simple function
frappe.enqueue(
    "myapp.tasks.process_data",
    queue="default",
    timeout=300,
    is_async=True
)

# Enqueue with arguments
frappe.enqueue(
    "myapp.tasks.send_emails",
    queue="default",
    users=["user1@example.com", "user2@example.com"],
    message="Hello!"
)

Queue types

Frappe provides three default queues with different timeout settings:
  • short: For quick tasks (default timeout: 300 seconds)
  • default: For normal tasks (default timeout: 300 seconds)
  • long: For long-running tasks (default timeout: 1500 seconds)
# Use long queue for time-consuming tasks
frappe.enqueue(
    "myapp.tasks.generate_report",
    queue="long",
    timeout=3600  # 1 hour
)

Enqueue parameters

The enqueue function accepts several parameters:
frappe.enqueue(
    method="myapp.tasks.my_function",
    queue="default",
    timeout=300,
    event=None,
    is_async=True,
    job_name=None,
    now=False,
    enqueue_after_commit=False,
    on_success=callback_on_success,
    on_failure=callback_on_failure,
    at_front=False,
    job_id="unique-job-id",
    deduplicate=False,
    **kwargs
)

Key parameters

  • method: Function path or callable
  • queue: Queue name (short, default, long)
  • timeout: Maximum execution time in seconds
  • is_async: Execute asynchronously (default: True)
  • now: Execute immediately in foreground (default: False)
  • enqueue_after_commit: Wait for DB commit before enqueuing
  • on_success: Callback function on success
  • on_failure: Callback function on failure
  • at_front: Add job to front of queue
  • job_id: Unique identifier for deduplication
  • deduplicate: Prevent duplicate jobs with same job_id

Enqueue after commit

Ensure the job is only enqueued after the database transaction commits successfully:
import frappe

class SalesOrder(Document):
    def on_submit(self):
        """Queue the order-confirmation email once this submit commits."""
        # enqueue_after_commit keeps the email from ever being sent for a
        # submit whose transaction later rolls back.
        frappe.enqueue(
            "myapp.tasks.send_order_confirmation",
            order_id=self.name,
            enqueue_after_commit=True,
        )

Job deduplication

Prevent duplicate jobs from being queued:
import frappe

# Use job_id for deduplication
frappe.enqueue(
    "myapp.tasks.sync_data",
    job_id="sync-customer-data",
    deduplicate=True,
    customer="CUST-001"
)

# Check if job is already enqueued
from frappe.utils.background_jobs import is_job_enqueued

if is_job_enqueued("sync-customer-data"):
    frappe.msgprint("Sync already in progress")

Enqueue document method

Execute a document method in the background:
import frappe

# Enqueue document method
frappe.enqueue_doc(
    doctype="Sales Order",
    name="SO-001",
    method="create_delivery_note",
    queue="default",
    timeout=300
)
In the DocType controller:
class SalesOrder(Document):
    def create_delivery_note(self):
        """Build and save a Delivery Note; executed in a background worker
        when triggered via frappe.enqueue_doc."""
        note = frappe.new_doc("Delivery Note")
        # ... populate the delivery note from this sales order
        note.insert()

Job callbacks

Execute functions on job success or failure:
import frappe

def on_success_callback(job):
    """Push a realtime notification to the enqueuing user when the job succeeds."""
    payload = {"job_id": job.id, "status": "success"}
    frappe.publish_realtime("job_complete", payload, user=frappe.session.user)

def on_failure_callback(job, exc_type, exc_value, traceback):
    """Record a failed background job in Frappe's Error Log."""
    message = f"Job {job.id} failed: {exc_value}"
    frappe.log_error(message)

frappe.enqueue(
    "myapp.tasks.process_data",
    on_success=on_success_callback,
    on_failure=on_failure_callback
)

Queue priority

Add jobs to the front of the queue:
import frappe

# High priority job
frappe.enqueue(
    "myapp.tasks.urgent_task",
    at_front=True
)

# Auto-prioritize when queue is starved
frappe.enqueue(
    "myapp.tasks.interactive_task",
    at_front_when_starved=True
)

Job execution

The background worker executes jobs using the execute_job function:
def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, retry=0):
    """Run a queued job inside the worker: bind the site context, invoke the
    target callable, and commit the transaction.

    Returns whatever the target method returns.
    """
    if is_async:
        # Workers are long-lived processes, so each job must first attach
        # to the site it was enqueued for.
        frappe.init(site, force=True)
        frappe.connect()
        if user:
            frappe.set_user(user)

    # Resolve the dotted method path and call it with the enqueued kwargs.
    result = frappe.get_attr(method)(**kwargs)

    # Persist whatever the job wrote before handing control back.
    frappe.db.commit()

    return result

Starting workers

Start background workers to process jobs:
# Start single worker
bench worker --queue default

# Start worker for specific queues
bench worker --queue default,short,long

# Start worker pool with multiple workers
bench worker --queue default --num-workers 4

# Start worker in burst mode (exits after processing all jobs)
bench worker --queue default --burst

Custom queues

Define custom worker queues:
# In common_site_config.json
{
    "workers": {
        "email_queue": {
            "timeout": 600
        },
        "data_import": {
            "timeout": 7200
        }
    }
}
Use custom queue:
frappe.enqueue(
    "myapp.tasks.import_data",
    queue="data_import",
    file_path="/path/to/data.csv"
)

Job monitoring

Monitor job status and queue health:
import frappe
from frappe.utils.background_jobs import get_jobs, get_workers

# Get all jobs in a queue
jobs = get_jobs(site=frappe.local.site, queue="default")

# Get all active workers
workers = get_workers()

# Get job by ID
from frappe.utils.background_jobs import get_job, get_job_status

job = get_job("my-job-id")
status = get_job_status("my-job-id")

Retry failed jobs

Jobs are automatically retried on certain transient errors:
def execute_job(site, method, event, job_name, kwargs, retry=0):
    """Run *method* with *kwargs*, retrying up to 5 times on transient errors.

    Deadlock/lock-wait failures surface as frappe.db.InternalError, and a job
    can request a re-run explicitly by raising frappe.RetryBackgroundJobError.
    Returns the method's return value on success.
    """
    try:
        retval = method(**kwargs)
    except (frappe.db.InternalError, frappe.RetryBackgroundJobError):
        # Undo the partial transaction before deciding whether to retry.
        frappe.db.rollback()

        if retry < 5:
            # Linear backoff: wait 1s, 2s, ... between attempts.
            time.sleep(retry + 1)
            return execute_job(site, method, event, job_name, kwargs, retry=retry + 1)

        # Out of retries: record the failure and re-raise for the worker.
        # (Original referenced an undefined `method_name`; use `method`.)
        frappe.log_error(title=method)
        raise
    return retval

Job hooks

Execute functions before or after job execution:
# In hooks.py
before_job = [
    "myapp.jobs.before_job_hook"
]

after_job = [
    "myapp.jobs.after_job_hook"
]

# Hook functions
def before_job_hook(method, kwargs, transaction_type):
    """Called before job executes"""
    frappe.log(f"Starting job: {method}")

def after_job_hook(method, kwargs, result):
    """Called after job completes"""
    frappe.log(f"Job completed: {method}")

Queue configuration

Configure queue behavior:
# In common_site_config.json
{
    "rq_job_failure_ttl": 604800,  # Keep failed jobs for 7 days
    "rq_results_ttl": 600,  # Keep results for 10 minutes
    "rq_failed_jobs_limit": 1000,  # Maximum failed jobs to keep
    "email_queue_batch_size": 500,  # Emails to process per batch
    "max_queued_jobs": 500  # Maximum jobs in queue
}