@@ -202,7 +202,7 @@ func (p *Processor[T]) Partition(ctx context.Context, resources []T) map[string]
202202
203203// Dispatch sends all the requests to the target.
204204func (p * Processor [T ]) Dispatch (ctx context.Context , tenantMap map [string ][]T ) error {
205- waitGroup := sync.WaitGroup {}
205+ wg := sync.WaitGroup {}
206206
207207 for tenant , resources := range tenantMap {
208208 ctx , span := p .tracer .Start (
@@ -215,10 +215,10 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
215215 )
216216 defer span .End ()
217217
218- waitGroup .Add (1 )
218+ wg .Add (1 )
219219
220220 go func (tenant string , resources []T ) {
221- defer waitGroup .Done ()
221+ defer wg .Done ()
222222
223223 tenantAttribute := attribute .String ("signal.tenant" , tenant )
224224
@@ -245,16 +245,18 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
245245 return
246246 }
247247
248+ signalResponseStatusCodeAttr := attribute .String (
249+ "signal.response.status.code" ,
250+ strconv .Itoa (resp .StatusCode ),
251+ )
252+
248253 p .proxyRecordsMetric .Add (
249254 ctx ,
250255 int64 (len (resources )),
251256 metric .WithAttributes (
252257 p .signalTypeAttr (),
253258 tenantAttribute ,
254- attribute .String (
255- "signal.response.status.code" ,
256- strconv .Itoa (resp .StatusCode ),
257- ),
259+ signalResponseStatusCodeAttr ,
258260 ),
259261 )
260262
@@ -264,37 +266,37 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
264266 metric .WithAttributes (
265267 p .signalTypeAttr (),
266268 tenantAttribute ,
267- attribute .String (
268- "signal.response.status.code" ,
269- strconv .Itoa (resp .StatusCode ),
270- ),
269+ signalResponseStatusCodeAttr ,
271270 ),
272271 )
273272
274273 logger .Debug (
275274 ctx ,
276275 p .logger ,
277276 fmt .Sprintf (
278- "sent %d records status %d for tenant %s " ,
277+ "sent %d records" ,
279278 len (resources ),
280- resp .StatusCode ,
281- tenant ,
282279 ),
283280 p .signalTypeLogAttr (),
281+ log .KeyValueFromAttribute (tenantAttribute ),
282+ log .KeyValueFromAttribute (signalResponseStatusCodeAttr ),
284283 )
285284
286285 logger .Trace (
287286 ctx ,
288287 p .logger ,
289288 fmt .Sprintf ("%+v" , resources ),
290289 p .signalTypeLogAttr (),
290+ log .KeyValueFromAttribute (tenantAttribute ),
291+ log .KeyValueFromAttribute (signalResponseStatusCodeAttr ),
291292 )
292293
293294 span .SetStatus (codes .Ok , "sent successfully" )
294295 }(tenant , resources )
295296 }
296297
297- waitGroup .Wait ()
298+ wg .Wait ()
299+
298300 return nil
299301}
300302
@@ -306,11 +308,13 @@ func (p *Processor[T]) send(
306308) (http.Response , error ) {
307309 start := time .Now ()
308310
311+ signalTenantAttr := attribute .String ("signal.tenant" , tenant )
312+
309313 ctx , span := p .tracer .Start (ctx ,
310314 fmt .Sprintf ("%s.send" , p .signalType ),
311315 trace .WithAttributes (
312316 p .signalTypeAttr (),
313- attribute . String ( "signal.tenant" , tenant ) ,
317+ signalTenantAttr ,
314318 attribute .Int ("signal.tenant.records" , len (resources )),
315319 ),
316320 )
@@ -319,6 +323,15 @@ func (p *Processor[T]) send(
319323 // Marshal resources to bytes
320324 body , err := p .marshalResources (resources )
321325 if err != nil {
326+ logger .Error (
327+ ctx ,
328+ p .logger ,
329+ err .Error (),
330+ p .signalTypeLogAttr (),
331+ log .KeyValueFromAttribute (signalTenantAttr ),
332+ )
333+ span .RecordError (err )
334+ span .SetStatus (codes .Error , "failed to marshal data" )
322335 return http.Response {}, fmt .Errorf ("failed to marshal data: %w" , err )
323336 }
324337
@@ -329,6 +342,15 @@ func (p *Processor[T]) send(
329342 io .NopCloser (bytes .NewReader (body )),
330343 )
331344 if err != nil {
345+ logger .Error (
346+ ctx ,
347+ p .logger ,
348+ err .Error (),
349+ p .signalTypeLogAttr (),
350+ log .KeyValueFromAttribute (signalTenantAttr ),
351+ )
352+ span .RecordError (err )
353+ span .SetStatus (codes .Error , "failed to create request" )
332354 return http.Response {}, fmt .Errorf ("failed to create request: %w" , err )
333355 }
334356
@@ -341,6 +363,13 @@ func (p *Processor[T]) send(
341363
342364 resp , err := p .client .Do (req )
343365 if err != nil {
366+ logger .Error (
367+ ctx ,
368+ p .logger ,
369+ err .Error (),
370+ p .signalTypeLogAttr (),
371+ log .KeyValueFromAttribute (signalTenantAttr ),
372+ )
344373 span .RecordError (err )
345374 span .SetStatus (codes .Error , "failed to send" )
346375 return http.Response {}, fmt .Errorf ("failed to send request: %w" , err )
@@ -354,26 +383,27 @@ func (p *Processor[T]) send(
354383 p .logger ,
355384 fmt .Sprintf ("failed to close response body: %v" , closeErr ),
356385 p .signalTypeLogAttr (),
386+ log .KeyValueFromAttribute (signalTenantAttr ),
357387 )
358388 span .RecordError (closeErr )
359389 span .SetStatus (codes .Error , "failed to close response body" )
360390 }
361391 }()
362392
363- respAttr := attribute .String (
393+ signalResponseStatusCodeAttr := attribute .String (
364394 "signal.response.status.code" ,
365395 strconv .Itoa (resp .StatusCode ),
366396 )
367397
368- span .SetAttributes (respAttr )
398+ span .SetAttributes (signalResponseStatusCodeAttr )
369399 span .SetStatus (codes .Ok , "sent successfully" )
370400
371401 p .proxyLatencyMetric .Record (ctx ,
372402 time .Since (start ).Milliseconds (),
373403 metric .WithAttributes (
374- respAttr ,
404+ signalResponseStatusCodeAttr ,
375405 p .signalTypeAttr (),
376- attribute . String ( "signal.tenant" , tenant ) ,
406+ signalTenantAttr ,
377407 ),
378408 )
379409
0 commit comments