feat: add db import script, fix duplicate translations in extract, add annotate script

This commit is contained in:
lila 2026-05-03 22:05:10 +02:00
parent 4a842140b9
commit f59399be02
7 changed files with 274 additions and 62 deletions

222
data-pipeline/db/import.ts Normal file
View file

@ -0,0 +1,222 @@
import fs from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { SUPPORTED_LANGUAGE_CODES } from "@lila/shared";
import type { SupportedLanguageCode, SupportedPos } from "@lila/shared";
import { openDb } from "./index.js";
// ── Types ─────────────────────────────────────────────────────────────────────
type Example = { text: string; source: "omw" | "cefr" };
type AnnotatedRecord = {
source_id: string;
pos: SupportedPos;
translations: Partial<Record<SupportedLanguageCode, string[]>>;
glosses: Partial<Record<SupportedLanguageCode, string[]>>;
examples: Partial<Record<SupportedLanguageCode, Example[]>>;
votes: Partial<
Record<SupportedLanguageCode, Record<string, { cefr_source: string }>>
>;
};
// ── Paths ─────────────────────────────────────────────────────────────────────
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const PATHS = {
annotatedDir: path.resolve(__dirname, "../stage-2-annotate/output"),
};
// ── Loading ───────────────────────────────────────────────────────────────────
async function loadAnnotated(): Promise<AnnotatedRecord[]> {
// Use en.json as the base — it has the most complete glosses and examples.
// Merge votes and CEFR examples from the other language files.
const baseRaw = await fs.readFile(
path.join(PATHS.annotatedDir, "en.json"),
"utf-8",
);
const base = JSON.parse(baseRaw) as AnnotatedRecord[];
const byId = new Map<string, AnnotatedRecord>();
for (const record of base) {
byId.set(record.source_id, record);
}
for (const lang of SUPPORTED_LANGUAGE_CODES) {
if (lang === "en") continue;
const raw = await fs.readFile(
path.join(PATHS.annotatedDir, `${lang}.json`),
"utf-8",
);
const records = JSON.parse(raw) as AnnotatedRecord[];
for (const record of records) {
const base = byId.get(record.source_id);
if (!base) continue;
// Merge votes
for (const [l, langVotes] of Object.entries(record.votes)) {
if (!base.votes[l as SupportedLanguageCode]) {
base.votes[l as SupportedLanguageCode] = {};
}
Object.assign(base.votes[l as SupportedLanguageCode]!, langVotes);
}
// Merge CEFR examples not already in base
for (const [l, examples] of Object.entries(record.examples)) {
const lang = l as SupportedLanguageCode;
const cefrExamples = examples.filter((e) => e.source === "cefr");
if (cefrExamples.length === 0) continue;
if (!base.examples[lang]) {
base.examples[lang] = cefrExamples;
} else {
base.examples[lang].push(...cefrExamples);
}
}
}
}
return [...byId.values()];
}
// ── Import ────────────────────────────────────────────────────────────────────
export async function importStage2(): Promise<void> {
console.log("Loading stage 2 annotated files...");
const records = await loadAnnotated();
console.log(` Loaded ${records.length.toLocaleString()} synsets`);
const db = openDb();
const insertSynset = db.prepare(
`INSERT INTO synsets (source_id, pos) VALUES (?, ?)`,
);
const insertTranslation = db.prepare(
`INSERT INTO translations (source_id, language, word) VALUES (?, ?, ?)`,
);
const insertGloss = db.prepare(
`INSERT INTO glosses (source_id, language, text) VALUES (?, ?, ?)`,
);
const insertExample = db.prepare(
`INSERT INTO examples (source_id, language, text, source) VALUES (?, ?, ?, ?)`,
);
const insertCefrVote = db.prepare(`
INSERT INTO cefr_source_votes (translation_id, cefr_level)
VALUES (
(SELECT id FROM translations WHERE source_id = ? AND language = ? AND word = ?),
?
)
`);
console.log("\nImporting into pipeline.db...");
const importAll = db.transaction(() => {
let synsets = 0;
let translations = 0;
let glosses = 0;
let examples = 0;
let cefrVotes = 0;
for (const record of records) {
insertSynset.run(record.source_id, record.pos);
synsets++;
// Translations
for (const [lang, words] of Object.entries(record.translations)) {
const unique = [...new Set(words)];
for (const word of unique) {
insertTranslation.run(record.source_id, lang, word);
translations++;
}
}
// Glosses
for (const [lang, glossList] of Object.entries(record.glosses)) {
for (const text of glossList) {
insertGloss.run(record.source_id, lang, text);
glosses++;
}
}
// Examples
for (const [lang, exList] of Object.entries(record.examples)) {
for (const example of exList) {
insertExample.run(
record.source_id,
lang,
example.text,
example.source,
);
examples++;
}
}
// CEFR source votes
for (const [lang, langVotes] of Object.entries(record.votes)) {
for (const [word, vote] of Object.entries(
langVotes as Record<string, { cefr_source: string }>,
)) {
insertCefrVote.run(record.source_id, lang, word, vote.cefr_source);
cefrVotes++;
}
}
}
return { synsets, translations, glosses, examples, cefrVotes };
});
const counts = importAll();
console.log(` synsets: ${counts.synsets.toLocaleString()}`);
console.log(` translations: ${counts.translations.toLocaleString()}`);
console.log(` glosses: ${counts.glosses.toLocaleString()}`);
console.log(` examples: ${counts.examples.toLocaleString()}`);
console.log(` cefr votes: ${counts.cefrVotes.toLocaleString()}`);
db.close();
console.log("\nImport complete.");
}
// ── Check if already imported ─────────────────────────────────────────────────
export function isImported(): boolean {
const db = openDb();
const row = db.prepare(`SELECT COUNT(*) as count FROM synsets`).get() as {
count: number;
};
db.close();
return row.count > 0;
}
// ── Main ─────────────────────────────────────────────────────────────────────
async function main(): Promise<void> {
const db = openDb();
const row = db.prepare(`SELECT COUNT(*) as count FROM synsets`).get() as {
count: number;
};
db.close();
if (row.count > 0) {
console.log(
`pipeline.db already contains ${row.count.toLocaleString()} synsets — skipping import.`,
);
console.log("Delete pipeline.db and re-run db:init to start fresh.");
process.exit(0);
}
await importStage2();
}
main().catch((err) => {
console.error(err);
process.exit(1);
});

