import fs from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";

import { initDb } from "./db/init.js";
import { isImported, importKaikki } from "./db/import.js";
import { openDb } from "./db/index.js";
import { reverseLink } from "./stage-2-reverse-link/scripts/reverse-link.js";
import { ALL_PROVIDERS, validateProviderKey } from "./stage-3-enrich/config.js";
import type { ProviderConfig } from "./stage-3-enrich/config.js";
import { enrich } from "./stage-3-enrich/scripts/enrich.js";

// ── Types ─────────────────────────────────────────────────────────────────────

/** Pipeline-level stages whose completion is tracked via a sentinel row. */
type RunStage =
  | "round1"
  | "compile_candidates"
  | "round2"
  | "compile_votes"
  | "merge"
  | "tiebreak"
  | "compare";

type StageStatus = "complete" | "pending" | "in_progress";

/** Mutable counters and timestamps accumulated over a single pipeline run. */
type RunStats = {
  startedAt: Date;
  /** Set by the shutdown handler; `null` while the run is still going. */
  stoppedAt: Date | null;
  recordsProcessed: number;
  recordsSkipped: number;
  needsReview: number;
  modelsRun: string[];
  currentStage: RunStage | null;
};

/** Handle type of whatever `openDb()` returns. */
type Db = ReturnType<typeof openDb>;

/** Row shape of the `SELECT COUNT(*) as count …` queries used below. */
type CountRow = { count: number };

// ── Constants ─────────────────────────────────────────────────────────────────

const __dirname = path.dirname(fileURLToPath(import.meta.url));

const PATHS = {
  extractedEn: path.join(__dirname, "stage-1-extract/output/en.json"),
  db: path.join(__dirname, "db/pipeline.db"),
  reports: path.join(__dirname, "reports"),
  llamaHealth: "http://127.0.0.1:8080/health",
};

/** Key (entry id + model name) of the `run_status` rows used for stage-level flags. */
const SENTINEL = { entryId: 0, modelName: "system" };

// ── Database helpers ──────────────────────────────────────────────────────────

/**
 * Opens the database, runs `fn` against it and always closes the handle,
 * including when `fn` throws.
 */
function withDb<T>(fn: (db: Db) => T): T {
  const db = openDb();
  try {
    return fn(db);
  } finally {
    db.close();
  }
}

/** Counts `run_status` rows of the merge stage that are marked `flagged`. */
function countFlaggedMergeEntries(db: Db): number {
  const row = db
    .prepare(
      `SELECT COUNT(*) as count FROM run_status
       WHERE stage = 'merge' AND status = 'flagged'`,
    )
    .get() as CountRow;
  return row.count;
}

// ── Startup checks ────────────────────────────────────────────────────────────

/** Exits the process with code 1 if the stage-1 extraction output is missing. */
async function checkExtractedFilesExist(): Promise<void> {
  try {
    await fs.access(PATHS.extractedEn);
  } catch {
    console.error("\n ERROR: stage-1-extract/output/en.json not found.");
    console.error(" Run the stage 1 extraction script first:");
    console.error(" pnpm extract\n");
    process.exit(1);
  }
}

/** Runs `initDb()` when the database file does not exist yet. */
async function checkAndInitDb(): Promise<void> {
  try {
    await fs.access(PATHS.db);
  } catch {
    console.log(" pipeline.db not found — initialising...");
    await initDb();
  }
}

/** Runs `importKaikki()` when `isImported()` reports that nothing was imported. */
async function checkAndImportDb(): Promise<void> {
  if (!isImported()) {
    console.log(" Base tables empty — importing Kaikki data...");
    await importKaikki();
  }
}

/**
 * Probes the llama.cpp health endpoint.
 *
 * @returns `true` when the endpoint answers with an OK status, `false` on a
 *   non-OK status or when the request itself fails.
 */
async function checkLlamaServer(): Promise<boolean> {
  try {
    const res = await fetch(PATHS.llamaHealth);
    return res.ok;
  } catch {
    return false;
  }
}

/** A provider is treated as local when its API key is the literal `"none"`. */
function isLocalProvider(provider: ProviderConfig): boolean {
  return provider.apiKey === "none";
}

