
3210 batch uploads creation #1006

Merged (12 commits), Jan 28, 2025
7 changes: 0 additions & 7 deletions services/report/__init__.py
@@ -220,13 +220,6 @@ def initialize_and_save_report(

return current_report_row

def create_report_upload(
self, arguments: UploadArguments, commit_report: CommitReport
) -> Upload:
upload = super().create_report_upload(arguments, commit_report)
self._attach_flags_to_upload(upload, arguments["flags"])
return upload

def _attach_flags_to_upload(self, upload: Upload, flag_names: list[str]):
"""
Internal function that manages creating the proper `RepositoryFlag`s,
4 changes: 0 additions & 4 deletions services/tests/test_report.py
@@ -3727,10 +3727,6 @@ def test_create_report_upload(self, dbsession):
assert res.totals is None
assert res.upload_extras == {}
assert res.upload_type == "uploaded"
assert len(res.flags) == 1
Contributor Author:

This no longer gets evaluated by this test/method, since we got rid of the override.

first_flag = res.flags[0]
assert first_flag.flag_name == "unittest"
assert first_flag.repository_id == commit.repoid

def test_shift_carryforward_report(
self, dbsession, sample_report, mocker, mock_repo_provider
201 changes: 165 additions & 36 deletions tasks/upload.py
@@ -3,7 +3,7 @@
import time
import uuid
from copy import deepcopy
from typing import Optional
from typing import Optional, TypedDict

import orjson
import sentry_sdk
@@ -25,7 +25,7 @@

from app import celery_app
from database.enums import CommitErrorTypes, ReportType
from database.models import Commit, CommitReport
from database.models import Commit, CommitReport, Repository, RepositoryFlag, Upload
from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
@@ -218,6 +218,12 @@
return None


class CreateUploadResponse(TypedDict):
Contributor:

nice nice

Contributor Author:

How's the naming here? I was not amazed by it 😂

argument_list: list[UploadArguments]
measurements_list: list[UserMeasurement]
upload_flag_map: dict[Upload, list | str | None]
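
As an aside on the naming question above: the same data could also be carried by a small dataclass, which would additionally let the map's value type be tightened to list[str]. A hedged sketch only; the name UploadBatchPlan is purely illustrative (not what the PR uses), and UploadArguments, UserMeasurement, and Upload come from this module's imports:

from dataclasses import dataclass, field

@dataclass
class UploadBatchPlan:
    # Same three fields as CreateUploadResponse, with mutable defaults
    argument_list: list[UploadArguments] = field(default_factory=list)
    measurements_list: list[UserMeasurement] = field(default_factory=list)
    upload_flag_map: dict[Upload, list[str]] = field(default_factory=dict)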


class UploadTask(BaseCodecovTask, name=upload_task_name):
"""The first of a series of tasks designed to process an `upload` made by the user

@@ -500,18 +506,112 @@
)
self.retry(countdown=60, kwargs=upload_context.kwargs_for_retry(kwargs))

upload_argument_list = self.possibly_insert_uploads_and_side_effects(
db_session=db_session,
upload_context=upload_context,
commit=commit,
commit_report=commit_report,
report_service=report_service,
)

if upload_argument_list:
db_session.commit()

UPLOADS_PER_TASK_SCHEDULE.labels(report_type=report_type.value).observe(
len(upload_argument_list)
)
scheduled_tasks = self.schedule_task(
commit,
commit_yaml.to_dict(),
upload_argument_list,
commit_report,
upload_context,
)

log.info(
f"Scheduling {upload_context.report_type.value} processing tasks",
extra=upload_context.log_extra(
argument_list=upload_argument_list,
number_arguments=len(upload_argument_list),
scheduled_task_ids=scheduled_tasks.as_tuple(),
),
)

else:
self.maybe_log_upload_checkpoint(UploadFlow.INITIAL_PROCESSING_COMPLETE)
self.maybe_log_upload_checkpoint(UploadFlow.NO_REPORTS_FOUND)
log.info(
"Not scheduling task because there were no arguments found on redis",
extra=upload_context.log_extra(),
)
return {"was_setup": was_setup, "was_updated": was_updated}

def possibly_insert_uploads_and_side_effects(
self,
db_session: Session,
upload_context: UploadContext,
commit: Commit,
commit_report: CommitReport,
report_service: ReportService,
) -> list[UploadArguments]:
"""
This method possibly batch inserts uploads, flags and user measurements.
Contributor:

We know it "possibly" does something from the name. What are the conditions under which the upload will or will not be inserted?

Contributor Author:

It's described in the sentence after that (https://github.com/codecov/worker/pull/1006/files#diff-7ea195247ce946074a2441630db2be4bda78feed427fc0a1f1bd170cfc84a7a7R557) 😅: it only happens for v4 uploads, since CLI uploads are created in API. Should I rephrase it a bit so it's clearer?
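
In code terms, the condition is the check used in the loop further down this diff: an Upload row is only created here when the incoming arguments carry no upload_id, i.e. the legacy/v4 path; CLI uploads arrive with the row already created by codecov-api.

for arguments in upload_context.arguments_list():
    arguments = upload_context.normalize_arguments(commit, arguments)
    if "upload_id" not in arguments:
        # v4/legacy path: worker creates the Upload (plus flags/measurements)
        upload = report_service.create_report_upload(arguments, commit_report)
        arguments["upload_id"] = upload.id_
    # CLI path: "upload_id" is already present, so nothing is inserted here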

This only happens for v4 uploads as CLI uploads create the records mentioned
above in the uploads codecov-api route.
"""
repository: Repository = commit.repository

# Possibly batch insert uploads
create_upload_res = self._possibly_create_uploads_to_insert(
db_session=db_session,
commit=commit,
repository=repository,
commit_report=commit_report,
upload_context=upload_context,
report_service=report_service,
)

# Bulk insert flags
if uploads_flag_map := create_upload_res["upload_flag_map"]:
self._bulk_insert_flags(
db_session=db_session,
repoid=repository.repoid,
upload_flag_map=uploads_flag_map,
)

# Bulk insert coverage measurements
if measurements := create_upload_res["measurements_list"]:
self._bulk_insert_coverage_measurements(measurements=measurements)

return create_upload_res["argument_list"]

def _possibly_create_uploads_to_insert(
self,
db_session: Session,
commit: Commit,
repository: Repository,
upload_context: UploadContext,
report_service: ReportService,
commit_report: CommitReport,
) -> CreateUploadResponse:
# List to track arguments for the rest of uploads
argument_list: list[UploadArguments] = []
# Measurements insertion performance
measurements = []

# List to track possible measurements to insert later
measurements_list: list[UserMeasurement] = []
created_at = timezone.now()

# List + helper mapping to track possible upload + flags to insert later
upload_flag_map: dict[Upload, list | str | None] = {}

for arguments in upload_context.arguments_list():
arguments = upload_context.normalize_arguments(commit, arguments)
if "upload_id" not in arguments:
upload = report_service.create_report_upload(arguments, commit_report)
Contributor:

If I remember correctly, create_report_upload internally creates the Upload model, so I'm not sure the bulk_save_objects based on the uploads_list is effective at all?

Contributor Author:

Yeah, you are right, I was doing that from a previous implementation.

I'd like to use the bulk_save, but I need the upload's primary key for the arguments["upload_id"] = upload.id_ line, which is why we're doing the add/flush (and therefore the bulk_save is kinda redundant). I'll get rid of it for now, but can you think of a way to make it work?

I've been giving it some thought, and the closest thing I've come up with is to skip the add/flush, build an arguments-to-upload mapping (without the primary key), do the bulk_save, then re-query the db for the uploads we just created and fill in the upload_id on the arguments based on the mapping, then continue execution. But that re-querying seems inefficient, and I feel it makes this whole thing a bit more complicated.

I could just get rid of the bulk_save and still benefit from the other changes, though.
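
For context, a minimal sketch of the trade-off being discussed, assuming a SQLAlchemy Session db_session and this repo's Upload model (field names other than id_ are assumed):

# bulk_save_objects bypasses the unit of work, so primary keys are not
# populated back onto the objects by default:
uploads = [Upload(report_id=commit_report.id_) for _ in range(10)]
db_session.bulk_save_objects(uploads)
# uploads[0].id_ is still None at this point

# return_defaults=True does fetch the PKs, but SQLAlchemy then falls back
# to row-by-row INSERTs, forfeiting most of the batching benefit:
db_session.bulk_save_objects(uploads, return_defaults=True)

# Hence the add/flush pattern when the PK is needed immediately:
upload = Upload(report_id=commit_report.id_)
db_session.add(upload)
db_session.flush()  # emits the INSERT and populates upload.id_
arguments["upload_id"] = upload.id_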

Contributor:

Another idea here: since getting the arguments without an upload_id is one of the legacy upload paths, we could just patch that legacy upload path to create the Upload object and trigger a PreProcess to do carry-forwarding, and then delete the code responsible for this here.
It's not ideal either :-(

Contributor Author:

Yeah, this is another option. I think the counterpoint is that we couldn't batch insert, since it would happen at the individual-upload level; that said, we already do this with the CLI, so arguably it's another point in favor of performance there.

To me, though, it seems the responsibility for creating an upload shouldn't live in API (although I get why we currently do it this way); it should instead be handled by a not-yet-existent "upload service" that would come with the new services. I think worker is the closest thing to that atm, but yeah, I preferred to do it here since most of the code was here.

arguments["upload_id"] = upload.id_
# Adding measurements to array to later add in bulk
measurements.append(
# Adds objects to insert later in bulk
upload_flag_map[upload] = arguments.get("flags", [])
measurements_list.append(
UserMeasurement(
owner_id=repository.owner.ownerid,
repo_id=repository.repoid,
Expand All @@ -529,41 +629,70 @@
arguments["upload_pk"] = arguments["upload_id"]
argument_list.append(arguments)

# Bulk insert coverage measurements
if measurements:
self._bulk_insert_coverage_measurements(measurements=measurements)
db_session.commit()
return CreateUploadResponse(
argument_list=argument_list,
measurements_list=measurements_list,
upload_flag_map=upload_flag_map,
)

def _bulk_insert_flags(
self,
db_session: Session,
repoid: int,
upload_flag_map: dict[Upload, list | str | None] = None,
):
"""
This function possibly inserts flags in bulk for a Repository if these
don't exist already
"""
if upload_flag_map is None:
upload_flag_map = {}

[Codecov Notifications / codecov/patch] Check warning on line 650 in tasks/upload.py: added line #L650 was not covered by tests.
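
A hypothetical test along these lines (fixture names assumed) would cover that default branch, in which upload_flag_map is None and is replaced with an empty dict:

def test_bulk_insert_flags_with_no_map(self, dbsession, repository):
    # Omitting upload_flag_map exercises the None -> {} default path
    UploadTask()._bulk_insert_flags(db_session=dbsession, repoid=repository.repoid)
    # With no map provided, no RepositoryFlag rows should be created
    assert (
        dbsession.query(RepositoryFlag)
        .filter_by(repository_id=repository.repoid)
        .count()
        == 0
    )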

# Fetch all RepositoryFlags per repo
existing_flags = self._fetch_all_repo_flags(
db_session=db_session, repoid=repoid
)

# Prepare new flags and map relationships
flags_to_create: list[RepositoryFlag] = []
upload_flag_links = {}

if argument_list:
# Loops through upload_flag_map dict, possibly creates flags and maps them to their uploads
for upload, flag_names in upload_flag_map.items():
upload_flags = []

for flag_name in flag_names:
# Check for existing flag, create if missing
flag = existing_flags.get(flag_name)
if not flag:
flag = RepositoryFlag(repository_id=repoid, flag_name=flag_name)
flags_to_create.append(flag)
existing_flags[flag_name] = flag

upload_flags.append(flag)

# Save the relationship mapping without causing additional queries
upload_flag_links[upload] = upload_flags

if flags_to_create:
db_session.add_all(flags_to_create)
db_session.commit()

UPLOADS_PER_TASK_SCHEDULE.labels(report_type=report_type.value).observe(
len(argument_list)
)
scheduled_tasks = self.schedule_task(
commit,
commit_yaml.to_dict(),
argument_list,
commit_report,
upload_context,
)
# Assign flags to uploads
for upload, upload_flags in upload_flag_links.items():
upload.flags = upload_flags

log.info(
f"Scheduling {upload_context.report_type.value} processing tasks",
extra=upload_context.log_extra(
argument_list=argument_list,
number_arguments=len(argument_list),
scheduled_task_ids=scheduled_tasks.as_tuple(),
),
)
db_session.commit()

else:
self.maybe_log_upload_checkpoint(UploadFlow.INITIAL_PROCESSING_COMPLETE)
self.maybe_log_upload_checkpoint(UploadFlow.NO_REPORTS_FOUND)
log.info(
"Not scheduling task because there were no arguments found on redis",
extra=upload_context.log_extra(),
)
return {"was_setup": was_setup, "was_updated": was_updated}
def _fetch_all_repo_flags(
self, db_session: Session, repoid: int
) -> Optional[dict[str, RepositoryFlag] | dict]:
"""
Fetches all flags on a repository
"""
flags = db_session.query(RepositoryFlag).filter_by(repository_id=repoid).all()
return {flag.flag_name: flag for flag in flags} if flags else {}

def _bulk_insert_coverage_measurements(self, measurements: list[UserMeasurement]):
bulk_insert_coverage_measurements(measurements=measurements)