import fs from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { initDb } from "./db/init.js";
import { isImported, importStage2 } from "./db/import.js";
import { openDb } from "./db/index.js";
import { ALL_PROVIDERS, validateProviderKey } from "./stage-3-enrich/config.js";
import type { ProviderConfig } from "./stage-3-enrich/config.js";

// ── Types ─────────────────────────────────────────────────────────────────────

/** Pipeline stages, listed in the order `main` executes them. */
type RunStage =
  | "round1"
  | "compile_candidates"
  | "round2"
  | "compile_votes"
  | "merge"
  | "tiebreak"
  | "compare";

/** Coarse progress of a stage, derived from `run_status` rows. */
type StageStatus = "complete" | "pending" | "in_progress";

/** Per-session statistics that are written into the run report. */
type RunStats = {
  startedAt: Date;
  stoppedAt: Date | null;
  recordsProcessed: number;
  recordsSkipped: number;
  needsReview: number;
  modelsRun: string[];
  currentStage: RunStage | null;
};

// ── Constants ─────────────────────────────────────────────────────────────────

const __dirname = path.dirname(fileURLToPath(import.meta.url));

const PATHS = {
  omw: path.join(__dirname, "stage-1-extract/output/omw.json"),
  db: path.join(__dirname, "db/pipeline.db"),
  reports: path.join(__dirname, "reports"),
  llamaHealth: "http://127.0.0.1:8080/health",
};

/** Pseudo source/model pair used for `run_status` rows of whole-pipeline stages. */
const SENTINEL = { sourceId: "system", modelName: "system" };

/** How long the llama.cpp health probe may take before the server counts as down. */
const LLAMA_HEALTH_TIMEOUT_MS = 5_000;

// ── Startup checks ────────────────────────────────────────────────────────────

/** Exits the process with instructions if the stage 1 extraction output is missing. */
async function checkOmwExists(): Promise<void> {
  try {
    await fs.access(PATHS.omw);
  } catch {
    console.error("\n ERROR: stage-1-extract/output/omw.json not found.");
    console.error(" Run the stage 1 extraction script first:");
    console.error(" python stage-1-extract/scripts/extract.py\n");
    process.exit(1);
  }
}

/** Creates the pipeline database via `initDb` when the file does not exist yet. */
async function checkAndInitDb(): Promise<void> {
  try {
    await fs.access(PATHS.db);
  } catch {
    console.log(" pipeline.db not found — initialising...");
    await initDb();
  }
}

/** Imports stage 2 data when `isImported()` reports the base tables as empty. */
async function checkAndImportDb(): Promise<void> {
  if (!isImported()) {
    console.log(" Base tables empty — importing stage 2 data...");
    await importStage2();
  }
}

/**
 * Probes the local llama.cpp health endpoint.
 *
 * @returns `true` when the endpoint answers with a 2xx status within the
 *   timeout, and `false` on any network error, timeout or non-OK status.
 */
async function checkLlamaServer(): Promise<boolean> {
  try {
    // Without a timeout, a server that accepts the connection but never
    // responds would hang the startup check indefinitely.
    const res = await fetch(PATHS.llamaHealth, {
      signal: AbortSignal.timeout(LLAMA_HEALTH_TIMEOUT_MS),
    });
    return res.ok;
  } catch {
    return false;
  }
}

/** A provider whose API key is the literal `"none"` is treated as the local llama.cpp server. */
function isLocalProvider(provider: ProviderConfig): boolean {
  return provider.apiKey === "none";
}

/**
 * Ensures a provider can be used before a round runs it.
 *
 * For the local provider, exits the process with start-up instructions if the
 * llama.cpp server is not reachable. Remote providers are delegated to
 * `validateProviderKey`.
 */
async function checkProviderReady(provider: ProviderConfig): Promise<void> {
  if (!isLocalProvider(provider)) {
    validateProviderKey(provider);
    return;
  }
  if (await checkLlamaServer()) return;

  console.error("\n ERROR: llama.cpp server is not running.");
  console.error(" Start the server before running the pipeline:");
  console.error(" ./build/bin/llama-server --model models/<model>.gguf \\");
  console.error(" --port 8080 --host 127.0.0.1");
  console.error(" See llm-setup.md for full instructions.\n");
  process.exit(1);
}

