Webhook · View on GitHub
Data Quality Checker
Validates incoming data records against quality rules and quarantines bad data.
switch
Source
/**
* Validates incoming data records against quality rules
* and quarantines bad data.
*/
/**
 * Webhook data_record (label "Data Record").
 * Its JSON schema requires two properties: a string "source" and an
 * object "payload".
 * NOTE(review): presumably incoming requests are validated against this
 * schema before any workflow runs - confirm against the platform docs.
 */
webhook data_record {
label: "Data Record"
schema: @json {
{
"type": "object",
"required": ["source", "payload"],
"properties": {
"source": { "type": "string" },
"payload": { "type": "object" }
}
}
}
}
/**
 * Workflow check_quality.
 *
 * Steps, as wired in the flow section:
 *   root       - returns the workflow input (the incoming record)
 *   validate   - runs the quality rules and returns source, field_count,
 *                issues and is_valid
 *   route      - switch on is_valid: "valid" or "invalid"
 *   accept     - result for valid records (status "accepted")
 *   quarantine - result for invalid records (status "quarantined")
 */
workflow check_quality {
  label: "Check Data Quality"
  root {
    type: code
    label: "Extract record"
    code: @ts { return context.nodes.root.input }
    outputSchema: @json {
      {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "payload": { "type": "object" }
        }
      }
    }
  }
  node validate {
    type: code
    label: "Run validation rules"
    code: @ts {
      const record = context.nodes.root.output
      const payload = record.payload
      // typeof null is "object" and arrays are objects too, so both are
      // excluded explicitly to match the JSON "object" type of the schema.
      const isObject =
        payload !== null && typeof payload === "object" && !Array.isArray(payload)
      // Keys are enumerated only for a real object. Object.keys on a string
      // would return its character indices and inflate field_count.
      const keys = isObject ? Object.keys(payload) : []
      const issues = []
      if (!isObject) {
        issues.push("payload is not an object")
      } else if (keys.length === 0) {
        // Reported only for real objects, so a missing payload is not
        // flagged twice.
        issues.push("payload is empty")
      }
      for (const key of keys) {
        if (payload[key] === null || payload[key] === undefined) {
          issues.push("null value for key: " + key)
        }
      }
      return {
        source: record.source,
        field_count: keys.length,
        issues: issues,
        is_valid: issues.length === 0
      }
    }
  }
  node route {
    type: switch
    label: "Route by validity"
    cases: ["valid", "invalid"]
    router: @ts {
      // The returned string selects one of the declared cases.
      if (context.nodes.validate.output.is_valid) return "valid"
      return "invalid"
    }
  }
  node accept {
    type: code
    label: "Accept record"
    code: @ts {
      return {
        status: "accepted",
        source: context.nodes.validate.output.source,
        fields: context.nodes.validate.output.field_count
      }
    }
  }
  node quarantine {
    type: code
    label: "Quarantine record"
    code: @ts {
      return {
        status: "quarantined",
        source: context.nodes.validate.output.source,
        issues: context.nodes.validate.output.issues
      }
    }
  }
  flow {
    root -> validate
    validate -> route
    route -["valid"]-> accept
    route -["invalid"]-> quarantine
  }
}
/**
 * Stream quality_log (label "Quality log"), bound to workflow check_quality
 * at version v1.
 *
 * The v1 entry schema requires "status" and "source"; "fields" (number) and
 * "issues" (array of strings) are optional. Its condition always returns
 * true. Its prepare step returns the accept output when that is truthy and
 * the quarantine output otherwise.
 * NOTE(review): presumably condition decides whether a run is written to the
 * stream and prepare builds the entry - confirm against the platform docs.
 */
stream quality_log {
  label: "Quality log"
  workflow: check_quality
  version: v1
  versions: {
    v1 {
      schema: @json {
        {
          "type": "object",
          "required": ["status", "source"],
          "properties": {
            "status": { "type": "string" },
            "source": { "type": "string" },
            "fields": { "type": "number" },
            "issues": { "type": "array", "items": { "type": "string" } }
          }
        }
      }
      condition: @ts { return true }
      prepare: @ts {
        // Falls back to the quarantine output when accept is not truthy.
        return context.output.accept || context.output.quarantine
      }
    }
  }
}
/**
 * Trigger on_data: connects the data_record webhook to the check_quality
 * workflow. Declared with enabled: true.
 */
trigger on_data {
webhook:data_record -> check_quality
enabled: true
}
Flow
Trigger to workflow
Workflow nodes