diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java index f9aad4ffd..853f93c33 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -43,6 +43,7 @@ import org.apache.doris.flink.rest.models.Tablet; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.HttpGetWithEntity; +import org.apache.doris.flink.util.DorisUrlUtils; import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; @@ -93,11 +94,6 @@ public class RestService implements Serializable { private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; private static final String FE_LOGIN = "/rest/v1/login"; private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; - private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema"; - private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan"; - private static final String STATEMENT_EXEC_API = - "http://%s/api/query/default_cluster/information_schema"; /** * send request to Doris FE and get response json string. @@ -402,30 +398,32 @@ public static Schema getSchema( throws DorisException { logger.trace("Finding schema."); String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger); - String tableSchemaUri; + String tableSchemaUri = + buildTableSchemaUri(randomEndpoint(options.getFenodes(), logger), tableIdentifier); + HttpGet httpGet = new HttpGet(tableSchemaUri); + String response = send(options, readOptions, httpGet, logger); + logger.debug("Find schema response is '{}'.", response); + return parseSchema(response, logger); + } + + @VisibleForTesting + static String buildTableSchemaUri(String feNode, String... tableIdentifier) + throws IllegalArgumentException { if (tableIdentifier.length == 2) { - tableSchemaUri = - String.format( - TABLE_SCHEMA_API, - randomEndpoint(options.getFenodes(), logger), - tableIdentifier[0], - tableIdentifier[1]); + return DorisUrlUtils.buildHttpUrl( + feNode, "api", tableIdentifier[0], tableIdentifier[1], "_schema"); } else if (tableIdentifier.length == 3) { - tableSchemaUri = - String.format( - CATALOG_TABLE_SCHEMA_API, - randomEndpoint(options.getFenodes(), logger), - tableIdentifier[0], - tableIdentifier[1], - tableIdentifier[2]); + return DorisUrlUtils.buildHttpUrl( + feNode, + "api", + tableIdentifier[0], + tableIdentifier[1], + tableIdentifier[2], + "_schema"); } else { throw new IllegalArgumentException( "table identifier is illegal, should be db.table or catalog.db.table"); } - HttpGet httpGet = new HttpGet(tableSchemaUri); - String response = send(options, readOptions, httpGet, logger); - logger.debug("Find schema response is '{}'.", response); - return parseSchema(response, logger); } @VisibleForTesting @@ -435,11 +433,12 @@ public static Schema getSchema( Object responseData = null; try { String tableSchemaUri = - String.format( - TABLE_SCHEMA_API, + DorisUrlUtils.buildHttpUrl( randomEndpoint(dorisOptions.getFenodes(), logger), + "api", db, - table); + table, + "_schema"); HttpGetWithEntity httpGet = new HttpGetWithEntity(tableSchemaUri); httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions)); JsonNode response = handleResponse(httpGet, logger); @@ -484,7 +483,12 @@ public static Integer tryGetArrowFlightSqlPort( Map param = new HashMap<>(); param.put("stmt", "show frontends"); String requestUrl = - String.format(STATEMENT_EXEC_API, randomEndpoint(options.getFenodes(), logger)); + DorisUrlUtils.buildHttpUrl( + randomEndpoint(options.getFenodes(), logger), + "api", + "query", + "default_cluster", + "information_schema"); HttpPost httpPost = new HttpPost(requestUrl); httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader(options)); httpPost.setHeader( @@ -621,8 +625,7 @@ public static List findPartitions( String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger); String queryPlanUri = - String.format( - QUERY_PLAN_API, + buildQueryPlanUri( randomEndpoint(options.getFenodes(), logger), tableIdentifier[0], tableIdentifier[1]); @@ -648,6 +651,11 @@ public static List findPartitions( logger); } + @VisibleForTesting + static String buildQueryPlanUri(String feNode, String database, String table) { + return DorisUrlUtils.buildHttpUrl(feNode, "api", database, table, "_query_plan"); + } + /** * translate Doris FE response string to inner {@link QueryPlan} struct. * diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index fc21bb979..d0048bbd7 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -34,6 +34,7 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; +import org.apache.doris.flink.util.DorisUrlUtils; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; @@ -90,7 +91,6 @@ public class DorisBatchStreamLoad implements Serializable { private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE; private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; - private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; private String loadUrl; private String hostPort; private final String username; @@ -157,7 +157,7 @@ public DorisBatchStreamLoad( Preconditions.checkState( tableInfo.length == 2, "tableIdentifier input error, the format is database.table"); - this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]); + this.loadUrl = buildLoadUrl(hostPort, tableInfo[0], tableInfo[1]); } this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize()); this.loadExecutorService = @@ -567,10 +567,15 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { private void refreshLoadUrl(String database, String table) { hostPort = backendUtil.getAvailableBackend(subTaskId); - loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table); + loadUrl = buildLoadUrl(hostPort, database, table); } } + @VisibleForTesting + static String buildLoadUrl(String hostPort, String database, String table) { + return DorisUrlUtils.buildHttpUrl(hostPort, "api", database, table, "_stream_load"); + } + static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final AtomicInteger threadNumber = new AtomicInteger(1); diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 2593e3032..146e83ef5 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.ResponseUtil; +import org.apache.doris.flink.util.DorisUrlUtils; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; @@ -52,7 +53,6 @@ /** The committer to commit transaction. */ public class DorisCommitter implements Committer, Closeable { private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class); - private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc"; private final CloseableHttpClient httpClient; private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; @@ -116,7 +116,9 @@ private void commitTransaction(DorisCommittable committable) throws IOException int retry = 0; while (retry <= maxRetry) { // get latest-url - String url = String.format(commitPattern, hostPort, committable.getDb()); + String url = + DorisUrlUtils.buildHttpUrl( + hostPort, "api", committable.getDb(), "_stream_load_2pc"); HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build(); // http execute... diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 50ec1d34a..eb8ca50e7 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.schema; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.StringUtils; @@ -35,6 +36,7 @@ import org.apache.doris.flink.rest.models.Field; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.sink.HttpGetWithEntity; +import org.apache.doris.flink.util.DorisUrlUtils; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -56,9 +58,6 @@ public class SchemaChangeManager implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); - private static final String CHECK_SCHEMA_CHANGE_API = - "http://%s/api/enable_light_schema_change/%s/%s"; - private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private ObjectMapper objectMapper = new ObjectMapper(); private DorisOptions dorisOptions; private String charsetEncoding = "UTF-8"; @@ -209,12 +208,7 @@ public boolean checkSchemaChange(String database, String table, Map param = new HashMap<>(); param.put("stmt", ddl); - String requestUrl = - String.format( - SCHEMA_CHANGE_API, - RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), - database); + String requestUrl = buildSchemaChangeUrl(database); HttpPost httpPost = new HttpPost(requestUrl); httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpPost.setHeader( @@ -292,6 +282,27 @@ public HttpPost buildHttpPost(String ddl, String database) return httpPost; } + @VisibleForTesting + String buildCheckSchemaChangeUrl(String database, String table) + throws IllegalArgumentException { + return DorisUrlUtils.buildHttpUrl( + RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), + "api", + "enable_light_schema_change", + database, + table); + } + + @VisibleForTesting + String buildSchemaChangeUrl(String database) throws IllegalArgumentException { + return DorisUrlUtils.buildHttpUrl( + RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), + "api", + "query", + "default_cluster", + database); + } + private String handleResponse(HttpUriRequest request) { try (CloseableHttpClient httpclient = HttpClients.createDefault()) { CloseableHttpResponse response = httpclient.execute(request); diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 18a979598..e25d3fa27 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -35,6 +35,7 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.LoadStatus; import org.apache.doris.flink.sink.ResponseUtil; +import org.apache.doris.flink.util.DorisUrlUtils; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.InputStreamEntity; @@ -77,8 +78,6 @@ public class DorisStreamLoad implements Serializable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; - private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; - private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; public static final String JOB_EXIST_FINISHED = "FINISHED"; private String loadUrlStr; @@ -114,8 +113,8 @@ public DorisStreamLoad( this.user = dorisOptions.getUsername(); this.passwd = dorisOptions.getPassword(); this.labelGenerator = labelGenerator; - this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table); - this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); + this.loadUrlStr = buildLoadUrl(hostPort); + this.abortUrlStr = buildAbortUrl(hostPort); this.enable2PC = executionOptions.enabled2PC(); this.streamLoadProp = executionOptions.getStreamLoadProp(); this.enableDelete = executionOptions.getDeletable(); @@ -175,8 +174,16 @@ public byte[] getLineDelimiter() { public void setHostPort(String hostPort) { this.hostPort = hostPort; - this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table); - this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); + this.loadUrlStr = buildLoadUrl(hostPort); + this.abortUrlStr = buildAbortUrl(hostPort); + } + + private String buildLoadUrl(String hostPort) { + return DorisUrlUtils.buildHttpUrl(hostPort, "api", db, table, "_stream_load"); + } + + private String buildAbortUrl(String hostPort) { + return DorisUrlUtils.buildHttpUrl(hostPort, "api", db, "_stream_load_2pc"); } public Future getPendingLoadFuture() { diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/util/DorisUrlUtils.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/util/DorisUrlUtils.java new file mode 100644 index 000000000..297eedb00 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/util/DorisUrlUtils.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.util; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; + +public class DorisUrlUtils { + private DorisUrlUtils() {} + + public static String buildHttpUrl(String hostPort, String... pathSegments) { + StringBuilder builder = new StringBuilder("http://").append(hostPort); + for (String pathSegment : pathSegments) { + builder.append('/').append(encodePathSegment(pathSegment)); + } + return builder.toString(); + } + + public static String encodePathSegment(String segment) { + try { + return URLEncoder.encode(segment, StandardCharsets.UTF_8.name()).replace("+", "%20"); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException("UTF-8 is not supported.", e); + } + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java index 6fe176cc2..3e996dc87 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java @@ -128,6 +128,20 @@ public void testParseIdentifierIllegal() throws Exception { RestService.parseIdentifier(invalidIdentifier3, logger); } + @Test + public void testBuildIdentifierPathUrls() throws Exception { + Assert.assertEquals( + "http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema", + RestService.buildTableSchemaUri("127.0.0.1:8030", "ods", "ods_新券表_copy1")); + Assert.assertEquals( + "http://127.0.0.1:8030/api/internal/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema", + RestService.buildTableSchemaUri( + "127.0.0.1:8030", "internal", "ods", "ods_新券表_copy1")); + Assert.assertEquals( + "http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_query_plan", + RestService.buildQueryPlanUri("127.0.0.1:8030", "ods", "ods_新券表_copy1")); + } + @Test public void testChoiceFe() throws Exception { String validFes = "1,2,3"; diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java index f45e69aca..61a5dd6f3 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java @@ -90,6 +90,13 @@ public void testInit() { options, readOptions, executionOptions, new LabelGenerator("xx", false), 0); } + @Test + public void testBuildEncodedLoadUrl() { + Assert.assertEquals( + "http://127.0.0.1:8040/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_stream_load", + DorisBatchStreamLoad.buildLoadUrl("127.0.0.1:8040", "ods", "ods_新券表_copy1")); + } + @Test public void testLoadFail() throws Exception { LOG.info("testLoadFail start"); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index e14a04606..9da26bc7f 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -30,13 +30,16 @@ import org.apache.http.ProtocolVersion; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicStatusLine; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.util.Collections; @@ -44,6 +47,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Test for Doris Committer. */ @@ -101,6 +105,31 @@ public void testCommitted() throws Exception { dorisCommitter.commit(Collections.singletonList(request)); } + @Test + public void testBuildEncodedCommitUrl() throws Exception { + DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); + DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions(); + DorisExecutionOptions executionOptions = OptionUtils.buildExecutionOptional(); + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + CloseableHttpResponse response = mock(CloseableHttpResponse.class); + HttpEntityMock successEntity = new HttpEntityMock(); + successEntity.setValue("{\"status\":\"Success\",\"msg\":\"ok\"}"); + when(response.getStatusLine()).thenReturn(normalLine); + when(response.getEntity()).thenReturn(successEntity); + when(httpClient.execute(any())).thenReturn(response); + + DorisCommitter committer = + new DorisCommitter(dorisOptions, readOptions, executionOptions, httpClient); + DorisCommittable unicodeCommittable = new DorisCommittable("127.0.0.1:8040", "数仓", 1); + committer.commit(Collections.singletonList(new MockCommitRequest<>(unicodeCommittable))); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpPut.class); + verify(httpClient).execute(requestCaptor.capture()); + Assert.assertEquals( + "http://127.0.0.1:8040/api/%E6%95%B0%E4%BB%93/_stream_load_2pc", + requestCaptor.getValue().getURI().toASCIIString()); + } + @Test public void testCommitAbort() throws Exception { when(httpResponse.getStatusLine()).thenReturn(normalLine); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeManagerTest.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeManagerTest.java new file mode 100644 index 000000000..ff58d9744 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeManagerTest.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.BackendUtil; +import org.apache.http.client.methods.HttpPost; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.MockedStatic; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; + +public class SchemaChangeManagerTest { + @Test + public void testBuildEncodedSchemaChangeUrls() throws Exception { + DorisOptions dorisOptions = + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("ods.ods_新券表_copy1") + .setUsername("root") + .setPassword("") + .build(); + try (MockedStatic backendUtilMockedStatic = mockStatic(BackendUtil.class)) { + backendUtilMockedStatic + .when(() -> BackendUtil.tryHttpConnection(any())) + .thenReturn(true); + SchemaChangeManager manager = new SchemaChangeManager(dorisOptions); + + Assert.assertEquals( + "http://127.0.0.1:8030/api/enable_light_schema_change/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1", + manager.buildCheckSchemaChangeUrl("ods", "ods_新券表_copy1")); + Assert.assertEquals( + "http://127.0.0.1:8030/api/query/default_cluster/%E6%95%B0%E4%BB%93", + manager.buildSchemaChangeUrl("数仓")); + + HttpPost httpPost = manager.buildHttpPost("select 1", "数仓"); + Assert.assertEquals( + "http://127.0.0.1:8030/api/query/default_cluster/%E6%95%B0%E4%BB%93", + httpPost.getURI().toASCIIString()); + } + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index cda8674ad..968f928a5 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -25,12 +25,14 @@ import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -42,6 +44,8 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** test for DorisStreamLoad. */ @@ -82,6 +86,42 @@ public void testAbortPreCommit() throws Exception { dorisStreamLoad.abortPreCommit("test001", 1); } + @Test + public void testBuildEncodedLoadAndAbortUrl() throws Exception { + DorisOptions unicodeOptions = + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("ods.ods_新券表_copy1") + .setUsername("root") + .setPassword("") + .build(); + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + CloseableHttpResponse preCommitResponse = + HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true); + CloseableHttpResponse abortSuccessResponse = + HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true); + when(httpClient.execute(any())).thenReturn(preCommitResponse, abortSuccessResponse); + + DorisStreamLoad dorisStreamLoad = + new DorisStreamLoad( + "127.0.0.1:8040", + unicodeOptions, + executionOptions, + new LabelGenerator("test001", true, "ods.ods_新券表_copy1", 0), + httpClient); + dorisStreamLoad.abortPreCommit("test001", 1); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(HttpUriRequest.class); + verify(httpClient, times(2)).execute(requestCaptor.capture()); + Assert.assertEquals( + "http://127.0.0.1:8040/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_stream_load", + requestCaptor.getAllValues().get(0).getURI().toASCIIString()); + Assert.assertEquals( + "http://127.0.0.1:8040/api/ods/_stream_load_2pc", + requestCaptor.getAllValues().get(1).getURI().toASCIIString()); + } + @Test public void testAbortTransaction() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/util/DorisUrlUtilsTest.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/util/DorisUrlUtilsTest.java new file mode 100644 index 000000000..bc0bbc16e --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/util/DorisUrlUtilsTest.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.util; + +import org.junit.Assert; +import org.junit.Test; + +public class DorisUrlUtilsTest { + @Test + public void testEncodePathSegment() { + Assert.assertEquals("table_1", DorisUrlUtils.encodePathSegment("table_1")); + Assert.assertEquals( + "ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1", + DorisUrlUtils.encodePathSegment("ods_新券表_copy1")); + Assert.assertEquals("a%20b", DorisUrlUtils.encodePathSegment("a b")); + Assert.assertEquals("a%25b", DorisUrlUtils.encodePathSegment("a%b")); + } + + @Test + public void testBuildHttpUrl() { + Assert.assertEquals( + "http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema", + DorisUrlUtils.buildHttpUrl( + "127.0.0.1:8030", "api", "ods", "ods_新券表_copy1", "_schema")); + } +}