reorganising file structure

This commit is contained in:
lila 2026-04-20 07:48:44 +02:00
parent 3f125ba162
commit 1f42239779
43 changed files with 3 additions and 3350182 deletions

3
.gitignore vendored
View file

@ -9,3 +9,6 @@ repomix/
venv/
__pycache__/
*.pyc
archive/
output/

View file

View file

View file

View file

View file

View file

View file

@ -1,205 +0,0 @@
# CEFR Data Pipeline
This directory contains the source data files and extraction/merge pipeline for generating CEFR-enriched datasets. The final outputs (`english-merged.json`, `italian-merged.json`) are consumed by the database seeding process in `packages/db`.
## Overview
The pipeline transforms raw vocabulary data from multiple sources into a standardized format, resolves conflicts between sources, and produces an authoritative CEFR dataset per language. This dataset is then used by the lila database package to update translation records.
## Supported Languages
- ✅ English (`en`)
- ✅ Italian (`it`)
## Pipeline Stages
### Stage 1: Extraction
Each source file is processed by a dedicated extractor script. The extractor reads the source-specific format, normalizes the data, filters for supported parts of speech, and outputs a standardized JSON file.
**Input:** Raw source files (JSON, CSV, XLS)
**Output:** `{source}-extracted.json` files (same directory as source)
**Normalization rules:**
- Words are lowercased and trimmed
- Part of speech is mapped to supported values (noun, verb)
- Entries with unsupported POS are skipped
- CEFR levels are validated against A1-C2
- Each record includes the source identifier for traceability
**Extractor Scripts:**
| Language | Source | Script |
| -------- | -------------- | ---------------------------------------------------- |
| English | `cefrj.csv` | `extraction-scripts/english/extract-cefrj-csv.py` |
| English | `en_m3.xls` | `extraction-scripts/english/extract-en_m3.py` |
| English | `octanove.csv` | `extraction-scripts/english/extract-octanove.py` |
| English | `random.json` | `extraction-scripts/english/extract-random-json.py` |
| Italian | `it_m3.xls` | `extraction-scripts/italian/extract-it_m3.py` |
| Italian | `italian.json` | `extraction-scripts/italian/extract-italian-json.py` |
### Stage 2: Comparison
Before merging, sources are compared to identify agreements and conflicts. This stage is read-only and serves as a quality gate.
**Input:** All `{source}-extracted.json` files for a language
**Output:** Console report showing:
- Entry counts per source and CEFR level
- Overlap between sources (words appearing in multiple sources)
- Agreement rate (sources assigning the same CEFR level)
- Conflicts (same word/POS with different CEFR levels)
**Comparison Scripts:**
| Language | Script |
| -------- | --------------------------------------- |
| English | `comparison-scripts/compare-english.py` |
| Italian | `comparison-scripts/compare-italian.py` |
Run from the `scripts/` directory:
python comparison-scripts/compare-english.py
python comparison-scripts/compare-italian.py
### Stage 3: Merge
Multiple extracted sources are merged into a single authoritative JSON file per language. When the same word/POS appears in multiple sources with different CEFR levels, the conflict is resolved using a predefined priority order.
**Input:** All `{source}-extracted.json` files for a language
**Output:** `{language}-merged.json` in `../datafiles/`
**Merge rules:**
- Single source: use that source's CEFR level
- Multiple sources agree: use the agreed CEFR level
- Multiple sources conflict: use the level from the highest-priority source
**Difficulty derivation:**
Difficulty is not extracted from sources. It is derived from the final CEFR level:
- A1, A2 → easy
- B1, B2 → intermediate
- C1, C2 → hard
The merged file includes both CEFR level and derived difficulty, plus a list of sources that contributed to each entry.
**Merge Scripts & Priorities:**
| Language | Script | Priority (lowest → highest) |
| -------- | ------------------------------------- | -------------------------------------- |
| English | `merge-scripts/merge-english-json.py` | `random`, `octanove`, `cefrj`, `en_m3` |
| Italian | `merge-scripts/merge-italian-json.py` | `italian`, `it_m3` |
Run from the `scripts/` directory:
python merge-scripts/merge-english-json.py
python merge-scripts/merge-italian-json.py
### Stage 4: Enrichment
The authoritative merged file is consumed by the database package (packages/db) during the seeding or update process. This stage is implemented in TypeScript and is not part of the Python scripts in this directory.
## File Organization
```
scripts/
├── comparison-scripts/
│ ├── compare-english.py
│ └── compare-italian.py # Stage 2: compare extracted data
├── datafiles/
│ ├── english-merged.json # Stage 3 output (authoritative)
│ ├── italian-merged.json # Stage 3 output (authoritative)
│ ├── omw-noun.json
│ └── omw-verb.json
├── data-sources/
│ ├── english/
│ │ ├── cefrj.csv
│ │ ├── cefrj-extracted.json
│ │ ├── en_m3.xls
│ │ ├── en_m3-extracted.json
│ │ ├── octanove.csv
│ │ ├── octanove-extracted.json
│ │ ├── random.json
│ │ └── random-extracted.json
│ ├── french/ # (future)
│ ├── german/ # (future)
│ ├── italian/
│ │ ├── it_m3.xls
│ │ ├── it_m3-extracted.json
│ │ ├── italian.json
│ │ └── italian-extracted.json
│ └── spanish/ # (future)
├── extraction-scripts/
│ └── english/
│ ├── extract-cefrj-csv.py
│ ├── extract-en_m3.py
│ ├── extract-octanove.py
│ └── extract-random-json.py
│ └── italian/
│ ├── extract-it_m3.py
│ └── extract-italian-json.py
├── merge-scripts/
│ └── merge-english-json.py # Stage 3: merge into authority
├── extract-own-save-to-json.py # script to extract words from wordnet
├── requirements.txt
└── README.md # This file
```
Extracted files are co-located with their sources for easy traceability. Merged files live in `../datafiles/`.
## Source Priority by Language
Source priority determines which CEFR level wins when sources conflict:
**English:**
1. en_m3
2. cefrj
3. octanove
4. random
**Italian:**
1. it_m3
2. italian
Priority is defined in the merge configuration. Higher priority sources override lower priority sources when conflicts occur.
This is defined in merge-scripts/merge-english-json.py.
## Data Flow Summary
```
Raw Source → Extracted JSON → Merged JSON → Database
(1) (2) (3) (4)
```
1. **Extract:** Transform source formats to normalized records
2. **Compare:** Validate source quality and surface conflicts
3. **Merge:** Resolve conflicts, derive difficulty, create authority
4. **Enrich:** Write to database (handled in packages/db)
## Adding New Sources
To add a new source:
1. Place the raw file in the appropriate `data-sources/{language}/` directory
2. Create an extractor script in `../extractors/{language}/`
3. Run the extractor to generate `{source}-extracted.json`
4. Run comparison to assess coverage and conflicts
5. Update source priority in the merge configuration if needed
6. Run merge to regenerate the authoritative file
7. Run enrichment to update the database
## Constants and Constraints
The pipeline respects these constraints from the lila shared constants:
- **Supported languages:** en, it
- **Supported parts of speech:** noun, verb
- **CEFR levels:** A1, A2, B1, B2, C1, C2
- **Difficulty levels:** easy, intermediate, hard
Entries violating these constraints are filtered out during extraction.

