Skip to content

Commit 5165cec

Browse files
authored
ref: allow processing jobs at once up to max concurrency (#244)
1 parent e094d53 commit 5165cec

File tree

3 files changed: 281 additions and 18 deletions.

skynet/modules/ttt/summaries/jobs.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -399,30 +399,24 @@ def remove_task(t):
399399

400400

401401
async def maybe_run_next_job() -> None:
    """Start pending jobs for every processor until it is full or its queue is empty.

    Processors are visited in a fixed priority order. For each one, job ids are
    popped from the head of its pending queue and handed to
    ``create_run_job_task`` for as long as ``can_run_next_job`` allows it.
    The summary queue metric is refreshed once at the end, whether or not any
    job was started.
    """
    # Priority order for processor queues - prefer faster external APIs over local processing
    for processor in (Processors.OCI, Processors.OPENAI, Processors.AZURE, Processors.LOCAL):
        # NOTE(review): this loop only stops on capacity if create_run_job_task
        # is reflected in can_run_next_job right away - confirm against those helpers.
        while True:
            if not can_run_next_job(processor):
                break  # Processor is at capacity

            queue_key = get_processor_queue_keys(processor)[0]
            job_id = await db.lpop(queue_key)

            if not job_id:
                break  # No more jobs in this queue

            log.info(f"Found job {job_id} in {processor.value} queue")
            create_run_job_task(await get_job(job_id))

    await update_summary_queue_metric()
425-
426420

427421
async def monitor_candidate_jobs() -> None:
428422
# Run one-time migration from legacy queues to processor-specific queues

skynet/modules/ttt/summaries/jobs_test.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,266 @@ def mock_mget(keys):
147147
assert db.lpush.call_count == 2
148148
db.lpush.assert_any_call(PENDING_JOBS_LOCAL_KEY, job_2.id)
149149
db.lpush.assert_any_call(PENDING_JOBS_LOCAL_KEY, job_3.id)
150+
151+
152+
class TestMaybeRunNextJob:
    '''Tests for maybe_run_next_job pulling jobs per processor up to its max concurrency.'''

    @staticmethod
    def _make_job(job_id):
        '''Build a minimal summary job with the given id.'''
        return Job(
            id=job_id,
            payload=DocumentPayload(text='test'),
            metadata=DocumentMetadata(customer_id='test'),
            type=JobType.SUMMARY,
        )

    @staticmethod
    def _patch_dependencies(mocker):
        '''Patch the collaborators of maybe_run_next_job inside the jobs module.

        Sets ``modules`` to ``{'summaries:executor'}`` and replaces
        ``update_summary_queue_metric``, ``get_job`` and ``create_run_job_task``
        with mocks. Returns the ``(get_job, create_run_job_task)`` mocks.
        '''
        mocker.patch('skynet.modules.ttt.summaries.jobs.modules', {'summaries:executor'})
        mocker.patch('skynet.modules.ttt.summaries.jobs.update_summary_queue_metric')
        mock_get_job = mocker.patch('skynet.modules.ttt.summaries.jobs.get_job')
        mock_create_task = mocker.patch('skynet.modules.ttt.summaries.jobs.create_run_job_task')
        return mock_get_job, mock_create_task

    @staticmethod
    def _patch_capacity(mocker, running=None, **limits):
        '''Patch the running-task registry and selected concurrency limits.

        ``running`` maps a processor to its set of in-flight tasks; processors
        not listed start with an empty set. The given set objects are stored
        as-is, so a test can keep mutating them after patching.
        Each keyword in ``limits`` (e.g. ``openai=5``) is patched into
        ``max_concurrency_<keyword>``; limits that are not named stay untouched.
        '''
        from skynet.modules.ttt.summaries.v1.models import Processors

        tasks = {
            Processors.OPENAI: set(),
            Processors.AZURE: set(),
            Processors.OCI: set(),
            Processors.LOCAL: set(),
        }
        tasks.update(running or {})
        mocker.patch('skynet.modules.ttt.summaries.jobs.current_tasks', tasks)

        for suffix, limit in limits.items():
            mocker.patch(f'skynet.modules.ttt.summaries.jobs.max_concurrency_{suffix}', limit)

    @pytest.mark.asyncio
    async def test_pulls_multiple_jobs_when_capacity_available(self, mocker):
        '''Test that multiple jobs are pulled when there's capacity for multiple.'''
        from skynet.constants import PENDING_JOBS_OPENAI_KEY
        from skynet.modules.ttt.summaries.jobs import maybe_run_next_job

        mock_get_job, mock_create_task = self._patch_dependencies(mocker)

        job_ids = ['job:1:openai', 'job:2:openai', 'job:3:openai']
        mock_get_job.side_effect = [self._make_job(job_id) for job_id in job_ids]

        # Only the OPENAI queue holds jobs; it yields None once drained, like every other queue
        queued = list(job_ids)

        def lpop_side_effect(key):
            if key == PENDING_JOBS_OPENAI_KEY and queued:
                return queued.pop(0)
            return None

        mocker.patch('skynet.modules.ttt.persistence.db.lpop', side_effect=lpop_side_effect)

        # current_tasks starts empty, max_concurrency is 5
        self._patch_capacity(mocker, openai=5)

        await maybe_run_next_job()

        # Should have pulled all 3 jobs from OPENAI queue, in queue order
        assert [call.args[0] for call in mock_get_job.call_args_list] == job_ids
        assert mock_create_task.call_count == 3

    @pytest.mark.asyncio
    async def test_stops_when_capacity_reached(self, mocker):
        '''Test that job pulling stops when processor reaches max concurrency.'''
        import asyncio

        from skynet.constants import PENDING_JOBS_OPENAI_KEY
        from skynet.modules.ttt.summaries.jobs import maybe_run_next_job
        from skynet.modules.ttt.summaries.v1.models import Processors

        mock_get_job, mock_create_task = self._patch_dependencies(mocker)

        # 2 running tasks, max is 3, so only 1 more job may be pulled
        running = {asyncio.create_task(asyncio.sleep(0)) for _ in range(2)}
        self._patch_capacity(mocker, running={Processors.OPENAI: running}, openai=3)

        # The real create_run_job_task is mocked out, so register a task by hand;
        # otherwise the running set never grows and capacity can never be reached.
        # NOTE(review): assumes can_run_next_job compares the size of
        # current_tasks[processor] with the patched limit - confirm.
        def register_task(_job):
            running.add(asyncio.create_task(asyncio.sleep(0)))

        mock_create_task.side_effect = register_task

        # More jobs are queued than there is capacity for, so an empty queue
        # cannot be what ends the loop
        queued = ['job:1:openai', 'job:2:openai', 'job:3:openai']
        popped = []

        def lpop_side_effect(key):
            if key == PENDING_JOBS_OPENAI_KEY and queued:
                popped.append(queued.pop(0))
                return popped[-1]
            return None

        mocker.patch('skynet.modules.ttt.persistence.db.lpop', side_effect=lpop_side_effect)
        mock_get_job.return_value = self._make_job('job:1:openai')

        try:
            await maybe_run_next_job()
        finally:
            # Let the placeholder tasks finish so none is left pending at loop teardown
            await asyncio.gather(*running)

        # Should only pull 1 job (capacity is 3, 2 were already running)
        assert popped == ['job:1:openai']
        assert mock_create_task.call_count == 1

    @pytest.mark.asyncio
    async def test_processes_multiple_processors(self, mocker):
        '''Test that jobs are pulled from multiple processor queues in priority order.'''
        from skynet.constants import PENDING_JOBS_OCI_KEY, PENDING_JOBS_OPENAI_KEY
        from skynet.modules.ttt.summaries.jobs import maybe_run_next_job

        mock_get_job, mock_create_task = self._patch_dependencies(mocker)

        oci_job = self._make_job('job:1:oci')
        openai_job = self._make_job('job:2:openai')
        jobs_by_id = {oci_job.id: oci_job, openai_job.id: openai_job}

        # Each of the two queues holds exactly one job
        queues = {
            PENDING_JOBS_OCI_KEY: ['job:1:oci'],
            PENDING_JOBS_OPENAI_KEY: ['job:2:openai'],
        }

        def lpop_side_effect(key):
            pending = queues.get(key)
            return pending.pop(0) if pending else None

        mocker.patch('skynet.modules.ttt.persistence.db.lpop', side_effect=lpop_side_effect)
        mock_get_job.side_effect = lambda job_id: jobs_by_id[job_id]

        self._patch_capacity(mocker, openai=5, oci=5)

        await maybe_run_next_job()

        # Should have started the OCI job before the OPENAI job
        started = [call.args[0] for call in mock_create_task.call_args_list]
        assert started == [oci_job, openai_job]

    @pytest.mark.asyncio
    async def test_stops_when_queue_empty(self, mocker):
        '''Test that job pulling stops when queue is empty.'''
        from skynet.modules.ttt.summaries.jobs import maybe_run_next_job

        mock_get_job, mock_create_task = self._patch_dependencies(mocker)

        # Empty queues (lpop returns None)
        mocker.patch('skynet.modules.ttt.persistence.db.lpop', return_value=None)

        # Capacity available
        self._patch_capacity(mocker, openai=5)

        await maybe_run_next_job()

        # Should not start any jobs
        assert mock_get_job.call_count == 0
        assert mock_create_task.call_count == 0

    @pytest.mark.asyncio
    async def test_skips_processor_without_capacity(self, mocker):
        '''Test that processors without capacity are skipped.'''
        import asyncio

        from skynet.constants import (
            PENDING_JOBS_AZURE_KEY,
            PENDING_JOBS_LOCAL_KEY,
            PENDING_JOBS_OCI_KEY,
            PENDING_JOBS_OPENAI_KEY,
        )
        from skynet.modules.ttt.summaries.jobs import maybe_run_next_job
        from skynet.modules.ttt.summaries.v1.models import Processors

        self._patch_dependencies(mocker)

        keys_checked = []

        def lpop_side_effect(key):
            keys_checked.append(key)
            return None  # All queues empty

        mocker.patch('skynet.modules.ttt.persistence.db.lpop', side_effect=lpop_side_effect)

        # OPENAI at capacity (3 running, max 3), others have capacity
        openai_tasks = {asyncio.create_task(asyncio.sleep(0)) for _ in range(3)}
        self._patch_capacity(
            mocker,
            running={Processors.OPENAI: openai_tasks},
            openai=3,
            oci=5,
            azure=5,
            local=5,
        )

        try:
            await maybe_run_next_job()
        finally:
            # Let the placeholder tasks finish so none is left pending at loop teardown
            await asyncio.gather(*openai_tasks)

        # Should check OCI, AZURE, and LOCAL queues but NOT OPENAI (since it's at capacity)
        assert PENDING_JOBS_OCI_KEY in keys_checked
        assert PENDING_JOBS_AZURE_KEY in keys_checked
        assert PENDING_JOBS_LOCAL_KEY in keys_checked
        assert PENDING_JOBS_OPENAI_KEY not in keys_checked

skynet/modules/ttt/summaries/processor_queue_test.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,12 @@ async def test_maybe_run_next_job_follows_priority_order(self, mocker):
158158
mocker.patch('skynet.modules.ttt.summaries.jobs.get_job')
159159
mocker.patch('skynet.modules.ttt.summaries.jobs.create_run_job_task')
160160

161-
# Mock lpop to return job only for OCI queue (first in priority)
161+
# Mock lpop to return job only once for OCI queue (first in priority), then None
162+
oci_called = [False]
163+
162164
def mock_lpop(key):
163-
if 'oci' in key:
165+
if 'oci' in key and not oci_called[0]:
166+
oci_called[0] = True
164167
return 'test_job_id:OCI'
165168
return None
166169

@@ -188,9 +191,12 @@ def mock_can_run(processor):
188191
mocker.patch('skynet.modules.ttt.summaries.jobs.get_job')
189192
mocker.patch('skynet.modules.ttt.summaries.jobs.create_run_job_task')
190193

191-
# Mock lpop to return job for AZURE queue
194+
# Mock lpop to return job only once for AZURE queue, then None
195+
azure_called = [False]
196+
192197
def mock_lpop(key):
193-
if 'azure' in key:
198+
if 'azure' in key and not azure_called[0]:
199+
azure_called[0] = True
194200
return 'test_job_id:AZURE'
195201
return None
196202

0 commit comments

Comments
 (0)