Skip to content

Commit fedc75b

Browse files
committed
feat: add infra for segments metrics
1 parent 294ac7c commit fedc75b

File tree

5 files changed

+235
-0
lines changed

5 files changed

+235
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
DESCRIPTION >
    Daily snapshot of segment-level aggregate states, keyed by (snapshotDate, segmentId).
    Activity totals are stored as `sum` states and member / organization DISTINCT counts
    as `uniqCombined` states, so readers roll them up with the matching -Merge combinators
    (sumMerge, uniqCombinedMerge) instead of re-counting raw rows.

SCHEMA >
    `snapshotDate` Date,
    `segmentId` String,
    `parentId` String,
    `grandparentId` String,
    `activitiesTotalState` AggregateFunction(sum, UInt64),
    `activitiesLast30DaysState` AggregateFunction(sum, UInt64),
    `membersUniqState` AggregateFunction(uniqCombined, String),
    `membersLast30UniqState` AggregateFunction(uniqCombined, String),
    `orgsUniqState` AggregateFunction(uniqCombined, String),
    `orgsLast30UniqState` AggregateFunction(uniqCombined, String)

ENGINE AggregatingMergeTree
ENGINE_PARTITION_KEY snapshotDate
ENGINE_SORTING_KEY (snapshotDate, segmentId)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
DESCRIPTION >
2+
CDP dashboard metrics per segment - PROJECT GROUP level.
3+
Rolls up subproject aggregate states via grandparentId.
4+
5+
NODE projectGroupMetrics
SQL >
    -- One output row per project group (grouped by grandparentId / slug / name).
    -- At this level the segment, parent and grandparent output columns all carry the
    -- grandparent's own id, slug and name.
    -- Metrics are produced by merging today's aggregate states of every segment row
    -- that belongs to the group; segments with no state row for today() still pass
    -- through the LEFT JOIN.
    -- NOTE(review): assumes `segments` yields a single row per id; duplicate rows would
    -- feed the same sum state into sumMerge more than once - TODO confirm.
    SELECT
        s.grandparentId AS segmentId,
        'projectGroup' AS segmentType,
        s.grandparentId AS parentId,
        s.grandparentId AS grandparentId,
        s.grandparentSlug AS segmentSlug,
        s.grandparentSlug AS parentSlug,
        s.grandparentSlug AS grandparentSlug,
        s.grandparentName AS segmentName,
        s.grandparentName AS parentName,
        s.grandparentName AS grandparentName,
        sumMerge(sa.activitiesTotalState) AS activitiesTotal,
        sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days,
        uniqCombinedMerge(sa.membersUniqState) AS membersTotal,
        uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days,
        uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal,
        uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days
    FROM segments AS s
    LEFT JOIN
        (
            -- Only today's snapshot takes part in the join.
            SELECT
                segmentId,
                activitiesTotalState,
                activitiesLast30DaysState,
                membersUniqState,
                membersLast30UniqState,
                orgsUniqState,
                orgsLast30UniqState
            FROM cdp_segment_metrics_agg_states_ds
            WHERE snapshotDate = today()
        ) AS sa
        ON sa.segmentId = s.id
    WHERE
        -- Keep only segment rows that have both a parent and a grandparent.
        s.parentId IS NOT NULL
        AND s.parentId != ''
        AND s.parentSlug IS NOT NULL
        AND s.grandparentId IS NOT NULL
        AND s.grandparentId != ''
        AND s.grandparentSlug IS NOT NULL
    GROUP BY
        s.grandparentId,
        s.grandparentSlug,
        s.grandparentName

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 10 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
DESCRIPTION >
2+
CDP dashboard metrics per segment - PROJECT level.
3+
Rolls up subproject aggregate states via parentId.
4+
5+
NODE projectMetrics
SQL >
    -- One output row per project (grouped by the parent and grandparent id / slug / name).
    -- At this level the segment and parent output columns both carry the parent's id,
    -- slug and name; the grandparent columns are passed through unchanged.
    -- Metrics are produced by merging today's aggregate states of every segment row
    -- under the project; segments with no state row for today() still pass through
    -- the LEFT JOIN.
    -- NOTE(review): assumes `segments` yields a single row per id; duplicate rows would
    -- feed the same sum state into sumMerge more than once - TODO confirm.
    SELECT
        s.parentId AS segmentId,
        'project' AS segmentType,
        s.parentId AS parentId,
        s.grandparentId AS grandparentId,
        s.parentSlug AS segmentSlug,
        s.parentSlug AS parentSlug,
        s.grandparentSlug AS grandparentSlug,
        s.parentName AS segmentName,
        s.parentName AS parentName,
        s.grandparentName AS grandparentName,
        sumMerge(sa.activitiesTotalState) AS activitiesTotal,
        sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days,
        uniqCombinedMerge(sa.membersUniqState) AS membersTotal,
        uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days,
        uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal,
        uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days
    FROM segments AS s
    LEFT JOIN
        (
            -- Only today's snapshot takes part in the join.
            SELECT
                segmentId,
                activitiesTotalState,
                activitiesLast30DaysState,
                membersUniqState,
                membersLast30UniqState,
                orgsUniqState,
                orgsLast30UniqState
            FROM cdp_segment_metrics_agg_states_ds
            WHERE snapshotDate = today()
        ) AS sa
        ON sa.segmentId = s.id
    WHERE
        -- Keep only segment rows that have both a parent and a grandparent.
        s.parentId IS NOT NULL
        AND s.parentId != ''
        AND s.parentSlug IS NOT NULL
        AND s.grandparentId IS NOT NULL
        AND s.grandparentId != ''
        AND s.grandparentSlug IS NOT NULL
    GROUP BY
        s.parentId,
        s.grandparentId,
        s.parentSlug,
        s.grandparentSlug,
        s.parentName,
        s.grandparentName

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 5 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
DESCRIPTION >
2+
CDP dashboard metrics per segment - SUBPROJECT level.
3+
Finalizes segment-level aggregate states and exports them.
4+
5+
NODE subprojectMetrics
SQL >
    -- One output row per subproject segment (a segment row that has both a parent and
    -- a grandparent), with today's aggregate states finalized into plain numbers.
    -- Segments with no state row for today() still pass through the LEFT JOIN.
    --
    -- Scheduling: today's snapshot in cdp_segment_metrics_agg_states_ds is appended by a
    -- copy job scheduled at 09:00. This export used to fire in that same minute, so it
    -- could run before the snapshot existed and publish empty (zero) metrics for every
    -- subproject. It is now scheduled at 09:15 so the snapshot is written first.
    -- NOTE(review): widen the gap if the copy job can take longer than that.
    -- NOTE(review): assumes `segments` yields a single row per id; duplicate rows would
    -- feed the same sum state into sumMerge more than once - TODO confirm.
    SELECT
        s.id AS segmentId,
        'subproject' AS segmentType,
        s.parentId AS parentId,
        s.grandparentId AS grandparentId,
        s.slug AS segmentSlug,
        s.parentSlug AS parentSlug,
        s.grandparentSlug AS grandparentSlug,
        s.name AS segmentName,
        s.parentName AS parentName,
        s.grandparentName AS grandparentName,
        sumMerge(sa.activitiesTotalState) AS activitiesTotal,
        sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days,
        uniqCombinedMerge(sa.membersUniqState) AS membersTotal,
        uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days,
        uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal,
        uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days
    FROM segments AS s
    LEFT JOIN
        (
            -- Only today's snapshot takes part in the join.
            SELECT
                segmentId,
                activitiesTotalState,
                activitiesLast30DaysState,
                membersUniqState,
                membersLast30UniqState,
                orgsUniqState,
                orgsLast30UniqState
            FROM cdp_segment_metrics_agg_states_ds
            WHERE snapshotDate = today()
        ) AS sa
        ON sa.segmentId = s.id
    WHERE
        -- Keep only segment rows that have both a parent and a grandparent.
        s.parentId IS NOT NULL
        AND s.parentId != ''
        AND s.parentSlug IS NOT NULL
        AND s.grandparentId IS NOT NULL
        AND s.grandparentId != ''
        AND s.grandparentSlug IS NOT NULL
    GROUP BY
        s.id,
        s.parentId,
        s.grandparentId,
        s.slug,
        s.parentSlug,
        s.grandparentSlug,
        s.name,
        s.parentName,
        s.grandparentName

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 15 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
DESCRIPTION >
2+
Daily batch job that builds segment-level aggregate states from existing
3+
member and organization segment aggregate datasources.
4+
Heavy DISTINCT logic lives here.
5+
6+
NODE memberPerSegment
SQL >
    -- Collapse the stored partial states into one finalized row per (segmentId, memberId):
    -- the merged activity count and the latest lastActive value for that pair.
    SELECT
        segmentId,
        memberId,
        countMerge(activityCountState) AS activityCount,
        maxMerge(lastActiveState) AS lastActive
    FROM cdp_member_segment_aggregates_ds
    GROUP BY
        segmentId,
        memberId
