← Back to Cookbook

Postgres CRM Pipeline

Reads leads from Postgres, scores with AI, writes results back.

postgresai

Source

/**
 * Demonstrates postgres node types. Reads leads from a database,
 * scores them with AI, and writes results back.
 */

secret db_creds {
  label: "Database"
  vars: [DATABASE_URL]
}

postgres crm_db {
  label: "CRM Database"
  secrets: db_creds
  connection: DATABASE_URL

  table leads {
    schema: @json {
      {
        "type": "object",
        "properties": {
          "id": { "type": "string" },
          "email": { "type": "string" },
          "company": { "type": "string" },
          "score": { "type": "number" },
          "status": { "type": "string" }
        },
        "required": ["id", "email"]
      }
    }
  }

  table lead_scores {
    schema: @json {
      {
        "type": "object",
        "properties": {
          "lead_id": { "type": "string" },
          "score": { "type": "number" },
          "tier": { "type": "string" },
          "scored_at": { "type": "string", "format": "date-time" }
        }
      }
    }
  }
}

schedule daily_scoring {
  label: "Daily Lead Scoring"
  cron: "0 6 * * *"
  timezone: "America/New_York"
}

graph score_leads {
  label: "Score Leads from DB"

  root {
    type: postgres
    label: "Fetch unscored leads"
    postgres: crm_db
    select: @sql {
      SELECT id, email, company
      FROM leads
      WHERE score IS NULL
      ORDER BY created_at DESC
      LIMIT 20
    }
    outputSchema: @json {
      {
        "type": "array",
        "items": {
          "type": "object",
          "properties": {
            "id": { "type": "string" },
            "email": { "type": "string" },
            "company": { "type": "string" }
          }
        }
      }
    }
  }

  node score {
    type: ai
    label: "Score leads"
    kind: object
    model: "google/gemini-2.5-flash"
    prompt: @ts {
      const leads = context.nodes.root.output
      return "Score these leads from 0-100 based on their email domain and company name. Return scores for each lead.\n\n" + JSON.stringify(leads)
    }
    schema: @json {
      {
        "type": "object",
        "required": ["scores"],
        "properties": {
          "scores": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "id": { "type": "string" },
                "score": { "type": "number" },
                "tier": { "type": "string", "enum": ["hot", "warm", "cold"] }
              }
            }
          }
        }
      }
    }
  }

  node pick_top {
    type: code
    label: "Pick top lead"
    code: @ts {
      const scores = context.nodes.score.output.scores || []
      const top = scores.sort(function(a, b) { return b.score - a.score })[0]
      return top || { id: "", score: 0, tier: "cold" }
    }
  }

  node save_top_score {
    type: postgres
    label: "Save top score"
    postgres: crm_db
    condition: @ts {
      return context.nodes.pick_top.output.id !== ""
    }
    insert: @sql {
      INSERT INTO lead_scores (lead_id, score, tier, scored_at)
      VALUES ({{lead_id}}, {{score}}, {{tier}}, {{scored_at}})
    }
    params: @ts {
      const top = context.nodes.pick_top.output
      return {
        lead_id: top.id,
        score: top.score,
        tier: top.tier,
        scored_at: new Date().toISOString()
      }
    }
  }

  flow {
    root -> score
    score -> pick_top
    pick_top -> save_top_score
  }
}

trigger on_daily_scoring {
  schedule:daily_scoring -> score_leads
  enabled: true
}

Flow

Trigger → graph

Graph nodes