/**
 * Verifies that `provider` can be used: local providers need a reachable
 * llama.cpp server (the process exits with code 1 otherwise); every other
 * provider is handed to `validateProviderKey`.
 */
async function checkProviderReady(provider: ProviderConfig): Promise<void> {
  if (!isLocalProvider(provider)) {
    validateProviderKey(provider);
    return;
  }

  const running = await checkLlamaServer();
  if (running) return;

  console.error("\n ERROR: llama.cpp server is not running.");
  console.error(" Start the server before running the pipeline:");
  // NOTE(review): the model file name in this hint looks like it lost a
  // placeholder before ".gguf" — confirm the intended text.
  console.error(
    " ./build/bin/llama-server --model models/.gguf \\",
  );
  console.error(" --port 8080 --host 127.0.0.1");
  console.error(" See llm-setup.md for full instructions.\n");
  process.exit(1);
}

// ── Run name generation ───────────────────────────────────────────────────────

/**
 * Builds the name of this run as `<YYYY-MM-DD>_run-<n>`, where the date is the
 * UTC date of `toISOString()` and `n` is one more than the number of `.json`
 * reports already present for that date. Creates the reports directory if
 * needed.
 */
async function generateRunName(): Promise<string> {
  await fs.mkdir(PATHS.reports, { recursive: true });

  // The first ten characters of an ISO timestamp are the YYYY-MM-DD date.
  const date = new Date().toISOString().slice(0, 10);
  const files = await fs.readdir(PATHS.reports);
  const todaysRuns = files.filter(
    (f) => f.startsWith(date) && f.endsWith(".json"),
  ).length;

  return `${date}_run-${todaysRuns + 1}`;
}

// ── Shutdown handler ──────────────────────────────────────────────────────────

let shutdownRequested = false;

/**
 * Installs SIGINT/SIGTERM handlers that flag a shutdown request and stamp
 * `stats.stoppedAt`. The handlers do not exit the process themselves; the main
 * loop polls `shutdownRequested` between stages. Repeated signals are ignored.
 */
function registerShutdownHandler(stats: RunStats): void {
  const handler = (): void => {
    if (shutdownRequested) return;
    shutdownRequested = true;
    stats.stoppedAt = new Date();
    console.log("\n\n Shutdown requested — finishing current record...");
  };

  process.on("SIGINT", handler);
  process.on("SIGTERM", handler);
}

/** Writes the report and exits with code 0 if a shutdown has been requested. */
async function exitIfShutdownRequested(
  runName: string,
  stats: RunStats,
): Promise<void> {
  if (!shutdownRequested) return;
  await generateReport(runName, stats);
  process.exit(0);
}

// ── Stage status helpers ──────────────────────────────────────────────────────

/** Reads the raw status of the sentinel row for `stage`, if such a row exists. */
function readSentinelStatus(stage: string): string | undefined {
  return withDb((db) => {
    const row = db
      .prepare(
        `SELECT status FROM run_status
         WHERE entry_id = ? AND model_name = ? AND stage = ?`,
      )
      .get(SENTINEL.entryId, SENTINEL.modelName, stage) as
      | { status: string }
      | undefined;
    return row?.status;
  });
}

/** Upserts the sentinel row for `stage` with status `complete`. */
function writeSentinelComplete(stage: string): void {
  withDb((db) => {
    db.prepare(
      `INSERT INTO run_status (entry_id, model_name, stage, status)
       VALUES (?, ?, ?, 'complete')
       ON CONFLICT (entry_id, model_name, stage)
       DO UPDATE SET status = 'complete', updated_at = datetime('now')`,
    ).run(SENTINEL.entryId, SENTINEL.modelName, stage);
  });
}

/** Sentinel-based status of a pipeline stage: `complete` or `pending`. */
function getSentinelStatus(stage: RunStage): StageStatus {
  return readSentinelStatus(stage) === "complete" ? "complete" : "pending";
}

/** Marks a pipeline stage as complete via its sentinel row. */
function markSentinelComplete(stage: RunStage): void {
  writeSentinelComplete(stage);
}

