← Back to Cookbook

Data Quality Checker

Validates incoming data records against quality rules and quarantines bad data.

switch

Source

/**
 * Validates incoming data records against quality rules
 * and quarantines bad data.
 */

/**
 * Webhook definition for incoming data records.
 * The JSON schema below requires two properties on the request body:
 * "source" (a string) and "payload" (an object).
 */
webhook data_record {
  label: "Data Record"
  schema: @json {
    {
      "type": "object",
      "required": ["source", "payload"],
      "properties": {
        "source": { "type": "string" },
        "payload": { "type": "object" }
      }
    }
  }
}

/**
 * Graph that validates one data record and routes it by the result.
 *
 * Flow: root -> validate -> route, then route sends "valid" records to
 * accept and "invalid" records to quarantine.
 *
 * The validate node outputs source, field_count, issues and is_valid.
 * A payload that is not a plain object (null, array, string, number, missing)
 * produces the single issue "payload is not an object" with field_count 0.
 * Persistence is enabled under the name "quality_log" with a condition
 * that always returns true.
 */
graph check_quality {
  label: "Check Data Quality"

  persistence {
    enabled: true
    condition: @ts { return true }
    name: "quality_log"
  }

  root {
    type: code
    label: "Extract record"
    code: @ts { return context.nodes.root.input }
    outputSchema: @json {
      {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "payload": { "type": "object" }
        }
      }
    }
  }

  node validate {
    type: code
    label: "Run validation rules"
    code: @ts {
      // Fall back to an empty record so a missing root output is reported
      // as a validation issue instead of throwing on property access.
      const record = context.nodes.root.output || {}
      const payload = record.payload
      const issues = []

      // typeof null is "object" and arrays are objects too, so both are
      // excluded explicitly to match the schema type of payload.
      const isObject = payload !== null && typeof payload === "object" && !Array.isArray(payload)

      // Only enumerate keys of a real object. Object.keys on a string would
      // list character indices and inflate field_count.
      const keys = isObject ? Object.keys(payload) : []

      if (!isObject) {
        issues.push("payload is not an object")
      } else if (keys.length === 0) {
        issues.push("payload is empty")
      }

      // Flag every field whose value is null or undefined.
      for (const key of keys) {
        if (payload[key] === null || payload[key] === undefined) {
          issues.push("null value for key: " + key)
        }
      }

      return {
        source: record.source,
        field_count: keys.length,
        issues: issues,
        is_valid: issues.length === 0
      }
    }
  }

  node route {
    type: switch
    label: "Route by validity"
    cases: ["valid", "invalid"]
    router: @ts {
      // The returned string must be one of the declared cases.
      if (context.nodes.validate.output.is_valid) return "valid"
      return "invalid"
    }
  }

  node accept {
    type: code
    label: "Accept record"
    code: @ts {
      // Summarize the accepted record: its source and number of fields.
      return {
        status: "accepted",
        source: context.nodes.validate.output.source,
        fields: context.nodes.validate.output.field_count
      }
    }
  }

  node quarantine {
    type: code
    label: "Quarantine record"
    code: @ts {
      // Carry the list of validation issues along with the source.
      return {
        status: "quarantined",
        source: context.nodes.validate.output.source,
        issues: context.nodes.validate.output.issues
      }
    }
  }

  flow {
    root -> validate
    validate -> route
    route -["valid"]-> accept
    route -["invalid"]-> quarantine
  }
}

/**
 * Trigger that connects the data_record webhook to the check_quality graph.
 * It is declared with enabled set to true.
 */
trigger on_data {
  webhook:data_record -> check_quality
  enabled: true
}

Flow

Trigger → graph

Graph nodes