#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 2: English Comparison

Compares extracted JSON files for English and reports agreements and conflicts.
"""

import json
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Tuple

# Supported CEFR levels
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}

# Label used for entries that carry no "cefr" field.
UNKNOWN_LEVEL = "UNKNOWN"

# Stem suffix that marks a file as the output of the extraction stage.
_EXTRACTED_SUFFIX = "-extracted"


def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
    """Load all *-extracted.json files from the English data directory.

    Args:
        data_dir: Directory that is searched (non-recursively) for files
            matching ``*-extracted.json``.

    Returns:
        Mapping of source name (the file stem without the trailing
        ``-extracted``) to the list parsed from that file. Files that cannot
        be parsed, or whose top-level JSON value is not a list, are skipped
        with a warning printed to stdout.
    """
    sources = {}
    # glob() yields files in filesystem order; sort so that the report and
    # the order of conflict details are reproducible between runs.
    for file_path in sorted(data_dir.glob("*-extracted.json")):
        # Slice the suffix off instead of str.replace(), which would also
        # remove "-extracted" from the middle of a source name.
        source_name = file_path.stem[: -len(_EXTRACTED_SUFFIX)]
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # One broken file should not abort the whole comparison.
            print(f"Warning: could not parse {file_path} ({exc}), skipping.")
            continue
        if isinstance(data, list):
            sources[source_name] = data
        else:
            print(f"Warning: {file_path} does not contain a list, skipping.")
    return sources


def normalize_entry(entry: dict) -> Tuple[str, str]:
    """Return (word, pos) key for comparison.

    Both parts are lower-cased and stripped of surrounding whitespace.

    Raises:
        KeyError: If the entry has no "word" or no "pos" field.
    """
    return entry["word"].lower().strip(), entry["pos"].lower().strip()


def compute_statistics(sources: Dict[str, List[dict]]) -> dict:
    """Compute overlap, agreement, and conflict statistics.

    Args:
        sources: Mapping of source name to its list of entries. Each entry
            needs "word" and "pos"; a missing "cefr" is treated as
            ``"UNKNOWN"`` everywhere, including the agreement/conflict check.

    Returns:
        A dict with the keys:
            source_counts: per source, number of entries per CEFR level.
            total_entries: number of entries across all sources.
            unique_words: number of distinct (word, pos) keys.
            overlap_distribution: number of keys by how many sources list them.
            agreements: keys listed by >1 source with a single CEFR level.
            conflicts: keys listed by >1 source with differing CEFR levels.
            conflict_details: one dict per conflict with "word", "pos" and
                the per-source "assignments".
    """
    # Per-source counts by CEFR level
    source_counts = {
        src: dict(Counter(e.get("cefr", UNKNOWN_LEVEL) for e in entries))
        for src, entries in sources.items()
    }

    # (word, pos) -> {source: CEFR level}. When a source lists the same key
    # more than once, the last occurrence wins.
    word_map = defaultdict(dict)
    for src, entries in sources.items():
        for e in entries:
            # Use the same default as the counts above; indexing e["cefr"]
            # here would raise KeyError for entries already counted as UNKNOWN.
            word_map[normalize_entry(e)][src] = e.get("cefr", UNKNOWN_LEVEL)

    total_entries = sum(len(entries) for entries in sources.values())
    overlap_stats = Counter(len(levels) for levels in word_map.values())

    agreement_count = 0
    conflict_count = 0
    conflict_details = []
    for (word, pos), src_cefr_map in word_map.items():
        if len(src_cefr_map) < 2:
            # A key seen in a single source can neither agree nor conflict.
            continue
        if len(set(src_cefr_map.values())) == 1:
            agreement_count += 1
        else:
            conflict_count += 1
            conflict_details.append(
                {"word": word, "pos": pos, "assignments": dict(src_cefr_map)}
            )

    return {
        "source_counts": source_counts,
        "total_entries": total_entries,
        "unique_words": len(word_map),
        "overlap_distribution": dict(overlap_stats),
        "agreements": agreement_count,
        "conflicts": conflict_count,
        "conflict_details": conflict_details,
    }


def print_report(stats: dict, sources: Dict[str, List[dict]]):
    """Print formatted comparison report.

    Args:
        stats: Result of ``compute_statistics``.
        sources: Not read by this function; kept so existing callers that
            pass it continue to work.
    """
    print(f"\n{'=' * 60}")
    print("CEFR COMPARISON REPORT - ENGLISH")
    print(f"{'=' * 60}")

    # Source entry counts
    print("\n📊 ENTRIES PER SOURCE AND CEFR LEVEL")
    print("-" * 50)
    for src, counts in stats["source_counts"].items():
        total = sum(counts.values())
        print(f"\n{src}: {total} total entries")
        # CEFR_LEVELS is a set, so its iteration order is arbitrary; sorting
        # the names yields A1..C2, which is also ascending proficiency.
        for level in sorted(CEFR_LEVELS):
            cnt = counts.get(level, 0)
            if cnt > 0:
                print(f" {level}: {cnt}")
        # Show non-standard levels (entries without any level are part of
        # the total above but are not listed separately).
        for level, cnt in counts.items():
            if level not in CEFR_LEVELS and level != UNKNOWN_LEVEL:
                print(f" {level}: {cnt} (non-standard)")

    # Overlap statistics
    print("\n🔄 OVERLAP BETWEEN SOURCES")
    print("-" * 50)
    print(f"Total unique (word, POS) combinations: {stats['unique_words']}")
    print(f"Total entries across all sources: {stats['total_entries']}")
    overlap = stats["overlap_distribution"]
    for n_sources in sorted(overlap):
        count = overlap[n_sources]
        pct = (count / stats["unique_words"]) * 100
        print(f"Words appearing in {n_sources} source(s): {count} ({pct:.1f}%)")

    # Agreement and conflicts
    compared = stats["agreements"] + stats["conflicts"]
    print("\n⚖️ AGREEMENT / CONFLICT SUMMARY")
    print("-" * 50)
    print(f"Words with >1 source: {compared}")
    print(f" ✅ Agreements (same CEFR): {stats['agreements']}")
    print(f" ❌ Conflicts (different CEFR): {stats['conflicts']}")

    # Report the rate whenever anything was compared, so that full agreement
    # shows up as 100% instead of being omitted.
    if compared > 0:
        agreement_rate = (stats["agreements"] / compared) * 100
        print(f" Agreement rate: {agreement_rate:.1f}%")

    if stats["conflicts"] > 0:
        details = stats["conflict_details"]
        print("\n📋 CONFLICT DETAILS (first 10 shown):")
        for number, conflict in enumerate(details[:10], start=1):
            print(f" {number}. {conflict['word']} ({conflict['pos']})")
            for src, cefr in conflict["assignments"].items():
                print(f" {src}: {cefr}")
        if len(details) > 10:
            print(f" ... and {len(details) - 10} more conflicts.")

    print(f"\n{'=' * 60}\n")


def main():
    """Load the extracted English sources and print the comparison report.

    The data directory is ``../data-sources/english`` relative to the
    directory containing this script. Prints a message and returns early
    when the directory is missing or contains no usable files.
    """
    # Determine paths
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data-sources" / "english"

    if not data_dir.exists():
        print(f"Error: English data directory not found: {data_dir}")
        return

    print(f"Loading extracted files from {data_dir}...")
    sources = load_extracted_files(data_dir)

    if not sources:
        print("No extracted files found.")
        return

    print(f"Found sources: {', '.join(sources)}")

    stats = compute_statistics(sources)
    print_report(stats, sources)


if __name__ == "__main__":
    main()