-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
140 lines (116 loc) · 5.33 KB
/
parser.py
File metadata and controls
140 lines (116 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Parse Voyager GraphQL/REST payloads into 20-column rows.
LinkedIn's GraphQL responses are deeply nested and contain a mix of
sparse URN-pointer stubs and fully-resolved entities. We don't try to
match exact paths (they change between page revisions). Instead we walk
the entire payload tree recursively and pick out the two object shapes
we care about:
1. JobPostingCard (resolved variant) — has `jobPostingTitle` plus
`primaryDescription.text` (company), `secondaryDescription.text`
(location), and `footerItems[].timeAt` (LISTED_DATE epoch).
Extract job ID from `entityUrn = urn:li:fsd_jobPostingCard:(ID,*)`.
2. JobDescription — has `entityUrn = urn:li:fsd_jobDescription:ID`
plus `descriptionText.text` (full JD body).
Multiple payloads contribute to one job: the list payload provides the
basics, and a separate JD-detail payload provides the body. We merge by
job ID and the driver flushes the merged dict to CSV.
"""
import re
from typing import Any, Dict, Iterator, List, Tuple
from schema import SOURCE_TAG, epoch_ms_to_iso, norm, normalize_row
# Captures the numeric job ID from a JobPostingCard URN of the form
# "urn:li:fsd_jobPostingCard:(ID,...)".
JOB_CARD_URN_RE = re.compile(r"urn:li:fsd_jobPostingCard:\((\d+),")
# Captures the numeric job ID from a JobDescription URN of the form
# "urn:li:fsd_jobDescription:ID".
JOB_DESC_URN_RE = re.compile(r"urn:li:fsd_jobDescription:(\d+)")
def _walk(node: Any) -> Iterator[Any]:
"""Yield every dict in the tree, depth-first."""
if isinstance(node, dict):
yield node
for v in node.values():
yield from _walk(v)
elif isinstance(node, list):
for v in node:
yield from _walk(v)
def _text(node: Any) -> str:
    """Return the normalised text of a TextViewModel-shaped value.

    When ``node`` is a dict carrying a ``"text"`` key, that entry is passed
    to ``norm()``; any other value is passed to ``norm()`` unchanged.
    """
    is_text_model = isinstance(node, dict) and "text" in node
    raw = node["text"] if is_text_model else node
    return norm(raw)
def _listed_date_ms(footer_items: Any) -> int:
if not isinstance(footer_items, list):
return 0
for item in footer_items:
if isinstance(item, dict) and item.get("type") == "LISTED_DATE":
ts = item.get("timeAt")
if isinstance(ts, (int, float)):
return int(ts)
return 0
def _strip_cn_country(loc: str) -> str:
s = loc.strip()
s = re.sub(r"\s*\(.*?\)\s*$", "", s) # drop "(现场办公)" etc
s = re.sub(r"^中国\s+", "", s)
return s.strip()
def extract_from_payload(payload: Dict[str, Any]) -> Tuple[Dict[int, Dict], Dict[int, str]]:
    """Collect job-card fields and JD bodies found in one payload.

    Every dict in ``payload`` is inspected for two shapes:

    * a resolved JobPostingCard: has ``jobPostingTitle`` and
      ``primaryDescription`` plus an ``entityUrn`` matching
      ``JOB_CARD_URN_RE``;
    * a JobDescription: has ``descriptionText`` and an ``entityUrn`` that
      fully matches ``JOB_DESC_URN_RE``.

    Returns:
        ``(cards, descriptions)`` where ``cards`` maps job ID to the list
        fields of the card with the longest title seen for that ID, and
        ``descriptions`` maps job ID to the longest non-empty JD text seen.
    """
    cards: Dict[int, Dict[str, Any]] = {}
    descriptions: Dict[int, str] = {}

    for obj in _walk(payload):
        urn = obj.get("entityUrn", "")
        if not isinstance(urn, str):
            # Neither shape can be identified without a string URN.
            continue

        # --- Shape 1: resolved JobPostingCard ---
        if "jobPostingTitle" in obj and "primaryDescription" in obj:
            card_match = JOB_CARD_URN_RE.search(urn)
            if card_match is None:
                continue
            card_id = int(card_match.group(1))

            company = _text(obj.get("primaryDescription"))
            city = _strip_cn_country(_text(obj.get("secondaryDescription")))
            title = norm(obj.get("jobPostingTitle")) or _text(obj.get("title"))
            listed_ms = _listed_date_ms(obj.get("footerItems"))

            # Keep whichever occurrence has the longest title; some stubs
            # come without one.
            previous = cards.get(card_id)
            if not previous or len(title) > len(previous.get("name", "")):
                if listed_ms:
                    publish_time = epoch_ms_to_iso(listed_ms)
                    publish_source = "voyager_listedAt"
                else:
                    publish_time = ""
                    publish_source = "unknown"
                cards[card_id] = {
                    "external_job_id": str(card_id),
                    "url": f"https://www.linkedin.com/jobs/view/{card_id}",
                    "name": title,
                    "company": company,
                    "city": city,
                    "publish_time": publish_time,
                    "publish_time_source": publish_source,
                    "source": SOURCE_TAG,
                    "recruit_type": "intern",
                }

        # --- Shape 2: resolved JobDescription ---
        jd_match = JOB_DESC_URN_RE.fullmatch(urn) if urn else None
        if jd_match is None or "descriptionText" not in obj:
            continue
        jd_id = int(jd_match.group(1))
        body = _text(obj.get("descriptionText"))
        if body and len(body) > len(descriptions.get(jd_id, "")):
            descriptions[jd_id] = body

    return cards, descriptions
def merge_payloads(payloads: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Combine all captured payloads into one normalised row per job ID.

    List fields are merged key by key: a non-empty value replaces the
    stored one when nothing is stored yet or when its string form is
    longer. For JD text the longest body per job ID wins.

    Only jobs that have list fields with a ``url`` are emitted; a JD body
    without a matching card produces no row. When a JD body exists for an
    emitted job it is attached as ``jd_raw`` before ``normalize_row``.
    """
    cards_by_id: Dict[int, Dict[str, Any]] = {}
    jd_by_id: Dict[int, str] = {}

    for payload in payloads:
        found_cards, found_jds = extract_from_payload(payload)

        for job_id, incoming in found_cards.items():
            target = cards_by_id.setdefault(job_id, {})
            for key, value in incoming.items():
                if not value:
                    continue
                held = target.get(key)
                if not held or len(str(value)) > len(str(held)):
                    target[key] = value

        for job_id, body in found_jds.items():
            if len(body) > len(jd_by_id.get(job_id, "")):
                jd_by_id[job_id] = body

    rows: List[Dict[str, str]] = []
    for job_id, record in cards_by_id.items():
        if record.get("url"):
            if job_id in jd_by_id:
                record["jd_raw"] = jd_by_id[job_id]
            rows.append(normalize_row(record))
    return rows