← Back to cookbook

Data Quality Checker

Validates incoming data records against quality rules and quarantines bad data.

switch

Source

/**
 * Validates incoming data records against quality rules
 * and quarantines bad data.
 */

/**
 * Webhook: data_record
 *
 * Entry point for incoming data records. The attached JSON Schema
 * describes an object that must carry both of these properties:
 *   - source:  string
 *   - payload: object
 */
webhook data_record {
  label: "Data Record"
  schema: @json {
    {
      "type": "object",
      "required": ["source", "payload"],
      "properties": {
        "source": { "type": "string" },
        "payload": { "type": "object" }
      }
    }
  }
}

/**
 * Workflow: check_quality
 *
 * Runs a record through a validation step and then routes it to one of
 * two terminal nodes:
 *   - accept:     returns status "accepted" with the source and field count
 *   - quarantine: returns status "quarantined" with the source and issues
 *
 * Validation rules applied to the record payload:
 *   1. it must be a non-null object that is not an array
 *   2. it must have at least one key
 *   3. no key may hold null or undefined
 */
workflow check_quality {
  label: "Check Data Quality"

  root {
    type: code
    label: "Extract record"
    code: @ts { return context.nodes.root.input }
    outputSchema: @json {
      {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "payload": { "type": "object" }
        }
      }
    }
  }

  node validate {
    type: code
    label: "Run validation rules"
    code: @ts {
      const record = context.nodes.root.output
      const issues = []

      // typeof reports "object" for arrays as well, so arrays are rejected
      // explicitly. Without the Array.isArray check a list payload would be
      // treated as a record whose keys are its indexes.
      if (
        !record.payload ||
        typeof record.payload !== "object" ||
        Array.isArray(record.payload)
      ) {
        issues.push("payload is not an object")
      }

      // Fall back to an empty object so a missing payload does not make
      // Object.keys throw.
      const keys = Object.keys(record.payload || {})
      if (keys.length === 0) {
        issues.push("payload is empty")
      }

      // One issue is recorded per key whose value is null or undefined.
      for (const key of keys) {
        if (record.payload[key] === null || record.payload[key] === undefined) {
          issues.push("null value for key: " + key)
        }
      }

      // is_valid is true only when no rule produced an issue.
      return {
        source: record.source,
        field_count: keys.length,
        issues: issues,
        is_valid: issues.length === 0
      }
    }
  }

  node route {
    type: switch
    label: "Route by validity"
    cases: ["valid", "invalid"]
    router: @ts {
      // The returned string selects one of the labels listed in cases.
      if (context.nodes.validate.output.is_valid) return "valid"
      return "invalid"
    }
  }

  node accept {
    type: code
    label: "Accept record"
    code: @ts {
      // Summarises a record that passed every validation rule.
      return {
        status: "accepted",
        source: context.nodes.validate.output.source,
        fields: context.nodes.validate.output.field_count
      }
    }
  }

  node quarantine {
    type: code
    label: "Quarantine record"
    code: @ts {
      // Carries the full issue list so the rejection reason is preserved.
      return {
        status: "quarantined",
        source: context.nodes.validate.output.source,
        issues: context.nodes.validate.output.issues
      }
    }
  }

  flow {
    root -> validate
    validate -> route
    route -["valid"]-> accept
    route -["invalid"]-> quarantine
  }
}

/**
 * Stream: quality_log
 *
 * Attached to the check_quality workflow, with v1 selected as the
 * stream version. The v1 schema requires status and source; fields
 * (number) and issues (array of strings) are optional, which matches
 * the two different result shapes returned by the accept and
 * quarantine nodes.
 */
stream quality_log {
  label: "Quality log"
  workflow: check_quality
  version: v1

  versions: {
    v1 {
      schema: @json {
        {
          "type": "object",
          "required": ["status", "source"],
          "properties": {
            "status": { "type": "string" },
            "source": { "type": "string" },
            "fields": { "type": "number" },
            "issues": { "type": "array", "items": { "type": "string" } }
          }
        }
      }

      condition: @ts { return true }

      prepare: @ts {
        // Prefer the accept result when it is truthy, otherwise fall back
        // to the quarantine result.
        // NOTE(review): presumably context.output is keyed by node name and
        // only the branch that actually ran is populated - confirm.
        const accepted = context.output.accept
        if (accepted) return accepted
        return context.output.quarantine
      }
    }
  }
}

/**
 * Trigger: on_data
 *
 * Connects the data_record webhook to the check_quality workflow.
 * The trigger is declared enabled.
 */
trigger on_data {
  webhook:data_record -> check_quality
  enabled: true
}

Flow

Trigger to workflow

Workflow nodes