View file

@ -1,166 +0,0 @@
#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 2: English Comparison
Compares extracted JSON files for English and reports agreements and conflicts.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
# Supported CEFR levels
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
"""Load all *-extracted.json files from the English data directory."""
sources = {}
for file_path in data_dir.glob("*-extracted.json"):
source_name = file_path.stem.replace("-extracted", "")
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
sources[source_name] = data
else:
print(f"Warning: {file_path} does not contain a list, skipping.")
return sources
def normalize_entry(entry: dict) -> Tuple[str, str]:
"""Return (word, pos) key for comparison."""
return entry["word"].lower().strip(), entry["pos"].lower().strip()
def compute_statistics(sources: Dict[str, List[dict]]) -> dict:
"""Compute overlap, agreement, and conflict statistics."""
# Per-source counts by CEFR level
source_counts = {}
for src, entries in sources.items():
cefr_counts = defaultdict(int)
for e in entries:
cefr = e.get("cefr", "UNKNOWN")
cefr_counts[cefr] += 1
source_counts[src] = dict(cefr_counts)
# Build word->pos->sources and CEFR assignments
word_map = defaultdict(lambda: defaultdict(dict))
for src, entries in sources.items():
for e in entries:
key = normalize_entry(e)
word_map[key][src] = e["cefr"]
# Compute overlaps, agreements, conflicts
total_entries = sum(len(e) for e in sources.values())
unique_words = len(word_map)
overlap_stats = defaultdict(int)
agreement_count = 0
conflict_count = 0
conflict_details = []
for key, src_cefr_map in word_map.items():
num_sources = len(src_cefr_map)
overlap_stats[num_sources] += 1
if num_sources > 1:
cefr_values = set(src_cefr_map.values())
if len(cefr_values) == 1:
agreement_count += 1
else:
conflict_count += 1
conflict_details.append(
{"word": key[0], "pos": key[1], "assignments": dict(src_cefr_map)}
)
return {
"source_counts": source_counts,
"total_entries": total_entries,
"unique_words": unique_words,
"overlap_distribution": dict(overlap_stats),
"agreements": agreement_count,
"conflicts": conflict_count,
"conflict_details": conflict_details,
}
def print_report(stats: dict, sources: Dict[str, List[dict]]):
"""Print formatted comparison report."""
print(f"\n{'=' * 60}")
print("CEFR COMPARISON REPORT - ENGLISH")
print(f"{'=' * 60}")
# Source entry counts
print("\n📊 ENTRIES PER SOURCE AND CEFR LEVEL")
print("-" * 50)
for src, counts in stats["source_counts"].items():
total = sum(counts.values())
print(f"\n{src}: {total} total entries")
for level in CEFR_LEVELS:
cnt = counts.get(level, 0)
if cnt > 0:
print(f" {level}: {cnt}")
# Show non-standard levels
for level, cnt in counts.items():
if level not in CEFR_LEVELS and level != "UNKNOWN":
print(f" {level}: {cnt} (non-standard)")
# Overlap statistics
print("\n🔄 OVERLAP BETWEEN SOURCES")
print("-" * 50)
print(f"Total unique (word, POS) combinations: {stats['unique_words']}")
print(f"Total entries across all sources: {stats['total_entries']}")
overlap = stats["overlap_distribution"]
for n_sources in sorted(overlap.keys()):
count = overlap[n_sources]
pct = (count / stats["unique_words"]) * 100
print(f"Words appearing in {n_sources} source(s): {count} ({pct:.1f}%)")
# Agreement and conflicts
print("\n⚖️ AGREEMENT / CONFLICT SUMMARY")
print("-" * 50)
print(f"Words with >1 source: {stats['agreements'] + stats['conflicts']}")
print(f" ✅ Agreements (same CEFR): {stats['agreements']}")
print(f" ❌ Conflicts (different CEFR): {stats['conflicts']}")
if stats["conflicts"] > 0:
agreement_rate = (
stats["agreements"] / (stats["agreements"] + stats["conflicts"])
) * 100
print(f" Agreement rate: {agreement_rate:.1f}%")
print("\n📋 CONFLICT DETAILS (first 10 shown):")
for i, conflict in enumerate(stats["conflict_details"][:10]):
print(f" {i + 1}. {conflict['word']} ({conflict['pos']})")
for src, cefr in conflict["assignments"].items():
print(f" {src}: {cefr}")
if len(stats["conflict_details"]) > 10:
print(f" ... and {len(stats['conflict_details']) - 10} more conflicts.")
print(f"\n{'=' * 60}\n")
def main():
# Determine paths
script_dir = Path(__file__).parent
data_dir = script_dir.parent / "data-sources" / "english"
if not data_dir.exists():
print(f"Error: English data directory not found: {data_dir}")
return
print(f"Loading extracted files from {data_dir}...")
sources = load_extracted_files(data_dir)
if not sources:
print("No extracted files found.")
return
print(f"Found sources: {', '.join(sources.keys())}")
stats = compute_statistics(sources)
print_report(stats, sources)
if __name__ == "__main__":
main()

