20
20
import com .tencent .polaris .api .config .Configuration ;
21
21
import com .tencent .polaris .api .config .global .ClusterType ;
22
22
import com .tencent .polaris .api .config .global .ServerConnectorConfig ;
23
+ import com .tencent .polaris .api .config .verify .DefaultValues ;
23
24
import com .tencent .polaris .api .control .Destroyable ;
24
25
import com .tencent .polaris .api .exception .ErrorCode ;
25
26
import com .tencent .polaris .api .exception .PolarisException ;
50
51
import java .util .concurrent .TimeUnit ;
51
52
import java .util .concurrent .atomic .AtomicBoolean ;
52
53
import java .util .concurrent .atomic .AtomicReference ;
54
+ import java .util .function .Consumer ;
53
55
import org .slf4j .Logger ;
54
56
55
57
/**
@@ -74,6 +76,7 @@ public class ConnectionManager extends Destroyable {
74
76
private final String clientId ;
75
77
private Extensions extensions ;
76
78
private final ChannelTlsCertificates tlsCertificates ;
79
+ private Consumer <ConnID > callbackOnSwitched ;
77
80
78
81
/**
79
82
* 构造器
@@ -159,6 +162,15 @@ public boolean checkReady(ClusterType clusterType) {
159
162
return serverAddressList .ready .get ();
160
163
}
161
164
165
+ public Consumer <ConnID > getCallbackOnSwitched () {
166
+ return callbackOnSwitched ;
167
+ }
168
+
169
+ public void setCallbackOnSwitched (
170
+ Consumer <ConnID > callbackOnSwitched ) {
171
+ this .callbackOnSwitched = callbackOnSwitched ;
172
+ }
173
+
162
174
/**
163
175
* 设置准备状态
164
176
*
@@ -257,9 +269,6 @@ private class SwitchServerTask implements Runnable {
257
269
public void run () {
258
270
for (Map .Entry <ClusterType , ServerAddressList > entry : serverAddresses .entrySet ()) {
259
271
ClusterType clusterType = entry .getKey ();
260
- if (clusterType == ClusterType .BUILTIN_CLUSTER ) {
261
- continue ;
262
- }
263
272
try {
264
273
ServerAddressList serverAddressList = entry .getValue ();
265
274
serverAddressList .switchClient ();
@@ -399,21 +408,35 @@ public void switchClientOnFail(ConnID lastConn) throws PolarisException {
399
408
//已经完成切换,不处理
400
409
return ;
401
410
}
402
- Node servAddress = getServerAddress ();
403
- if (null == servAddress ) {
404
- return ;
405
- }
406
- if (null != curConnection ) {
407
- if (servAddress .getHost ().equals (curConnection .getConnID ().getHost ())
408
- && servAddress .getPort () == curConnection .getConnID ().getPort ()) {
409
- return ;
410
- }
411
- curConnection .lazyClose ();
412
- }
413
- ConnID connID = new ConnID (serverServiceInfo .getServiceKey (), clusterType , servAddress .getHost (),
414
- servAddress .getPort (), protocol );
415
- Connection connection = connectTarget (connID );
416
- curConnectionValue .set (connection );
411
+ doSwitchClient (curConnection );
412
+ }
413
+ }
414
+
415
+ private void doSwitchClient (Connection curConnection ) throws PolarisException {
416
+ Node servAddress = getServerAddress ();
417
+ if (null == servAddress ) {
418
+ return ;
419
+ }
420
+ String preAddress = null ;
421
+ if (null != curConnection ) {
422
+ curConnection .lazyClose ();
423
+ preAddress = String .format (
424
+ "%s:%d" , curConnection .getConnID ().getHost (), curConnection .getConnID ().getPort ());
425
+ }
426
+ String namespace = DefaultValues .DEFAULT_SYSTEM_NAMESPACE ;
427
+ String serviceName = DefaultValues .DEFAULT_BUILTIN_DISCOVER ;
428
+ if (null != serverServiceInfo ) {
429
+ namespace = serverServiceInfo .getServiceKey ().getNamespace ();
430
+ serviceName = serverServiceInfo .getServiceKey ().getService ();
431
+ }
432
+ ConnID connID = new ConnID (new ServiceKey (namespace , serviceName ), clusterType , servAddress .getHost (),
433
+ servAddress .getPort (), protocol );
434
+ Connection connection = connectTarget (connID );
435
+ curConnectionValue .set (connection );
436
+ LOG .info ("server {} connection switched from {} to {}:{}" ,
437
+ serviceName , preAddress , servAddress .getHost (), servAddress .getPort ());
438
+ if (null != callbackOnSwitched ) {
439
+ callbackOnSwitched .accept (connection .getConnID ());
417
440
}
418
441
}
419
442
@@ -423,25 +446,12 @@ public void switchClient() throws PolarisException {
423
446
if (!Connection .isAvailableConnection (curConnection )) {
424
447
return ;
425
448
}
426
- LOG .info ("start switch for {}" , serverServiceInfo .getServiceKey ());
427
449
synchronized (lock ) {
428
450
curConnection = curConnectionValue .get ();
429
451
if (!Connection .isAvailableConnection (curConnection )) {
430
452
return ;
431
453
}
432
- Node servAddress = getServerAddress ();
433
- if (null == servAddress ) {
434
- return ;
435
- }
436
- if (servAddress .getHost ().equals (curConnection .getConnID ().getHost ())
437
- && servAddress .getPort () == curConnection .getConnID ().getPort ()) {
438
- return ;
439
- }
440
- ConnID connID = new ConnID (serverServiceInfo .getServiceKey (), clusterType , servAddress .getHost (),
441
- servAddress .getPort (), protocol );
442
- Connection connection = connectTarget (connID );
443
- curConnection .lazyClose ();
444
- curConnectionValue .set (connection );
454
+ doSwitchClient (curConnection );
445
455
}
446
456
}
447
457
0 commit comments