Streams
Top-level stream blocks that persist graph output, and stream nodes that read rows with filters.
Streams in .swirls files are declared at the top level: a stream block names a persisted store, points at a source graph, and defines condition and prepare @ts blocks that shape each stored row. A separate graph can use a type: stream node to read from that store.
Top-level stream block
| Field | Type | Required | Description |
|---|---|---|---|
| label | string | No | Display name. |
| description | string | No | Description. |
| graph | identifier | Yes | Source graph whose runs can append rows when conditions pass. |
| enabled | boolean | No | Whether new rows are written. |
| schema | @json { } or named schema | Yes | JSON Schema for one persisted row. |
| condition | @ts { } | Yes | If true, a row is prepared for persistence. |
| prepare | @ts { } | Yes | Returns the object to store (must match schema). |
In condition and prepare, use context.nodes for node outputs and context.output for leaf node outputs keyed by node name (see below).
graph count_topic_tokens {
  label: "Count tokens for a topic"
  root {
    type: code
    label: "Accept topic"
    inputSchema: @json {
      { "type": "object", "required": ["topic"], "properties": { "topic": { "type": "string" } } }
    }
    outputSchema: @json {
      { "type": "object", "required": ["topic", "length"], "properties": { "topic": { "type": "string" }, "length": { "type": "number" } } }
    }
    code: @ts {
      const { topic } = context.nodes.root.input
      return { topic, length: topic.length }
    }
  }
  node tokens {
    type: code
    label: "Count tokens"
    schema: @json {
      { "type": "object", "required": ["tokens"], "properties": { "tokens": { "type": "number" } } }
    }
    code: @ts {
      return { tokens: context.nodes.root.output.length * 3 }
    }
  }
  flow {
    root -> tokens
  }
}
stream store_topic_tokens {
  label: "Store topic tokens"
  graph: count_topic_tokens
  enabled: true
  schema: @json {
    {
      "type": "object",
      "required": ["topic", "tokens"],
      "properties": {
        "topic": { "type": "string" },
        "tokens": { "type": "number" }
      }
    }
  }
  condition: @ts {
    return context.output.tokens.tokens > 0
  }
  prepare: @ts {
    return {
      topic: context.nodes.root.output.topic,
      tokens: context.output.tokens.tokens
    }
  }
}
context.output and switch graphs
- Leaves not downstream of a switch are treated as must-run; their keys on context.output are required in typings.
- Leaves downstream of a switch may not run in a given execution; each such leaf is optional on context.output.
- The LSP does not model mutual exclusion between branches, so multiple optional leaves may appear even though only one runs at runtime.
In prepare, narrow which branch ran (for example with if (context.output.short_tokens) { … }) before returning the row. See stream-switch.swirls in the language examples package for a full pattern.
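The narrowing described above can be sketched in plain TypeScript. This is an illustration only, not the typings the LSP generates: the `StreamContext` and `Row` interfaces, the `long_tokens` leaf, and the `branch` field are hypothetical names introduced for the example; only `short_tokens` and the `context.nodes` / `context.output` access pattern come from the text.

```typescript
// Hypothetical context shape for a graph with two leaves downstream of a switch.
// Both leaves are optional because either branch may not run.
interface StreamContext {
  nodes: { root: { output: { topic: string } } };
  output: {
    short_tokens?: { tokens: number };
    long_tokens?: { tokens: number }; // assumed second branch, not named in the docs
  };
}

// Hypothetical row shape; a real stream would declare this in its schema field.
interface Row {
  topic: string;
  tokens: number;
  branch: "short" | "long";
}

// condition: only persist when at least one branch produced output.
function condition(context: StreamContext): boolean {
  return context.output.short_tokens !== undefined ||
    context.output.long_tokens !== undefined;
}

// prepare: narrow on the optional leaf before reading its fields.
function prepare(context: StreamContext): Row {
  const topic = context.nodes.root.output.topic;
  if (context.output.short_tokens) {
    return { topic, tokens: context.output.short_tokens.tokens, branch: "short" };
  }
  if (context.output.long_tokens) {
    return { topic, tokens: context.output.long_tokens.tokens, branch: "long" };
  }
  // Unreachable when condition() gates the call, but keeps the return type total.
  throw new Error("no switch branch produced output");
}
```

Inside a real prepare block only the function body would appear; the wrapper functions exist here so the sketch type-checks on its own.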
type: stream node
Reads rows from a stream block in the same file by name.
| Field | Type | Required | Description |
|---|---|---|---|
| stream | identifier | Yes | Top-level stream block name. |
| filter | @ts { } | Yes* | Returns a filter object (e.g. { field: { eq: value } }). Operators: eq, ne, gt, gte, lt, lte, like, in. |
| schema | @json { } | Recommended | Usually type: "array" of row objects. |
*The engine expects a filter; use return {} for no filter.
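To make the filter object shape concrete, here is a small in-memory sketch of how such a filter might be read. It is not the engine's implementation. The `Filter` typings, the `matches` helper, the assumption that multiple fields and operators combine with logical AND, and the assumption that `in` takes an array are all hypothetical. The `like` operator is left out because its pattern syntax is not specified here.

```typescript
// Hypothetical typings: the docs list operator names but not their value types.
type Scalar = string | number;
type FieldFilter = Partial<{
  eq: Scalar;
  ne: Scalar;
  gt: number;
  gte: number;
  lt: number;
  lte: number;
  in: Scalar[]; // assumed to be an array of candidate values
}>;
type Filter = Record<string, FieldFilter>;
type Row = Record<string, Scalar>;

// Assumption: every listed field and operator must hold (logical AND).
function matches(row: Row, filter: Filter): boolean {
  return Object.keys(filter).every((field) => {
    const ops = filter[field];
    const value = row[field];
    if (ops === undefined || value === undefined) return false;
    // Ordering operators only apply to numbers in this sketch.
    const num = typeof value === "number" ? value : NaN;
    if (ops.eq !== undefined && value !== ops.eq) return false;
    if (ops.ne !== undefined && value === ops.ne) return false;
    if (ops.gt !== undefined && !(num > ops.gt)) return false;
    if (ops.gte !== undefined && !(num >= ops.gte)) return false;
    if (ops.lt !== undefined && !(num < ops.lt)) return false;
    if (ops.lte !== undefined && !(num <= ops.lte)) return false;
    if (ops.in !== undefined && ops.in.indexOf(value) === -1) return false;
    return true;
  });
}

const rows: Row[] = [
  { topic: "cats", tokens: 12 },
  { topic: "dogs", tokens: 0 },
];

// The kinds of objects a filter block could return.
const byTopic: Filter = { topic: { eq: "cats" } };
const noFilter: Filter = {}; // matches the "return {}" convention for no filter
const combined: Filter = { tokens: { gt: 0 }, topic: { in: ["cats", "dogs"] } };

const select = (f: Filter): Row[] => rows.filter((r) => matches(r, f));
```

An empty filter matches every row in this sketch because `every` over zero fields is true, which lines up with using return {} when no filtering is wanted.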
node pull {
  type: stream
  label: "Matching rows"
  stream: store_topic_tokens
  filter: @ts {
    return {
      topic: { eq: context.nodes.root.output.topic },
    }
  }
}
Downstream nodes read the array as context.nodes.pull.output (or your node name).
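As a rough illustration of consuming that array, the sketch below shows what the body of a downstream code node might compute. The `TopicRow` and `NodeContext` interfaces and the `summarize` function are hypothetical names for this example. The row fields mirror the store_topic_tokens schema shown earlier, and the node name pull matches the example above.

```typescript
// Row shape mirrors the store_topic_tokens schema (topic, tokens).
interface TopicRow {
  topic: string;
  tokens: number;
}

// Hypothetical slice of the context a downstream node would see.
interface NodeContext {
  nodes: { pull: { output: TopicRow[] } };
}

// Aggregate the rows returned by the stream node.
function summarize(context: NodeContext): { count: number; totalTokens: number } {
  const rows = context.nodes.pull.output;
  const totalTokens = rows.reduce((sum, row) => sum + row.tokens, 0);
  return { count: rows.length, totalTokens };
}
```

In a .swirls file only the function body would go inside the node's @ts block, with an output schema describing count and totalTokens.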