// ── Run name generation ───────────────────────────────────────────────────────

/**
 * Builds a unique run name of the form `YYYY-MM-DD_run-N` (UTC date).
 *
 * N is one more than the highest run number already present in the reports
 * directory for today. Counting files instead would reuse an existing number
 * once an older report has been deleted, and that report would be overwritten.
 */
async function generateRunName(): Promise<string> {
  await fs.mkdir(PATHS.reports, { recursive: true });
  const date = new Date().toISOString().slice(0, 10);
  const prefix = `${date}_run-`;
  const suffix = ".json";

  let highest = 0;
  for (const file of await fs.readdir(PATHS.reports)) {
    if (!file.startsWith(prefix) || !file.endsWith(suffix)) continue;
    const runNumber = Number.parseInt(file.slice(prefix.length, -suffix.length), 10);
    if (Number.isInteger(runNumber) && runNumber > highest) highest = runNumber;
  }
  return `${prefix}${highest + 1}`;
}

// ── Shutdown handler ──────────────────────────────────────────────────────────

/** Set by the first SIGINT/SIGTERM. Stage loops poll it to stop cleanly. */
let shutdownRequested = false;

/**
 * Installs SIGINT/SIGTERM handlers that request a graceful stop.
 *
 * The first signal records `stats.stoppedAt` and sets `shutdownRequested`.
 * A second signal exits immediately. Once a listener is registered, Node no
 * longer exits on these signals by default, so without this there would be no
 * way to abort a hung run.
 */
function registerShutdownHandler(stats: RunStats): void {
  const handler = (signal: string): void => {
    if (shutdownRequested) {
      console.log("\n Second interrupt received — exiting immediately.");
      // Conventional 128 + signal-number exit codes (SIGTERM = 15, SIGINT = 2).
      process.exit(signal === "SIGTERM" ? 143 : 130);
    }
    shutdownRequested = true;
    stats.stoppedAt = new Date();
    console.log("\n\n Shutdown requested — finishing current record...");
  };
  process.on("SIGINT", handler);
  process.on("SIGTERM", handler);
}

// ── Stage status helpers ──────────────────────────────────────────────────────

/**
 * Opens the pipeline database, runs `fn` against it and always closes the
 * connection, even when `fn` throws.
 */
function withDb<T>(fn: (db: ReturnType<typeof openDb>) => T): T {
  const db = openDb();
  try {
    return fn(db);
  } finally {
    db.close();
  }
}

/** Returns `"complete"` if the sentinel row for `stage` is complete, otherwise `"pending"`. */
function getSentinelStatus(stage: RunStage): StageStatus {
  const row = withDb(
    (db) =>
      db
        .prepare(
          `SELECT status FROM run_status WHERE source_id = ? AND model_name = ? AND stage = ?`,
        )
        .get(SENTINEL.sourceId, SENTINEL.modelName, stage) as
        | { status: string }
        | undefined,
  );
  return row?.status === "complete" ? "complete" : "pending";
}

/** Upserts the sentinel row for `stage` as complete and refreshes `updated_at`. */
function markSentinelComplete(stage: RunStage): void {
  withDb((db) => {
    db.prepare(
      `INSERT INTO run_status (source_id, model_name, stage, status)
       VALUES (?, ?, ?, 'complete')
       ON CONFLICT (source_id, model_name, stage)
       DO UPDATE SET status = 'complete', updated_at = datetime('now')`,
    ).run(SENTINEL.sourceId, SENTINEL.modelName, stage);
  });
}

/**
 * Compares the number of complete `run_status` rows for a model and stage
 * against the synset count.
 *
 * @returns `"pending"` if no rows are complete, `"complete"` if at least one
 *   row per synset is complete, and `"in_progress"` otherwise.
 */
