-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecommender.py
More file actions
218 lines (191 loc) · 9.54 KB
/
Copy pathrecommender.py
File metadata and controls
218 lines (191 loc) · 9.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""
The recommender.
This is deliberately two models, not one:
Tier A -- Content-based interest profile (the default, always on).
Each item becomes a TF-IDF vector over its category + tags + text. A user's
interest profile is a time-decayed, value-weighted sum of the items they have
engaged with (Rocchio-style relevance feedback). We score a candidate item by
cosine similarity to that profile. This updates instantly on every click, needs
no training run, handles cold start via role priors, and -- crucially -- is
explainable: we can point at the exact tags that drove a recommendation.
Tier B -- Trained engagement model (opt-in, /retrain then ?model=trained).
A logistic regression over item features + the user's role, trained on the
accumulated event log to predict P(engage). This is the "real trained model"
story: it learns feature interactions across all users, at the cost of needing
data and a retrain step.
The single most important design choice is NOT which model -- it's the VALUE SCORE
below. A raw click is a bad target: it rewards clickbait. Instead we collapse each
user's events on an item into one graded score that rewards reading and acting,
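and penalizes dismissals. Both models are driven by that score: Tier A uses it as
the signed weight of each profile update, and Tier B thresholds it (score > 0)
into an engaged / not-engaged training label.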
"""
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from seed_data import ITEMS, PERSONAS
# ---------------------------------------------------------------------------
# Value scoring: turn raw events into one graded label per (user, item).
# Clicks alone lie -- an open with a 2-second bounce is not the same as a full
# read or a completed action. This mapping is the heart of the system.
# ---------------------------------------------------------------------------
# Graded value of each event type. Positive values mean engagement, negative
# values mean rejection, and 0.0 marks an event with no graded value of its own.
EVENT_VALUE = {
    # shown, scrolled past -> implicit negative
    "impression": 0.0,
    # clicked in
    "open": 0.25,
    # opened the full content
    "expand": 0.4,
    # read it (dwell event is only logged past a threshold)
    "dwell": 0.6,
    # registered / RSVP'd / acknowledged
    "action": 0.9,
    # explicit "useful"
    "feedback_pos": 1.0,
    # explicit "not useful"
    "feedback_neg": -1.0,
    # swiped away
    "dismiss": -0.6,
}

# Profile dynamics: every new event nudges the profile by LEARN_RATE, and the
# existing profile is multiplied by PROFILE_DECAY first so that older behaviour
# fades instead of dominating newer interests.
PROFILE_DECAY = 0.92
LEARN_RATE = 0.5

# Share of feed reserved for off-profile discovery.
EXPLORE_EPSILON = 0.15

# Max items shown in the "For You" lane at once.
FEED_SIZE = 6
class Recommender:
    """
    Two-tier recommender over the seeded ITEMS catalogue.

    Tier A scores items by the dot product between a per-user interest profile
    and L2-normalized TF-IDF item vectors. Tier B (optional, populated by
    ``retrain``) is a logistic regression over the item vector plus a one-hot
    of the user's role.

    Attributes set by ``__init__``:
        items          -- the ITEMS list, in catalogue order
        item_index     -- item id -> row index into ``item_matrix``
        vectorizer     -- the fitted TfidfVectorizer
        item_matrix    -- dense, row-normalized TF-IDF matrix (one row per item)
        feature_names  -- vocabulary terms, aligned with ``item_matrix`` columns
        dim            -- number of columns in ``item_matrix``
        trained_model  -- fitted LogisticRegression, or None until ``retrain``
    """

    def __init__(self):
        self.items = ITEMS
        self.item_index = {it["id"]: i for i, it in enumerate(ITEMS)}

        # Build one "document" per item from its structured fields. Tags are
        # repeated so they carry more weight than the prose body.
        docs = []
        for it in ITEMS:
            tag_text = " ".join(it["tags"]) + " "
            docs.append(
                f"{it['category']} {tag_text * 3} {it['title']} {it['body']}"
            )

        self.vectorizer = TfidfVectorizer(stop_words="english", min_df=1)
        dense = self.vectorizer.fit_transform(docs).toarray()
        # L2-normalize so dot product == cosine similarity.
        self.item_matrix = self._normalize_rows(dense)
        self.feature_names = np.array(self.vectorizer.get_feature_names_out())
        self.dim = self.item_matrix.shape[1]
        self.trained_model = None  # populated by retrain()

    # -- helpers ---------------------------------------------------------------
    @staticmethod
    def _normalize_rows(m):
        """Return ``m`` with every row scaled to unit L2 norm.

        All-zero rows are left as zeros (their norm is replaced by 1.0 to
        avoid a division by zero). The input array is not modified.
        """
        norms = np.linalg.norm(m, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return m / norms

    def _feature_row(self, item_idx: int, role: str, roles=None) -> np.ndarray:
        """Build the Tier B feature vector: item vector + one-hot of ``role``.

        Shared by ``retrain`` and ``trained_score`` so the training and
        scoring feature layouts cannot drift apart. ``roles`` fixes the order
        of the one-hot slots and defaults to the PERSONAS key order. A role
        that is not in ``roles`` yields an all-zero one-hot.
        """
        if roles is None:
            roles = list(PERSONAS)
        role_onehot = [1.0 if role == r else 0.0 for r in roles]
        return np.concatenate([self.item_matrix[item_idx], role_onehot])

    def role_prior(self, role: str) -> np.ndarray:
        """Cold-start vector for a role, via the persona keyword string.

        Returns a zero vector of length ``self.dim`` when the role is unknown
        or has no "prior" text; otherwise the unit-normalized TF-IDF vector of
        that text (left un-normalized if it is all zeros).
        """
        prior_text = PERSONAS.get(role, {}).get("prior", "")
        if not prior_text:
            return np.zeros(self.dim)
        vec = self.vectorizer.transform([prior_text]).toarray()[0]
        n = np.linalg.norm(vec)
        return vec / n if n else vec

    # -- Tier A: content-based profile ----------------------------------------
    def build_profile(self, role: str, events: list) -> np.ndarray:
        """
        Reconstruct the user's interest vector from scratch each time, from the
        role prior plus their event history. Recomputing (rather than storing)
        keeps the event log as the single source of truth.

        ``events`` is iterated in the order given and is expected to run
        oldest -> newest; each event needs "item_id" and "event_type" keys.
        Events for items not in the catalogue are skipped. Returns the profile
        scaled to unit L2 norm (or unchanged if it is all zeros).
        """
        profile = self.role_prior(role).copy()
        for ev in events:  # oldest -> newest
            idx = self.item_index.get(ev["item_id"])
            if idx is None:
                continue
            value = EVENT_VALUE.get(ev["event_type"], 0.0)
            if value == 0.0:
                # Impressions carry a tiny negative pull so ignored topics fade.
                # NOTE(review): event types missing from EVENT_VALUE also land
                # here and are treated like impressions -- confirm intended.
                value = -0.05
            profile = (
                PROFILE_DECAY * profile
                + LEARN_RATE * value * self.item_matrix[idx]
            )
        n = np.linalg.norm(profile)
        return profile / n if n else profile

    def explain(self, profile: np.ndarray, item_idx: int, top_k=3):
        """Which terms drove this score: elementwise contribution to the cosine.

        Returns up to ``top_k`` feature names with a strictly positive
        contribution, strongest first, or ``[]`` when nothing contributes
        positively (including when the vectors are empty).
        """
        contrib = profile * self.item_matrix[item_idx]
        # Guard the empty case explicitly: ndarray.max() raises on size 0.
        if contrib.size == 0 or contrib.max() <= 0:
            return []
        top = np.argsort(contrib)[::-1][:top_k]
        return [self.feature_names[i] for i in top if contrib[i] > 0]

    # -- Tier B: trained logistic model ---------------------------------------
    def retrain(self, all_events: list, get_role) -> dict:
        """
        Train a global engagement model on the event log. Features = item vector
        plus a one-hot of the user's role; label = (value score > 0).

        ``all_events`` entries need "user_id", "item_id" and "event_type" keys.
        ``get_role`` is called with a user id and should return that user's
        role (a falsy result is treated as the empty role).

        Returns simple training stats. No-op-safe if there isn't enough data:
        when the labels do not contain both classes, ``trained_model`` is reset
        to None and ``{"trained": False, ...}`` is returned.
        """
        roles = list(PERSONAS)

        # Collapse events to the strongest value per (user, item).
        # NOTE(review): max() means any positive event outranks a later
        # "dismiss" / "feedback_neg" on the same item, so explicit negative
        # feedback on an opened item still produces a positive label.
        best = {}
        for ev in all_events:
            key = (ev["user_id"], ev["item_id"])
            v = EVENT_VALUE.get(ev["event_type"], 0.0)
            best[key] = max(best.get(key, -9), v)

        X, y = [], []
        role_by_user = {}  # resolve each user's role once, not once per item
        for (user_id, item_id), value in best.items():
            idx = self.item_index.get(item_id)
            if idx is None:
                continue
            if user_id not in role_by_user:
                role_by_user[user_id] = get_role(user_id) or ""
            X.append(self._feature_row(idx, role_by_user[user_id], roles))
            y.append(1 if value > 0 else 0)

        if len(set(y)) < 2:
            self.trained_model = None
            return {"trained": False, "reason": "need both engaged and ignored examples",
                    "samples": len(y)}

        X_arr, y_arr = np.array(X), np.array(y)
        clf = LogisticRegression(max_iter=1000, C=1.0)
        clf.fit(X_arr, y_arr)
        self.trained_model = clf
        return {"trained": True, "samples": len(y),
                "train_accuracy": round(float(clf.score(X_arr, y_arr)), 3)}

    def trained_score(self, item_idx: int, role: str) -> float:
        """Return the trained model's P(engage) for one item and role.

        Raises:
            RuntimeError: if no model is available (``retrain`` has not been
                run, or its last run did not have enough data).
        """
        if self.trained_model is None:
            raise RuntimeError(
                "no trained model available; call retrain() successfully first"
            )
        x = self._feature_row(item_idx, role).reshape(1, -1)
        return float(self.trained_model.predict_proba(x)[0][1])

    # -- ranking ---------------------------------------------------------------
    def rank(self, role: str, events: list, model: str = "content"):
        """
        Produce the feed. Returns items split into routing lanes:
          push        -> urgent OR mandatory (bypasses personalization entirely)
          recommended -> top FEED_SIZE scored discretionary items
          discovery   -> the most off-profile item beyond the main feed (deterministic)
        Personalization decides ORDER and framing, never whether mandatory info
        is delivered.

        ``model="trained"`` uses the Tier B score when a trained model exists
        and silently falls back to the content score otherwise. The "reasons"
        attached to each item always come from the content profile. The
        result also carries "profile_terms", the top weighted profile terms.
        """
        profile = self.build_profile(role, events)
        seen_dismissed = {e["item_id"] for e in events if e["event_type"] == "dismiss"}
        use_trained = model == "trained" and self.trained_model is not None

        scored = []
        for i, it in enumerate(self.items):
            if use_trained:
                score = self.trained_score(i, role)
            else:
                score = float(profile @ self.item_matrix[i])
            scored.append({
                "item": it,
                "score": round(score, 4),
                "reasons": self.explain(profile, i),
            })

        # --- override lane: urgent or mandatory always surfaces ---
        # Dismissed items are only filtered out of the discretionary pool.
        push, pool = [], []
        for s in scored:
            it = s["item"]
            if it["urgency"] == "urgent" or it["mandatory"]:
                push.append(s)
            elif it["id"] not in seen_dismissed:
                pool.append(s)

        pool.sort(key=lambda s: s["score"], reverse=True)
        # Stable sort: urgent items first, catalogue order kept within each group.
        push.sort(key=lambda s: 0 if s["item"]["urgency"] == "urgent" else 1)

        # All Staff sees every item; other roles get a ranked window of FEED_SIZE.
        if role == "all_staff":
            discovery = None
        else:
            # Lowest-scored pool item, only when it falls outside the window.
            discovery = pool[-1] if len(pool) > FEED_SIZE else None
            pool = pool[:FEED_SIZE]

        return {
            "push": push,
            "recommended": pool,
            "discovery": discovery,
            "profile_terms": self._top_profile_terms(profile),
        }

    def _top_profile_terms(self, profile, top_k=8):
        """Return up to ``top_k`` positively weighted profile terms.

        Each entry is ``{"term": <feature name>, "weight": <rounded to 3 dp>}``,
        strongest first. Returns ``[]`` when no weight is positive (including
        when the profile is empty).
        """
        # Guard the empty case explicitly: ndarray.max() raises on size 0.
        if profile.size == 0 or profile.max() <= 0:
            return []
        top = np.argsort(profile)[::-1][:top_k]
        return [{"term": self.feature_names[i], "weight": round(float(profile[i]), 3)}
                for i in top if profile[i] > 0]