@@ -17,9 +17,10 @@ use clowarden_core::{
1717 multierror:: MultiError ,
1818 services:: { BaseRefConfigStatus , ChangesApplied , ChangesSummary , DynServiceHandler , ServiceName } ,
1919} ;
20+ use futures:: future:: { self , JoinAll } ;
2021use octorust:: types:: { ChecksCreateRequestConclusion , JobStatus , PullRequestData } ;
2122use serde:: { Deserialize , Serialize } ;
22- use std:: collections:: HashMap ;
23+ use std:: { collections:: HashMap , sync :: Arc } ;
2324use tokio:: {
2425 sync:: { broadcast, mpsc} ,
2526 task:: JoinHandle ,
@@ -47,6 +48,16 @@ pub(crate) enum Job {
4748 Validate ( ValidateInput ) ,
4849}
4950
51+ impl Job {
52+ /// Get the name of the organization this job is related to.
53+ pub ( crate ) fn org_name ( & self ) -> & str {
54+ match self {
55+ Job :: Reconcile ( input) => & input. org . name ,
56+ Job :: Validate ( input) => & input. org . name ,
57+ }
58+ }
59+ }
60+
5061/// Information required to process a reconcile job.
5162#[ derive( Debug , Clone , Default , PartialEq , Serialize , Deserialize ) ]
5263pub ( crate ) struct ReconcileInput {
@@ -118,20 +129,68 @@ impl Handler {
118129 gh : DynGH ,
119130 ghc : core:: github:: DynGH ,
120131 services : HashMap < ServiceName , DynServiceHandler > ,
121- ) -> Self {
122- Self {
132+ ) -> Arc < Self > {
133+ Arc :: new ( Self {
123134 db,
124135 gh,
125136 ghc,
126137 services,
127- }
138+ } )
128139 }
129140
130- /// Spawn a new task to process jobs received on the jobs channel. The task
131- /// will stop when notified on the stop channel provided.
141+ /// Spawn some tasks to process jobs received on the jobs channel. We will
142+ /// create one worker per organization, plus an additional task to route
143+ /// jobs to the corresponding organization worker. All tasks will stop when
144+ /// notified on the stop channel provided.
132145 pub ( crate ) fn start (
133- self ,
146+ self : Arc < Self > ,
134147 mut jobs_rx : mpsc:: UnboundedReceiver < Job > ,
148+ stop_tx : & broadcast:: Sender < ( ) > ,
149+ orgs : Vec < Organization > ,
150+ ) -> JoinAll < JoinHandle < ( ) > > {
151+ let mut handles = Vec :: with_capacity ( orgs. len ( ) + 1 ) ;
152+ let mut orgs_jobs_tx_channels = HashMap :: new ( ) ;
153+
154+ // Create a worker for each organization
155+ for org in orgs {
156+ let ( org_jobs_tx, org_jobs_rx) = mpsc:: unbounded_channel ( ) ;
157+ orgs_jobs_tx_channels. insert ( org. name , org_jobs_tx) ;
158+ let org_worker = self . clone ( ) . organization_worker ( org_jobs_rx, stop_tx. subscribe ( ) ) ;
159+ handles. push ( org_worker) ;
160+ }
161+
162+ // Create a worker to route jobs to the corresponding org worker
163+ let mut stop_rx = stop_tx. subscribe ( ) ;
164+ let jobs_router = tokio:: spawn ( async move {
165+ loop {
166+ tokio:: select! {
167+ biased;
168+
169+ // Pick next job from the queue and send it to the corresponding org worker
170+ Some ( job) = jobs_rx. recv( ) => {
171+ if let Some ( org_jobs_tx) = orgs_jobs_tx_channels. get( job. org_name( ) ) {
172+ _ = org_jobs_tx. send( job) ;
173+ }
174+ }
175+
176+ // Exit if the handler has been asked to stop
177+ _ = stop_rx. recv( ) => {
178+ break
179+ }
180+ }
181+ }
182+ } ) ;
183+ handles. push ( jobs_router) ;
184+
185+ future:: join_all ( handles)
186+ }
187+
188+ /// Spawn a worker that will take care of processing jobs for a given
189+ /// organization. The worker will stop when notified on the stop channel
190+ /// provided.
191+ fn organization_worker (
192+ self : Arc < Self > ,
193+ mut org_jobs_rx : mpsc:: UnboundedReceiver < Job > ,
135194 mut stop_rx : broadcast:: Receiver < ( ) > ,
136195 ) -> JoinHandle < ( ) > {
137196 tokio:: spawn ( async move {
@@ -140,7 +199,7 @@ impl Handler {
140199 biased;
141200
142201 // Pick next job from the queue and process it
143- Some ( job) = jobs_rx . recv( ) => {
202+ Some ( job) = org_jobs_rx . recv( ) => {
144203 match job {
145204 Job :: Reconcile ( input) => _ = self . handle_reconcile_job( input) . await ,
146205 Job :: Validate ( input) => _ = self . handle_validate_job( input) . await ,
@@ -296,8 +355,8 @@ pub(crate) fn scheduler(
296355 jobs_tx : mpsc:: UnboundedSender < Job > ,
297356 mut stop_rx : broadcast:: Receiver < ( ) > ,
298357 orgs : Vec < Organization > ,
299- ) -> JoinHandle < ( ) > {
300- tokio:: spawn ( async move {
358+ ) -> JoinAll < JoinHandle < ( ) > > {
359+ let scheduler = tokio:: spawn ( async move {
301360 let reconcile_frequency = time:: Duration :: from_secs ( RECONCILE_FREQUENCY ) ;
302361 let mut reconcile = time:: interval ( reconcile_frequency) ;
303362 reconcile. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
@@ -322,5 +381,7 @@ pub(crate) fn scheduler(
322381 } ,
323382 }
324383 }
325- } )
384+ } ) ;
385+
386+ future:: join_all ( vec ! [ scheduler] )
326387}
0 commit comments