function getModelStageStatus(
  modelName: string,
  stage: "round1" | "round2",
): StageStatus {
  const { total, complete } = withDb((db) => {
    const totalRow = db
      .prepare("SELECT COUNT(*) as count FROM synsets")
      .get() as { count: number };
    const completeRow = db
      .prepare(
        `SELECT COUNT(*) as count FROM run_status
         WHERE model_name = ? AND stage = ? AND status = 'complete'`,
      )
      .get(modelName, stage) as { count: number };
    return { total: totalRow.count, complete: completeRow.count };
  });

  if (complete === 0) return "pending";
  if (complete >= total) return "complete";
  return "in_progress";
}

/** Round 1 progress for a single model. */
function getModelRound1Status(modelName: string): StageStatus {
  return getModelStageStatus(modelName, "round1");
}

/** Round 2 progress for a single model. */
function getModelRound2Status(modelName: string): StageStatus {
  return getModelStageStatus(modelName, "round2");
}

// ── Stage runners (stubs) ─────────────────────────────────────────────────────

/** Adds a model to the session's `modelsRun` list once, even if several rounds run it. */
function recordModelRun(stats: RunStats, modelName: string): void {
  if (!stats.modelsRun.includes(modelName)) stats.modelsRun.push(modelName);
}

function runRound1(provider: ProviderConfig, stats: RunStats): void {
  console.log(`\n [round 1] Running ${provider.name}...`);
  // TODO: implement round 1 enrich script
  console.log(` [round 1] ${provider.name} — not yet implemented`);
  recordModelRun(stats, provider.name);
}

function compileCandidates(): void {
  console.log("\n [compile candidates] Compiling round 1 output...");
  // TODO: implement compile candidates script
  console.log(" [compile candidates] not yet implemented");
  markSentinelComplete("compile_candidates");
}

function runRound2(provider: ProviderConfig, stats: RunStats): void {
  console.log(`\n [round 2] Running ${provider.name}...`);
  // TODO: implement round 2 enrich script
  console.log(` [round 2] ${provider.name} — not yet implemented`);
  recordModelRun(stats, provider.name);
}

function compileVotes(): void {
  console.log("\n [compile votes] Compiling round 2 votes...");
  // TODO: implement compile votes script
  console.log(" [compile votes] not yet implemented");
  markSentinelComplete("compile_votes");
}

function runMerge(): void {
  console.log("\n [merge] Resolving votes...");
  // TODO: implement merge script
  console.log(" [merge] not yet implemented");
  markSentinelComplete("merge");
}

function runTiebreak(stats: RunStats): void {
  console.log("\n [tiebreak] Resolving flagged translations...");
  // TODO: implement tiebreak logic
  console.log(" [tiebreak] not yet implemented");
  stats.currentStage = "tiebreak";
}

function runCompare(): void {
  console.log("\n [compare] Generating COVERAGE.md...");
  // TODO: implement compare script
  console.log(" [compare] not yet implemented");
  markSentinelComplete("compare");
}
// ── Report generation ─────────────────────────────────────────────────────────

/** Counts merge rows that still need a tiebreak. Shared by the report and `main`. */
const FLAGGED_MERGE_SQL = `SELECT COUNT(*) as count FROM run_status
   WHERE stage = 'merge' AND status = 'flagged'`;

/** Runs a `SELECT COUNT(*) as count ...` query and returns the count. */
function countRows(db: ReturnType<typeof openDb>, sql: string): number {
  return (db.prepare(sql).get() as { count: number }).count;
}

/** Database-wide progress figures shown in the run report. */
type ProgressCounts = {
  totalSynsets: number;
  resolvedTranslations: number;
  flaggedTranslations: number;
  needsReview: number;
};

/** Reads all report counts over one connection, which is closed even if a query throws. */
function readProgressCounts(): ProgressCounts {
  const db = openDb();
  try {
    return {
      totalSynsets: countRows(db, "SELECT COUNT(*) as count FROM synsets"),
      resolvedTranslations: countRows(
        db,
        "SELECT COUNT(*) as count FROM resolved_translations",
      ),
      flaggedTranslations: countRows(db, FLAGGED_MERGE_SQL),
      needsReview: countRows(
        db,
        `SELECT COUNT(*) as count FROM run_status WHERE status = 'needs_review'`,
      ),
    };
  } finally {
    db.close();
  }
}

