← Back to Cookbook

Stream Reporting Dashboard

One graph logs events with persistence, another queries the stream and emails a report.

Tags: stream, resend

Source

/**
 * Demonstrates persistence and stream queries.
 * One graph logs events, another queries and reports on them.
 */

/**
 * Webhook that receives a single event payload.
 * The payload must be a JSON object with a string "event_type" and a
 * numeric "value"; "source" is an optional string.
 */
webhook log_event {
  label: "Log Event"
  schema: @json {
    {
      "type": "object",
      "required": ["event_type", "value"],
      "properties": {
        "event_type": { "type": "string" },
        "value": { "type": "number" },
        "source": { "type": "string" }
      }
    }
  }
}

/**
 * Schedule for the daily report.
 * The cron expression "0 17 * * *" with timezone "UTC" fires once a day
 * at 17:00 UTC.
 */
schedule daily_report {
  label: "Daily Report"
  cron: "0 17 * * *"
  timezone: "UTC"
}

/**
 * Single-node graph that records an incoming event.
 *
 * Persistence is enabled under the name "events", and the condition
 * callback always returns true, so presumably every run is persisted
 * (confirm against the persistence documentation). The generate_report
 * graph in this file queries a stream with the same name, "events".
 *
 * The root node returns its own input unchanged; its output schema lists
 * event_type (string), value (number) and source (string).
 */
graph record_event {
  label: "Record Event"

  persistence {
    enabled: true
    condition: @ts { return true }
    name: "events"
  }

  root {
    type: code
    label: "Store event"
    code: @ts { return context.nodes.root.input }
    outputSchema: @json {
      {
        "type": "object",
        "properties": {
          "event_type": { "type": "string" },
          "value": { "type": "number" },
          "source": { "type": "string" }
        }
      }
    }
  }
}

/**
 * Builds and emails the daily event report.
 *
 * Flow: root -> recent_events -> summarize -> send_report
 *
 *   root           Produces today's date as YYYY-MM-DD, taken from the UTC
 *                  ISO timestamp.
 *   recent_events  Queries the "events" stream for rows created in the last
 *                  24 hours, newest first. The table placeholder in the
 *                  query is presumably substituted by the runtime, and the
 *                  quoted "root.<field>" columns presumably expose the
 *                  persisted root output of record_event (confirm both
 *                  against the stream node documentation).
 *   summarize      Groups the rows by event_type, counting rows and summing
 *                  value per type.
 *   send_report    Sends the summary as a plain-text email via a resend node.
 *
 * NOTE: the query is capped with LIMIT 100, so "Total events" and the
 * per-type figures cover at most the 100 most recent events of the window.
 *
 * NOTE: the from/to addresses below are placeholders on example.com.
 * Replace them with a real sender and recipient before enabling the
 * trigger; the sender presumably has to belong to a domain verified with
 * the email provider (confirm).
 */
graph generate_report {
  label: "Generate Daily Report"

  root {
    type: code
    label: "Start"
    code: @ts { return { date: new Date().toISOString().split("T")[0] } }
    outputSchema: @json {
      {
        "type": "object",
        "properties": {
          "date": { "type": "string" }
        }
      }
    }
  }

  node recent_events {
    type: stream
    label: "Query recent events"
    stream: "events"
    query: @sql {
      SELECT "root.event_type" AS event_type, "root.value" AS value, "root.source" AS source
      FROM {{table}}
      WHERE created_at > NOW() - INTERVAL '24 hours'
      ORDER BY created_at DESC
      LIMIT 100
    }
    schema: @json {
      {
        "type": "array",
        "items": {
          "type": "object",
          "properties": {
            "event_type": { "type": "string" },
            "value": { "type": "number" },
            "source": { "type": "string" }
          }
        }
      }
    }
  }

  node summarize {
    type: code
    label: "Summarize events"
    code: @ts {
      // Fall back to an empty list when the query produced no output.
      const events = context.nodes.recent_events.output || []
      // Per-type accumulator keyed by event_type.
      const byType = {}
      for (const event of events) {
        // Rows without an event_type are grouped under "unknown".
        const t = event.event_type || "unknown"
        if (!byType[t]) byType[t] = { count: 0, total_value: 0 }
        byType[t].count += 1
        // A missing value contributes 0 to the total.
        byType[t].total_value += event.value || 0
      }
      return {
        date: context.nodes.root.output.date,
        total_events: events.length,
        by_type: byType
      }
    }
  }

  node send_report {
    type: resend
    label: "Send report"
    from: @ts { return "reports@example.com" }
    to: @ts { return "team@example.com" }
    subject: @ts { return "Daily Event Report - " + context.nodes.root.output.date }
    text: @ts {
      const summary = context.nodes.summarize.output
      return "Daily Event Report\n\nTotal events: " + summary.total_events + "\n\nBreakdown:\n" + JSON.stringify(summary.by_type, null, 2)
    }
  }

  flow {
    root -> recent_events
    recent_events -> summarize
    summarize -> send_report
  }
}

/**
 * Routes the log_event webhook to the record_event graph.
 */
trigger on_event {
  webhook:log_event -> record_event
  enabled: true
}

/**
 * Routes the daily_report schedule to the generate_report graph.
 */
trigger on_daily_report {
  schedule:daily_report -> generate_report
  enabled: true
}

Flow

Trigger → graph

Graph nodes