View file

@ -1,166 +0,0 @@
#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 2: Italian Comparison
Compares extracted JSON files for Italian and reports agreements and conflicts.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
# Supported CEFR levels
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
"""Load all *-extracted.json files from the Italian data directory."""
sources = {}
for file_path in data_dir.glob("*-extracted.json"):
source_name = file_path.stem.replace("-extracted", "")
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
sources[source_name] = data
else:
print(f"Warning: {file_path} does not contain a list, skipping.")
return sources
def normalize_entry(entry: dict) -> Tuple[str, str]:
"""Return (word, pos) key for comparison."""
return entry["word"].lower().strip(), entry["pos"].lower().strip()
def compute_statistics(sources: Dict[str, List[dict]]) -> dict:
"""Compute overlap, agreement, and conflict statistics."""
# Per-source counts by CEFR level
source_counts = {}
for src, entries in sources.items():
cefr_counts = defaultdict(int)
for e in entries:
cefr = e.get("cefr", "UNKNOWN")
cefr_counts[cefr] += 1
source_counts[src] = dict(cefr_counts)
# Build word->pos->sources and CEFR assignments
word_map = defaultdict(lambda: defaultdict(dict))
for src, entries in sources.items():
for e in entries:
key = normalize_entry(e)
word_map[key][src] = e["cefr"]
# Compute overlaps, agreements, conflicts
total_entries = sum(len(e) for e in sources.values())
unique_words = len(word_map)
overlap_stats = defaultdict(int)
agreement_count = 0
conflict_count = 0
conflict_details = []
for key, src_cefr_map in word_map.items():
num_sources = len(src_cefr_map)
overlap_stats[num_sources] += 1
if num_sources > 1:
cefr_values = set(src_cefr_map.values())
if len(cefr_values) == 1:
agreement_count += 1
else:
conflict_count += 1
conflict_details.append(
{"word": key[0], "pos": key[1], "assignments": dict(src_cefr_map)}
)
return {
"source_counts": source_counts,
"total_entries": total_entries,
"unique_words": unique_words,
"overlap_distribution": dict(overlap_stats),
"agreements": agreement_count,
"conflicts": conflict_count,
"conflict_details": conflict_details,
}
def print_report(stats: dict, sources: Dict[str, List[dict]]):
"""Print formatted comparison report."""
print(f"\n{'=' * 60}")
print("CEFR COMPARISON REPORT - ITALIAN")
print(f"{'=' * 60}")
# Source entry counts
print("\n📊 ENTRIES PER SOURCE AND CEFR LEVEL")
print("-" * 50)
for src, counts in stats["source_counts"].items():
total = sum(counts.values())
print(f"\n{src}: {total} total entries")
for level in CEFR_LEVELS:
cnt = counts.get(level, 0)
if cnt > 0:
print(f" {level}: {cnt}")
# Show non-standard levels
for level, cnt in counts.items():
if level not in CEFR_LEVELS and level != "UNKNOWN":
print(f" {level}: {cnt} (non-standard)")
# Overlap statistics
print("\n🔄 OVERLAP BETWEEN SOURCES")
print("-" * 50)
print(f"Total unique (word, POS) combinations: {stats['unique_words']}")
print(f"Total entries across all sources: {stats['total_entries']}")
overlap = stats["overlap_distribution"]
for n_sources in sorted(overlap.keys()):
count = overlap[n_sources]
pct = (count / stats["unique_words"]) * 100
print(f"Words appearing in {n_sources} source(s): {count} ({pct:.1f}%)")
# Agreement and conflicts
print("\n⚖️ AGREEMENT / CONFLICT SUMMARY")
print("-" * 50)
print(f"Words with >1 source: {stats['agreements'] + stats['conflicts']}")
print(f" ✅ Agreements (same CEFR): {stats['agreements']}")
print(f" ❌ Conflicts (different CEFR): {stats['conflicts']}")
if stats["conflicts"] > 0:
agreement_rate = (
stats["agreements"] / (stats["agreements"] + stats["conflicts"])
) * 100
print(f" Agreement rate: {agreement_rate:.1f}%")
print("\n📋 CONFLICT DETAILS (first 10 shown):")
for i, conflict in enumerate(stats["conflict_details"][:10]):
print(f" {i + 1}. {conflict['word']} ({conflict['pos']})")
for src, cefr in conflict["assignments"].items():
print(f" {src}: {cefr}")
if len(stats["conflict_details"]) > 10:
print(f" ... and {len(stats['conflict_details']) - 10} more conflicts.")
print(f"\n{'=' * 60}\n")
def main():
# Determine paths
script_dir = Path(__file__).parent
data_dir = script_dir.parent / "data-sources" / "italian"
if not data_dir.exists():
print(f"Error: Italian data directory not found: {data_dir}")
return
print(f"Loading extracted files from {data_dir}...")
sources = load_extracted_files(data_dir)
if not sources:
print("No extracted files found.")
return
print(f"Found sources: {', '.join(sources.keys())}")
stats = compute_statistics(sources)
print_report(stats, sources)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,149 +0,0 @@
"""
scripts/extract-omw-data.py
Extract ALL synsets from Open Multilingual Wordnet (OMW) for every supported
language and POS. Replaces extract-en-it-nouns.py.
Output: one JSON file per POS, written to packages/db/src/data/datafiles/
omw-noun.json
omw-verb.json
Each file is a JSON array of objects matching SynsetRecord in seed.ts:
{
"source_id": "ili:i12345",
"pos": "noun",
"translations": { "en": ["dog", "canine"], "it": ["cane"] },
"glosses": { "en": ["a domesticated animal..."] }
}
Translations and glosses are absent for a language if that wordnet has no
coverage for the synset the seed script handles sparse data gracefully.
Usage:
python scripts/extract-omw-data.py [output_dir]
output_dir defaults to packages/db/src/data/datafiles/
Prerequisites:
pip install wn
python -c "import wn; wn.download('oewn:2024'); wn.download('omw-it:1.4')"
"""
import json
import sys
from pathlib import Path
import wn
# Mirror constants.ts — update both places if languages or POS change.
SUPPORTED_LANGUAGE_CODES: list[str] = ["en", "it"]
POS_MAP: dict[str, str] = {
"n": "noun",
"v": "verb",
}
def extract_all(output_dir: str = "packages/db/src/data/datafiles/") -> None:
out = Path(output_dir)
out.mkdir(parents=True, exist_ok=True)
# Load one Wordnet object per language up front.
print("Loading wordnets...")
wordnets: dict[str, wn.Wordnet] = {}
for lang in SUPPORTED_LANGUAGE_CODES:
try:
wordnets[lang] = wn.Wordnet(lang=lang)
synset_count = len(wordnets[lang].synsets())
print(f" {lang}: {synset_count:,} total synsets")
except wn.Error as e:
print(f" ERROR loading {lang}: {e}")
print(f" Run: python -c \"import wn; wn.download('omw-{lang}:1.4')\"")
sys.exit(1)
for omw_pos, pos_label in POS_MAP.items():
print(f"\n--- Extracting {pos_label}s (pos='{omw_pos}') ---")
# Collect per-ILI data across all languages.
# Structure: { ili -> { lang -> { "lemmas": [...], "glosses": [...] } } }
by_ili: dict[str, dict[str, dict[str, list[str]]]] = {}
for lang, wnet in wordnets.items():
synsets = wnet.synsets(pos=omw_pos)
covered = 0
for synset in synsets:
ili = synset.ili
if not ili:
continue # skip synsets without an ILI — can't cross-link
covered += 1
if ili not in by_ili:
by_ili[ili] = {}
lemmas = [str(lemma) for lemma in synset.lemmas()]
defns = [d for d in synset.definitions() if d]
by_ili[ili][lang] = {"lemmas": lemmas, "glosses": defns}
print(f" {lang}: {covered:,} {pos_label} synsets with ILI")
# Build output records — sort by ILI for a stable, diffable file.
records: list[dict] = []
for ili in sorted(by_ili.keys()):
lang_data = by_ili[ili]
translations: dict[str, list[str]] = {}
glosses: dict[str, list[str]] = {}
for lang, data in lang_data.items():
if data["lemmas"]:
translations[lang] = data["lemmas"]
if data["glosses"]:
glosses[lang] = data["glosses"]
# Include the record even if only one language has coverage —
# the seed script imports all terms regardless of cross-language overlap.
records.append(
{
"source_id": f"ili:{ili}",
"pos": pos_label,
"translations": translations,
"glosses": glosses,
}
)
output_file = out / f"omw-{pos_label}.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
print(f"\nWrote {len(records):,} {pos_label} synsets → {output_file}")
_print_coverage(records, pos_label)
def _print_coverage(records: list[dict], pos_label: str) -> None:
"""Print per-language translation and gloss counts."""
lang_stats: dict[str, dict[str, int]] = {}
for lang in SUPPORTED_LANGUAGE_CODES:
lang_stats[lang] = {"translations": 0, "glosses": 0}
for r in records:
for lang, lemmas in r["translations"].items():
if lang in lang_stats:
lang_stats[lang]["translations"] += len(lemmas)
for lang, gloss_list in r["glosses"].items():
if lang in lang_stats:
lang_stats[lang]["glosses"] += len(gloss_list)
print(f"\nCoverage for {pos_label}s:")
for lang, counts in lang_stats.items():
t = counts["translations"]
g = counts["glosses"]
avg_t = t / len(records) if records else 0
print(f" {lang}: {t:,} translations ({avg_t:.1f} avg/synset), {g:,} glosses")
# Sample output
print(f"\nSample {pos_label}s (records 10001004):")
for r in records[1000:1005]:
print(f" {r['source_id']}: {r['translations']}")
if __name__ == "__main__":
output_dir = sys.argv[1] if len(sys.argv) > 1 else "packages/db/src/data/datafiles/"
extract_all(output_dir)

