22
33import com .netease .nim .lbd .util .AdjustCount ;
44import com .netease .nim .lbd .util .AutoAdjustQueue ;
5+ import com .netease .nim .lbd .util .CloseUtils ;
56import com .netease .nim .lbd .util .NamedThreadFactory ;
67import org .slf4j .Logger ;
78import org .slf4j .LoggerFactory ;
89
10+ import java .sql .ResultSet ;
911import java .sql .SQLException ;
12+ import java .sql .Statement ;
1013import java .util .*;
1114import java .util .concurrent .ConcurrentHashMap ;
1215import java .util .concurrent .Executors ;
@@ -316,8 +319,6 @@ private AtomicInteger getConnectErrorCount(SqlProxy sqlProxy) {
316319 return count ;
317320 }
318321
319-
320-
321322 //定时检查所有的sql-proxy是否可达
322323 private void checkReachable () {
323324 if (healthCheckStatus .compareAndSet (false , true )) {
@@ -331,7 +332,7 @@ private void checkReachable() {
331332 if (!pool .isOnline ()) {
332333 continue ;
333334 }
334- boolean reachable = checkReachable0 (sqlProxy );
335+ boolean reachable = checkReachable0 (sqlProxy , pool );
335336 lock .lock ();
336337 try {
337338 if (reachable && !pool .isReachable ()) {
@@ -354,17 +355,44 @@ private void checkReachable() {
354355 }
355356
356357 //检查sql proxy是否可达
357- private boolean checkReachable0 (SqlProxy sqlProxy ) {
358- RealConnection realConnection = new RealConnection (sqlProxy , LBDriverEnv .getRealDriver (), lbDriverUrl );
358+ private boolean checkReachable0 (SqlProxy sqlProxy , SqlProxyConnectionPool pool ) {
359+ RealConnection connection ;
360+ if (pool == null ) {
361+ connection = new RealConnection (sqlProxy , LBDriverEnv .getRealDriver (), lbDriverUrl );
362+ } else {
363+ connection = pool .checkReadableConnection ;
364+ if (connection == null ) {
365+ connection = new RealConnection (sqlProxy , LBDriverEnv .getRealDriver (), lbDriverUrl );
366+ pool .checkReadableConnection = connection ;
367+ }
368+ }
369+ ResultSet rs = null ;
370+ Statement stmt = null ;
371+ boolean reachable ;
372+ boolean exception = false ;
359373 try {
360- realConnection .syncCreating ();
361- return true ;
374+ connection .syncCreating ();
375+ stmt = connection .getPhysicalConnection ().createStatement ();
376+ rs = stmt .executeQuery (Constants .VALIDATION_QUERY );
377+ reachable = rs .next ();
362378 } catch (Exception e ) {
379+ exception = true ;
380+ reachable = false ;
381+ }
382+ if (!reachable ) {
363383 logger .warn ("sql proxy = {} not reachable" , sqlProxy );
364- return false ;
365- } finally {
366- realConnection .close ();
367384 }
385+ CloseUtils .close (rs );
386+ CloseUtils .close (stmt );
387+ if (pool == null ) {
388+ CloseUtils .close (connection );
389+ } else {
390+ if (exception ) {
391+ CloseUtils .close (connection );
392+ pool .checkReadableConnection = null ;
393+ }
394+ }
395+ return reachable ;
368396 }
369397
370398 //定时检查是否连接数均衡
@@ -474,21 +502,33 @@ private void checkSqlProxyListFromProvider() {
474502 private void removeOfflineSqlProxy () {
475503 try {
476504 Set <SqlProxy > set = new HashSet <>(poolMap .keySet ());
505+ Set <RealConnection > toClosedConnection = new HashSet <>();
506+ Set <SqlProxy > offlinedSqlProxySet = new HashSet <>();
477507 for (SqlProxy sqlProxy : set ) {
478508 lock .lock ();
479509 try {
480510 SqlProxyConnectionPool pool = poolMap .get (sqlProxy );
481511 if (pool != null && !pool .isOnline () && pool .isConnectionZero ()) {
482512 poolMap .remove (sqlProxy );
483513 connectErrorCountMap .remove (sqlProxy );
484- logger .info ("offline sql proxy = {} removed for connection 0" , sqlProxy );
514+ RealConnection removed = pool .checkReadableConnection ;
515+ if (removed != null ) {
516+ toClosedConnection .add (removed );
517+ }
518+ offlinedSqlProxySet .add (sqlProxy );
485519 }
486520 } catch (Exception e ) {
487521 logger .error ("remove offline sql proxy error, sql proxy = {}" , sqlProxy , e );
488522 } finally {
489523 lock .unlock ();
490524 }
491525 }
526+ for (SqlProxy sqlProxy : offlinedSqlProxySet ) {
527+ logger .info ("offline sql proxy = {} removed for connection 0" , sqlProxy );
528+ }
529+ for (RealConnection realConnection : toClosedConnection ) {
530+ CloseUtils .close (realConnection );
531+ }
492532 } catch (Exception e ) {
493533 logger .error ("remove offline sql proxy error" , e );
494534 }
@@ -500,7 +540,7 @@ private void addSqlProxy(List<SqlProxy> list) {
500540 return ;
501541 }
502542 for (SqlProxy sqlProxy : list ) {
503- boolean reachable = checkReachable0 (sqlProxy );
543+ boolean reachable = checkReachable0 (sqlProxy , null );
504544 lock .lock ();
505545 try {
506546 SqlProxyConnectionPool pool = poolMap .get (sqlProxy );
@@ -613,6 +653,9 @@ private class SqlProxyConnectionPool {
613653 private final AtomicLong closeCount = new AtomicLong (0 );
614654 private final AtomicLong reuseCount = new AtomicLong (0 );
615655
656+ //用于检测健康检查的链接
657+ private RealConnection checkReadableConnection ;
658+
616659 //空闲的连接
617660 private final Deque <RealConnection > idleConnections = new ArrayDeque <>();
618661 //使用中的连接
0 commit comments