Skip to content

Commit b38edd1

Browse files
committed
source-posthog: make persons an incremental stream
1 parent 099951c commit b38edd1

File tree

8 files changed

+286
-85
lines changed

8 files changed

+286
-85
lines changed

source-posthog/acmeCo/Persons.schema.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ $defs:
1515
description: Row ID of the Document, counting up from zero, or -1 if not known
1616
title: Row Id
1717
type: integer
18+
project_id:
19+
anyOf:
20+
- type: integer
21+
- type: 'null'
22+
default: null
23+
description: The PostHog project this document belongs to
24+
title: Project Id
1825
title: Meta
1926
type: object
2027
additionalProperties: true
@@ -30,6 +37,13 @@ properties:
3037
format: date-time
3138
title: Created At
3239
type: string
40+
last_seen_at:
41+
anyOf:
42+
- format: date-time
43+
type: string
44+
- type: 'null'
45+
default: null
46+
title: Last Seen At
3347
required:
3448
- id
3549
- created_at

source-posthog/acmeCo/flow.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ collections:
2424
acmeCo/Persons:
2525
schema: Persons.schema.yaml
2626
key:
27-
- /_meta/row_id
27+
- /_meta/project_id
28+
- /id
2829
acmeCo/Projects:
2930
schema: Projects.schema.yaml
3031
key:

source-posthog/source_posthog/api.py

