Skip to content

Commit ae2fc62

Browse files
wip(ingester): aggregation tables
1 parent ca5d24d commit ae2fc62

File tree

4 files changed

+409
-1
lines changed

4 files changed

+409
-1
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
from datetime import datetime
2+
import math
3+
4+
from django.db import connection
5+
6+
from kernelCI_app.constants.general import MAESTRO_DUMMY_BUILD_PREFIX
7+
from kernelCI_app.constants.ingester import INGEST_BATCH_SIZE
8+
from kernelCI_app.models import (
9+
BuildStatusByHardware,
10+
Builds,
11+
NewBuild,
12+
NewTest,
13+
Tests,
14+
)
15+
from kernelCI_app.utils import is_boot
16+
17+
18+
def ceil_to_next_half_hour(dt_input: str | datetime) -> int:
    """Round a point in time up to the next half-hour boundary.

    Args:
        dt_input: Either an ISO 8601 string or a ``datetime`` instance.
            A trailing ``Z`` (UTC designator) on a string is accepted.

    Returns:
        Unix epoch seconds as an ``int`` that is a multiple of 1800 and is
        greater than or equal to the input instant. An input that already
        sits exactly on a half-hour boundary is returned unchanged.

    Raises:
        ValueError: If a string input is not a valid ISO 8601 timestamp.
    """
    half_hour_in_seconds = 1800

    if isinstance(dt_input, datetime):
        # Model fields may already hold parsed datetimes; fromisoformat()
        # only accepts strings and would raise TypeError on them.
        moment = dt_input
    else:
        text = dt_input.strip()
        # datetime.fromisoformat() only understands the "Z" suffix from
        # Python 3.11 onwards, so normalise it to an explicit UTC offset.
        if text[-1:] in ("Z", "z"):
            text = f"{text[:-1]}+00:00"
        moment = datetime.fromisoformat(text)

    # NOTE(review): a naive datetime (no offset) is interpreted in the
    # process' local timezone by timestamp(), exactly as before this change.
    return math.ceil(moment.timestamp() / half_hour_in_seconds) * half_hour_in_seconds
22+
23+
24+
def convert_test(t: Tests) -> NewTest:
    """Build the slim ``NewTest`` aggregation row for a ``Tests`` row.

    The boot flag is derived from the test path via ``is_boot`` and the
    start time is replaced by its half-hour bucket (see
    ``ceil_to_next_half_hour``). The platform is read from
    ``t.environment_misc["platform"]``, so ``environment_misc`` must not be
    ``None`` when this is called.

    Args:
        t: The source test row.

    Returns:
        An unsaved ``NewTest`` instance.
    """
    boot_flag = is_boot(t.path)
    time_bucket = ceil_to_next_half_hour(t.start_time)

    field_values = {
        "test_id": t.id,
        "build_id": t.build_id,
        "test_origin": t.origin,
        "test_platform": t.environment_misc.get("platform"),
        "test_compatible": t.environment_compatible,
        "status": t.status,
        "is_boot": boot_flag,
        "start_time": time_bucket,
    }
    return NewTest(**field_values)
37+
38+
39+
def convert_to_build_status_by_hardware(test: NewTest) -> BuildStatusByHardware:
    """Derive the (origin, platform, build) link row for one aggregated test.

    Args:
        test: A ``NewTest`` whose origin, platform and build id are copied.

    Returns:
        An unsaved ``BuildStatusByHardware`` instance.
    """
    origin = test.test_origin
    platform = test.test_platform
    return BuildStatusByHardware(
        hardware_origin=origin,
        hardware_platform=platform,
        build_id=test.build_id,
    )
45+
46+
47+
def prepare_build_status_by_hardware(
    tests: list[NewTest],
) -> list[BuildStatusByHardware]:
    """Map every test to its ``BuildStatusByHardware`` row, preserving order.

    No de-duplication is done here: several tests sharing the same origin,
    platform and build produce several equal rows.
    """
    return list(map(convert_to_build_status_by_hardware, tests))
