diff --git a/data-pipeline/db/import.ts b/data-pipeline/db/import.ts
index 276536f..f176d29 100644
--- a/data-pipeline/db/import.ts
+++ b/data-pipeline/db/import.ts
@@ -216,7 +216,9 @@ async function main(): Promise {
   await importStage2();
 }
 
-main().catch((err) => {
-  console.error(err);
-  process.exit(1);
-});
+if (import.meta.url === `file://${process.argv[1]}`) {
+  main().catch((err) => {
+    console.error(err);
+    process.exit(1);
+  });
+}
diff --git a/data-pipeline/db/init.ts b/data-pipeline/db/init.ts
index f85d213..3ba0558 100644
--- a/data-pipeline/db/init.ts
+++ b/data-pipeline/db/init.ts
@@ -33,7 +33,10 @@ async function main(): Promise {
   await initDb();
 }
 
-main().catch((err) => {
-  console.error(err);
-  process.exit(1);
-});
+// Run main() only when this file is executed directly (pipeline.ts imports initDb). NOTE(review): the file:// string comparison may fail on Windows or URL-encoded paths — confirm.
+if (import.meta.url === `file://${process.argv[1]}`) {
+  main().catch((err) => {
+    console.error(err);
+    process.exit(1);
+  });
+}
diff --git a/data-pipeline/db/schema.sql b/data-pipeline/db/schema.sql
index fb4f838..6044a1d 100644
--- a/data-pipeline/db/schema.sql
+++ b/data-pipeline/db/schema.sql
@@ -41,7 +41,16 @@ CREATE TABLE IF NOT EXISTS cefr_source_votes (
 
 -- stage: round1 | round2 | tiebreak
 -- status: pending | complete | needs_review | flagged
-
+CREATE TABLE IF NOT EXISTS run_status (
+  id INTEGER PRIMARY KEY,
+  source_id TEXT NOT NULL,
+  model_name TEXT NOT NULL,
+  stage TEXT NOT NULL,
+  status TEXT NOT NULL,
+  created_at TEXT NOT NULL DEFAULT (datetime('now')),
+  updated_at TEXT NOT NULL DEFAULT (datetime('now')),
+  UNIQUE (source_id, model_name, stage)
+);
 -- ── Round 1 output ────────────────────────────────────────────────────────────
 -- One row per translation/language per model. Written atomically per record.
 
diff --git a/data-pipeline/pipeline.ts b/data-pipeline/pipeline.ts
new file mode 100644
index 0000000..43e33c1
--- /dev/null
+++ b/data-pipeline/pipeline.ts
@@ -0,0 +1,586 @@
+import fs from "node:fs/promises";
+import path from "node:path";
+import { fileURLToPath } from "node:url";
+import { initDb } from "./db/init.js";
+import { isImported, importStage2 } from "./db/import.js";
+import { openDb } from "./db/index.js";
+import { ALL_PROVIDERS, validateProviderKey } from "./stage-3-enrich/config.js";
+import type { ProviderConfig } from "./stage-3-enrich/config.js";
+
+// ── Types ─────────────────────────────────────────────────────────────────────
+
+type RunStage =
+  | "round1"
+  | "compile_candidates"
+  | "round2"
+  | "compile_votes"
+  | "merge"
+  | "tiebreak"
+  | "compare";
+
+/** Stages that main() runs once per provider. */
+type ModelStage = Extract<RunStage, "round1" | "round2">;
+
+type StageStatus = "complete" | "pending" | "in_progress";
+
+type RunStats = {
+  startedAt: Date;
+  stoppedAt: Date | null;
+  recordsProcessed: number;
+  recordsSkipped: number;
+  needsReview: number;
+  modelsRun: string[];
+  currentStage: RunStage | null;
+};
+
+/** Handle type returned by openDb(). */
+type Db = ReturnType<typeof openDb>;
+
+// ── Constants ─────────────────────────────────────────────────────────────────
+
+const __dirname = path.dirname(fileURLToPath(import.meta.url));
+
+const PATHS = {
+  omw: path.join(__dirname, "stage-1-extract/output/omw.json"),
+  db: path.join(__dirname, "db/pipeline.db"),
+  reports: path.join(__dirname, "reports"),
+  llamaHealth: "http://127.0.0.1:8080/health",
+};
+
+/** Upper bound on the llama.cpp health probe so a hung server cannot stall startup. */
+const LLAMA_HEALTH_TIMEOUT_MS = 5_000;
+
+/** run_status key under which whole-pipeline (non per-model) stages are stored. */
+const SENTINEL = { sourceId: "system", modelName: "system" };
+
+const COUNT_SYNSETS_SQL = "SELECT COUNT(*) as count FROM synsets";
+
+// ── Database helpers ──────────────────────────────────────────────────────────
+
+/**
+ * Opens the pipeline database, passes it to `fn`, and closes the handle even
+ * when `fn` throws. `fn` must be synchronous: the handle is closed as soon as
+ * it returns.
+ */
+function withDb<T>(fn: (db: Db) => T): T {
+  const db = openDb();
+  try {
+    return fn(db);
+  } finally {
+    db.close();
+  }
+}
+
+/** Runs a `SELECT COUNT(*) as count ...` query and returns the count. */
+function countRows(db: Db, sql: string, ...params: string[]): number {
+  const row = db.prepare(sql).get(...params) as { count: number };
+  return row.count;
+}
+
+/** Counts run_status rows of the merge stage that are marked 'flagged'. */
+function countFlaggedTranslations(db: Db): number {
+  return countRows(
+    db,
+    `SELECT COUNT(*) as count FROM run_status
+     WHERE stage = 'merge' AND status = 'flagged'`,
+  );
+}
+
+// ── Startup checks ────────────────────────────────────────────────────────────
+
+/** Exits with instructions when the stage 1 output (omw.json) is missing. */
+async function checkOmwExists(): Promise<void> {
+  try {
+    await fs.access(PATHS.omw);
+  } catch {
+    console.error("\n ERROR: stage-1-extract/output/omw.json not found.");
+    console.error(" Run the stage 1 extraction script first:");
+    console.error(" python stage-1-extract/scripts/extract.py\n");
+    process.exit(1);
+  }
+}
+
+/** Creates pipeline.db via initDb() when the database file does not exist yet. */
+async function checkAndInitDb(): Promise<void> {
+  try {
+    await fs.access(PATHS.db);
+  } catch {
+    console.log(" pipeline.db not found — initialising...");
+    await initDb();
+  }
+}
+
+/** Runs importStage2() when isImported() reports that nothing was imported. */
+async function checkAndImportDb(): Promise<void> {
+  if (!isImported()) {
+    console.log(" Base tables empty — importing stage 2 data...");
+    await importStage2();
+  }
+}
+
+/**
+ * Probes the local llama.cpp health endpoint.
+ *
+ * @returns true only for an OK response; network errors and timeouts yield false.
+ */
+async function checkLlamaServer(): Promise<boolean> {
+  try {
+    const res = await fetch(PATHS.llamaHealth, {
+      signal: AbortSignal.timeout(LLAMA_HEALTH_TIMEOUT_MS),
+    });
+    return res.ok;
+  } catch {
+    return false;
+  }
+}
+
+/** A provider whose apiKey is "none" is treated as the local llama.cpp server. */
+function isLocalProvider(provider: ProviderConfig): boolean {
+  return provider.apiKey === "none";
+}
+
+/**
+ * Verifies that a provider can be used: local providers need a reachable
+ * llama.cpp server (otherwise the process exits with code 1); all others are
+ * handed to validateProviderKey().
+ */
+async function checkProviderReady(provider: ProviderConfig): Promise<void> {
+  if (!isLocalProvider(provider)) {
+    validateProviderKey(provider);
+    return;
+  }
+
+  if (await checkLlamaServer()) return;
+
+  console.error("\n ERROR: llama.cpp server is not running.");
+  console.error(" Start the server before running the pipeline:");
+  console.error(
+    " ./build/bin/llama-server --model models/<model>.gguf \\",
+  );
+  console.error(" --port 8080 --host 127.0.0.1");
+  console.error(" See llm-setup.md for full instructions.\n");
+  process.exit(1);
+}
+
+// ── Run name generation ───────────────────────────────────────────────────────
+
+/**
+ * Builds the next free report name for today, `<YYYY-MM-DD>_run-<n>`.
+ *
+ * Uses the highest existing run number rather than the file count, so a
+ * deleted or unrelated .json file cannot cause an existing name to be reused.
+ */
+async function generateRunName(): Promise<string> {
+  await fs.mkdir(PATHS.reports, { recursive: true });
+
+  const date = new Date().toISOString().slice(0, 10);
+  const runFile = new RegExp(`^${date}_run-(\\d+)\\.json$`);
+
+  let lastRun = 0;
+  for (const file of await fs.readdir(PATHS.reports)) {
+    const index = runFile.exec(file)?.[1];
+    if (index !== undefined) lastRun = Math.max(lastRun, Number(index));
+  }
+
+  return `${date}_run-${lastRun + 1}`;
+}
+
+// ── Shutdown handler ──────────────────────────────────────────────────────────
+
+let shutdownRequested = false;
+
+/**
+ * Installs SIGINT/SIGTERM handlers that request a cooperative stop: the first
+ * signal sets the flag and records the stop time; repeated signals are ignored.
+ */
+function registerShutdownHandler(stats: RunStats): void {
+  const handler = (): void => {
+    if (shutdownRequested) return;
+    shutdownRequested = true;
+    stats.stoppedAt = new Date();
+    console.log("\n\n Shutdown requested — finishing current record...");
+  };
+
+  process.on("SIGINT", handler);
+  process.on("SIGTERM", handler);
+}
+
+// ── Stage status helpers ──────────────────────────────────────────────────────
+
+/** Reads the sentinel row of a stage; anything but 'complete' counts as pending. */
+function getSentinelStatus(stage: RunStage): StageStatus {
+  return withDb((db) => {
+    const row = db
+      .prepare(
+        `SELECT status FROM run_status
+         WHERE source_id = ? AND model_name = ? AND stage = ?`,
+      )
+      .get(SENTINEL.sourceId, SENTINEL.modelName, stage) as
+      | { status: string }
+      | undefined;
+    return row?.status === "complete" ? "complete" : "pending";
+  });
+}
+
+/** Upserts the sentinel row of a stage with status 'complete'. */
+function markSentinelComplete(stage: RunStage): void {
+  withDb((db) => {
+    db.prepare(
+      `INSERT INTO run_status (source_id, model_name, stage, status)
+       VALUES (?, ?, ?, 'complete')
+       ON CONFLICT (source_id, model_name, stage)
+       DO UPDATE SET status = 'complete', updated_at = datetime('now')`,
+    ).run(SENTINEL.sourceId, SENTINEL.modelName, stage);
+  });
+}
+
+/**
+ * Compares a model's completed run_status rows for `stage` with the number
+ * of synsets: none → pending, at least as many as synsets → complete,
+ * otherwise in_progress.
+ */
+function getModelStageStatus(
+  modelName: string,
+  stage: ModelStage,
+): StageStatus {
+  const { total, complete } = withDb((db) => ({
+    total: countRows(db, COUNT_SYNSETS_SQL),
+    complete: countRows(
+      db,
+      `SELECT COUNT(*) as count FROM run_status
+       WHERE model_name = ? AND stage = ? AND status = 'complete'`,
+      modelName,
+      stage,
+    ),
+  }));
+
+  if (complete === 0) return "pending";
+  if (complete >= total) return "complete";
+  return "in_progress";
+}
+
+function getModelRound1Status(modelName: string): StageStatus {
+  return getModelStageStatus(modelName, "round1");
+}
+
+function getModelRound2Status(modelName: string): StageStatus {
+  return getModelStageStatus(modelName, "round2");
+}
+
+// ── Stage runners (stubs) ─────────────────────────────────────────────────────
+
+// NOTE(review): the sentinel stubs below call markSentinelComplete() although
+// nothing has run yet, so a database touched by a stub run will skip the real
+// implementation of that stage later — clear those rows when implementing.
+
+/** Records a model in stats.modelsRun once, even if it runs in both rounds. */
+function recordModelRun(stats: RunStats, modelName: string): void {
+  if (!stats.modelsRun.includes(modelName)) stats.modelsRun.push(modelName);
+}
+
+function runRound1(provider: ProviderConfig, stats: RunStats): void {
+  console.log(`\n [round 1] Running ${provider.name}...`);
+  // TODO: implement round 1 enrich script
+  console.log(` [round 1] ${provider.name} — not yet implemented`);
+  recordModelRun(stats, provider.name);
+}
+
+function compileCandidates(): void {
+  console.log("\n [compile candidates] Compiling round 1 output...");
+  // TODO: implement compile candidates script
+  console.log(" [compile candidates] not yet implemented");
+  markSentinelComplete("compile_candidates");
+}
+
+function runRound2(provider: ProviderConfig, stats: RunStats): void {
+  console.log(`\n [round 2] Running ${provider.name}...`);
+  // TODO: implement round 2 enrich script
+  console.log(` [round 2] ${provider.name} — not yet implemented`);
+  recordModelRun(stats, provider.name);
+}
+
+function compileVotes(): void {
+  console.log("\n [compile votes] Compiling round 2 votes...");
+  // TODO: implement compile votes script
+  console.log(" [compile votes] not yet implemented");
+  markSentinelComplete("compile_votes");
+}
+
+function runMerge(): void {
+  console.log("\n [merge] Resolving votes...");
+  // TODO: implement merge script
+  console.log(" [merge] not yet implemented");
+  markSentinelComplete("merge");
+}
+
+function runTiebreak(stats: RunStats): void {
+  console.log("\n [tiebreak] Resolving flagged translations...");
+  // TODO: implement tiebreak logic
+  console.log(" [tiebreak] not yet implemented");
+  stats.currentStage = "tiebreak";
+}
+
+function runCompare(): void {
+  console.log("\n [compare] Generating COVERAGE.md...");
+  // TODO: implement compare script
+  console.log(" [compare] not yet implemented");
+  markSentinelComplete("compare");
+}
+
+// ── Report generation ─────────────────────────────────────────────────────────
+
+/** Maps a stage status to the icon used in the Markdown report. */
+function statusIcon(status: StageStatus): string {
+  if (status === "complete") return "✅";
+  return status === "in_progress" ? "🔄" : "🔲";
+}
+
+/**
+ * Writes `<runName>.json` and `<runName>.md` into the reports directory,
+ * summarising database counts, per-stage status and this run's stats.
+ */
+async function generateReport(runName: string, stats: RunStats): Promise<void> {
+  const {
+    totalSynsets,
+    resolvedTranslations,
+    flaggedTranslations,
+    needsReview,
+  } = withDb((db) => ({
+    totalSynsets: countRows(db, COUNT_SYNSETS_SQL),
+    resolvedTranslations: countRows(
+      db,
+      "SELECT COUNT(*) as count FROM resolved_translations",
+    ),
+    flaggedTranslations: countFlaggedTranslations(db),
+    needsReview: countRows(
+      db,
+      `SELECT COUNT(*) as count FROM run_status
+       WHERE status = 'needs_review'`,
+    ),
+  }));
+
+  const stoppedAt = stats.stoppedAt ?? new Date();
+  const durationMs = stoppedAt.getTime() - stats.startedAt.getTime();
+  const durationMin = Math.round(durationMs / 60_000);
+
+  const compareStatus = getSentinelStatus("compare");
+  const isFinal = compareStatus === "complete" && flaggedTranslations === 0;
+
+  const report = {
+    runName,
+    generatedAt: stoppedAt.toISOString(),
+    durationMinutes: durationMin,
+    isFinal,
+    progress: {
+      totalSynsets,
+      resolvedTranslations,
+      flaggedTranslations,
+      needsReview,
+      recordsProcessedThisRun: stats.recordsProcessed,
+      recordsSkippedThisRun: stats.recordsSkipped,
+    },
+    modelsRun: stats.modelsRun,
+    stages: {
+      round1: ALL_PROVIDERS.map((p) => ({
+        model: p.name,
+        status: getModelRound1Status(p.name),
+      })),
+      compileCandidates: getSentinelStatus("compile_candidates"),
+      round2: ALL_PROVIDERS.map((p) => ({
+        model: p.name,
+        status: getModelRound2Status(p.name),
+      })),
+      compileVotes: getSentinelStatus("compile_votes"),
+      merge: getSentinelStatus("merge"),
+      compare: compareStatus,
+    },
+  };
+
+  await fs.mkdir(PATHS.reports, { recursive: true });
+
+  const jsonPath = path.join(PATHS.reports, `${runName}.json`);
+  const mdPath = path.join(PATHS.reports, `${runName}.md`);
+
+  await fs.writeFile(jsonPath, JSON.stringify(report, null, 2), "utf-8");
+
+  const md = [
+    `# Pipeline run: ${runName}`,
+    ``,
+    `Generated: ${stoppedAt.toISOString()}`,
+    `Duration: ${durationMin} minutes`,
+    isFinal
+      ? `**Status: FINAL — pipeline complete**`
+      : `**Status: In progress**`,
+    ``,
+    `## Progress`,
+    ``,
+    `| Metric | Value |`,
+    `| ------ | ----- |`,
+    `| Total synsets | ${totalSynsets.toLocaleString()} |`,
+    `| Resolved translations | ${resolvedTranslations.toLocaleString()} |`,
+    `| Flagged translations | ${flaggedTranslations.toLocaleString()} |`,
+    `| Needs review | ${needsReview.toLocaleString()} |`,
+    `| Records processed this run | ${stats.recordsProcessed.toLocaleString()} |`,
+    `| Records skipped this run | ${stats.recordsSkipped.toLocaleString()} |`,
+    ``,
+    `## Stage status`,
+    ``,
+    `### Round 1`,
+    ``,
+    ...report.stages.round1.map((s) => `- ${statusIcon(s.status)} ${s.model}`),
+    ``,
+    `### Compile candidates: ${report.stages.compileCandidates}`,
+    ``,
+    `### Round 2`,
+    ``,
+    ...report.stages.round2.map((s) => `- ${statusIcon(s.status)} ${s.model}`),
+    ``,
+    `### Compile votes: ${report.stages.compileVotes}`,
+    `### Merge: ${report.stages.merge}`,
+    `### Compare: ${report.stages.compare}`,
+    ``,
+    `## Models run this session`,
+    ``,
+    stats.modelsRun.length > 0
+      ? stats.modelsRun.map((m) => `- ${m}`).join("\n")
+      : "_none_",
+  ].join("\n");
+
+  await fs.writeFile(mdPath, md, "utf-8");
+
+  console.log(`\n Report written → ${jsonPath}`);
+  console.log(` Report written → ${mdPath}`);
+}
+
+// ── Main ──────────────────────────────────────────────────────────────────────
+
+/** Writes the report and exits with code 0 if a shutdown was requested. */
+async function exitIfShutdownRequested(
+  runName: string,
+  stats: RunStats,
+): Promise<void> {
+  if (!shutdownRequested) return;
+  await generateReport(runName, stats);
+  process.exit(0);
+}
+
+/**
+ * Runs one per-model round over ALL_PROVIDERS, skipping models that are
+ * already complete and stopping early once a shutdown has been requested.
+ */
+async function runModelStage(
+  stage: ModelStage,
+  label: string,
+  run: (provider: ProviderConfig, stats: RunStats) => void,
+  stats: RunStats,
+): Promise<void> {
+  for (const provider of ALL_PROVIDERS) {
+    if (shutdownRequested) break;
+
+    const status = getModelStageStatus(provider.name, stage);
+    if (status === "complete") {
+      console.log(` [${label}] ${provider.name} — already complete, skipping`);
+      continue;
+    }
+
+    await checkProviderReady(provider);
+    stats.currentStage = stage;
+
+    if (status === "in_progress") {
+      console.log(` [${label}] ${provider.name} — resuming...`);
+    }
+
+    run(provider, stats);
+  }
+}
+
+/** Runs a whole-pipeline stage unless its sentinel row is already complete. */
+function runSentinelStage(
+  stage: RunStage,
+  label: string,
+  run: () => void,
+  stats: RunStats,
+): void {
+  if (getSentinelStatus(stage) === "complete") {
+    console.log(`\n [${label}] Already complete, skipping`);
+    return;
+  }
+  stats.currentStage = stage;
+  run();
+}
+
+async function main(): Promise<void> {
+  console.log("lila data pipeline\n");
+
+  // ── Startup checks
+  console.log("Checking prerequisites...");
+  await checkOmwExists();
+  await checkAndInitDb();
+  await checkAndImportDb();
+  console.log(" Prerequisites OK");
+
+  // ── Run name
+  const runName = await generateRunName();
+  console.log(`\n Run: ${runName}`);
+
+  // ── Stats
+  const stats: RunStats = {
+    startedAt: new Date(),
+    stoppedAt: null,
+    recordsProcessed: 0,
+    recordsSkipped: 0,
+    needsReview: 0,
+    modelsRun: [],
+    currentStage: null,
+  };
+
+  registerShutdownHandler(stats);
+
+  // ── Round 1
+  console.log("\nRound 1 — generation");
+  await runModelStage("round1", "round 1", runRound1, stats);
+  await exitIfShutdownRequested(runName, stats);
+
+  // ── Compile candidates
+  runSentinelStage(
+    "compile_candidates",
+    "compile candidates",
+    compileCandidates,
+    stats,
+  );
+  await exitIfShutdownRequested(runName, stats);
+
+  // ── Round 2
+  console.log("\nRound 2 — voting");
+  await runModelStage("round2", "round 2", runRound2, stats);
+  await exitIfShutdownRequested(runName, stats);
+
+  // ── Compile votes
+  runSentinelStage("compile_votes", "compile votes", compileVotes, stats);
+  await exitIfShutdownRequested(runName, stats);
+
+  // ── Merge
+  runSentinelStage("merge", "merge", runMerge, stats);
+  await exitIfShutdownRequested(runName, stats);
+
+  // ── Tiebreak
+  if (withDb(countFlaggedTranslations) > 0) {
+    stats.currentStage = "tiebreak";
+    runTiebreak(stats);
+  }
+  await exitIfShutdownRequested(runName, stats);
+
+  // ── Compare
+  runSentinelStage("compare", "compare", runCompare, stats);
+
+  // ── Report
+  stats.stoppedAt = new Date();
+  await generateReport(runName, stats);
+
+  console.log("\nPipeline complete.");
+}
+
+main().catch((err: unknown) => {
+  console.error(err);
+  process.exit(1);
+});