-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenrich_dumps.py
More file actions
77 lines (59 loc) · 2.65 KB
/
enrich_dumps.py
File metadata and controls
77 lines (59 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
One-shot script: re-parse every captured Voyager payload on disk to fill
sparse columns (raw_tags, company_size, deadline) in the existing CSV.
Idempotent: only fills empty fields; never overwrites populated ones.
Safe to re-run after each crawl session.
Usage:
python enrich_dumps.py
"""
import sys
from pathlib import Path
import pandas as pd
import config
from enrich import enrich_row, extract_from_dumps
from schema import SCHEMA
# Placeholder the CSV stores in `deadline` when no real deadline is known.
_DEADLINE_PLACEHOLDER = "未知截止时间"

# Sparse columns whose fill rate is reported before and after enrichment.
_TRACKED_COLUMNS = ("raw_tags", "company_size", "deadline")


def _count_populated(frame: pd.DataFrame, column: str) -> int:
    """Return how many cells of *column* hold a real value.

    Empty strings and missing values (NaN/None) are not counted. For the
    ``deadline`` column the placeholder text is not counted either, so the
    "before" and "after" tallies are measured the same way.
    """
    values = frame[column].fillna("").astype(str)
    populated = values.str.len() > 0
    if column == "deadline":
        populated = populated & (values != _DEADLINE_PLACEHOLDER)
    return int(populated.sum())


def main() -> int:
    """Enrich the raw CSV in place from the payload dumps on disk.

    Reads every ``*.json`` dump from ``config.PAYLOAD_DIR`` (plus the
    ``intercepted_payloads_probe`` directory under ``config.OUTPUT_DIR`` when
    it exists), passes each CSV row through ``enrich_row``, writes the result
    back to ``config.RAW_CSV`` and prints a before/after fill-rate summary for
    the tracked columns.

    Returns:
        Process exit code: 0 on success, 1 when the CSV is missing or there
        are no dumps to process.
    """
    if not config.RAW_CSV.exists():
        print(f"[FATAL] No CSV at {config.RAW_CSV}; nothing to enrich")
        return 1

    # Gather all dump files: current crawler dumps + the probe-era dumps if kept
    dump_paths = sorted(config.PAYLOAD_DIR.glob("*.json"))
    probe_dir = config.OUTPUT_DIR / "intercepted_payloads_probe"
    if probe_dir.exists():
        dump_paths += sorted(probe_dir.glob("*.json"))
    print(f"[INFO] Reading {len(dump_paths)} payload dumps")
    if not dump_paths:
        print("[FATAL] No dumps to process")
        return 1

    per_job, per_company, company_of_job = extract_from_dumps(dump_paths)
    print(f" per-job records : {len(per_job)}")
    print(f" per-company records: {len(per_company)}")
    print(f" job->company links: {len(company_of_job)}")

    # Everything is read as text; missing cells become empty strings.
    df = pd.read_csv(config.RAW_CSV, encoding="utf-8-sig", dtype=str).fillna("")
    print(f"[INFO] CSV before: {len(df)} rows")
    before = {col: _count_populated(df, col) for col in _TRACKED_COLUMNS}

    # Apply enrichment row by row
    rows = df.to_dict(orient="records")
    for row in rows:
        # Treat the placeholder deadline as empty for enrichment purposes
        if row.get("deadline") == _DEADLINE_PLACEHOLDER:
            row["deadline"] = ""
        # The return value is unused, so enrich_row is presumably expected to
        # mutate `row` in place -- TODO confirm against enrich.py.
        enrich_row(row, per_job, per_company, company_of_job)
        # Restore deadline placeholder if still empty
        if not row.get("deadline"):
            row["deadline"] = _DEADLINE_PLACEHOLDER

    # The output frame is restricted to (and ordered by) the SCHEMA columns.
    out_df = pd.DataFrame(rows, columns=SCHEMA)
    out_df.to_csv(config.RAW_CSV, index=False, encoding="utf-8-sig")
    after = {col: _count_populated(out_df, col) for col in _TRACKED_COLUMNS}

    print(f"[INFO] CSV after: {len(out_df)} rows")
    print()
    print(" Field before -> after")
    for col in _TRACKED_COLUMNS:
        print(f" {col:16s} {before[col]:>4} -> {after[col]:>4} (+{after[col]-before[col]})")
    print(f"\n[DONE] {config.RAW_CSV}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())