51+
52+
53+
def aggregate_hardware_status_data(tests: list[NewTest]) -> dict:
    """Count test outcomes per (origin, platform, start-time bucket).

    Each test increments exactly one counter of its bucket: ``boot_*`` when
    ``is_boot`` is truthy, ``test_*`` otherwise; the suffix is ``pass`` for
    status ``"PASS"``, ``failed`` for ``"FAIL"`` and ``inc`` for anything
    else (including ``None``). The ``build_*`` counters are created but never
    incremented here. ``compatibles`` holds the first truthy
    ``test_compatible`` seen for the bucket, or ``None``.

    Args:
        tests: Aggregated test rows to count.

    Returns:
        A dict keyed by ``(test_origin, test_platform, start_time)`` whose
        values are dicts with the ``hardware_status`` column names as keys.
    """
    buckets: dict = {}

    for item in tests:
        if item.status == "PASS":
            outcome = "pass"
        elif item.status == "FAIL":
            outcome = "failed"
        else:
            outcome = "inc"

        bucket_key = (item.test_origin, item.test_platform, item.start_time)
        row = buckets.get(bucket_key)
        if row is None:
            row = {
                "hardware_origin": item.test_origin,
                "hardware_platform": item.test_platform,
                "date": item.start_time,
                "compatibles": item.test_compatible,
                "build_pass": 0,
                "build_failed": 0,
                "build_inc": 0,
                "boot_pass": 0,
                "boot_failed": 0,
                "boot_inc": 0,
                "test_pass": 0,
                "test_failed": 0,
                "test_inc": 0,
            }
            buckets[bucket_key] = row

        # Back-fill compatibles from a later test if the first one had none.
        if row["compatibles"] is None and item.test_compatible:
            row["compatibles"] = item.test_compatible

        category = "boot" if item.is_boot else "test"
        row[f"{category}_{outcome}"] += 1

    return buckets
93+
94+
95+
def convert_build(b: Builds) -> NewBuild:
    """Build the slim ``NewBuild`` aggregation row for a ``Builds`` row.

    Copies the build id, checkout id, origin and status unchanged.
    """
    field_values = {
        "build_id": b.id,
        "checkout_id": b.checkout_id,
        "build_origin": b.origin,
        "status": b.status,
    }
    return NewBuild(**field_values)
102+
103+
104+
def aggregate_builds_status(builds_instances: list[Builds]) -> None:
    """Insert a ``NewBuild`` row for every non-dummy build of the batch.

    Builds whose id starts with ``MAESTRO_DUMMY_BUILD_PREFIX`` are left out.
    Rows that conflict with existing ones are ignored by the database.

    Args:
        builds_instances: ``Builds`` rows from the current ingestion batch.
    """
    rows = [
        convert_build(build)
        for build in builds_instances
        if not build.id.startswith(MAESTRO_DUMMY_BUILD_PREFIX)
    ]

    NewBuild.objects.bulk_create(
        rows,
        batch_size=INGEST_BATCH_SIZE,
        ignore_conflicts=True,
    )
