Skip to content

[do not merge] Reusable requests functionality #9230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 110 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
77ce443
Move common logic into a separate app
Marishka17 Mar 18, 2025
f525c7b
Update quality control app
Marishka17 Mar 18, 2025
77274fd
Update consensus app
Marishka17 Mar 18, 2025
5bd0b89
[quality/consensus] Allow users with resource access to check operati…
Marishka17 Mar 18, 2025
d312343
Merge develop
Marishka17 Mar 18, 2025
622952a
Update import/export
Marishka17 Mar 19, 2025
95b2b85
Drop POST /api/consensus/merges?rq_id= support
Marishka17 Mar 19, 2025
792295f
Fix permissions
Marishka17 Mar 19, 2025
d11b73c
Define parsed job id class based on queue config
Marishka17 Mar 19, 2025
6e45837
Fix && update consensus REST API tests
Marishka17 Mar 19, 2025
e8d53fa
Update quality reports REST API tests
Marishka17 Mar 19, 2025
50d3df5
Refactor && fix some bugs
Marishka17 Mar 24, 2025
7b75bdc
Update task creation flow && add request validation when exporting so…
Marishka17 Mar 25, 2025
8111cab
Update events API
Marishka17 Mar 25, 2025
0adada6
[events] Update query params supported by API endpoint
Marishka17 Mar 25, 2025
b5b313e
Small code cleanup
Marishka17 Mar 25, 2025
d94cf50
Merge develop
Marishka17 Mar 25, 2025
9e53f02
Remove unused imports
Marishka17 Mar 25, 2025
5358448
Drop tmp_file from import-specific metadata
Marishka17 Mar 26, 2025
7cb5006
Fix AbstractExporter
Marishka17 Mar 26, 2025
37f6d25
Move rq job owner check into rego
Marishka17 Mar 26, 2025
9c4c595
Update events REST API tests
Marishka17 Mar 26, 2025
06951a3
Update server schema
Marishka17 Mar 26, 2025
69b9d30
Cleanup unit tests
Marishka17 Mar 27, 2025
d97a677
[events] Add deprecation response header
Marishka17 Mar 27, 2025
be5348b
Update some rest api tests
Marishka17 Mar 27, 2025
06a9aba
Merge branch 'develop' into mk/reusable_requests_functionality
Marishka17 Mar 27, 2025
5ccee8f
isort
Marishka17 Mar 27, 2025
03f445e
Resolve linter warnings
Marishka17 Mar 27, 2025
9ded53c
black tests
Marishka17 Mar 27, 2025
0440ddc
fix type in tests
Marishka17 Mar 27, 2025
3415c54
API fixes
Marishka17 Mar 27, 2025
f2e5cca
t
Marishka17 Mar 27, 2025
07b2a0d
Fix import from cloud
Marishka17 Mar 31, 2025
305aac9
p
Marishka17 Mar 31, 2025
cf6f932
Update test_resource_import_export.py
Marishka17 Mar 31, 2025
dd3d292
Fix operation type returned by server for import requests
Marishka17 Mar 31, 2025
7d1bb10
Skip broken test
Marishka17 Mar 31, 2025
6988d95
black code
Marishka17 Mar 31, 2025
6af24e1
remove unused import
Marishka17 Mar 31, 2025
d752b3e
Fix server schema
Marishka17 Apr 1, 2025
12fc0dd
Remove extra lines
Marishka17 Apr 1, 2025
e1df007
Update POST /api/quality/reports API description
Marishka17 Apr 1, 2025
afce3d1
Cleanup API
Marishka17 Apr 1, 2025
0de036c
Fix typos
Marishka17 Apr 1, 2025
602b7da
Merge branch 'develop' into mk/reusable_requests_functionality
Marishka17 Apr 2, 2025
b961e9c
small refactoring
Marishka17 Apr 4, 2025
6d49dde
Resolve conflicts
Marishka17 Apr 4, 2025
2fa1504
Use return value instead of custom meta key in serializer
Marishka17 Apr 7, 2025
8b4e946
Fix permissions
Marishka17 Apr 7, 2025
4155f2a
isort && pylint
Marishka17 Apr 7, 2025
5593d94
black
Marishka17 Apr 7, 2025
43da03a
Fix automatic change
Marishka17 Apr 7, 2025
3a902c3
disable linter check
Marishka17 Apr 7, 2025
37d1a0c
Remove assert
Marishka17 Apr 7, 2025
5781f96
Update events API
Marishka17 Apr 8, 2025
dbd8b46
[events] Use cache/export dir
Marishka17 Apr 10, 2025
b7fa661
[ResourceImporter] Cleanup callback/callback_args initialization
Marishka17 Apr 10, 2025
510fef7
[AbstractExporter] Split logic into 2 classes
Marishka17 Apr 10, 2025
dbde0d2
Drop hidden meta field
Marishka17 Apr 10, 2025
272afdf
Revert changes not related to this PR
Marishka17 Apr 10, 2025
cf6118b
Add DeprecatedResponse
Marishka17 Apr 10, 2025
9e8ccb6
schedule_job -> enqueue_job
Marishka17 Apr 10, 2025
5a4d6ad
Fix analytics tests
Marishka17 Apr 10, 2025
b8465b0
Fix requests permissions
Marishka17 Apr 10, 2025
74afbfc
Rename back: RequestIdSerializer -> RqIdSerializer
Marishka17 Apr 11, 2025
440f78a
Refactor a bit
Marishka17 Apr 14, 2025
bf830f0
t
Marishka17 Apr 15, 2025
db4380e
Update test_user_without_rights_cannot_check_status_of_report_creatio…
Marishka17 Apr 15, 2025
277a140
Remove unused import
Marishka17 Apr 15, 2025
bf5f09d
Fix type
Marishka17 Apr 15, 2025
0aea750
test_cant_import_annotations_as_project: revert expected exception back
Marishka17 Apr 15, 2025
546b527
fix legacy events api
Marishka17 Apr 15, 2025
fdc8f08
Fix typo
Marishka17 Apr 15, 2025
4a4c871
Remove wrong validation
Marishka17 Apr 15, 2025
8b10294
Merge branch 'develop' into mk/reusable_requests_functionality
Marishka17 Apr 15, 2025
ac5f8d1
Do not use spaces in rq job ids && fix test_list_requests_when_there_…
Marishka17 Apr 15, 2025
aa8a9bd
remove queue from ids && split parsed.id to target_id & id && declare…
Marishka17 Apr 17, 2025
9044c00
[consensus] drop outdated permissions
Marishka17 Apr 18, 2025
d041f58
Support legacy requests formats
Marishka17 Apr 18, 2025
a368dba
pylint && typo
Marishka17 Apr 18, 2025
f9525a1
resource -> target
Marishka17 Apr 21, 2025
023cb1c
Remove queue_name arg for the validate_request_id method
Marishka17 Apr 21, 2025
69e0ed3
Add return type for the build_meta method
Marishka17 Apr 21, 2025
e75917b
Apply comments
Marishka17 Apr 22, 2025
68af8d5
Add some REST API tests to check legacy rq_id format usage
Marishka17 Apr 22, 2025
27fdb93
Move RequestStatus into redis_handler
Marishka17 Apr 23, 2025
4ef2279
Add REQUEST_ID_SUBCLASSES
Marishka17 Apr 23, 2025
620b81b
Rename to SELECTOR_TO_QUEUE
Marishka17 Apr 23, 2025
e48dffc
More REST API tests
Marishka17 Apr 23, 2025
bd69c96
Update cvat/apps/engine/rq.py
Marishka17 Apr 23, 2025
779ef18
Update cvat/apps/engine/views.py
Marishka17 Apr 23, 2025
48dd577
Rename LocationConfig.storage_id -> cloud_storage_id
Marishka17 Apr 23, 2025
db76a56
[unit tests] pass query_params to post method
Marishka17 Apr 23, 2025
087d53c
Add assert
Marishka17 Apr 23, 2025
a219509
Distinguish drf ValidationError from django one
Marishka17 Apr 23, 2025
6746b00
Change link
Marishka17 Apr 23, 2025
f40dcdc
isort && black
Marishka17 Apr 23, 2025
06ee637
t
Marishka17 Apr 23, 2025
f89a73b
remove comment
Marishka17 Apr 24, 2025
38bac3f
Update test_cannot_export_backup_for_task_without_data
Marishka17 Apr 24, 2025
30fc0dc
Add FileId class with attribute declaration
Marishka17 Apr 24, 2025
719dda2
t
Marishka17 Apr 24, 2025
f89f7cf
[REST API tests] add missing test && reduce code duplication
Marishka17 Apr 28, 2025
6c93d35
Use deprecate_response func
Marishka17 Apr 29, 2025
6d9e5c2
empty commit: check notifications
Marishka17 Apr 29, 2025
8bc0b7d
Remove return_request_id arg
Marishka17 Apr 29, 2025
1b9a9cd
Remove ParsedExportFilenameWithConstructedId class
Marishka17 Apr 29, 2025
678ec1b
update changelog
Marishka17 Apr 29, 2025
3487651
Merge branch 'develop' into mk/reusable_requests_functionality
Marishka17 Apr 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 57 additions & 68 deletions cvat-core/src/server-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@
return new ServerError(message, 0);
}

