166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
CEFR Data Pipeline - Stage 2: English Comparison
|
|
Compares extracted JSON files for English and reports agreements and conflicts.
|
|
"""
|
|
|
|
import json
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
# Supported CEFR levels
|
|
CEFR_LEVELS = {"A1", "A2", "B1", "B2", "C1", "C2"}
|
|
|
|
|
|
def load_extracted_files(data_dir: Path) -> Dict[str, List[dict]]:
    """Load all *-extracted.json files from the English data directory.

    Args:
        data_dir: Directory containing ``<source>-extracted.json`` files.

    Returns:
        Mapping of source name (the file stem minus the ``-extracted``
        suffix) to its list of entry dicts. Files that are not valid JSON
        or whose top-level value is not a list are skipped with a warning,
        matching this pipeline's best-effort style, instead of aborting
        the whole comparison.
    """
    sources: Dict[str, List[dict]] = {}
    for file_path in data_dir.glob("*-extracted.json"):
        source_name = file_path.stem.replace("-extracted", "")
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError as err:
            # A corrupt file should not take down the entire stage.
            print(f"Warning: {file_path} is not valid JSON ({err}), skipping.")
            continue
        if isinstance(data, list):
            sources[source_name] = data
        else:
            print(f"Warning: {file_path} does not contain a list, skipping.")
    return sources
|
|
|
|
|
|
def normalize_entry(entry: dict) -> Tuple[str, str]:
    """Build the canonical, case-insensitive (word, pos) comparison key."""
    word = entry["word"].lower().strip()
    pos = entry["pos"].lower().strip()
    return word, pos
|
|
|
|
|
|
def compute_statistics(sources: Dict[str, List[dict]]) -> dict:
    """Compute overlap, agreement, and conflict statistics.

    Args:
        sources: Mapping of source name to its list of entry dicts, as
            produced by load_extracted_files().

    Returns:
        Dict with per-source CEFR counts, total/unique entry counts, an
        overlap distribution (how many (word, pos) keys appear in N
        sources), agreement/conflict counts for multi-source keys, and
        the full conflict details.
    """
    # Per-source counts by CEFR level.
    source_counts = {}
    for src, entries in sources.items():
        cefr_counts = defaultdict(int)
        for e in entries:
            cefr_counts[e.get("cefr", "UNKNOWN")] += 1
        source_counts[src] = dict(cefr_counts)

    # Map each normalized (word, pos) key to {source: cefr}.
    # Use .get with an "UNKNOWN" fallback for consistency with the
    # counting pass above — an entry missing "cefr" must not crash here.
    word_map: Dict[Tuple[str, str], Dict[str, str]] = defaultdict(dict)
    for src, entries in sources.items():
        for e in entries:
            word_map[normalize_entry(e)][src] = e.get("cefr", "UNKNOWN")

    # Compute overlaps, agreements, conflicts.
    total_entries = sum(len(entries) for entries in sources.values())
    unique_words = len(word_map)

    overlap_stats = defaultdict(int)
    agreement_count = 0
    conflict_count = 0
    conflict_details = []

    for key, src_cefr_map in word_map.items():
        num_sources = len(src_cefr_map)
        overlap_stats[num_sources] += 1
        if num_sources > 1:
            # All sources assigning the same level is an agreement;
            # any disagreement is recorded with its full assignments.
            if len(set(src_cefr_map.values())) == 1:
                agreement_count += 1
            else:
                conflict_count += 1
                conflict_details.append(
                    {"word": key[0], "pos": key[1], "assignments": dict(src_cefr_map)}
                )

    return {
        "source_counts": source_counts,
        "total_entries": total_entries,
        "unique_words": unique_words,
        "overlap_distribution": dict(overlap_stats),
        "agreements": agreement_count,
        "conflicts": conflict_count,
        "conflict_details": conflict_details,
    }
|
|
|
|
|
|
def print_report(stats: dict, sources: Dict[str, List[dict]]):
    """Print a formatted comparison report to stdout.

    Args:
        stats: Statistics dict as produced by compute_statistics().
        sources: Loaded source data (kept for interface compatibility;
            the report currently reads everything it needs from *stats*).
    """
    print(f"\n{'=' * 60}")
    print("CEFR COMPARISON REPORT - ENGLISH")
    print(f"{'=' * 60}")

    # Source entry counts.
    print("\n📊 ENTRIES PER SOURCE AND CEFR LEVEL")
    print("-" * 50)
    for src, counts in stats["source_counts"].items():
        total = sum(counts.values())
        print(f"\n{src}: {total} total entries")
        # CEFR_LEVELS is a set — sort it so levels print in a stable
        # A1..C2 order rather than arbitrary set-iteration order.
        for level in sorted(CEFR_LEVELS):
            cnt = counts.get(level, 0)
            if cnt > 0:
                print(f" {level}: {cnt}")
        # Show non-standard levels.
        for level, cnt in counts.items():
            if level not in CEFR_LEVELS and level != "UNKNOWN":
                print(f" {level}: {cnt} (non-standard)")

    # Overlap statistics.
    print("\n🔄 OVERLAP BETWEEN SOURCES")
    print("-" * 50)
    print(f"Total unique (word, POS) combinations: {stats['unique_words']}")
    print(f"Total entries across all sources: {stats['total_entries']}")

    overlap = stats["overlap_distribution"]
    for n_sources in sorted(overlap.keys()):
        count = overlap[n_sources]
        pct = (count / stats["unique_words"]) * 100
        print(f"Words appearing in {n_sources} source(s): {count} ({pct:.1f}%)")

    # Agreement and conflicts.
    print("\n⚖️ AGREEMENT / CONFLICT SUMMARY")
    print("-" * 50)
    multi_source = stats["agreements"] + stats["conflicts"]
    print(f"Words with >1 source: {multi_source}")
    print(f" ✅ Agreements (same CEFR): {stats['agreements']}")
    print(f" ❌ Conflicts (different CEFR): {stats['conflicts']}")

    # Show the agreement rate whenever any word has multiple sources.
    # (Previously it was skipped when there were zero conflicts, hiding
    # a 100% agreement rate.)  The guard avoids division by zero.
    if multi_source > 0:
        agreement_rate = (stats["agreements"] / multi_source) * 100
        print(f" Agreement rate: {agreement_rate:.1f}%")

    if stats["conflicts"] > 0:
        print("\n📋 CONFLICT DETAILS (first 10 shown):")
        for i, conflict in enumerate(stats["conflict_details"][:10]):
            print(f" {i + 1}. {conflict['word']} ({conflict['pos']})")
            for src, cefr in conflict["assignments"].items():
                print(f" {src}: {cefr}")
        if len(stats["conflict_details"]) > 10:
            print(f" ... and {len(stats['conflict_details']) - 10} more conflicts.")

    print(f"\n{'=' * 60}\n")
|
|
|
|
|
|
def main():
    """Entry point: locate the English data, load sources, print the report."""
    # Data lives at <repo>/data-sources/english relative to this script.
    english_dir = Path(__file__).parent.parent / "data-sources" / "english"

    if not english_dir.exists():
        print(f"Error: English data directory not found: {english_dir}")
        return

    print(f"Loading extracted files from {english_dir}...")
    loaded = load_extracted_files(english_dir)

    if not loaded:
        print("No extracted files found.")
        return

    print(f"Found sources: {', '.join(loaded.keys())}")

    print_report(compute_statistics(loaded), loaded)
|
|
|
|
|
|
# Allow this stage to be run directly as a script.
if __name__ == "__main__":
    main()
|