Commit f1f2d75

Batch triggering timeseries backfills (#1221)
1 parent a727bc0 commit f1f2d75

File tree: 4 files changed, +84 -100 lines changed

core/commands/repository/interactors/activate_measurements.py

+2 -4

@@ -20,11 +20,9 @@ def execute(
         )
 
         dataset, created = Dataset.objects.get_or_create(
-            name=measurement_type.value,
-            repository_id=repo.pk,
+            name=measurement_type.value, repository_id=repo.pk
         )
-
         if created:
-            trigger_backfill(dataset)
+            trigger_backfill([dataset])
 
         return dataset
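
The call-site change is small but deliberate: activating a single dataset now goes through the same batched code path by wrapping it in a one-element list. A minimal sketch of the idea, using hypothetical stand-ins rather than the real Dataset model or task service:

    # Sketch only -- stand-in types, not the Codecov models.
    from dataclasses import dataclass


    @dataclass
    class Dataset:
        repository_id: int


    def trigger_backfill(datasets: list[Dataset]) -> None:
        # one entry point for both cases; batching happens inside
        for dataset in datasets:
            print(f"backfill scheduled for repo {dataset.repository_id}")


    trigger_backfill([Dataset(repository_id=1)])  # single activation
    trigger_backfill([Dataset(repository_id=2), Dataset(repository_id=3)])  # batch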

core/commands/repository/interactors/tests/test_activate_measurements.py

+2 -5

@@ -5,8 +5,6 @@
 from asgiref.sync import async_to_sync
 from django.conf import settings
 from django.test import TestCase, override_settings
-from django.utils import timezone
-from freezegun import freeze_time
 from shared.django_apps.core.tests.factories import (
     CommitFactory,
     OwnerFactory,
@@ -98,7 +96,6 @@ def test_creates_coverage_dataset(self, backfill_dataset):
         ).exists()
 
     @patch("services.task.TaskService.backfill_dataset")
-    @freeze_time("2022-01-01T00:00:00")
     def test_triggers_task(self, backfill_dataset):
         CommitFactory(repository=self.repo, timestamp=datetime(2000, 1, 1, 1, 1, 1))
         CommitFactory(repository=self.repo, timestamp=datetime(2021, 12, 31, 1, 1, 1))
@@ -109,8 +106,8 @@ def test_triggers_task(self, backfill_dataset):
         ).first()
         backfill_dataset.assert_called_once_with(
             dataset,
-            start_date=timezone.datetime(2000, 1, 1),
-            end_date=timezone.datetime(2022, 1, 1),
+            start_date=datetime(2000, 1, 1, 1, 1, 1),
+            end_date=datetime(2021, 12, 31, 1, 1, 1),
         )
 
     @patch("services.task.TaskService.backfill_dataset")

timeseries/helpers.py

+70 -81

@@ -145,33 +145,27 @@ def coverage_measurements(
     return aggregate_measurements(queryset).order_by("timestamp_bin")
 
 
-def trigger_backfill(dataset: Dataset):
+def trigger_backfill(datasets: list[Dataset]):
     """
     Triggers a backfill for the full timespan of the dataset's repo's commits.
     """
-    oldest_commit = (
-        Commit.objects.filter(repository_id=dataset.repository_id)
-        .order_by("timestamp")
-        .first()
+    repo_ids = {d.repository_id for d in datasets}
+    timeranges = (
+        Commit.objects.filter(repository_id__in=repo_ids)
+        .values_list("repository_id")
+        .annotate(start_date=Min("timestamp"), end_date=Max("timestamp"))
     )
 
-    newest_commit = (
-        Commit.objects.filter(repository_id=dataset.repository_id)
-        .order_by("-timestamp")
-        .first()
-    )
-
-    if oldest_commit and newest_commit:
-        # dates to span the entire range of commits
-        start_date = oldest_commit.timestamp.date()
-        start_date = datetime.fromordinal(start_date.toordinal())
-        end_date = newest_commit.timestamp.date() + timedelta(days=1)
-        end_date = datetime.fromordinal(end_date.toordinal())
+    timerange_by_repo = {
+        repo_id: (start_date, end_date) for repo_id, start_date, end_date in timeranges
+    }
 
+    for dataset in datasets:
+        if dataset.repository_id not in timerange_by_repo:
+            continue  # there are no commits, and thus nothing to backfill
+        start_date, end_date = timerange_by_repo[dataset.repository_id]
         TaskService().backfill_dataset(
-            dataset,
-            start_date=start_date,
-            end_date=end_date,
+            dataset, start_date=start_date, end_date=end_date
         )
 
 
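The rewrite trades two queries per dataset (oldest commit, newest commit) for one aggregate query covering every repo in the batch; in the Django ORM, values_list(...) followed by annotate(...) groups by the listed field, so the queryset yields one (repository_id, start_date, end_date) row per repo. A plain-Python sketch of the grouping this computes, over assumed sample rows rather than the real Commit model:

    from datetime import datetime

    commits = [  # (repository_id, timestamp) -- assumed sample data
        (1, datetime(2000, 1, 1, 1, 1, 1)),
        (1, datetime(2021, 12, 31, 1, 1, 1)),
        (2, datetime(2015, 6, 1)),
    ]

    timerange_by_repo: dict[int, tuple[datetime, datetime]] = {}
    for repo_id, ts in commits:
        start, end = timerange_by_repo.get(repo_id, (ts, ts))
        timerange_by_repo[repo_id] = (min(start, ts), max(end, ts))

    # a repo with no commits never appears, matching the `continue` above
    assert timerange_by_repo[1] == (
        datetime(2000, 1, 1, 1, 1, 1),
        datetime(2021, 12, 31, 1, 1, 1),
    )
    assert timerange_by_repo[2] == (datetime(2015, 6, 1), datetime(2015, 6, 1))
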
@@ -340,42 +334,41 @@ def repository_coverage_measurements_with_fallback(
     If those are not available then we trigger a backfill and return computed results
     directly from the primary database (much slower to query).
     """
-    dataset = None
     if settings.TIMESERIES_ENABLED:
         dataset = Dataset.objects.filter(
             name=MeasurementName.COVERAGE.value,
             repository_id=repository.pk,
         ).first()
 
-    if settings.TIMESERIES_ENABLED and dataset and dataset.is_backfilled():
-        # timeseries data is ready
-        return coverage_measurements(
-            interval,
-            start_date=start_date,
-            end_date=end_date,
-            owner_id=repository.author_id,
-            repo_id=repository.pk,
-            measurable_id=str(repository.pk),
-            branch=branch or repository.branch,
-        )
-    else:
-        if settings.TIMESERIES_ENABLED and not dataset:
+        if dataset and dataset.is_backfilled():
+            # timeseries data is ready
+            return coverage_measurements(
+                interval,
+                start_date=start_date,
+                end_date=end_date,
+                owner_id=repository.author_id,
+                repo_id=repository.pk,
+                measurable_id=str(repository.pk),
+                branch=branch or repository.branch,
+            )
+
+        if not dataset:
             # we need to backfill
             dataset, created = Dataset.objects.get_or_create(
                 name=MeasurementName.COVERAGE.value,
                 repository_id=repository.pk,
             )
             if created:
-                trigger_backfill(dataset)
-
-        # we're still backfilling or timeseries is disabled
-        return coverage_fallback_query(
-            interval,
-            start_date=start_date,
-            end_date=end_date,
-            repository_id=repository.pk,
-            branch=branch or repository.branch,
-        )
+                trigger_backfill([dataset])
+
+    # we're still backfilling or timeseries is disabled
+    return coverage_fallback_query(
+        interval,
+        start_date=start_date,
+        end_date=end_date,
+        repository_id=repository.pk,
+        branch=branch or repository.branch,
+    )
 
 
 @sentry_sdk.trace
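
The reshaped function also flattens the old if/else tree into guard clauses: return early when timeseries data is ready, otherwise fall through to the slower primary-database query, whether a backfill is still running or timeseries is disabled. A condensed sketch of that control flow, with simplified names standing in for the real helpers:

    def measurements(timeseries_enabled: bool, backfilled: bool) -> str:
        if timeseries_enabled:
            if backfilled:
                return "timeseries query"  # fast path: data is ready
            # a backfill may be kicked off here (omitted)
        return "fallback query"  # still backfilling, or timeseries disabled


    assert measurements(timeseries_enabled=True, backfilled=True) == "timeseries query"
    assert measurements(timeseries_enabled=True, backfilled=False) == "fallback query"
    assert measurements(timeseries_enabled=False, backfilled=False) == "fallback query"
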
@@ -391,48 +384,44 @@ def owner_coverage_measurements_with_fallback(
     If those are not available then we trigger a backfill and return computed results
     directly from the primary database (much slower to query).
     """
-    datasets = []
+    # we can't join across databases so we need to load all this into memory.
+    # select just the needed columns to keep this manageable
+    repos = Repository.objects.filter(repoid__in=repo_ids).only("repoid", "branch")
+
     if settings.TIMESERIES_ENABLED:
         datasets = Dataset.objects.filter(
             name=MeasurementName.COVERAGE.value,
             repository_id__in=repo_ids,
         )
-
-    all_backfilled = len(datasets) == len(repo_ids) and all(
-        dataset.is_backfilled() for dataset in datasets
-    )
-
-    # we can't join across databases so we need to load all this into memory.
-    # select just the needed columns to keep this manageable
-    repos = Repository.objects.filter(repoid__in=repo_ids).only("repoid", "branch")
-
-    if settings.TIMESERIES_ENABLED and all_backfilled:
-        # timeseries data is ready
-        return coverage_measurements(
-            interval,
-            start_date=start_date,
-            end_date=end_date,
-            owner_id=owner.pk,
-            repos=repos,
+        all_backfilled = len(datasets) == len(repo_ids) and all(
+            dataset.is_backfilled() for dataset in datasets
        )
-    else:
-        if settings.TIMESERIES_ENABLED:
-            # we need to backfill some datasets
-            dataset_repo_ids = {dataset.repository_id for dataset in datasets}
-            missing_dataset_repo_ids = set(repo_ids) - dataset_repo_ids
-            created_datasets = Dataset.objects.bulk_create(
-                [
-                    Dataset(name=MeasurementName.COVERAGE.value, repository_id=repo_id)
-                    for repo_id in missing_dataset_repo_ids
-                ]
+
+        if all_backfilled:
+            # timeseries data is ready
+            return coverage_measurements(
+                interval,
+                start_date=start_date,
+                end_date=end_date,
+                owner_id=owner.pk,
+                repos=repos,
             )
-        for dataset in created_datasets:
-            trigger_backfill(dataset)
-
-        # we're still backfilling or timeseries is disabled
-        return coverage_fallback_query(
-            interval,
-            start_date=start_date,
-            end_date=end_date,
-            repos=repos,
+
+        # we need to backfill some datasets
+        dataset_repo_ids = {dataset.repository_id for dataset in datasets}
+        missing_dataset_repo_ids = set(repo_ids) - dataset_repo_ids
+        created_datasets = Dataset.objects.bulk_create(
+            [
+                Dataset(name=MeasurementName.COVERAGE.value, repository_id=repo_id)
+                for repo_id in missing_dataset_repo_ids
+            ]
         )
+        trigger_backfill(created_datasets)
+
+    # we're still backfilling or timeseries is disabled
+    return coverage_fallback_query(
+        interval,
+        start_date=start_date,
+        end_date=end_date,
+        repos=repos,
+    )

timeseries/tests/test_helpers.py

+10 -10

@@ -1,5 +1,5 @@
 from datetime import datetime, timezone
-from unittest.mock import call, patch
+from unittest.mock import patch
 
 import pytest
 from django.conf import settings
@@ -731,7 +731,7 @@ def test_no_dataset(self, trigger_backfill):
             repository_id=self.repo.pk,
         ).first()
         assert dataset
-        trigger_backfill.assert_called_once_with(dataset)
+        trigger_backfill.assert_called_once_with([dataset])
 
     @patch("timeseries.models.Dataset.is_backfilled")
     @patch("timeseries.helpers.trigger_backfill")
@@ -766,7 +766,7 @@ def test_backfill_trigger_on_dataset_creation(
 
         # Ensure trigger_backfill was called when a new Dataset was created
         mock_trigger_backfill.assert_called_once_with(
-            mock_get_or_create.return_value[0]
+            [mock_get_or_create.return_value[0]]
         )
 
     @patch("timeseries.models.Dataset.is_backfilled")
@@ -1172,14 +1172,14 @@ def test_no_dataset(self, trigger_backfill):
             },
         ]
 
-        datasets = Dataset.objects.filter(
-            name=MeasurementName.COVERAGE.value,
-            repository_id__in=[self.repo1.pk, self.repo2.pk],
-        )
-        assert datasets.count() == 2
-        trigger_backfill.assert_has_calls(
-            [call(datasets[0]), call(datasets[1])], any_order=True
+        datasets = list(
+            Dataset.objects.filter(
+                name=MeasurementName.COVERAGE.value,
+                repository_id__in=[self.repo1.pk, self.repo2.pk],
+            )
         )
+        assert len(datasets) == 2
+        trigger_backfill.assert_called_once_with(datasets)
 
         res = owner_coverage_measurements_with_fallback(
             owner=self.owner,
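
Because the batched API is now invoked exactly once with the whole list, the test can assert a single call rather than two order-independent ones. A self-contained illustration using unittest.mock directly, with stand-in values in place of Dataset rows:

    from unittest.mock import MagicMock

    trigger_backfill = MagicMock()
    datasets = ["dataset-1", "dataset-2"]  # stand-ins for Dataset rows

    trigger_backfill(datasets)  # one call with the full batch
    trigger_backfill.assert_called_once_with(datasets)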
