#!/usr/bin/env python3
"""
CEFR Data Pipeline - Stage 3: English Merge
Merges extracted JSON files for English into an authoritative dataset.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple

# Supported CEFR levels and difficulty mapping
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
DIFFICULTY_MAP = {
    "A1": "easy",
    "A2": "easy",
    "B1": "intermediate",
    "B2": "intermediate",
    "C1": "hard",
    "C2": "hard",
}

# Source priority order (from lowest to highest priority)
# Higher index = higher authority when conflicts occur
PRIORITY_ORDER = ["random", "octanove", "cefrj", "en_m3"]


def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
    """Load all *-extracted.json files from the English data directory.

    Returns a mapping of source name (the filename stem without the
    "-extracted" suffix) to the list of entries it contains. Files that
    are malformed JSON or whose top-level value is not a list are skipped
    with a warning, so a single bad file cannot abort the whole merge.
    """
    sources: Dict[str, List[dict]] = {}
    for file_path in data_dir.glob("*-extracted.json"):
        source_name = file_path.stem.replace("-extracted", "")
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError as exc:
            # Consistent with the non-list case below: warn and keep going.
            print(f"Warning: {file_path} is not valid JSON ({exc}), skipping.")
            continue
        if isinstance(data, list):
            sources[source_name] = data
        else:
            print(f"Warning: {file_path} does not contain a list, skipping.")
    return sources


def normalize_entry(entry: dict) -> Tuple[str, str]:
    """Return a case-/whitespace-normalized (word, pos) key for merging.

    Raises KeyError if the entry lacks a "word" or "pos" field.
    """
    return entry["word"].lower().strip(), entry["pos"].lower().strip()


def get_source_priority(source_name: str) -> int:
    """Return priority index for a source (higher = more authoritative)."""
    try:
        return PRIORITY_ORDER.index(source_name)
    except ValueError:
        # If source not in list, assign lowest priority
        return -1


def merge_entries(sources: Dict[str, List[dict]]) -> List[dict]:
    """Merge entries from multiple sources, resolving conflicts by priority.

    Entries are grouped by their normalized (word, pos) key. When several
    sources disagree on the CEFR level, the level reported by the
    highest-priority source (per PRIORITY_ORDER) wins. Each merged entry
    records the deduplicated, sorted list of contributing sources.
    """
    grouped: Dict[Tuple[str, str], list] = defaultdict(list)
    for src_name, entries in sources.items():
        for entry in entries:
            key = normalize_entry(entry)
            # Normalize the CEFR level to uppercase so lookups against
            # DIFFICULTY_MAP / CEFR_LEVELS (uppercase keys) cannot miss
            # purely on letter case (e.g. "b1" vs "B1").
            cefr = entry["cefr"].strip().upper()
            grouped[key].append((src_name, cefr, entry))

    merged = []
    conflicts_resolved = 0
    total_multi_source = 0
    for (word, pos), src_entries in grouped.items():
        if len(src_entries) == 1:
            src_name, cefr, _ = src_entries[0]
            final_cefr = cefr
            contributing_sources = [src_name]
        else:
            total_multi_source += 1
            # Highest-priority source first; its CEFR level wins.
            sorted_entries = sorted(
                src_entries, key=lambda x: get_source_priority(x[0]), reverse=True
            )
            _, highest_cefr, _ = sorted_entries[0]
            all_cefrs = {e[1] for e in src_entries}
            if len(all_cefrs) > 1:
                conflicts_resolved += 1
            final_cefr = highest_cefr
            # Deduplicate so a source listing the same (word, pos) twice
            # appears only once in the provenance list.
            contributing_sources = list({e[0] for e in src_entries})
        difficulty = DIFFICULTY_MAP.get(final_cefr, "unknown")
        merged.append(
            {
                "word": word,
                "pos": pos,
                "cefr": final_cefr,
                "difficulty": difficulty,
                "sources": sorted(contributing_sources),
            }
        )

    print("Merge statistics:")
    print(f" Total unique entries: {len(merged)}")
    print(f" Entries with multiple sources: {total_multi_source}")
    print(f" Conflicts resolved by priority: {conflicts_resolved}")
    return merged


def print_summary(merged: List[dict]):
    """Print distribution of CEFR levels and difficulty in final dataset."""
    cefr_counts: Dict[str, int] = defaultdict(int)
    diff_counts: Dict[str, int] = defaultdict(int)
    for entry in merged:
        cefr_counts[entry["cefr"]] += 1
        diff_counts[entry["difficulty"]] += 1

    print("\nšŸ“Š Final CEFR distribution:")
    for level in sorted(CEFR_LEVELS):
        count = cefr_counts.get(level, 0)
        if count:
            print(f" {level}: {count}")

    print("\nšŸ“Š Final difficulty distribution:")
    for diff in ["easy", "intermediate", "hard"]:
        count = diff_counts.get(diff, 0)
        print(f" {diff}: {count}")


def main():
    """Load extracted English files, merge them, and write the dataset."""
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data-sources" / "english"
    output_dir = script_dir.parent / "datafiles"
    output_file = output_dir / "english-merged.json"

    if not data_dir.exists():
        print(f"Error: English data directory not found: {data_dir}")
        return
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Loading extracted files from {data_dir}...")
    sources = load_extracted_files(data_dir)
    if not sources:
        print("No extracted files found.")
        return

    print(f"Found sources: {', '.join(sources.keys())}")
    print(f"Priority order (lowest to highest): {PRIORITY_ORDER}")
    merged = merge_entries(sources)

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(merged, f, indent=2, ensure_ascii=False)
    print(f"\nāœ… Merged dataset written to: {output_file}")
    print_summary(merged)


if __name__ == "__main__":
    main()