/**
 * Derives a model's status for a per-entry stage by comparing the number of
 * `complete` rows for that model and stage against the number of `en` entries:
 * no rows → `pending`, at least as many as entries → `complete`, otherwise
 * `in_progress`.
 */
function getModelStageStatus(modelName: string, stage: string): StageStatus {
  const { total, complete } = withDb((db) => ({
    total: (
      db
        .prepare("SELECT COUNT(*) as count FROM entries WHERE language = 'en'")
        .get() as CountRow
    ).count,
    complete: (
      db
        .prepare(
          `SELECT COUNT(*) as count FROM run_status
           WHERE model_name = ? AND stage = ? AND status = 'complete'`,
        )
        .get(modelName, stage) as CountRow
    ).count,
  }));

  if (complete === 0) return "pending";
  if (complete >= total) return "complete";
  return "in_progress";
}

/** Round 1 progress of `modelName` (rows with stage `round1_gloss`). */
function getModelRound1Status(modelName: string): StageStatus {
  return getModelStageStatus(modelName, "round1_gloss");
}

/** Round 2 progress of `modelName` (rows with stage `round2`). */
function getModelRound2Status(modelName: string): StageStatus {
  return getModelStageStatus(modelName, "round2");
}

/** Whether the `reverse_link` sentinel row is marked complete. */
function isReverseLinkDone(): boolean {
  return readSentinelStatus("reverse_link") === "complete";
}

/** Marks the `reverse_link` sentinel row as complete. */
function markReverseLinkComplete(): void {
  writeSentinelComplete("reverse_link");
}

// ── Stage runners ─────────────────────────────────────────────────────────────

/** Runs the reverse-link stage unless its sentinel says it already ran. */
function runReverseLinkStage(): void {
  if (isReverseLinkDone()) {
    console.log("\n [reverse link] Already complete, skipping");
    return;
  }

  console.log("\n [reverse link] Syncing reverse translation links...");
  reverseLink();
  markReverseLinkComplete();
}

/** Runs round 1 enrichment for one provider and folds its counts into `stats`. */
async function runRound1(
  provider: ProviderConfig,
  stats: RunStats,
): Promise<void> {
  console.log(`\n [round 1] Running ${provider.name}...`);

  const counts = await enrich(provider);
  stats.recordsProcessed += counts.processed;
  stats.recordsSkipped += counts.skipped;
  stats.needsReview += counts.needsReview;
  stats.modelsRun.push(provider.name);
}

/** Placeholder: logs and marks the `compile_candidates` stage complete. */
function compileCandidates(): void {
  console.log("\n [compile candidates] Compiling round 1 output...");
  // TODO: implement compile candidates script
  console.log(" [compile candidates] not yet implemented");
  markSentinelComplete("compile_candidates");
}

/** Placeholder: logs and records the provider in `stats.modelsRun`. */
function runRound2(provider: ProviderConfig, stats: RunStats): void {
  console.log(`\n [round 2] Running ${provider.name}...`);
  // TODO: implement round 2 enrich script
  console.log(` [round 2] ${provider.name} — not yet implemented`);
  stats.modelsRun.push(provider.name);
}

/** Placeholder: logs and marks the `compile_votes` stage complete. */
function compileVotes(): void {
  console.log("\n [compile votes] Compiling round 2 votes...");
  // TODO: implement compile votes script
  console.log(" [compile votes] not yet implemented");
  markSentinelComplete("compile_votes");
}

/** Placeholder: logs and marks the `merge` stage complete. */
function runMerge(): void {
  console.log("\n [merge] Resolving votes...");
  // TODO: implement merge script
  console.log(" [merge] not yet implemented");
  markSentinelComplete("merge");
}

/** Placeholder: logs and sets `stats.currentStage` to `tiebreak`. */
function runTiebreak(stats: RunStats): void {
  console.log("\n [tiebreak] Resolving flagged entries...");
  // TODO: implement tiebreak logic
  console.log(" [tiebreak] not yet implemented");
  stats.currentStage = "tiebreak";
}

/** Placeholder: logs and marks the `compare` stage complete. */
function runCompare(): void {
  console.log("\n [compare] Generating COVERAGE.md...");
  // TODO: implement compare script
  console.log(" [compare] not yet implemented");
  markSentinelComplete("compare");
}

