Documentation Index
Fetch the complete documentation index at: https://mintlify.com/frappe/frappe/llms.txt
Use this file to discover all available pages before exploring further.
Frappe Framework uses RQ (Redis Queue) for background job processing, allowing you to execute long-running tasks asynchronously without blocking the main application.
Enqueuing jobs
Enqueue a function to run in the background:
import frappe
# Enqueue a simple function
frappe.enqueue(
"myapp.tasks.process_data",
queue="default",
timeout=300,
is_async=True
)
# Enqueue with arguments
frappe.enqueue(
"myapp.tasks.send_emails",
queue="default",
users=["user1@example.com", "user2@example.com"],
message="Hello!"
)
Queue types
Frappe provides three default queues with different timeout settings:
- short: For quick tasks (default timeout: 300 seconds)
- default: For normal tasks (default timeout: 300 seconds)
- long: For long-running tasks (default timeout: 1500 seconds)
# Use long queue for time-consuming tasks
frappe.enqueue(
"myapp.tasks.generate_report",
queue="long",
timeout=3600 # 1 hour
)
Enqueue parameters
The enqueue function accepts several parameters:
frappe.enqueue(
method="myapp.tasks.my_function",
queue="default",
timeout=300,
event=None,
is_async=True,
job_name=None,
now=False,
enqueue_after_commit=False,
on_success=callback_on_success,
on_failure=callback_on_failure,
at_front=False,
job_id="unique-job-id",
deduplicate=False,
**kwargs
)
Key parameters
method: Function path or callable
queue: Queue name (short, default, long)
timeout: Maximum execution time in seconds
is_async: Execute asynchronously (default: True)
now: Execute immediately in foreground (default: False)
enqueue_after_commit: Wait for DB commit before enqueuing
on_success: Callback function on success
on_failure: Callback function on failure
at_front: Add job to front of queue
job_id: Unique identifier for deduplication
deduplicate: Prevent duplicate jobs with same job_id
Enqueue after commit
Ensure job is only enqueued after database transaction commits:
import frappe
class SalesOrder(Document):
def on_submit(self):
# Only send email if submit is successful
frappe.enqueue(
"myapp.tasks.send_order_confirmation",
enqueue_after_commit=True,
order_id=self.name
)
Job deduplication
Prevent duplicate jobs from being queued:
import frappe
# Use job_id for deduplication
frappe.enqueue(
"myapp.tasks.sync_data",
job_id="sync-customer-data",
deduplicate=True,
customer="CUST-001"
)
# Check if job is already enqueued
from frappe.utils.background_jobs import is_job_enqueued
if is_job_enqueued("sync-customer-data"):
frappe.msgprint("Sync already in progress")
Enqueue document method
Execute a document method in the background:
import frappe
# Enqueue document method
frappe.enqueue_doc(
doctype="Sales Order",
name="SO-001",
method="create_delivery_note",
queue="default",
timeout=300
)
In the DocType controller:
class SalesOrder(Document):
def create_delivery_note(self):
"""This method will be executed in background"""
delivery_note = frappe.new_doc("Delivery Note")
# ... create delivery note
delivery_note.insert()
Job callbacks
Execute functions on job success or failure:
import frappe
def on_success_callback(job):
"""Called when job completes successfully"""
frappe.publish_realtime(
"job_complete",
{"job_id": job.id, "status": "success"},
user=frappe.session.user
)
def on_failure_callback(job, exc_type, exc_value, traceback):
"""Called when job fails"""
frappe.log_error(f"Job {job.id} failed: {exc_value}")
frappe.enqueue(
"myapp.tasks.process_data",
on_success=on_success_callback,
on_failure=on_failure_callback
)
Queue priority
Add jobs to the front of the queue:
import frappe
# High priority job
frappe.enqueue(
"myapp.tasks.urgent_task",
at_front=True
)
# Auto-prioritize when queue is starved
frappe.enqueue(
"myapp.tasks.interactive_task",
at_front_when_starved=True
)
Job execution
The background worker executes jobs using the execute_job function:
def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, retry=0):
"""Execute job in worker, handle commit/rollback"""
retval = None
if is_async:
frappe.init(site, force=True)
frappe.connect()
if user:
frappe.set_user(user)
# Call method with arguments
method_fn = frappe.get_attr(method)
retval = method_fn(**kwargs)
# Commit transaction
frappe.db.commit()
return retval
Starting workers
Start background workers to process jobs:
# Start single worker
bench worker --queue default
# Start worker for specific queues
bench worker --queue default,short,long
# Start worker pool with multiple workers
bench worker --queue default --num-workers 4
# Start worker in burst mode (exits after processing all jobs)
bench worker --queue default --burst
Custom queues
Define custom worker queues:
# In common_site_config.json
{
"workers": {
"email_queue": {
"timeout": 600
},
"data_import": {
"timeout": 7200
}
}
}
Use custom queue:
frappe.enqueue(
"myapp.tasks.import_data",
queue="data_import",
file_path="/path/to/data.csv"
)
Job monitoring
Monitor job status and queue health:
import frappe
from frappe.utils.background_jobs import get_jobs, get_workers
# Get all jobs in a queue
jobs = get_jobs(site=frappe.local.site, queue="default")
# Get all active workers
workers = get_workers()
# Get job by ID
from frappe.utils.background_jobs import get_job, get_job_status
job = get_job("my-job-id")
status = get_job_status("my-job-id")
Retry failed jobs
Jobs automatically retry on certain errors:
def execute_job(site, method, event, job_name, kwargs, retry=0):
try:
retval = method(**kwargs)
except (frappe.db.InternalError, frappe.RetryBackgroundJobError) as e:
frappe.db.rollback()
if retry < 5:
# Retry job with exponential backoff
time.sleep(retry + 1)
return execute_job(site, method, event, job_name, kwargs, retry=retry + 1)
else:
frappe.log_error(title=method_name)
raise
Job hooks
Execute functions before or after job execution:
# In hooks.py
before_job = [
"myapp.jobs.before_job_hook"
]
after_job = [
"myapp.jobs.after_job_hook"
]
# Hook functions
def before_job_hook(method, kwargs, transaction_type):
"""Called before job executes"""
frappe.log(f"Starting job: {method}")
def after_job_hook(method, kwargs, result):
"""Called after job completes"""
frappe.log(f"Job completed: {method}")
Queue configuration
Configure queue behavior:
# In common_site_config.json
{
"rq_job_failure_ttl": 604800, # Keep failed jobs for 7 days
"rq_results_ttl": 600, # Keep results for 10 minutes
"rq_failed_jobs_limit": 1000, # Maximum failed jobs to keep
"email_queue_batch_size": 500, # Emails to process per batch
"max_queued_jobs": 500 # Maximum jobs in queue
}