159 lines
4.9 KiB
Python
159 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
CEFR Data Pipeline - Stage 3: English Merge
|
|
Merges extracted JSON files for English into an authoritative dataset.
|
|
"""
|
|
|
|
import json
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
# Supported CEFR levels and difficulty mapping
# The six levels of the Common European Framework of Reference for Languages.
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
# Collapses the six CEFR levels into three coarse difficulty buckets
# (two CEFR levels per bucket).
DIFFICULTY_MAP = {
    "A1": "easy",
    "A2": "easy",
    "B1": "intermediate",
    "B2": "intermediate",
    "C1": "hard",
    "C2": "hard",
}

# Source priority order (from lowest to highest priority)
# Higher index = higher authority when conflicts occur
# Sources not listed here get priority -1 (see get_source_priority).
PRIORITY_ORDER = ["random", "octanove", "cefrj", "en_m3"]
|
|
|
|
|
|
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
    """Collect every ``*-extracted.json`` file under ``data_dir``.

    Returns a mapping from source name (the filename minus its
    ``-extracted`` suffix) to the list of entries the file contains.
    Files whose top-level JSON value is not a list are reported on
    stdout and skipped.
    """
    loaded: Dict[str, List[dict]] = {}
    for path in data_dir.glob("*-extracted.json"):
        source = path.stem.replace("-extracted", "")
        with path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        if not isinstance(payload, list):
            print(f"Warning: {path} does not contain a list, skipping.")
            continue
        loaded[source] = payload
    return loaded
|
|
|
|
|
|
def normalize_entry(entry: dict) -> Tuple[str, str]:
    """Build the case-insensitive ``(word, pos)`` key used for merging."""
    word_key = entry["word"].lower().strip()
    pos_key = entry["pos"].lower().strip()
    return word_key, pos_key
|
|
|
|
|
|
def get_source_priority(source_name: str) -> int:
    """Return the rank of *source_name* in PRIORITY_ORDER (higher = more
    authoritative); unknown sources get the lowest priority, -1."""
    if source_name not in PRIORITY_ORDER:
        return -1
    return PRIORITY_ORDER.index(source_name)
|
|
|
|
|
|
def merge_entries(sources: Dict[str, List[dict]]) -> List[dict]:
    """Merge entries from multiple sources, resolving conflicts by priority.

    Entries are grouped by their normalized (word, pos) key.  When a key
    occurs in more than one source entry, the CEFR level reported by the
    highest-priority source (per PRIORITY_ORDER) wins.

    Args:
        sources: mapping of source name -> list of entry dicts, each with
            at least "word", "pos" and "cefr" keys.

    Returns:
        List of merged entry dicts carrying "word", "pos", "cefr",
        "difficulty" and a sorted "sources" list.
    """
    grouped = defaultdict(list)
    for src_name, entries in sources.items():
        for entry in entries:
            key = normalize_entry(entry)
            grouped[key].append((src_name, entry["cefr"], entry))

    merged = []
    conflicts_resolved = 0
    total_multi_source = 0

    for (word, pos), src_entries in grouped.items():
        if len(src_entries) == 1:
            # Fix: the original bound the entry dict to an unused local.
            src_name, cefr, _ = src_entries[0]
            final_cefr = cefr
            contributing_sources = [src_name]
        else:
            total_multi_source += 1
            # Highest-priority source first; sorted() is stable, so ties
            # keep their original grouping order.
            sorted_entries = sorted(
                src_entries, key=lambda x: get_source_priority(x[0]), reverse=True
            )
            _, highest_cefr, _ = sorted_entries[0]
            # Only count a conflict when the sources actually disagree.
            all_cefrs = {e[1] for e in src_entries}
            if len(all_cefrs) > 1:
                conflicts_resolved += 1

            final_cefr = highest_cefr
            contributing_sources = [e[0] for e in src_entries]

        # Unknown CEFR codes degrade to "unknown" instead of raising.
        difficulty = DIFFICULTY_MAP.get(final_cefr, "unknown")

        merged.append(
            {
                "word": word,
                "pos": pos,
                "cefr": final_cefr,
                "difficulty": difficulty,
                "sources": sorted(contributing_sources),
            }
        )

    # Fix: was an f-string with no placeholders (ruff F541).
    print("Merge statistics:")
    print(f" Total unique entries: {len(merged)}")
    print(f" Entries with multiple sources: {total_multi_source}")
    print(f" Conflicts resolved by priority: {conflicts_resolved}")

    return merged
|
|
|
|
|
|
def print_summary(merged: List[dict]):
    """Print distribution of CEFR levels and difficulty in final dataset."""
    cefr_counts: Dict[str, int] = {}
    diff_counts: Dict[str, int] = {}

    for item in merged:
        cefr_counts[item["cefr"]] = cefr_counts.get(item["cefr"], 0) + 1
        diff_counts[item["difficulty"]] = diff_counts.get(item["difficulty"], 0) + 1

    print("\n📊 Final CEFR distribution:")
    for level in sorted(CEFR_LEVELS):
        level_total = cefr_counts.get(level, 0)
        # Levels absent from the dataset are omitted entirely.
        if level_total:
            print(f" {level}: {level_total}")

    print("\n📊 Final difficulty distribution:")
    for diff in ["easy", "intermediate", "hard"]:
        print(f" {diff}: {diff_counts.get(diff, 0)}")
|
|
|
|
|
|
def main():
    """Entry point: load extracted English files, merge them, write JSON."""
    here = Path(__file__).parent
    source_dir = here.parent / "data-sources" / "english"
    out_dir = here.parent / "datafiles"
    out_path = out_dir / "english-merged.json"

    # Bail out early if there is nothing to read.
    if not source_dir.exists():
        print(f"Error: English data directory not found: {source_dir}")
        return

    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"Loading extracted files from {source_dir}...")
    extracted = load_extracted_files(source_dir)
    if not extracted:
        print("No extracted files found.")
        return

    print(f"Found sources: {', '.join(extracted.keys())}")
    print(f"Priority order (lowest to highest): {PRIORITY_ORDER}")

    dataset = merge_entries(extracted)

    with out_path.open("w", encoding="utf-8") as f:
        json.dump(dataset, f, indent=2, ensure_ascii=False)

    print(f"\n✅ Merged dataset written to: {out_path}")
    print_summary(dataset)


if __name__ == "__main__":
    main()
|