// ── Report generation ─────────────────────────────────────────────────────────

/** Checkbox-style icon used for a model's status in the markdown report. */
function statusIcon(status: StageStatus): string {
  if (status === "complete") return "✅";
  if (status === "in_progress") return "🔄";
  return "🔲";
}

/**
 * Writes `<runName>.json` and `<runName>.md` into the reports directory,
 * summarising database-wide progress, per-stage status and this run's counters.
 *
 * The report is flagged final only when the `compare` sentinel is complete and
 * no merge rows are flagged.
 */
async function generateReport(runName: string, stats: RunStats): Promise<void> {
  const { totalEntries, resolvedEntries, flaggedEntries, needsReview } = withDb(
    (db) => ({
      totalEntries: (
        db.prepare("SELECT COUNT(*) as count FROM entries").get() as CountRow
      ).count,
      resolvedEntries: (
        db
          .prepare("SELECT COUNT(*) as count FROM resolved_entry_cefr")
          .get() as CountRow
      ).count,
      flaggedEntries: countFlaggedMergeEntries(db),
      needsReview: (
        db
          .prepare(
            `SELECT COUNT(*) as count FROM run_status
             WHERE status = 'needs_review'`,
          )
          .get() as CountRow
      ).count,
    }),
  );

  // Fall back to "now" when no shutdown signal stamped the stop time.
  const stoppedAt = stats.stoppedAt ?? new Date();
  const durationMs = stoppedAt.getTime() - stats.startedAt.getTime();
  const durationMin = Math.round(durationMs / 60_000);

  const isFinal =
    getSentinelStatus("compare") === "complete" && flaggedEntries === 0;

  const report = {
    runName,
    generatedAt: stoppedAt.toISOString(),
    durationMinutes: durationMin,
    isFinal,
    progress: {
      totalEntries,
      resolvedEntries,
      flaggedEntries,
      needsReview,
      recordsProcessedThisRun: stats.recordsProcessed,
      recordsSkippedThisRun: stats.recordsSkipped,
    },
    modelsRun: stats.modelsRun,
    stages: {
      reverseLink: isReverseLinkDone() ? "complete" : "pending",
      round1: ALL_PROVIDERS.map((p) => ({
        model: p.name,
        status: getModelRound1Status(p.name),
      })),
      compileCandidates: getSentinelStatus("compile_candidates"),
      round2: ALL_PROVIDERS.map((p) => ({
        model: p.name,
        status: getModelRound2Status(p.name),
      })),
      compileVotes: getSentinelStatus("compile_votes"),
      merge: getSentinelStatus("merge"),
      compare: getSentinelStatus("compare"),
    },
  };

  await fs.mkdir(PATHS.reports, { recursive: true });
  const jsonPath = path.join(PATHS.reports, `${runName}.json`);
  const mdPath = path.join(PATHS.reports, `${runName}.md`);

  await fs.writeFile(jsonPath, JSON.stringify(report, null, 2), "utf-8");

  const md = [
    `# Pipeline run: ${runName}`,
    ``,
    `Generated: ${stoppedAt.toISOString()}`,
    `Duration: ${durationMin} minutes`,
    isFinal
      ? `**Status: FINAL — pipeline complete**`
      : `**Status: In progress**`,
    ``,
    `## Progress`,
    ``,
    `| Metric | Value |`,
    `| ------ | ----- |`,
    `| Total entries | ${totalEntries.toLocaleString()} |`,
    `| Resolved entries | ${resolvedEntries.toLocaleString()} |`,
    `| Flagged entries | ${flaggedEntries.toLocaleString()} |`,
    `| Needs review | ${needsReview.toLocaleString()} |`,
    `| Records processed this run | ${stats.recordsProcessed.toLocaleString()} |`,
    `| Records skipped this run | ${stats.recordsSkipped.toLocaleString()} |`,
    ``,
    `## Stage status`,
    ``,
    `### Reverse link: ${report.stages.reverseLink}`,
    ``,
    `### Round 1`,
    ``,
    ...report.stages.round1.map((s) => `- ${statusIcon(s.status)} ${s.model}`),
    ``,
    `### Compile candidates: ${report.stages.compileCandidates}`,
    ``,
    `### Round 2`,
    ``,
    ...report.stages.round2.map((s) => `- ${statusIcon(s.status)} ${s.model}`),
    ``,
    `### Compile votes: ${report.stages.compileVotes}`,
    `### Merge: ${report.stages.merge}`,
    `### Compare: ${report.stages.compare}`,
    ``,
    `## Models run this session`,
    ``,
    stats.modelsRun.length > 0
      ? stats.modelsRun.map((m) => `- ${m}`).join("\n")
      : "_none_",
  ].join("\n");

  await fs.writeFile(mdPath, md, "utf-8");

  console.log(`\n Report written → ${jsonPath}`);
  console.log(` Report written → ${mdPath}`);
}