116+
117+
118+
def aggregate_tests_status(tests_instances: list[Tests]) -> None:
    """Store slim test rows and roll their results up into ``hardware_status``.

    Only tests that carry a platform in ``environment_misc`` and have a
    ``start_time`` are considered. Tests whose id is already stored in
    ``new_test``, or that appear more than once in ``tests_instances``, are
    skipped so that re-ingesting a submission does not add their results to
    the counters a second time.

    The following writes happen inside a single transaction, so a failure
    in a later step cannot leave tests recorded but never counted:

    1. insert the new ``NewTest`` rows;
    2. insert the matching ``BuildStatusByHardware`` rows;
    3. upsert the per-bucket counters into ``hardware_status``, adding to
       the boot/test counters of rows that already exist.

    Args:
        tests_instances: ``Tests`` rows from the current ingestion batch.
    """
    # Local import: the module-level imports only bring in ``connection``.
    from django.db import transaction

    # Keep the first occurrence of every test id. Tests without a start time
    # cannot be bucketed (convert_test would raise on them) and are dropped.
    candidates: dict = {}
    for t in tests_instances:
        if not (t.environment_misc and t.environment_misc.get("platform")):
            continue
        if not t.start_time:
            continue
        if t.id in candidates:
            continue
        candidates[t.id] = convert_test(t)

    if not candidates:
        return

    insert_query = """
        INSERT INTO hardware_status (
            hardware_origin, hardware_platform, date, compatibles,
            build_pass, build_failed, build_inc,
            boot_pass, boot_failed, boot_inc,
            test_pass, test_failed, test_inc
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (hardware_origin, hardware_platform, date)
        DO UPDATE SET
            compatibles = COALESCE(hardware_status.compatibles, EXCLUDED.compatibles),
            boot_pass = hardware_status.boot_pass + EXCLUDED.boot_pass,
            boot_failed = hardware_status.boot_failed + EXCLUDED.boot_failed,
            boot_inc = hardware_status.boot_inc + EXCLUDED.boot_inc,
            test_pass = hardware_status.test_pass + EXCLUDED.test_pass,
            test_failed = hardware_status.test_failed + EXCLUDED.test_failed,
            test_inc = hardware_status.test_inc + EXCLUDED.test_inc
    """
    # Must stay in the same order as the column list of insert_query.
    columns = (
        "hardware_origin",
        "hardware_platform",
        "date",
        "compatibles",
        "build_pass",
        "build_failed",
        "build_inc",
        "boot_pass",
        "boot_failed",
        "boot_inc",
        "test_pass",
        "test_failed",
        "test_inc",
    )

    with transaction.atomic():
        # bulk_create(ignore_conflicts=True) hands back every object it was
        # given, including those that hit a conflict, so its return value
        # cannot tell which tests are new. Look the ids up explicitly, in
        # chunks to keep the IN (...) lists bounded.
        candidate_ids = list(candidates)
        already_stored: set = set()
        for start in range(0, len(candidate_ids), INGEST_BATCH_SIZE):
            chunk = candidate_ids[start : start + INGEST_BATCH_SIZE]
            already_stored.update(
                NewTest.objects.filter(test_id__in=chunk).values_list(
                    "test_id", flat=True
                )
            )

        new_tests = [
            new_test
            for test_id, new_test in candidates.items()
            if test_id not in already_stored
        ]
        if not new_tests:
            return

        NewTest.objects.bulk_create(
            new_tests,
            batch_size=INGEST_BATCH_SIZE,
            # Kept as a safety net against a concurrent writer inserting the
            # same id between the lookup above and this insert.
            ignore_conflicts=True,
        )

        BuildStatusByHardware.objects.bulk_create(
            prepare_build_status_by_hardware(new_tests),
            batch_size=INGEST_BATCH_SIZE,
            ignore_conflicts=True,
        )

        aggregated_data = aggregate_hardware_status_data(new_tests)
        values = [
            tuple(data[column] for column in columns)
            for data in aggregated_data.values()
        ]
        if values:
            # Open the cursor only for the statement that needs it.
            with connection.cursor() as cursor:
                cursor.executemany(insert_query, values)

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,19 @@
2222
)
2323
import kcidb_io
2424
from django.db import transaction
25-
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
25+
from kernelCI_app.models import (
26+
Issues,
27+
Checkouts,
28+
Builds,
29+
NewBuild,
30+
NewTest,
31+
Tests,
32+
Incidents,
33+
)
34+
from kernelCI_app.management.commands.helpers.aggregation_helpers import (
35+
aggregate_builds_status,
36+
aggregate_tests_status,
37+
)
2638

2739
from kernelCI_app.management.commands.helpers.process_submissions import (
2840
TableNames,
@@ -204,6 +216,8 @@ def db_worker(stop_event: threading.Event) -> None: # noqa: C901
204216
builds_buf: list[Builds] = []
205217
tests_buf: list[Tests] = []
206218
incidents_buf: list[Incidents] = []
219+
new_tests_buf: list[NewTest] = []
220+
new_builds_buf: list[NewBuild] = []
207221

208222
last_flush_ts = time.time()
209223

@@ -230,6 +244,8 @@ def buffered_total() -> int:
230244
builds_buf.extend(inst["builds"])
231245
tests_buf.extend(inst["tests"])
232246
incidents_buf.extend(inst["incidents"])
247+
new_tests_buf.extend(inst["tests"])
248+
new_builds_buf.extend(inst["builds"])
233249

234250
if buffered_total() >= INGEST_BATCH_SIZE:
235251
flush_buffers(
@@ -239,6 +255,10 @@ def buffered_total() -> int:
239255
tests_buf=tests_buf,
240256
incidents_buf=incidents_buf,
241257
)
258+
aggregate_tests_status(new_tests_buf)
259+
aggregate_builds_status(new_builds_buf)
260+
new_tests_buf.clear()
261+
new_builds_buf.clear()
242262
last_flush_ts = time.time()
243263

244264
if VERBOSE:
@@ -278,6 +298,10 @@ def buffered_total() -> int:
278298
tests_buf=tests_buf,
279299
incidents_buf=incidents_buf,
280300
)
301+
aggregate_tests_status(new_tests_buf)
302+
aggregate_builds_status(new_builds_buf)
303+
new_tests_buf.clear()
304+
new_builds_buf.clear()
281305
last_flush_ts = time.time()
282306
continue
283307
except Exception as e:
@@ -291,6 +315,10 @@ def buffered_total() -> int:
291315
tests_buf=tests_buf,
292316
incidents_buf=incidents_buf,
293317
)
318+
aggregate_tests_status(new_tests_buf)
319+
aggregate_builds_status(new_builds_buf)
320+
new_tests_buf.clear()
321+
new_builds_buf.clear()
294322

