Skip to content

Commit 1a9cbc2

Browse files
committed
Fix Doris HTTP URL encoding for non-ASCII identifiers
1 parent 0044826 commit 1a9cbc2

12 files changed

Lines changed: 319 additions & 54 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.doris.flink.rest.models.Tablet;
4444
import org.apache.doris.flink.sink.BackendUtil;
4545
import org.apache.doris.flink.sink.HttpGetWithEntity;
46+
import org.apache.doris.flink.util.DorisUrlUtils;
4647
import org.apache.http.HttpHeaders;
4748
import org.apache.http.HttpStatus;
4849
import org.apache.http.client.config.RequestConfig;
@@ -93,11 +94,6 @@ public class RestService implements Serializable {
9394
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
9495
private static final String FE_LOGIN = "/rest/v1/login";
9596
private static final ObjectMapper objectMapper = new ObjectMapper();
96-
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
97-
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
98-
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
99-
private static final String STATEMENT_EXEC_API =
100-
"http://%s/api/query/default_cluster/information_schema";
10197

10298
/**
10399
* send request to Doris FE and get response json string.
@@ -402,30 +398,32 @@ public static Schema getSchema(
402398
throws DorisException {
403399
logger.trace("Finding schema.");
404400
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
405-
String tableSchemaUri;
401+
String tableSchemaUri =
402+
buildTableSchemaUri(randomEndpoint(options.getFenodes(), logger), tableIdentifier);
403+
HttpGet httpGet = new HttpGet(tableSchemaUri);
404+
String response = send(options, readOptions, httpGet, logger);
405+
logger.debug("Find schema response is '{}'.", response);
406+
return parseSchema(response, logger);
407+
}
408+
409+
@VisibleForTesting
410+
static String buildTableSchemaUri(String feNode, String... tableIdentifier)
411+
throws IllegalArgumentException {
406412
if (tableIdentifier.length == 2) {
407-
tableSchemaUri =
408-
String.format(
409-
TABLE_SCHEMA_API,
410-
randomEndpoint(options.getFenodes(), logger),
411-
tableIdentifier[0],
412-
tableIdentifier[1]);
413+
return DorisUrlUtils.buildHttpUrl(
414+
feNode, "api", tableIdentifier[0], tableIdentifier[1], "_schema");
413415
} else if (tableIdentifier.length == 3) {
414-
tableSchemaUri =
415-
String.format(
416-
CATALOG_TABLE_SCHEMA_API,
417-
randomEndpoint(options.getFenodes(), logger),
418-
tableIdentifier[0],
419-
tableIdentifier[1],
420-
tableIdentifier[2]);
416+
return DorisUrlUtils.buildHttpUrl(
417+
feNode,
418+
"api",
419+
tableIdentifier[0],
420+
tableIdentifier[1],
421+
tableIdentifier[2],
422+
"_schema");
421423
} else {
422424
throw new IllegalArgumentException(
423425
"table identifier is illegal, should be db.table or catalog.db.table");
424426
}
425-
HttpGet httpGet = new HttpGet(tableSchemaUri);
426-
String response = send(options, readOptions, httpGet, logger);
427-
logger.debug("Find schema response is '{}'.", response);
428-
return parseSchema(response, logger);
429427
}
430428

431429
@VisibleForTesting
@@ -435,11 +433,12 @@ public static Schema getSchema(
435433
Object responseData = null;
436434
try {
437435
String tableSchemaUri =
438-
String.format(
439-
TABLE_SCHEMA_API,
436+
DorisUrlUtils.buildHttpUrl(
440437
randomEndpoint(dorisOptions.getFenodes(), logger),
438+
"api",
441439
db,
442-
table);
440+
table,
441+
"_schema");
443442
HttpGetWithEntity httpGet = new HttpGetWithEntity(tableSchemaUri);
444443
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions));
445444
JsonNode response = handleResponse(httpGet, logger);
@@ -484,7 +483,12 @@ public static Integer tryGetArrowFlightSqlPort(
484483
Map<String, String> param = new HashMap<>();
485484
param.put("stmt", "show frontends");
486485
String requestUrl =
487-
String.format(STATEMENT_EXEC_API, randomEndpoint(options.getFenodes(), logger));
486+
DorisUrlUtils.buildHttpUrl(
487+
randomEndpoint(options.getFenodes(), logger),
488+
"api",
489+
"query",
490+
"default_cluster",
491+
"information_schema");
488492
HttpPost httpPost = new HttpPost(requestUrl);
489493
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader(options));
490494
httpPost.setHeader(
@@ -621,8 +625,7 @@ public static List<PartitionDefinition> findPartitions(
621625
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
622626

623627
String queryPlanUri =
624-
String.format(
625-
QUERY_PLAN_API,
628+
buildQueryPlanUri(
626629
randomEndpoint(options.getFenodes(), logger),
627630
tableIdentifier[0],
628631
tableIdentifier[1]);
@@ -648,6 +651,11 @@ public static List<PartitionDefinition> findPartitions(
648651
logger);
649652
}
650653

654+
/**
 * Builds the FE query-plan URL with path {@code /api/<database>/<table>/_query_plan}.
 *
 * <p>Assembly is delegated to {@code DorisUrlUtils.buildHttpUrl}, which percent-encodes each
 * path segment, so database and table names may contain non-ASCII characters.
 *
 * @param feNode FE address in {@code host:port} form, used as the URL authority
 * @param database raw (unencoded) database name
 * @param table raw (unencoded) table name
 * @return the query-plan URL as a string
 */
@VisibleForTesting
655+
static String buildQueryPlanUri(String feNode, String database, String table) {
656+
return DorisUrlUtils.buildHttpUrl(feNode, "api", database, table, "_query_plan");
657+
}
658+
651659
/**
652660
* translate Doris FE response string to inner {@link QueryPlan} struct.
653661
*

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.doris.flink.sink.HttpPutBuilder;
3535
import org.apache.doris.flink.sink.HttpUtil;
3636
import org.apache.doris.flink.sink.writer.LabelGenerator;
37+
import org.apache.doris.flink.util.DorisUrlUtils;
3738
import org.apache.http.client.entity.GzipCompressingEntity;
3839
import org.apache.http.client.methods.CloseableHttpResponse;
3940
import org.apache.http.impl.client.CloseableHttpClient;
@@ -90,7 +91,6 @@ public class DorisBatchStreamLoad implements Serializable {
9091
private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
9192
private final LabelGenerator labelGenerator;
9293
private final byte[] lineDelimiter;
93-
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
9494
private String loadUrl;
9595
private String hostPort;
9696
private final String username;
@@ -157,7 +157,7 @@ public DorisBatchStreamLoad(
157157
Preconditions.checkState(
158158
tableInfo.length == 2,
159159
"tableIdentifier input error, the format is database.table");
160-
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
160+
this.loadUrl = buildLoadUrl(hostPort, tableInfo[0], tableInfo[1]);
161161
}
162162
this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
163163
this.loadExecutorService =
@@ -567,10 +567,15 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
567567

568568
private void refreshLoadUrl(String database, String table) {
569569
hostPort = backendUtil.getAvailableBackend(subTaskId);
570-
loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
570+
loadUrl = buildLoadUrl(hostPort, database, table);
571571
}
572572
}
573573

574+
/**
 * Builds the stream-load URL with path {@code /api/<database>/<table>/_stream_load}.
 *
 * <p>Assembly is delegated to {@code DorisUrlUtils.buildHttpUrl}, which percent-encodes each
 * path segment, so database and table names may contain non-ASCII characters.
 *
 * @param hostPort target address in {@code host:port} form, used as the URL authority
 * @param database raw (unencoded) database name
 * @param table raw (unencoded) table name
 * @return the stream-load URL as a string
 */
@VisibleForTesting
575+
static String buildLoadUrl(String hostPort, String database, String table) {
576+
return DorisUrlUtils.buildHttpUrl(hostPort, "api", database, table, "_stream_load");
577+
}
578+
574579
static class DefaultThreadFactory implements ThreadFactory {
575580
private static final AtomicInteger poolNumber = new AtomicInteger(1);
576581
private final AtomicInteger threadNumber = new AtomicInteger(1);

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.doris.flink.sink.HttpPutBuilder;
3434
import org.apache.doris.flink.sink.HttpUtil;
3535
import org.apache.doris.flink.sink.ResponseUtil;
36+
import org.apache.doris.flink.util.DorisUrlUtils;
3637
import org.apache.http.StatusLine;
3738
import org.apache.http.client.methods.CloseableHttpResponse;
3839
import org.apache.http.client.methods.HttpPut;
@@ -52,7 +53,6 @@
5253
/** The committer to commit transaction. */
5354
public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
5455
private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
55-
private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
5656
private final CloseableHttpClient httpClient;
5757
private final DorisOptions dorisOptions;
5858
private final DorisReadOptions dorisReadOptions;
@@ -116,7 +116,9 @@ private void commitTransaction(DorisCommittable committable) throws IOException
116116
int retry = 0;
117117
while (retry <= maxRetry) {
118118
// get latest-url
119-
String url = String.format(commitPattern, hostPort, committable.getDb());
119+
String url =
120+
DorisUrlUtils.buildHttpUrl(
121+
hostPort, "api", committable.getDb(), "_stream_load_2pc");
120122
HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
121123

122124
// http execute...

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.doris.flink.sink.schema;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
2021
import org.apache.flink.util.CollectionUtil;
2122
import org.apache.flink.util.StringUtils;
2223

@@ -35,6 +36,7 @@
3536
import org.apache.doris.flink.rest.models.Field;
3637
import org.apache.doris.flink.rest.models.Schema;
3738
import org.apache.doris.flink.sink.HttpGetWithEntity;
39+
import org.apache.doris.flink.util.DorisUrlUtils;
3840
import org.apache.http.HttpHeaders;
3941
import org.apache.http.client.methods.CloseableHttpResponse;
4042
import org.apache.http.client.methods.HttpPost;
@@ -56,9 +58,6 @@
5658
public class SchemaChangeManager implements Serializable {
5759
private static final long serialVersionUID = 1L;
5860
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
59-
private static final String CHECK_SCHEMA_CHANGE_API =
60-
"http://%s/api/enable_light_schema_change/%s/%s";
61-
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
6261
private ObjectMapper objectMapper = new ObjectMapper();
6362
private DorisOptions dorisOptions;
6463
private String charsetEncoding = "UTF-8";
@@ -209,12 +208,7 @@ public boolean checkSchemaChange(String database, String table, Map<String, Obje
209208
if (CollectionUtil.isNullOrEmpty(params)) {
210209
return false;
211210
}
212-
String requestUrl =
213-
String.format(
214-
CHECK_SCHEMA_CHANGE_API,
215-
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
216-
database,
217-
table);
211+
String requestUrl = buildCheckSchemaChangeUrl(database, table);
218212
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
219213
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
220214
httpGet.setEntity(
@@ -277,11 +271,7 @@ public HttpPost buildHttpPost(String ddl, String database)
277271
throws IllegalArgumentException, IOException {
278272
Map<String, String> param = new HashMap<>();
279273
param.put("stmt", ddl);
280-
String requestUrl =
281-
String.format(
282-
SCHEMA_CHANGE_API,
283-
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
284-
database);
274+
String requestUrl = buildSchemaChangeUrl(database);
285275
HttpPost httpPost = new HttpPost(requestUrl);
286276
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
287277
httpPost.setHeader(
@@ -292,6 +282,27 @@ public HttpPost buildHttpPost(String ddl, String database)
292282
return httpPost;
293283
}
294284

285+
/**
 * Builds the URL with path {@code /api/enable_light_schema_change/<database>/<table>}.
 *
 * <p>The authority is the endpoint returned by {@code RestService.randomEndpoint} for
 * {@code dorisOptions.getFenodes()}; the path segments are percent-encoded by
 * {@code DorisUrlUtils.buildHttpUrl}.
 *
 * @param database raw (unencoded) database name
 * @param table raw (unencoded) table name
 * @return the request URL as a string
 * @throws IllegalArgumentException declared here; presumably raised by
 *     {@code RestService.randomEndpoint} for an invalid FE node list (verify against that method)
 */
@VisibleForTesting
286+
String buildCheckSchemaChangeUrl(String database, String table)
287+
throws IllegalArgumentException {
288+
return DorisUrlUtils.buildHttpUrl(
289+
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
290+
"api",
291+
"enable_light_schema_change",
292+
database,
293+
table);
294+
}
295+
296+
/**
 * Builds the statement-execution URL with path {@code /api/query/default_cluster/<database>}.
 *
 * <p>The authority is the endpoint returned by {@code RestService.randomEndpoint} for
 * {@code dorisOptions.getFenodes()}; the path segments are percent-encoded by
 * {@code DorisUrlUtils.buildHttpUrl}.
 *
 * @param database raw (unencoded) database name
 * @return the request URL as a string
 * @throws IllegalArgumentException declared here; presumably raised by
 *     {@code RestService.randomEndpoint} for an invalid FE node list (verify against that method)
 */
@VisibleForTesting
297+
String buildSchemaChangeUrl(String database) throws IllegalArgumentException {
298+
return DorisUrlUtils.buildHttpUrl(
299+
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
300+
"api",
301+
"query",
302+
"default_cluster",
303+
database);
304+
}
305+
295306
private String handleResponse(HttpUriRequest request) {
296307
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
297308
CloseableHttpResponse response = httpclient.execute(request);

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.doris.flink.sink.HttpPutBuilder;
3636
import org.apache.doris.flink.sink.LoadStatus;
3737
import org.apache.doris.flink.sink.ResponseUtil;
38+
import org.apache.doris.flink.util.DorisUrlUtils;
3839
import org.apache.http.client.entity.GzipCompressingEntity;
3940
import org.apache.http.client.methods.CloseableHttpResponse;
4041
import org.apache.http.entity.InputStreamEntity;
@@ -77,8 +78,6 @@ public class DorisStreamLoad implements Serializable {
7778
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
7879
private final LabelGenerator labelGenerator;
7980
private final byte[] lineDelimiter;
80-
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
81-
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
8281
public static final String JOB_EXIST_FINISHED = "FINISHED";
8382

8483
private String loadUrlStr;
@@ -114,8 +113,8 @@ public DorisStreamLoad(
114113
this.user = dorisOptions.getUsername();
115114
this.passwd = dorisOptions.getPassword();
116115
this.labelGenerator = labelGenerator;
117-
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
118-
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
116+
this.loadUrlStr = buildLoadUrl(hostPort);
117+
this.abortUrlStr = buildAbortUrl(hostPort);
119118
this.enable2PC = executionOptions.enabled2PC();
120119
this.streamLoadProp = executionOptions.getStreamLoadProp();
121120
this.enableDelete = executionOptions.getDeletable();
@@ -175,8 +174,16 @@ public byte[] getLineDelimiter() {
175174

176175
public void setHostPort(String hostPort) {
177176
this.hostPort = hostPort;
178-
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table);
179-
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
177+
this.loadUrlStr = buildLoadUrl(hostPort);
178+
this.abortUrlStr = buildAbortUrl(hostPort);
179+
}
180+
181+
/**
 * Builds the stream-load URL with path {@code /api/<db>/<table>/_stream_load} for the given
 * host, using this instance's {@code db} and {@code table} fields. Path segments are
 * percent-encoded by {@code DorisUrlUtils.buildHttpUrl}.
 *
 * @param hostPort target address in {@code host:port} form, used as the URL authority
 * @return the stream-load URL as a string
 */
private String buildLoadUrl(String hostPort) {
182+
return DorisUrlUtils.buildHttpUrl(hostPort, "api", db, table, "_stream_load");
183+
}
184+
185+
/**
 * Builds the two-phase-commit URL with path {@code /api/<db>/_stream_load_2pc} for the given
 * host, using this instance's {@code db} field; the result is stored as the abort URL by the
 * callers. Path segments are percent-encoded by {@code DorisUrlUtils.buildHttpUrl}.
 *
 * @param hostPort target address in {@code host:port} form, used as the URL authority
 * @return the 2PC URL as a string
 */
private String buildAbortUrl(String hostPort) {
186+
return DorisUrlUtils.buildHttpUrl(hostPort, "api", db, "_stream_load_2pc");
180187
}
181188

182189
public Future<RespContent> getPendingLoadFuture() {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.flink.util;
19+
20+
import java.io.UnsupportedEncodingException;
21+
import java.net.URLEncoder;
22+
import java.nio.charset.StandardCharsets;
23+
24+
/**
 * Helpers for building Doris HTTP URLs whose path segments may contain characters that are not
 * URL-safe, such as non-ASCII database or table names.
 */
public final class DorisUrlUtils {
    private DorisUrlUtils() {}

    /**
     * Builds an {@code http} URL from an authority and a list of raw path segments.
     *
     * <p>Each segment is percent-encoded individually, so a {@code /} inside a segment is escaped
     * instead of being treated as a separator.
     *
     * @param hostPort authority part ({@code host:port}); appended verbatim, neither encoded nor
     *     validated
     * @param pathSegments raw, unencoded path segments, in order
     * @return the assembled URL; only the scheme and authority when no segments are given
     * @throws NullPointerException if {@code pathSegments} or any of its elements is null
     */
    public static String buildHttpUrl(String hostPort, String... pathSegments) {
        if (pathSegments == null) {
            throw new NullPointerException("URL path segments must not be null");
        }
        StringBuilder builder = new StringBuilder("http://").append(hostPort);
        for (String pathSegment : pathSegments) {
            builder.append('/').append(encodePathSegment(pathSegment));
        }
        return builder.toString();
    }

    /**
     * Percent-encodes a single URL path segment as UTF-8.
     *
     * @param segment raw, unencoded segment
     * @return the encoded segment, with spaces written as {@code %20}
     * @throws NullPointerException if {@code segment} is null
     * @throws IllegalStateException if the runtime does not support UTF-8
     */
    public static String encodePathSegment(String segment) {
        // Fail fast with a meaningful message instead of an anonymous NPE from inside URLEncoder.
        if (segment == null) {
            throw new NullPointerException("URL path segment must not be null");
        }
        try {
            // URLEncoder produces application/x-www-form-urlencoded output, where a space becomes
            // '+'. In a URL path '+' is a literal plus sign, so spaces must be written as %20.
            // A literal '+' in the input has already been escaped to %2B at this point, so every
            // remaining '+' is an encoded space.
            return URLEncoder.encode(segment, StandardCharsets.UTF_8.name()).replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is not supported.", e);
        }
    }
}

flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,20 @@ public void testParseIdentifierIllegal() throws Exception {
128128
RestService.parseIdentifier(invalidIdentifier3, logger);
129129
}
130130

131+
/**
 * Verifies that a table name containing non-ASCII (Chinese) characters is percent-encoded as
 * UTF-8 in the schema URL for both db.table and catalog.db.table identifiers, and in the
 * query-plan URL.
 */
@Test
132+
public void testBuildIdentifierPathUrls() throws Exception {
133+
Assert.assertEquals(
134+
"http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema",
135+
RestService.buildTableSchemaUri("127.0.0.1:8030", "ods", "ods_新券表_copy1"));
136+
Assert.assertEquals(
137+
"http://127.0.0.1:8030/api/internal/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema",
138+
RestService.buildTableSchemaUri(
139+
"127.0.0.1:8030", "internal", "ods", "ods_新券表_copy1"));
140+
Assert.assertEquals(
141+
"http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_query_plan",
142+
RestService.buildQueryPlanUri("127.0.0.1:8030", "ods", "ods_新券表_copy1"));
143+
}
144+
131145
@Test
132146
public void testChoiceFe() throws Exception {
133147
String validFes = "1,2,3";

flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ public void testInit() {
9090
options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
9191
}
9292

93+
/**
 * Verifies that the batch stream-load URL percent-encodes a table name containing non-ASCII
 * (Chinese) characters as UTF-8.
 */
@Test
94+
public void testBuildEncodedLoadUrl() {
95+
Assert.assertEquals(
96+
"http://127.0.0.1:8040/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_stream_load",
97+
DorisBatchStreamLoad.buildLoadUrl("127.0.0.1:8040", "ods", "ods_新券表_copy1"));
98+
}
99+
93100
@Test
94101
public void testLoadFail() throws Exception {
95102
LOG.info("testLoadFail start");

0 commit comments

Comments
 (0)