source-posthog: turn persons into an incremental stream#3957
source-posthog: turn persons into an incremental stream#3957nicolaslazo merged 2 commits intomainfrom
Conversation
20e74eb to
b38edd1
Compare
Alex-Bair
left a comment
There was a problem hiding this comment.
Had a few items to address before approving. This comment about adding new projects to the Persons resource state looks like it also applies to the Events stream. I missed that when I reviewed the connector initially, but we'll want to improve that handling soon, either in this PR or in a follow up one.
| project_id: | ||
| anyOf: | ||
| - type: integer | ||
| - type: 'null' | ||
| default: null | ||
| description: The PostHog project this document belongs to | ||
| title: Project Id |
There was a problem hiding this comment.
_meta/project_id is part of the composite key, and we shouldn't allow it to be nullable in order to make it easier to materialize Persons to an SQL destination. If we did have nullable keys, SQL materializations wouldn't be able to materialize Persons out of the box since SQL destinations reject nullable primary keys.
| async for item in _query_hogql( | ||
| Person, | ||
| new_cursor, | ||
| cutoff, | ||
| base_url, | ||
| project_id, | ||
| http, | ||
| log, | ||
| ): | ||
| batch_count += 1 | ||
| doc_count += 1 | ||
| item_cursor = item.get_cursor() | ||
| new_cursor = max(new_cursor, item_cursor) | ||
|
|
||
| while True: | ||
| async for item in _query_hogql( | ||
| Person, project_cursor, None, base_url, project_id, http, log | ||
| ): | ||
| doc_count += 1 | ||
| project_cursor = item.get_cursor() | ||
| yield item | ||
|
|
||
| yield item | ||
| if batch_count < HOGQL_PAGE_SIZE: | ||
| break | ||
|
|
||
| if datetime.now(tz=UTC) > backfill_timeout: | ||
| log.info( | ||
| f"{BACKFILL_TIMEOUT_PERIOD.total_seconds() / 60} " | ||
| + "minutes have elapsed, emitting a checkpoint" | ||
| ) | ||
| break |
There was a problem hiding this comment.
Thinking through an edge case, what happens if there is a long run of records all with the same cursor value? Is it possible that we the following could happen?
- We fetch the first handful of records at cursor
N. - We stop fetching the remaining records at cursor
Nbecause theBACKFILL_TIMEOUT_PERIODhas elapsed. - We yield
Nas the next cursor value. - We reinvoke
backfill_personsand have it start fetching records with cursor values greater thanN. Meaning, we miss the remaining records at cursorNthat would have taken more thanBACKFILL_TIMEOUT_PERIODto fetch.
That looks possible to me, but let me know if you disagree. I've handled the above situation in other connectors by only breaking out of the pagination loop early when both BACKFILL_TIMEOUT_PERIOD has elapsed and the next document's cursor value is greater than the previous document's cursor value. Here's an example from source-klaviyo-native where I implemented that logic:
connectors/source-klaviyo-native/source_klaviyo_native/api/common.py
Lines 299 to 312 in 5bf47b1
There was a problem hiding this comment.
I've already had to tackle a similar issue in source-quickbooks. Like mentioned on our 1:1, I'll try to extract the checkpointing logic to a decorator abstract enough it could become a CDK feature 👍
There was a problem hiding this comment.
That backfill_timeout decorator is interesting! It looks like it may be general enough to use elsewhere as long as the item's cursor value is a datetime that's exposed via the get_cursor method. It may also be worth having a comment or docstring stating that the decorator should only be used when the backfill_fn yields documents in ascending order, just so it's easier for us to remember the specific situations this decorator should be used in. Let's monitor how it does, and if we like it, we can figure out how to abstract it into the CDK somehow.
Some linters complain about the `dict` variant in the PageCursor definition not having any type parameters set.
a30b4a9 to
fec8523
Compare
Alex-Bair
left a comment
There was a problem hiding this comment.
LGTM % small nit of regenerating the schema.yaml files with flowctl raw discover to reflect that project_id is now non-nullable.
Note: We'll need to perform dataflow resets for all production captures' Persons binding due to the change in the key field.
fec8523 to
1e97768
Compare
1e97768 to
400330f
Compare
Description:
PostHog's persons stream is a snapshot type, which is currently filling up storage space in some reactors as it tries to accumulate thousands of documents in memory without checkpointing its progress.
This PR implements secondary cursor support for HogQL queries. This allows us to rely on
last_seen_atfor update timestamps, falling back tocreated_atwhen not available.Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
Tested locally through
flowctl preview.