Skip to content

Realtime API Reference

HTTP API reference for the Centrali Realtime Service. For most use cases, we recommend using the SDK which handles authentication, reconnection, and event parsing automatically.

Overview

The Realtime Service uses Server-Sent Events (SSE) to stream record events to clients. SSE is a standard HTTP-based protocol supported by all modern browsers and most HTTP clients.

Base URL: https://api.centrali.io/realtime

Endpoints

Subscribe to Events

GET /workspace/{workspaceSlug}/events

Opens an SSE stream for receiving record events in the specified workspace.

Path Parameters

Parameter Type Required Description
workspaceSlug string Yes The workspace to subscribe to

Query Parameters

Parameter Type Required Description
access_token string Yes* JWT access token for authentication
structures string No Comma-separated structure slugs to filter
events string No Comma-separated event types to filter
filter string No CFL v1 filter expression

*The access_token can also be provided via the Authorization: Bearer <token> header.

Example Request

curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?\
access_token=eyJhbG...&\
structures=order,invoice&\
events=record_created,record_updated&\
filter=data.status:pending"

Response Headers

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no

SSE Stream Format

The server sends events in standard SSE format:

retry: 3000
: connected to workspace my-workspace

event: message
data: {"event":"record_created","workspaceSlug":"my-workspace","recordSlug":"order","recordId":"rec_abc123","data":{"customer":"John","total":99.99},"timestamp":"2025-01-15T10:30:00Z","createdBy":"user_123"}

: ping

event: message
data: {"event":"record_updated","workspaceSlug":"my-workspace","recordSlug":"order","recordId":"rec_abc123","data":{"before":{"status":"pending"},"after":{"status":"shipped"}},"timestamp":"2025-01-15T10:35:00Z","updatedBy":"user_456"}

event: close
data: {"reason":"timeout","reconnect":true}

SSE Event Types

Event Name Description
message Record event (create, update, delete)
close Connection closing, includes reconnection hint
(comment) Keep-alive ping or status message

Event Payloads

Record Event

{
  "event": "record_created",
  "workspaceSlug": "my-workspace",
  "recordSlug": "order",
  "recordId": "rec_abc123",
  "data": {
    "customer": "John Doe",
    "total": 99.99,
    "status": "pending"
  },
  "timestamp": "2025-01-15T10:30:00Z",
  "createdBy": "user_123"
}
Field Type Description
event string Event type: record_created, record_updated, record_deleted
workspaceSlug string Workspace where event occurred
recordSlug string Structure's slug (e.g., "order")
recordId string Unique record identifier
data object Record data (see format below)
timestamp string ISO 8601 timestamp
createdBy string User ID (for create events)
updatedBy string User ID (for update events)
deletedBy string User ID (for delete events)

Data Format by Event Type

record_created:

{
  "data": {
    "field1": "value1",
    "field2": 123
  }
}

record_updated:

{
  "data": {
    "before": { "status": "pending" },
    "after": { "status": "shipped" }
  }
}

record_deleted:

{
  "data": {
    "field1": "value1",
    "field2": 123
  }
}

Close Event

Sent before server-initiated disconnection:

{
  "reason": "timeout",
  "reconnect": true
}
Field Type Description
reason string Reason for closing: timeout, permission_revoked, maintenance
reconnect boolean Whether client should attempt to reconnect

Error Responses

Error response format varies by error type. Authentication errors return JSON; other errors return plain text.

400 Bad Request

Plain text response:

workspace slug required

invalid filter: <details>

401 Unauthorized

JSON response with error code:

{"error":"authentication required","code":"MISSING_TOKEN"}

{"error":"token expired","code":"TOKEN_EXPIRED"}
{"error":"invalid token","code":"INVALID_TOKEN"}

403 Forbidden

JSON response with error code:

{"error":"token not valid for this workspace","code":"WORKSPACE_MISMATCH"}

{"error":"not authorized to subscribe to realtime events","code":"FORBIDDEN"}

429 Too Many Requests

JSON response for plan-based rate limits:

{"error":"connection limit exceeded for your plan","code":"RATE_LIMIT_EXCEEDED"}

Plain text for workspace connection limits:

workspace connection limit reached

500 Internal Server Error

JSON response for auth service errors:

{"error":"authorization check failed","code":"AUTH_ERROR"}

Plain text for other errors:

streaming not supported
connection error

503 Service Unavailable

