feat: add pipeline orchestrator skeleton with startup checks, stage runners, shutdown handler, and report generation

This commit is contained in:
lila 2026-05-03 23:01:29 +02:00
parent 080fad1998
commit 87aeb072c5
4 changed files with 584 additions and 9 deletions

View file

@ -216,7 +216,9 @@ async function main(): Promise<void> {
await importStage2(); await importStage2();
} }
main().catch((err) => { if (import.meta.url === `file://${process.argv[1]}`) {
console.error(err); main().catch((err) => {
process.exit(1); console.error(err);
}); process.exit(1);
});
}

View file

@ -33,7 +33,10 @@ async function main(): Promise<void> {
await initDb(); await initDb();
} }
main().catch((err) => { // after
console.error(err); if (import.meta.url === `file://${process.argv[1]}`) {
process.exit(1); main().catch((err) => {
}); console.error(err);
process.exit(1);
});
}

View file

@ -41,7 +41,16 @@ CREATE TABLE IF NOT EXISTS cefr_source_votes (
-- stage: round1 | round2 | tiebreak -- stage: round1 | round2 | tiebreak
-- status: pending | complete | needs_review | flagged -- status: pending | complete | needs_review | flagged
CREATE TABLE IF NOT EXISTS run_status (
id INTEGER PRIMARY KEY,
source_id TEXT NOT NULL,
model_name TEXT NOT NULL,
stage TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE (source_id, model_name, stage)
);
-- ── Round 1 output ──────────────────────────────────────────────────────────── -- ── Round 1 output ────────────────────────────────────────────────────────────
-- One row per translation/language per model. Written atomically per record. -- One row per translation/language per model. Written atomically per record.

561
data-pipeline/pipeline.ts Normal file
View file

@ -0,0 +1,561 @@
import fs from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { initDb } from "./db/init.js";
import { isImported, importStage2 } from "./db/import.js";
import { openDb } from "./db/index.js";
import { ALL_PROVIDERS, validateProviderKey } from "./stage-3-enrich/config.js";
import type { ProviderConfig } from "./stage-3-enrich/config.js";
// ── Types ─────────────────────────────────────────────────────────────────────
type RunStage =
| "round1"
| "compile_candidates"
| "round2"
| "compile_votes"
| "merge"
| "tiebreak"
| "compare";
type StageStatus = "complete" | "pending" | "in_progress";
type RunStats = {
startedAt: Date;
stoppedAt: Date | null;
recordsProcessed: number;
recordsSkipped: number;
needsReview: number;
modelsRun: string[];
currentStage: RunStage | null;
};
// ── Constants ─────────────────────────────────────────────────────────────────
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const PATHS = {
omw: path.join(__dirname, "stage-1-extract/output/omw.json"),
db: path.join(__dirname, "db/pipeline.db"),
reports: path.join(__dirname, "reports"),
llamaHealth: "http://127.0.0.1:8080/health",
};
const SENTINEL = { sourceId: "system", modelName: "system" };
// ── Startup checks ────────────────────────────────────────────────────────────
async function checkOmwExists(): Promise<void> {
try {
await fs.access(PATHS.omw);
} catch {
console.error("\n ERROR: stage-1-extract/output/omw.json not found.");
console.error(" Run the stage 1 extraction script first:");
console.error(" python stage-1-extract/scripts/extract.py\n");
process.exit(1);
}
}
async function checkAndInitDb(): Promise<void> {
try {
await fs.access(PATHS.db);
} catch {
console.log(" pipeline.db not found — initialising...");
await initDb();
}
}
async function checkAndImportDb(): Promise<void> {
if (!isImported()) {
console.log(" Base tables empty — importing stage 2 data...");
await importStage2();
}
}
async function checkLlamaServer(): Promise<boolean> {
try {
const res = await fetch(PATHS.llamaHealth);
return res.ok;
} catch {
return false;
}
}
function isLocalProvider(provider: ProviderConfig): boolean {
return provider.apiKey === "none";
}
async function checkProviderReady(provider: ProviderConfig): Promise<void> {
if (isLocalProvider(provider)) {
const running = await checkLlamaServer();
if (!running) {
console.error("\n ERROR: llama.cpp server is not running.");
console.error(" Start the server before running the pipeline:");
console.error(
" ./build/bin/llama-server --model models/<model>.gguf \\",
);
console.error(" --port 8080 --host 127.0.0.1");
console.error(" See llm-setup.md for full instructions.\n");
process.exit(1);
}
} else {
validateProviderKey(provider);
}
}
// ── Run name generation ───────────────────────────────────────────────────────
async function generateRunName(): Promise<string> {
await fs.mkdir(PATHS.reports, { recursive: true });
const date = new Date().toISOString().slice(0, 10);
const files = await fs.readdir(PATHS.reports);
const todaysRuns = files.filter(
(f) => f.startsWith(date) && f.endsWith(".json"),
).length;
return `${date}_run-${todaysRuns + 1}`;
}
// ── Shutdown handler ──────────────────────────────────────────────────────────
let shutdownRequested = false;
function registerShutdownHandler(stats: RunStats): void {
const handler = (): void => {
if (shutdownRequested) return;
shutdownRequested = true;
stats.stoppedAt = new Date();
console.log("\n\n Shutdown requested — finishing current record...");
};
process.on("SIGINT", handler);
process.on("SIGTERM", handler);
}
// ── Stage status helpers ──────────────────────────────────────────────────────
function getSentinelStatus(stage: RunStage): StageStatus {
const db = openDb();
const row = db
.prepare(
`SELECT status FROM run_status
WHERE source_id = ? AND model_name = ? AND stage = ?`,
)
.get(SENTINEL.sourceId, SENTINEL.modelName, stage) as
| { status: string }
| undefined;
db.close();
return row?.status === "complete" ? "complete" : "pending";
}
function markSentinelComplete(stage: RunStage): void {
const db = openDb();
db.prepare(
`INSERT INTO run_status (source_id, model_name, stage, status)
VALUES (?, ?, ?, 'complete')
ON CONFLICT (source_id, model_name, stage)
DO UPDATE SET status = 'complete', updated_at = datetime('now')`,
).run(SENTINEL.sourceId, SENTINEL.modelName, stage);
db.close();
}
function getModelRound1Status(modelName: string): StageStatus {
const db = openDb();
const total = (
db.prepare("SELECT COUNT(*) as count FROM synsets").get() as {
count: number;
}
).count;
const complete = (
db
.prepare(
`SELECT COUNT(*) as count FROM run_status
WHERE model_name = ? AND stage = 'round1' AND status = 'complete'`,
)
.get(modelName) as { count: number }
).count;
db.close();
if (complete === 0) return "pending";
if (complete >= total) return "complete";
return "in_progress";
}
function getModelRound2Status(modelName: string): StageStatus {
const db = openDb();
const total = (
db.prepare("SELECT COUNT(*) as count FROM synsets").get() as {
count: number;
}
).count;
const complete = (
db
.prepare(
`SELECT COUNT(*) as count FROM run_status
WHERE model_name = ? AND stage = 'round2' AND status = 'complete'`,
)
.get(modelName) as { count: number }
).count;
db.close();
if (complete === 0) return "pending";
if (complete >= total) return "complete";
return "in_progress";
}
// ── Stage runners (stubs) ─────────────────────────────────────────────────────
function runRound1(provider: ProviderConfig, stats: RunStats): void {
console.log(`\n [round 1] Running ${provider.name}...`);
// TODO: implement round 1 enrich script
console.log(` [round 1] ${provider.name} — not yet implemented`);
stats.modelsRun.push(provider.name);
}
function compileCandidates(): void {
console.log("\n [compile candidates] Compiling round 1 output...");
// TODO: implement compile candidates script
console.log(" [compile candidates] not yet implemented");
markSentinelComplete("compile_candidates");
}
function runRound2(provider: ProviderConfig, stats: RunStats): void {
console.log(`\n [round 2] Running ${provider.name}...`);
// TODO: implement round 2 enrich script
console.log(` [round 2] ${provider.name} — not yet implemented`);
stats.modelsRun.push(provider.name);
}
function compileVotes(): void {
console.log("\n [compile votes] Compiling round 2 votes...");
// TODO: implement compile votes script
console.log(" [compile votes] not yet implemented");
markSentinelComplete("compile_votes");
}
function runMerge(): void {
console.log("\n [merge] Resolving votes...");
// TODO: implement merge script
console.log(" [merge] not yet implemented");
markSentinelComplete("merge");
}
function runTiebreak(stats: RunStats): void {
console.log("\n [tiebreak] Resolving flagged translations...");
// TODO: implement tiebreak logic
console.log(" [tiebreak] not yet implemented");
stats.currentStage = "tiebreak";
}
function runCompare(): void {
console.log("\n [compare] Generating COVERAGE.md...");
// TODO: implement compare script
console.log(" [compare] not yet implemented");
markSentinelComplete("compare");
}
// ── Report generation ─────────────────────────────────────────────────────────
async function generateReport(runName: string, stats: RunStats): Promise<void> {
const db = openDb();
const totalSynsets = (
db.prepare("SELECT COUNT(*) as count FROM synsets").get() as {
count: number;
}
).count;
const resolvedTranslations = (
db.prepare("SELECT COUNT(*) as count FROM resolved_translations").get() as {
count: number;
}
).count;
const flaggedTranslations = (
db
.prepare(
`SELECT COUNT(*) as count FROM run_status
WHERE stage = 'merge' AND status = 'flagged'`,
)
.get() as { count: number }
).count;
const needsReview = (
db
.prepare(
`SELECT COUNT(*) as count FROM run_status
WHERE status = 'needs_review'`,
)
.get() as { count: number }
).count;
db.close();
const stoppedAt = stats.stoppedAt ?? new Date();
const durationMs = stoppedAt.getTime() - stats.startedAt.getTime();
const durationMin = Math.round(durationMs / 60_000);
const isFinal =
getSentinelStatus("compare") === "complete" && flaggedTranslations === 0;
const report = {
runName,
generatedAt: stoppedAt.toISOString(),
durationMinutes: durationMin,
isFinal,
progress: {
totalSynsets,
resolvedTranslations,
flaggedTranslations,
needsReview,
recordsProcessedThisRun: stats.recordsProcessed,
recordsSkippedThisRun: stats.recordsSkipped,
},
modelsRun: stats.modelsRun,
stages: {
round1: ALL_PROVIDERS.map((p) => ({
model: p.name,
status: getModelRound1Status(p.name),
})),
compileCandidates: getSentinelStatus("compile_candidates"),
round2: ALL_PROVIDERS.map((p) => ({
model: p.name,
status: getModelRound2Status(p.name),
})),
compileVotes: getSentinelStatus("compile_votes"),
merge: getSentinelStatus("merge"),
compare: getSentinelStatus("compare"),
},
};
await fs.mkdir(PATHS.reports, { recursive: true });
const jsonPath = path.join(PATHS.reports, `${runName}.json`);
const mdPath = path.join(PATHS.reports, `${runName}.md`);
await fs.writeFile(jsonPath, JSON.stringify(report, null, 2), "utf-8");
const md = [
`# Pipeline run: ${runName}`,
``,
`Generated: ${stoppedAt.toISOString()}`,
`Duration: ${durationMin} minutes`,
isFinal
? `**Status: FINAL — pipeline complete**`
: `**Status: In progress**`,
``,
`## Progress`,
``,
`| Metric | Value |`,
`| ------ | ----- |`,
`| Total synsets | ${totalSynsets.toLocaleString()} |`,
`| Resolved translations | ${resolvedTranslations.toLocaleString()} |`,
`| Flagged translations | ${flaggedTranslations.toLocaleString()} |`,
`| Needs review | ${needsReview.toLocaleString()} |`,
`| Records processed this run | ${stats.recordsProcessed.toLocaleString()} |`,
`| Records skipped this run | ${stats.recordsSkipped.toLocaleString()} |`,
``,
`## Stage status`,
``,
`### Round 1`,
``,
...report.stages.round1.map(
(s) =>
`- ${s.status === "complete" ? "✅" : s.status === "in_progress" ? "🔄" : "🔲"} ${s.model}`,
),
``,
`### Compile candidates: ${report.stages.compileCandidates}`,
``,
`### Round 2`,
``,
...report.stages.round2.map(
(s) =>
`- ${s.status === "complete" ? "✅" : s.status === "in_progress" ? "🔄" : "🔲"} ${s.model}`,
),
``,
`### Compile votes: ${report.stages.compileVotes}`,
`### Merge: ${report.stages.merge}`,
`### Compare: ${report.stages.compare}`,
``,
`## Models run this session`,
``,
stats.modelsRun.length > 0
? stats.modelsRun.map((m) => `- ${m}`).join("\n")
: "_none_",
].join("\n");
await fs.writeFile(mdPath, md, "utf-8");
console.log(`\n Report written → ${jsonPath}`);
console.log(` Report written → ${mdPath}`);
}
// ── Main ──────────────────────────────────────────────────────────────────────
async function main(): Promise<void> {
console.log("lila data pipeline\n");
// ── Startup checks
console.log("Checking prerequisites...");
await checkOmwExists();
await checkAndInitDb();
await checkAndImportDb();
console.log(" Prerequisites OK");
// ── Run name
const runName = await generateRunName();
console.log(`\n Run: ${runName}`);
// ── Stats
const stats: RunStats = {
startedAt: new Date(),
stoppedAt: null,
recordsProcessed: 0,
recordsSkipped: 0,
needsReview: 0,
modelsRun: [],
currentStage: null,
};
registerShutdownHandler(stats);
// ── Round 1
console.log("\nRound 1 — generation");
for (const provider of ALL_PROVIDERS) {
if (shutdownRequested) break;
const status = getModelRound1Status(provider.name);
if (status === "complete") {
console.log(` [round 1] ${provider.name} — already complete, skipping`);
continue;
}
await checkProviderReady(provider);
stats.currentStage = "round1";
if (status === "in_progress") {
console.log(` [round 1] ${provider.name} — resuming...`);
}
runRound1(provider, stats);
}
if (shutdownRequested) {
await generateReport(runName, stats);
process.exit(0);
}
// ── Compile candidates
if (getSentinelStatus("compile_candidates") === "complete") {
console.log("\n [compile candidates] Already complete, skipping");
} else {
stats.currentStage = "compile_candidates";
compileCandidates();
}
if (shutdownRequested) {
await generateReport(runName, stats);
process.exit(0);
}
// ── Round 2
console.log("\nRound 2 — voting");
for (const provider of ALL_PROVIDERS) {
if (shutdownRequested) break;
const status = getModelRound2Status(provider.name);
if (status === "complete") {
console.log(` [round 2] ${provider.name} — already complete, skipping`);
continue;
}
await checkProviderReady(provider);
stats.currentStage = "round2";
if (status === "in_progress") {
console.log(` [round 2] ${provider.name} — resuming...`);
}
runRound2(provider, stats);
}
if (shutdownRequested) {
await generateReport(runName, stats);
process.exit(0);
}
// ── Compile votes
if (getSentinelStatus("compile_votes") === "complete") {
console.log("\n [compile votes] Already complete, skipping");
} else {
stats.currentStage = "compile_votes";
compileVotes();
}
if (shutdownRequested) {
await generateReport(runName, stats);
process.exit(0);
}
// ── Merge
if (getSentinelStatus("merge") === "complete") {
console.log("\n [merge] Already complete, skipping");
} else {
stats.currentStage = "merge";
runMerge();
}
if (shutdownRequested) {
await generateReport(runName, stats);
process.exit(0);
}
// ── Tiebreak
const db = openDb();
const flagged = (
db
.prepare(
`SELECT COUNT(*) as count FROM run_status
WHERE stage = 'merge' AND status = 'flagged'`,
)
.get() as { count: number }
).count;
db.close();
if (flagged > 0) {
stats.currentStage = "tiebreak";
runTiebreak(stats);
}
if (shutdownRequested) {
await generateReport(runName, stats);
process.exit(0);
}
// ── Compare
if (getSentinelStatus("compare") === "complete") {
console.log("\n [compare] Already complete, skipping");
} else {
stats.currentStage = "compare";
runCompare();
}
// ── Report
stats.stoppedAt = new Date();
await generateReport(runName, stats);
console.log("\nPipeline complete.");
}
main().catch((err) => {
console.error(err);
process.exit(1);
});