Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci-dagster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ jobs:
- 'ee/clickhouse/**'
- 'ee/hogai/**'
# --- Only the products that DAGs actually import from ---
- 'products/analytics_platform/backend/**/*.py'
- 'products/cohorts/backend/**/*.py'
- 'products/data_modeling/backend/**/*.py'
- 'products/data_warehouse/backend/**/*.py'
Expand Down
48 changes: 44 additions & 4 deletions products/web_analytics/dags/eager_web_analytics_precompute.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Eager web analytics precompute — hourly baseline warming.

A single Dagster job that pre-warms the lazy precompute cache for the
Web analytics dashboard's main tile matrix over the trailing 28 days,
Web analytics dashboard's main tile matrix over the trailing 31 days,
for every team in the `WEB_ANALYTICS_LAZY_PRECOMPUTE_TEAM_IDS` setting.

The job is intentionally thin: it enumerates the dashboard's query families
Expand Down Expand Up @@ -50,11 +50,14 @@
"""

import time
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import UTC, datetime, timedelta

from django.conf import settings
from django.db import connections
from django.db.models import Max

import dagster
import structlog
Expand All @@ -71,6 +74,7 @@
from posthog.hogql_queries.query_runner import ExecutionMode, get_query_runner
from posthog.models import Team

from products.analytics_platform.backend.models.preaggregation_job import PreaggregationJob
from products.web_analytics.backend.hogql_queries.web_lazy_precompute_common import is_precompute_enabled_for_team
from products.web_analytics.dags.web_preaggregated_utils import check_for_concurrent_runs

Expand All @@ -88,9 +92,11 @@

# How many teams to warm concurrently. Each team's tiles still run sequentially
# inside `_warm_baseline_for_team`, so this is the number of simultaneous warm
# queries hitting ClickHouse — keep it small so warming never competes with
# user-facing traffic for the per-user query cap.
WARM_TEAM_CONCURRENCY = 5
# queries hitting ClickHouse. 10 sits comfortably inside the warmer user's
# concurrency cap (measured ~0 query rejections at this load) while keeping the
# run from competing with user-facing traffic; raise further only if a full pass
# still can't finish within the job's max_runtime.
WARM_TEAM_CONCURRENCY = 10


# `context.log` (Dagster's DagsterLogManager) is shared across the warm pool's
Expand Down Expand Up @@ -375,6 +381,38 @@ def warm_eager_baseline_op(context: dagster.OpExecutionContext) -> dict[str, int

eligible.append(team)

# Warm the least-recently-computed teams first, so a run cut short by
# `max_runtime` still makes progress on the teams that need it most. Teams
# that have never been warmed (no qualifying `PreaggregationJob` row) sort to
# the front. One indexed aggregate, not per-team.
#
# Scope the freshness signal to READY jobs whose coverage falls inside the
# baseline window this DAG actually warms. `PreaggregationJob` is a shared
# lazy-precompute table (web analytics, marketing analytics, experiments all
# write it) with no product column, so this can't perfectly isolate this
# warmer's own jobs — but restricting to READY + the trailing window drops
# the bulk of the noise (old one-off date ranges, pending/failed/stale rows)
# that would otherwise make a team with a stale baseline look freshly warmed.
window_start = datetime.now(UTC) - timedelta(days=BASELINE_WINDOW_DAYS)
last_computed: dict[int, datetime | None] = dict(
PreaggregationJob.objects.filter(
team_id__in=[t.pk for t in eligible],
status=PreaggregationJob.Status.READY,
time_range_end__gte=window_start,
)
.values("team_id")
.annotate(last=Max("computed_at"))
.values_list("team_id", "last")
)
never_computed = datetime.min.replace(tzinfo=UTC)
eligible.sort(key=lambda t: last_computed.get(t.pk) or never_computed)

# Running progress counter for the pool — `itertools.count.__next__` is
# atomic under the GIL, so it's safe to call from the worker threads. Each
# team's completion log carries `processed/total` as a live progress signal.
total = len(eligible)
progress = itertools.count(1)

def _warm(team: Team) -> tuple[int, int]:
team_started = time.monotonic()
team_warmed = team_failed = 0
Expand All @@ -395,6 +433,8 @@ def _warm(team: Team) -> tuple[int, int]:
team_id=team.pk,
warmed=team_warmed,
failed=team_failed,
processed=next(progress),
total=total,
duration_ms=round((time.monotonic() - team_started) * 1000),
)
return team_warmed, team_failed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from collections.abc import MutableMapping
from contextlib import contextmanager
from datetime import timedelta
from typing import Any

from posthog.test.base import APIBaseTest
from unittest.mock import Mock, patch

from django.test import override_settings
from django.utils import timezone

import dagster
from structlog.testing import capture_logs
Expand All @@ -15,6 +17,7 @@
from posthog.hogql_queries.query_runner import ExecutionMode
from posthog.models import Organization, Team

from products.analytics_platform.backend.models.preaggregation_job import PreaggregationJob
from products.web_analytics.dags.eager_web_analytics_precompute import (
BASELINE_BREAKDOWNS,
BASELINE_WINDOW_DAYS,
Expand All @@ -38,6 +41,24 @@ def _eager_audience(team_ids):
yield


def _make_preagg_job(
    team: Team,
    *,
    computed_at,
    status=PreaggregationJob.Status.READY,
    time_range_end=None,
) -> PreaggregationJob:
    """Create and return a `PreaggregationJob` row for `team` spanning one day.

    The covered range ends at `time_range_end` (the current time when it is
    omitted) and starts exactly one day before that. `status` and
    `computed_at` are stored as passed; `query_hash` is a fixed 64-character
    placeholder.
    """
    range_end = timezone.now() if time_range_end is None else time_range_end
    job_fields = {
        "team": team,
        "time_range_start": range_end - timedelta(days=1),
        "time_range_end": range_end,
        "query_hash": "a" * 64,
        "status": status,
        "computed_at": computed_at,
    }
    return PreaggregationJob.objects.create(**job_fields)


@patch(f"{_EAGER_MODULE}.is_cloud", return_value=True)
class TestResolveEagerAudience:
@override_settings(WEB_ANALYTICS_LAZY_PRECOMPUTE_TEAM_IDS=[2, 7])
Expand Down Expand Up @@ -69,6 +90,68 @@ def _enroll_teams(self, *, count: int) -> list[Team]:
org = Organization.objects.create(name="Audience")
return [Team.objects.create(organization=org, name=f"team-{i}") for i in range(count)]

@patch(f"{_EAGER_MODULE}.WARM_TEAM_CONCURRENCY", 1)
@patch(f"{_EAGER_MODULE}.tag_queries")
@patch(f"{_EAGER_MODULE}.get_query_runner")
def test_warms_least_recently_computed_teams_first(self, get_runner, _tag, _is_cloud):
    # A single worker makes the pool consume `eligible` strictly in order, so
    # the staleness sort shows up directly in the runner call sequence.
    never, old, recent = self._enroll_teams(count=3)
    reference = timezone.now()
    # `never` intentionally gets no PreaggregationJob row: it must warm first.
    _make_preagg_job(recent, computed_at=reference)
    _make_preagg_job(old, computed_at=reference - timedelta(days=2))
    runner_response = Mock(usedLazyPrecompute=True)
    get_runner.return_value = Mock(run=Mock(return_value=runner_response))

    # The audience is listed freshest-first so only the sort can produce the
    # expected stalest-first order.
    with _eager_audience([recent.pk, old.pk, never.pk]):
        warm_eager_baseline_op(dagster.build_op_context())

    # Collapse consecutive runner calls for the same team into one entry.
    warm_order: list[int] = []
    for invocation in get_runner.call_args_list:
        warmed_team = invocation.kwargs.get("team") or invocation.args[1]
        if warm_order and warm_order[-1] == warmed_team.pk:
            continue
        warm_order.append(warmed_team.pk)
    assert warm_order == [never.pk, old.pk, recent.pk]

@patch(f"{_EAGER_MODULE}.WARM_TEAM_CONCURRENCY", 1)
@patch(f"{_EAGER_MODULE}.tag_queries")
@patch(f"{_EAGER_MODULE}.get_query_runner")
def test_staleness_ordering_ignores_out_of_window_and_non_ready_jobs(self, get_runner, _tag, _is_cloud):
    # The freshness signal only counts READY jobs whose range reaches into the
    # baseline window. A just-computed job that ends before the window, or one
    # that is not READY, must not make its team look freshly warmed: both of
    # those teams have to be warmed before the genuinely warm one.
    genuine, noise_window, noise_status = self._enroll_teams(count=3)
    reference = timezone.now()
    out_of_window_end = reference - timedelta(days=BASELINE_WINDOW_DAYS + 5)
    _make_preagg_job(genuine, computed_at=reference)
    _make_preagg_job(noise_window, computed_at=reference, time_range_end=out_of_window_end)
    _make_preagg_job(noise_status, computed_at=reference, status=PreaggregationJob.Status.STALE)
    runner_response = Mock(usedLazyPrecompute=True)
    get_runner.return_value = Mock(run=Mock(return_value=runner_response))

    with _eager_audience([genuine.pk, noise_window.pk, noise_status.pk]):
        warm_eager_baseline_op(dagster.build_op_context())

    # Collapse consecutive runner calls for the same team into one entry.
    warm_order: list[int] = []
    for invocation in get_runner.call_args_list:
        warmed_team = invocation.kwargs.get("team") or invocation.args[1]
        if warm_order and warm_order[-1] == warmed_team.pk:
            continue
        warm_order.append(warmed_team.pk)
    # The two noise teams count as never warmed and lead (in either order);
    # the team with a qualifying job comes last.
    assert warm_order[-1] == genuine.pk
    assert set(warm_order[:2]) == {noise_window.pk, noise_status.pk}

@patch(f"{_EAGER_MODULE}.tag_queries")
@patch(f"{_EAGER_MODULE}.get_query_runner")
def test_team_logs_carry_processed_total_progress(self, get_runner, _tag, _is_cloud):
    # Every per-team completion log must carry a distinct `processed` value
    # and the shared `total`, so the pair works as a live progress signal.
    get_runner.return_value = Mock(run=Mock(return_value=Mock(usedLazyPrecompute=True)))
    teams = self._enroll_teams(count=3)
    with _eager_audience([t.pk for t in teams]), capture_logs() as cap_logs:
        warm_eager_baseline_op(dagster.build_op_context())
    team_logs = [log for log in cap_logs if log.get("event") == "eager_baseline_warming_team"]
    # Compare the sorted list rather than a set: a set would hide a repeated
    # counter value (e.g. from a non-atomic counter). Worker completion order
    # is not deterministic, hence the sort.
    assert sorted(log["processed"] for log in team_logs) == [1, 2, 3]
    assert all(log["total"] == 3 for log in team_logs)

@patch("products.web_analytics.dags.eager_web_analytics_precompute.tag_queries")
@patch("products.web_analytics.dags.eager_web_analytics_precompute.get_query_runner")
def test_one_team_failure_does_not_poison_other_teams(self, get_runner, tag_queries_mock, _is_cloud):
Expand Down
Loading