Skip to content

Commit 8ec1a25

Browse files
author
Alex Stelea
authored
feat: create process-week queue for improved week processing (#225)
1 parent 865cb55 commit 8ec1a25

8 files changed

Lines changed: 104 additions & 13 deletions

File tree

apps/admin/src/app/seasons/[seasonId]/weeks/[weekId]/page.tsx

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ type WeekPageProps = {
1515

1616
const WeekPage: FC<WeekPageProps> = ({ params: paramsPromise }) => {
1717
const params = use(paramsPromise);
18-
const recalculatePoints =
19-
api.season.addCalculateSeasonPointsJob.useMutation();
18+
const addProcessWeekJob = api.season.addProcessWeekJob.useMutation();
2019
const updatePointsPool = api.week.updatePointsPool.useMutation();
2120
const updateMultiplier = api.week.updateActivityWeekMultiplier.useMutation();
2221
const {
@@ -51,9 +50,8 @@ const WeekPage: FC<WeekPageProps> = ({ params: paramsPromise }) => {
5150
const week = weekData;
5251

5352
const handleProcessWeek = async () => {
54-
await recalculatePoints.mutateAsync({
53+
await addProcessWeekJob.mutateAsync({
5554
weekId: params.weekId,
56-
force: week?.processed,
5755
});
5856

5957
toast.info('Processing week job started', {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { redisClient } from '../../redis';
2+
import { createQueue } from '../createQueue';
3+
import { QueueName } from '../types';
4+
import { processWeekWorker } from './worker';
5+
6+
export const processWeekQueue = createQueue({
7+
name: QueueName.processWeek,
8+
redisClient,
9+
worker: processWeekWorker,
10+
onError: async (_, error) => {
11+
console.error(error);
12+
},
13+
});
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { z } from 'zod';
2+
3+
export const ProcessWeekJobSchema = z.object({
4+
weekId: z.string(),
5+
});
6+
7+
export type ProcessWeekJob = z.infer<typeof ProcessWeekJobSchema>;
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { dependencyLayer } from 'api/incentives';
2+
import type { Job } from 'bullmq';
3+
import { FlowProducer } from 'bullmq';
4+
import { Exit } from 'effect';
5+
import { handleExitError } from '../../helpers/handleExitError';
6+
import { redisClient } from '../../redis';
7+
import { QueueName } from '../types';
8+
import type { ProcessWeekJob } from './schemas';
9+
10+
const flowProducer = new FlowProducer({ connection: redisClient });
11+
12+
export const processWeekWorker = async (job: Job<ProcessWeekJob>) => {
13+
const weekId = job.data.weekId;
14+
15+
const seasonResult = await dependencyLayer.getSeasonByWeekId(weekId);
16+
17+
if (Exit.isFailure(seasonResult)) {
18+
return handleExitError(seasonResult);
19+
}
20+
21+
const seasonId = seasonResult.value.id;
22+
23+
job.log(`starting scheduled calculations for weekId: ${weekId}`);
24+
25+
await flowProducer.add({
26+
queueName: QueueName.populateLeaderboardCache,
27+
name: 'process-week',
28+
data: { weekId },
29+
opts: {
30+
attempts: 3,
31+
backoff: {
32+
type: 'exponential',
33+
delay: 5000,
34+
},
35+
},
36+
children: [
37+
{
38+
queueName: QueueName.calculateSeasonPoints,
39+
name: 'process-week',
40+
data: { weekId, seasonId, markAsProcessed: true },
41+
opts: { failParentOnFailure: true },
42+
children: [
43+
{
44+
queueName: QueueName.seasonPointsMultiplier,
45+
name: 'process-week',
46+
data: { weekId },
47+
opts: { failParentOnFailure: true },
48+
children: [
49+
{
50+
queueName: QueueName.calculateActivityPoints,
51+
name: 'process-week',
52+
data: { weekId },
53+
opts: { failParentOnFailure: true },
54+
},
55+
],
56+
},
57+
],
58+
},
59+
],
60+
});
61+
};

apps/workers/src/queues/scheduled-calculations/worker.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { FlowProducer } from 'bullmq';
44
import { Exit } from 'effect';
55
import { handleExitError } from '../../helpers/handleExitError';
66
import { redisClient } from '../../redis';
7-
import { calculateSeasonPointsQueue } from '../calculate-season-points/queue';
7+
import { processWeekQueue } from '../process-week/queue';
88
import { QueueName } from '../types';
99
import type { ScheduledCalculationsJob } from './schemas';
1010

@@ -41,9 +41,8 @@ export const scheduledCalculationsWorker = async (
4141
if (unprocessedWeeks.length > 0) {
4242
for (const week of unprocessedWeeks) {
4343
job.log(`adding end-of-week-calculation job for weekId: ${week.weekId}`);
44-
await calculateSeasonPointsQueue.queue.add('end-of-week-calculation', {
44+
await processWeekQueue.queue.add('process-week', {
4545
weekId: week.weekId,
46-
markAsProcessed: true,
4746
});
4847
}
4948
}

apps/workers/src/queues/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export const QueueName = {
88
snapshotDateRange: 'snapshotDateRange',
99
scheduledCalculations: 'scheduledCalculations',
1010
populateLeaderboardCache: 'populateLeaderboardCache',
11+
processWeek: 'processWeek',
1112
} as const;
1213

1314
export type QueueName = (typeof QueueName)[keyof typeof QueueName];

apps/workers/src/server/index.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import { eventQueue } from '../queues/event/queue';
1919
import { eventQueueJobSchema } from '../queues/event/schemas';
2020
import { populateLeaderboardCacheQueue } from '../queues/populate-leaderboard-cache/queue';
2121
import { populateLeaderboardCacheSchema } from '../queues/populate-leaderboard-cache/schemas';
22+
import { processWeekQueue } from '../queues/process-week/queue';
23+
import { ProcessWeekJobSchema } from '../queues/process-week/schemas';
2224
import { scheduledCalculationsQueue } from '../queues/scheduled-calculations/queue';
2325
import { scheduledSnapshotQueue } from '../queues/scheduled-snapshot/queue';
2426
import { snapshotQueue } from '../queues/snapshot/queue';
@@ -46,6 +48,8 @@ metricsApp.get('/metrics', async (c) => {
4648
await scheduledCalculationsQueue.queue.exportPrometheusMetrics();
4749
const populateLeaderboardCacheQueueMetrics =
4850
await populateLeaderboardCacheQueue.queue.exportPrometheusMetrics();
51+
const processWeekQueueMetrics =
52+
await processWeekQueue.queue.exportPrometheusMetrics();
4953
return c.text(
5054
[
5155
snapshotQueueMetrics,
@@ -55,6 +59,7 @@ metricsApp.get('/metrics', async (c) => {
5559
calculateActivityPointsQueueMetrics,
5660
scheduledCalculationsQueueMetrics,
5761
populateLeaderboardCacheQueueMetrics,
62+
processWeekQueueMetrics,
5863
].join('\n'),
5964
);
6065
});
@@ -116,6 +121,16 @@ app.post('/queues/calculate-season-points/add', async (c) => {
116121
return c.text('ok');
117122
});
118123

124+
app.post('/queues/process-week/add', async (c) => {
125+
const input = await c.req.json();
126+
const parsedInput = ProcessWeekJobSchema.safeParse(input);
127+
if (!parsedInput.success) {
128+
return c.json({ error: parsedInput.error.message }, 400);
129+
}
130+
await processWeekQueue.queue.add('process-week', parsedInput.data);
131+
return c.text('ok');
132+
});
133+
119134
app.post('/queues/calculate-season-points-multiplier/add', async (c) => {
120135
const input = await c.req.json();
121136
const parsedInput = seasonPointsMultiplierJobSchema.safeParse(input);
@@ -169,6 +184,7 @@ createBullBoard({
169184
new BullMQAdapter(seasonPointsMultiplierQueue.queue),
170185
new BullMQAdapter(scheduledCalculationsQueue.queue),
171186
new BullMQAdapter(populateLeaderboardCacheQueue.queue),
187+
new BullMQAdapter(processWeekQueue.queue),
172188
],
173189
serverAdapter,
174190
});

packages/api/src/incentives/season/seasonRouter.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,23 +145,19 @@ export const adminSeasonRouter = createTRPCRouter({
145145
});
146146
}),
147147

148-
addCalculateSeasonPointsJob: publicProcedure
148+
addProcessWeekJob: publicProcedure
149149
.input(
150150
z.object({
151151
weekId: z.string(),
152-
force: z.boolean().optional(),
153152
}),
154153
)
155154
.mutation(async ({ input }) => {
156155
const response = await fetch(
157-
`${process.env.WORKERS_API_BASE_URL}/queues/scheduled-calculations/add`,
156+
`${process.env.WORKERS_API_BASE_URL}/queues/process-week/add`,
158157
{
159158
method: 'POST',
160159
body: JSON.stringify({
161160
weekId: input.weekId,
162-
force: input.force,
163-
markAsProcessed: true,
164-
includeSPCalculations: true,
165161
}),
166162
},
167163
);

0 commit comments

Comments
 (0)