Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.doris.flink.rest.models.Tablet;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.util.DorisUrlUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
Expand Down Expand Up @@ -93,11 +94,6 @@ public class RestService implements Serializable {
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String FE_LOGIN = "/rest/v1/login";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
private static final String STATEMENT_EXEC_API =
"http://%s/api/query/default_cluster/information_schema";

/**
* send request to Doris FE and get response json string.
Expand Down Expand Up @@ -402,30 +398,32 @@ public static Schema getSchema(
throws DorisException {
logger.trace("Finding schema.");
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
String tableSchemaUri;
String tableSchemaUri =
buildTableSchemaUri(randomEndpoint(options.getFenodes(), logger), tableIdentifier);
HttpGet httpGet = new HttpGet(tableSchemaUri);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
}

@VisibleForTesting
static String buildTableSchemaUri(String feNode, String... tableIdentifier)
throws IllegalArgumentException {
if (tableIdentifier.length == 2) {
tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
return DorisUrlUtils.buildHttpUrl(
feNode, "api", tableIdentifier[0], tableIdentifier[1], "_schema");
} else if (tableIdentifier.length == 3) {
tableSchemaUri =
String.format(
CATALOG_TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1],
tableIdentifier[2]);
return DorisUrlUtils.buildHttpUrl(
feNode,
"api",
tableIdentifier[0],
tableIdentifier[1],
tableIdentifier[2],
"_schema");
} else {
throw new IllegalArgumentException(
"table identifier is illegal, should be db.table or catalog.db.table");
}
HttpGet httpGet = new HttpGet(tableSchemaUri);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
}

@VisibleForTesting
Expand All @@ -435,11 +433,12 @@ public static Schema getSchema(
Object responseData = null;
try {
String tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
DorisUrlUtils.buildHttpUrl(
randomEndpoint(dorisOptions.getFenodes(), logger),
"api",
db,
table);
table,
"_schema");
HttpGetWithEntity httpGet = new HttpGetWithEntity(tableSchemaUri);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions));
JsonNode response = handleResponse(httpGet, logger);
Expand Down Expand Up @@ -484,7 +483,12 @@ public static Integer tryGetArrowFlightSqlPort(
Map<String, String> param = new HashMap<>();
param.put("stmt", "show frontends");
String requestUrl =
String.format(STATEMENT_EXEC_API, randomEndpoint(options.getFenodes(), logger));
DorisUrlUtils.buildHttpUrl(
randomEndpoint(options.getFenodes(), logger),
"api",
"query",
"default_cluster",
"information_schema");
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader(options));
httpPost.setHeader(
Expand Down Expand Up @@ -621,8 +625,7 @@ public static List<PartitionDefinition> findPartitions(
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);

String queryPlanUri =
String.format(
QUERY_PLAN_API,
buildQueryPlanUri(
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
Expand All @@ -648,6 +651,11 @@ public static List<PartitionDefinition> findPartitions(
logger);
}

@VisibleForTesting
static String buildQueryPlanUri(String feNode, String database, String table) {
return DorisUrlUtils.buildHttpUrl(feNode, "api", database, table, "_query_plan");
}

/**
* translate Doris FE response string to inner {@link QueryPlan} struct.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.doris.flink.util.DorisUrlUtils;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -90,7 +91,6 @@ public class DorisBatchStreamLoad implements Serializable {
private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private String loadUrl;
private String hostPort;
private final String username;
Expand Down Expand Up @@ -157,7 +157,7 @@ public DorisBatchStreamLoad(
Preconditions.checkState(
tableInfo.length == 2,
"tableIdentifier input error, the format is database.table");
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
this.loadUrl = buildLoadUrl(hostPort, tableInfo[0], tableInfo[1]);
}
this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
this.loadExecutorService =
Expand Down Expand Up @@ -567,10 +567,15 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {

private void refreshLoadUrl(String database, String table) {
hostPort = backendUtil.getAvailableBackend(subTaskId);
loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
loadUrl = buildLoadUrl(hostPort, database, table);
}
}

@VisibleForTesting
static String buildLoadUrl(String hostPort, String database, String table) {
return DorisUrlUtils.buildHttpUrl(hostPort, "api", database, table, "_stream_load");
}

static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.doris.flink.util.DorisUrlUtils;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
Expand All @@ -52,7 +53,6 @@
/** The committer to commit transaction. */
public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
private final CloseableHttpClient httpClient;
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
Expand Down Expand Up @@ -116,7 +116,9 @@ private void commitTransaction(DorisCommittable committable) throws IOException
int retry = 0;
while (retry <= maxRetry) {
// get latest-url
String url = String.format(commitPattern, hostPort, committable.getDb());
String url =
DorisUrlUtils.buildHttpUrl(
hostPort, "api", committable.getDb(), "_stream_load_2pc");
HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();

// http execute...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.schema;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;

Expand All @@ -35,6 +36,7 @@
import org.apache.doris.flink.rest.models.Field;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.util.DorisUrlUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
Expand All @@ -56,9 +58,6 @@
public class SchemaChangeManager implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
private static final String CHECK_SCHEMA_CHANGE_API =
"http://%s/api/enable_light_schema_change/%s/%s";
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
private ObjectMapper objectMapper = new ObjectMapper();
private DorisOptions dorisOptions;
private String charsetEncoding = "UTF-8";
Expand Down Expand Up @@ -209,12 +208,7 @@ public boolean checkSchemaChange(String database, String table, Map<String, Obje
if (CollectionUtil.isNullOrEmpty(params)) {
return false;
}
String requestUrl =
String.format(
CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
database,
table);
String requestUrl = buildCheckSchemaChangeUrl(database, table);
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(
Expand Down Expand Up @@ -277,11 +271,7 @@ public HttpPost buildHttpPost(String ddl, String database)
throws IllegalArgumentException, IOException {
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl =
String.format(
SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
database);
String requestUrl = buildSchemaChangeUrl(database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(
Expand All @@ -292,6 +282,27 @@ public HttpPost buildHttpPost(String ddl, String database)
return httpPost;
}

@VisibleForTesting
String buildCheckSchemaChangeUrl(String database, String table)
throws IllegalArgumentException {
return DorisUrlUtils.buildHttpUrl(
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
"api",
"enable_light_schema_change",
database,
table);
}

@VisibleForTesting
String buildSchemaChangeUrl(String database) throws IllegalArgumentException {
return DorisUrlUtils.buildHttpUrl(
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG),
"api",
"query",
"default_cluster",
database);
}

private String handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.doris.flink.util.DorisUrlUtils;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
Expand Down Expand Up @@ -77,8 +78,6 @@ public class DorisStreamLoad implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
public static final String JOB_EXIST_FINISHED = "FINISHED";

private String loadUrlStr;
Expand Down Expand Up @@ -114,8 +113,8 @@ public DorisStreamLoad(
this.user = dorisOptions.getUsername();
this.passwd = dorisOptions.getPassword();
this.labelGenerator = labelGenerator;
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
this.loadUrlStr = buildLoadUrl(hostPort);
this.abortUrlStr = buildAbortUrl(hostPort);
this.enable2PC = executionOptions.enabled2PC();
this.streamLoadProp = executionOptions.getStreamLoadProp();
this.enableDelete = executionOptions.getDeletable();
Expand Down Expand Up @@ -175,8 +174,16 @@ public byte[] getLineDelimiter() {

public void setHostPort(String hostPort) {
this.hostPort = hostPort;
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table);
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
this.loadUrlStr = buildLoadUrl(hostPort);
this.abortUrlStr = buildAbortUrl(hostPort);
}

private String buildLoadUrl(String hostPort) {
return DorisUrlUtils.buildHttpUrl(hostPort, "api", db, table, "_stream_load");
}

private String buildAbortUrl(String hostPort) {
return DorisUrlUtils.buildHttpUrl(hostPort, "api", db, "_stream_load_2pc");
}

public Future<RespContent> getPendingLoadFuture() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.util;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class DorisUrlUtils {
private DorisUrlUtils() {}

public static String buildHttpUrl(String hostPort, String... pathSegments) {
StringBuilder builder = new StringBuilder("http://").append(hostPort);
for (String pathSegment : pathSegments) {
builder.append('/').append(encodePathSegment(pathSegment));
}
return builder.toString();
}

public static String encodePathSegment(String segment) {
try {
return URLEncoder.encode(segment, StandardCharsets.UTF_8.name()).replace("+", "%20");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("UTF-8 is not supported.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ public void testParseIdentifierIllegal() throws Exception {
RestService.parseIdentifier(invalidIdentifier3, logger);
}

@Test
public void testBuildIdentifierPathUrls() throws Exception {
Assert.assertEquals(
"http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema",
RestService.buildTableSchemaUri("127.0.0.1:8030", "ods", "ods_新券表_copy1"));
Assert.assertEquals(
"http://127.0.0.1:8030/api/internal/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_schema",
RestService.buildTableSchemaUri(
"127.0.0.1:8030", "internal", "ods", "ods_新券表_copy1"));
Assert.assertEquals(
"http://127.0.0.1:8030/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_query_plan",
RestService.buildQueryPlanUri("127.0.0.1:8030", "ods", "ods_新券表_copy1"));
}

@Test
public void testChoiceFe() throws Exception {
String validFes = "1,2,3";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public void testInit() {
options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
}

@Test
public void testBuildEncodedLoadUrl() {
Assert.assertEquals(
"http://127.0.0.1:8040/api/ods/ods_%E6%96%B0%E5%88%B8%E8%A1%A8_copy1/_stream_load",
DorisBatchStreamLoad.buildLoadUrl("127.0.0.1:8040", "ods", "ods_新券表_copy1"));
}

@Test
public void testLoadFail() throws Exception {
LOG.info("testLoadFail start");
Expand Down
Loading
Loading