lila/scripts/comparison-scripts/compare-italian.py
lila 3374bd8b20 feat(scripts): add Italian CEFR data pipeline
- Add extractors for Italian sources: it_m3.xls and italian.json
- Add comparison script (compare-italian.py) to report source overlaps and conflicts
- Add merge script (merge-italian-json.py) with priority order ['italian', 'it_m3']
- Output authoritative dataset to datafiles/italian-merged.json
- Update README to document both English and Italian pipelines
2026-04-08 18:32:03 +02:00

166 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 2: Italian Comparison
Compares extracted JSON files for Italian and reports agreements and conflicts.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
# CEFR levels this script treats as standard; any other value found in the
# data is reported as non-standard.
# NOTE: this is a set, so membership tests are cheap but iteration order is
# unspecified -- sort it before displaying levels to a user.
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
    """Load every ``*-extracted.json`` file found directly in *data_dir*.

    Args:
        data_dir: Directory holding the per-source extraction outputs.

    Returns:
        Mapping of source name (the file stem without its trailing
        ``-extracted`` marker) to the list parsed from that file, in sorted
        file order. Files whose top-level JSON value is not a list are
        reported with a warning and left out.

    Raises:
        OSError: If a matched file cannot be opened or read.
        json.JSONDecodeError: If a matched file is not valid JSON.
    """
    suffix = "-extracted"
    sources = {}
    # glob() yields matches in filesystem order; sorting keeps the order of
    # sources (and therefore of the printed report) reproducible.
    for file_path in sorted(data_dir.glob(f"*{suffix}.json")):
        # Strip only the trailing marker. str.replace() would also remove an
        # earlier "-extracted" that is part of the source name itself.
        source_name = file_path.stem[: -len(suffix)]
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            sources[source_name] = data
        else:
            print(f"Warning: {file_path} does not contain a list, skipping.")
    return sources
def normalize_entry(entry: dict) -> Tuple[str, str]:
    """Build the (word, pos) comparison key for one entry.

    Both fields are lower-cased and stripped of surrounding whitespace so
    that entries differing only in case or padding map to the same key.
    """
    word = entry["word"].lower().strip()
    pos = entry["pos"].lower().strip()
    return word, pos
def compute_statistics(sources: Dict[str, List[dict]]) -> dict:
    """Compute per-source level counts plus overlap, agreement and conflict stats.

    Args:
        sources: Mapping of source name to its list of entries. Every entry
            must provide ``word`` and ``pos`` (needed to build the comparison
            key); ``cefr`` is optional and counted as ``"UNKNOWN"`` when
            absent.

    Returns:
        A dict with these keys:

        * ``source_counts``: source name -> {CEFR level: number of entries}.
        * ``total_entries``: number of entries summed over all sources.
        * ``unique_words``: number of distinct (word, pos) keys.
        * ``overlap_distribution``: {number of sources: number of keys found
          in exactly that many sources}.
        * ``agreements``: keys found in more than one source where every
          source assigns the same level.
        * ``conflicts``: keys found in more than one source with differing
          levels.
        * ``conflict_details``: one dict per conflict with ``word``, ``pos``
          and ``assignments`` (source name -> level).

    Raises:
        KeyError: If an entry lacks ``word`` or ``pos``.
    """
    missing_level = "UNKNOWN"

    source_counts = {}
    # (word, pos) -> {source name: CEFR level}
    word_map = defaultdict(dict)
    for src, entries in sources.items():
        cefr_counts = defaultdict(int)
        for e in entries:
            # Use the same fallback for counting and for comparison, so an
            # entry without a level is tallied rather than raising KeyError.
            cefr = e.get("cefr", missing_level)
            cefr_counts[cefr] += 1
            # A key repeated inside one source keeps the last level seen.
            word_map[normalize_entry(e)][src] = cefr
        source_counts[src] = dict(cefr_counts)

    overlap_stats = defaultdict(int)
    agreement_count = 0
    conflict_details = []
    for (word, pos), src_cefr_map in word_map.items():
        num_sources = len(src_cefr_map)
        overlap_stats[num_sources] += 1
        if num_sources < 2:
            # Agreement/conflict is only meaningful across several sources.
            continue
        if len(set(src_cefr_map.values())) == 1:
            agreement_count += 1
        else:
            conflict_details.append(
                {"word": word, "pos": pos, "assignments": dict(src_cefr_map)}
            )

    return {
        "source_counts": source_counts,
        "total_entries": sum(len(entries) for entries in sources.values()),
        "unique_words": len(word_map),
        "overlap_distribution": dict(overlap_stats),
        "agreements": agreement_count,
        "conflicts": len(conflict_details),
        "conflict_details": conflict_details,
    }