Lines changed: 100 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
PostHog API client functions.
33
"""
44

5-
import functools
65
from collections.abc import AsyncGenerator
76
from datetime import UTC, datetime, timedelta
87
from logging import Logger
@@ -30,6 +29,7 @@
3029
)
3130

3231
HOGQL_PAGE_SIZE = 50_000
32+
BACKFILL_TIMEOUT_PERIOD = timedelta(minutes=5)
3333

3434

3535
# Cache for project IDs per organization (avoids re-fetching on retry).
@@ -160,9 +160,21 @@ async def _query_hogql[T: HogQLEntity[str] | HogQLEntity[int]](
160160
http: HTTPSession,
161161
log: Logger,
162162
) -> AsyncGenerator[T]:
163+
log.debug(
164+
"Querying HogQL",
165+
{
166+
"table": model.table_name,
167+
"project_id": project_id,
168+
"start": start_date,
169+
"end": end_date,
170+
},
171+
)
172+
163173
url = model.get_api_endpoint_url(base_url, project_id)
164174
column_names = await _get_hogql_columns(model, base_url, project_id, http, log)
165175

176+
coalesced_cursor_fields = f'COALESCE({",".join(model.cursor_columns)})'
177+
166178
serialized_start_date = start_date.astimezone(UTC).replace(tzinfo=None).isoformat()
167179
serialized_end_date = (
168180
end_date.astimezone(UTC).replace(tzinfo=None).isoformat()
@@ -171,11 +183,11 @@ async def _query_hogql[T: HogQLEntity[str] | HogQLEntity[int]](
171183
)
172184

173185
start_date_clause = (
174-
f"WHERE {model.cursor_column} > "
186+
f"WHERE {coalesced_cursor_fields} > "
175187
+ f"toDateTime64('{serialized_start_date}', 6, 'UTC') "
176188
)
177189
end_date_clause = (
178-
f"AND {model.cursor_column} <= toDateTime64('{serialized_end_date}', 6, 'UTC') "
190+
f"AND {coalesced_cursor_fields} <= toDateTime64('{serialized_end_date}', 6, 'UTC') "
179191
if end_date is not None
180192
else ""
181193
)
@@ -187,7 +199,7 @@ async def _query_hogql[T: HogQLEntity[str] | HogQLEntity[int]](
187199
+ f"FROM {model.table_name} "
188200
+ start_date_clause
189201
+ end_date_clause
190-
+ f"ORDER BY {model.cursor_column} DESC "
202+
+ f"ORDER BY {coalesced_cursor_fields} ASC "
191203
+ f"LIMIT {HOGQL_PAGE_SIZE}",
192204
},
193205
}
@@ -299,7 +311,6 @@ async def backfill_project_events(
299311
return
300312

301313
base_url = config.advanced.base_url
302-
ctx = ProjectIdValidationContext(project_id=project_id)
303314
new_cursor = start_date
304315
doc_count = 0
305316

@@ -343,7 +354,6 @@ async def fetch_project_events(
343354
assert isinstance(cursor, datetime)
344355

345356
base_url = config.advanced.base_url
346-
ctx = ProjectIdValidationContext(project_id=project_id)
347357
now = datetime.now(tz=UTC)
348358
upper_bound = now - horizon if horizon else None
349359

@@ -380,34 +390,98 @@ async def fetch_project_events(
380390
yield new_cursor
381391

382392

383-
async def snapshot_persons(
393+
async def backfill_persons(
384394
http: HTTPSession,
385395
config: EndpointConfig,
396+
project_id: int,
386397
log: Logger,
387-
) -> AsyncGenerator[Person, None]:
398+
page: PageCursor | None,
399+
cutoff: LogCursor,
400+
) -> AsyncGenerator[Person | PageCursor, None]:
401+
assert isinstance(page, str | None)
402+
assert isinstance(cutoff, datetime)
403+
404+
start_date = datetime.fromisoformat(page) if page is not None else config.start_date
405+
406+
if start_date >= cutoff:
407+
return
408+
388409
base_url = config.advanced.base_url
389-
project_ids = await fetch_project_ids(http, config, log)
410+
new_cursor = start_date
411+
doc_count = 0
412+
backfill_timeout = datetime.now(tz=UTC) + BACKFILL_TIMEOUT_PERIOD
390413

391-
total_doc_count = 0
414+
while True:
415+
batch_count = 0
392416

393-
for project_id in project_ids:
394-
doc_count = 0
395-
project_cursor = datetime.min.replace(tzinfo=UTC)
417+
async for item in _query_hogql(
418+
Person,
419+
new_cursor,
420+
cutoff,
421+
base_url,
422+
project_id,
423+
http,
424+
log,
425+
):
426+
batch_count += 1
427+
doc_count += 1
428+
item_cursor = item.get_cursor()
429+
new_cursor = max(new_cursor, item_cursor)
396430

397-
while True:
398-
async for item in _query_hogql(
399-
Person, project_cursor, None, base_url, project_id, http, log
400-
):
401-
doc_count += 1
402-
project_cursor = item.get_cursor()
431+
yield item
403432

404-
yield item
433+
if batch_count < HOGQL_PAGE_SIZE:
434+
break
435+
436+
if datetime.now(tz=UTC) > backfill_timeout:
437+
log.info(
438+
f"{BACKFILL_TIMEOUT_PERIOD.total_seconds() / 60} "
439+
+ "minutes have elapsed, emitting a checkpoint"
440+
)
441+
break
442+
443+
log.info(f"Backfilled {doc_count} persons from project {project_id}")
444+
445+
if new_cursor > start_date:
446+
yield new_cursor.isoformat()
447+
448+
449+
async def fetch_persons(
450+
http: HTTPSession,
451+
config: EndpointConfig,
452+
project_id: int,
453+
log: Logger,
454+
cursor: LogCursor,
455+
) -> AsyncGenerator[Person | LogCursor, None]:
456+
assert isinstance(cursor, datetime)
457+
458+
base_url = config.advanced.base_url
459+
new_cursor = cursor
460+
doc_count = 0
461+
462+
while True:
463+
batch_count = 0
405464

406-
# If we got fewer rows than page_size, we've reached the end
407-
if doc_count < HOGQL_PAGE_SIZE:
408-
break
465+
async for item in _query_hogql(
466+
Person,
467+
new_cursor,
468+
None,
469+
base_url,
470+
project_id,
471+
http,
472+
log,
473+
):
474+
batch_count += 1
475+
doc_count += 1
476+
item_cursor = item.get_cursor()
477+
new_cursor = max(new_cursor, item_cursor)
409478

410-
log.info(f"Fetched {doc_count} persons from project {project_id}")
411-
total_doc_count += doc_count
479+
yield item
412480

413-
log.info(f"Fetched {total_doc_count} total persons across all projects")
481+
if batch_count < HOGQL_PAGE_SIZE:
482+
break
483+
484+
log.info(f"Fetched {doc_count} persons from project {project_id}")
485+
486+
if new_cursor > cursor:
487+
yield new_cursor

source-posthog/source_posthog/models.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class HogQLEntity[T: PostHogPrimaryKey](BasePostHogEntity[T], metaclass=ABCMeta)
164164
"""Entity fetched via HogQL Query API."""
165165

166166
table_name: ClassVar[str]
167-
cursor_column: ClassVar[str]
167+
cursor_columns: ClassVar[list[str]]
168168

169169
@classmethod
170170
def get_api_endpoint_url(cls, base_url: str, project_id: int) -> str:
@@ -288,7 +288,7 @@ class Event(HogQLEntity[str]):
288288

289289
resource_name: ClassVar[str] = "Events"
290290
table_name: ClassVar[str] = "events"
291-
cursor_column: ClassVar[str] = "timestamp"
291+
cursor_columns: ClassVar[list[str]] = ["timestamp"]
292292

293293
class Meta(BaseDocument.Meta):
294294
model_config = ConfigDict(validate_assignment=True)
@@ -327,13 +327,38 @@ class Person(HogQLEntity[str]):
327327

328328
resource_name: ClassVar[str] = "Persons"
329329
table_name: ClassVar[str] = "persons"
330-
cursor_column: ClassVar[str] = "created_at"
330+
cursor_columns: ClassVar[list[str]] = ["last_seen_at", "created_at"]
331+
332+
class Meta(BaseDocument.Meta):
333+
model_config = ConfigDict(validate_assignment=True)
334+
project_id: int | None = Field(
335+
default=None,
336+
description="The PostHog project this document belongs to",
337+
)
338+
339+
meta_: Meta = Field( # pyright: ignore[reportIncompatibleVariableOverride]
340+
default_factory=lambda: Person.Meta(op="u"),
341+
alias="_meta",
342+
description="Document metadata",
343+
)
331344

332345
created_at: AwareDatetime
346+
last_seen_at: AwareDatetime | None = None
347+
348+
@model_validator(mode="before")
349+
@classmethod
350+
def _inject_project_id_from_context(cls, data: Any, info: ValidationInfo) -> Any:
351+
if not isinstance(data, dict):
352+
return data
353+
if info.context and isinstance(info.context, ProjectIdValidationContext):
354+
meta = data.get("_meta") or {}
355+
meta["project_id"] = info.context.project_id
356+
data["_meta"] = meta
357+
return data
333358

334359
@override
335360
def get_cursor(self) -> AwareDatetime:
336-
return self.created_at
361+
return self.last_seen_at or self.created_at
337362

338363

339364
# =============================================================================

0 commit comments

Comments
 (0)