16+
17+
NODE memberSegmentStates
SQL >
    -- Re-aggregate the per-member rows into per-segment states:
    --   * sum states over activityCount (all rows, and rows inside the 30-day window)
    --   * uniqCombined states over memberId (all rows, and rows inside the window)
    -- A row is "inside the window" when its lastActive is at or after now() - 30 days.
    -- NOTE(review): activitiesLast30DaysState adds up the member's entire merged
    -- activityCount whenever that member's lastActive falls in the window; it is not
    -- a count of the activities that themselves happened in the last 30 days.
    WITH now() - INTERVAL 30 DAY AS last30Cutoff
    SELECT
        segmentId,
        sumState(activityCount) AS activitiesTotalState,
        sumStateIf(activityCount, lastActive >= last30Cutoff) AS activitiesLast30DaysState,
        uniqCombinedState(memberId) AS membersUniqState,
        uniqCombinedStateIf(memberId, lastActive >= last30Cutoff) AS membersLast30UniqState
    FROM memberPerSegment
    GROUP BY segmentId
28+
29+
NODE orgPerSegment
SQL >
    -- Collapse the stored partial states into one finalized row per
    -- (segmentId, organizationId) carrying the latest lastActive value for that pair.
    SELECT
        segmentId,
        organizationId,
        maxMerge(lastActiveState) AS lastActive
    FROM cdp_organization_segment_aggregates_ds
    GROUP BY
        segmentId,
        organizationId
