Documentation Index
Fetch the complete documentation index at: https://mintlify.com/frappe/frappe/llms.txt
Use this file to discover all available pages before exploring further.
Frappe Framework provides real-time communication between server and clients using Socket.IO, enabling live notifications, progress updates, and collaborative features.
Publishing real-time events
Send real-time updates to clients using publish_realtime:
import frappe
# Publish to specific user
frappe.publish_realtime(
event="task_update",
message={"task_id": "TASK-001", "status": "Completed"},
user="user@example.com"
)
# Publish to all users
frappe.publish_realtime(
event="system_notification",
message={"title": "System Maintenance", "message": "Scheduled at 2 AM"},
room="all"
)
# Publish to specific DocType
frappe.publish_realtime(
event="list_update",
message={"doctype": "Task"},
doctype="Task"
)
Event types
Common real-time event types:
Progress updates
Show progress of long-running tasks:
import frappe
from frappe.realtime import publish_progress
def process_large_dataset():
total = 1000
for i in range(total):
# Process item
process_item(i)
# Publish progress
publish_progress(
percent=((i + 1) / total) * 100,
title="Processing Data",
description=f"Processed {i + 1} of {total} items"
)
Message print
Show messages to users:
import frappe
# Send message to current user
frappe.publish_realtime(
event="msgprint",
message={"message": "Operation completed successfully!"},
user=frappe.session.user
)
List updates
Notify list views to refresh:
import frappe
class Task(Document):
def on_update(self):
# Notify all clients viewing Task list
frappe.publish_realtime(
event="list_update",
message={"doctype": "Task"},
doctype="Task"
)
Document updates
Notify specific document viewers:
import frappe
class Task(Document):
def on_update(self):
# Notify all clients viewing this document
frappe.publish_realtime(
event="docinfo_update",
message={"modified": self.modified},
doctype=self.doctype,
docname=self.name
)
Real-time rooms
Real-time events are published to specific rooms:
from frappe.realtime import (
get_user_room,
get_site_room,
get_doctype_room,
get_doc_room,
get_task_progress_room
)
# User-specific room
user_room = get_user_room("user@example.com")
# Returns: "user:user@example.com"
# Site-wide room
site_room = get_site_room()
# Returns: "all"
# DocType room
doctype_room = get_doctype_room("Task")
# Returns: "doctype:Task"
# Document-specific room
doc_room = get_doc_room("Task", "TASK-001")
# Returns: "doc:Task/TASK-001"
# Task progress room
task_room = get_task_progress_room("task-id-123")
# Returns: "task_progress:task-id-123"
Publishing to rooms
Publish events to specific rooms:
import frappe
# Publish to custom room
frappe.publish_realtime(
event="custom_event",
message={"data": "value"},
room="custom_room_name"
)
# Publish to document room
frappe.publish_realtime(
event="doc_update",
message={"action": "save"},
doctype="Sales Order",
docname="SO-001"
)
After commit publishing
Publish events only after database commit:
import frappe
class SalesOrder(Document):
def on_submit(self):
# Only publish if submit succeeds
frappe.publish_realtime(
event="order_submitted",
message={"order_id": self.name},
after_commit=True
)
Client-side listening
Listen for real-time events on the client:
// Listen for specific event
frappe.realtime.on('task_update', (data) => {
console.log('Task updated:', data.task_id);
// Refresh list or update UI
frappe.views.list_view.refresh();
});
// Listen for progress updates
frappe.realtime.on('progress', (data) => {
if (data.percent) {
// Update progress bar
frappe.show_progress(
data.title,
data.percent,
100,
data.description
);
}
});
// Listen for message print
frappe.realtime.on('msgprint', (data) => {
frappe.msgprint(data.message);
});
Task progress tracking
Track progress of background tasks:
import frappe
from frappe.utils.background_jobs import enqueue
def long_running_task():
"""Function to run in background"""
total = 100
for i in range(total):
# Do some work
process_item(i)
# Publish progress
frappe.publish_realtime(
"progress",
{
"percent": ((i + 1) / total) * 100,
"title": "Processing Items",
"description": f"Processing item {i + 1} of {total}"
},
task_id=frappe.local.task_id
)
# Enqueue with task ID
task_id = "unique-task-id"
enqueue(
"myapp.tasks.long_running_task",
job_id=task_id
)
Client-side tracking:
// Track task progress
let task_id = 'unique-task-id';
frappe.realtime.on(`task_progress:${task_id}`, (data) => {
if (data.percent) {
// Update progress indicator
update_progress_bar(data.percent, data.description);
}
});
Real-time notifications
Send notifications to users:
import frappe
def send_notification(user, message):
"""Send real-time notification to user"""
frappe.publish_realtime(
event="notification",
message={
"type": "info",
"message": message,
"title": "Notification"
},
user=user
)
# Usage
send_notification(
"user@example.com",
"Your report is ready for download"
)
Socket.IO authentication
Authentication is handled automatically using session tokens:
@frappe.whitelist(allow_guest=True)
def get_user_info():
"""Get user info for Socket.IO authentication"""
trusted_secret = get_socketio_secret()
provided_secret = frappe.get_request_header("X-Frappe-Socket-Secret")
if trusted_secret != provided_secret:
return {}
return {
"user": frappe.session.user,
"user_type": frappe.session.data.user_type,
"installed_apps": frappe.get_installed_apps()
}
Real-time via Redis
Frappe uses Redis for real-time event distribution:
def emit_via_redis(event, message, room):
"""Publish event via Redis to Socket.IO server"""
from frappe.utils.background_jobs import get_redis_connection_without_auth
r = get_redis_connection_without_auth()
r.publish(
"events",
frappe.as_json({
"event": event,
"message": message,
"room": room,
"namespace": frappe.local.site
})
)
Permission checks
Check permissions before publishing to document rooms:
@frappe.whitelist(allow_guest=True)
def has_permission(doctype, name):
"""Check if user has permission to subscribe to document updates"""
frappe.has_permission(doctype, doc=name, throw=True)
return True
Custom real-time events
Create custom real-time event handlers:
# Server-side
import frappe
class CustomDocType(Document):
def on_update(self):
# Publish custom event
frappe.publish_realtime(
event="custom_doc_update",
message={
"doc_id": self.name,
"custom_field": self.custom_field,
"timestamp": frappe.utils.now()
},
doctype=self.doctype,
docname=self.name
)
Client-side handler:
// Listen for custom event
frappe.realtime.on('custom_doc_update', (data) => {
console.log('Custom doc updated:', data.doc_id);
// Handle custom event
handle_custom_update(data);
});
Batching real-time updates
Batch multiple updates for better performance:
import frappe
def bulk_update_tasks(task_ids):
"""Update multiple tasks and publish once"""
for task_id in task_ids:
task = frappe.get_doc("Task", task_id)
task.status = "Completed"
task.save()
# Publish single update for all tasks
frappe.publish_realtime(
event="list_update",
message={"doctype": "Task"},
doctype="Task"
)