@@ -125,7 +125,7 @@ impl<E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics, C: PublicKey>
125125 }
126126
127127 pub async fn run < Si : Sink , St : Stream > (
128- mut self ,
128+ self ,
129129 peer : C ,
130130 ( mut conn_sender, mut conn_receiver) : ( Sender < Si > , Receiver < St > ) ,
131131 channels : Channels < C > ,
@@ -158,6 +158,7 @@ impl<E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics, C: PublicKey>
158158
159159 // Enter into the main loop
160160 let mut batch = Vec :: with_capacity ( self . send_batch_size ) ;
161+ let ( control, high, low) = & mut ( self . control , self . high , self . low ) ;
161162 select_loop ! {
162163 context,
163164 on_stopped => { } ,
@@ -171,33 +172,57 @@ impl<E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics, C: PublicKey>
171172 types:: Message :: Ping . encode_with_pool( & pool) . into( ) ,
172173 ) ;
173174 Self :: extend_send_many(
174- & peer, self . send_batch_size, & mut batch,
175- & mut self . control, & mut self . high, & mut self . low,
176- & rate_limits, & self . sent_messages,
175+ & peer,
176+ self . send_batch_size,
177+ & mut batch,
178+ control,
179+ high,
180+ low,
181+ & rate_limits,
182+ & self . sent_messages,
177183 ) ?;
178- conn_sender. send_many( batch. drain( ..) ) . await . map_err( Error :: SendFailed ) ?;
184+ conn_sender
185+ . send_many( batch. drain( ..) )
186+ . await
187+ . map_err( Error :: SendFailed ) ?;
179188 deadline = context. current( ) + self . ping_frequency;
180189 } ,
181190 // Await any outbound message (control, high, or low), then
182191 // drain already-queued messages into a single runtime write.
183192 // Priority order: control > high > low.
184- msg = recv_prioritized( & mut self . control, & mut self . high, & mut self . low) => {
193+ msg = recv_prioritized( control, high, low) => {
185194 match msg {
186195 Prioritized :: Closed => return Err ( Error :: PeerDisconnected ) ,
187196 Prioritized :: Control ( msg) => match msg {
188- Message :: Kill => return Err ( Error :: PeerKilled ( peer. to_string( ) ) ) ,
197+ Message :: Kill => {
198+ return Err ( Error :: PeerKilled ( peer. to_string( ) ) )
199+ }
189200 } ,
190201 Prioritized :: Data ( encoded) => {
191- let ( metric, payload) = Self :: prepare_data( & peer, encoded, & rate_limits) ;
192- Self :: push_batched( & self . sent_messages, & mut batch, metric, payload) ;
193- } ,
202+ let ( metric, payload) =
203+ Self :: prepare_data( & peer, encoded, & rate_limits) ;
204+ Self :: push_batched(
205+ & self . sent_messages,
206+ & mut batch,
207+ metric,
208+ payload,
209+ ) ;
210+ }
194211 }
195212 Self :: extend_send_many(
196- & peer, self . send_batch_size, & mut batch,
197- & mut self . control, & mut self . high, & mut self . low,
198- & rate_limits, & self . sent_messages,
213+ & peer,
214+ self . send_batch_size,
215+ & mut batch,
216+ control,
217+ high,
218+ low,
219+ & rate_limits,
220+ & self . sent_messages,
199221 ) ?;
200- conn_sender. send_many( batch. drain( ..) ) . await . map_err( Error :: SendFailed ) ?;
222+ conn_sender
223+ . send_many( batch. drain( ..) )
224+ . await
225+ . map_err( Error :: SendFailed ) ?;
201226 } ,
202227 }
203228
0 commit comments