@@ -30,6 +30,7 @@ import (
3030	"sync" 
3131	"time" 
3232
33+ 	common "github.com/Azure/azure-amqp-common-go/v3" 
3334	"github.com/Azure/azure-amqp-common-go/v3/rpc" 
3435	"github.com/Azure/azure-amqp-common-go/v3/uuid" 
3536	"github.com/Azure/go-amqp" 
@@ -60,65 +61,132 @@ func newRPCClient(ctx context.Context, ec entityConnector, opts ...rpcClientOpti
6061			return  nil , err 
6162		}
6263	}
63- 
64+ 	if  err  :=  r .newClient (ctx ); err  !=  nil  {
65+ 		tab .For (ctx ).Error (err )
66+ 		return  nil , err 
67+ 	}
6468	return  r , nil 
6569}
6670
71+ // newClient will replace the existing client and start auth auto-refresh. 
72+ // any pre-existing client MUST be closed before calling this method. 
73+ // NOTE: this does *not* take the write lock, callers must hold it as required! 
74+ func  (r  * rpcClient ) newClient (ctx  context.Context ) error  {
75+ 	var  err  error 
76+ 	r .client , err  =  r .ec .Namespace ().newClient (ctx )
77+ 	if  err  !=  nil  {
78+ 		return  err 
79+ 	}
80+ 	r .cancelAuthRefresh , err  =  r .ec .Namespace ().negotiateClaim (ctx , r .client , r .ec .ManagementPath ())
81+ 	if  err  !=  nil  {
82+ 		return  err 
83+ 	}
84+ 	return  nil 
85+ }
86+ 
6787// Recover will attempt to close the current session and link, then rebuild them 
6888func  (r  * rpcClient ) Recover (ctx  context.Context ) error  {
6989	ctx , span  :=  r .startSpanFromContext (ctx , "sb.rpcClient.Recover" )
7090	defer  span .End ()
71- 
72- 	_  =  r .Close ()
73- 	return  r .ensureConn (ctx )
91+ 	// atomically close and rebuild the client 
92+ 	r .clientMu .Lock ()
93+ 	defer  r .clientMu .Unlock ()
94+ 	_  =  r .close ()
95+ 	if  err  :=  r .newClient (ctx ); err  !=  nil  {
96+ 		tab .For (ctx ).Error (err )
97+ 		return  err 
98+ 	}
99+ 	return  nil 
74100}
75101
76102// Close will close the AMQP connection 
77103func  (r  * rpcClient ) Close () error  {
78104	r .clientMu .Lock ()
79105	defer  r .clientMu .Unlock ()
106+ 	return  r .close ()
107+ }
108+ 
109+ // closes the AMQP connection.  callers *must* hold the client write lock before calling! 
110+ func  (r  * rpcClient ) close () error  {
80111	if  r .cancelAuthRefresh  !=  nil  {
81112		<- r .cancelAuthRefresh ()
82113	}
83- 
84114	return  r .client .Close ()
85115}
86116
87- func  (r  * rpcClient ) ensureConn (ctx  context.Context ) error  {
88- 	ctx , span  :=  r .startSpanFromContext (ctx , "sb.rpcClient.ensureConn" )
89- 	defer  span .End ()
90- 
91- 	if  r .client  !=  nil  {
92- 		return  nil 
117+ // creates a new link and sends the RPC request, recovering and retrying on certain AMQP errors 
118+ func  (r  * rpcClient ) doRPCWithRetry (ctx  context.Context , address  string , msg  * amqp.Message , times  int , delay  time.Duration , opts  ... rpc.LinkOption ) (* rpc.Response , error ) {
119+ 	// track the number of times we attempt to perform the RPC call. 
120+ 	// this is to avoid a potential infinite loop if the returned error 
121+ 	// is always transient and Recover() doesn't fail. 
122+ 	sendCount  :=  0 
123+ 	for  {
124+ 		r .clientMu .RLock ()
125+ 		client  :=  r .client 
126+ 		r .clientMu .RUnlock ()
127+ 		var  link  * rpc.Link 
128+ 		var  rsp  * rpc.Response 
129+ 		var  err  error 
130+ 		link , err  =  rpc .NewLink (client , address , opts ... )
131+ 		if  err  ==  nil  {
132+ 			rsp , err  =  link .RetryableRPC (ctx , times , delay , msg )
133+ 			if  err  ==  nil  {
134+ 				return  rsp , err 
135+ 			}
136+ 		}
137+ 		if  sendCount  >=  amqpRetryDefaultTimes  ||  ! isAMQPTransientError (ctx , err ) {
138+ 			return  nil , err 
139+ 		}
140+ 		sendCount ++ 
141+ 		// if we get here, recover and try again 
142+ 		tab .For (ctx ).Debug ("recovering RPC connection" )
143+ 		_ , retryErr  :=  common .Retry (amqpRetryDefaultTimes , amqpRetryDefaultDelay , func () (interface {}, error ) {
144+ 			ctx , sp  :=  r .startProducerSpanFromContext (ctx , "sb.rpcClient.doRPCWithRetry.tryRecover" )
145+ 			defer  sp .End ()
146+ 			if  err  :=  r .Recover (ctx ); err  ==  nil  {
147+ 				tab .For (ctx ).Debug ("recovered RPC connection" )
148+ 				return  nil , nil 
149+ 			}
150+ 			select  {
151+ 			case  <- ctx .Done ():
152+ 				return  nil , ctx .Err ()
153+ 			default :
154+ 				return  nil , common .Retryable (err .Error ())
155+ 			}
156+ 		})
157+ 		if  retryErr  !=  nil  {
158+ 			tab .For (ctx ).Debug ("RPC recovering retried, but error was unrecoverable" )
159+ 			return  nil , retryErr 
160+ 		}
93161	}
162+ }
94163
95- 	r .clientMu .Lock ()
96- 	defer  r .clientMu .Unlock ()
97- 
98- 	client , err  :=  r .ec .Namespace ().newClient (ctx )
99- 	r .cancelAuthRefresh , err  =  r .ec .Namespace ().negotiateClaim (ctx , client , r .ec .ManagementPath ())
100- 	if  err  !=  nil  {
101- 		tab .For (ctx ).Error (err )
102- 		_  =  client .Close ()
103- 		return  err 
164+ // returns true if the AMQP error is considered transient 
165+ func  isAMQPTransientError (ctx  context.Context , err  error ) bool  {
166+ 	// always retry on a detach error 
167+ 	var  amqpDetach  * amqp.DetachError 
168+ 	if  errors .As (err , & amqpDetach ) {
169+ 		return  true 
170+ 	}
171+ 	// for an AMQP error, only retry depending on the condition 
172+ 	var  amqpErr  * amqp.Error 
173+ 	if  errors .As (err , & amqpErr ) {
174+ 		switch  amqpErr .Condition  {
175+ 		case  errorServerBusy , errorTimeout , errorOperationCancelled , errorContainerClose :
176+ 			return  true 
177+ 		default :
178+ 			tab .For (ctx ).Debug (fmt .Sprintf ("isAMQPTransientError: condition %s is not transient" , amqpErr .Condition ))
179+ 			return  false 
180+ 		}
104181	}
105- 
106- 	r .client  =  client 
107- 	return  err 
182+ 	tab .For (ctx ).Debug (fmt .Sprintf ("isAMQPTransientError: %T is not transient" , err ))
183+ 	return  false 
108184}
109185
110186func  (r  * rpcClient ) ReceiveDeferred (ctx  context.Context , mode  ReceiveMode , sequenceNumbers  ... int64 ) ([]* Message , error ) {
111187	ctx , span  :=  startConsumerSpanFromContext (ctx , "sb.rpcClient.ReceiveDeferred" )
112188	defer  span .End ()
113189
114- 	if  err  :=  r .ensureConn (ctx ); err  !=  nil  {
115- 		tab .For (ctx ).Error (err )
116- 		return  nil , err 
117- 	}
118- 
119- 	r .clientMu .RLock ()
120- 	defer  r .clientMu .RUnlock ()
121- 
122190	const  messagesField , messageField  =  "messages" , "message" 
123191
124192	backwardsMode  :=  uint32 (0 )
@@ -137,20 +205,14 @@ func (r *rpcClient) ReceiveDeferred(ctx context.Context, mode ReceiveMode, seque
137205		values ["session-id" ] =  r .sessionID 
138206	}
139207
140- 	link , err  :=  rpc .NewLink (r .client , r .ec .ManagementPath (), opts ... )
141- 	if  err  !=  nil  {
142- 		tab .For (ctx ).Error (err )
143- 		return  nil , err 
144- 	}
145- 
146208	msg  :=  & amqp.Message {
147209		ApplicationProperties : map [string ]interface {}{
148210			operationFieldName : "com.microsoft:receive-by-sequence-number" ,
149211		},
150212		Value : values ,
151213	}
152214
153- 	rsp , err  :=  link . RetryableRPC (ctx , 5 , 5 * time .Second , msg )
215+ 	rsp , err  :=  r . doRPCWithRetry (ctx , r . ec . ManagementPath (),  msg ,  5 , 5 * time .Second , opts ... )
154216	if  err  !=  nil  {
155217		tab .For (ctx ).Error (err )
156218		return  nil , err 
@@ -227,14 +289,6 @@ func (r *rpcClient) GetNextPage(ctx context.Context, fromSequenceNumber int64, m
227289	ctx , span  :=  startConsumerSpanFromContext (ctx , "sb.rpcClient.GetNextPage" )
228290	defer  span .End ()
229291
230- 	if  err  :=  r .ensureConn (ctx ); err  !=  nil  {
231- 		tab .For (ctx ).Error (err )
232- 		return  nil , err 
233- 	}
234- 
235- 	r .clientMu .RLock ()
236- 	defer  r .clientMu .RUnlock ()
237- 
238292	const  messagesField , messageField  =  "messages" , "message" 
239293
240294	msg  :=  & amqp.Message {
@@ -251,13 +305,7 @@ func (r *rpcClient) GetNextPage(ctx context.Context, fromSequenceNumber int64, m
251305		msg .ApplicationProperties ["server-timeout" ] =  uint (time .Until (deadline ) /  time .Millisecond )
252306	}
253307
254- 	link , err  :=  rpc .NewLink (r .client , r .ec .ManagementPath ())
255- 	if  err  !=  nil  {
256- 		tab .For (ctx ).Error (err )
257- 		return  nil , err 
258- 	}
259- 
260- 	rsp , err  :=  link .RetryableRPC (ctx , 5 , 5 * time .Second , msg )
308+ 	rsp , err  :=  r .doRPCWithRetry (ctx , r .ec .ManagementPath (), msg , 5 , 5 * time .Second )
261309	if  err  !=  nil  {
262310		tab .For (ctx ).Error (err )
263311		return  nil , err 
@@ -348,14 +396,6 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error
348396	ctx , span  :=  startConsumerSpanFromContext (ctx , "sb.RenewLocks" )
349397	defer  span .End ()
350398
351- 	if  err  :=  r .ensureConn (ctx ); err  !=  nil  {
352- 		tab .For (ctx ).Error (err )
353- 		return  err 
354- 	}
355- 
356- 	r .clientMu .RLock ()
357- 	defer  r .clientMu .RUnlock ()
358- 
359399	lockTokens  :=  make ([]amqp.UUID , 0 , len (messages ))
360400	for  _ , m  :=  range  messages  {
361401		if  m .LockToken  ==  nil  {
@@ -381,13 +421,7 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error
381421		},
382422	}
383423
384- 	rpcLink , err  :=  rpc .NewLink (r .client , r .ec .ManagementPath ())
385- 	if  err  !=  nil  {
386- 		tab .For (ctx ).Error (err )
387- 		return  err 
388- 	}
389- 
390- 	response , err  :=  rpcLink .RetryableRPC (ctx , 3 , 1 * time .Second , renewRequestMsg )
424+ 	response , err  :=  r .doRPCWithRetry (ctx , r .ec .ManagementPath (), renewRequestMsg , 3 , 1 * time .Second )
391425	if  err  !=  nil  {
392426		tab .For (ctx ).Error (err )
393427		return  err 
@@ -406,14 +440,6 @@ func (r *rpcClient) SendDisposition(ctx context.Context, m *Message, state dispo
406440	ctx , span  :=  startConsumerSpanFromContext (ctx , "sb.rpcClient.SendDisposition" )
407441	defer  span .End ()
408442
409- 	if  err  :=  r .ensureConn (ctx ); err  !=  nil  {
410- 		tab .For (ctx ).Error (err )
411- 		return  err 
412- 	}
413- 
414- 	r .clientMu .RLock ()
415- 	defer  r .clientMu .RUnlock ()
416- 
417443	if  m .LockToken  ==  nil  {
418444		err  :=  errors .New ("lock token on the message is not set, thus cannot send disposition" )
419445		tab .For (ctx ).Error (err )
@@ -446,14 +472,8 @@ func (r *rpcClient) SendDisposition(ctx context.Context, m *Message, state dispo
446472		Value : value ,
447473	}
448474
449- 	link , err  :=  rpc .NewLink (r .client , m .ec .ManagementPath (), opts ... )
450- 	if  err  !=  nil  {
451- 		tab .For (ctx ).Error (err )
452- 		return  err 
453- 	}
454- 
455475	// no error, then it was successful 
456- 	_ , err  =   link . RetryableRPC (ctx , 5 , 5 * time .Second , msg )
476+ 	_ , err  :=   r . doRPCWithRetry (ctx , m . ec . ManagementPath (),  msg ,  5 , 5 * time .Second , opts ... )
457477	if  err  !=  nil  {
458478		tab .For (ctx ).Error (err )
459479		return  err 
@@ -468,14 +488,6 @@ func (r *rpcClient) ScheduleAt(ctx context.Context, enqueueTime time.Time, messa
468488	ctx , span  :=  startConsumerSpanFromContext (ctx , "sb.rpcClient.ScheduleAt" )
469489	defer  span .End ()
470490
471- 	if  err  :=  r .ensureConn (ctx ); err  !=  nil  {
472- 		tab .For (ctx ).Error (err )
473- 		return  nil , err 
474- 	}
475- 
476- 	r .clientMu .RLock ()
477- 	defer  r .clientMu .RUnlock ()
478- 
479491	if  len (messages ) <=  0  {
480492		return  nil , errors .New ("expected one or more messages" )
481493	}
@@ -531,13 +543,9 @@ func (r *rpcClient) ScheduleAt(ctx context.Context, enqueueTime time.Time, messa
531543		msg .ApplicationProperties [serverTimeoutFieldName ] =  uint (time .Until (deadline ) /  time .Millisecond )
532544	}
533545
534- 	link , err  :=  rpc .NewLink (r .client , r .ec .ManagementPath ())
535- 	if  err  !=  nil  {
536- 		return  nil , err 
537- 	}
538- 
539- 	resp , err  :=  link .RetryableRPC (ctx , 5 , 5 * time .Second , msg )
546+ 	resp , err  :=  r .doRPCWithRetry (ctx , r .ec .ManagementPath (), msg , 5 , 5 * time .Second )
540547	if  err  !=  nil  {
548+ 		tab .For (ctx ).Error (err )
541549		return  nil , err 
542550	}
543551
@@ -568,14 +576,6 @@ func (r *rpcClient) CancelScheduled(ctx context.Context, seq ...int64) error {
568576	ctx , span  :=  startConsumerSpanFromContext (ctx , "sb.rpcClient.CancelScheduled" )
569577	defer  span .End ()
570578
571- 	if  err  :=  r .ensureConn (ctx ); err  !=  nil  {
572- 		tab .For (ctx ).Error (err )
573- 		return  err 
574- 	}
575- 
576- 	r .clientMu .RLock ()
577- 	defer  r .clientMu .RUnlock ()
578- 
579579	msg  :=  & amqp.Message {
580580		ApplicationProperties : map [string ]interface {}{
581581			operationFieldName : cancelScheduledOperationID ,
@@ -589,13 +589,9 @@ func (r *rpcClient) CancelScheduled(ctx context.Context, seq ...int64) error {
589589		msg .ApplicationProperties [serverTimeoutFieldName ] =  uint (time .Until (deadline ) /  time .Millisecond )
590590	}
591591
592- 	link , err  :=  rpc .NewLink (r .client , r .ec .ManagementPath ())
593- 	if  err  !=  nil  {
594- 		return  err 
595- 	}
596- 
597- 	resp , err  :=  link .RetryableRPC (ctx , 5 , 5 * time .Second , msg )
592+ 	resp , err  :=  r .doRPCWithRetry (ctx , r .ec .ManagementPath (), msg , 5 , 5 * time .Second )
598593	if  err  !=  nil  {
594+ 		tab .For (ctx ).Error (err )
599595		return  err 
600596	}
601597
0 commit comments