@@ -8,8 +8,9 @@ use crossbeam_channel::TryRecvError;
88
99use cxdb:: types:: ConversationItem ;
1010use cxdb:: {
11- decode_msgpack_into, dial, dial_tls, follow_turns, subscribe_events, with_client_tag, CompressionNone ,
12- EncodingMsgpack , Event , FollowError , FollowTurn , RequestContext , SubscribeError , TurnClient ,
11+ decode_msgpack_into, dial, dial_tls, follow_turns, subscribe_events, with_client_tag,
12+ CompressionNone , EncodingMsgpack , Event , FollowError , FollowTurn , RequestContext ,
13+ SubscribeError , TurnClient ,
1314} ;
1415
1516#[ derive( Default ) ]
@@ -84,13 +85,17 @@ fn main() {
8485 let error_count = consume (
8586 & ctx,
8687 & cancel_handle,
87- event_out,
88- errs,
89- Some ( turn_errs) ,
90- Some ( turns) ,
91- config. max_events ,
92- config. max_turns ,
93- config. max_errors ,
88+ ConsumeChannels {
89+ events : event_out,
90+ errs,
91+ turn_errs : Some ( turn_errs) ,
92+ turns : Some ( turns) ,
93+ } ,
94+ ConsumeOptions {
95+ max_events : config. max_events ,
96+ max_turns : config. max_turns ,
97+ max_errors : config. max_errors ,
98+ } ,
9499 ) ;
95100 if config. max_errors > 0 && error_count >= config. max_errors {
96101 std:: process:: exit ( 1 ) ;
@@ -101,32 +106,46 @@ fn main() {
101106 let error_count = consume (
102107 & ctx,
103108 & cancel_handle,
104- events,
105- errs,
106- None ,
107- None ,
108- config. max_events ,
109- config. max_turns ,
110- config. max_errors ,
109+ ConsumeChannels {
110+ events,
111+ errs,
112+ turn_errs : None ,
113+ turns : None ,
114+ } ,
115+ ConsumeOptions {
116+ max_events : config. max_events ,
117+ max_turns : config. max_turns ,
118+ max_errors : config. max_errors ,
119+ } ,
111120 ) ;
112121 if config. max_errors > 0 && error_count >= config. max_errors {
113122 std:: process:: exit ( 1 ) ;
114123 }
115124}
116125
117- fn consume (
118- ctx : & RequestContext ,
119- cancel_handle : & Arc < cxdb:: client:: CancelHandle > ,
126+ struct ConsumeChannels {
120127 events : crossbeam_channel:: Receiver < Event > ,
121128 errs : crossbeam_channel:: Receiver < SubscribeError > ,
122- mut turn_errs : Option < crossbeam_channel:: Receiver < FollowError > > ,
123- mut turns : Option < crossbeam_channel:: Receiver < FollowTurn > > ,
129+ turn_errs : Option < crossbeam_channel:: Receiver < FollowError > > ,
130+ turns : Option < crossbeam_channel:: Receiver < FollowTurn > > ,
131+ }
132+
133+ struct ConsumeOptions {
124134 max_events : usize ,
125135 max_turns : usize ,
126136 max_errors : usize ,
137+ }
138+
139+ fn consume (
140+ ctx : & RequestContext ,
141+ cancel_handle : & Arc < cxdb:: client:: CancelHandle > ,
142+ channels : ConsumeChannels ,
143+ options : ConsumeOptions ,
127144) -> usize {
128- let mut events = Some ( events) ;
129- let mut errs = Some ( errs) ;
145+ let mut events = Some ( channels. events ) ;
146+ let mut errs = Some ( channels. errs ) ;
147+ let mut turn_errs = channels. turn_errs ;
148+ let mut turns = channels. turns ;
130149 let mut event_count = 0usize ;
131150 let mut turn_count = 0usize ;
132151 let mut error_count = 0usize ;
@@ -207,9 +226,9 @@ fn consume(
207226 & mut event_count,
208227 & mut turn_count,
209228 & mut error_count,
210- max_events,
211- max_turns,
212- max_errors,
229+ options . max_events ,
230+ options . max_turns ,
231+ options . max_errors ,
213232 ) ;
214233
215234 if !progressed {
@@ -262,7 +281,10 @@ fn print_event(ev: &Event) {
262281
263282fn print_turn ( turn : & FollowTurn ) {
264283 let mut output = serde_json:: Map :: new ( ) ;
265- output. insert ( "kind" . to_string ( ) , serde_json:: Value :: String ( "turn" . to_string ( ) ) ) ;
284+ output. insert (
285+ "kind" . to_string ( ) ,
286+ serde_json:: Value :: String ( "turn" . to_string ( ) ) ,
287+ ) ;
266288 output. insert (
267289 "context_id" . to_string ( ) ,
268290 serde_json:: Value :: Number ( turn. context_id . into ( ) ) ,
@@ -306,7 +328,10 @@ fn print_turn(turn: &FollowTurn) {
306328 output. insert ( "decode_error" . to_string ( ) , serde_json:: Value :: String ( err) ) ;
307329 }
308330 if let Some ( item) = item {
309- output. insert ( "item" . to_string ( ) , serde_json:: to_value ( item) . unwrap_or ( serde_json:: Value :: Null ) ) ;
331+ output. insert (
332+ "item" . to_string ( ) ,
333+ serde_json:: to_value ( item) . unwrap_or ( serde_json:: Value :: Null ) ,
334+ ) ;
310335 }
311336
312337 let output = serde_json:: Value :: Object ( output) ;
@@ -315,7 +340,10 @@ fn print_turn(turn: &FollowTurn) {
315340
316341fn tee_events (
317342 events : crossbeam_channel:: Receiver < Event > ,
318- ) -> ( crossbeam_channel:: Receiver < Event > , crossbeam_channel:: Receiver < Event > ) {
343+ ) -> (
344+ crossbeam_channel:: Receiver < Event > ,
345+ crossbeam_channel:: Receiver < Event > ,
346+ ) {
319347 let ( out_tx, out_rx) = crossbeam_channel:: bounded ( 128 ) ;
320348 let ( follow_tx, follow_rx) = crossbeam_channel:: bounded ( 128 ) ;
321349 std:: thread:: spawn ( move || {
@@ -356,19 +384,19 @@ fn parse_args() -> Result<Config, String> {
356384 cfg. client_tag = next_value ( & mut args, & arg) ?;
357385 }
358386 "--max-events" => {
359- cfg. max_events = next_value ( & mut args, & arg) ?. parse ( ) . map_err ( |_| {
360- format ! ( "invalid --max-events value" )
361- } ) ?;
387+ cfg. max_events = next_value ( & mut args, & arg) ?
388+ . parse ( )
389+ . map_err ( |_| "invalid --max-events value" . to_string ( ) ) ?;
362390 }
363391 "--max-turns" => {
364- cfg. max_turns = next_value ( & mut args, & arg) ?. parse ( ) . map_err ( |_| {
365- format ! ( "invalid --max-turns value" )
366- } ) ?;
392+ cfg. max_turns = next_value ( & mut args, & arg) ?
393+ . parse ( )
394+ . map_err ( |_| "invalid --max-turns value" . to_string ( ) ) ?;
367395 }
368396 "--max-errors" => {
369- cfg. max_errors = next_value ( & mut args, & arg) ?. parse ( ) . map_err ( |_| {
370- format ! ( "invalid --max-errors value" )
371- } ) ?;
397+ cfg. max_errors = next_value ( & mut args, & arg) ?
398+ . parse ( )
399+ . map_err ( |_| "invalid --max-errors value" . to_string ( ) ) ?;
372400 }
373401 _ => {
374402 return Err ( format ! ( "unknown argument: {}" , arg) ) ;
0 commit comments