Skip to content

Commit 31e10ed

Browse files
authored
[Pick] Pick load banlance on each backend (#565)
1 parent e35239e commit 31e10ed

2 files changed

Lines changed: 14 additions & 5 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
9191
private String keysType;
9292
private List<BackendV2.BackendRowV2> backends;
9393
private long pos = 0L;
94+
private int subtaskId = 0;
9495
private transient volatile boolean closed = false;
9596
private transient ScheduledExecutorService scheduler;
9697
private transient ScheduledFuture<?> scheduledFuture;
@@ -191,16 +192,17 @@ public void configure(Configuration configuration) {
191192

192193
@Override
193194
public void open(int taskNumber, int numTasks) throws IOException {
195+
this.subtaskId = taskNumber;
194196
this.backends = settingBackends();
197+
String backend = getAvailableBackend();
195198
dorisStreamLoad = new DorisStreamLoad(
196-
backends.get(0).toBackendString(),
199+
backend,
197200
options.getTableIdentifier().split("\\.")[0],
198201
options.getTableIdentifier().split("\\.")[1],
199202
options.getUsername(),
200203
options.getPassword(),
201204
executionOptions.getStreamLoadProp(),
202205
readOptions);
203-
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
204206

205207
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
206208
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output" +
@@ -321,6 +323,10 @@ public synchronized void flush() throws IOException {
321323
} else {
322324
result = String.join(this.lineDelimiter, batch);
323325
}
326+
327+
// refresh backend
328+
dorisStreamLoad.setHostPort(getAvailableBackend());
329+
324330
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
325331
try {
326332
dorisStreamLoad.load(result);
@@ -334,16 +340,18 @@ public synchronized void flush() throws IOException {
334340
}
335341
try {
336342
dorisStreamLoad.setHostPort(getAvailableBackend());
337-
LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
343+
LOG.warn("stream load error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
338344
Thread.sleep(1000L * ( i + 1 ));
339345
} catch (InterruptedException ex) {
340346
Thread.currentThread().interrupt();
341347
throw new IOException("unable to flush; interrupted while doing another attempt", e);
342348
}
343349
}
344350
}
351+
345352
}
346353

354+
@Deprecated
347355
private String getBackend() throws IOException {
348356
try {
349357
//get be url from fe
@@ -371,7 +379,7 @@ public String getAvailableBackend() {
371379
long tmp = pos + backends.size();
372380
while (pos < tmp) {
373381
BackendV2.BackendRowV2 backend =
374-
backends.get((int) (pos % backends.size()));
382+
backends.get((int) ((pos + subtaskId) % backends.size()));
375383
pos++;
376384
return backend.toBackendString();
377385
}

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void setHostPort(String hostPort) {
108108

109109
public void load(String value) throws StreamLoadException {
110110
LoadResponse loadResponse = loadBatch(value);
111-
LOG.info("Streamload Response:{}", loadResponse);
111+
LOG.info("stream load response:{}", loadResponse);
112112
if (loadResponse.status != 200) {
113113
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
114114
} else {
@@ -133,6 +133,7 @@ private LoadResponse loadBatch(String value) {
133133
UUID.randomUUID().toString().replaceAll("-", ""));
134134
}
135135

136+
LOG.info("stream load started for {} on host {}", label, hostPort);
136137
try {
137138
HttpPut put = new HttpPut(loadUrlStr);
138139
put.setHeader(HttpHeaders.EXPECT, "100-continue");

0 commit comments

Comments
 (0)