/** Number of merge results currently flagged for tiebreak. */
function countFlaggedTranslations(): number {
  const db = openDb();
  try {
    return countRows(db, FLAGGED_MERGE_SQL);
  } finally {
    db.close();
  }
}

/** Checkbox-style marker for a stage status in the markdown report. */
function statusIcon(status: StageStatus): string {
  switch (status) {
    case "complete":
      return "✅";
    case "in_progress":
      return "🔄";
    default:
      return "🔲";
  }
}

/**
 * Writes `<runName>.json` and `<runName>.md` into the reports directory.
 *
 * The reports combine database-wide progress, per-stage status and this
 * session's stats. `stats.stoppedAt` defaults to "now" when unset. A run is
 * marked final only when the compare stage is complete and no merge rows are
 * still flagged.
 */
async function generateReport(runName: string, stats: RunStats): Promise<void> {
  const { totalSynsets, resolvedTranslations, flaggedTranslations, needsReview } =
    readProgressCounts();

  const stoppedAt = stats.stoppedAt ?? new Date();
  const durationMs = stoppedAt.getTime() - stats.startedAt.getTime();
  const durationMin = Math.round(durationMs / 60_000);

  const isFinal =
    getSentinelStatus("compare") === "complete" && flaggedTranslations === 0;

  const report = {
    runName,
    generatedAt: stoppedAt.toISOString(),
    durationMinutes: durationMin,
    isFinal,
    progress: {
      totalSynsets,
      resolvedTranslations,
      flaggedTranslations,
      needsReview,
      recordsProcessedThisRun: stats.recordsProcessed,
      recordsSkippedThisRun: stats.recordsSkipped,
    },
    modelsRun: stats.modelsRun,
    stages: {
      round1: ALL_PROVIDERS.map((p) => ({
        model: p.name,
        status: getModelRound1Status(p.name),
      })),
      compileCandidates: getSentinelStatus("compile_candidates"),
      round2: ALL_PROVIDERS.map((p) => ({
        model: p.name,
        status: getModelRound2Status(p.name),
      })),
      compileVotes: getSentinelStatus("compile_votes"),
      merge: getSentinelStatus("merge"),
      compare: getSentinelStatus("compare"),
    },
  };

  await fs.mkdir(PATHS.reports, { recursive: true });
  const jsonPath = path.join(PATHS.reports, `${runName}.json`);
  const mdPath = path.join(PATHS.reports, `${runName}.md`);

  await fs.writeFile(jsonPath, JSON.stringify(report, null, 2), "utf-8");

  const md = [
    `# Pipeline run: ${runName}`,
    ``,
    `Generated: ${stoppedAt.toISOString()}`,
    `Duration: ${durationMin} minutes`,
    isFinal
      ? `**Status: FINAL — pipeline complete**`
      : `**Status: In progress**`,
    ``,
    `## Progress`,
    ``,
    `| Metric | Value |`,
    `| ------ | ----- |`,
    `| Total synsets | ${totalSynsets.toLocaleString()} |`,
    `| Resolved translations | ${resolvedTranslations.toLocaleString()} |`,
    `| Flagged translations | ${flaggedTranslations.toLocaleString()} |`,
    `| Needs review | ${needsReview.toLocaleString()} |`,
    `| Records processed this run | ${stats.recordsProcessed.toLocaleString()} |`,
    `| Records skipped this run | ${stats.recordsSkipped.toLocaleString()} |`,
    ``,
    `## Stage status`,
    ``,
    `### Round 1`,
    ``,
    ...report.stages.round1.map((s) => `- ${statusIcon(s.status)} ${s.model}`),
    ``,
    `### Compile candidates: ${report.stages.compileCandidates}`,
    ``,
    `### Round 2`,
    ``,
    ...report.stages.round2.map((s) => `- ${statusIcon(s.status)} ${s.model}`),
    ``,
    `### Compile votes: ${report.stages.compileVotes}`,
    `### Merge: ${report.stages.merge}`,
    `### Compare: ${report.stages.compare}`,
    ``,
    `## Models run this session`,
    ``,
    stats.modelsRun.length > 0
      ? stats.modelsRun.map((m) => `- ${m}`).join("\n")
      : "_none_",
  ].join("\n");

  await fs.writeFile(mdPath, md, "utf-8");

  console.log(`\n Report written → ${jsonPath}`);
  console.log(` Report written → ${mdPath}`);
}