Binary file not shown.

View file

@ -4,7 +4,9 @@
"private": true, "private": true,
"type": "module", "type": "module",
"scripts": { "scripts": {
"db:import": "tsx db/import.ts",
"db:init": "tsx db/init.ts", "db:init": "tsx db/init.ts",
"annotate": "tsx stage-2-annotate/scripts/annotate.ts",
"test": "vitest run", "test": "vitest run",
"test:watch": "vitest" "test:watch": "vitest"
}, },

View file

@ -80,7 +80,7 @@ def extract_all(
continue continue
covered += 1 covered += 1
lemmas = [str(lemma) for lemma in synset.lemmas()] lemmas = list(dict.fromkeys(str(lemma) for lemma in synset.lemmas()))
defns = [d for d in synset.definitions() if d] defns = [d for d in synset.definitions() if d]
examples = [e for e in synset.examples() if e] examples = [e for e in synset.examples() if e]

View file

@ -196,12 +196,12 @@ async function annotate(): Promise<void> {
// Add CEFR vote // Add CEFR vote
if (!annotated.votes[lang]) annotated.votes[lang] = {}; if (!annotated.votes[lang]) annotated.votes[lang] = {};
annotated.votes[lang]![word] = { cefr_source: cefrEntry.level }; annotated.votes[lang][word] = { cefr_source: cefrEntry.level };
// Add native example if present // Add native example if present
if (cefrEntry.example) { if (cefrEntry.example) {
if (!annotated.examples[lang]) annotated.examples[lang] = []; if (!annotated.examples[lang]) annotated.examples[lang] = [];
annotated.examples[lang]!.push({ annotated.examples[lang].push({
text: cefrEntry.example, text: cefrEntry.example,
source: "cefr" as const, source: "cefr" as const,
}); });

View file

@ -140,4 +140,27 @@ describe("stage 1 — omw.json validation", () => {
expect(errors, `\n${errors.join("\n")}`).toHaveLength(0); expect(errors, `\n${errors.join("\n")}`).toHaveLength(0);
}); });
it("no duplicate translations within a single synset and language", async () => {
const raw = await fs.readFile(OMW_PATH, "utf-8");
const records = JSON.parse(raw) as OmwRecord[];
const errors: string[] = [];
for (const record of records) {
for (const [lang, words] of Object.entries(record.translations)) {
const seen = new Set<string>();
for (const word of words) {
if (seen.has(word)) {
errors.push(
`${record.source_id} (${lang}): duplicate translation "${word}"`,
);
}
seen.add(word);
}
}
}
expect(errors, `\n${errors.join("\n")}`).toHaveLength(0);
});
}); });

