33import cn .hutool .http .HttpUtil ;
44import io .tapdata .common .CommonDbTest ;
55import io .tapdata .connector .doris .bean .DorisConfig ;
6- import io .tapdata .connector .doris .streamload .DorisStreamLoader ;
76import io .tapdata .kit .EmptyKit ;
8- import io .tapdata .kit .ErrorKit ;
97import io .tapdata .pdk .apis .entity .TestItem ;
10- import io .tapdata .pdk .apis .exception .testItem .TapTestConnectionEx ;
11- import io .tapdata .pdk .apis .exception .testItem .TapTestUnknownEx ;
12- import io .tapdata .pdk .apis .exception .testItem .TapTestVersionEx ;
13- import io .tapdata .pdk .apis .exception .testItem .TapTestWritePrivilegeEx ;
8+ import io .tapdata .pdk .apis .exception .testItem .*;
9+ import io .tapdata .util .NetUtil ;
1410import org .apache .http .client .methods .HttpGet ;
1511import org .apache .http .impl .client .CloseableHttpClient ;
1612import org .apache .http .util .EntityUtils ;
1713
14+ import java .io .IOException ;
1815import java .net .URI ;
1916import java .sql .Connection ;
2017import java .sql .SQLException ;
@@ -71,6 +68,14 @@ protected Boolean testWritePrivilege() {
7168 jdbcContext .normalQuery ("show backends" , resultSet -> {
7269 while (resultSet .next ()) {
7370 beCount .incrementAndGet ();
71+ String beHost = (resultSet .getString ("Host" ));
72+ Integer httpPort = (resultSet .getInt ("HttpPort" ));
73+ if (null == beHost || null == httpPort ) continue ;
74+ try {
75+ NetUtil .validateHostPortWithSocket (beHost , httpPort );
76+ } catch (IOException e ) {
77+ throw new TapTestHostPortEx (e , beHost , String .valueOf (httpPort ));
78+ }
7479 }
7580 });
7681 } catch (SQLSyntaxErrorException e ) {
@@ -82,6 +87,9 @@ protected Boolean testWritePrivilege() {
8287 beCount .set (1 );
8388 }
8489 }
90+ } catch (TapTestHostPortEx e ) {
91+ consumer .accept (new TestItem (TestItem .ITEM_WRITE , TestItem .RESULT_FAILED , "Validate BE nodes failed: " + e .getMessage ()));
92+ return false ;
8593 }
8694
8795 List <String > sqls = new ArrayList <>();
0 commit comments