@@ -115,7 +115,10 @@ struct ExtProcMock {
115115#[ tonic:: async_trait]
116116impl ExternalProcessor for ExtProcMock {
117117 type ProcessStream = ReceiverStream < Result < ProcessingResponse , Status > > ;
118- async fn process ( & self , request : Request < tonic:: Streaming < ProcessingRequest > > ) -> Result < Response < Self :: ProcessStream > , Status > {
118+ async fn process (
119+ & self ,
120+ request : Request < tonic:: Streaming < ProcessingRequest > > ,
121+ ) -> Result < Response < Self :: ProcessStream > , Status > {
119122 let mut inbound = request. into_inner ( ) ;
120123 let ( tx, rx) = mpsc:: channel :: < Result < ProcessingResponse , Status > > ( 32 ) ;
121124 let epp_upstream = self . epp_upstream . clone ( ) ;
@@ -129,54 +132,123 @@ impl ExternalProcessor for ExtProcMock {
129132 match msg {
130133 Ok ( pr) => match pr. request {
131134 Some ( processing_request:: Request :: RequestHeaders ( _) ) => {
132- if role == "EPP" {
133- eprintln ! ( "extproc_mock: EPP headers received, selecting endpoint: {}" , epp_upstream) ;
134- let resp = ProcessingResponse { response : Some ( processing_response:: Response :: RequestHeaders ( build_headers_response ( & epp_upstream, & bbr_model) ) ) , dynamic_metadata : None , mode_override : None , override_message_timeout : None } ; if tx. send ( Ok ( resp) ) . await . is_err ( ) { break ; } sent_headers_response = true ;
135+ if role == "EPP" {
136+ eprintln ! (
137+ "extproc_mock: EPP headers received, selecting endpoint: {}" ,
138+ epp_upstream
139+ ) ;
140+ let resp = ProcessingResponse {
141+ response : Some ( processing_response:: Response :: RequestHeaders (
142+ build_headers_response ( & epp_upstream, & bbr_model) ,
143+ ) ) ,
144+ dynamic_metadata : None ,
145+ mode_override : None ,
146+ override_message_timeout : None ,
147+ } ;
148+ if tx. send ( Ok ( resp) ) . await . is_err ( ) {
149+ break ;
150+ }
151+ sent_headers_response = true ;
135152 } else {
136- eprintln ! ( "extproc_mock: BBR headers received, waiting for body..." ) ;
153+ eprintln ! (
154+ "extproc_mock: BBR headers received, waiting for body..."
155+ ) ;
137156 }
138157 }
139158 Some ( processing_request:: Request :: RequestBody ( body) ) => {
140159 body_buf. extend_from_slice ( & body. body ) ;
141- if body. end_of_stream {
142- eprintln ! ( "extproc_mock: end of stream, body size: {} bytes" , body_buf. len( ) ) ;
143- if let Ok ( v) = serde_json:: from_slice :: < Value > ( & body_buf) {
144- if let Some ( m) = v. get ( "model" ) . and_then ( |x| x. as_str ( ) ) {
160+ if body. end_of_stream {
161+ eprintln ! (
162+ "extproc_mock: end of stream, body size: {} bytes" ,
163+ body_buf. len( )
164+ ) ;
165+ if let Ok ( v) = serde_json:: from_slice :: < Value > ( & body_buf) {
166+ if let Some ( m) = v. get ( "model" ) . and_then ( |x| x. as_str ( ) ) {
145167 current_bbr_model = m. to_string ( ) ;
146- eprintln ! ( "extproc_mock: detected model in JSON body: {}" , current_bbr_model) ;
147- }
148- }
149- let resp = ProcessingResponse { response : Some ( processing_response:: Response :: RequestBody ( build_body_response ( & epp_upstream, & current_bbr_model) ) ) , dynamic_metadata : None , mode_override : None , override_message_timeout : None } ;
168+ eprintln ! (
169+ "extproc_mock: detected model in JSON body: {}" ,
170+ current_bbr_model
171+ ) ;
172+ }
173+ }
174+ let resp = ProcessingResponse {
175+ response : Some ( processing_response:: Response :: RequestBody (
176+ build_body_response ( & epp_upstream, & current_bbr_model) ,
177+ ) ) ,
178+ dynamic_metadata : None ,
179+ mode_override : None ,
180+ override_message_timeout : None ,
181+ } ;
150182 if role == "BBR" {
151- eprintln ! ( "extproc_mock: BBR final response - model: {}" , current_bbr_model) ;
183+ eprintln ! (
184+ "extproc_mock: BBR final response - model: {}" ,
185+ current_bbr_model
186+ ) ;
187+ }
188+ if tx. send ( Ok ( resp) ) . await . is_err ( ) {
189+ break ;
152190 }
153- if tx. send ( Ok ( resp) ) . await . is_err ( ) { break ; }
154191 } else {
155192 eprintln ! ( "extproc_mock: received body chunk, size: {} bytes, total: {} bytes" , body. body. len( ) , body_buf. len( ) ) ;
156193 }
157194 }
158195 _ => { }
159196 } ,
160- Err ( status) => { let _ = tx. send ( Err ( status) ) . await ; break ; }
197+ Err ( status) => {
198+ let _ = tx. send ( Err ( status) ) . await ;
199+ break ;
200+ }
161201 }
162202 }
163- if !sent_headers_response && role == "EPP" { let resp = ProcessingResponse { response : Some ( processing_response:: Response :: RequestHeaders ( build_headers_response ( & epp_upstream, & bbr_model) ) ) , dynamic_metadata : None , mode_override : None , override_message_timeout : None } ; let _ = tx. send ( Ok ( resp) ) . await ; }
203+ if !sent_headers_response && role == "EPP" {
204+ let resp = ProcessingResponse {
205+ response : Some ( processing_response:: Response :: RequestHeaders (
206+ build_headers_response ( & epp_upstream, & bbr_model) ,
207+ ) ) ,
208+ dynamic_metadata : None ,
209+ mode_override : None ,
210+ override_message_timeout : None ,
211+ } ;
212+ let _ = tx. send ( Ok ( resp) ) . await ;
213+ }
164214 } ) ;
165215 Ok ( Response :: new ( ReceiverStream :: new ( rx) ) )
166216 }
167217}
168218
169219#[ tokio:: main( flavor = "multi_thread" ) ]
170220async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
171- let addr: SocketAddr = std:: env:: args ( ) . nth ( 1 ) . unwrap_or_else ( || "0.0.0.0:9001" . to_string ( ) ) . parse ( ) ?;
172- let epp_upstream = env:: var ( "EPP_UPSTREAM" ) . unwrap_or_else ( |_| "host.docker.internal:18080" . to_string ( ) ) ;
221+ let addr: SocketAddr = std:: env:: args ( )
222+ . nth ( 1 )
223+ . unwrap_or_else ( || "0.0.0.0:9001" . to_string ( ) )
224+ . parse ( ) ?;
225+ let epp_upstream =
226+ env:: var ( "EPP_UPSTREAM" ) . unwrap_or_else ( |_| "host.docker.internal:18080" . to_string ( ) ) ;
173227 let bbr_model = env:: var ( "BBR_MODEL" ) . unwrap_or_else ( |_| "bbr-chosen-model" . to_string ( ) ) ;
174- let default_role = if addr. port ( ) == 9001 { "EPP" } else if addr. port ( ) == 9000 { "BBR" } else { "EPP" } ;
228+ let default_role = if addr. port ( ) == 9001 {
229+ "EPP"
230+ } else if addr. port ( ) == 9000 {
231+ "BBR"
232+ } else {
233+ "EPP"
234+ } ;
175235 let role = env:: var ( "MOCK_ROLE" ) . unwrap_or_else ( |_| default_role. to_string ( ) ) ;
176236
177- println ! ( "extproc_mock: role={}, configured EPP_UPSTREAM={}, BBR_MODEL={}" , role, epp_upstream, bbr_model) ;
237+ println ! (
238+ "extproc_mock: role={}, configured EPP_UPSTREAM={}, BBR_MODEL={}" ,
239+ role, epp_upstream, bbr_model
240+ ) ;
178241
179- let svc = ExtProcMock { epp_upstream, bbr_model, role } ;
242+ let svc = ExtProcMock {
243+ epp_upstream,
244+ bbr_model,
245+ role,
246+ } ;
180247
181248 println ! ( "extproc_mock listening on {}" , addr) ;
182- tonic:: transport:: Server :: builder ( ) . add_service ( ExternalProcessorServer :: new ( svc) ) . serve ( addr) . await ?; Ok ( ( ) ) }
249+ tonic:: transport:: Server :: builder ( )
250+ . add_service ( ExternalProcessorServer :: new ( svc) )
251+ . serve ( addr)
252+ . await ?;
253+ Ok ( ( ) )
254+ }
0 commit comments