Skip to content

Commit 225937c

Browse files
committed
refactor(cdp): standalone daily salt provider, scope to hog runtime
1 parent c53a5fe commit 225937c

8 files changed

Lines changed: 284 additions & 260 deletions

File tree

nodejs/src/cdp/cdp-api.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { DateTime } from 'luxon'
22
import express from 'ultimate-express'
33

44
import { ModifiedRequest } from '~/api/router'
5-
import { DailySaltProvider } from '~/ingestion/common/cookieless/daily-salt-provider'
65
import { PluginEvent } from '~/plugin-scaffold'
76

87
import { createCookielessRedisConnectionConfig } from '../config/redis-pools'
@@ -31,6 +30,7 @@ import { BATCH_HOGFLOW_REQUESTS_OUTPUT } from './outputs/outputs'
3130
import { RerunJobManager } from './rerun/rerun-job.manager'
3231
import { RerunRequest } from './rerun/rerun-job.types'
3332
import { BatchExportHogFunctionService, NotFoundError, ParseError } from './services/batch-export-hog-function.service'
33+
import { DailySaltProvider } from './services/daily-salt-provider'
3434
import { HogExecutorExecuteAsyncOptions, HogExecutorService, MAX_ASYNC_STEPS } from './services/hog-executor.service'
3535
import { HogFlowExecutorService, createHogFlowInvocation } from './services/hogflows/hogflow-executor.service'
3636
import { HogFlowManagerService } from './services/hogflows/hogflow-manager.service'

nodejs/src/cdp/consumers/cdp-source-webhooks.consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ import { Counter } from 'prom-client'
33

44
import type { ModifiedRequest } from '~/api/router'
55
import { instrumented } from '~/common/tracing/tracing-utils'
6-
import { DailySaltProvider, deriveTeamDailySalt } from '~/ingestion/common/cookieless/daily-salt-provider'
76
import { HogFlow } from '~/schema/hogflow'
87

98
import { HealthCheckResult, HealthCheckResultOk, PluginsServerConfig } from '../../types'
109
import { logger } from '../../utils/logger'
1110
import { PromiseScheduler } from '../../utils/promise-scheduler'
1211
import { UUID, UUIDT } from '../../utils/utils'
12+
import { DailySaltProvider, deriveTeamDailySalt } from '../services/daily-salt-provider'
1313
import { createHogFlowInvocation } from '../services/hogflows/hogflow-executor.service'
1414
import { actionIdForLogging } from '../services/hogflows/hogflow-utils'
1515
import { JobQueue } from '../services/job-queue/job-queue.interface'

nodejs/src/ingestion/common/cookieless/daily-salt-provider.test.ts renamed to nodejs/src/cdp/services/daily-salt-provider.test.ts

File renamed without changes.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import { createHash, randomBytes } from 'crypto'
2+
import { Pool as GenericPool } from 'generic-pool'
3+
import Redis from 'ioredis'
4+
5+
import { ConcurrencyController } from '~/utils/concurrencyController'
6+
7+
/*
8+
* Daily-rotating salt for Hog-derived distinct_ids.
9+
*
10+
* A 128-bit random value per calendar day, stored in Redis with a TTL. Once the TTL expires the
11+
* salt is gone, so any hash that mixed it in becomes irreversible. This is intentionally independent
12+
* of cookieless ingestion — it manages its own salt under its own key — so the cookieless code stays
13+
* untouched and the two never share state.
14+
*/
15+
16+
// Redis key namespace. Deliberately separate from cookieless (`cookieless_salt:`) — own salt, own data.
// The day string handed to `getSaltForDay` is appended to this prefix to form the full key.
17+
const SALT_KEY_PREFIX = 'hog_distinct_id_salt:'
18+
19+
// Calendar-day validity window: accept any day some timezone could currently be in (UTC−12…UTC+14),
20+
// plus a 72h buffer. Mirrors the cookieless salt window without sharing its code.
// All three values are hour offsets applied to the day's UTC midnight by `isCalendarDateValid`.
21+
// Hours before the day's UTC midnight at which the window opens.
const MAX_NEGATIVE_TIMEZONE_HOURS = 12
22+
// Added to the lag below: the window closes this many hours (plus the lag) after the day's UTC midnight.
const MAX_POSITIVE_TIMEZONE_HOURS = 14
23+
// Extra hours a day stays acceptable so late-arriving events can still resolve its salt.
const MAX_SUPPORTED_INGESTION_LAG_HOURS = 72
24+
25+
/**
 * Outcome of `DailySaltProvider.getSaltForDay`, discriminated on `success`:
 * either the salt bytes for the requested day, or a refusal because the day failed
 * `isCalendarDateValid` (`reason: 'date_out_of_range'`).
 */
