@@ -400,35 +400,19 @@ fn operation_manager(
400400 return ;
401401 }
402402 _ = tokio:: time:: sleep( refresh_interval) => {
403- // If there were no notifications, then just re-populate.
404- let new_active_operations = list_operations( & redis_client, & group_by, & index_name) . await ;
405- for ( properties, queued) in & new_active_operations {
406- let queue_length = count_queue( queued) ;
407- tracing:: info!( queue=queue_length, properties=?properties, "Refreshed queue" ) ;
408- if active_operations. get( properties) . is_none_or( |previously_queued: & QueuedOperations | count_queue( previously_queued) != queue_length) && tx. send( ( properties. clone( ) , queue_length) ) . await . is_err( ) {
409- return ;
410- }
411- }
412- // Check for any now zero length queues.
413- for properties in active_operations. keys( ) {
414- if !new_active_operations. contains_key( properties) {
415- tracing:: info!( queue=0 , properties=?properties, "Refreshed queue" ) ;
416- if tx. send( ( properties. clone( ) , 0 ) ) . await . is_err( ) {
417- return ;
418- }
419- }
403+ refresh_operations = true ;
404+ if operation_update. is_none( ) {
405+ operation_update = Some ( Instant :: now( ) + Duration :: from_millis( 500 ) ) ;
420406 }
421- active_operations = new_active_operations;
422407 }
423408 operation_id = operation_channel. recv( ) => {
424- let Some ( operation_id) = operation_id else {
425- return ;
409+ if let Some ( operation_id) = operation_id {
410+ if operation_id. is_empty( ) {
411+ refresh_operations = true ;
412+ } else {
413+ operation_ids. insert( operation_id) ;
414+ }
426415 } ;
427- if operation_id. is_empty( ) {
428- refresh_operations = true ;
429- } else {
430- operation_ids. insert( operation_id) ;
431- }
432416 if ( !operation_ids. is_empty( ) || refresh_operations) && operation_update. is_none( ) {
433417 operation_update = Some ( Instant :: now( ) + Duration :: from_millis( 500 ) ) ;
434418 }
0 commit comments