Language
Streams and Persistence
Persist graph outputs to streams and query them with SQL.
A stream is a project-scoped data store associated with a graph. When persistence is enabled, each graph execution appends its output to the stream. Other graphs can query this data using stream nodes.
Enabling persistence
Add a persistence block inside a graph:
graph submissions {
label: "Record submission"
persistence {
enabled: true
name: "submission_stream"
condition: @ts {
return true
}
}
root {
type: code
label: "Entry"
inputSchema: @json {
{
"type": "object",
"required": ["score", "message"],
"properties": {
"score": { "type": "number" },
"message": { "type": "string" }
},
"additionalProperties": false
}
}
outputSchema: @json {
{
"type": "object",
"required": ["score", "message"],
"properties": {
"score": { "type": "number" },
"message": { "type": "string" }
},
"additionalProperties": false
}
}
code: @ts {
const { score, message } = context.nodes.root.input
return { score: Number(score) ?? 0, message: String(message ?? "").trim() }
}
}
}Persistence fields
| Field | Type | Required | Description |
|---|---|---|---|
enabled | boolean | Yes | Enable or disable persistence. |
name | string | No | Stream name. Defaults to the graph name. |
condition | ts block | Yes | TypeScript returning a boolean. Receives latestValue and currentCandidate. |
The condition function controls whether to persist a given execution. Return true to always persist, or add logic to filter.
Querying streams
Use a stream node in another graph to query persisted data:
graph report {
label: "Generate Report"
root {
type: code
label: "Entry"
inputSchema: @json {
{
"type": "object",
"additionalProperties": true
}
}
outputSchema: @json {
{
"type": "object",
"additionalProperties": true
}
}
code: @ts {
return {}
}
}
node recent {
type: stream
label: "Recent submissions"
stream: "submissions"
outputSchema: @json {
{
"type": "object",
"required": ["results", "pageInfo"],
"properties": {
"results": { "type": "array", "items": { "type": "object" } },
"pageInfo": { "type": "object" }
},
"additionalProperties": false
}
}
query: @sql {
SELECT * FROM {{table}} ORDER BY created_at DESC LIMIT 10
}
}
node summarize {
type: code
label: "Summarize"
outputSchema: @json {
{
"type": "object",
"required": ["count", "avgScore"],
"properties": {
"count": { "type": "number" },
"avgScore": { "type": "number" }
},
"additionalProperties": false
}
}
code: @ts {
const rows = context.nodes.recent?.output?.results ?? []
const count = rows.length
const avgScore = count === 0 ? 0 : rows.reduce((sum, r) => sum + (Number(r?.score) || 0), 0) / count
return { count, avgScore }
}
}
flow {
root -> recent
recent -> summarize
}
}SQL in stream queries
- Use
{{table}}as the placeholder for the stream table name. - Only
SELECTqueries are allowed. - The LSP provides keyword, table, and column completions when the stream is defined in the same file.
- Stream schema is derived from the persisting graph's output nodes.
Rules
- One graph has at most one stream.
- Stream names must be unique across the project.
- When syncing from the DSL, the platform creates or updates the stream.