Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/frappe/frappe/llms.txt

Use this file to discover all available pages before exploring further.

Frappe Framework provides real-time communication between server and clients using Socket.IO, enabling live notifications, progress updates, and collaborative features.

Publishing real-time events

Send real-time updates to clients using publish_realtime:
import frappe

# Publish to specific user
frappe.publish_realtime(
    event="task_update",
    message={"task_id": "TASK-001", "status": "Completed"},
    user="user@example.com"
)

# Publish to all users
frappe.publish_realtime(
    event="system_notification",
    message={"title": "System Maintenance", "message": "Scheduled at 2 AM"},
    room="all"
)

# Publish to specific DocType
frappe.publish_realtime(
    event="list_update",
    message={"doctype": "Task"},
    doctype="Task"
)

Event types

Common real-time event types:

Progress updates

Show progress of long-running tasks:
import frappe
from frappe.realtime import publish_progress

def process_large_dataset():
    total = 1000
    
    for i in range(total):
        # Process item
        process_item(i)
        
        # Publish progress
        publish_progress(
            percent=((i + 1) / total) * 100,
            title="Processing Data",
            description=f"Processed {i + 1} of {total} items"
        )

Message print

Show messages to users:
import frappe

# Send message to current user
frappe.publish_realtime(
    event="msgprint",
    message={"message": "Operation completed successfully!"},
    user=frappe.session.user
)

List updates

Notify list views to refresh:
import frappe

class Task(Document):
    def on_update(self):
        # Notify all clients viewing Task list
        frappe.publish_realtime(
            event="list_update",
            message={"doctype": "Task"},
            doctype="Task"
        )

Document updates

Notify specific document viewers:
import frappe

class Task(Document):
    def on_update(self):
        # Notify all clients viewing this document
        frappe.publish_realtime(
            event="docinfo_update",
            message={"modified": self.modified},
            doctype=self.doctype,
            docname=self.name
        )

Real-time rooms

Real-time events are published to specific rooms:
from frappe.realtime import (
    get_user_room,
    get_site_room,
    get_doctype_room,
    get_doc_room,
    get_task_progress_room
)

# User-specific room
user_room = get_user_room("user@example.com")
# Returns: "user:user@example.com"

# Site-wide room
site_room = get_site_room()
# Returns: "all"

# DocType room
doctype_room = get_doctype_room("Task")
# Returns: "doctype:Task"

# Document-specific room
doc_room = get_doc_room("Task", "TASK-001")
# Returns: "doc:Task/TASK-001"

# Task progress room
task_room = get_task_progress_room("task-id-123")
# Returns: "task_progress:task-id-123"

Publishing to rooms

Publish events to specific rooms:
import frappe

# Publish to custom room
frappe.publish_realtime(
    event="custom_event",
    message={"data": "value"},
    room="custom_room_name"
)

# Publish to document room
frappe.publish_realtime(
    event="doc_update",
    message={"action": "save"},
    doctype="Sales Order",
    docname="SO-001"
)

After commit publishing

Publish events only after database commit:
import frappe

class SalesOrder(Document):
    def on_submit(self):
        # Only publish if submit succeeds
        frappe.publish_realtime(
            event="order_submitted",
            message={"order_id": self.name},
            after_commit=True
        )

Client-side listening

Listen for real-time events on the client:
// Listen for specific event
frappe.realtime.on('task_update', (data) => {
    console.log('Task updated:', data.task_id);
    // Refresh list or update UI
    frappe.views.list_view.refresh();
});

// Listen for progress updates
frappe.realtime.on('progress', (data) => {
    if (data.percent) {
        // Update progress bar
        frappe.show_progress(
            data.title,
            data.percent,
            100,
            data.description
        );
    }
});

// Listen for message print
frappe.realtime.on('msgprint', (data) => {
    frappe.msgprint(data.message);
});

Task progress tracking

Track progress of background tasks:
import frappe
from frappe.utils.background_jobs import enqueue

def long_running_task():
    """Function to run in background"""
    total = 100
    
    for i in range(total):
        # Do some work
        process_item(i)
        
        # Publish progress
        frappe.publish_realtime(
            "progress",
            {
                "percent": ((i + 1) / total) * 100,
                "title": "Processing Items",
                "description": f"Processing item {i + 1} of {total}"
            },
            task_id=frappe.local.task_id
        )

# Enqueue with task ID
task_id = "unique-task-id"
enqueue(
    "myapp.tasks.long_running_task",
    job_id=task_id
)
Client-side tracking:
// Track task progress
let task_id = 'unique-task-id';

frappe.realtime.on(`task_progress:${task_id}`, (data) => {
    if (data.percent) {
        // Update progress indicator
        update_progress_bar(data.percent, data.description);
    }
});

Real-time notifications

Send notifications to users:
import frappe

def send_notification(user, message):
    """Send real-time notification to user"""
    frappe.publish_realtime(
        event="notification",
        message={
            "type": "info",
            "message": message,
            "title": "Notification"
        },
        user=user
    )

# Usage
send_notification(
    "user@example.com",
    "Your report is ready for download"
)

Socket.IO authentication

Authentication is handled automatically using session tokens:
@frappe.whitelist(allow_guest=True)
def get_user_info():
    """Get user info for Socket.IO authentication"""
    trusted_secret = get_socketio_secret()
    provided_secret = frappe.get_request_header("X-Frappe-Socket-Secret")
    
    if trusted_secret != provided_secret:
        return {}
    
    return {
        "user": frappe.session.user,
        "user_type": frappe.session.data.user_type,
        "installed_apps": frappe.get_installed_apps()
    }

Real-time via Redis

Frappe uses Redis for real-time event distribution:
def emit_via_redis(event, message, room):
    """Publish event via Redis to Socket.IO server"""
    from frappe.utils.background_jobs import get_redis_connection_without_auth
    
    r = get_redis_connection_without_auth()
    r.publish(
        "events",
        frappe.as_json({
            "event": event,
            "message": message,
            "room": room,
            "namespace": frappe.local.site
        })
    )

Permission checks

Check permissions before publishing to document rooms:
@frappe.whitelist(allow_guest=True)
def has_permission(doctype, name):
    """Check if user has permission to subscribe to document updates"""
    frappe.has_permission(doctype, doc=name, throw=True)
    return True

Custom real-time events

Create custom real-time event handlers:
# Server-side
import frappe

class CustomDocType(Document):
    def on_update(self):
        # Publish custom event
        frappe.publish_realtime(
            event="custom_doc_update",
            message={
                "doc_id": self.name,
                "custom_field": self.custom_field,
                "timestamp": frappe.utils.now()
            },
            doctype=self.doctype,
            docname=self.name
        )
Client-side handler:
// Listen for custom event
frappe.realtime.on('custom_doc_update', (data) => {
    console.log('Custom doc updated:', data.doc_id);
    // Handle custom event
    handle_custom_update(data);
});

Batching real-time updates

Batch multiple updates for better performance:
import frappe

def bulk_update_tasks(task_ids):
    """Update multiple tasks and publish once"""
    for task_id in task_ids:
        task = frappe.get_doc("Task", task_id)
        task.status = "Completed"
        task.save()
    
    # Publish single update for all tasks
    frappe.publish_realtime(
        event="list_update",
        message={"doctype": "Task"},
        doctype="Task"
    )