-
Notifications
You must be signed in to change notification settings - Fork 3
add: Round service #48
Changes from 9 commits
d29133e
11eee5a
7e0a4e2
91fa889
bf300bf
538a697
0617456
fb6591a
85b80e0
c581ec7
348e3bb
639783f
f1329c2
2cac5a0
f073bc6
e9f6bfe
c5d9c11
1a37ac2
5cfe64e
5a3085d
4b15395
a93dd4c
3cf95d7
1ff9a62
8a1fe17
a37eb0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
/** @typedef {{id: number; start_time: string; end_time: string; max_tasks_per_node: number; }} Round */ | ||
/** @typedef {{ roundDurationMs: number; maxTasksPerNode: number; checkRoundIntervalMs: number }} RoundConfig */ | ||
|
||
export class RoundService { | ||
/** | ||
* @type {Round | null} | ||
*/ | ||
#currentRound = null | ||
#isInitializing = false | ||
/** | ||
* @type {NodeJS.Timeout | null} | ||
*/ | ||
#checkRoundIntervalId = null | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noting that this might be necessary for tests, but not production usage |
||
#db | ||
#config | ||
|
||
#taskingService | ||
|
||
/** | ||
* @param {import('./typings.js').PgPool} db | ||
* @param {import('./tasking-service.js').TaskingService} taskingService | ||
* @param {RoundConfig} config | ||
*/ | ||
constructor (db, taskingService, config) { | ||
this.#db = db | ||
this.#config = config | ||
this.#taskingService = taskingService | ||
} | ||
|
||
/** | ||
* Start the round service | ||
*/ | ||
async start () { | ||
if (this.#isInitializing) return | ||
juliangruber marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.#isInitializing = true | ||
|
||
try { | ||
await this.#initializeRound() | ||
this.#scheduleRoundCheck() | ||
console.log(`Round service started. Round duration: ${this.#config.roundDurationMs / 60000} minutes`) | ||
} catch (error) { | ||
console.error('Failed to start round service:', error) | ||
pyropy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} finally { | ||
this.#isInitializing = false | ||
} | ||
} | ||
|
||
/** | ||
* Stop the round service | ||
*/ | ||
stop () { | ||
if (this.#checkRoundIntervalId) clearInterval(this.#checkRoundIntervalId) | ||
console.log('Round service stopped') | ||
} | ||
|
||
/** | ||
* Initialize the current round | ||
*/ | ||
async #initializeRound () { | ||
const activeRound = await this.#getActiveRound() | ||
|
||
if (activeRound) { | ||
this.#currentRound = activeRound | ||
console.log(`Resuming active round #${activeRound.id}`) | ||
} else { | ||
await this.#startNewRound() | ||
} | ||
} | ||
|
||
/** | ||
* Schedule periodic checks for round end | ||
*/ | ||
#scheduleRoundCheck () { | ||
this.#checkRoundIntervalId = setInterval(async () => { | ||
if (!this.#currentRound) return | ||
|
||
juliangruber marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const now = new Date() | ||
if (new Date(this.#currentRound.end_time) <= now) { | ||
pyropy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
await this.#startNewRound() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What shall we do when this fails? |
||
} catch (error) { | ||
console.error('Error handling round end:', error) | ||
} | ||
} | ||
}, this.#config.checkRoundIntervalMs) | ||
} | ||
|
||
/** | ||
* Start a new round | ||
*/ | ||
async #startNewRound () { | ||
const previousRound = await this.#getActiveRound() | ||
this.#currentRound = await this.#createNewRound() | ||
if (!this.#currentRound) { | ||
juliangruber marked this conversation as resolved.
Show resolved
Hide resolved
|
||
throw new Error('Failed to start a new round') | ||
} | ||
|
||
if (this.#taskingService) { | ||
await this.#taskingService.generateTasksForRound(this.#currentRound.id) | ||
} | ||
|
||
if (previousRound) { | ||
await this.#changeRoundActive(previousRound.id, false) | ||
juliangruber marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
await this.#changeRoundActive(this.#currentRound.id, true) | ||
} | ||
|
||
/** | ||
* Get the current active round from the database | ||
*/ | ||
async #getActiveRound () { | ||
try { | ||
const { rows } = await this.#db.query(` | ||
SELECT * FROM checker_rounds | ||
WHERE active = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Motivation behind the Thing that I'm questioning is should we or should we not leave the last round as active in case the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, you can only consider the round active once tasks have been sampled, but you can also only sample tasks once the round exists, right? Can we then rename |
||
ORDER BY start_time DESC | ||
LIMIT 1 | ||
`) | ||
return rows[0] || null | ||
} catch (error) { | ||
console.error('Error getting active round:', error) | ||
pyropy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return null | ||
} | ||
} | ||
|
||
/** | ||
* Create a new round | ||
*/ | ||
async #createNewRound () { | ||
try { | ||
const now = new Date() | ||
const endTime = new Date(now.getTime() + this.#config.roundDurationMs) | ||
pyropy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
const { rows } = await this.#db.query(` | ||
INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) | ||
VALUES ($1, $2, $3, $4) | ||
RETURNING * | ||
`, [now, endTime, this.#config.maxTasksPerNode, true]) | ||
|
||
const round = rows[0] | ||
console.log(`Created new round #${round.id} starting at ${round.start_time}`) | ||
return round | ||
} catch (error) { | ||
console.error('Error creating new round:', error) | ||
throw error | ||
} | ||
} | ||
|
||
/** | ||
* Change the status of a round using a transaction | ||
* @param {number} roundId | ||
* @param {Boolean} active | ||
*/ | ||
async #changeRoundActive (roundId, active) { | ||
const client = await this.#db.connect() | ||
|
||
try { | ||
await client.query('BEGIN') | ||
const { rows } = await client.query(` | ||
UPDATE checker_rounds | ||
SET active = $1 | ||
WHERE id = $2 | ||
RETURNING * | ||
`, [active, roundId]) | ||
await client.query('COMMIT') | ||
|
||
console.log(`Round #${rows[0].id} active: ${rows[0].active}`) | ||
return rows[0] | ||
} catch (error) { | ||
await client.query('ROLLBACK') | ||
console.error('Error changing round status:', error) | ||
throw error | ||
} finally { | ||
client.release() | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/** @typedef {any} Task */ | ||
/** @typedef {() => Promise<Task[]>} TaskSamplingFn */ | ||
/** @typedef {{ maxTasks: number }} TaskingConfig */ | ||
|
||
export class TaskingService { | ||
#db | ||
#config | ||
|
||
/** | ||
* @param {import('./typings.js').PgPool} db | ||
* @param {TaskingConfig} config | ||
*/ | ||
constructor (db, config) { | ||
this.#db = db | ||
this.#config = config | ||
} | ||
|
||
/** | ||
* Register a task sampler for a specific subnet | ||
* @param {string} subnet - The subnet identifier | ||
* @param {TaskSamplingFn} sampleFn - Function that generates tasks for a subnet | ||
*/ | ||
registerTaskSampler (subnet, sampleFn) { | ||
console.warn('Registering task sampler is not implemented.') | ||
} | ||
|
||
/** | ||
* Generate tasks for all registered subnets for a specific round | ||
* @param {number} roundId | ||
*/ | ||
async generateTasksForRound (roundId) { | ||
// TODO: Implement the logic to generate tasks for all registered subnets | ||
console.warn('Tasking service is not implemented.') | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
CREATE TABLE IF NOT EXISTS checker_rounds ( | ||
id BIGSERIAL PRIMARY KEY, | ||
start_time TIMESTAMPTZ NOT NULL, | ||
end_time TIMESTAMPTZ NOT NULL, | ||
active BOOLEAN NOT NULL DEFAULT FALSE, | ||
max_tasks_per_node INT NOT NULL DEFAULT 360 | ||
); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import assert from 'assert' | ||
import { after, before, beforeEach, describe, it } from 'node:test' | ||
import { createPgPool } from '../lib/pool.js' | ||
import { migrateWithPgClient } from '../lib/migrate.js' | ||
import { DATABASE_URL } from '../lib/config.js' | ||
import { RoundService } from '../lib/round-service.js' | ||
import { TaskingService } from '../lib/tasking-service.js' | ||
|
||
const DEFAULT_CONFIG = { | ||
roundDurationMs: 1000, | ||
maxTasks: 100, | ||
maxTasksPerNode: 10, | ||
checkRoundIntervalMs: 200 | ||
} | ||
|
||
describe('RoundService', () => { | ||
/** @type {import('pg').Pool} */ | ||
let pgPool | ||
/** @type {TaskingService} */ | ||
let taskingService | ||
|
||
before(async () => { | ||
pgPool = await createPgPool(DATABASE_URL) | ||
await migrateWithPgClient(pgPool) | ||
taskingService = new TaskingService(pgPool, { | ||
maxTasks: DEFAULT_CONFIG.maxTasks | ||
}) | ||
}) | ||
|
||
after(async () => { | ||
await pgPool.end() | ||
}) | ||
|
||
beforeEach(async () => { | ||
// Reset the database state before each test | ||
await pgPool.query('DELETE FROM checker_rounds') | ||
}) | ||
|
||
describe('rounds', () => { | ||
it('should create a new round if no active round exists', async () => { | ||
const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) | ||
|
||
await roundService.start() | ||
roundService.stop() | ||
|
||
const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') | ||
assert.strictEqual(rounds.length, 1) | ||
assert.ok(new Date(rounds[0].end_time) > new Date()) | ||
}) | ||
|
||
it('should resume an active round if one exists', async () => { | ||
const now = new Date() | ||
const endTime = new Date(now.getTime() + DEFAULT_CONFIG.roundDurationMs) | ||
await pgPool.query(` | ||
INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) | ||
VALUES ($1, $2, $3, $4) | ||
`, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) | ||
|
||
const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) | ||
|
||
await roundService.start() | ||
roundService.stop() | ||
|
||
const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') | ||
assert.strictEqual(rounds.length, 1) | ||
assert.strictEqual(new Date(rounds[0].start_time).toISOString(), now.toISOString()) | ||
}) | ||
|
||
it('should stop the round service and prevent further round checks', async () => { | ||
const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) | ||
|
||
await roundService.start() | ||
roundService.stop() | ||
|
||
const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') | ||
assert.strictEqual(rounds.length, 1) | ||
|
||
// Wait for the check interval to pass and ensure no new rounds are created | ||
await new Promise(resolve => setTimeout(resolve, DEFAULT_CONFIG.checkRoundIntervalMs + 1000)) | ||
|
||
const { rows: newRounds } = await pgPool.query('SELECT * FROM checker_rounds') | ||
assert.strictEqual(newRounds.length, 1) | ||
}) | ||
}) | ||
|
||
describe('round transitions', () => { | ||
it('should deactivate the old round and create a new one when the current round ends', async () => { | ||
const now = new Date() | ||
const endTime = new Date(now.getTime() + 1000) // 1 second duration | ||
await pgPool.query(` | ||
INSERT INTO checker_rounds (start_time, end_time, max_tasks_per_node, active) | ||
VALUES ($1, $2, $3, $4) | ||
`, [now, endTime, DEFAULT_CONFIG.maxTasksPerNode, true]) | ||
|
||
const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG) | ||
|
||
await roundService.start() | ||
|
||
// Wait for the current round to end | ||
await new Promise(resolve => setTimeout(resolve, 2000)) | ||
|
||
roundService.stop() | ||
|
||
const { rows: activeRounds } = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true') | ||
assert.strictEqual(activeRounds.length, 1) | ||
assert.ok(new Date(activeRounds[0].start_time) > endTime) | ||
|
||
const { rows: allRounds } = await pgPool.query('SELECT * FROM checker_rounds') | ||
assert.strictEqual(allRounds.length, 2) | ||
}) | ||
}) | ||
}) |
Uh oh!
There was an error while loading. Please reload this page.