Realtime API Reference¶
HTTP API reference for the Centrali Realtime Service. For most use cases, we recommend the SDK, which handles authentication, reconnection, and event parsing automatically.
Overview¶
The Realtime Service uses Server-Sent Events (SSE) to stream record events to clients. SSE is a standard HTTP-based protocol supported by all modern browsers and most HTTP clients.
Base URL: https://api.centrali.io/realtime
Endpoints¶
Subscribe to Events¶
Opens an SSE stream for receiving record events in the specified workspace. The request is a GET to /workspace/{workspaceSlug}/events, relative to the base URL.
Path Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| workspaceSlug | string | Yes | The workspace to subscribe to |
Query Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| access_token | string | Yes* | JWT access token for authentication |
| structures | string | No | Comma-separated structure slugs to filter |
| events | string | No | Comma-separated event types to filter |
| filter | string | No | CFL v1 filter expression |
*The access_token can also be provided via the Authorization: Bearer <token> header.
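As a minimal sketch of how the path and query parameters above combine, the following Python helper assembles a subscribe URL. The function name `build_subscribe_url` and its argument names are hypothetical, and the sketch assumes the server decodes percent-encoded commas and colons in the usual way. The token is left out of the URL on the assumption that it is sent in the Authorization header instead.

```python
from urllib.parse import quote, urlencode

BASE_URL = "https://api.centrali.io/realtime"


def build_subscribe_url(workspace_slug, structures=None, events=None, cfl_filter=None):
    """Build the subscribe URL; the token is meant to travel in the Authorization header."""
    params = {}
    if structures:
        params["structures"] = ",".join(structures)
    if events:
        params["events"] = ",".join(events)
    if cfl_filter:
        params["filter"] = cfl_filter
    url = f"{BASE_URL}/workspace/{quote(workspace_slug, safe='')}/events"
    # urlencode percent-encodes reserved characters such as "," and ":"
    return f"{url}?{urlencode(params)}" if params else url


url = build_subscribe_url(
    "my-workspace",
    structures=["order", "invoice"],
    events=["record_created", "record_updated"],
    cfl_filter="data.status:pending",
)
print(url)
```

Keeping the token in a header rather than the query string keeps it out of URLs that may end up in logs, at the cost of not working with the browser `EventSource` API, which cannot set custom headers.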
Example Request¶
curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?\
access_token=eyJhbG...&\
structures=order,invoice&\
events=record_created,record_updated&\
filter=data.status:pending"
Response Headers¶
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
SSE Stream Format¶
The server sends events in standard SSE format:
retry: 3000
: connected to workspace my-workspace

event: message
data: {"event":"record_created","workspaceSlug":"my-workspace","recordSlug":"order","recordId":"rec_abc123","data":{"customer":"John","total":99.99},"timestamp":"2025-01-15T10:30:00Z","createdBy":"user_123"}

: ping

event: message
data: {"event":"record_updated","workspaceSlug":"my-workspace","recordSlug":"order","recordId":"rec_abc123","data":{"before":{"status":"pending"},"after":{"status":"shipped"}},"timestamp":"2025-01-15T10:35:00Z","updatedBy":"user_456"}

event: close
data: {"reason":"timeout","reconnect":true}
SSE Event Types¶
| Event Name | Description |
|---|---|
| message | Record event (create, update, delete) |
| close | Connection closing, includes reconnection hint |
| (comment) | Keep-alive ping or status message |
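For clients without an SSE library, the stream format above can be handled with a small line parser. The sketch below is illustrative rather than a reference implementation: `parse_sse` is a hypothetical name, it expects already-decoded text lines, and it follows the standard SSE rule that a blank line dispatches the pending event. Only the `event`, `data`, and `retry` fields and comment lines are handled.

```python
def parse_sse(lines):
    """Yield ("retry", ms), ("comment", text) or ("event", (name, data)) tuples."""
    event_name, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event, if any data was collected
            if data_lines:
                yield ("event", (event_name, "\n".join(data_lines)))
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            # Comment lines carry keep-alive pings and status messages
            yield ("comment", line[1:].strip())
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)
            elif field == "retry" and value.isdigit():
                yield ("retry", int(value))


sample = [
    "retry: 3000",
    ": connected to workspace my-workspace",
    "",
    "event: message",
    'data: {"event":"record_created"}',
    "",
    ": ping",
    "",
    "event: close",
    'data: {"reason":"timeout","reconnect":true}',
    "",
]
items = list(parse_sse(sample))
```

Each `("event", ...)` item carries the event name and the raw JSON string, so the caller decides when to decode the payload.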
Event Payloads¶
Record Event¶
{
  "event": "record_created",
  "workspaceSlug": "my-workspace",
  "recordSlug": "order",
  "recordId": "rec_abc123",
  "data": {
    "customer": "John Doe",
    "total": 99.99,
    "status": "pending"
  },
  "timestamp": "2025-01-15T10:30:00Z",
  "createdBy": "user_123"
}
| Field | Type | Description |
|---|---|---|
| event | string | Event type: record_created, record_updated, record_deleted |
| workspaceSlug | string | Workspace where event occurred |
| recordSlug | string | Structure's slug (e.g., "order") |
| recordId | string | Unique record identifier |
| data | object | Record data (see format below) |
| timestamp | string | ISO 8601 timestamp |
| createdBy | string | User ID (for create events) |
| updatedBy | string | User ID (for update events) |
| deletedBy | string | User ID (for delete events) |
Data Format by Event Type¶
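record_created: data holds the record's fields directly, as { field, ... }.
record_updated: data holds two objects, { before: {...}, after: {...} }, with the values before and after the update.
record_deleted: data holds the record's fields directly, as { field, ... }.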
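Because `data` holds the record fields directly for create and delete events but is split into `before` and `after` for update events (as the stream examples above show), a consumer usually needs a small normalisation step. The helpers below are a hedged sketch; `current_data` and `actor_id` are hypothetical names, and the sketch makes no assumption about whether `before` and `after` contain the whole record or only the changed fields.

```python
# Maps each event type to the field that carries the acting user's ID
ACTOR_FIELD = {
    "record_created": "createdBy",
    "record_updated": "updatedBy",
    "record_deleted": "deletedBy",
}


def current_data(event):
    """Return the field values carried by the event, using `after` for updates."""
    data = event.get("data", {})
    if event["event"] == "record_updated":
        return data.get("after", {})
    return data


def actor_id(event):
    """Return the user ID associated with the event, or None if it is absent."""
    return event.get(ACTOR_FIELD.get(event["event"], ""))


update = {
    "event": "record_updated",
    "data": {"before": {"status": "pending"}, "after": {"status": "shipped"}},
    "updatedBy": "user_456",
}
create = {
    "event": "record_created",
    "data": {"customer": "John", "total": 99.99},
    "createdBy": "user_123",
}
```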
Close Event¶
Sent before server-initiated disconnection:
| Field | Type | Description |
|---|---|---|
| reason | string | Reason for closing: timeout, permission_revoked, maintenance |
| reconnect | boolean | Whether client should attempt to reconnect |
Error Responses¶
Error response format varies by error type. Authentication errors return JSON; other errors return plain text.
400 Bad Request¶
Plain text response:
401 Unauthorized¶
JSON response with error code:
403 Forbidden¶
JSON response with error code:
429 Too Many Requests¶
JSON response for plan-based rate limits:
Plain text for workspace connection limits:
500 Internal Server Error¶
JSON response for auth service errors:
Plain text for other errors:
503 Service Unavailable¶
Plain text response:
Error Codes¶
Note: Not all errors return a code field. Authentication errors (401, 403) include JSON with codes; other errors may return plain text.
| Code | HTTP Status | Description | Recoverable |
|---|---|---|---|
| MISSING_TOKEN | 401 | No token provided | No |
| TOKEN_EXPIRED | 401 | Token has expired | Yes |
| INVALID_TOKEN | 401 | Token is malformed or signature invalid | No |
| WORKSPACE_MISMATCH | 403 | Token not valid for requested workspace | No |
| FORBIDDEN | 403 | User lacks permission | No |
| AUTH_ERROR | 500 | Authorization service unavailable | Yes |
| RATE_LIMIT_EXCEEDED | 429 | Connection limit reached | Yes |
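Since error bodies may be JSON with a `code` field or plain text, a client has to try both. The following sketch shows one way to do that and to look up the Recoverable column from the table above. `classify_error` is a hypothetical helper, the plain-text body in the usage note is invented for illustration, and no JSON field other than `code` is assumed.

```python
import json

# Recoverable column of the error code table, keyed by code
RECOVERABLE = {
    "MISSING_TOKEN": False,
    "TOKEN_EXPIRED": True,
    "INVALID_TOKEN": False,
    "WORKSPACE_MISMATCH": False,
    "FORBIDDEN": False,
    "AUTH_ERROR": True,
    "RATE_LIMIT_EXCEEDED": True,
}


def classify_error(status, body):
    """Return status, code, and recoverability for a JSON or plain-text error body."""
    try:
        payload = json.loads(body)
    except ValueError:
        payload = None  # not JSON, so treat the body as plain text
    code = payload.get("code") if isinstance(payload, dict) else None
    return {
        "status": status,
        "code": code,
        # None means the table does not say whether the error is recoverable
        "recoverable": RECOVERABLE.get(code),
        "text": None if code else body,
    }


expired = classify_error(401, '{"code": "TOKEN_EXPIRED"}')
plain = classify_error(400, "Invalid filter expression")  # hypothetical body
```

Here `expired["recoverable"]` is `True`, which suggests refreshing the token and reconnecting, while `plain["recoverable"]` is `None` because plain-text errors carry no code to look up.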
CFL Filter Syntax¶
Centrali Filter Language (CFL) v1 for filtering events by data values.
Format¶
A filter is written as path:value or path:operator:value. When the operator is omitted, eq is used.
Operators¶
| Operator | Description | Example |
|---|---|---|
| eq | Equals (default) | data.status:shipped |
| ne | Not equals | data.status:ne:cancelled |
| gt | Greater than | data.amount:gt:100 |
| lt | Less than | data.amount:lt:50 |
| gte | Greater than or equal | data.priority:gte:5 |
| lte | Less than or equal | data.quantity:lte:10 |
| in | Value in list | data.status:in:a,b,c |
| nin | Not in list | data.status:nin:x,y |
| contains | Contains substring | data.name:contains:test |
| startswith | Starts with | data.sku:startswith:PROD- |
| endswith | Ends with | data.email:endswith:@co.com |
Event Data Structure¶
Filter paths depend on the event type:
| Event | Data Structure | Filter Example |
|---|---|---|
| record_created | { field, ... } | data.status:pending |
| record_updated | { before: {...}, after: {...} } | data.after.status:shipped |
| record_deleted | { field, ... } | data.status:archived |
Examples¶
# Simple equality (for create/delete events)
filter=data.status:shipped
# For update events, filter on new value
filter=data.after.status:shipped
# Or filter on old value
filter=data.before.status:pending
# Numeric comparison
filter=data.total:gt:1000
# Multiple values
filter=data.status:in:pending,processing,shipped
# Nested fields (URL-encode the space as %20 when sending the request)
filter=data.address.city:New York
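The examples above all follow the same pattern, which can be captured in a small builder. This is a sketch under stated assumptions: `cfl` is a hypothetical helper, it omits the operator for equality because that is the only equality form the examples show, and it performs no escaping of colons or commas inside values because the reference does not describe any.

```python
from urllib.parse import quote, urlencode

OPERATORS = {
    "eq", "ne", "gt", "lt", "gte", "lte",
    "in", "nin", "contains", "startswith", "endswith",
}


def cfl(path, value, op="eq"):
    """Build a CFL v1 expression such as data.total:gt:1000."""
    if op not in OPERATORS:
        raise ValueError(f"unknown CFL operator: {op}")
    if op in ("in", "nin"):
        # List operators take comma-separated values
        value = ",".join(str(v) for v in value)
    if op == "eq":
        return f"{path}:{value}"
    return f"{path}:{op}:{value}"


expr = cfl("data.status", ["pending", "processing", "shipped"], op="in")
# quote_via=quote encodes the space as %20 instead of "+"
query = urlencode({"filter": cfl("data.address.city", "New York")}, quote_via=quote)
```

Building the expression first and encoding it second keeps the CFL syntax readable in code while still producing a valid query string.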
Rate Limits¶
Connection limits are based on workspace plan:
| Plan | Concurrent Connections |
|---|---|
| Free | 10 per workspace |
| Standard | 100 per workspace |
| Pro | 1,000 per workspace |
| Enterprise | Custom |
Connections exceeding the limit receive HTTP 429.
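The reference does not prescribe a retry policy after a 429, so the following is only one possible approach: a capped exponential backoff schedule. The function name `backoff_delays` and every default value are assumptions chosen for illustration, not documented behaviour of the service.

```python
def backoff_delays(attempts=5, base=3.0, factor=2.0, cap=60.0):
    """Return the wait in seconds before each retry, doubling up to a cap."""
    return [min(cap, base * factor ** i) for i in range(attempts)]


delays = backoff_delays()
```

A client would sleep for each delay in turn and stop retrying once a connection succeeds or the list is exhausted. Adding random jitter to each delay is a common refinement when many clients share one workspace limit.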
Connection Behavior¶
Keep-Alive¶
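The server sends a comment line (: ping) every 30 seconds to keep the connection open. SSE clients ignore comment lines, so no handling is required.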
Connection Timeout¶
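Connections automatically close after 1 hour. Before closing, the server sends a close event with reason timeout, as in the final event of the stream example above.
Clients should reconnect when the close event's reconnect field is true.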
Retry Directive¶
The server sends a retry directive (retry: 3000) at connection start.
This tells SSE clients to wait 3 seconds (3000 ms) before reconnecting after a disconnection.
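The close event and the retry directive can be combined into a single reconnect decision. The sketch below assumes the client has kept the most recent retry value in milliseconds. `reconnect_delay` is a hypothetical helper, and the second payload in the usage lines is invented to show the no-reconnect branch.

```python
import json


def reconnect_delay(close_data, retry_ms=3000):
    """Return seconds to wait before reconnecting, or None to stay disconnected."""
    payload = json.loads(close_data)
    if not payload.get("reconnect", False):
        return None
    return retry_ms / 1000


wait = reconnect_delay('{"reason":"timeout","reconnect":true}')
# Hypothetical payload: the reference does not say which reasons disable reconnect
stay_closed = reconnect_delay('{"reason":"permission_revoked","reconnect":false}')
```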
Client Implementation¶
JavaScript (Browser)¶
// Assumes `token` holds a valid JWT access token.
function connect() {
  const url = new URL('https://api.centrali.io/realtime/workspace/my-workspace/events');
  url.searchParams.set('access_token', token);
  url.searchParams.set('structures', 'order');

  const eventSource = new EventSource(url);

  eventSource.addEventListener('message', (e) => {
    const event = JSON.parse(e.data);
    console.log('Event:', event);
  });

  eventSource.addEventListener('close', (e) => {
    const data = JSON.parse(e.data);
    // Always close first; otherwise EventSource retries on its own
    eventSource.close();
    if (data.reconnect) {
      // Reconnect with fresh token
      setTimeout(connect, 1000);
    }
  });

  eventSource.onerror = (e) => {
    console.error('Connection error');
  };
}

connect();
Node.js¶
import { EventSource } from 'eventsource';

const url = `https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=${token}`;
const eventSource = new EventSource(url);

eventSource.addEventListener('message', (e) => {
  const event = JSON.parse(e.data);
  console.log('Event:', event);
});

eventSource.addEventListener('error', (e) => {
  console.error('Error:', e);
});
cURL¶
# Stream events (use -N to disable buffering)
curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=eyJ..."
Python¶
# Requires the sseclient-py and requests packages.
# Assumes `token` holds a valid JWT access token.
import json

import requests
import sseclient

url = "https://api.centrali.io/realtime/workspace/my-workspace/events"
headers = {"Authorization": f"Bearer {token}"}

# stream=True keeps the HTTP response open so events can be read as they arrive
response = requests.get(url, headers=headers, stream=True)
client = sseclient.SSEClient(response)

for event in client.events():
    if event.event == "message":
        data = json.loads(event.data)
        print(f"Event: {data}")
    elif event.event == "close":
        data = json.loads(event.data)
        print(f"Connection closing: {data['reason']}")
        if data.get('reconnect'):
            # Reconnect logic here
            pass
Health Endpoints¶
Liveness¶
Returns 200 if service is running.
Readiness¶
Returns 200 if service is ready to accept connections (Redis connected).
Related Documentation¶
- SDK Quickstart - Recommended approach
- Authentication - Token management
- Filtering - CFL filter syntax
- Troubleshooting - Common issues