Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions ee/clickhouse/materialized_columns/analyze.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from collections import defaultdict
from datetime import timedelta
from typing import Optional
from typing import Optional, cast

import structlog

from posthog.clickhouse.client import sync_execute
from posthog.clickhouse.property_groups import property_groups
from posthog.models.property import PropertyName, TableColumn, TableWithProperties
from posthog.settings import CLICKHOUSE_CLUSTER

Expand All @@ -26,7 +27,14 @@
logger = structlog.get_logger(__name__)


def _analyze(since_hours_ago: int, min_query_time: int, team_id: Optional[int] = None) -> list[Suggestion]:
def _analyze(
since_hours_ago: int,
min_query_time: int,
team_id: Optional[int] = None,
*,
min_bytes_read: int = 20 * 1000 * 1000 * 1000,
min_read_rows: int = 5_000_000,
) -> list[Suggestion]:
"Finds columns that should be materialized"

raw_queries = sync_execute(
Expand All @@ -37,8 +45,8 @@ def _analyze(since_hours_ago: int, min_query_time: int, team_id: Optional[int] =
159, -- TIMEOUT EXCEEDED
160, -- TOO SLOW (estimated query execution time)
) as exception_codes,
20 * 1000 * 1000 * 1000 as min_bytes_read,
5000000 as min_read_rows
{min_bytes_read} as min_bytes_read,
{min_read_rows} as min_read_rows
SELECT
arrayJoin(
extractAll(query, 'JSONExtract[a-zA-Z0-9]*?\\((?:[a-zA-Z0-9\\`_-]+\\.)?(.*?), .*?\\)')
Expand Down Expand Up @@ -80,10 +88,91 @@ def _analyze(since_hours_ago: int, min_query_time: int, team_id: Optional[int] =
min_query_time=min_query_time,
team_id_filter=f"and JSONExtractInt(log_comment, 'team_id') = {team_id}" if team_id else "",
cluster=CLICKHOUSE_CLUSTER,
min_bytes_read=min_bytes_read,
min_read_rows=min_read_rows,
),
)

return [("events", table_column, property_name) for (table_column, property_name) in raw_queries]
suggestions: list[Suggestion] = [
("events", table_column, property_name) for (table_column, property_name) in raw_queries
]

# With property groups enabled, the printer reads unmaterialized properties through map columns
# (e.g. properties_group_custom['foo']) instead of JSONExtract, so those reads never match the regex
# above even when a dedicated materialized column would cut the scan by orders of magnitude. Run the
# same gating over property group map accesses and translate each group column back to its source
# column. Group columns are restricted to a known alternation so unrelated map accesses never match.
group_columns_to_source = property_groups.get_group_columns_to_source_columns("events")
if group_columns_to_source:
group_column_alternation = "|".join(sorted(group_columns_to_source))
group_column_prefilter = " OR ".join(
f"query LIKE '%{group_column}[%'" for group_column in sorted(group_columns_to_source)
)
raw_group_queries = sync_execute(
"""
WITH
{min_query_time} as slow_query_minimum,
(
159, -- TIMEOUT EXCEEDED
160, -- TOO SLOW (estimated query execution time)
) as exception_codes,
{min_bytes_read} as min_bytes_read,
{min_read_rows} as min_read_rows
SELECT
group_access[1] as column,
group_access[2] as prop_to_materialize
FROM
(
SELECT
arrayJoin(
extractAllGroups(query, '({group_column_alternation})\\[\\'([a-zA-Z0-9_\\-\\.\\$\\/\\ ]*?)\\'\\]')
) as group_access,
exception_code,
query_duration_ms
FROM
clusterAllReplicas({cluster}, system, query_log)
WHERE
query_start_time > now() - toIntervalHour({since})
and ({group_column_prefilter})
and type > 1
and is_initial_query
and JSONExtractString(log_comment, 'access_method') != 'personal_api_key' -- API requests failing is less painful than queries in the interface
and JSONExtractString(log_comment, 'kind') != 'celery'
and JSONExtractInt(log_comment, 'team_id') != 0
and query not like '%person_distinct_id2%' -- Old style person properties that are joined, no need to optimize those queries
and read_bytes > min_bytes_read
and (exception_code IN exception_codes OR query_duration_ms > slow_query_minimum)
and read_rows > min_read_rows
{team_id_filter}
)
GROUP BY
1, 2
HAVING
countIf(exception_code IN exception_codes) > 0 OR countIf(query_duration_ms > slow_query_minimum) > 9
ORDER BY
countIf(exception_code IN exception_codes) DESC,
countIf(query_duration_ms > slow_query_minimum) DESC
LIMIT 100 -- Make sure we don't add 100s of columns in one run
""".format(
since=since_hours_ago,
min_query_time=min_query_time,
team_id_filter=f"and JSONExtractInt(log_comment, 'team_id') = {team_id}" if team_id else "",
cluster=CLICKHOUSE_CLUSTER,
group_column_alternation=group_column_alternation,
group_column_prefilter=group_column_prefilter,
min_bytes_read=min_bytes_read,
min_read_rows=min_read_rows,
),
)
Comment on lines +111 to +166

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Duplicated SQL structure across both query passes

The group-query block repeats the CTE definitions (slow_query_minimum, exception_codes, min_bytes_read, min_read_rows), the six WHERE filters that follow the prefilter, the HAVING clause (including the hardcoded > 9), and the ORDER BY / LIMIT. If a threshold or exception code needs updating — e.g. adding a third exception code, tightening the HAVING floor — it must be changed in two places. Extracting the shared gating into a CTE or a helper that emits a parameterised subquery fragment would satisfy OnceAndOnlyOnce and keep future changes localised.

Prompt To Fix With AI
This is a comment left during a code review.
Path: ee/clickhouse/materialized_columns/analyze.py
Line: 109-164

Comment:
**Duplicated SQL structure across both query passes**

The group-query block repeats the CTE definitions (`slow_query_minimum`, `exception_codes`, `min_bytes_read`, `min_read_rows`), the six WHERE filters that follow the prefilter, the HAVING clause (including the hardcoded `> 9`), and the ORDER BY / LIMIT. If a threshold or exception code needs updating — e.g. adding a third exception code, tightening the HAVING floor — it must be changed in two places. Extracting the shared gating into a CTE or a helper that emits a parameterised subquery fragment would satisfy OnceAndOnlyOnce and keep future changes localised.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


seen = set(suggestions)
for group_column, property_name in raw_group_queries:
suggestion: Suggestion = ("events", cast(TableColumn, group_columns_to_source[group_column]), property_name)
if suggestion not in seen:
seen.add(suggestion)
suggestions.append(suggestion)

return suggestions


def materialize_properties_task(
Expand Down
47 changes: 45 additions & 2 deletions ee/clickhouse/materialized_columns/test/test_analyze.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,56 @@
import pytest
from posthog.test.base import BaseTest, ClickhouseTestMixin
from posthog.test.base import BaseTest, ClickhouseTestMixin, _create_event, flush_persons_and_events
from unittest.mock import call, patch

from posthog.clickhouse.client import sync_execute
from posthog.clickhouse.property_groups import property_groups
from posthog.clickhouse.query_tagging import tags_context

from ee.clickhouse.materialized_columns.analyze import materialize_properties_task
from ee.clickhouse.materialized_columns.analyze import _analyze, materialize_properties_task


class TestMaterializedColumnsAnalyze(ClickhouseTestMixin, BaseTest):
def test_property_group_reads_suggest_materialization(self):
    """Reads through property group map columns should surface as suggestions on their source columns."""
    # The analyzer is driven by genuine query log entries: inserting synthetic rows into
    # system.query_log is silently ignored on current ClickHouse versions, so real queries are run
    # and the gating thresholds are opened up when calling the analyzer below.
    # Each key probed by the queries has to be present on at least one event; otherwise the map-key
    # bloom filter prunes the scan to zero rows read and the read_rows/read_bytes gates reject the
    # query even when they are set to 0.
    _create_event(
        team=self.team,
        distinct_id="d1",
        event="e",
        properties={"materialize_me_group": "x", "mat_group_ternary": "y2", "$feature/my-flag": "true"},
        person_properties={"mat_person_group": "z"},
    )
    flush_persons_and_events()

    # One query per access shape: plain map read, has()-guarded read, person map, feature flag map.
    group_read_queries = [
        f"SELECT count() FROM events WHERE team_id = {self.team.pk} AND properties_group_custom['materialize_me_group'] = 'x'",
        f"SELECT count() FROM events WHERE team_id = {self.team.pk} AND if(has(properties_group_custom, 'mat_group_ternary'), properties_group_custom['mat_group_ternary'], NULL) != 'y'",
        f"SELECT count() FROM events WHERE team_id = {self.team.pk} AND person_properties_map_custom['mat_person_group'] = 'z'",
        f"SELECT count() FROM events WHERE team_id = {self.team.pk} AND properties_group_feature_flags['$feature/my-flag'] = 'true'",
    ]
    runs_per_query = 10
    with tags_context(team_id=self.team.pk):
        for query in group_read_queries:
            for _run in range(runs_per_query):
                sync_execute(query)
    # Make the entries written by the queries above visible in the query log before analyzing.
    sync_execute("SYSTEM FLUSH LOGS")

    analyzed = _analyze(
        since_hours_ago=1,
        min_query_time=-1,
        team_id=self.team.pk,
        min_bytes_read=0,
        min_read_rows=0,
    )
    suggestions = set(analyzed)

    expected_suggestions = [
        ("events", "properties", "materialize_me_group"),
        ("events", "properties", "mat_group_ternary"),
        ("events", "person_properties", "mat_person_group"),
        ("events", "properties", "$feature/my-flag"),
    ]
    for expected in expected_suggestions:
        assert expected in suggestions
Comment on lines +13 to +47

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Test covers four cases but is not parameterised

test_property_group_reads_suggest_materialization exercises four distinct property group types (custom, ternary form of custom, person_properties, feature_flags) in a single assertion block. Following the team's preference for parameterised tests, each case (query pattern + expected source_column + expected property_name) could be a separate parameter. This would let each case fail independently, simplify debugging, and make it easier to add new group types later without growing one monolithic test.

Prompt To Fix With AI
This is a comment left during a code review.
Path: ee/clickhouse/materialized_columns/test/test_analyze.py
Line: 13-47

Comment:
**Test covers four cases but is not parameterised**

`test_property_group_reads_suggest_materialization` exercises four distinct property group types (`custom`, ternary form of `custom`, `person_properties`, `feature_flags`) in a single assertion block. Following the team's preference for parameterised tests, each case (query pattern + expected `source_column` + expected `property_name`) could be a separate parameter. This would let each case fail independently, simplify debugging, and make it easier to add new group types later without growing one monolithic test.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


def test_group_columns_to_source_columns(self):
    """Known event property group columns resolve to the source column they are built from."""
    source_by_group_column = property_groups.get_group_columns_to_source_columns("events")
    expected_sources = {
        "properties_group_custom": "properties",
        "person_properties_map_custom": "person_properties",
    }
    for group_column, expected_source in expected_sources.items():
        assert source_by_group_column[group_column] == expected_source

@pytest.mark.skip(reason="Test is failing for some reason")
@patch("ee.clickhouse.materialized_columns.analyze.materialize")
@patch("ee.clickhouse.materialized_columns.analyze.backfill_materialized_columns")
Expand Down
8 changes: 8 additions & 0 deletions posthog/clickhouse/property_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ def get_property_group_columns(
if group_definition.contains(property_key):
yield group_definition.get_column_name(source_column, group_name)

def get_group_columns_to_source_columns(self, table: TableName) -> dict[str, PropertySourceColumnName]:
    """
    Build a lookup from every property group column name on ``table`` to its source column.

    Returns an empty mapping when no property groups are registered for ``table``.
    """
    groups_by_source_column = self.__groups.get(table, {})
    column_to_source: dict[str, PropertySourceColumnName] = {}
    for source_column, column_groups in groups_by_source_column.items():
        for group_name, group_definition in column_groups.items():
            column_name = group_definition.get_column_name(source_column, group_name)
            # A later definition producing the same column name replaces the earlier entry.
            column_to_source[column_name] = source_column
    return column_to_source

def get_create_table_pieces(self, table: TableName) -> Iterable[str]:
"""
Returns an iterable of SQL DDL chunks that can be used to define all property groups for the provided table as
Expand Down
Loading