11package com .alibaba .otter .canal .client .adapter .es8x .support ;
22
3- import com .alibaba .otter .canal .client .adapter .es .core .support .ESBulkRequest ;
3+ import java .io .IOException ;
4+ import java .io .InputStream ;
5+ import java .net .URI ;
6+ import java .net .URISyntaxException ;
7+ import java .net .UnknownHostException ;
8+ import java .nio .file .Files ;
9+ import java .nio .file .Path ;
10+ import java .nio .file .Paths ;
11+ import java .security .KeyStore ;
12+ import java .security .cert .Certificate ;
13+ import java .security .cert .CertificateFactory ;
14+ import java .util .Arrays ;
15+ import java .util .Map ;
16+
17+ import javax .net .ssl .SSLContext ;
18+
419import org .apache .commons .lang .StringUtils ;
520import org .apache .http .HttpHost ;
621import org .apache .http .auth .AuthScope ;
3247import org .slf4j .Logger ;
3348import org .slf4j .LoggerFactory ;
3449
35- import javax .net .ssl .SSLContext ;
36- import java .io .IOException ;
37- import java .io .InputStream ;
38- import java .net .URI ;
39- import java .net .URISyntaxException ;
40- import java .net .UnknownHostException ;
41- import java .nio .file .Files ;
42- import java .nio .file .Path ;
43- import java .nio .file .Paths ;
44- import java .security .KeyStore ;
45- import java .security .cert .Certificate ;
46- import java .security .cert .CertificateFactory ;
47- import java .util .Arrays ;
48- import java .util .Map ;
50+ import com .alibaba .otter .canal .client .adapter .es .core .support .ESBulkRequest ;
4951
5052/**
5153 * ES 连接器, 只支持 Rest 方式
@@ -59,14 +61,15 @@ public class ESConnection {
5961
6062 private RestHighLevelClient restHighLevelClient ;
6163
62- public ESConnection (String [] hosts , Map <String , String > properties ) throws UnknownHostException {
64+ public ESConnection (String [] hosts , Map <String , String > properties ) throws UnknownHostException {
6365 String caPath = properties .get ("security.ca.path" );
6466 if (StringUtils .isNotEmpty (caPath )) {
6567 connectEsWithCa (hosts , properties , caPath );
6668 } else {
6769 connectEsWithoutCa (hosts , properties );
6870 }
6971 }
72+
7073 private void connectEsWithCa (String [] hosts , Map <String , String > properties , String caPath ) {
7174 Path caCertificatePath = Paths .get (caPath );
7275 try (InputStream is = Files .newInputStream (caCertificatePath )) {
@@ -75,8 +78,7 @@ private void connectEsWithCa(String[] hosts, Map<String, String> properties, Str
7578 KeyStore trustStore = KeyStore .getInstance ("pkcs12" );
7679 trustStore .load (null , null );
7780 trustStore .setCertificateEntry ("ca" , trustedCa );
78- SSLContextBuilder sslContextBuilder = SSLContexts .custom ()
79- .loadTrustMaterial (trustStore , null );
81+ SSLContextBuilder sslContextBuilder = SSLContexts .custom ().loadTrustMaterial (trustStore , null );
8082 final SSLContext sslContext = sslContextBuilder .build ();
8183
8284 HttpHost [] httpHosts = Arrays .stream (hosts ).map (this ::createHttpHost ).toArray (HttpHost []::new );
@@ -86,13 +88,15 @@ private void connectEsWithCa(String[] hosts, Map<String, String> properties, Str
8688 String [] nameAndPwdArr = nameAndPwd .split (":" );
8789 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider ();
8890 credentialsProvider .setCredentials (AuthScope .ANY ,
89- new UsernamePasswordCredentials (nameAndPwdArr [0 ], nameAndPwdArr [1 ]));
91+ new UsernamePasswordCredentials (nameAndPwdArr [0 ], nameAndPwdArr [1 ]));
9092 restClientBuilder .setHttpClientConfigCallback (httpClientBuilder -> {
9193 httpClientBuilder .setDefaultCredentialsProvider (credentialsProvider );
9294 return httpClientBuilder .setSSLContext (sslContext );
9395 });
9496 }
95- restHighLevelClient = new RestHighLevelClientBuilder (restClientBuilder .build ()).setApiCompatibilityMode (true ).build ();
97+ restHighLevelClient = new RestHighLevelClientBuilder (restClientBuilder .build ())
98+ .setApiCompatibilityMode (true )
99+ .build ();
96100 } catch (Exception e ) {
97101 throw new RuntimeException (e );
98102 }
@@ -106,12 +110,12 @@ private void connectEsWithoutCa(String[] hosts, Map<String, String> properties)
106110 String [] nameAndPwdArr = nameAndPwd .split (":" );
107111 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider ();
108112 credentialsProvider .setCredentials (AuthScope .ANY ,
109- new UsernamePasswordCredentials (nameAndPwdArr [0 ], nameAndPwdArr [1 ]));
113+ new UsernamePasswordCredentials (nameAndPwdArr [0 ], nameAndPwdArr [1 ]));
110114 restClientBuilder .setHttpClientConfigCallback (
111- httpClientBuilder -> httpClientBuilder .setDefaultCredentialsProvider (credentialsProvider ));
115+ httpClientBuilder -> httpClientBuilder .setDefaultCredentialsProvider (credentialsProvider ));
112116 }
113117 restHighLevelClient = new RestHighLevelClientBuilder (restClientBuilder .build ()).setApiCompatibilityMode (true )
114- .build ();
118+ .build ();
115119 }
116120
117121 public void close () {
@@ -147,9 +151,9 @@ public class ES8xIndexRequest implements ESBulkRequest.ESIndexRequest {
147151
148152 private IndexRequestBuilder indexRequestBuilder ;
149153
150- private IndexRequest indexRequest ;
154+ private IndexRequest indexRequest ;
151155
152- public ES8xIndexRequest (String index , String id ) {
156+ public ES8xIndexRequest (String index , String id ){
153157 indexRequest = new IndexRequest (index );
154158 indexRequest .id (id );
155159
@@ -190,9 +194,9 @@ public class ES8xUpdateRequest implements ESBulkRequest.ESUpdateRequest {
190194
191195 private UpdateRequestBuilder updateRequestBuilder ;
192196
193- private UpdateRequest updateRequest ;
197+ private UpdateRequest updateRequest ;
194198
195- public ES8xUpdateRequest (String index , String id ) {
199+ public ES8xUpdateRequest (String index , String id ){
196200
197201 updateRequest = new UpdateRequest (index , id );
198202 }
@@ -239,9 +243,9 @@ public class ES8xDeleteRequest implements ESBulkRequest.ESDeleteRequest {
239243
240244 private DeleteRequestBuilder deleteRequestBuilder ;
241245
242- private DeleteRequest deleteRequest ;
246+ private DeleteRequest deleteRequest ;
243247
244- public ES8xDeleteRequest (String index , String id ) {
248+ public ES8xDeleteRequest (String index , String id ){
245249
246250 deleteRequest = new DeleteRequest (index , id );
247251
@@ -268,11 +272,11 @@ public class ESSearchRequest {
268272
269273 private SearchRequestBuilder searchRequestBuilder ;
270274
271- private SearchRequest searchRequest ;
275+ private SearchRequest searchRequest ;
272276
273- private SearchSourceBuilder sourceBuilder ;
277+ private SearchSourceBuilder sourceBuilder ;
274278
275- public ESSearchRequest (String index ) {
279+ public ESSearchRequest (String index ){
276280
277281 searchRequest = new SearchRequest (index );
278282 sourceBuilder = new SearchSourceBuilder ();
@@ -325,12 +329,10 @@ public class ES8xBulkRequest implements ESBulkRequest {
325329
326330 private BulkRequestBuilder bulkRequestBuilder ;
327331
328- private BulkRequest bulkRequest ;
329-
330- public ES8xBulkRequest () {
332+ private BulkRequest bulkRequest ;
331333
334+ public ES8xBulkRequest (){
332335 bulkRequest = new BulkRequest ();
333-
334336 }
335337
336338 public void resetBulk () {
@@ -398,7 +400,7 @@ public static class ES8xBulkResponse implements ESBulkRequest.ESBulkResponse {
398400
399401 private BulkResponse bulkResponse ;
400402
401- public ES8xBulkResponse (BulkResponse bulkResponse ) {
403+ public ES8xBulkResponse (BulkResponse bulkResponse ){
402404 this .bulkResponse = bulkResponse ;
403405 }
404406
@@ -438,7 +440,7 @@ private HttpHost createHttpHost(String uriStr) {
438440 }
439441 try {
440442 return HttpHost .create (new URI (uri
441- .getScheme (), null , uri .getHost (), uri .getPort (), uri .getPath (), uri .getQuery (), uri .getFragment ())
443+ .getScheme (), null , uri .getHost (), uri .getPort (), uri .getPath (), uri .getQuery (), uri .getFragment ())
442444 .toString ());
443445 } catch (URISyntaxException ex ) {
444446 throw new IllegalStateException (ex );
0 commit comments