export type DailySaltResult = { success: true; salt: Buffer } | { success: false; reason: 'date_out_of_range' }
26+
27+
/** Settings consumed by the `DailySaltProvider` constructor. */
export interface DailySaltProviderConfig {
28+
/** Expiry, in seconds, passed as the `EX` option when a day's salt is first written to Redis. */
saltTtlSeconds: number
29+
/** Period, in milliseconds, of the timer that evicts no-longer-valid days from the in-process cache. */
deleteExpiredLocalSaltsIntervalMs: number
30+
}
31+
32+
/**
 * Hands out the salt for a calendar day.
 *
 * Lookups are answered from an in-process cache when possible. Otherwise the salt is read from
 * Redis under `SALT_KEY_PREFIX + day`, and created there (16 random bytes, base64-encoded,
 * `SET … EX … NX`) if no value exists yet. A timer started in the constructor periodically evicts
 * cached days that no longer pass `isCalendarDateValid`; call `shutdown()` to stop it.
 *
 * NOTE(review): cached entries are evicted by the `isCalendarDateValid` window only, independent
 * of `saltTtlSeconds`. If the configured TTL is shorter than that window, a process can keep
 * serving a salt that has already expired from Redis — confirm the TTL is configured accordingly.
 */
export class DailySaltProvider {
33+
// Passed as the `EX` (seconds) option when this process is the one that creates a day's salt in Redis.
private readonly saltTtlSeconds: number
34+
// In-process cache: day string -> decoded salt bytes.
private readonly localSaltMap: Record<string, Buffer> = {}
35+
// Wraps the Redis lookup in `getSaltForDay`.
// NOTE(review): presumably `ConcurrencyController(1)` runs one task at a time — confirm against its implementation.
private readonly mutex = new ConcurrencyController(1)
36+
// Handle of the cache-eviction timer; reset to null by `shutdown()`.
private cleanupInterval: NodeJS.Timeout | null = null
37+
38+
/**
 * Stores the TTL and starts the periodic cache-eviction timer.
 *
 * @param config Redis TTL for newly created salts and the eviction timer period.
 * @param redisPool Pool from which a Redis client is acquired for each cache miss and released afterwards.
 */
constructor(
39+
config: DailySaltProviderConfig,
40+
private readonly redisPool: GenericPool<Redis.Redis>
41+
) {
42+
this.saltTtlSeconds = config.saltTtlSeconds
43+
// Periodically drop expired salts from the local cache; Redis TTLs handle the durable copy.
44+
this.cleanupInterval = setInterval(this.deleteExpiredLocalSalts, config.deleteExpiredLocalSaltsIntervalMs)
45+
// unref so the timer never keeps the process alive.
46+
this.cleanupInterval.unref()
47+
}
48+
49+
/**
 * Resolve the salt for a calendar day.
 *
 * @param yyyymmdd Day to look up. It is checked with `isCalendarDateValid` and then used verbatim
 *   as the cache key and as the suffix of the Redis key.
 * @param timestampMs Forwarded to the mutex as the task's `priority`.
 *   NOTE(review): how priorities are ordered is defined by `ConcurrencyController` — confirm there.
 * @returns `{ success: true, salt }`, or `{ success: false, reason: 'date_out_of_range' }` when the
 *   day fails validation (no Redis access happens in that case).
 * @throws Error (as a rejection) when the `SET … NX` was not acknowledged and the follow-up read
 *   finds no value. Pool and Redis errors are not caught here.
 */
getSaltForDay(yyyymmdd: string, timestampMs: number): Promise<DailySaltResult> {
50+
if (!isCalendarDateValid(yyyymmdd)) {
51+
return Promise.resolve({ success: false, reason: 'date_out_of_range' })
52+
}
53+
// Fast path: already cached in this process, no mutex and no Redis round-trip.
if (this.localSaltMap[yyyymmdd]) {
54+
return Promise.resolve({ success: true, salt: this.localSaltMap[yyyymmdd] })
55+
}
56+
57+
// Fetch from Redis once per node process per day, behind a mutex so concurrent callers share one round-trip.
58+
return this.mutex.run({
59+
fn: async (): Promise<DailySaltResult> => {
60+
// Re-check inside the mutex: an earlier task may have filled the cache while this one was waiting.
if (this.localSaltMap[yyyymmdd]) {
61+
return { success: true, salt: this.localSaltMap[yyyymmdd] }
62+
}
63+
64+
const key = `${SALT_KEY_PREFIX}${yyyymmdd}`
65+
const client = await this.redisPool.acquire()
66+
try {
67+
// Salts are stored in Redis as base64 text and cached locally as raw bytes.
const existing = await client.get(key)
68+
if (existing) {
69+
const salt = Buffer.from(existing, 'base64')
70+
this.localSaltMap[yyyymmdd] = salt
71+
return { success: true, salt }
72+
}
73+
74+
// Create the day's salt, but don't overwrite a racing writer (SET NX).
75+
const newSalt = randomBytes(16)
76+
const setResult = await client.set(key, newSalt.toString('base64'), 'EX', this.saltTtlSeconds, 'NX')
77+
if (setResult === 'OK') {
78+
this.localSaltMap[yyyymmdd] = newSalt
79+
return { success: true, salt: newSalt }
80+
}
81+
82+
// Lost the race — read the value the winner wrote.
83+
const retry = await client.get(key)
84+
if (!retry) {
85+
throw new Error('Failed to read Hog daily salt from redis')
86+
}
87+
const salt = Buffer.from(retry, 'base64')
88+
this.localSaltMap[yyyymmdd] = salt
89+
return { success: true, salt }
90+
} finally {
91+
// Always hand the client back to the pool, including when a Redis call above throws.
await this.redisPool.release(client)
92+
}
93+
},
94+
priority: timestampMs,
95+
})
96+
}
97+
98+
/**
 * Evict every cached day that no longer passes `isCalendarDateValid`.
 * Declared as an arrow-function property so it can be handed to `setInterval` without binding `this`.
 */
deleteExpiredLocalSalts = (): void => {
99+
for (const key in this.localSaltMap) {
100+
if (!isCalendarDateValid(key)) {
101+
delete this.localSaltMap[key]
102+
}
103+
}
104+
}
105+
106+
/** Empty the in-process cache. Redis is not touched. */
deleteAllLocalSalts(): void {
107+
for (const key in this.localSaltMap) {
108+
delete this.localSaltMap[key]
109+
}
110+
}
111+
112+
/**
 * Stop the eviction timer (if still running) and clear the in-process cache.
 * The Redis pool passed to the constructor is not closed or drained here.
 */
shutdown(): void {
113+
if (this.cleanupInterval) {
114+
clearInterval(this.cleanupInterval)
115+
this.cleanupInterval = null
116+
}
117+
this.deleteAllLocalSalts()
118+
}
119+
}
120+
121+
/**
 * Derive a per-team daily salt from the random daily salt. Forward-derivable from
 * (dailySalt, teamId, yyyymmdd), but not reversible — sha256 is one-way and the daily salt is
 * random and discarded after its TTL. Mixing in `teamId` isolates teams: leaking one team's
 * derived salt reveals nothing about the daily salt or any other team.
 *
 * The hashed input is the raw `dailySalt` bytes followed by the text `:<teamId>:<yyyymmdd>`.
 *
 * @param dailySalt Random salt bytes for the day.
 * @param teamId Team the derived salt is scoped to.
 * @param yyyymmdd Day string mixed into the hash; it is not validated here.
 * @returns The SHA-256 digest, base64-encoded.
 */
