Commit 2939bcd

Merge pull request #10 from sidequery/persons-iceberg-pipeline
Add queryable persons pipeline
2 parents 0265361 + b048133 commit 2939bcd

16 files changed

Lines changed: 2194 additions & 166 deletions

README.md

Lines changed: 223 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -2,14 +2,15 @@
22

33
<img src="hog.png" alt="Hogflare" width="300">
44

5-
Hogflare is a Cloudflare Workers ingestion layer for PostHog SDKs. It supports PostHog-style ingestion, stateful persons/groups, and SDK feature flags, then streams events into Cloudflare Pipelines so data lands in R2 as Iceberg/Parquet.
5+
Hogflare is a Cloudflare Workers ingestion layer for PostHog SDKs. It supports PostHog-style ingestion, stateful persons/groups, and SDK feature flags, then streams events and person snapshots into Cloudflare Pipelines so data lands in R2 as Iceberg/Parquet.
66

77
#### What works today
88

99
- Ingestion endpoints: `/capture`, `/identify`, `/alias`, `/batch`, `/e`, `/engage`, `/groups`
1010
- Persons and groups: `$set`, `$set_once`, `$unset`, aliasing, and group properties
1111
- Feature flags: `/flags` and `/decide` are evaluated in the Worker (used by PostHog SDKs)
1212
- Request enrichment: Cloudflare IP/geo fields added when missing
13+
- Queryable people: append-only person snapshots can be written to a separate Iceberg table
1314

1415
## Architecture
1516

@@ -34,8 +35,10 @@ flowchart TB
3435
PersonsDO -.-> PersonIdDO
3536
end
3637
37-
Worker -->|"events"| Pipeline["Cloudflare Pipelines"]
38-
Pipeline --> R2["R2 Data Catalog<br/>(Iceberg/Parquet)"]
38+
Worker -->|"events"| EventsPipeline["Events Pipeline"]
39+
Worker -->|"person snapshots"| PersonsPipeline["Persons Pipeline"]
40+
EventsPipeline --> EventsR2["R2 Data Catalog<br/>events table"]
41+
PersonsPipeline --> PersonsR2["R2 Data Catalog<br/>persons table"]
3942
```
4043

4144
## Why?
@@ -48,44 +51,70 @@ Admittedly, PostHog does a *lot* more than this package, but some folks really j
4851

4952
## Quick start (Cloudflare)
5053

51-
1) Create a Pipeline stream and sink in the Cloudflare dashboard or via `wrangler pipelines setup`.
52-
2) Use the schema below for the stream.
53-
3) Copy `wrangler.toml.example` to `wrangler.toml` and set variables.
54-
4) Set Wrangler secrets.
55-
5) Deploy the Worker.
54+
1. Create R2 Data Catalog-backed Pipelines resources.
55+
2. Copy `wrangler.toml.example` to `wrangler.toml` and set the stream endpoints.
56+
3. Set Wrangler secrets.
57+
4. Build and deploy the Worker.
58+
5. Run the capture/identify verification flow, then query the Iceberg tables.
5659

57-
### Pipeline schema (JSON)
60+
The examples below use stable table names for a fresh deployment: `default.hogflare_events` and `default.hogflare_persons`. If you use versioned names during migration, substitute those names consistently in the sink commands and queries.
5861

59-
```json
60-
{
61-
"fields": [
62-
{ "name": "uuid", "type": "string", "required": true },
63-
{ "name": "team_id", "type": "int64", "required": false },
64-
{ "name": "source", "type": "string", "required": true },
65-
{ "name": "event", "type": "string", "required": true },
66-
{ "name": "distinct_id", "type": "string", "required": true },
67-
{ "name": "timestamp", "type": "timestamp", "required": false },
68-
{ "name": "created_at", "type": "timestamp", "required": true },
69-
{ "name": "properties", "type": "json", "required": false },
70-
{ "name": "context", "type": "json", "required": false },
71-
{ "name": "person_id", "type": "string", "required": false },
72-
{ "name": "person_created_at", "type": "timestamp", "required": false },
73-
{ "name": "person_properties", "type": "json", "required": false },
74-
{ "name": "group0", "type": "string", "required": false },
75-
{ "name": "group1", "type": "string", "required": false },
76-
{ "name": "group2", "type": "string", "required": false },
77-
{ "name": "group3", "type": "string", "required": false },
78-
{ "name": "group4", "type": "string", "required": false },
79-
{ "name": "group_properties", "type": "json", "required": false },
80-
{ "name": "api_key", "type": "string", "required": false },
81-
{ "name": "extra", "type": "json", "required": false }
82-
]
83-
}
62+
### Create Pipelines Resources
63+
64+
Set these values before creating sinks:
65+
66+
```bash
67+
export R2_BUCKET="<bucket-name>"
68+
export R2_CATALOG_TOKEN="<r2-data-catalog-token>"
8469
```
8570

71+
`R2_CATALOG_TOKEN` is the token used by R2 Data Catalog/R2 SQL clients such as DuckDB or PyIceberg. The bucket must have R2 Data Catalog enabled before creating `r2-data-catalog` sinks.
72+
73+
Create the events stream, sink, and pipeline:
74+
75+
```bash
76+
bunx wrangler pipelines streams create hogflare_events_stream \
77+
--schema-file scripts/events-pipeline-schema.json \
78+
--http-enabled true \
79+
--http-auth true
80+
81+
bunx wrangler pipelines sinks create hogflare_events_sink \
82+
--type r2-data-catalog \
83+
--bucket "$R2_BUCKET" \
84+
--namespace default \
85+
--table hogflare_events \
86+
--catalog-token "$R2_CATALOG_TOKEN" \
87+
--roll-interval 60
88+
89+
bunx wrangler pipelines create hogflare_events_pipeline \
90+
--sql "INSERT INTO hogflare_events_sink SELECT * FROM hogflare_events_stream;"
91+
```
92+
93+
Create the persons stream, sink, and pipeline if you want queryable people in Iceberg:
94+
95+
```bash
96+
bunx wrangler pipelines streams create hogflare_persons_stream \
97+
--schema-file scripts/persons-pipeline-schema.json \
98+
--http-enabled true \
99+
--http-auth true
100+
101+
bunx wrangler pipelines sinks create hogflare_persons_sink \
102+
--type r2-data-catalog \
103+
--bucket "$R2_BUCKET" \
104+
--namespace default \
105+
--table hogflare_persons \
106+
--catalog-token "$R2_CATALOG_TOKEN" \
107+
--roll-interval 60
108+
109+
bunx wrangler pipelines create hogflare_persons_pipeline \
110+
--sql "INSERT INTO hogflare_persons_sink SELECT * FROM hogflare_persons_stream;"
111+
```
112+
113+
Each stream creation command prints an HTTP endpoint like `https://<stream-id>.ingest.cloudflare.com`. Use those endpoints in `wrangler.toml`.
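
To make the role of the stream endpoint concrete, the sketch below builds (but does not send) an authenticated POST of the kind a client would issue against it. It assumes the stream accepts a JSON array of records with a `Bearer` token, consistent with the `--http-auth true` flag above. `build_ingest_request`, the placeholder endpoint, and the sample record are hypothetical and are not Hogflare's actual forwarding code.

```python
import json
import urllib.request


def build_ingest_request(endpoint, token, records):
    """Build an authenticated POST for a Pipelines stream HTTP endpoint.

    Assumption: the stream accepts a JSON array of records and a Bearer token.
    """
    body = json.dumps(records).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


# Placeholder endpoint and token; substitute the values printed by wrangler.
request = build_ingest_request(
    "https://example-stream-id.ingest.cloudflare.com",
    "example-token",
    [{"uuid": "00000000-0000-4000-8000-000000000000", "event": "purchase"}],
)
```

Nothing is sent until the request is passed to `urllib.request.urlopen(request, timeout=10)`, so the sketch is safe to run before real credentials exist.
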
114+
86115
### Wrangler config
87116

88-
Copy the example and fill in your stream endpoint:
117+
Copy the example and fill in the stream endpoints:
89118

90119
```bash
91120
cp wrangler.toml.example wrangler.toml
@@ -98,6 +127,7 @@ compatibility_date = "2025-01-09"
98127

99128
[vars]
100129
CLOUDFLARE_PIPELINE_ENDPOINT = "https://<stream-id>.ingest.cloudflare.com"
130+
CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT = "https://<persons-stream-id>.ingest.cloudflare.com"
101131
CLOUDFLARE_PIPELINE_TIMEOUT_SECS = "10"
102132

103133
# Optional
@@ -129,34 +159,120 @@ tag = "v2"
129159
new_sqlite_classes = ["PersonIdCounterDurableObject", "GroupDurableObject"]
130160
```
131161

162+
### Configuration Reference
163+
164+
| Setting | Required | Notes |
165+
| --- | --- | --- |
166+
| `CLOUDFLARE_PIPELINE_ENDPOINT` | Yes | Events stream HTTP endpoint from `wrangler pipelines streams create`. |
167+
| `CLOUDFLARE_PIPELINE_AUTH_TOKEN` | Yes, for authenticated streams | Bearer token used for events stream HTTP ingest. |
168+
| `CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT` | No | Persons stream endpoint. Set this to write person snapshots to Iceberg. |
169+
| `CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN` | No | Falls back to `CLOUDFLARE_PIPELINE_AUTH_TOKEN` when omitted. |
170+
| `CLOUDFLARE_PIPELINE_TIMEOUT_SECS` | No | Defaults to 10 seconds. |
171+
| `POSTHOG_API_KEY` | No | Default project token returned by `/decide` when request/header token is absent. |
172+
| `POSTHOG_TEAM_ID` | No | Optional team id attached to event and person rows. |
173+
| `POSTHOG_GROUP_TYPE_0..4` | No | Maps PostHog group types to `group0..group4`; set `POSTHOG_GROUP_TYPE_0=company` to populate `group0` for company groups. |
174+
| `POSTHOG_SESSION_RECORDING_ENDPOINT` | No | Returned in `/decide` session recording config. |
175+
| `POSTHOG_SIGNING_SECRET` | No | Enables HMAC request signature checks. |
176+
| `PERSON_DEBUG_TOKEN` | No | Enables `/__debug/person/:id` for deployment verification. |
177+
| `HOGFLARE_FEATURE_FLAGS` | No | JSON flag config used by `/decide` and `/flags`. |
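
The `POSTHOG_GROUP_TYPE_0..4` row is the least obvious setting in the table, so here is a minimal sketch of the mapping it describes. It assumes each `POSTHOG_GROUP_TYPE_<i>` variable names the group type whose key is stored in column `group<i>`; `group_slots` is a hypothetical helper, not a function from the Hogflare codebase.

```python
def group_slots(env, groups):
    """Map PostHog group types onto the fixed group0..group4 columns.

    Assumption: POSTHOG_GROUP_TYPE_<i> names the group type stored in group<i>.
    Group types without a configured slot are left out of the columns.
    """
    slots = {f"group{i}": None for i in range(5)}
    for i in range(5):
        group_type = env.get(f"POSTHOG_GROUP_TYPE_{i}")
        if group_type is not None and group_type in groups:
            slots[f"group{i}"] = groups[group_type]
    return slots


# Illustrative values only.
env = {"POSTHOG_GROUP_TYPE_0": "company"}
event_groups = {"company": "acme", "project": "website"}
slots = group_slots(env, event_groups)
```

With this configuration only `group0` is populated; the `project` group has no slot until a matching `POSTHOG_GROUP_TYPE_<i>` is set.
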
178+
132179
### Secrets
133180

181+
Use a Cloudflare API token that can write to Pipelines for `CLOUDFLARE_PIPELINE_AUTH_TOKEN`. The same token can usually be reused for the persons stream.
182+
134183
```bash
135184
bunx wrangler secret put CLOUDFLARE_PIPELINE_AUTH_TOKEN
185+
# Optional. If omitted, the persons pipeline uses CLOUDFLARE_PIPELINE_AUTH_TOKEN.
186+
bunx wrangler secret put CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN
187+
188+
# Optional.
136189
bunx wrangler secret put POSTHOG_SIGNING_SECRET
190+
bunx wrangler secret put PERSON_DEBUG_TOKEN
191+
bunx wrangler secret put HOGFLARE_FEATURE_FLAGS
137192
```
138193

139194
### Deploy
140195

141196
```bash
197+
worker-build --release
142198
bunx wrangler deploy
143199
```
144200

145-
## Send a test event
201+
## Verify Deployment
146202

147203
```bash
148-
curl -X POST https://<your-worker>.workers.dev/capture \
204+
export HOGFLARE_URL="https://<your-worker>.workers.dev"
205+
export HOGFLARE_API_KEY="phc_verify_$(date -u +%Y%m%d%H%M%S)"
206+
export HOGFLARE_ANON_ID="${HOGFLARE_API_KEY}_anon"
207+
export HOGFLARE_USER_ID="${HOGFLARE_API_KEY}_user"
208+
```
209+
210+
Send an anonymous capture:
211+
212+
```bash
213+
curl -X POST "$HOGFLARE_URL/capture" \
149214
-H "Content-Type: application/json" \
150-
-d '[
151-
{
152-
"api_key": "phc_example",
153-
"event": "purchase",
154-
"distinct_id": "user_12345",
155-
"properties": { "amount": 29.99, "product_id": "widget-001" }
215+
-d "{
216+
\"api_key\": \"$HOGFLARE_API_KEY\",
217+
\"event\": \"verify-anon-capture\",
218+
\"distinct_id\": \"$HOGFLARE_ANON_ID\",
219+
\"properties\": {
220+
\"\$set\": { \"initial_referrer\": \"docs\" },
221+
\"\$set_once\": { \"first_seen_source\": \"readme\" }
156222
}
157-
]'
223+
}"
158224
```
159225

226+
Identify the user and link the anonymous ID:
227+
228+
```bash
229+
curl -X POST "$HOGFLARE_URL/identify" \
230+
-H "Content-Type: application/json" \
231+
-d "{
232+
\"api_key\": \"$HOGFLARE_API_KEY\",
233+
\"distinct_id\": \"$HOGFLARE_USER_ID\",
234+
\"properties\": {
235+
\"\$anon_distinct_id\": \"$HOGFLARE_ANON_ID\",
236+
\"\$set\": { \"email\": \"verify@example.com\", \"plan\": \"pro\" },
237+
\"\$set_once\": { \"signup_source\": \"readme\" }
238+
}
239+
}"
240+
```
241+
242+
Send a post-identify capture:
243+
244+
```bash
245+
curl -X POST "$HOGFLARE_URL/capture" \
246+
-H "Content-Type: application/json" \
247+
-d "{
248+
\"api_key\": \"$HOGFLARE_API_KEY\",
249+
\"event\": \"verify-identified-capture\",
250+
\"distinct_id\": \"$HOGFLARE_USER_ID\",
251+
\"properties\": { \"button\": \"verify\" }
252+
}"
253+
```
254+
255+
Wait for the sink roll interval, then query R2 SQL:
256+
257+
```bash
258+
export R2_WAREHOUSE="<account-id>_<bucket-name>"
259+
export WRANGLER_R2_SQL_AUTH_TOKEN="$R2_CATALOG_TOKEN"
260+
261+
bunx wrangler r2 sql query "$R2_WAREHOUSE" \
262+
"select event, distinct_id, person_id, person_properties
263+
from default.hogflare_events
264+
where api_key = '$HOGFLARE_API_KEY'
265+
order by created_at asc"
266+
267+
bunx wrangler r2 sql query "$R2_WAREHOUSE" \
268+
"select operation, canonical_distinct_id, person_id, distinct_ids, merged_properties
269+
from default.hogflare_persons
270+
where api_key = '$HOGFLARE_API_KEY'
271+
order by updated_at asc"
272+
```
273+
274+
Expected result: the three event rows share one `person_id`, and the persons table contains three snapshots whose `operation` values are `capture`, `identify`, and `capture`, in that order. After the identify call, `distinct_ids` should include both the anonymous and the identified IDs.
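
The expected result above can also be checked mechanically. The sketch below assumes the query output has already been parsed into lists of dicts ordered as in the queries, with `distinct_ids` as a list. `check_verification` and the sample rows are hypothetical and only illustrate the checks.

```python
def check_verification(event_rows, person_rows, anon_id, user_id):
    """Return a list of problems; an empty list means the flow looks as expected."""
    problems = []
    if len(event_rows) != 3:
        problems.append(f"expected 3 event rows, got {len(event_rows)}")
    person_ids = {row["person_id"] for row in event_rows}
    if len(person_ids) != 1:
        problems.append(f"expected one person_id, got {sorted(map(str, person_ids))}")
    operations = [row["operation"] for row in person_rows]
    if operations != ["capture", "identify", "capture"]:
        problems.append(f"unexpected operations: {operations}")
    if person_rows:
        # The newest snapshot should know about both IDs after identify.
        final_ids = set(person_rows[-1]["distinct_ids"])
        if not {anon_id, user_id} <= final_ids:
            problems.append("latest snapshot is missing a distinct ID")
    return problems


# Made-up rows standing in for parsed query output.
events = [{"person_id": "p1"}, {"person_id": "p1"}, {"person_id": "p1"}]
persons = [
    {"operation": "capture", "distinct_ids": ["anon"]},
    {"operation": "identify", "distinct_ids": ["anon", "user"]},
    {"operation": "capture", "distinct_ids": ["anon", "user"]},
]
problems = check_verification(events, persons, "anon", "user")
```

An empty `problems` list corresponds to the expected result; each string in a non-empty list names one expectation that failed.
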
275+
160276
## HMAC signing (optional)
161277

162278
If `POSTHOG_SIGNING_SECRET` is set, requests must include a valid signature.
@@ -227,6 +343,7 @@ docker compose up --build -d fake-pipeline
227343
```bash
228344
# .env.local (not committed)
229345
CLOUDFLARE_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
346+
CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
230347
CLOUDFLARE_PIPELINE_TIMEOUT_SECS=5
231348
```
232349

@@ -252,8 +369,44 @@ ATTACH '<ACCOUNT_ID>_<BUCKET>' AS iceberg_catalog (
252369
ENDPOINT 'https://catalog.cloudflarestorage.com/<ACCOUNT_ID>/<BUCKET>'
253370
);
254371

255-
SELECT count(*) FROM iceberg_catalog.default.hogflare;
256-
SELECT * FROM iceberg_catalog.default.hogflare LIMIT 5;
372+
SELECT count(*) FROM iceberg_catalog.default.hogflare_events;
373+
SELECT count(*) FROM iceberg_catalog.default.hogflare_persons;
374+
SELECT * FROM iceberg_catalog.default.hogflare_persons LIMIT 5;
375+
```
376+
377+
If you used versioned table names during a migration, substitute those names here.
378+
379+
## Cleanup
380+
381+
Delete Pipelines resources in dependency order: pipelines first, then streams and sinks.
382+
383+
```bash
384+
bunx wrangler pipelines list
385+
bunx wrangler pipelines delete <pipeline-id> --force
386+
387+
bunx wrangler pipelines streams list
388+
bunx wrangler pipelines streams delete <stream-id> --force
389+
390+
bunx wrangler pipelines sinks list
391+
bunx wrangler pipelines sinks delete <sink-id> --force
392+
```
393+
394+
`wrangler r2 sql query` is read-only. To drop an Iceberg table from R2 Data Catalog, use the Iceberg catalog API. One local option is PyIceberg:
395+
396+
```bash
397+
R2_CATALOG_TOKEN="<r2-data-catalog-token>" uv run --with pyiceberg python - <<'PY'
398+
import os
399+
from pyiceberg.catalog.rest import RestCatalog
400+
401+
catalog = RestCatalog(
402+
name="hogflare",
403+
warehouse="<account-id>_<bucket-name>",
404+
uri="https://catalog.cloudflarestorage.com/<account-id>/<bucket-name>",
405+
token=os.environ["R2_CATALOG_TOKEN"],
406+
)
407+
408+
catalog.drop_table(("default", "<table-name>"), purge_requested=True)
409+
PY
257410
```
258411

259412
## PostHog compatibility
@@ -276,7 +429,7 @@ Identify, capture `$set` / `$set_once` / `$unset`, and alias events update a per
276429
- `person_created_at`
277430
- `person_properties`
278431

279-
Person DO state is not written to R2. Only event-level snapshots are stored in the pipeline sink.
432+
The Durable Object is the source of truth for the current person record. When `CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT` is configured, Hogflare also writes append-only person snapshots to the persons pipeline so the state is queryable in Iceberg.
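
Because the snapshots are append-only, a consumer that wants the current state has to collapse them to one row per person. A minimal sketch follows, assuming `version` increases with every change to a person (the commit does not state this explicitly); `latest_snapshots` and the sample rows are hypothetical.

```python
def latest_snapshots(rows):
    """Collapse append-only snapshots to the newest row per person_id.

    Assumption: `version` increases with every change to a person.
    """
    latest = {}
    for row in rows:
        current = latest.get(row["person_id"])
        if current is None or row["version"] > current["version"]:
            latest[row["person_id"]] = row
    return latest


# Made-up snapshot rows for illustration.
rows = [
    {"person_id": "p1", "version": 1, "merged_properties": {}},
    {"person_id": "p1", "version": 2, "merged_properties": {"plan": "pro"}},
    {"person_id": "p2", "version": 1, "merged_properties": {"plan": "free"}},
]
current = latest_snapshots(rows)
```

If `version` turns out not to be monotonic in a given deployment, `updated_at` is the other column the comparison could use.
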
280433

281434
### Groups
282435

@@ -403,3 +556,26 @@ Each row is a `PipelineEvent` with these columns:
403556
| `group_properties` | JSON (by group type) |
404557
| `api_key` | string |
405558
| `extra` | JSON |
559+
560+
## Person shape in R2
561+
562+
Each row is a `PersonPipelineRecord` snapshot with these columns:
563+
564+
| Field | Type / Notes |
565+
| --- | --- |
566+
| `uuid` | string (snapshot UUID v4) |
567+
| `team_id` | int64 (optional) |
568+
| `source` | string |
569+
| `operation` | one of `capture`, `identify`, `alias`, `engage`, `session_recording` |
570+
| `person_id` | string (person UUID) |
571+
| `person_int_id` | int64 |
572+
| `canonical_distinct_id` | string |
573+
| `distinct_ids` | string list / array |
574+
| `created_at` | person creation timestamp |
575+
| `updated_at` | snapshot timestamp |
576+
| `version` | person version |
577+
| `properties` | JSON `$set` properties |
578+
| `properties_set_once` | JSON `$set_once` properties |
579+
| `merged_properties` | JSON merged person properties |
580+
| `api_key` | string |
581+
| `source_event_uuid` | event row UUID that produced the snapshot |
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
1+
{
2+
"fields": [
3+
{ "name": "uuid", "type": "string", "required": true },
4+
{ "name": "team_id", "type": "int64", "required": false },
5+
{ "name": "source", "type": "string", "required": true },
6+
{ "name": "event", "type": "string", "required": true },
7+
{ "name": "distinct_id", "type": "string", "required": true },
8+
{ "name": "timestamp", "type": "timestamp", "required": false },
9+
{ "name": "created_at", "type": "timestamp", "required": true },
10+
{ "name": "properties", "type": "json", "required": false },
11+
{ "name": "context", "type": "json", "required": false },
12+
{ "name": "person_id", "type": "string", "required": false },
13+
{ "name": "person_created_at", "type": "timestamp", "required": false },
14+
{ "name": "person_properties", "type": "json", "required": false },
15+
{ "name": "group0", "type": "string", "required": false },
16+
{ "name": "group1", "type": "string", "required": false },
17+
{ "name": "group2", "type": "string", "required": false },
18+
{ "name": "group3", "type": "string", "required": false },
19+
{ "name": "group4", "type": "string", "required": false },
20+
{ "name": "group_properties", "type": "json", "required": false },
21+
{ "name": "api_key", "type": "string", "required": false },
22+
{ "name": "extra", "type": "json", "required": false }
23+
]
24+
}
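
The `required` flags in the schema above lend themselves to a quick local check before sending a record to the stream. The sketch below embeds only a subset of the fields to stay short and treats a `null` value as missing, which is an assumption about how the stream validates; `missing_required` is a hypothetical helper.

```python
import json

# Subset of the schema above; the full file lists 20 fields.
SCHEMA_JSON = """
{
  "fields": [
    { "name": "uuid", "type": "string", "required": true },
    { "name": "team_id", "type": "int64", "required": false },
    { "name": "source", "type": "string", "required": true },
    { "name": "event", "type": "string", "required": true },
    { "name": "distinct_id", "type": "string", "required": true },
    { "name": "created_at", "type": "timestamp", "required": true }
  ]
}
"""


def missing_required(schema, record):
    """List required field names that are absent or null in the record."""
    return [
        field["name"]
        for field in schema["fields"]
        if field["required"] and record.get(field["name"]) is None
    ]


schema = json.loads(SCHEMA_JSON)
# Illustrative record that lacks created_at.
record = {
    "uuid": "00000000-0000-4000-8000-000000000000",
    "source": "example",
    "event": "purchase",
    "distinct_id": "user_12345",
}
missing = missing_required(schema, record)
```
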