Plain text response:

service at capacity

Error Codes

Note: Not all errors return a code field. Authentication errors (401, 403) include JSON with codes; other errors may return plain text.

Code HTTP Status Description Recoverable
MISSING_TOKEN 401 No token provided No
TOKEN_EXPIRED 401 Token has expired Yes
INVALID_TOKEN 401 Token is malformed or signature invalid No
WORKSPACE_MISMATCH 403 Token not valid for requested workspace No
FORBIDDEN 403 User lacks permission No
AUTH_ERROR 500 Authorization service unavailable Yes
RATE_LIMIT_EXCEEDED 429 Connection limit reached Yes

CFL Filter Syntax

Centrali Filter Language (CFL) v1 for filtering events by data values.

Format

field:value                    # Equality
field:operator:value          # With operator

Operators

Operator Description Example
eq Equals (default) data.status:shipped
ne Not equals data.status:ne:cancelled
gt Greater than data.amount:gt:100
lt Less than data.amount:lt:50
gte Greater than or equal data.priority:gte:5
lte Less than or equal data.quantity:lte:10
in Value in list data.status:in:a,b,c
nin Not in list data.status:nin:x,y
contains Contains substring data.name:contains:test
startswith Starts with data.sku:startswith:PROD-
endswith Ends with data.email:endswith:@co.com

Event Data Structure

Filter paths depend on the event type:

Event Data Structure Filter Example
record_created { field, ... } data.status:pending
record_updated { before: {...}, after: {...} } data.after.status:shipped
record_deleted { field, ... } data.status:archived

Examples

# Simple equality (for create/delete events)
filter=data.status:shipped

# For update events, filter on new value
filter=data.after.status:shipped

# Or filter on old value
filter=data.before.status:pending

# Numeric comparison
filter=data.total:gt:1000

# Multiple values
filter=data.status:in:pending,processing,shipped

# Nested fields
filter=data.address.city:New York

Rate Limits

Connection limits are based on workspace plan:

Plan Concurrent Connections
Free 10 per workspace
Standard 100 per workspace
Pro 1,000 per workspace
Enterprise Custom

Connections exceeding the limit receive HTTP 429.

Connection Behavior

Keep-Alive

The server sends comment pings every 30 seconds:

: ping

Connection Timeout

Connections automatically close after 1 hour with a close event:

event: close
data: {"reason":"timeout","reconnect":true}

Clients should reconnect when reconnect: true.

Retry Directive

The server sends a retry directive at connection start:

retry: 3000

This tells SSE clients to wait 3 seconds before reconnecting after disconnection.

Client Implementation

JavaScript (Browser)

const url = new URL('https://api.centrali.io/realtime/workspace/my-workspace/events');
url.searchParams.set('access_token', token);
url.searchParams.set('structures', 'order');

const eventSource = new EventSource(url);

eventSource.addEventListener('message', (e) => {
  const event = JSON.parse(e.data);
  console.log('Event:', event);
});

eventSource.addEventListener('close', (e) => {
  const data = JSON.parse(e.data);
  if (data.reconnect) {
    eventSource.close();
    // Reconnect with fresh token
    setTimeout(() => connect(), 1000);
  }
});

eventSource.onerror = (e) => {
  console.error('Connection error');
};

Node.js

import { EventSource } from 'eventsource';

const url = `https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=${token}`;
const eventSource = new EventSource(url);

eventSource.addEventListener('message', (e) => {
  const event = JSON.parse(e.data);
  console.log('Event:', event);
});

eventSource.addEventListener('error', (e) => {
  console.error('Error:', e);
});

cURL

# Stream events (use -N to disable buffering)
curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=eyJ..."

Python

import sseclient
import requests
import json

url = "https://api.centrali.io/realtime/workspace/my-workspace/events"
headers = {"Authorization": f"Bearer {token}"}

response = requests.get(url, headers=headers, stream=True)
client = sseclient.SSEClient(response)

for event in client.events():
    if event.event == "message":
        data = json.loads(event.data)
        print(f"Event: {data}")
    elif event.event == "close":
        data = json.loads(event.data)
        print(f"Connection closing: {data['reason']}")
        if data.get('reconnect'):
            # Reconnect logic here
            pass

Health Endpoints

Liveness

GET /health/live

Returns 200 if service is running.

Readiness

GET /health/ready

Returns 200 if service is ready to accept connections (Redis connected).