127+
export function deriveTeamDailySalt(dailySalt: Buffer, teamId: number, yyyymmdd: string): string {
128+
return createHash('sha256').update(dailySalt).update(`:${teamId}:${yyyymmdd}`).digest('base64')
129+
}
130+
131+
/**
 * Report whether `yyyymmdd` is a calendar day whose salt may currently be served.
 *
 * The accepted window is anchored on the day's UTC midnight and compared against `Date.now()`:
 * it opens `MAX_NEGATIVE_TIMEZONE_HOURS` before midnight (inclusive) and closes
 * `MAX_POSITIVE_TIMEZONE_HOURS + MAX_SUPPORTED_INGESTION_LAG_HOURS` after it (exclusive).
 *
 * Only a real calendar day written in canonical `YYYY-MM-DD` form is accepted. `Date` parsing on
 * its own is too lenient for a value that `getSaltForDay` goes on to use as a cache key and a
 * Redis key: an engine may roll a non-existent day such as `2025-02-30` over into the next month,
 * and alternative spellings of one day (for example the expanded-year form `+002025-01-01`)
 * would each get their own key and therefore their own salt.
 *
 * NOTE(review): a calendar day begins earliest in UTC+14, i.e. 14h before its UTC midnight, while
 * this window opens only 12h before. Left unchanged because the constants are documented as
 * mirroring the cookieless window — confirm the two offsets are not swapped.
 *
 * @param yyyymmdd Day to check, expected as `YYYY-MM-DD`.
 * @returns `true` when the string is a canonical calendar day and the current time is inside its window.
 */
