Skip to content

Commit 6898604

Browse files
xtexxjiegec
authored andcommitted
feat(server): deliver job assignment events to SSE feed
1 parent 3124c93 commit 6898604

1 file changed

Lines changed: 25 additions & 2 deletions

File tree

server/src/routes/worker.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ pub async fn worker_poll(
199199
.get()
200200
.context("Failed to get db connection from pool")?;
201201

202+
let mut feed_events: Vec<EventContent> = Vec::new();
203+
202204
match conn.transaction::<Option<(Pipeline, Job)>, diesel::result::Error, _>(|conn| {
203205
use crate::schema::jobs::dsl::*;
204206

@@ -209,9 +211,19 @@ pub async fn worker_poll(
209211
.first::<Worker>(conn)?;
210212

211213
// remove if any job is already allocated to the worker
212-
diesel::update(jobs.filter(assigned_worker_id.eq(worker.id)))
214+
let unassigned_jobs = diesel::update(jobs.filter(assigned_worker_id.eq(worker.id)))
213215
.set((status.eq("created"), assigned_worker_id.eq(None::<i32>)))
214-
.execute(conn)?;
216+
.get_results::<Job>(conn)?;
217+
for unassigned_job in unassigned_jobs {
218+
feed_events.push(EventContent::JobUnassigned(Box::new(JobAssignmentUpdate {
219+
pipeline_id: unassigned_job.pipeline_id,
220+
job_id: unassigned_job.id,
221+
arch: unassigned_job.arch,
222+
worker_id: worker.id,
223+
worker_name: worker.hostname.clone(),
224+
worker_arch: worker.arch.clone(),
225+
})));
226+
}
215227

216228
// prioritize jobs on stable branch
217229
let mut sql = jobs
@@ -270,12 +282,23 @@ pub async fn worker_poll(
270282
))
271283
.execute(conn)?;
272284

285+
feed_events.push(EventContent::JobAssigned(Box::new(JobAssignmentUpdate {
286+
pipeline_id: pipeline.id,
287+
job_id: job.id,
288+
arch: job.arch.clone(),
289+
worker_id: worker.id,
290+
worker_name: worker.hostname,
291+
worker_arch: worker.arch,
292+
})));
293+
273294
Ok(Some((pipeline, job)))
274295
}
275296
None => Ok(None),
276297
}
277298
})? {
278299
Some((pipeline, job)) => {
300+
deliver_feed_events(feed_events);
301+
279302
// update github check run status to in-progress
280303
if let Some(github_check_run_id) = job.github_check_run_id {
281304
tokio::spawn(async move {

0 commit comments

Comments
 (0)