@epipav commented on Dec 19, 2025

Note

Introduces Tinybird datasources, materialized/copy pipes, and Kafka sinks to compute and export member and organization segment aggregates (leaf/parent/grandparent), including backfill and daily changed-segment jobs.

  • Data sources:
    • Add cdp_member_segment_aggregates_ds and cdp_organization_segment_aggregates_ds as AggregatingMergeTree tables with aggregate state columns and partition/sort keys.
  • Ingestion/aggregation:
    • Add materialized views cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV to build aggregate states from snapshot MV sources.
    • Add initial snapshot COPY pipes for both datasets from deduplicated sources.
  • Exports (Kafka sinks):
    • Add on-demand bucketed backfill sinks for members (cdp_member_aggregates_bucket_backfiller_sink.pipe) and organizations (cdp_organization_aggregates_bucket_backfiller_sink.pipe), computing aggregates for leaf/parent/grandparent segments and unioning results.
    • Add scheduled daily sinks for changed segments at leaf/parent/grandparent levels for members and organizations, exporting to memberSegmentsAgg_sink and organizationSegmentsAgg_sink topics.

Written by Cursor Bugbot for commit f3e7aa1. This will update automatically on new commits.

@github-actions
Contributor

⚠️ Jira Issue Key Missing

Your PR title doesn't contain a Jira issue key. Consider adding it for better traceability.

Example:

  • feat: add user authentication (CM-123)
  • feat: add user authentication (IN-123)

Projects:

  • CM: Community Data Platform
  • IN: Insights

Please add a Jira issue key to your PR title.

@github-actions github-actions bot left a comment

Conventional Commits FTW!

@epipav changed the title from "Tinybird resources for CDP aggs" to "Tinybird pipes and datasources for CDP aggs" on Dec 19, 2025
@epipav changed the title from "Tinybird pipes and datasources for CDP aggs" to "Pipes and datasources for CDP aggs" on Dec 19, 2025
@epipav changed the title from "Pipes and datasources for CDP aggs" to "feat: pipes and datasources for CDP aggs (CDP-804)" on Dec 19, 2025
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
maxMerge(updatedAtState) AS updatedAt

Bug: References non-existent column updatedAtState in queries

The query references maxMerge(updatedAtState) but this column doesn't exist in cdp_member_segment_aggregates_ds. The datasource schema defines lastActivityUpdatedAtState (an aggregate function) and updatedAt (a regular timestamp), not updatedAtState. This will cause the query to fail at runtime. The same issue occurs in all three nodes: leaf_segment_aggregates, parent_segment_aggregates, and grandparent_segment_aggregates.
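
One possible shape of the fix, sketched here under assumptions: since the schema is described as having a regular updatedAt timestamp, a plain max() over that column avoids the missing state column. The grouping keys memberId and segmentId are hypothetical and not taken from the PR. If lastActivityUpdatedAtState was built with a max state, maxMerge(lastActivityUpdatedAtState) might be the intended expression instead; the comment does not say which.

```sql
-- Sketch only: memberId and segmentId as grouping keys are assumptions,
-- not taken from the PR.
SELECT
    memberId,
    segmentId,
    groupArrayDistinctMerge(activityTypesState) AS activityTypes,
    groupArrayDistinctMerge(activeOnState) AS activeOn,
    avgMerge(averageSentimentState) AS averageSentiment,
    -- updatedAt is described as a regular timestamp column, so plain max()
    -- is used here instead of maxMerge() on a state column that does not exist
    max(updatedAt) AS updatedAt
FROM cdp_member_segment_aggregates_ds
GROUP BY memberId, segmentId
```

The same substitution would need to be applied in each of the three nodes named above.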

EXPORT_SCHEDULE @on-demand
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink

Bug: Kafka topic name typo sends data to wrong topic

The backfiller sink exports to memberSegmentsAggs_sink (with an extra 's') while all other member aggregate sinks export to memberSegmentsAgg_sink. This inconsistency will cause backfill data to be sent to a different Kafka topic than the incremental updates, potentially causing data to be lost or processed incorrectly by downstream consumers.

SQL >
select distinct organizationId
from cdp_organization_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)

Bug: Missing empty organizationId filter in parent segment pipe

The organizations_with_changed_aggs_previous_day node is missing the organizationId <> '' filter that exists in both the equivalent leaf segment pipe (line 17) and grandparent segment pipe (line 6). This inconsistency will cause rows with empty organizationId values to be exported to Kafka when processing parent segments, while they're correctly filtered out in the leaf and grandparent segment sinks.
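
A minimal sketch of the node with the guard added, assuming the filter is meant to read exactly as it is quoted for the leaf and grandparent pipes; everything else is copied from the snippet under review.

```sql
SELECT DISTINCT organizationId
FROM cdp_organization_segment_aggregates_ds
WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
  -- same guard the leaf and grandparent pipes are described as using
  AND organizationId <> ''
```

Keeping the predicate identical across the three levels means the parent export skips the same rows as the other two sinks.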