export function isCalendarDateValid(yyyymmdd: string): boolean {
    const utcDate = new Date(`${yyyymmdd}T00:00:00Z`)
    const dayStartMs = utcDate.getTime()
    if (Number.isNaN(dayStartMs)) {
        return false
    }

    // Round-trip through the canonical ISO form: rejects rolled-over days and any non-canonical spelling.
    if (utcDate.toISOString().slice(0, 10) !== yyyymmdd) {
        return false
    }

    const hourMs = 60 * 60 * 1000
    const windowStartMs = dayStartMs - MAX_NEGATIVE_TIMEZONE_HOURS * hourMs
    const windowEndMs = dayStartMs + (MAX_POSITIVE_TIMEZONE_HOURS + MAX_SUPPORTED_INGESTION_LAG_HOURS) * hourMs

    const nowMs = Date.now()
    return nowMs >= windowStartMs && nowMs < windowEndMs
}

nodejs/src/cdp/templates/_sources/vercel/vercel_log_drain.template.test.ts

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -617,51 +617,6 @@ describe('vercel log drain template', () => {
617617
expect(day1.capturedPostHogEvents[0].properties.$distinct_id_strategy).toBe('fixed_salt')
618618
})
619619

620-
it('managed_rotating_salt: rotates by day and by injected salt', async () => {
621-
setMockedDay('2025-01-01T00:00:00Z')
622-
const day1 = await tester.invoke(
623-
{ distinct_id_strategy: 'managed_rotating_salt' },
624-
{ request: createVercelRequest(vercelLogDrain), salt: 'team-a-2025-01-01' }
625-
)
626-
const day1Repeat = await tester.invoke(
627-
{ distinct_id_strategy: 'managed_rotating_salt' },
628-
{ request: createVercelRequest(vercelLogDrain), salt: 'team-a-2025-01-01' }
629-
)
630-
// Different injected salt (another team, or the next day's derived salt) → different id
631-
const otherTeam = await tester.invoke(
632-
{ distinct_id_strategy: 'managed_rotating_salt' },
633-
{ request: createVercelRequest(vercelLogDrain), salt: 'team-b-2025-01-01' }
634-
)
635-
// Next calendar day → different id even with the same injected salt (the {day} component)
636-
setMockedDay('2025-01-02T00:00:00Z')
637-
const day2 = await tester.invoke(
638-
{ distinct_id_strategy: 'managed_rotating_salt' },
639-
{ request: createVercelRequest(vercelLogDrain), salt: 'team-a-2025-01-01' }
640-
)
641-
642-
const id1 = day1.capturedPostHogEvents[0].distinct_id
643-
expect(id1).toMatch(/^http_log_[A-Za-z0-9+/]{22}$/)
644-
expect(day1Repeat.capturedPostHogEvents[0].distinct_id).toBe(id1)
645-
expect(otherTeam.capturedPostHogEvents[0].distinct_id).not.toBe(id1)
646-
expect(day2.capturedPostHogEvents[0].distinct_id).not.toBe(id1)
647-
expect(day1.capturedPostHogEvents[0].properties.$distinct_id_strategy).toBe('managed_rotating_salt')
648-
})
649-
650-
it('managed_rotating_salt: derived from the injected salt, not inputs.salt_secret', async () => {
651-
setMockedDay('2025-01-01T00:00:00Z')
652-
const base = await tester.invoke(
653-
{ salt_secret: 'secret-one', distinct_id_strategy: 'managed_rotating_salt' },
654-
{ request: createVercelRequest(vercelLogDrain), salt: 'team-a-salt' }
655-
)
656-
// Changing the user secret must NOT change the id — guards against the local shadowing the global.
657-
const differentSecret = await tester.invoke(
658-
{ salt_secret: 'secret-two', distinct_id_strategy: 'managed_rotating_salt' },
659-
{ request: createVercelRequest(vercelLogDrain), salt: 'team-a-salt' }
660-
)
661-
662-
expect(differentSecret.capturedPostHogEvents[0].distinct_id).toBe(base.capturedPostHogEvents[0].distinct_id)
663-
})
664-
665620
it('ip: literal client IP after the prefix; stable across days', async () => {
666621
setMockedDay('2025-01-01T00:00:00Z')
667622
const day1 = await tester.invoke(

nodejs/src/cdp/templates/_sources/vercel/vercel_log_drain.template.ts

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,7 @@ let scheme := proxy.scheme ?? 'https'
165165
let path := proxy.path ?? log.path ?? ''
166166
167167
let day := formatDateTime(now(), '%Y-%m-%d')
168-
let userSalt := inputs.salt_secret ?? ''
169-
// PostHog-managed per-team daily-rotating salt, injected as the 'salt' runtime global before this runs.
170-
let managedSalt := salt ?? ''
168+
let salt := inputs.salt_secret ?? ''
171169
let strategy := inputs.distinct_id_strategy ?? 'fixed_salt'
172170
let activeStrategy := strategy
173171
let distinctId := ''
@@ -179,21 +177,17 @@ fun shortHash(input) {
179177
return substring(sha256(input, 'base64'), 1, 22)
180178
}
181179
182-
if (strategy == 'managed_rotating_salt') {
183-
// PostHog-managed daily salt — rotates daily and is irreversible by design (the underlying daily
184-
// salt is random and discarded after its TTL), so there's no secret to configure or store.
185-
distinctId := f'http_log_{shortHash(f'{managedSalt}:{day}:{clientIp}:{host}:{userAgent}')}'
186-
} else if (strategy == 'rotating_salt') {
187-
distinctId := f'http_log_{shortHash(f'{userSalt}:{day}:{clientIp}:{host}:{userAgent}')}'
180+
if (strategy == 'rotating_salt') {
181+
distinctId := f'http_log_{shortHash(f'{salt}:{day}:{clientIp}:{host}:{userAgent}')}'
188182
} else if (strategy == 'fixed_salt') {
189-
distinctId := f'http_log_{shortHash(f'{userSalt}:{clientIp}:{host}:{userAgent}')}'
183+
distinctId := f'http_log_{shortHash(f'{salt}:{clientIp}:{host}:{userAgent}')}'
190184
} else if (strategy == 'ip') {
191185
distinctId := f'http_log_{clientIp}'
192186
} else if (strategy == 'custom') {
193187
let customTemplate := inputs.custom_template ?? ''
194188
if (empty(customTemplate)) {
195189
print('vercel log drain: custom_template empty, falling back to rotating_salt')
196-
distinctId := f'http_log_{shortHash(f'{userSalt}:{day}:{clientIp}:{host}:{userAgent}')}'
190+
distinctId := f'http_log_{shortHash(f'{salt}:{day}:{clientIp}:{host}:{userAgent}')}'
197191
activeStrategy := 'rotating_salt_fallback'
198192
} else {
199193
let result := customTemplate
@@ -209,15 +203,15 @@ if (strategy == 'managed_rotating_salt') {
209203
// so we don't collapse all such requests onto a single 'http_log_' id.
210204
if (empty(result)) {
211205
print('vercel log drain: custom_template substituted to empty, falling back to rotating_salt')
212-
distinctId := f'http_log_{shortHash(f'{userSalt}:{day}:{clientIp}:{host}:{userAgent}')}'
206+
distinctId := f'http_log_{shortHash(f'{salt}:{day}:{clientIp}:{host}:{userAgent}')}'
213207
activeStrategy := 'rotating_salt_fallback'
214208
} else {
215209
distinctId := f'http_log_{result}'
216210
}
217211
}
218212
} else {
219213
// Unknown strategy value — treat as rotating_salt
220-
distinctId := f'http_log_{shortHash(f'{userSalt}:{day}:{clientIp}:{host}:{userAgent}')}'
214+
distinctId := f'http_log_{shortHash(f'{salt}:{day}:{clientIp}:{host}:{userAgent}')}'
221215
activeStrategy := 'rotating_salt'
222216
}
223217
@@ -371,7 +365,7 @@ return {
371365
type: 'choice',
372366
label: 'Distinct ID strategy',
373367
description:
374-
'How distinct IDs are derived from the request. Because events are anonymous by default (no person profiles), this affects unique-visitor counting rather than cost. The default, fixed salt, gives one stable ID per client (IP + host + user agent) for accurate uniques. Rotating salt rotates that ID daily for extra privacy, at the cost of inflated unique counts. Managed rotating salt does the same using a PostHog-managed daily salt, so there is no secret to configure and prior days become irreversible. The active strategy is recorded on each event as $distinct_id_strategy for debugging.',
368+
'How distinct IDs are derived from the request. Because events are anonymous by default (no person profiles), this affects unique-visitor counting rather than cost. The default, fixed salt, gives one stable ID per client (IP + host + user agent) for accurate uniques. Rotating salt rotates that ID daily for extra privacy, at the cost of inflated unique counts. The active strategy is recorded on each event as $distinct_id_strategy for debugging.',
375369
choices: [
376370
{
377371
value: 'fixed_salt',
@@ -381,10 +375,6 @@ return {
381375
value: 'rotating_salt',
382376
label: 'Rotating salt (sha256(salt:day:ip:host:ua)) — rotates daily for privacy',
383377
},
384-
{
385-
value: 'managed_rotating_salt',
386-
label: 'Managed rotating salt — PostHog-managed daily salt, rotates daily, no secret needed',
387-
},
388378
{
389379
value: 'ip',
390380
label: 'Raw IP — stores client IPs unhashed as queryable distinct IDs',

0 commit comments

Comments
 (0)