def print_report(stats: dict, sources: Dict[str, List[dict]]):
    """Print the formatted comparison report to stdout.

    Args:
        stats: Statistics dict providing ``source_counts``, ``unique_words``,
            ``total_entries``, ``overlap_distribution``, ``agreements``,
            ``conflicts`` and ``conflict_details``.
        sources: Not used by the report; kept so existing callers that pass
            it keep working.
    """
    max_conflicts_shown = 10

    print(f"\n{'=' * 60}")
    print("CEFR COMPARISON REPORT - ITALIAN")
    print(f"{'=' * 60}")

    # Source entry counts
    print("\n📊 ENTRIES PER SOURCE AND CEFR LEVEL")
    print("-" * 50)
    for src, counts in stats["source_counts"].items():
        total = sum(counts.values())
        print(f"\n{src}: {total} total entries")
        # CEFR_LEVELS is a set; sort it so levels always print A1..C2 instead
        # of in an arbitrary order that can change from run to run.
        for level in sorted(CEFR_LEVELS):
            cnt = counts.get(level, 0)
            if cnt > 0:
                print(f" {level}: {cnt}")
        # Non-standard levels. "UNKNOWN" is part of the total above but is
        # not listed as its own line.
        for level, cnt in counts.items():
            if level not in CEFR_LEVELS and level != "UNKNOWN":
                print(f" {level}: {cnt} (non-standard)")

    # Overlap statistics
    print("\n🔄 OVERLAP BETWEEN SOURCES")
    print("-" * 50)
    print(f"Total unique (word, POS) combinations: {stats['unique_words']}")
    print(f"Total entries across all sources: {stats['total_entries']}")
    overlap = stats["overlap_distribution"]
    # The distribution is empty whenever unique_words is 0, so the division
    # below never sees a zero denominator.
    for n_sources in sorted(overlap):
        count = overlap[n_sources]
        pct = (count / stats["unique_words"]) * 100
        print(f"Words appearing in {n_sources} source(s): {count} ({pct:.1f}%)")

    # Agreement and conflicts
    agreements = stats["agreements"]
    conflicts = stats["conflicts"]
    multi_source = agreements + conflicts
    print("\n⚖️ AGREEMENT / CONFLICT SUMMARY")
    print("-" * 50)
    print(f"Words with >1 source: {multi_source}")
    print(f" ✅ Agreements (same CEFR): {agreements}")
    print(f" ❌ Conflicts (different CEFR): {conflicts}")
    if conflicts > 0:
        agreement_rate = (agreements / multi_source) * 100
        print(f" Agreement rate: {agreement_rate:.1f}%")
        print("\n📋 CONFLICT DETAILS (first 10 shown):")
        details = stats["conflict_details"]
        for i, conflict in enumerate(details[:max_conflicts_shown], start=1):
            print(f" {i}. {conflict['word']} ({conflict['pos']})")
            for src, cefr in conflict["assignments"].items():
                print(f" {src}: {cefr}")
        if len(details) > max_conflicts_shown:
            print(f" ... and {len(details) - max_conflicts_shown} more conflicts.")
    print(f"\n{'=' * 60}\n")
def main() -> int:
    """Run the Italian comparison stage and print its report.

    Looks for extracted files in ``data-sources/italian`` next to the
    directory that contains this script.

    Returns:
        Process exit status: 0 when a report was printed, 1 when the data
        directory is missing or contains no usable extracted files.
    """
    # Resolve first: when __file__ is a bare relative name, .parent.parent
    # would stay at "." and point at the wrong directory.
    script_dir = Path(__file__).resolve().parent
    data_dir = script_dir.parent / "data-sources" / "italian"
    if not data_dir.exists():
        print(f"Error: Italian data directory not found: {data_dir}")
        return 1

    print(f"Loading extracted files from {data_dir}...")
    sources = load_extracted_files(data_dir)
    if not sources:
        print("No extracted files found.")
        return 1

    print(f"Found sources: {', '.join(sources.keys())}")
    stats = compute_statistics(sources)
    print_report(stats, sources)
    return 0


if __name__ == "__main__":
    # Propagate the status so a calling pipeline can detect a failed stage.
    raise SystemExit(main())