SWIRLS_
Memory

Streams

Top-level versioned stream blocks that persist workflow output, and stream nodes that read rows with filters.

What it is. Swirls-managed persistent storage for workflow output: structured, typed records.

Use it when you want to keep what a workflow produced and reuse it, in other workflows or later runs of the same one.

Works with workflows (the source), type: stream nodes (the reader), and schemas (the record shape). For files use a disk; for your existing database use postgres.

Streams in .swirls files are declared at the top level: a stream block names a persisted store, points at a source workflow, picks an active writer version, and defines one or more versions: entries with per-version schema, optional condition, and required prepare @ts blocks. A separate workflow uses a type: stream node with an explicit version pin to read rows from that version's table.

Stream data is written and read only in deployed projects. Deploy to Swirls Cloud to exercise streams end to end.

Top-level stream block

FieldTypeRequiredDescription
labelstringNoDisplay name.
descriptionstringNoDescription.
workflowidentifierYesSource workflow whose completions can append rows.
enabledbooleanNoWhen false, no new rows are written.
versionv1, v2, …YesActive writer version; must be a key in versions:.
versionsmapYesvN { schema, condition?, prepare } per version.

Per-version fields (inside versions:):

FieldTypeRequiredDescription
schema@json { } or named schemaYesJSON Schema for that version's row shape.
condition@ts { }NoIf present and falsy, prepare does not run for that completion.
prepare@ts { }YesReturns the object stored for that version (must match its schema).

In condition and prepare, use context.nodes for node outputs and context.output for leaf node outputs keyed by node name (see below).

stream store_topic_tokens {
  label: "Store topic tokens"
  workflow: count_topic_tokens
  enabled: true
  version: v1

  versions: {
    v1 {
      schema: @json {
        {
          "type": "object",
          "required": ["topic", "tokens"],
          "properties": {
            "topic": { "type": "string" },
            "tokens": { "type": "number" }
          }
        }
      }
      condition: @ts {
        return context.output.tokens.tokens > 0
      }
      prepare: @ts {
        return {
          topic: context.nodes.root.output.topic,
          tokens: context.output.tokens.tokens,
        }
      }
    }
  }
}

Deploy provisions one Postgres table per (stream, version) (for example stream_store_topic_tokens_v1 in the project schema). Workflow completion writes only to the deployment's current version. Re-deploying with a changed schema for an existing version id fails with a drift error.

context.output and switch workflows

  • Leaves not downstream of a switch are treated as must-run; their keys on context.output are required in typings.
  • Leaves downstream of a switch may not run in a given execution; each such leaf is optional on context.output.
  • The LSP does not model mutual exclusion between branches, so multiple optional leaves may appear even though only one runs at runtime.

In prepare, narrow which branch ran (for example with if (context.output.short_tokens) { … }) before returning the row. See stream-switch.swirls in the language examples package for a full pattern.

type: stream node

Reads rows from a named stream block at a pinned version.

FieldTypeRequiredDescription
streamidentifierYesTop-level stream block name.
versionv1, v2, …YesMust match a versions: key on that stream.
filter@ts { }YesReturns a filter object (e.g. { field: { eq: value } }). Use return {} for no filter.
node pull {
  type: stream
  label: "Matching rows"
  stream: store_topic_tokens
  version: v1
  filter: @ts {
    return {
      topic: { eq: context.nodes.root.output.topic },
    }
  }
}

Filter operators: eq, ne, gt, gte, lt, lte, like, in. Multiple top-level keys and multiple operators on one field combine with AND. Field keys name either a payload field from the version's schema or a system field (id, created_at, deployment_id, workflow_execution_id).

Downstream nodes read the array as context.nodes.pull.output (or your node name).

Where stream data lives

After deploy, each stream version has a dedicated table under the project stream schema (project_<uuid>.stream_<name>_<version>). The platform maps JSON Schema properties to Postgres columns; writers insert on workflow completion with the active deployment tagged on each row. Stream nodes and the API read by pinned version across all deployments in the project (optional filters can still target deployment_id).

Further reading

On this page