Streams and Persistence

Persist graph outputs to streams and query them with SQL.

A stream is a project-scoped data store associated with a graph. When persistence is enabled, each graph execution appends its output to the stream. Other graphs can query this data using stream nodes.

Enabling persistence

Add a persistence block inside a graph:

graph submissions {
  label: "Record submission"

  persistence {
    enabled: true
    name: "submission_stream"
    condition: @ts {
      return true
    }
  }

  root {
    type: code
    label: "Entry"
    inputSchema: @json {
      {
        "type": "object",
        "required": ["score", "message"],
        "properties": {
          "score": { "type": "number" },
          "message": { "type": "string" }
        },
        "additionalProperties": false
      }
    }
    outputSchema: @json {
      {
        "type": "object",
        "required": ["score", "message"],
        "properties": {
          "score": { "type": "number" },
          "message": { "type": "string" }
        },
        "additionalProperties": false
      }
    }
    code: @ts {
      const { score, message } = context.nodes.root.input
      return { score: Number(score) || 0, message: String(message ?? "").trim() }
    }
  }
}

Persistence fields

Field      Type      Required  Description
enabled    boolean   Yes       Enable or disable persistence.
name       string    No        Stream name. Defaults to the graph name.
condition  ts block  Yes       TypeScript returning a boolean. Receives latestValue and currentCandidate.

The condition function decides whether a given execution is persisted. Return true to persist every execution, or compare currentCandidate against latestValue to persist only the executions you want.
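As an illustration of filtering logic, the sketch below writes a condition as a standalone TypeScript function so it can be read and tested outside a graph. The Submission type, the function name shouldPersist, and the assumption that latestValue is null or undefined when the stream is empty are all hypothetical; inside a persistence block only the function body would appear.

```typescript
// Hypothetical record shape, mirroring the outputSchema of the submissions graph.
type Submission = { score: number; message: string }

// Sketch of a condition: persist only when the candidate improves on the latest value.
// Assumption: latestValue is null or undefined while the stream is still empty.
function shouldPersist(
  latestValue: Submission | null | undefined,
  currentCandidate: Submission
): boolean {
  // Never persist a candidate whose message is blank.
  if (currentCandidate.message.trim() === "") return false
  // Always persist the first record.
  if (latestValue == null) return true
  // Otherwise persist only a strictly higher score.
  return currentCandidate.score > latestValue.score
}
```

With this rule the stream would only ever grow when a new high score arrives, which keeps downstream queries small at the cost of discarding lower-scoring executions.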

Querying streams

Use a stream node in another graph to query persisted data:

graph report {
  label: "Generate Report"

  root {
    type: code
    label: "Entry"
    inputSchema: @json {
      {
        "type": "object",
        "additionalProperties": true
      }
    }
    outputSchema: @json {
      {
        "type": "object",
        "additionalProperties": true
      }
    }
    code: @ts {
      return {}
    }
  }

  node recent {
    type: stream
    label: "Recent submissions"
    stream: "submissions"
    outputSchema: @json {
      {
        "type": "object",
        "required": ["results", "pageInfo"],
        "properties": {
          "results": { "type": "array", "items": { "type": "object" } },
          "pageInfo": { "type": "object" }
        },
        "additionalProperties": false
      }
    }
    query: @sql {
      SELECT * FROM {{table}} ORDER BY created_at DESC LIMIT 10
    }
  }

  node summarize {
    type: code
    label: "Summarize"
    outputSchema: @json {
      {
        "type": "object",
        "required": ["count", "avgScore"],
        "properties": {
          "count": { "type": "number" },
          "avgScore": { "type": "number" }
        },
        "additionalProperties": false
      }
    }
    code: @ts {
      const rows = context.nodes.recent?.output?.results ?? []
      const count = rows.length
      const avgScore = count === 0 ? 0 : rows.reduce((sum, r) => sum + (Number(r?.score) || 0), 0) / count
      return { count, avgScore }
    }
  }

  flow {
    root -> recent
    recent -> summarize
  }
}

SQL in stream queries

  • Use {{table}} as the placeholder for the stream table name.
  • Only SELECT queries are allowed.
  • The LSP provides keyword, table, and column completions when the stream is defined in the same file.
  • Stream schema is derived from the persisting graph's output nodes.
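The first two rules above can be pictured with a small TypeScript sketch. The helper renderStreamQuery is hypothetical and is not how the platform processes queries: it only shows the idea of substituting {{table}} and rejecting anything that does not start with SELECT. The leading-keyword check is deliberately naive and would not catch, for example, multiple statements in one string.

```typescript
// Hypothetical helper for illustration only; not part of Swirls.
function renderStreamQuery(query: string, tableName: string): string {
  const trimmed = query.trim()
  // Only SELECT queries are allowed in stream nodes.
  if (!/^select\b/i.test(trimmed)) {
    throw new Error("Only SELECT queries are allowed")
  }
  // The query must reference the stream through the {{table}} placeholder.
  if (!trimmed.includes("{{table}}")) {
    throw new Error("Query must use the {{table}} placeholder")
  }
  // Replace every occurrence of the placeholder with the table name.
  return trimmed.split("{{table}}").join(tableName)
}

// Example: the query from the report graph, with an assumed table name.
const rendered = renderStreamQuery(
  "SELECT * FROM {{table}} ORDER BY created_at DESC LIMIT 10",
  "submissions"
)
```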

Rules

  • A graph has at most one stream.
  • Stream names must be unique across the project.
  • When syncing from the DSL, the platform creates or updates the stream.
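The uniqueness rule interacts with the default stream name, so a quick check can be sketched in TypeScript. The GraphDef type and findDuplicateStreamNames are hypothetical stand-ins for however a project's graphs are represented. The sketch also assumes that any graph with a persistence block claims a stream name, whether or not enabled is true.

```typescript
// Hypothetical in-memory description of a graph and its persistence block.
type GraphDef = {
  name: string
  persistence?: { enabled: boolean; name?: string }
}

// Returns the stream names claimed by more than one graph.
function findDuplicateStreamNames(graphs: GraphDef[]): string[] {
  const counts = new Map<string, number>()
  for (const graph of graphs) {
    // Graphs without a persistence block have no stream.
    if (!graph.persistence) continue
    // The stream name defaults to the graph name.
    const streamName = graph.persistence.name ?? graph.name
    counts.set(streamName, (counts.get(streamName) ?? 0) + 1)
  }
  return Array.from(counts.entries())
    .filter(([, count]) => count > 1)
    .map(([name]) => name)
}
```

A collision can arise without any explicit name clash: one graph named "scores" with no name field and another graph that sets name: "scores" would both claim the same stream.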