// ── Main ──────────────────────────────────────────────────────────────────────

/**
 * Runs the pipeline stages in order, skipping any stage already recorded as
 * complete. After each stage a pending shutdown request causes a report to be
 * written and the process to exit with code 0.
 */
async function main(): Promise<void> {
  console.log("lila data pipeline\n");

  // ── Startup checks
  console.log("Checking prerequisites...");
  await checkExtractedFilesExist();
  await checkAndInitDb();
  await checkAndImportDb();
  console.log(" Prerequisites OK");

  // ── Run name
  const runName = await generateRunName();
  console.log(`\n Run: ${runName}`);

  // ── Stats
  const stats: RunStats = {
    startedAt: new Date(),
    stoppedAt: null,
    recordsProcessed: 0,
    recordsSkipped: 0,
    needsReview: 0,
    modelsRun: [],
    currentStage: null,
  };

  registerShutdownHandler(stats);

  // ── Stage 2 — Reverse link
  runReverseLinkStage();
  await exitIfShutdownRequested(runName, stats);

  // ── Round 1
  console.log("\nRound 1 — generation");
  for (const provider of ALL_PROVIDERS) {
    if (shutdownRequested) break;

    const status = getModelRound1Status(provider.name);
    if (status === "complete") {
      console.log(` [round 1] ${provider.name} — already complete, skipping`);
      continue;
    }

    await checkProviderReady(provider);
    stats.currentStage = "round1";
    if (status === "in_progress") {
      console.log(` [round 1] ${provider.name} — resuming...`);
    }
    await runRound1(provider, stats);
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Compile candidates
  if (getSentinelStatus("compile_candidates") === "complete") {
    console.log("\n [compile candidates] Already complete, skipping");
  } else {
    stats.currentStage = "compile_candidates";
    compileCandidates();
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Round 2
  console.log("\nRound 2 — voting");
  for (const provider of ALL_PROVIDERS) {
    if (shutdownRequested) break;

    const status = getModelRound2Status(provider.name);
    if (status === "complete") {
      console.log(` [round 2] ${provider.name} — already complete, skipping`);
      continue;
    }

    await checkProviderReady(provider);
    stats.currentStage = "round2";
    if (status === "in_progress") {
      console.log(` [round 2] ${provider.name} — resuming...`);
    }
    runRound2(provider, stats);
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Compile votes
  if (getSentinelStatus("compile_votes") === "complete") {
    console.log("\n [compile votes] Already complete, skipping");
  } else {
    stats.currentStage = "compile_votes";
    compileVotes();
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Merge
  if (getSentinelStatus("merge") === "complete") {
    console.log("\n [merge] Already complete, skipping");
  } else {
    stats.currentStage = "merge";
    runMerge();
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Tiebreak
  const flagged = withDb(countFlaggedMergeEntries);
  if (flagged > 0) {
    stats.currentStage = "tiebreak";
    runTiebreak(stats);
  }
  await exitIfShutdownRequested(runName, stats);

  // ── Compare
  if (getSentinelStatus("compare") === "complete") {
    console.log("\n [compare] Already complete, skipping");
  } else {
    stats.currentStage = "compare";
    runCompare();
  }

  // ── Report (disabled until full pipeline is implemented)
  // stats.stoppedAt = new Date();
  // await generateReport(runName, stats);

  console.log("\nPipeline complete.");
}

main().catch((err: unknown) => {
  console.error(err);
  process.exit(1);
});