295323

296324
def process_file(
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Generated by Django 5.1.13 on 2025-11-13 14:42
2+
3+
import django.contrib.postgres.fields
4+
from django.db import migrations, models
5+
6+
7+
class Migration(migrations.Migration):
    """Create the tables backing the ingester's aggregation step.

    Tables created: ``build_status_by_hardware``, ``hardware_status``,
    ``new_build`` and ``new_test``.
    """

    dependencies = [
        ("kernelCI_app", "0009_add_test_origin_start_time_platform_index"),
    ]

    operations = [
        # One row per distinct (hardware origin, hardware platform, build).
        migrations.CreateModel(
            name="BuildStatusByHardware",
            fields=[
                (
                    "id",
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                ("hardware_origin", models.CharField(max_length=100)),
                ("hardware_platform", models.CharField(max_length=100)),
                ("build_id", models.TextField()),
            ],
            options={
                "db_table": "build_status_by_hardware",
                "unique_together": {
                    ("hardware_origin", "hardware_platform", "build_id")
                },
            },
        ),
        # Pass/fail/inconclusive counters per (origin, platform, date).
        migrations.CreateModel(
            name="HardwareStatus",
            fields=[
                (
                    "id",
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                ("hardware_origin", models.CharField(max_length=100)),
                ("hardware_platform", models.CharField(max_length=100)),
                (
                    "compatibles",
                    django.contrib.postgres.fields.ArrayField(
                        base_field=models.TextField(), null=True, size=None
                    ),
                ),
                # NOTE(review): presumably a Unix-epoch time bucket in
                # seconds; if so, a 32-bit IntegerField overflows in 2038 -
                # confirm whether BigIntegerField is needed.
                ("date", models.IntegerField()),
                ("build_pass", models.IntegerField()),
                ("build_failed", models.IntegerField()),
                ("build_inc", models.IntegerField()),
                ("boot_pass", models.IntegerField()),
                ("boot_failed", models.IntegerField()),
                ("boot_inc", models.IntegerField()),
                ("test_pass", models.IntegerField()),
                ("test_failed", models.IntegerField()),
                ("test_inc", models.IntegerField()),
            ],
            options={
                "db_table": "hardware_status",
                "unique_together": {("hardware_origin", "hardware_platform", "date")},
            },
        ),
        # Slim copy of a build: id, checkout, origin and status only.
        migrations.CreateModel(
            name="NewBuild",
            fields=[
                ("build_id", models.TextField(primary_key=True, serialize=False)),
                ("checkout_id", models.TextField()),
                ("build_origin", models.CharField(max_length=100)),
                (
                    "status",
                    models.CharField(
                        blank=True,
                        choices=[
                            ("PASS", "Pass"),
                            ("FAIL", "Fail"),
                            ("SKIP", "Skip"),
                            ("ERROR", "Error"),
                            ("MISS", "Miss"),
                            ("DONE", "Done"),
                        ],
                        max_length=10,
                        null=True,
                    ),
                ),
            ],
            options={
                "db_table": "new_build",
                # build_id is already the primary key, so this pair is
                # unique by construction.
                "unique_together": {("build_id", "checkout_id")},
            },
        ),
        # Slim copy of a test, with its hardware platform and boot flag.
        migrations.CreateModel(
            name="NewTest",
            fields=[
                ("test_id", models.TextField(primary_key=True, serialize=False)),
                ("build_id", models.TextField()),
                ("test_origin", models.CharField(max_length=100)),
                ("test_platform", models.CharField(max_length=100)),
                (
                    "test_compatible",
                    django.contrib.postgres.fields.ArrayField(
                        base_field=models.TextField(), null=True, size=None
                    ),
                ),
                (
                    "status",
                    models.CharField(
                        blank=True,
                        choices=[
                            ("PASS", "Pass"),
                            ("FAIL", "Fail"),
                            ("SKIP", "Skip"),
                            ("ERROR", "Error"),
                            ("MISS", "Miss"),
                            ("DONE", "Done"),
                        ],
                        max_length=10,
                        null=True,
                    ),
                ),
                # NOTE(review): presumably Unix-epoch seconds; same 2038
                # range concern as hardware_status.date - confirm.
                ("start_time", models.IntegerField()),
                ("is_boot", models.BooleanField(default=False)),
            ],
            options={
                "db_table": "new_test",
                # test_id is already the primary key, so this pair is
                # unique by construction.
                "unique_together": {("test_id", "build_id")},
            },
        ),
    ]

0 commit comments

Comments
 (0)