Schedule · View on GitHub
Postgres CRM Pipeline
Reads leads from Postgres, scores with AI, writes results back.
postgres · ai
Source
/**
 * Demonstrates postgres node types. Reads unscored leads from a database,
 * scores them with AI, and writes the top-scoring lead to the lead_scores table.
 */
/**
 * Secret group for the CRM database. Exposes a single variable,
 * DATABASE_URL, which crm_db uses as its connection string.
 */
secret db_creds {
  label: "Database"
  vars: [DATABASE_URL]
}
/**
 * Postgres resource for the CRM database.
 *
 * The connection string comes from the DATABASE_URL variable of the
 * db_creds secret. Two tables are declared with JSON Schemas:
 *   leads        source rows; id and email are required. created_at is
 *                declared because the score_leads graph orders by it.
 *   lead_scores  destination rows written by the score_leads graph.
 */
postgres crm_db {
  label: "CRM Database"
  secrets: db_creds
  connection: DATABASE_URL

  table leads {
    schema: @json {
      {
        "type": "object",
        "properties": {
          "id": { "type": "string" },
          "email": { "type": "string" },
          "company": { "type": "string" },
          "score": { "type": "number" },
          "status": { "type": "string" },
          "created_at": { "type": "string", "format": "date-time" }
        },
        "required": ["id", "email"]
      }
    }
  }

  table lead_scores {
    schema: @json {
      {
        "type": "object",
        "properties": {
          "lead_id": { "type": "string" },
          "score": { "type": "number" },
          "tier": { "type": "string" },
          "scored_at": { "type": "string", "format": "date-time" }
        }
      }
    }
  }
}
/**
 * Daily schedule: cron "0 6 * * *" is minute 0 of hour 6 on every day,
 * evaluated in the America/New_York timezone.
 */
schedule daily_scoring {
  label: "Daily Lead Scoring"
  cron: "0 6 * * *"
  timezone: "America/New_York"
}
/**
 * Scores unscored leads and records the best one.
 *
 * Flow: root -> score -> pick_top -> save_top_score
 *   root            selects up to 20 leads whose score is NULL, newest first
 *   score           asks the model for a 0-100 score and a tier per lead;
 *                   every returned item must carry id, score and tier
 *   pick_top        returns the highest-scoring valid entry, or a placeholder
 *                   with an empty id when there is nothing to rank
 *   save_top_score  inserts that entry into lead_scores; its condition is
 *                   true only when pick_top produced a non-empty id
 */
graph score_leads {
  label: "Score Leads from DB"

  root {
    type: postgres
    label: "Fetch unscored leads"
    postgres: crm_db
    select: @sql {
      SELECT id, email, company
      FROM leads
      WHERE score IS NULL
      ORDER BY created_at DESC
      LIMIT 20
    }
    outputSchema: @json {
      {
        "type": "array",
        "items": {
          "type": "object",
          "properties": {
            "id": { "type": "string" },
            "email": { "type": "string" },
            "company": { "type": "string" }
          }
        }
      }
    }
  }

  node score {
    type: ai
    label: "Score leads"
    kind: object
    model: "google/gemini-2.5-flash"
    prompt: @ts {
      const leads = context.nodes.root.output
      return "Score these leads from 0-100 based on their email domain and company name. Return scores for each lead.\n\n" + JSON.stringify(leads)
    }
    schema: @json {
      {
        "type": "object",
        "required": ["scores"],
        "properties": {
          "scores": {
            "type": "array",
            "items": {
              "type": "object",
              "required": ["id", "score", "tier"],
              "properties": {
                "id": { "type": "string" },
                "score": { "type": "number" },
                "tier": { "type": "string", "enum": ["hot", "warm", "cold"] }
              }
            }
          }
        }
      }
    }
  }

  node pick_top {
    type: code
    label: "Pick top lead"
    code: @ts {
      // Tolerate a missing or malformed result from the score node.
      const result = context.nodes.score.output
      const scores = result && Array.isArray(result.scores) ? result.scores : []
      // Only entries with an id and a numeric score can be ranked. filter()
      // returns a new array, so sort() does not reorder the score node's
      // output in place, and the comparator never yields NaN.
      const ranked = scores
        .filter(function(s) { return s && s.id && typeof s.score === "number" && !isNaN(s.score) })
        .sort(function(a, b) { return b.score - a.score })
      // The empty id is the "nothing to save" marker checked by save_top_score.
      return ranked[0] || { id: "", score: 0, tier: "cold" }
    }
  }

  node save_top_score {
    type: postgres
    label: "Save top score"
    postgres: crm_db
    condition: @ts {
      return context.nodes.pick_top.output.id !== ""
    }
    insert: @sql {
      INSERT INTO lead_scores (lead_id, score, tier, scored_at)
      VALUES ({{lead_id}}, {{score}}, {{tier}}, {{scored_at}})
    }
    params: @ts {
      const top = context.nodes.pick_top.output
      return {
        lead_id: top.id,
        score: top.score,
        tier: top.tier,
        scored_at: new Date().toISOString()
      }
    }
  }

  flow {
    root -> score
    score -> pick_top
    pick_top -> save_top_score
  }
}
/**
 * Binds the daily_scoring schedule to the score_leads graph.
 * The trigger is enabled.
 */
trigger on_daily_scoring {
  schedule:daily_scoring -> score_leads
  enabled: true
}
Flow
Trigger → graph
Graph nodes