11
11
from datetime import timedelta
12
12
from functools import cached_property , partial
13
13
from typing import Any , Callable , Dict , Hashable , List , Optional , Sequence , Tuple , Union , cast
14
- from uuid import uuid4
15
14
16
15
import datumaro as dm
17
16
import datumaro .util .mask_tools
18
17
import django_rq
19
18
import numpy as np
19
+ import rq
20
20
from attrs import asdict , define , fields_dict
21
21
from datumaro .util import dump_json , parse_json
22
22
from django .conf import settings
23
23
from django .db import transaction
24
24
from django .utils import timezone
25
+ from django_rq .queues import DjangoRQ as RqQueue
26
+ from rest_framework .request import Request
27
+ from rq .job import Job as RqJob
28
+ from rq_scheduler import Scheduler as RqScheduler
25
29
from scipy .optimize import linear_sum_assignment
26
30
27
31
from cvat .apps .dataset_manager .bindings import (
48
52
User ,
49
53
ValidationMode ,
50
54
)
55
+ from cvat .apps .engine .utils import define_dependent_job , get_rq_job_meta , get_rq_lock_by_user
51
56
from cvat .apps .profiler import silk_profile
52
57
from cvat .apps .quality_control import models
53
58
from cvat .apps .quality_control .models import (
@@ -2115,25 +2120,30 @@ def generate_report(self) -> ComparisonReport:
2115
2120
2116
2121
2117
2122
class QualityReportUpdateManager :
2118
- _QUEUE_JOB_PREFIX = "update-quality-metrics-task-"
2123
+ _QUEUE_AUTOUPDATE_JOB_PREFIX = "update-quality-metrics-"
2124
+ _QUEUE_CUSTOM_JOB_PREFIX = "quality-check-"
2119
2125
_RQ_CUSTOM_QUALITY_CHECK_JOB_TYPE = "custom_quality_check"
2120
2126
_JOB_RESULT_TTL = 120
2121
2127
2122
2128
@classmethod
2123
2129
def _get_quality_check_job_delay (cls ) -> timedelta :
2124
2130
return timedelta (seconds = settings .QUALITY_CHECK_JOB_DELAY )
2125
2131
2126
- def _get_scheduler (self ):
2132
+ def _get_scheduler (self ) -> RqScheduler :
2127
2133
return django_rq .get_scheduler (settings .CVAT_QUEUES .QUALITY_REPORTS .value )
2128
2134
2129
- def _get_queue (self ):
2135
+ def _get_queue (self ) -> RqQueue :
2130
2136
return django_rq .get_queue (settings .CVAT_QUEUES .QUALITY_REPORTS .value )
2131
2137
2132
2138
def _make_queue_job_id_base (self , task : Task ) -> str :
2133
- return f"{ self ._QUEUE_JOB_PREFIX } { task .id } "
2139
+ return f"{ self ._QUEUE_AUTOUPDATE_JOB_PREFIX } task- { task .id } "
2134
2140
2135
- def _make_custom_quality_check_job_id (self ) -> str :
2136
- return uuid4 ().hex
2141
+ def _make_custom_quality_check_job_id (self , task_id : int , user_id : int ) -> str :
2142
+ # FUTURE-TODO: it looks like job ID template should not include user_id because:
2143
+ # 1. There is no need to compute quality reports several times for different users
2144
+ # 2. Each user (not only rq job owner) that has permission to access a task should
2145
+ # be able to check the status of the computation process
2146
+ return f"{ self ._QUEUE_CUSTOM_JOB_PREFIX } task-{ task_id } -user-{ user_id } "
2137
2147
2138
2148
@classmethod
2139
2149
def _get_last_report_time (cls , task : Task ) -> Optional [timezone .datetime ]:
@@ -2186,28 +2196,52 @@ def schedule_quality_autoupdate_job(self, task: Task):
2186
2196
task_id = task .id ,
2187
2197
)
2188
2198
2189
- def schedule_quality_check_job (self , task : Task , * , user_id : int ) -> str :
2199
+ class JobAlreadyExists (QualityReportsNotAvailable ):
2200
+ def __str__ (self ):
2201
+ return "Quality computation job for this task already enqueued"
2202
+
2203
+ def schedule_custom_quality_check_job (
2204
+ self , request : Request , task : Task , * , user_id : int
2205
+ ) -> str :
2190
2206
"""
2191
2207
Schedules a quality report computation job, supposed for updates by a request.
2192
2208
"""
2193
2209
2194
2210
self ._check_quality_reporting_available (task )
2195
2211
2196
- rq_id = self ._make_custom_quality_check_job_id ()
2197
-
2198
2212
queue = self ._get_queue ()
2199
- queue .enqueue (
2200
- self ._check_task_quality ,
2201
- task_id = task .id ,
2202
- job_id = rq_id ,
2203
- meta = {"user_id" : user_id , "job_type" : self ._RQ_CUSTOM_QUALITY_CHECK_JOB_TYPE },
2204
- result_ttl = self ._JOB_RESULT_TTL ,
2205
- failure_ttl = self ._JOB_RESULT_TTL ,
2206
- )
2213
+
2214
+ with get_rq_lock_by_user (queue , user_id = user_id ):
2215
+ rq_id = self ._make_custom_quality_check_job_id (task_id = task .id , user_id = user_id )
2216
+ rq_job = queue .fetch_job (rq_id )
2217
+ if rq_job :
2218
+ if rq_job .get_status (refresh = False ) in (
2219
+ rq .job .JobStatus .QUEUED ,
2220
+ rq .job .JobStatus .STARTED ,
2221
+ rq .job .JobStatus .SCHEDULED ,
2222
+ rq .job .JobStatus .DEFERRED ,
2223
+ ):
2224
+ raise self .JobAlreadyExists ()
2225
+
2226
+ rq_job .delete ()
2227
+
2228
+ dependency = define_dependent_job (
2229
+ queue , user_id = user_id , rq_id = rq_id , should_be_dependent = True
2230
+ )
2231
+
2232
+ queue .enqueue (
2233
+ self ._check_task_quality ,
2234
+ task_id = task .id ,
2235
+ job_id = rq_id ,
2236
+ meta = get_rq_job_meta (request = request , db_obj = task ),
2237
+ result_ttl = self ._JOB_RESULT_TTL ,
2238
+ failure_ttl = self ._JOB_RESULT_TTL ,
2239
+ depends_on = dependency ,
2240
+ )
2207
2241
2208
2242
return rq_id
2209
2243
2210
- def get_quality_check_job (self , rq_id : str ):
2244
+ def get_quality_check_job (self , rq_id : str ) -> Optional [ RqJob ] :
2211
2245
queue = self ._get_queue ()
2212
2246
rq_job = queue .fetch_job (rq_id )
2213
2247
@@ -2216,8 +2250,8 @@ def get_quality_check_job(self, rq_id: str):
2216
2250
2217
2251
return rq_job
2218
2252
2219
- def is_custom_quality_check_job (self , rq_job ) -> bool :
2220
- return rq_job .meta . get ( "job_type" ) == self ._RQ_CUSTOM_QUALITY_CHECK_JOB_TYPE
2253
+ def is_custom_quality_check_job (self , rq_job : RqJob ) -> bool :
2254
+ return isinstance ( rq_job .id , str ) and rq_job . id . startswith ( self ._QUEUE_CUSTOM_JOB_PREFIX )
2221
2255
2222
2256
@classmethod
2223
2257
@silk_profile ()
0 commit comments