feat(scripts): add Italian CEFR data pipeline

- Add extractors for Italian sources: it_m3.xls and italian.json
- Add comparison script (compare-italian.py) to report source overlaps and conflicts
- Add merge script (merge-italian-json.py) with priority order ['italian', 'it_m3']
- Output authoritative dataset to datafiles/italian-merged.json
- Update README to document both English and Italian pipelines
This commit is contained in:
lila 2026-04-08 18:32:03 +02:00
parent 59152950d6
commit 3374bd8b20
9 changed files with 208535 additions and 26 deletions

View file

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 3: Italian Merge
Merges extracted JSON files for Italian into an authoritative dataset.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
# Supported CEFR levels and difficulty mapping
# The six proficiency levels of the Common European Framework of Reference.
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
# Coarse three-way difficulty bucket per CEFR level; merge_entries falls back
# to "unknown" for any level missing from this map.
DIFFICULTY_MAP = {
    "A1": "easy",
    "A2": "easy",
    "B1": "intermediate",
    "B2": "intermediate",
    "C1": "hard",
    "C2": "hard",
}
# Source priority order (from lowest to highest priority)
# Higher index = higher authority when conflicts occur
PRIORITY_ORDER = ["italian", "it_m3"]
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
    """Collect every ``*-extracted.json`` file found in *data_dir*.

    The source name is the file stem with the ``-extracted`` suffix
    removed.  Files whose top-level JSON value is not a list are
    skipped with a warning.

    Args:
        data_dir: directory holding the extracted Italian JSON files.

    Returns:
        Mapping of source name to its list of entry dicts.
    """
    sources: Dict[str, List[dict]] = {}
    for path in data_dir.glob("*-extracted.json"):
        name = path.stem.replace("-extracted", "")
        data = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(data, list):
            print(f"Warning: {path} does not contain a list, skipping.")
            continue
        sources[name] = data
    return sources
def normalize_entry(entry: dict) -> Tuple[str, str]:
    """Build the (word, pos) merge key.

    Both fields are lower-cased and stripped of surrounding whitespace
    so that cosmetic differences between sources do not split entries.
    """
    word = entry["word"]
    pos = entry["pos"]
    return (word.lower().strip(), pos.lower().strip())
def get_source_priority(source_name: str) -> int:
    """Return the priority index of *source_name* (higher = more authoritative).

    Sources absent from PRIORITY_ORDER get -1, i.e. the lowest priority.
    """
    if source_name in PRIORITY_ORDER:
        return PRIORITY_ORDER.index(source_name)
    return -1
def merge_entries(sources: Dict[str, List[dict]]) -> List[dict]:
    """Merge entries from multiple sources, resolving conflicts by priority.

    Entries are grouped by their normalized (word, pos) key.  When a key
    appears in more than one source and the sources disagree on the CEFR
    level, the level reported by the highest-priority source (see
    PRIORITY_ORDER) wins.  Each output entry records every contributing
    source, deduplicated and sorted.

    Args:
        sources: mapping of source name to a list of entry dicts, each
            containing at least "word", "pos" and "cefr" keys.

    Returns:
        List of merged entry dicts with keys: word, pos, cefr,
        difficulty, sources.
    """
    grouped = defaultdict(list)
    for src_name, entries in sources.items():
        for entry in entries:
            key = normalize_entry(entry)
            grouped[key].append((src_name, entry["cefr"], entry))
    merged = []
    conflicts_resolved = 0
    total_multi_source = 0
    for (word, pos), src_entries in grouped.items():
        if len(src_entries) == 1:
            src_name, final_cefr, _ = src_entries[0]
            contributing_sources = [src_name]
        else:
            total_multi_source += 1
            # Highest-priority source first; sorted() is stable, so entries
            # with equal priority keep their original insertion order.
            sorted_entries = sorted(
                src_entries, key=lambda x: get_source_priority(x[0]), reverse=True
            )
            highest_cefr = sorted_entries[0][1]
            all_cefrs = {e[1] for e in src_entries}
            if len(all_cefrs) > 1:
                conflicts_resolved += 1
            final_cefr = highest_cefr
            # A set dedupes a source that contributed the same key twice,
            # so it is not listed multiple times in the output.
            contributing_sources = {e[0] for e in src_entries}
        # Levels missing from DIFFICULTY_MAP fall back to "unknown".
        difficulty = DIFFICULTY_MAP.get(final_cefr, "unknown")
        merged.append(
            {
                "word": word,
                "pos": pos,
                "cefr": final_cefr,
                "difficulty": difficulty,
                "sources": sorted(contributing_sources),
            }
        )
    print("Merge statistics:")
    print(f"  Total unique entries: {len(merged)}")
    print(f"  Entries with multiple sources: {total_multi_source}")
    print(f"  Conflicts resolved by priority: {conflicts_resolved}")
    return merged
def print_summary(merged: List[dict]):
    """Print distribution of CEFR levels and difficulty in final dataset."""
    cefr_counts = defaultdict(int)
    diff_counts = defaultdict(int)
    for item in merged:
        cefr_counts[item["cefr"]] += 1
        diff_counts[item["difficulty"]] += 1

    # Only CEFR levels that actually occur are shown.
    print("\n📊 Final CEFR distribution:")
    for level in sorted(CEFR_LEVELS):
        n = cefr_counts.get(level, 0)
        if n:
            print(f"  {level}: {n}")

    # All three difficulty buckets are always shown, even when zero.
    print("\n📊 Final difficulty distribution:")
    for label in ("easy", "intermediate", "hard"):
        print(f"  {label}: {diff_counts.get(label, 0)}")
def main():
    """Entry point: load extracted Italian sources, merge, and write output."""
    script_dir = Path(__file__).parent
    base = script_dir.parent
    data_dir = base / "data-sources" / "italian"
    output_dir = base / "datafiles"
    output_file = output_dir / "italian-merged.json"

    # Guard clauses: bail out early when there is nothing to merge.
    if not data_dir.exists():
        print(f"Error: Italian data directory not found: {data_dir}")
        return
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Loading extracted files from {data_dir}...")
    sources = load_extracted_files(data_dir)
    if not sources:
        print("No extracted files found.")
        return

    print(f"Found sources: {', '.join(sources.keys())}")
    print(f"Priority order (lowest to highest): {PRIORITY_ORDER}")

    merged = merge_entries(sources)
    output_file.write_text(
        json.dumps(merged, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    print(f"\n✅ Merged dataset written to: {output_file}")
    print_summary(merged)


if __name__ == "__main__":
    main()