35+
36+
NODE orgSegmentStates
SQL >
    -- Re-aggregate the per-organization rows into per-segment uniqCombined states:
    -- one over every organizationId, one restricted to organizations whose lastActive
    -- is at or after now() - 30 days.
    WITH now() - INTERVAL 30 DAY AS last30Cutoff
    SELECT
        segmentId,
        uniqCombinedState(organizationId) AS orgsUniqState,
        uniqCombinedStateIf(organizationId, lastActive >= last30Cutoff) AS orgsLast30UniqState
    FROM orgPerSegment
    GROUP BY segmentId
47+
48+
NODE segmentAggStates
SQL >
    -- Final node of the copy job: stamp every qualifying segment with today's date and
    -- attach its member / activity states and organization states.
    -- Only segment rows that have both a parent and a grandparent are kept. The joins
    -- are LEFT JOINs, so a segment with no member or organization rows is still written.
    -- NOTE(review): the result is appended (COPY_MODE append) to an AggregatingMergeTree
    -- sorted by (snapshotDate, segmentId); a second run on the same date would add a
    -- second set of states for the same key, which sumMerge readers would add together
    -- - confirm re-runs are prevented or cleaned up.
    SELECT
        today() AS snapshotDate,
        s.id AS segmentId,
        s.parentId AS parentId,
        s.grandparentId AS grandparentId,
        ms.activitiesTotalState,
        ms.activitiesLast30DaysState,
        ms.membersUniqState,
        ms.membersLast30UniqState,
        os.orgsUniqState,
        os.orgsLast30UniqState
    FROM
        (
            SELECT
                id,
                parentId,
                grandparentId
            FROM segments
            WHERE
                parentId IS NOT NULL
                AND parentId != ''
                AND parentSlug IS NOT NULL
                AND grandparentId IS NOT NULL
                AND grandparentId != ''
                AND grandparentSlug IS NOT NULL
        ) AS s
    LEFT JOIN memberSegmentStates AS ms
        ON ms.segmentId = s.id
    LEFT JOIN orgSegmentStates AS os
        ON os.segmentId = s.id

TYPE COPY
TARGET_DATASOURCE cdp_segment_metrics_agg_states_ds
COPY_MODE append
COPY_SCHEDULE 0 9 * * *

0 commit comments

Comments
 (0)