View file

@ -63,8 +63,9 @@ The database serves three purposes:
- **Resolved output** — the final resolved records live here and are read by - **Resolved output** — the final resolved records live here and are read by
the sync script to seed the production database. the sync script to seed the production database.
The schema is defined in `data-pipeline/db/schema.sql`. Never edit `pipeline.db` The schema is defined in `data-pipeline/db/schema.sql`. Never edit `pipeline.db` directly — all writes go through the pipeline scripts.
directly — all writes go through the pipeline scripts.
On first run the orchestrator initialises `pipeline.db` automatically and imports the stage 2 output into the base tables. This happens once — subsequent runs skip the import if the base tables are already populated.
## Data sources ## Data sources
@ -230,15 +231,11 @@ Words not present in the CEFR source file will have an empty `votes` object.
> `http://127.0.0.1:8080/health` and exits with instructions if it is not > `http://127.0.0.1:8080/health` and exits with instructions if it is not
> reachable. See `llm-setup.md` for setup instructions. > reachable. See `llm-setup.md` for setup instructions.
The enrich stage runs in two rounds, both designed to execute overnight one The enrich stage runs in two rounds, both designed to execute overnight one model at a time. All output is written to `pipeline.db` atomically per record — runs are fully resumable if interrupted. Each model is run once — one model produces one vote.
model at a time. All output is written to `pipeline.db` atomically per record
— runs are fully resumable if interrupted. Each model is run once — one model
produces one vote.
**Round 1 — generation** **Round 1 — generation**
Each model processes every word in every language one term at a time and Each model processes every word in every language one term at a time and generates:
generates:
- A CEFR level vote for each translation - A CEFR level vote for each translation
- A description for each language - A description for each language
@ -246,20 +243,11 @@ generates:
- A gloss for each language, only if OMW provides none - A gloss for each language, only if OMW provides none
- Usage examples for each language, only if OMW provides none - Usage examples for each language, only if OMW provides none
OMW data is never duplicated — the script checks what OMW already provides OMW data is never duplicated — the script checks what OMW already provides before building the prompt. For translations, glosses and examples, if OMW data exists for that language the LLM skips generation entirely. This significantly reduces compute time for languages with good OMW coverage such as English.
before building the prompt. For translations, glosses and examples, if OMW
data exists for that language the LLM skips generation entirely. This
significantly reduces compute time for languages with good OMW coverage such
as English.
All model-generated content is stored with an anonymised source (`model_1`, All model-generated content is stored with an anonymised source (`model_1`, `model_2` etc.) so models cannot be biased by knowing who generated what in round 2.
`model_2` etc.) so models cannot be biased by knowing who generated what in
round 2.
Each record is written to `pipeline.db` with status `complete` or Each record is written to `pipeline.db` with status `complete` or `needs_review` immediately after processing. If a record fails structural validation (invalid JSON, missing required fields, invalid CEFR value) it is marked `needs_review` and skipped — the run continues without interruption.
`needs_review` immediately after processing. If a record fails structural
validation (invalid JSON, missing required fields, invalid CEFR value) it is
marked `needs_review` and skipped — the run continues without interruption.
**Input:** `stage-2-annotate/output/{lang}.json` **Input:** `stage-2-annotate/output/{lang}.json`
**Output:** `pipeline.db` — round 1 results per record per model **Output:** `pipeline.db` — round 1 results per record per model
@ -270,9 +258,7 @@ pnpm --filter @lila/pipeline enrich --round 1 --model {model}
**Compiling candidates** **Compiling candidates**
Once all round 1 runs are complete, compile all generated candidates into a Once all round 1 runs are complete, compile all generated candidates into a single structured record per term in `pipeline.db`. This is the input to round 2.
single structured record per term in `pipeline.db`. This is the input to
round 2.
```bash ```bash
pnpm --filter @lila/pipeline enrich --compile-candidates pnpm --filter @lila/pipeline enrich --compile-candidates
@ -287,10 +273,7 @@ Each model receives the compiled candidate list for every word and votes on:
- The best usage examples candidate (if multiple exist) - The best usage examples candidate (if multiple exist)
- A CEFR level vote for each translation - A CEFR level vote for each translation
OMW data is not put to a vote — it automatically wins over any LLM-generated OMW data is not put to a vote — it automatically wins over any LLM-generated candidate. Round 2 only resolves conflicts between model-generated candidates. The prompt is kept small — one word at a time, a clean numbered candidate list — to fit within a limited context window.
candidate. Round 2 only resolves conflicts between model-generated candidates.
The prompt is kept small — one word at a time, a clean numbered candidate
list — to fit within a limited context window.
**Input:** `pipeline.db` — compiled candidates **Input:** `pipeline.db` — compiled candidates
**Output:** `pipeline.db` — round 2 votes per record per model **Output:** `pipeline.db` — round 2 votes per record per model
@ -301,8 +284,7 @@ pnpm --filter @lila/pipeline enrich --round 2 --model {model}
**Compiling votes** **Compiling votes**
Once all round 2 runs are complete, compile all votes into a final votes Once all round 2 runs are complete, compile all votes into a final votes record per term in `pipeline.db`. This is the input to the merge stage.
record per term in `pipeline.db`. This is the input to the merge stage.
```bash ```bash
pnpm --filter @lila/pipeline enrich --compile-votes pnpm --filter @lila/pipeline enrich --compile-votes
@ -310,9 +292,7 @@ pnpm --filter @lila/pipeline enrich --compile-votes
### 4. Merge ### 4. Merge
Reads compiled votes from `pipeline.db` and resolves the final value for Reads compiled votes from `pipeline.db` and resolves the final value for every field. Updates each record in `pipeline.db` with status `final` or `flagged`.
every field. Updates each record in `pipeline.db` with status `final` or
`flagged`.
**Merge rules:** **Merge rules:**
@ -340,18 +320,9 @@ pnpm --filter @lila/pipeline merge
### 4b. Tiebreak ### 4b. Tiebreak
Runs automatically after merge if any translations remain flagged. The script Runs automatically after merge if any translations remain flagged. The script queries `pipeline.db` for flagged translations, identifies which configured models have not yet voted on each word, and runs those models on the flagged subset only. Merge is re-run after each tiebreaker pass. This repeats until all flagged translations are resolved or no unused models remain.
queries `pipeline.db` for flagged translations, identifies which configured
models have not yet voted on each word, and runs those models on the flagged
subset only. Merge is re-run after each tiebreaker pass. This repeats until
all flagged translations are resolved or no unused models remain.
If unused models are exhausted and flagged translations remain, the script If unused models are exhausted and flagged translations remain, the script logs a detailed report showing the exact vote split for each unresolved word and lists available models from OpenRouter that have not been used. Seeding is blocked until all translations are resolved. To continue, add one or more models to the config and re-run the pipeline — the tiebreaker will pick up automatically.
logs a detailed report showing the exact vote split for each unresolved word
and lists available models from OpenRouter that have not been used. Seeding
is blocked until all translations are resolved. To continue, add one or more
models to the config and re-run the pipeline — the tiebreaker will pick up
automatically.
**Input:** `pipeline.db` — flagged translations from merge **Input:** `pipeline.db` — flagged translations from merge
**Output:** `pipeline.db` — flagged translations resolved to `final` **Output:** `pipeline.db` — flagged translations resolved to `final`
@ -361,9 +332,7 @@ automatically.
### 5. Compare / QA ### 5. Compare / QA
Read-only. Generates `COVERAGE.md` with a full breakdown of the pipeline Read-only. Generates `COVERAGE.md` with a full breakdown of the pipeline output quality per language. Run this after merge to verify output before syncing to the database.
output quality per language. Run this after merge to verify output before
syncing to the database.
**Input:** `pipeline.db` — records with status `final` **Input:** `pipeline.db` — records with status `final`
**Output:** `COVERAGE.md` **Output:** `COVERAGE.md`
@ -393,10 +362,7 @@ pnpm --filter @lila/pipeline compare
## Sync ## Sync
The sync script transfers all records with status `final` in `pipeline.db` to The sync script transfers all records with status `final` in `pipeline.db` to the production PostgreSQL database. It is upsert-based and never wipes existing data. For each record it checks whether a matching `source_id` already exists in the target database:
the production PostgreSQL database. It is upsert-based and never wipes
existing data. For each record it checks whether a matching `source_id`
already exists in the target database:
- **Missing** → insert - **Missing** → insert
- **Present but changed** → update - **Present but changed** → update
@ -408,14 +374,11 @@ Run this after all records are resolved and Compare / QA has been reviewed.
pnpm --filter @lila/pipeline sync pnpm --filter @lila/pipeline sync
``` ```
The sync script requires a connection string to the target database. Set The sync script requires a connection string to the target database. Set `DATABASE_URL` in your `.env` file before running.
`DATABASE_URL` in your `.env` file before running.
## Reports ## Reports
The pipeline generates a report at the end of every run. Reports are written The pipeline generates a report at the end of every run. Reports are written to `data-pipeline/reports/` as a JSON file and a markdown file with the same name. The markdown is generated from the JSON and contains identical data.
to `data-pipeline/reports/` as a JSON file and a markdown file with the same
name. The markdown is generated from the JSON and contains identical data.
``` ```
data-pipeline/reports/ data-pipeline/reports/
@ -497,10 +460,7 @@ dataset matures:
## Roadmap ## Roadmap
**Current state:** Stages 1 and 2 are complete and output has been reviewed **Current state:** Stages 1 and 2 are complete, validated, and imported into `pipeline.db`. Schema, init, import scripts, validation tests, and fixtures are all in place. Stage 3 scripts have not been written yet and llama.cpp is not installed.
for all five languages. Architecture for stages 36, the tiebreaker, and the
report system are finalised. Stage 3 scripts have not been written yet and
llama.cpp is not installed.
**Next action:** Write the stage 3 round 1 script. **Next action:** Write the stage 3 round 1 script.
@ -523,6 +483,11 @@ llama.cpp is not installed.
- [x] Write annotation script - [x] Write annotation script
- [x] Run annotation → per-language JSON + `conflicts.json` - [x] Run annotation → per-language JSON + `conflicts.json`
- [x] Add annotate script to package.json
- [x] Fix duplicate translations in extract.py
- [x] Write stage 1 and 2 validation tests
- [x] Write db schema, init, and import scripts
- [x] Write test fixtures
### Stage 3 — Enrich `🔲 not started` ### Stage 3 — Enrich `🔲 not started`