Skip to content

Commit e35239e

Browse files
authored
[Pick] And timeout config and retry straegy (#529)
1 parent 7326a64 commit e35239e

4 files changed

Lines changed: 81 additions & 38 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private static String send(DorisOptions options, DorisReadOptions readOptions, H
121121
try {
122122
String response;
123123
if (request instanceof HttpGet) {
124-
response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger);
124+
response = getConnectionGet(request, options.getUsername(), options.getPassword(), logger);
125125
} else {
126126
response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger);
127127
}
@@ -162,6 +162,8 @@ private static String getConnectionPost(HttpRequestBase request, String user, St
162162
String res = IOUtils.toString(content);
163163
conn.setDoOutput(true);
164164
conn.setDoInput(true);
165+
conn.setConnectTimeout(request.getConfig().getConnectTimeout());
166+
conn.setReadTimeout(request.getConfig().getSocketTimeout());
165167
PrintWriter out = new PrintWriter(conn.getOutputStream());
166168
// send request params
167169
out.print(res);
@@ -171,13 +173,15 @@ private static String getConnectionPost(HttpRequestBase request, String user, St
171173
return parseResponse(conn, logger);
172174
}
173175

174-
private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException {
175-
URL realUrl = new URL(request);
176+
private static String getConnectionGet(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
177+
URL realUrl = new URL(request.getURI().toString());
176178
// open connection
177179
HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
178180
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
179181
connection.setRequestProperty("Authorization", "Basic " + authEncoding);
180182

183+
connection.setConnectTimeout(request.getConfig().getConnectTimeout());
184+
connection.setReadTimeout(request.getConfig().getSocketTimeout());
181185
connection.connect();
182186
return parseResponse(connection, logger);
183187
}
@@ -346,7 +350,7 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
346350
* @throws IllegalArgumentException BE nodes is illegal
347351
*/
348352
@VisibleForTesting
349-
static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
353+
public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
350354
String feNodes = options.getFenodes();
351355
List<String> feNodeList = allEndpoints(feNodes, logger);
352356
for (String feNode: feNodeList) {

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,9 @@ public boolean isAlive() {
7070
public void setAlive(boolean alive) {
7171
isAlive = alive;
7272
}
73+
74+
public String toBackendString() {
75+
return ip + ":" + httpPort;
76+
}
7377
}
7478
}

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@
2323
import org.apache.doris.flink.exception.DorisException;
2424
import org.apache.doris.flink.exception.StreamLoadException;
2525
import org.apache.doris.flink.rest.RestService;
26+
import org.apache.doris.flink.rest.models.BackendV2;
2627
import org.apache.doris.flink.rest.models.Schema;
2728
import org.apache.flink.api.common.io.RichOutputFormat;
2829
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.runtime.util.ExecutorThreadFactory;
2931
import org.apache.flink.table.data.RowData;
3032
import org.apache.flink.table.types.DataType;
3133
import org.apache.flink.table.types.logical.LogicalType;
3234
import org.apache.flink.types.RowKind;
33-
import org.apache.flink.runtime.util.ExecutorThreadFactory;
35+
import org.apache.flink.util.CollectionUtil;
3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
3638

@@ -87,7 +89,8 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
8789
private DorisExecutionOptions executionOptions;
8890
private DorisStreamLoad dorisStreamLoad;
8991
private String keysType;
90-
92+
private List<BackendV2.BackendRowV2> backends;
93+
private long pos = 0L;
9194
private transient volatile boolean closed = false;
9295
private transient ScheduledExecutorService scheduler;
9396
private transient ScheduledFuture<?> scheduledFuture;
@@ -105,11 +108,13 @@ public DorisDynamicOutputFormat(DorisOptions option,
105108
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
106109
this.keysType = parseKeysType();
107110

111+
108112
handleStreamloadProp();
109113
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
110114
for (int i = 0; i < logicalTypes.length; i++) {
111115
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
112116
}
117+
113118
}
114119

115120
/**
@@ -186,13 +191,15 @@ public void configure(Configuration configuration) {
186191

187192
@Override
188193
public void open(int taskNumber, int numTasks) throws IOException {
194+
this.backends = settingBackends();
189195
dorisStreamLoad = new DorisStreamLoad(
190-
getBackend(),
196+
backends.get(0).toBackendString(),
191197
options.getTableIdentifier().split("\\.")[0],
192198
options.getTableIdentifier().split("\\.")[1],
193199
options.getUsername(),
194200
options.getPassword(),
195-
executionOptions.getStreamLoadProp());
201+
executionOptions.getStreamLoadProp(),
202+
readOptions);
196203
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
197204

198205
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -326,9 +333,9 @@ public synchronized void flush() throws IOException {
326333
throw new IOException(e);
327334
}
328335
try {
329-
dorisStreamLoad.setHostPort(getBackend());
336+
dorisStreamLoad.setHostPort(getAvailableBackend());
330337
LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
331-
Thread.sleep(1000 * i);
338+
Thread.sleep(1000L * ( i + 1 ));
332339
} catch (InterruptedException ex) {
333340
Thread.currentThread().interrupt();
334341
throw new IOException("unable to flush; interrupted while doing another attempt", e);
@@ -342,11 +349,35 @@ private String getBackend() throws IOException {
342349
//get be url from fe
343350
return RestService.randomBackend(options, readOptions, LOG);
344351
} catch (IOException | DorisException e) {
345-
LOG.error("get backends info fail");
352+
LOG.error("get backends info fail", e);
346353
throw new IOException(e);
347354
}
348355
}
349356

357+
private List<BackendV2.BackendRowV2> settingBackends(){
358+
try {
359+
List<BackendV2.BackendRowV2> backendsV2 = RestService.getBackendsV2(options, readOptions, LOG);
360+
if(CollectionUtil.isNullOrEmpty(backendsV2)){
361+
throw new RuntimeException("get no available backend.");
362+
}
363+
return backendsV2;
364+
} catch (Exception e) {
365+
LOG.error("get backends lists fail", e);
366+
throw new RuntimeException(e);
367+
}
368+
}
369+
370+
public String getAvailableBackend() {
371+
long tmp = pos + backends.size();
372+
while (pos < tmp) {
373+
BackendV2.BackendRowV2 backend =
374+
backends.get((int) (pos % backends.size()));
375+
pos++;
376+
return backend.toBackendString();
377+
}
378+
throw new RuntimeException("error cause no available backend.");
379+
}
380+
350381
/**
351382
* Builder for {@link DorisDynamicOutputFormat}.
352383
*/

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import org.apache.commons.codec.binary.Base64;
2222
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.doris.flink.cfg.ConfigurationOptions;
24+
import org.apache.doris.flink.cfg.DorisReadOptions;
2325
import org.apache.doris.flink.exception.StreamLoadException;
2426
import org.apache.doris.flink.rest.models.RespContent;
2527
import org.apache.http.HttpHeaders;
28+
import org.apache.http.client.config.RequestConfig;
2629
import org.apache.http.client.methods.CloseableHttpResponse;
2730
import org.apache.http.client.methods.HttpPut;
2831
import org.apache.http.entity.StringEntity;
@@ -65,17 +68,9 @@ public class DorisStreamLoad implements Serializable {
6568
private String tbl;
6669
private String authEncoding;
6770
private Properties streamLoadProp;
68-
private final HttpClientBuilder httpClientBuilder = HttpClients
69-
.custom()
70-
.setRedirectStrategy(new DefaultRedirectStrategy() {
71-
@Override
72-
protected boolean isRedirectable(String method) {
73-
return true;
74-
}
75-
});
76-
private CloseableHttpClient httpClient;
71+
private final HttpClientBuilder httpClientBuilder;
7772

78-
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) {
73+
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions) {
7974
this.hostPort = hostPort;
8075
this.db = db;
8176
this.tbl = tbl;
@@ -84,7 +79,22 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
8479
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
8580
this.authEncoding = basicAuthHeader(user, passwd);
8681
this.streamLoadProp = streamLoadProp;
87-
this.httpClient = httpClientBuilder.build();
82+
int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
83+
int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
84+
this.httpClientBuilder = HttpClients
85+
.custom()
86+
.setRedirectStrategy(new DefaultRedirectStrategy() {
87+
@Override
88+
protected boolean isRedirectable(String method) {
89+
return true;
90+
}
91+
})
92+
.setDefaultRequestConfig(
93+
RequestConfig.custom()
94+
.setConnectTimeout(connectTimeout)
95+
.setConnectionRequestTimeout(connectTimeout)
96+
.setSocketTimeout(socketTimeout)
97+
.build());
8898
}
8999

90100
public String getLoadUrlStr() {
@@ -134,18 +144,20 @@ private LoadResponse loadBatch(String value) {
134144
StringEntity entity = new StringEntity(value, "UTF-8");
135145
put.setEntity(entity);
136146

137-
try (CloseableHttpResponse response = httpClient.execute(put)) {
138-
final int statusCode = response.getStatusLine().getStatusCode();
139-
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
140-
String loadResult = "";
141-
if (response.getEntity() != null) {
142-
loadResult = EntityUtils.toString(response.getEntity());
147+
try (CloseableHttpClient httpClient = httpClientBuilder.build()){
148+
try (CloseableHttpResponse response = httpClient.execute(put)) {
149+
final int statusCode = response.getStatusLine().getStatusCode();
150+
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
151+
String loadResult = "";
152+
if (response.getEntity() != null) {
153+
loadResult = EntityUtils.toString(response.getEntity());
154+
}
155+
return new LoadResponse(statusCode, reasonPhrase, loadResult);
143156
}
144-
return new LoadResponse(statusCode, reasonPhrase, loadResult);
145157
}
146158
} catch (Exception e) {
147159
String err = "failed to stream load data with label: " + label;
148-
LOG.warn(err, e);
160+
LOG.error(err, e);
149161
return new LoadResponse(-1, e.getMessage(), err);
150162
}
151163
}
@@ -157,14 +169,6 @@ private String basicAuthHeader(String username, String password) {
157169
}
158170

159171
public void close() throws IOException {
160-
if (null != httpClient) {
161-
try {
162-
httpClient.close();
163-
} catch (IOException e) {
164-
LOG.error("Closing httpClient failed.", e);
165-
throw new RuntimeException("Closing httpClient failed.", e);
166-
}
167-
}
168172
}
169173

170174
public static class LoadResponse {

0 commit comments

Comments
 (0)