@@ -24,6 +24,7 @@ package servicebus
2424
2525import  (
2626	"context" 
27+ 	"errors" 
2728	"sync" 
2829	"time" 
2930
@@ -234,31 +235,10 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
234235
235236			switch  err .(type ) {
236237			case  * amqp.Error , * amqp.DetachError :
237- 				tab .For (ctx ).Debug ("recovering connection" )
238- 				_ , retryErr  :=  common .Retry (10 , 10 * time .Second , func () (interface {}, error ) {
239- 					ctx , sp  :=  s .startProducerSpanFromContext (ctx , "sb.Sender.trySend.tryRecover" )
240- 					defer  sp .End ()
241- 
242- 					err  :=  s .Recover (ctx )
243- 					if  err  ==  nil  {
244- 						tab .For (ctx ).Debug ("recovered connection" )
245- 						return  nil , nil 
246- 					}
247- 
248- 					select  {
249- 					case  <- ctx .Done ():
250- 						return  nil , ctx .Err ()
251- 					default :
252- 						return  nil , common .Retryable (err .Error ())
253- 					}
254- 				})
255- 
256- 				if  retryErr  !=  nil  {
257- 					tab .For (ctx ).Debug ("sender recovering retried, but error was unrecoverable" )
258- 					if  err  :=  s .Close (ctx ); err  !=  nil  {
259- 						tab .For (ctx ).Error (err )
260- 					}
261- 					return  retryErr 
238+ 				err  =  s .handleAMQPError (ctx , err )
239+ 				if  err  !=  nil  {
240+ 					tab .For (ctx ).Error (err )
241+ 					return  err 
262242				}
263243			default :
264244				tab .For (ctx ).Error (err )
@@ -268,6 +248,53 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
268248	}
269249}
270250
251+ // handleAMQPError is called internally when an event has failed to send so we 
252+ // can parse the error to determine whether we should attempt to retry sending the event again. 
253+ func  (s  * Sender ) handleAMQPError (ctx  context.Context , err  error ) error  {
254+ 	var  amqpError  * amqp.Error 
255+ 	if  errors .As (err , & amqpError ) {
256+ 		switch  amqpError .Condition  {
257+ 		case  errorServerBusy :
258+ 			return  s .retryRetryableAmqpError (ctx , amqpRetryDefaultTimes , amqpRetryBusyServerDelay )
259+ 		case  errorTimeout :
260+ 			return  s .retryRetryableAmqpError (ctx , amqpRetryDefaultTimes , amqpRetryDefaultDelay )
261+ 		case  errorOperationCancelled :
262+ 			return  s .retryRetryableAmqpError (ctx , amqpRetryDefaultTimes , amqpRetryDefaultDelay )
263+ 		case  errorContainerClose :
264+ 			return  s .retryRetryableAmqpError (ctx , amqpRetryDefaultTimes , amqpRetryDefaultDelay )
265+ 		default :
266+ 			return  err 
267+ 		}
268+ 	}
269+ 	return  s .retryRetryableAmqpError (ctx , amqpRetryDefaultTimes , amqpRetryDefaultDelay )
270+ }
271+ 
272+ func  (s  * Sender ) retryRetryableAmqpError (ctx  context.Context , times  int , delay  time.Duration ) error  {
273+ 	tab .For (ctx ).Debug ("recovering sender connection" )
274+ 	_ , retryErr  :=  common .Retry (times , delay , func () (interface {}, error ) {
275+ 		ctx , sp  :=  s .startProducerSpanFromContext (ctx , "sb.Sender.trySend.tryRecover" )
276+ 		defer  sp .End ()
277+ 
278+ 		err  :=  s .Recover (ctx )
279+ 		if  err  ==  nil  {
280+ 			tab .For (ctx ).Debug ("recovered connection" )
281+ 			return  nil , nil 
282+ 		}
283+ 
284+ 		select  {
285+ 		case  <- ctx .Done ():
286+ 			return  nil , ctx .Err ()
287+ 		default :
288+ 			return  nil , common .Retryable (err .Error ())
289+ 		}
290+ 	})
291+ 	if  retryErr  !=  nil  {
292+ 		tab .For (ctx ).Debug ("sender recovering retried, but error was unrecoverable" )
293+ 		return  retryErr 
294+ 	}
295+ 	return  nil 
296+ }
297+ 
271298func  (s  * Sender ) connClosedError (ctx  context.Context ) error  {
272299	name  :=  "Sender" 
273300	if  s .Name  !=  ""  {
0 commit comments