Skip to content

Commit f761ab5

Browse files
committed
feat: change ingester data insertion policy
Now allows for overwrites of null data Closes #1552
1 parent 12a4d10 commit f761ab5

File tree

4 files changed

+100
-16
lines changed

4 files changed

+100
-16
lines changed

backend/kernelCI_app/constants/ingester.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,9 @@
4848
except (ValueError, TypeError):
4949
logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
5050
INGEST_QUEUE_MAXSIZE = 5000
51+
52+
53+
PRIO_DB = is_boolean_or_string_true(os.environ.get("PRIO_BD", "True"))
54+
"""Toggles the priority when updating database data.\n
55+
If True, this will prioritize what is already present in the database, never updating fields != null.
56+
If False, it will prioritize the incoming data, allowing it to overwrite existing data."""

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
INGEST_FLUSH_TIMEOUT_SEC,
1111
INGEST_QUEUE_MAXSIZE,
1212
VERBOSE,
13+
PRIO_DB,
1314
)
1415
import threading
1516
import time
@@ -21,7 +22,7 @@
2122
extract_log_excerpt,
2223
)
2324
import kcidb_io
24-
from django.db import transaction
25+
from django.db import connections, transaction
2526
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
2627

2728
from kernelCI_app.management.commands.helpers.process_submissions import (
@@ -111,23 +112,95 @@ def prepare_file_data(
111112
}
112113

113114

114-
def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
115+
def _generate_model_insert_query(
116+
table_name: TableModels, model: TableModels
117+
) -> tuple[list[str], str]:
118+
"""
119+
Dynamically generates the insert query for any model.
120+
121+
The generated query follows the policy of never updating non-null values.
122+
This policy can be changed with the `PRIO_DB` env var.
123+
124+
Returns a list of which model properties can be updated and the insert query.
125+
"""
126+
updateable_model_fields: list[str] = []
127+
updateable_db_fields: list[str] = []
128+
query_params_properties: list[tuple[str, str]] = []
129+
130+
for field in model._meta.fields:
131+
if field.generated:
132+
continue
133+
134+
field_name = (
135+
field.name + "_id"
136+
if field.get_internal_type() == "ForeignKey"
137+
else field.name
138+
)
139+
real_name = field.db_column or field_name
140+
operation = "GREATEST" if real_name == "_timestamp" else "COALESCE"
141+
142+
query_params_properties.append((real_name, operation))
143+
updateable_model_fields.append(field_name)
144+
updateable_db_fields.append(real_name)
145+
146+
conflict_clauses = []
147+
for field, op in query_params_properties:
148+
if PRIO_DB:
149+
conflict_clauses.append(
150+
f"""
151+
{field} = {op}({table_name}.{field}, EXCLUDED.{field})"""
152+
)
153+
else:
154+
conflict_clauses.append(
155+
f"""
156+
{field} = {op}(EXCLUDED.{field}, {table_name}.{field})"""
157+
)
158+
159+
query = f"""
160+
INSERT INTO {table_name} (
161+
{', '.join(updateable_db_fields)}
162+
)
163+
VALUES (
164+
{', '.join(['%s'] * len(updateable_db_fields))}
165+
)
166+
ON CONFLICT (id)
167+
DO UPDATE SET {', '.join(conflict_clauses)};
168+
"""
169+
170+
return updateable_model_fields, query
171+
172+
173+
def consume_buffer(buffer: list[TableModels], table_name: TableNames) -> None:
115174
"""
116175
Consume a buffer of items and insert them into the database.
117176
This function is called by the db_worker thread.
118177
"""
119178
if not buffer:
120179
return
121180

122-
model = MODEL_MAP[item_type]
181+
try:
182+
model = MODEL_MAP[table_name]
183+
except KeyError:
184+
out(f"Unknown table '{table_name}' passed to consume_buffer")
185+
raise
186+
187+
updateable_model_fields, query = _generate_model_insert_query(table_name, model)
188+
189+
params = []
190+
for obj in buffer:
191+
obj_values = []
192+
for field in updateable_model_fields:
193+
value = getattr(obj, field)
194+
if isinstance(value, (dict, list)):
195+
value = json.dumps(value)
196+
obj_values.append(value)
197+
params.append(tuple(obj_values))
123198

124199
t0 = time.time()
125-
model.objects.bulk_create(
126-
buffer,
127-
batch_size=INGEST_BATCH_SIZE,
128-
ignore_conflicts=True,
129-
)
130-
out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))
200+
with connections["default"].cursor() as cursor:
201+
cursor.executemany(query, params)
202+
203+
out("bulk_create %s: n=%d in %.3fs" % (table_name, len(buffer), time.time() - t0))
131204

132205

133206
def flush_buffers(

backend/kernelCI_app/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
"""Defines the models used in the main database.
2+
All models should have explicit id column for the ingester to work properly."""
3+
14
from django.db import models
25
from django.contrib.postgres.fields import ArrayField
36
from django.contrib.postgres.indexes import GinIndex

backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,24 +218,26 @@ class TestConsumeBuffer:
218218
INGEST_BATCH_SIZE_MOCK,
219219
)
220220
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")
221+
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.connections")
221222
@patch("time.time", side_effect=TIME_MOCK)
222-
def test_consume_buffer_with_items(self, mock_time, mock_out):
223+
def test_consume_buffer_with_items(self, mock_time, mock_connections, mock_out):
223224
"""Test consume_buffer with items in buffer."""
225+
table_name = "issues"
224226
mock_model = MagicMock()
225227
mock_buffer = [MagicMock(), MagicMock()]
228+
mock_cursor = MagicMock()
229+
mock_connections["default"].cursor.return_value.__enter__.return_value = (
230+
mock_cursor
231+
)
226232

227233
with patch(
228234
"kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP",
229235
{"issues": mock_model},
230236
):
231-
consume_buffer(mock_buffer, "issues")
237+
consume_buffer(mock_buffer, table_name)
232238

233239
assert mock_time.call_count == 2
234-
mock_model.objects.bulk_create.assert_called_once_with(
235-
mock_buffer,
236-
batch_size=INGEST_BATCH_SIZE_MOCK,
237-
ignore_conflicts=True,
238-
)
240+
mock_cursor.executemany.assert_called_once()
239241
mock_out.assert_called_once()
240242

241243
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")

0 commit comments

Comments
 (0)