// ── Main ──────────────────────────────────────────────────────────────────────

/** If a shutdown was requested, writes the progress report and exits with code 0. */
async function exitIfShutdownRequested(
  runName: string,
  stats: RunStats,
): Promise<void> {
  if (!shutdownRequested) return;
  await generateReport(runName, stats);
  process.exit(0);
}

/**
 * Runs one per-model round over every provider.
 *
 * Providers that are already complete are skipped. The loop stops early when a
 * shutdown is requested. Each provider is checked for readiness before it runs.
 */
async function runModelRound(
  label: string,
  stage: "round1" | "round2",
  getStatus: (modelName: string) => StageStatus,
  run: (provider: ProviderConfig, stats: RunStats) => void,
  stats: RunStats,
): Promise<void> {
  for (const provider of ALL_PROVIDERS) {
    if (shutdownRequested) break;

    const status = getStatus(provider.name);
    if (status === "complete") {
      console.log(` [${label}] ${provider.name} — already complete, skipping`);
      continue;
    }

    await checkProviderReady(provider);
    stats.currentStage = stage;
    if (status === "in_progress") {
      console.log(` [${label}] ${provider.name} — resuming...`);
    }
    run(provider, stats);
  }
}

/** Runs a whole-pipeline stage unless its sentinel row already marks it complete. */
function runSentinelStage(
  stage: RunStage,
  label: string,
  run: () => void,
  stats: RunStats,
): void {
  if (getSentinelStatus(stage) === "complete") {
    console.log(`\n [${label}] Already complete, skipping`);
    return;
  }
  stats.currentStage = stage;
  run();
}

/**
 * Pipeline entry point.
 *
 * Runs the prerequisite checks, then each stage in order, resuming from
 * whatever the database records as already complete. After every stage it
 * checks for a requested shutdown, and if there is one it writes a report and
 * exits.
 */
async function main(): Promise<void> {
  console.log("lila data pipeline\n");

  // ── Startup checks
  console.log("Checking prerequisites...");
  await checkOmwExists();
  await checkAndInitDb();
  await checkAndImportDb();
  console.log(" Prerequisites OK");

  // ── Run name
  const runName = await generateRunName();
  console.log(`\n Run: ${runName}`);

  // ── Stats
  const stats: RunStats = {
    startedAt: new Date(),
    stoppedAt: null,
    recordsProcessed: 0,
    recordsSkipped: 0,
    needsReview: 0,
    modelsRun: [],
    currentStage: null,
  };
  registerShutdownHandler(stats);

  // ── Round 1
  console.log("\nRound 1 — generation");
  await runModelRound("round 1", "round1", getModelRound1Status, runRound1, stats);
  await exitIfShutdownRequested(runName, stats);

  // ── Compile candidates
  runSentinelStage("compile_candidates", "compile candidates", compileCandidates, stats);
  await exitIfShutdownRequested(runName, stats);

  // ── Round 2
  console.log("\nRound 2 — voting");
  await runModelRound("round 2", "round2", getModelRound2Status, runRound2, stats);
  await exitIfShutdownRequested(runName, stats);

  // ── Compile votes
  runSentinelStage("compile_votes", "compile votes", compileVotes, stats);
  await exitIfShutdownRequested(runName, stats);

  // ── Merge
  runSentinelStage("merge", "merge", runMerge, stats);
  await exitIfShutdownRequested(runName, stats);

  // ── Tiebreak (not sentinel-gated: runs whenever flagged merge rows remain)
  if (countFlaggedTranslations() > 0) {
    stats.currentStage = "tiebreak";
    runTiebreak(stats);
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Compare
  runSentinelStage("compare", "compare", runCompare, stats);

  // ── Report
  stats.stoppedAt = new Date();
  await generateReport(runName, stats);

  console.log("\nPipeline complete.");
}

main().catch((err: unknown) => {
  console.error(err);
  process.exit(1);
});