function prepareData(details) {

Check warning on line 248 in cvat-core/src/server-proxy.ts

View workflow job for this annotation

GitHub Actions / Linter

Missing return type on function
const data = new FormData();
for (const [key, value] of Object.entries(details)) {
if (Array.isArray(value)) {
Expand Down Expand Up @@ -287,7 +287,7 @@
return requestId++;
}

async function get(url: string, requestConfig) {

Check warning on line 290 in cvat-core/src/server-proxy.ts

View workflow job for this annotation

GitHub Actions / Linter

Missing return type on function
return new Promise((resolve, reject) => {
const newRequestId = getRequestId();
requests[newRequestId] = { resolve, reject };
Expand Down Expand Up @@ -612,6 +612,57 @@
fetchAll: false,
};

async function getRequestsList(): Promise<PaginatedResource<SerializedRequest>> {
const { backendAPI } = config;
const params = enableOrganization();

try {
const response = await fetchAll(`${backendAPI}/requests`, params);

return response.results;
} catch (errorData) {
throw generateError(errorData);
}
}

// Temporary solution for server availability problems
const retryTimeouts = [5000, 10000, 15000];
async function getRequestStatus(rqID: string): Promise<SerializedRequest> {
const { backendAPI } = config;
let retryCount = 0;
let lastError = null;

while (retryCount < 3) {
try {
const response = await Axios.get(`${backendAPI}/requests/${rqID}`);

return response.data;
} catch (errorData) {
lastError = generateError(errorData);
const { response } = errorData;
if (response && [502, 503, 504].includes(response.status)) {
const timeout = retryTimeouts[retryCount];
await new Promise((resolve) => { setTimeout(resolve, timeout); });
retryCount++;
} else {
throw generateError(errorData);
}
}
}

throw lastError;
}

async function cancelRequest(requestID): Promise<void> {
const { backendAPI } = config;

try {
await Axios.post(`${backendAPI}/requests/${requestID}/cancel`);
} catch (errorData) {
throw generateError(errorData);
}
}

async function serverRequest(
url: string, data: object,
requestConfig: ServerRequestConfig = defaultRequestConfig,
Expand Down Expand Up @@ -768,30 +819,19 @@
}
}

async function mergeConsensusJobs(id: number, instanceType: string): Promise<void> {
async function mergeConsensusJobs(id: number, instanceType: string): Promise<string> {
const { backendAPI } = config;
const url = `${backendAPI}/consensus/merges`;
const params = {
rq_id: null,
};
const requestBody = {
task_id: undefined,
job_id: undefined,
};
const requestBody = (instanceType === 'task') ? { task_id: id } : { job_id: id };

if (instanceType === 'task') requestBody.task_id = id;
else requestBody.job_id = id;

return new Promise<void>((resolve, reject) => {
return new Promise<string>((resolve, reject) => {
async function request() {

Check warning on line 828 in cvat-core/src/server-proxy.ts

View workflow job for this annotation

GitHub Actions / Linter

Missing return type on function
try {
const response = await Axios.post(url, requestBody, { params });
params.rq_id = response.data.rq_id;
const response = await Axios.post(url, requestBody);
const rqID = response.data.rq_id;
const { status } = response;
if (status === 202) {
setTimeout(request, 3000);
} else if (status === 201) {
resolve();
resolve(rqID);
} else {
reject(generateError(response));
}
Expand Down Expand Up @@ -2304,57 +2344,6 @@
}
}

async function getRequestsList(): Promise<PaginatedResource<SerializedRequest>> {
const { backendAPI } = config;
const params = enableOrganization();

try {
const response = await fetchAll(`${backendAPI}/requests`, params);

return response.results;
} catch (errorData) {
throw generateError(errorData);
}
}

// Temporary solution for server availability problems
const retryTimeouts = [5000, 10000, 15000];
async function getRequestStatus(rqID: string): Promise<SerializedRequest> {
const { backendAPI } = config;
let retryCount = 0;
let lastError = null;

while (retryCount < 3) {
try {
const response = await Axios.get(`${backendAPI}/requests/${rqID}`);

return response.data;
} catch (errorData) {
lastError = generateError(errorData);
const { response } = errorData;
if (response && [502, 503, 504].includes(response.status)) {
const timeout = retryTimeouts[retryCount];
await new Promise((resolve) => { setTimeout(resolve, timeout); });
retryCount++;
} else {
throw generateError(errorData);
}
}
}

throw lastError;
}

async function cancelRequest(requestID): Promise<void> {
const { backendAPI } = config;

try {
await Axios.post(`${backendAPI}/requests/${requestID}/cancel`);
} catch (errorData) {
throw generateError(errorData);
}
}

const listenToCreateAnalyticsReportCallbacks: {
job: LongProcessListener<void>;
task: LongProcessListener<void>;
Expand Down
4 changes: 2 additions & 2 deletions cvat-core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ export class Job extends Session {
return result;
}

async mergeConsensusJobs(): Promise<void> {
async mergeConsensusJobs(): Promise<string> {
const result = await PluginRegistry.apiWrapper.call(this, Job.prototype.mergeConsensusJobs);
return result;
}
Expand Down Expand Up @@ -1204,7 +1204,7 @@ export class Task extends Session {
return result;
}

async mergeConsensusJobs(): Promise<void> {
async mergeConsensusJobs(): Promise<string> {
const result = await PluginRegistry.apiWrapper.call(this, Task.prototype.mergeConsensusJobs);
return result;
}
Expand Down
11 changes: 9 additions & 2 deletions cvat-ui/src/actions/consensus-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
// SPDX-License-Identifier: MIT

import { ActionUnion, createAction, ThunkAction } from 'utils/redux';
import { Project, ProjectOrTaskOrJob } from 'cvat-core-wrapper';
import { Project, ProjectOrTaskOrJob, getCore } from 'cvat-core-wrapper';

import { updateRequestProgress } from './requests-actions';

const core = getCore();

export enum ConsensusActionTypes {
MERGE_CONSENSUS_JOBS = 'MERGE_CONSENSUS_JOBS',
Expand All @@ -28,7 +32,10 @@ export const mergeConsensusJobsAsync = (
): ThunkAction => async (dispatch) => {
try {
dispatch(consensusActions.mergeConsensusJobs(instance));
await instance.mergeConsensusJobs();
const rqID = await instance.mergeConsensusJobs();
await core.requests.listen(rqID, {
callback: (updatedRequest) => updateRequestProgress(updatedRequest, dispatch),
});
} catch (error) {
dispatch(consensusActions.mergeConsensusJobsFailed(instance, error));
return;
Expand Down
100 changes: 35 additions & 65 deletions cvat/apps/consensus/merging_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
from typing import Type

import datumaro as dm
import django_rq
from django.conf import settings
from django.db import transaction
from django.http import HttpResponseBadRequest
from django_rq.queues import DjangoRQ as RqQueue
from rq.job import Job as RqJob
from rq.job import JobStatus as RqJobStatus

from cvat.apps.consensus.intersect_merge import IntersectMerge
from cvat.apps.consensus.models import ConsensusSettings
Expand All @@ -21,17 +19,19 @@
DimensionType,
Job,
JobType,
RequestTarget,
StageChoice,
StateChoice,
Task,
User,
clear_annotations_in_jobs,
)
from cvat.apps.engine.rq import BaseRQMeta, define_dependent_job
from cvat.apps.engine.types import ExtendedRequest
from cvat.apps.engine.utils import get_rq_lock_by_user
from cvat.apps.profiler import silk_profile
from cvat.apps.quality_control.quality_reports import ComparisonParameters, JobDataProvider
from cvat.apps.redis_handler.background import AbstractRQJobManager
from cvat.apps.redis_handler.rq import RQId


class _TaskMerger:
Expand Down Expand Up @@ -159,84 +159,54 @@
pass


class JobAlreadyExists(MergingNotAvailable):
def __init__(self, instance: Task | Job):
super().__init__()
self.instance = instance
class MergingManager(AbstractRQJobManager):
QUEUE_NAME = settings.CVAT_QUEUES.CONSENSUS.value
SUPPORTED_RESOURCES = {RequestTarget.TASK, RequestTarget.JOB}

def __str__(self):
return f"Merging for this {type(self.instance).__name__.lower()} already enqueued"


class MergingManager:
_QUEUE_CUSTOM_JOB_PREFIX = "consensus-merge-"
_JOB_RESULT_TTL = 300
_JOB_FAILURE_TTL = _JOB_RESULT_TTL

def _get_queue(self) -> RqQueue:
return django_rq.get_queue(settings.CVAT_QUEUES.CONSENSUS.value)
def build_rq_id(self) -> str:
# todo: add redis migration
return RQId(
queue=self.QUEUE_NAME,
action="merge",
target=self.resource,
id=self.db_instance.pk,
).render()

def _make_job_id(self, task_id: int, job_id: int | None, user_id: int) -> str:
key = f"{self._QUEUE_CUSTOM_JOB_PREFIX}task-{task_id}"
if job_id:
key += f"-job-{job_id}"
key += f"-user-{user_id}" # TODO: remove user id, add support for non owners to get status
return key
def _split_to_task_and_job(self) -> tuple[Task, Job | None]:
if isinstance(self.db_instance, Job):
return self.db_instance.segment.task, self.db_instance

def _check_merging_available(self, task: Task, job: Job | None):
_TaskMerger(task=task).check_merging_available(parent_job_id=job.id if job else None)
return self.db_instance, None

def schedule_merge(self, target: Task | Job, *, request: ExtendedRequest) -> str:
if isinstance(target, Job):
target_task = target.segment.task
target_job = target
else:
target_task = target
target_job = None
def validate_request(self):
# FUTURE-FIXME: check that there is no indirectly dependent RQ jobs:
# e.g merge whole task and merge a particular job from the task
task, job = self._split_to_task_and_job()

self._check_merging_available(target_task, target_job)
try:
_TaskMerger(task=task).check_merging_available(parent_job_id=job.pk if job else None)
except MergingNotAvailable as ex:
return HttpResponseBadRequest(str(ex))

queue = self._get_queue()
def setup_background_job(self, queue: RqQueue, rq_id: str) -> None:
user_id = self.request.user.id

user_id = request.user.id
with get_rq_lock_by_user(queue, user_id=user_id):
rq_id = self._make_job_id(
task_id=target_task.id,
job_id=target_job.id if target_job else None,
user_id=user_id,
)
rq_job = queue.fetch_job(rq_id)
if rq_job:
if rq_job.get_status(refresh=False) in (
RqJobStatus.QUEUED,
RqJobStatus.STARTED,
RqJobStatus.SCHEDULED,
RqJobStatus.DEFERRED,
):
raise JobAlreadyExists(target)

rq_job.delete()

dependency = define_dependent_job(
queue, user_id=user_id, rq_id=rq_id, should_be_dependent=True
)

dependency = define_dependent_job(queue, user_id=user_id, rq_id=rq_id)
queue.enqueue(
self._merge,
target_type=type(target),
target_id=target.id,
target_type=type(self.db_instance),
target_id=self.db_instance.pk,
job_id=rq_id,
meta=BaseRQMeta.build(request=request, db_obj=target),
meta=BaseRQMeta.build(request=self.request, db_obj=self.db_instance),
result_ttl=self._JOB_RESULT_TTL,
failure_ttl=self._JOB_RESULT_TTL,
failure_ttl=self._JOB_FAILURE_TTL,
depends_on=dependency,
)

return rq_id

def get_job(self, rq_id: str) -> RqJob | None:
queue = self._get_queue()
return queue.fetch_job(rq_id)

@classmethod
@silk_profile()
def _merge(cls, *, target_type: Type[Task | Job], target_id: int) -> int:
Expand Down
Loading
Loading