View file

@ -1,96 +0,0 @@
#!/usr/bin/env python3
"""
scripts/extraction-scripts/english/extract-cefrj-csv.py
Extracts CEFR data from cefrj.csv (CEFR-J vocabulary profile).
Filters for supported POS (noun, verb).
Input: scripts/data-sources/english/cefrj.csv
Output: scripts/data-sources/english/cefrj-extracted.json
Output format (normalized):
[
{ "word": "ability", "pos": "noun", "cefr": "A2", "source": "cefrj" }
]
"""
import csv
import json
from pathlib import Path
# Constants matching @lila/shared
SUPPORTED_POS = ["noun", "verb"]
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# Paths (relative to project root)
INPUT_FILE = Path("scripts/data-sources/english/cefrj.csv")
OUTPUT_FILE = Path("scripts/data-sources/english/cefrj-extracted.json")
def extract() -> None:
print(f"Reading: {INPUT_FILE}")
records = []
skipped_pos = 0
skipped_invalid_cefr = 0
skipped_empty_word = 0
total_rows = 0
with open(INPUT_FILE, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
total_rows += 1
# Filter: must have supported POS
pos = row.get("pos", "").lower().strip()
if pos not in SUPPORTED_POS:
skipped_pos += 1
continue
# Filter: must have valid CEFR level
cefr = row.get("CEFR", "").upper().strip()
if cefr not in CEFR_LEVELS:
skipped_invalid_cefr += 1
continue
# Normalize word
word = row.get("headword", "").lower().strip()
if not word:
skipped_empty_word += 1
continue
record = {"word": word, "pos": pos, "cefr": cefr, "source": "cefrj"}
records.append(record)
# Write output
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Stats
noun_count = sum(1 for r in records if r["pos"] == "noun")
verb_count = sum(1 for r in records if r["pos"] == "verb")
cefr_distribution = {}
for level in CEFR_LEVELS:
count = sum(1 for r in records if r["cefr"] == level)
if count > 0:
cefr_distribution[level] = count
print(f"\nTotal rows in CSV: {total_rows}")
print(f"Extracted: {len(records)} records")
print(f" - Nouns: {noun_count}")
print(f" - Verbs: {verb_count}")
print("\nCEFR distribution:")
for level in CEFR_LEVELS:
if level in cefr_distribution:
print(f" - {level}: {cefr_distribution[level]}")
print("\nSkipped:")
print(f" - Unsupported POS: {skipped_pos}")
print(f" - Invalid CEFR: {skipped_invalid_cefr}")
print(f" - Empty word: {skipped_empty_word}")
print(f"\nOutput: {OUTPUT_FILE}")
if __name__ == "__main__":
extract()

View file

@ -1,107 +0,0 @@
#!/usr/bin/env python3
"""
scripts/extraction-scripts/english/extract-en_m3.py
Extracts CEFR data from en_m3.xls (M3 wordlist).
"""
import json
from pathlib import Path
import xlrd
# Constants matching @lila/shared
SUPPORTED_POS = ["noun", "verb"]
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# POS mapping (case-insensitive)
POS_MAP = {
"noun": "noun",
"verb": "verb",
}
# Paths (relative to project root)
INPUT_FILE = Path("scripts/data-sources/english/en_m3.xls")
OUTPUT_FILE = Path("scripts/data-sources/english/en_m3-extracted.json")
def extract() -> None:
print(f"Reading: {INPUT_FILE}")
records = []
skipped_pos = 0
skipped_invalid_cefr = 0
skipped_empty_word = 0
total_rows = 0
wb = xlrd.open_workbook(INPUT_FILE)
ws = wb.sheet_by_index(0)
# Skip header row, start from row 1
for row_idx in range(1, ws.nrows):
total_rows += 1
# Unpack columns: ID number, Word, Part of Speech, CEFR, Points
word_raw = ws.cell_value(row_idx, 1)
pos_raw = ws.cell_value(row_idx, 2)
cefr_raw = ws.cell_value(row_idx, 3)
# Normalize POS (case-insensitive)
pos = str(pos_raw).lower().strip() if pos_raw else ""
if pos not in POS_MAP:
skipped_pos += 1
continue
pos = POS_MAP[pos]
# Normalize CEFR - handle smart quotes
cefr_str = str(cefr_raw).strip() if cefr_raw else ""
# Strip Unicode smart quotes (U+201C and U+201D)
cefr_str = cefr_str.strip("\u201c\u201d")
cefr = cefr_str.upper()
if cefr not in CEFR_LEVELS:
skipped_invalid_cefr += 1
continue
# Normalize word
word = str(word_raw).lower().strip() if word_raw else ""
if not word:
skipped_empty_word += 1
continue
record = {"word": word, "pos": pos, "cefr": cefr, "source": "en_m3"}
records.append(record)
# Write output
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Stats
noun_count = sum(1 for r in records if r["pos"] == "noun")
verb_count = sum(1 for r in records if r["pos"] == "verb")
cefr_distribution = {}
for level in CEFR_LEVELS:
count = sum(1 for r in records if r["cefr"] == level)
if count > 0:
cefr_distribution[level] = count
print(f"\nTotal rows in XLS: {total_rows}")
print(f"Extracted: {len(records)} records")
print(f" - Nouns: {noun_count}")
print(f" - Verbs: {verb_count}")
print("\nCEFR distribution:")
for level in CEFR_LEVELS:
if level in cefr_distribution:
print(f" - {level}: {cefr_distribution[level]}")
print("\nSkipped:")
print(f" - Unsupported POS: {skipped_pos}")
print(f" - Invalid CEFR: {skipped_invalid_cefr}")
print(f" - Empty word: {skipped_empty_word}")
print(f"\nOutput: {OUTPUT_FILE}")
if __name__ == "__main__":
extract()

View file

@ -1,90 +0,0 @@
#!/usr/bin/env python3
"""
scripts/extraction-scripts/english/extract-octanove.py
Extracts CEFR data from octanove.csv (Octanove vocabulary profile).
Filters for supported POS (noun, verb).
Input: scripts/data-sources/english/octanove.csv
Output: scripts/data-sources/english/octanove-extracted.json
Output format (normalized):
[
{ "word": "example", "pos": "noun", "cefr": "C1", "source": "octanove" }
]
"""
import csv
import json
from pathlib import Path
# Constants matching @lila/shared
SUPPORTED_POS = ["noun", "verb"]
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# Paths (relative to project root)
INPUT_FILE = Path("scripts/data-sources/english/octanove.csv")
OUTPUT_FILE = Path("scripts/data-sources/english/octanove-extracted.json")
def extract() -> None:
print(f"Reading: {INPUT_FILE}")
records = []
skipped_pos = 0
skipped_invalid_cefr = 0
skipped_empty_word = 0
total_rows = 0
with open(INPUT_FILE, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
total_rows += 1
# Filter: must have supported POS
pos = row.get("pos", "").lower().strip()
if pos not in SUPPORTED_POS:
skipped_pos += 1
continue
# Filter: must have valid CEFR level
cefr = row.get("CEFR", "").upper().strip()
if cefr not in CEFR_LEVELS:
skipped_invalid_cefr += 1
continue
# Normalize word
word = row.get("headword", "").lower().strip()
if not word:
skipped_empty_word += 1
continue
record = {"word": word, "pos": pos, "cefr": cefr, "source": "octanove"}
records.append(record)
# Write output
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Stats
noun_count = sum(1 for r in records if r["pos"] == "noun")
verb_count = sum(1 for r in records if r["pos"] == "verb")
cefr_distribution = {}
for level in CEFR_LEVELS:
count = sum(1 for r in records if r["cefr"] == level)
if count > 0:
cefr_distribution[level] = count
print(f"\nTotal rows in CSV: {total_rows}")
print(f"Extracted: {len(records)} records")
print(f" - Nouns: {noun_count}")
print(f" - Verbs: {verb_count}")
print("\nCEFR distribution:")
for level in CEFR_LEVELS:
if level in cefr_distribution:
print(f" - {level}: {cefr_distribution[level]}")
print("\nSkipped:")
print(f" - Unsupported POS: {skipped_pos}")
print(f" - Invalid CEFR: {skipped_invalid_cefr}")
print(f" - Empty word: {skipped_empty_word}")
print(f"\nOutput: {OUTPUT_FILE}")
if __name__ == "__main__":
extract()

View file

@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""
scripts/extraction-scripts/english/extract-random-json.py
Extracts CEFR data from random.json (English flashcard source).
Filters for useful_for_flashcard=true and supported POS (noun, verb).
Input: scripts/data-sources/english/random.json
Output: scripts/data-sources/english/random-extracted.json
Output format (normalized):
[
{ "word": "be", "pos": "verb", "cefr": "A1", "source": "random" }
]
"""
import json
from pathlib import Path
# Constants matching @lila/shared
SUPPORTED_POS = ["noun", "verb"]
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# Paths (relative to project root)
INPUT_FILE = Path("scripts/data-sources/english/random.json")
OUTPUT_FILE = Path("scripts/data-sources/english/random-extracted.json")
def extract() -> None:
print(f"Reading: {INPUT_FILE}")
with open(INPUT_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
records = []
skipped_pos = 0
skipped_not_useful = 0
skipped_invalid_cefr = 0
skipped_empty_word = 0
for entry in data:
# Filter: must be useful for flashcard
if not entry.get("useful_for_flashcard", False):
skipped_not_useful += 1
continue
# Filter: must have supported POS
pos = entry.get("pos", "").lower().strip()
if pos not in SUPPORTED_POS:
skipped_pos += 1
continue
# Filter: must have valid CEFR level
cefr = entry.get("cefr_level", "").upper().strip()
if cefr not in CEFR_LEVELS:
skipped_invalid_cefr += 1
continue
# Normalize word
word = entry.get("word", "").lower().strip()
if not word:
skipped_empty_word += 1
continue
record = {"word": word, "pos": pos, "cefr": cefr, "source": "random"}
records.append(record)
# Write output
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Stats
noun_count = sum(1 for r in records if r["pos"] == "noun")
verb_count = sum(1 for r in records if r["pos"] == "verb")
cefr_distribution = {}
for level in CEFR_LEVELS:
count = sum(1 for r in records if r["cefr"] == level)
if count > 0:
cefr_distribution[level] = count
print(f"\nExtracted: {len(records)} records")
print(f" - Nouns: {noun_count}")
print(f" - Verbs: {verb_count}")
print("\nCEFR distribution:")
for level in CEFR_LEVELS:
if level in cefr_distribution:
print(f" - {level}: {cefr_distribution[level]}")
print("\nSkipped:")
print(f" - Not useful for flashcard: {skipped_not_useful}")
print(f" - Unsupported POS: {skipped_pos}")
print(f" - Invalid CEFR: {skipped_invalid_cefr}")
print(f" - Empty word: {skipped_empty_word}")
print(f"\nOutput: {OUTPUT_FILE}")
if __name__ == "__main__":
extract()

View file

@ -1,114 +0,0 @@
#!/usr/bin/env python3
"""
scripts/extraction-scripts/italian/extract-it_m3.py
Extracts CEFR data from it_m3.xls (Italian M3 wordlist).
"""
import json
from pathlib import Path
import xlrd
# Constants matching @glossa/shared
SUPPORTED_POS = ["noun", "verb"]
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# POS mapping (case-insensitive) based on observed abbreviations
POS_MAP = {
"n": "noun", # nome
"v": "verb", # verbo
}
# Column indices (0-based) verified from sample
WORD_COL = 0 # Lemma
POS_COL = 1 # Pos
CEFR_COL = 2 # Points (CEFR level)
# Paths (relative to project root)
INPUT_FILE = Path("scripts/data-sources/italian/it_m3.xls")
OUTPUT_FILE = Path("scripts/data-sources/italian/it_m3-extracted.json")
def extract() -> None:
print(f"Reading: {INPUT_FILE}")
records = []
skipped_pos = 0
skipped_invalid_cefr = 0
skipped_empty_word = 0
total_rows = 0
wb = xlrd.open_workbook(INPUT_FILE)
ws = wb.sheet_by_index(0)
# Skip header row, start from row 1
for row_idx in range(1, ws.nrows):
total_rows += 1
word_raw = ws.cell_value(row_idx, WORD_COL)
pos_raw = ws.cell_value(row_idx, POS_COL)
cefr_raw = ws.cell_value(row_idx, CEFR_COL)
# Normalize POS (case-insensitive)
pos = str(pos_raw).lower().strip() if pos_raw else ""
if pos not in POS_MAP:
skipped_pos += 1
continue
pos = POS_MAP[pos]
# Normalize CEFR - handle smart quotes
cefr_str = str(cefr_raw).strip() if cefr_raw else ""
cefr_str = cefr_str.strip("\u201c\u201d") # strip Unicode smart quotes
cefr = cefr_str.upper()
if cefr not in CEFR_LEVELS:
skipped_invalid_cefr += 1
continue
# Normalize word handle multiple forms like "il, lo, la" → take first?
word_raw_str = str(word_raw).strip() if word_raw else ""
# If word contains comma, take first part (e.g., "il, lo, la" → "il")
# But this may lose variants; consider keeping as is or processing differently.
# For consistency, we'll keep the full string and lowercase it.
word = word_raw_str.lower()
if not word:
skipped_empty_word += 1
continue
record = {"word": word, "pos": pos, "cefr": cefr, "source": "it_m3"}
records.append(record)
# Write output
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Stats
noun_count = sum(1 for r in records if r["pos"] == "noun")
verb_count = sum(1 for r in records if r["pos"] == "verb")
cefr_distribution = {}
for level in CEFR_LEVELS:
count = sum(1 for r in records if r["cefr"] == level)
if count > 0:
cefr_distribution[level] = count
print(f"\nTotal rows in XLS: {total_rows}")
print(f"Extracted: {len(records)} records")
print(f" - Nouns: {noun_count}")
print(f" - Verbs: {verb_count}")
print(f"\nCEFR distribution:")
for level in CEFR_LEVELS:
if level in cefr_distribution:
print(f" - {level}: {cefr_distribution[level]}")
print(f"\nSkipped:")
print(f" - Unsupported POS: {skipped_pos}")
print(f" - Invalid CEFR: {skipped_invalid_cefr}")
print(f" - Empty word: {skipped_empty_word}")
print(f"\nOutput: {OUTPUT_FILE}")
if __name__ == "__main__":
extract()

View file

@ -1,91 +0,0 @@
#!/usr/bin/env python3
"""
scripts/extraction-scripts/italian/extract-italian-json.py
Extracts CEFR data from italian.json (Italian flashcard source).
Filters for useful_for_flashcard=true and supported POS (noun, verb).
"""
import json
from pathlib import Path
# Constants matching @glossa/shared
SUPPORTED_POS = ["noun", "verb"]
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# Paths (relative to project root)
INPUT_FILE = Path("scripts/data-sources/italian/italian.json")
OUTPUT_FILE = Path("scripts/data-sources/italian/italian-extracted.json")
def extract() -> None:
print(f"Reading: {INPUT_FILE}")
with open(INPUT_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
records = []
skipped_pos = 0
skipped_not_useful = 0
skipped_invalid_cefr = 0
skipped_empty_word = 0
for entry in data:
# Filter: must be useful for flashcard
if not entry.get("useful_for_flashcard", False):
skipped_not_useful += 1
continue
# Filter: must have supported POS
pos = entry.get("pos", "").lower().strip()
if pos not in SUPPORTED_POS:
skipped_pos += 1
continue
# Filter: must have valid CEFR level
cefr = entry.get("cefr_level", "").upper().strip()
if cefr not in CEFR_LEVELS:
skipped_invalid_cefr += 1
continue
# Normalize word
word = entry.get("word", "").lower().strip()
if not word:
skipped_empty_word += 1
continue
record = {"word": word, "pos": pos, "cefr": cefr, "source": "italian"}
records.append(record)
# Write output
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Stats
noun_count = sum(1 for r in records if r["pos"] == "noun")
verb_count = sum(1 for r in records if r["pos"] == "verb")
cefr_distribution = {}
for level in CEFR_LEVELS:
count = sum(1 for r in records if r["cefr"] == level)
if count > 0:
cefr_distribution[level] = count
print(f"\nExtracted: {len(records)} records")
print(f" - Nouns: {noun_count}")
print(f" - Verbs: {verb_count}")
print("\nCEFR distribution:")
for level in CEFR_LEVELS:
if level in cefr_distribution:
print(f" - {level}: {cefr_distribution[level]}")
print("\nSkipped:")
print(f" - Not useful for flashcard: {skipped_not_useful}")
print(f" - Unsupported POS: {skipped_pos}")
print(f" - Invalid CEFR: {skipped_invalid_cefr}")
print(f" - Empty word: {skipped_empty_word}")
print(f"\nOutput: {OUTPUT_FILE}")
if __name__ == "__main__":
extract()

View file

@ -1,58 +0,0 @@
async function main() {
// Step 1: start a game
const startResponse = await fetch("http://localhost:3000/api/v1/game/start", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
source_language: "en",
target_language: "it",
pos: "noun",
difficulty: "easy",
rounds: "3",
}),
});
const game = await startResponse.json();
console.log("Game started:", JSON.stringify(game, null, 2));
// Step 2: answer each question (always pick option 0)
for (const question of game.data.questions) {
const answerResponse = await fetch(
"http://localhost:3000/api/v1/game/answer",
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
sessionId: game.data.sessionId,
questionId: question.questionId,
selectedOptionId: 0,
}),
},
);
const result = await answerResponse.json();
console.log("Raw result:", JSON.stringify(result, null, 2));
console.log(
`${question.prompt}: ${result.data.isCorrect ? "✓" : "✗"} (picked ${0}, correct was ${result.data.correctOptionId})`,
);
}
const badRequest = await fetch("http://localhost:3000/api/v1/game/start", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ source_language: "en" }),
});
console.log("400 test:", badRequest.status, await badRequest.json());
// Send a valid shape but a session that doesn't exist
const notFound = await fetch("http://localhost:3000/api/v1/game/answer", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
sessionId: "00000000-0000-0000-0000-000000000000",
questionId: "00000000-0000-0000-0000-000000000000",
selectedOptionId: 0,
}),
});
console.log("404 test:", notFound.status, await notFound.json());
}
main();

View file

@ -1,159 +0,0 @@
#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 3: English Merge
Merges extracted JSON files for English into an authoritative dataset.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
# Supported CEFR levels and difficulty mapping
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
DIFFICULTY_MAP = {
"A1": "easy",
"A2": "easy",
"B1": "intermediate",
"B2": "intermediate",
"C1": "hard",
"C2": "hard",
}
# Source priority order (from lowest to highest priority)
# Higher index = higher authority when conflicts occur
PRIORITY_ORDER = ["random", "octanove", "cefrj", "en_m3"]
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
"""Load all *-extracted.json files from the English data directory."""
sources = {}
for file_path in data_dir.glob("*-extracted.json"):
source_name = file_path.stem.replace("-extracted", "")
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
sources[source_name] = data
else:
print(f"Warning: {file_path} does not contain a list, skipping.")
return sources
def normalize_entry(entry: dict) -> Tuple[str, str]:
"""Return (word, pos) key for merging."""
return entry["word"].lower().strip(), entry["pos"].lower().strip()
def get_source_priority(source_name: str) -> int:
"""Return priority index for a source (higher = more authoritative)."""
try:
return PRIORITY_ORDER.index(source_name)
except ValueError:
# If source not in list, assign lowest priority
return -1
def merge_entries(sources: Dict[str, List[dict]]) -> List[dict]:
"""Merge entries from multiple sources, resolving conflicts by priority."""
grouped = defaultdict(list)
for src_name, entries in sources.items():
for entry in entries:
key = normalize_entry(entry)
grouped[key].append((src_name, entry["cefr"], entry))
merged = []
conflicts_resolved = 0
total_multi_source = 0
for (word, pos), src_entries in grouped.items():
if len(src_entries) == 1:
src_name, cefr, original = src_entries[0]
final_cefr = cefr
contributing_sources = [src_name]
else:
total_multi_source += 1
sorted_entries = sorted(
src_entries, key=lambda x: get_source_priority(x[0]), reverse=True
)
highest_src, highest_cefr, _ = sorted_entries[0]
all_cefrs = {e[1] for e in src_entries}
if len(all_cefrs) > 1:
conflicts_resolved += 1
final_cefr = highest_cefr
contributing_sources = [e[0] for e in src_entries]
difficulty = DIFFICULTY_MAP.get(final_cefr, "unknown")
merged.append(
{
"word": word,
"pos": pos,
"cefr": final_cefr,
"difficulty": difficulty,
"sources": sorted(contributing_sources),
}
)
print(f"Merge statistics:")
print(f" Total unique entries: {len(merged)}")
print(f" Entries with multiple sources: {total_multi_source}")
print(f" Conflicts resolved by priority: {conflicts_resolved}")
return merged
def print_summary(merged: List[dict]):
"""Print distribution of CEFR levels and difficulty in final dataset."""
cefr_counts = defaultdict(int)
diff_counts = defaultdict(int)
for entry in merged:
cefr_counts[entry["cefr"]] += 1
diff_counts[entry["difficulty"]] += 1
print("\n📊 Final CEFR distribution:")
for level in sorted(CEFR_LEVELS):
count = cefr_counts.get(level, 0)
if count:
print(f" {level}: {count}")
print("\n📊 Final difficulty distribution:")
for diff in ["easy", "intermediate", "hard"]:
count = diff_counts.get(diff, 0)
print(f" {diff}: {count}")
def main():
script_dir = Path(__file__).parent
data_dir = script_dir.parent / "data-sources" / "english"
output_dir = script_dir.parent / "datafiles"
output_file = output_dir / "english-merged.json"
if not data_dir.exists():
print(f"Error: English data directory not found: {data_dir}")
return
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Loading extracted files from {data_dir}...")
sources = load_extracted_files(data_dir)
if not sources:
print("No extracted files found.")
return
print(f"Found sources: {', '.join(sources.keys())}")
print(f"Priority order (lowest to highest): {PRIORITY_ORDER}")
merged = merge_entries(sources)
with open(output_file, "w", encoding="utf-8") as f:
json.dump(merged, f, indent=2, ensure_ascii=False)
print(f"\n✅ Merged dataset written to: {output_file}")
print_summary(merged)
if __name__ == "__main__":
main()

View file

@ -1,159 +0,0 @@
#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 3: Italian Merge
Merges extracted JSON files for Italian into an authoritative dataset.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
# Supported CEFR levels and difficulty mapping
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
DIFFICULTY_MAP = {
"A1": "easy",
"A2": "easy",
"B1": "intermediate",
"B2": "intermediate",
"C1": "hard",
"C2": "hard",
}
# Source priority order (from lowest to highest priority)
# Higher index = higher authority when conflicts occur
PRIORITY_ORDER = ["italian", "it_m3"]
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
"""Load all *-extracted.json files from the Italian data directory."""
sources = {}
for file_path in data_dir.glob("*-extracted.json"):
source_name = file_path.stem.replace("-extracted", "")
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
sources[source_name] = data
else:
print(f"Warning: {file_path} does not contain a list, skipping.")
return sources
def normalize_entry(entry: dict) -> Tuple[str, str]:
"""Return (word, pos) key for merging."""
return entry["word"].lower().strip(), entry["pos"].lower().strip()
def get_source_priority(source_name: str) -> int:
"""Return priority index for a source (higher = more authoritative)."""
try:
return PRIORITY_ORDER.index(source_name)
except ValueError:
# If source not in list, assign lowest priority
return -1
def merge_entries(sources: Dict[str, List[dict]]) -> List[dict]:
"""Merge entries from multiple sources, resolving conflicts by priority."""
grouped = defaultdict(list)
for src_name, entries in sources.items():
for entry in entries:
key = normalize_entry(entry)
grouped[key].append((src_name, entry["cefr"], entry))
merged = []
conflicts_resolved = 0
total_multi_source = 0
for (word, pos), src_entries in grouped.items():
if len(src_entries) == 1:
src_name, cefr, original = src_entries[0]
final_cefr = cefr
contributing_sources = [src_name]
else:
total_multi_source += 1
sorted_entries = sorted(
src_entries, key=lambda x: get_source_priority(x[0]), reverse=True
)
highest_src, highest_cefr, _ = sorted_entries[0]
all_cefrs = {e[1] for e in src_entries}
if len(all_cefrs) > 1:
conflicts_resolved += 1
final_cefr = highest_cefr
contributing_sources = [e[0] for e in src_entries]
difficulty = DIFFICULTY_MAP.get(final_cefr, "unknown")
merged.append(
{
"word": word,
"pos": pos,
"cefr": final_cefr,
"difficulty": difficulty,
"sources": sorted(contributing_sources),
}
)
print(f"Merge statistics:")
print(f" Total unique entries: {len(merged)}")
print(f" Entries with multiple sources: {total_multi_source}")
print(f" Conflicts resolved by priority: {conflicts_resolved}")
return merged
def print_summary(merged: List[dict]):
"""Print distribution of CEFR levels and difficulty in final dataset."""
cefr_counts = defaultdict(int)
diff_counts = defaultdict(int)
for entry in merged:
cefr_counts[entry["cefr"]] += 1
diff_counts[entry["difficulty"]] += 1
print("\n📊 Final CEFR distribution:")
for level in sorted(CEFR_LEVELS):
count = cefr_counts.get(level, 0)
if count:
print(f" {level}: {count}")
print("\n📊 Final difficulty distribution:")
for diff in ["easy", "intermediate", "hard"]:
count = diff_counts.get(diff, 0)
print(f" {diff}: {count}")
def main():
script_dir = Path(__file__).parent
data_dir = script_dir.parent / "data-sources" / "italian"
output_dir = script_dir.parent / "datafiles"
output_file = output_dir / "italian-merged.json"
if not data_dir.exists():
print(f"Error: Italian data directory not found: {data_dir}")
return
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Loading extracted files from {data_dir}...")
sources = load_extracted_files(data_dir)
if not sources:
print("No extracted files found.")
return
print(f"Found sources: {', '.join(sources.keys())}")
print(f"Priority order (lowest to highest): {PRIORITY_ORDER}")
merged = merge_entries(sources)
with open(output_file, "w", encoding="utf-8") as f:
json.dump(merged, f, indent=2, ensure_ascii=False)
print(f"\n✅ Merged dataset written to: {output_file}")
print_summary(merged)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,2 +0,0 @@
wn==1.1.0
openpyxl==3.1.5