Records API Reference¶
Records are the actual data entries in Centrali. Each record belongs to a Structure (schema) and contains the data fields defined by that structure.
Core Features¶
- CRUD Operations - Create, read, update, and delete records
- Upsert - Atomic create-or-update with advisory locking
- Bulk Operations - Process multiple records in a single request
- Version History - Track all changes with automatic versioning
- Change Logs - Detailed audit trail of who changed what and when
- Validation - Automatic validation against structure rules
- Relationships - Reference other records and structures
- Querying - Powerful query language for complex data retrieval
- Real-time Events - Webhooks and triggers on data changes
Authentication¶
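All Records API endpoints require authentication. The examples in this reference send an API key as a bearer token in the Authorization header (Authorization: Bearer YOUR_API_KEY).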
Base URL¶
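The examples in this reference use the base URL https://api.centrali.io/data/workspace/{workspace}/api/v1. Replace {workspace} with your workspace identifier.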
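A small helper can keep the base URL and the bearer header in one place. This is a minimal sketch: the URL pattern and the Authorization header are taken from the curl examples in this reference, while the names `records_url` and `auth_headers` are hypothetical and not part of any Centrali SDK.

```python
from urllib.parse import quote

# Base URL pattern as it appears in the curl examples in this reference.
BASE_URL_TEMPLATE = "https://api.centrali.io/data/workspace/{workspace}/api/v1"


def records_url(workspace: str, path: str = "/records") -> str:
    """Build a full endpoint URL for a workspace (hypothetical helper)."""
    base = BASE_URL_TEMPLATE.format(workspace=quote(workspace, safe=""))
    return base + "/" + path.lstrip("/")


def auth_headers(api_key: str) -> dict:
    """Headers used by the curl examples: bearer token plus JSON content type."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


print(records_url("acme", "/records/rec_xyz789"))
```

Any HTTP client can then be called with `records_url(...)` and `auth_headers(...)`; the sketch itself makes no network request.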
Create a Record¶
Single Record Creation¶
POST /records
Creates a new record based on a structure.
Request Body¶
{
"structureId": "str_abc123",
"data": {
"field1": "value1",
"field2": "value2"
},
"metadata": {
"source": "api",
"userId": "user123"
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| structureId | string | Yes | ID of the structure |
| data | object | Yes | Record data matching structure fields |
| metadata | object | No | Additional metadata for tracking |
Response¶
{
"id": "rec_xyz789",
"structureId": "str_abc123",
"workspaceId": "ws_123",
"data": {
"field1": "value1",
"field2": "value2"
},
"metadata": {
"source": "api",
"userId": "user123"
},
"version": 1,
"createdAt": "2024-01-15T10:30:00Z",
"updatedAt": "2024-01-15T10:30:00Z",
"createdBy": "user_abc",
"updatedBy": "user_abc"
}
Example¶
curl -X POST "https://api.centrali.io/data/workspace/acme/api/v1/records" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"structureId": "str_product",
"data": {
"name": "Premium Widget",
"price": 99.99,
"inStock": true,
"category": "electronics"
}
}'
Bulk Record Creation¶
POST /records/bulk
Create multiple records in a single request.
Request Body¶
{
"structureId": "str_abc123",
"records": [
{
"data": {
"name": "Record 1"
}
},
{
"data": {
"name": "Record 2"
}
}
],
"options": {
"skipValidation": false,
"returnRecords": true
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| structureId | string | Yes | ID of the structure |
| records | array | Yes | Array of record objects |
| options | object | No | Bulk operation options |
Response¶
Read Records¶
Get a Single Record¶
GET /records/{recordId}
Retrieve a specific record by ID.
Path Parameters¶
| Parameter | Type | Description |
|---|---|---|
| recordId | string | The record ID |
Query Parameters¶
| Parameter | Type | Description |
|---|---|---|
| includeHistory | boolean | Include version history |
| includeChangelog | boolean | Include change log |
| version | number | Get specific version |
Response¶
{
"id": "rec_xyz789",
"structureId": "str_abc123",
"data": {
"field1": "value1",
"field2": "value2"
},
"version": 2,
"createdAt": "2024-01-15T10:30:00Z",
"updatedAt": "2024-01-15T11:30:00Z"
}
Example¶
# Get latest version
curl "https://api.centrali.io/data/workspace/acme/api/v1/records/rec_xyz789" \
-H "Authorization: Bearer YOUR_API_KEY"
# Get specific version
curl "https://api.centrali.io/data/workspace/acme/api/v1/records/rec_xyz789?version=1" \
-H "Authorization: Bearer YOUR_API_KEY"
# Include history
curl "https://api.centrali.io/data/workspace/acme/api/v1/records/rec_xyz789?includeHistory=true" \
-H "Authorization: Bearer YOUR_API_KEY"
List Records¶
GET /records
List records with filtering, pagination, and sorting.
Query Parameters¶
| Parameter | Type | Description |
|---|---|---|
| structureId | string | Filter by structure ID |
| filter | string | Filter expression |
| sort | string | Sort field and direction |
| limit | number | Maximum records to return (default: 20, max: 100) |
| offset | number | Number of records to skip |
| cursor | string | Cursor for pagination |
| includeCount | boolean | Include total count |
Filter Syntax¶
Filters use a simple key:value syntax with operators:
- Equality: status:active
- Inequality: price:>100
- Range: date:2024-01-01..2024-12-31
- Contains: name:*widget*
- In: category:[electronics,accessories]
- Not: status:!archived
- Multiple: status:active,price:>50 (AND)
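Filter expressions contain characters such as `:`, `>` and `,` that should be percent-encoded when placed in a query string. The sketch below joins conditions with commas (AND, per the syntax above) and encodes the result with the standard library. The helper names `build_filter` and `list_records_query` are hypothetical.

```python
from urllib.parse import urlencode


def build_filter(conditions: dict) -> str:
    """Join field:expression pairs with commas, which the filter syntax treats as AND."""
    return ",".join(f"{field}:{expr}" for field, expr in conditions.items())


def list_records_query(structure_id: str, conditions: dict,
                       sort: str = "", limit: int = 20) -> str:
    """Return a URL-encoded query string for GET /records."""
    params = {
        "structureId": structure_id,
        "filter": build_filter(conditions),
        "limit": limit,
    }
    if sort:
        params["sort"] = sort
    return urlencode(params)


# Same filter as the "Filter and sort" curl example in this section.
print(list_records_query("str_product", {"inStock": "true", "price": ">50"},
                         sort="-price", limit=10))
```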
Response¶
{
"results": [
{
"id": "rec_123",
"structureId": "str_abc123",
"data": {...}
}
],
"pagination": {
"limit": 20,
"offset": 0,
"total": 150,
"hasMore": true,
"nextCursor": "eyJpZCI6InJlY18xMjMifQ=="
}
}
Example¶
# Get all records for a structure
curl "https://api.centrali.io/data/workspace/acme/api/v1/records?structureId=str_product" \
-H "Authorization: Bearer YOUR_API_KEY"
# Filter and sort
curl "https://api.centrali.io/data/workspace/acme/api/v1/records?structureId=str_product&filter=inStock:true,price:>50&sort=-price&limit=10" \
-H "Authorization: Bearer YOUR_API_KEY"
# Pagination with cursor
curl "https://api.centrali.io/data/workspace/acme/api/v1/records?cursor=eyJpZCI6InJlY18xMjMifQ==&limit=20" \
-H "Authorization: Bearer YOUR_API_KEY"
Upsert a Record¶
POST /records/slug/{structureSlug}/upsert
Atomically find a record by match fields and update it, or create a new one if no match exists. Uses PostgreSQL advisory locking to prevent race conditions, making it safe for concurrent calls with the same match criteria.
The caller needs both CREATE and UPDATE permissions, since the outcome depends on whether a matching record already exists.
Request Body¶
{
"match": {
"externalId": "ext-123"
},
"data": {
"externalId": "ext-123",
"name": "Widget",
"price": 9.99
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| match | object | Yes | Key-value pairs that identify the record (business key). Values must be primitives (string, number, boolean, null). |
| data | object | Yes | Full record data for create, or fields to update. On create, match fields are merged into data. On update, match fields are stripped to preserve the business key. |
Response¶
HTTP 201 — Record was created:
{
"data": {
"id": "rec_abc123",
"data": {
"externalId": "ext-123",
"name": "Widget",
"price": 9.99
},
"version": 1,
"createdAt": "2026-02-12T10:30:00Z"
},
"operation": "created"
}
HTTP 200 — Existing record was updated:
{
"data": {
"id": "rec_abc123",
"data": {
"externalId": "ext-123",
"name": "Widget",
"price": 12.99
},
"version": 2,
"updatedAt": "2026-02-12T11:00:00Z"
},
"operation": "updated"
}
Example¶
curl -X POST "https://api.centrali.io/data/workspace/acme/api/v1/records/slug/products/upsert" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"match": { "sku": "WDG-001" },
"data": {
"sku": "WDG-001",
"name": "Premium Widget",
"price": 29.99,
"inStock": true
}
}'
Idempotency¶
You can pass an Idempotency-Key header for safe HTTP retries. If the same key is sent again within the deduplication window, the original response is returned without re-executing the operation.
curl -X POST "https://api.centrali.io/data/workspace/acme/api/v1/records/slug/products/upsert" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: import-batch-42-sku-WDG-001" \
-d '{
"match": { "sku": "WDG-001" },
"data": { "sku": "WDG-001", "name": "Premium Widget", "price": 29.99 }
}'
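One way to produce a key that is identical across retries of the same request is to derive it from the request body. The following is a sketch under assumptions: this reference does not specify a key format or the length of the deduplication window, and the `idempotency_key` helper and its prefix convention are hypothetical.

```python
import hashlib
import json


def idempotency_key(prefix: str, body: dict) -> str:
    """Derive a stable key from the full request body (match and data).

    Sorting keys makes the result independent of dict ordering, so a retry
    that rebuilds the same body produces the same key.
    """
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}-{digest}"


body = {
    "match": {"sku": "WDG-001"},
    "data": {"sku": "WDG-001", "name": "Premium Widget", "price": 29.99},
}
headers = {"Idempotency-Key": idempotency_key("import-batch-42", body)}
```

Hashing the whole body rather than only the match fields means a later upsert with different data gets a different key, so it is not mistaken for a retry of the earlier request.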
Use Cases¶
- External data sync — Sync records from webhooks, scheduled imports, or API polling using a stable external ID as the match key
- Event aggregation — Accumulate counters, rollups, or running totals in compute functions
- Idempotent writes — Safely retry writes in compute functions with at-least-once delivery without creating duplicates
- Deduplication — Ensure exactly one record exists per business key
Behavior Notes¶
- Atomicity: The entire operation runs inside a database transaction with a PostgreSQL advisory lock keyed on the match criteria. Concurrent calls with the same match fields are serialized automatically.
- Match field protection: On update, match field values are stripped from the data payload so the business key cannot be accidentally overwritten.
- Type coercion: JSONB comparison means { "count": 42 } and { "count": "42" } are treated as different values.
- Null matching: A match value of null matches records where the field exists with a JSON null value.
- Versioning: Both create and update paths increment the record version and create changelog entries.
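The notes above can be modelled locally to make the semantics concrete. This is an in-memory sketch of the documented behavior only, not the server implementation: it has no locking or transactions, and the `upsert` function and the list-based store are hypothetical.

```python
from typing import Any


def upsert(store: list, match: dict, data: dict) -> dict:
    """Model the documented upsert semantics against a list of records."""

    def matches(fields: dict) -> bool:
        # Strict comparison: the field must exist and equal the match value
        # including its type, so 42 and "42" do not match. A match value of
        # None only matches a field that is present and set to None.
        return all(
            key in fields and type(fields[key]) is type(value) and fields[key] == value
            for key, value in match.items()
        )

    for record in store:
        if matches(record["data"]):
            # Update path: strip match fields so the business key is preserved.
            changes = {k: v for k, v in data.items() if k not in match}
            record["data"].update(changes)
            record["version"] += 1
            return {"data": record, "operation": "updated"}

    # Create path: match fields are merged into the new record's data.
    record: dict[str, Any] = {"data": {**data, **match}, "version": 1}
    store.append(record)
    return {"data": record, "operation": "created"}
```

Calling `upsert` twice with the same match returns "created" and then "updated", with the version going from 1 to 2, which mirrors the two response examples above.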
Update Records¶
Update a Single Record¶
PATCH /records/{recordId}
Update specific fields of a record.
Request Body¶
{
"data": {
"field1": "new value",
"field2": null
},
"metadata": {
"reason": "Price update"
},
"options": {
"createVersion": true,
"skipValidation": false
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| data | object | Yes | Fields to update (null to unset) |
| metadata | object | No | Update metadata |
| options | object | No | Update options |
Response¶
{
"id": "rec_xyz789",
"structureId": "str_abc123",
"data": {
"field1": "new value",
"field2": null,
"field3": "unchanged"
},
"version": 2,
"previousVersion": 1,
"updatedAt": "2024-01-15T12:00:00Z",
"updatedBy": "user_abc"
}
Example¶
curl -X PATCH "https://api.centrali.io/data/workspace/acme/api/v1/records/rec_xyz789" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"data": {
"price": 89.99,
"inStock": false
},
"metadata": {
"reason": "Sale pricing"
}
}'
Replace a Record¶
PUT /records/{recordId}
Replace all fields of a record.
Request Body¶
All fields not specified will be removed.
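The difference between PATCH and PUT can be illustrated on plain dictionaries. This is a local sketch of the documented semantics, with hypothetical function names; it assumes, as the PATCH response example above suggests, that a field patched to null remains in the record with a null value.

```python
def apply_patch(current: dict, changes: dict) -> dict:
    """PATCH: merge the given fields into the existing data; other fields are kept."""
    return {**current, **changes}


def apply_put(current: dict, replacement: dict) -> dict:
    """PUT: the replacement becomes the full data; unspecified fields are removed."""
    return dict(replacement)


current = {"field1": "value1", "field2": "value2", "field3": "unchanged"}
patched = apply_patch(current, {"field1": "new value", "field2": None})
replaced = apply_put(current, {"field1": "new value"})
```

Here `patched` keeps `field3`, while `replaced` contains only `field1`.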
Bulk Update¶
PATCH /records/bulk
Update multiple records at once.
Request Body¶
{
"updates": [
{
"id": "rec_123",
"data": {
"status": "archived"
}
},
{
"id": "rec_456",
"data": {
"status": "archived"
}
}
],
"options": {
"skipValidation": false,
"createVersions": true
}
}
Response¶
Delete Records¶
Delete a Single Record¶
DELETE /records/{recordId}
Delete a record (soft delete by default).
Query Parameters¶
| Parameter | Type | Description |
|---|---|---|
| permanent | boolean | Permanently delete (no recovery) |
Response¶
Example¶
# Soft delete
curl -X DELETE "https://api.centrali.io/data/workspace/acme/api/v1/records/rec_xyz789" \
-H "Authorization: Bearer YOUR_API_KEY"
# Permanent delete
curl -X DELETE "https://api.centrali.io/data/workspace/acme/api/v1/records/rec_xyz789?permanent=true" \
-H "Authorization: Bearer YOUR_API_KEY"
Bulk Delete¶
DELETE /records/bulk
Delete multiple records.
Request Body¶
Version History¶
Get Record History¶
GET /records/{recordId}/history
Get the complete version history of a record.
Response¶
{
"versions": [
{
"version": 3,
"data": {...},
"updatedAt": "2024-01-15T13:00:00Z",
"updatedBy": "user_abc",
"changes": {
"price": {
"old": 99.99,
"new": 89.99
}
}
},
{
"version": 2,
"data": {...},
"updatedAt": "2024-01-15T12:00:00Z"
},
{
"version": 1,
"data": {...},
"createdAt": "2024-01-15T10:00:00Z"
}
]
}
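The changes object in a history entry has the shape of a field-level diff between two consecutive versions. The sketch below computes such a diff client-side, for example to compare two arbitrary versions. The `diff_versions` helper is hypothetical, and it treats a missing field and a null field as the same, which may differ from how the server reports changes.

```python
def diff_versions(old: dict, new: dict) -> dict:
    """Return {field: {"old": ..., "new": ...}} for every field whose value differs."""
    changes = {}
    for field in sorted(old.keys() | new.keys()):
        before, after = old.get(field), new.get(field)
        if before != after:
            changes[field] = {"old": before, "new": after}
    return changes


v2 = {"name": "Premium Widget", "price": 99.99}
v3 = {"name": "Premium Widget", "price": 89.99}
print(diff_versions(v2, v3))
```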
Restore Version¶
POST /records/{recordId}/restore
Restore a record to a previous version.
Request Body¶
Response¶
Change Logs¶
Get Record Changelog¶
GET /records/{recordId}/changelog
Get a detailed change history showing who changed what, and when.
Query Parameters¶
| Parameter | Type | Description |
|---|---|---|
| limit | number | Number of entries |
| startDate | string | Start of the date range to filter by |
| endDate | string | End of the date range to filter by |
Response¶
{
"entries": [
{
"id": "log_abc123",
"recordId": "rec_xyz789",
"action": "update",
"userId": "user_abc",
"userName": "John Doe",
"timestamp": "2024-01-15T13:00:00Z",
"changes": {
"price": {
"field": "price",
"oldValue": 99.99,
"newValue": 89.99
}
},
"metadata": {
"reason": "Sale pricing",
"ipAddress": "192.168.1.1"
}
}
]
}
Advanced Queries¶
Query Language¶
POST /query
Execute complex queries using Centrali Query Language (CQL).
Request Body¶
{
"query": "FROM Product WHERE price > 50 AND inStock = true ORDER BY price DESC LIMIT 10",
"parameters": {
"minPrice": 50
}
}
CQL Syntax¶
FROM {Structure}
WHERE {conditions}
ORDER BY {field} [ASC|DESC]
LIMIT {number}
OFFSET {number}
INCLUDE {relation} FROM {Structure} WHERE {conditions}
Examples¶
-- Basic query
FROM Product WHERE category = "electronics"
-- Complex conditions
FROM Product WHERE price BETWEEN 50 AND 200 AND (inStock = true OR preorder = true)
-- With relationships
FROM Order
WHERE status = "pending"
INCLUDE items FROM OrderItem WHERE orderId = Order.id
INCLUDE customer FROM Customer WHERE id = Order.customerId
-- Aggregation
FROM Order
GROUP BY status
SELECT status, COUNT(*) as count, SUM(total) as totalRevenue
-- Full text search
FROM Article WHERE SEARCH("machine learning") IN (title, content)
Relationships¶
Reference Fields¶
Create relationships between records using reference fields:
{
"structureId": "str_order",
"data": {
"customerId": "rec_customer123", // Reference to Customer record
"items": ["rec_item1", "rec_item2"], // Array of references
"shippingAddress": { // Embedded reference
"addressId": "rec_address456"
}
}
}
Expanding References¶
GET /records/{recordId}?expand=customerId,items
Automatically expand referenced records in the response:
{
"id": "rec_order789",
"data": {
"customerId": {
"id": "rec_customer123",
"data": {
"name": "John Doe",
"email": "john@example.com"
}
},
"items": [
{
"id": "rec_item1",
"data": {
"name": "Widget",
"price": 29.99
}
}
]
}
}
Webhooks & Events¶
The Records API automatically emits events that webhooks or compute functions can capture:
Events¶
- record.created - New record created (also emitted on upsert-create)
- record.updated - Record updated (also emitted on upsert-update)
- record.deleted - Record deleted
- record.restored - Record restored from version
- record.bulk.created - Bulk create completed
- record.bulk.updated - Bulk update completed
- record.bulk.deleted - Bulk delete completed
Event Payload¶
{
"event": "record.updated",
"timestamp": "2024-01-15T13:00:00Z",
"workspace": "acme",
"data": {
"record": {
"id": "rec_xyz789",
"structureId": "str_product",
"data": {...}
},
"previous": {
"data": {...}
},
"changes": {
"price": {
"old": 99.99,
"new": 89.99
}
},
"metadata": {
"userId": "user_abc",
"source": "api"
}
}
}
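A receiver typically routes on the event field and then reads the record and changes objects. The following is a minimal sketch of such a dispatcher, assuming the payload shape shown above. The `on` decorator, `HANDLERS` registry and `dispatch` function are hypothetical, and nothing here covers signature verification or delivery guarantees, which this section does not describe.

```python
from typing import Callable

# Registry mapping event names to handler functions.
HANDLERS: dict[str, Callable[[dict], str]] = {}


def on(event_name: str):
    """Register a handler for one event name (hypothetical decorator)."""
    def register(func):
        HANDLERS[event_name] = func
        return func
    return register


@on("record.updated")
def handle_updated(payload: dict) -> str:
    record = payload["data"]["record"]
    changed = sorted(payload["data"].get("changes", {}))
    return f"{record['id']} changed: {', '.join(changed)}"


def dispatch(payload: dict):
    """Route a payload by its "event" field; events without a handler are ignored."""
    handler = HANDLERS.get(payload.get("event", ""))
    return handler(payload) if handler else None
```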
Error Handling¶
Error Response Format¶
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Record validation failed",
"details": {
"field": "price",
"constraint": "minimum",
"value": -10,
"message": "Price must be greater than 0"
}
}
}
Common Error Codes¶
| Code | HTTP Status | Description |
|---|---|---|
| RECORD_NOT_FOUND | 404 | Record doesn't exist |
| STRUCTURE_NOT_FOUND | 404 | Structure doesn't exist |
| VALIDATION_ERROR | 400 | Data validation failed |
| DUPLICATE_KEY | 409 | Unique constraint violation |
| VERSION_CONFLICT | 409 | Version mismatch during update |
| PERMISSION_DENIED | 403 | Insufficient permissions |
| QUOTA_EXCEEDED | 429 | Rate limit or storage quota exceeded |
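Client code can turn the error body into an exception that carries the code and details. This is a sketch based on the error format and code table above. The `CentraliApiError` class, the "UNKNOWN" fallback code and the choice to treat only QUOTA_EXCEEDED as retryable are assumptions, not documented behavior.

```python
class CentraliApiError(Exception):
    """Hypothetical exception wrapping the documented error response body."""

    def __init__(self, status: int, code: str, message: str, details=None):
        super().__init__(f"{code} (HTTP {status}): {message}")
        self.status = status
        self.code = code
        self.details = details or {}

    @property
    def retryable(self) -> bool:
        # Assumption: only quota and rate-limit errors are worth retrying as-is.
        return self.code == "QUOTA_EXCEEDED"


def error_from_body(status: int, body: dict) -> CentraliApiError:
    """Build an exception from a parsed error response."""
    err = body.get("error", {})
    return CentraliApiError(
        status,
        err.get("code", "UNKNOWN"),
        err.get("message", ""),
        err.get("details"),
    )
```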
Best Practices¶
Pagination¶
Always use pagination for large datasets:
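// Assumes API_KEY is defined and the path is resolved against your base URL
async function getAllRecords(structureId) {
  let allRecords = [];
  let cursor = null;
  do {
    // Cursors are base64 strings and may contain "=", so URL-encode them
    const cursorParam = cursor ? `&cursor=${encodeURIComponent(cursor)}` : '';
    const response = await fetch(
      `/records?structureId=${encodeURIComponent(structureId)}&limit=100${cursorParam}`,
      { headers: { 'Authorization': `Bearer ${API_KEY}` } }
    );
    if (!response.ok) {
      throw new Error(`List request failed: HTTP ${response.status}`);
    }
    const data = await response.json();
    allRecords = allRecords.concat(data.results);
    cursor = data.pagination.nextCursor;
  } while (cursor);
  return allRecords;
}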
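The same cursor loop can be written in Python as a generator, which avoids holding every record in memory. This is a sketch: `fetch_page` is a hypothetical callable that you supply, taking the query parameters and returning the parsed List Records response.

```python
from typing import Callable, Iterator


def iter_records(fetch_page: Callable[[dict], dict], structure_id: str,
                 limit: int = 100) -> Iterator[dict]:
    """Yield records page by page, following pagination.nextCursor."""
    cursor = None
    while True:
        params = {"structureId": structure_id, "limit": limit}
        if cursor:
            params["cursor"] = cursor
        page = fetch_page(params)
        yield from page["results"]
        pagination = page.get("pagination", {})
        cursor = pagination.get("nextCursor")
        # Stop when there is no next cursor or the server reports no more pages.
        if not cursor or pagination.get("hasMore") is False:
            return
```

Passing the transport in as a function keeps the loop independent of any particular HTTP client.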
Batch Operations¶
Use bulk endpoints for multiple operations:
// Instead of this
for (const record of records) {
await createRecord(record);
}
// Do this
await createRecordsBulk(records);
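For large imports, the records can be split into several bulk requests. The sketch below builds request bodies in the shape shown under Bulk Record Creation. The chunk size of 100 is an assumption, since this reference does not state a maximum batch size, and the `bulk_create_payloads` helper is hypothetical.

```python
from typing import Iterator


def bulk_create_payloads(structure_id: str, records: list,
                         chunk_size: int = 100) -> Iterator[dict]:
    """Yield one POST /records/bulk request body per chunk of records."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(records), chunk_size):
        chunk = records[start:start + chunk_size]
        yield {
            "structureId": structure_id,
            "records": [{"data": item} for item in chunk],
        }


items = [{"name": f"Record {n}"} for n in range(1, 6)]
payloads = list(bulk_create_payloads("str_abc123", items, chunk_size=2))
```

Keep the bulk rate limit listed under Rate Limits in mind when sending many chunks in quick succession.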
Version Control¶
Track important changes with versioning:
// Enable versioning for critical updates
await updateRecord(recordId, {
data: { status: 'approved' },
options: { createVersion: true },
metadata: { reason: 'Manager approval', approvedBy: userId }
});
Error Recovery¶
Implement retry logic for transient failures:
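// Helper: resolve after the given number of milliseconds
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function createRecordWithRetry(data, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await createRecord(data);
    } catch (error) {
      // QUOTA_EXCEEDED is the code listed for HTTP 429 under Common Error Codes
      if (error.code === 'QUOTA_EXCEEDED' && i < maxRetries - 1) {
        await sleep(Math.pow(2, i) * 1000); // Exponential backoff: 1s, 2s, ...
      } else {
        throw error;
      }
    }
  }
}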
Rate Limits¶
| Operation | Limit | Window |
|---|---|---|
| Create/Update/Delete | 1000 | Per minute |
| Read/List | 5000 | Per minute |
| Bulk operations | 100 | Per minute |
| Query | 500 | Per minute |
Requests that exceed a limit return HTTP 429 with a Retry-After header.
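A client can honor the Retry-After header and fall back to exponential backoff when it is absent. This is a sketch under assumptions: `send` is a hypothetical callable returning a (status, headers, body) tuple, and the header value is assumed to be a whole number of seconds; any other value falls back to backoff.

```python
import time
from typing import Callable


def call_with_retry(send: Callable[[], tuple], max_attempts: int = 4,
                    sleep: Callable[[float], None] = time.sleep) -> tuple:
    """Call send() until it returns a non-429 status or attempts run out.

    Returns (status, body) from the last attempt.
    """
    for attempt in range(max_attempts):
        status, headers, body = send()
        if status != 429 or attempt == max_attempts - 1:
            return status, body
        # Header names are case-insensitive, so normalize before the lookup.
        lowered = {name.lower(): value for name, value in headers.items()}
        retry_after = str(lowered.get("retry-after", "")).strip()
        # Prefer the server's hint; otherwise back off 1s, 2s, 4s, ...
        delay = float(retry_after) if retry_after.isdigit() else float(2 ** attempt)
        sleep(delay)
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the loop can be exercised in tests without waiting.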
SDK Examples¶
JavaScript/TypeScript¶
import { CentraliClient } from '@centrali/sdk';
const client = new CentraliClient({
workspace: 'acme',
apiKey: process.env.CENTRALI_API_KEY
});
// Create
const record = await client.records.create({
structureId: 'str_product',
data: {
name: 'Widget',
price: 99.99
}
});
// Read
const product = await client.records.get('rec_xyz789');
// Update
await client.records.update('rec_xyz789', {
data: { price: 89.99 }
});
// Upsert
const upsertResult = await client.upsertRecord('Product', {
match: { sku: 'WDG-001' },
data: { sku: 'WDG-001', name: 'Widget', price: 29.99 }
});
// upsertResult.operation → 'created' or 'updated'
// Query
const results = await client.query(
'FROM Product WHERE price < 100 ORDER BY createdAt DESC'
);
// Delete
await client.records.delete('rec_xyz789');
Python¶
import os
from centrali import CentraliClient
client = CentraliClient(
workspace='acme',
api_key=os.environ['CENTRALI_API_KEY']
)
# Create
record = client.records.create(
structure_id='str_product',
data={
'name': 'Widget',
'price': 99.99
}
)
# Read
product = client.records.get('rec_xyz789')
# Update
client.records.update('rec_xyz789', {
'price': 89.99
})
# Query
results = client.query(
'FROM Product WHERE price < 100 ORDER BY createdAt DESC'
)
# Delete
client.records.delete('rec_xyz789')
Related Documentation¶
- Structures API - Define your data schemas
- Query Language - Advanced querying
- Compute Functions - Process record events
- Webhooks - Real-time notifications
- Access Control - Permissions and security