Skip to content

Commit 3d341f2

Browse files
committed
flink-connector: writeMode重命名为OnConflictAction
1 parent 93e16d3 commit 3d341f2

10 files changed

Lines changed: 49 additions & 29 deletions

File tree

hologres-connector-flink-1.15/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>com.alibaba.hologres</groupId>
1818
<artifactId>hologres-connector-flink-1.15</artifactId>
19-
<version>1.5.0</version>
19+
<version>1.5.1</version>
2020
<classifier>jar-with-dependencies</classifier>
2121
</dependency>
2222
```

hologres-connector-flink-1.15/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.alibaba.hologres</groupId>
1010
<artifactId>hologres-connector-parent</artifactId>
11-
<version>1.5.0-SNAPSHOT</version>
11+
<version>1.5.1-SNAPSHOT</version>
1212
</parent>
1313

1414
<artifactId>hologres-connector-flink-1.15</artifactId>

hologres-connector-flink-1.17/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>com.alibaba.hologres</groupId>
1818
<artifactId>hologres-connector-flink-1.17</artifactId>
19-
<version>1.5.0</version>
19+
<version>1.5.1</version>
2020
<classifier>jar-with-dependencies</classifier>
2121
</dependency>
2222
```

hologres-connector-flink-1.17/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.alibaba.hologres</groupId>
1010
<artifactId>hologres-connector-parent</artifactId>
11-
<version>1.5.0-SNAPSHOT</version>
11+
<version>1.5.1-SNAPSHOT</version>
1212
</parent>
1313

1414
<artifactId>hologres-connector-flink-1.17</artifactId>

hologres-connector-flink-base/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.alibaba.hologres</groupId>
1010
<artifactId>hologres-connector-parent</artifactId>
11-
<version>1.5.0-SNAPSHOT</version>
11+
<version>1.5.1-SNAPSHOT</version>
1212
</parent>
1313

1414
<artifactId>hologres-connector-flink-base</artifactId>

hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/config/HologresConnectionParam.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.configuration.ReadableConfig;
2222

2323
import com.alibaba.hologres.client.copy.CopyMode;
24-
import com.alibaba.hologres.client.model.WriteMode;
24+
import com.alibaba.hologres.client.model.OnConflictAction;
2525
import com.alibaba.hologres.client.model.checkandput.CheckAndPutCondition;
2626
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCConfigs;
2727
import com.alibaba.ververica.connectors.hologres.utils.HologresUtils;
@@ -63,7 +63,7 @@ public class HologresConnectionParam implements Serializable {
6363
private final int jdbcScanFetchSize;
6464
private final int jdbcScanTimeoutSeconds;
6565
// JDBC sink
66-
private final WriteMode writeMode;
66+
private final OnConflictAction writeMode;
6767
private final int jdbcWriteBatchSize;
6868
private final long jdbcWriteBatchByteSize;
6969
private final long jdbcWriteBatchTotalByteSize;
@@ -151,22 +151,22 @@ public HologresConnectionParam(ReadableConfig properties) {
151151
this.checkAndPutCondition = HologresUtils.getCheckAndPutCondition(properties);
152152
}
153153

154-
public static WriteMode getJDBCWriteMode(ReadableConfig tableProperties) {
155-
WriteMode writeMode = WriteMode.INSERT_OR_IGNORE;
154+
public static OnConflictAction getJDBCWriteMode(ReadableConfig tableProperties) {
155+
OnConflictAction writeMode = OnConflictAction.INSERT_OR_IGNORE;
156156
if (tableProperties.get(HologresConfigs.INSERT_OR_UPDATE)) {
157-
writeMode = WriteMode.INSERT_OR_UPDATE;
157+
writeMode = OnConflictAction.INSERT_OR_UPDATE;
158158
}
159159
if (tableProperties.getOptional(HologresConfigs.MUTATE_TYPE).isPresent()) {
160160
String mutateType = tableProperties.get(HologresConfigs.MUTATE_TYPE).toLowerCase();
161161
switch (mutateType) {
162162
case "insertorignore":
163-
writeMode = WriteMode.INSERT_OR_IGNORE;
163+
writeMode = OnConflictAction.INSERT_OR_IGNORE;
164164
break;
165165
case "insertorreplace":
166-
writeMode = WriteMode.INSERT_OR_REPLACE;
166+
writeMode = OnConflictAction.INSERT_OR_REPLACE;
167167
break;
168168
case "insertorupdate":
169-
writeMode = WriteMode.INSERT_OR_UPDATE;
169+
writeMode = OnConflictAction.INSERT_OR_UPDATE;
170170
break;
171171
default:
172172
throw new RuntimeException("Could not recognize mutate type " + mutateType);
@@ -307,7 +307,7 @@ public String getJdbcSharedConnectionPoolName() {
307307
return connectionPoolName;
308308
}
309309

310-
public WriteMode getJDBCWriteMode() {
310+
public OnConflictAction getJDBCWriteMode() {
311311
return this.writeMode;
312312
}
313313

hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/HologresJDBCClientProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public HoloConfig generateHoloConfig() {
9090
holoConfig.setWriteMaxIntervalMs(param.getJdbcWriteFlushInterval());
9191
holoConfig.setUseLegacyPutHandler(param.isJdbcUseLegacyPutHandler());
9292
holoConfig.setEnableDefaultForNotNullColumn(param.getJdbcEnableDefaultForNotNullColumn());
93-
holoConfig.setWriteMode(param.getJDBCWriteMode());
93+
holoConfig.setOnConflictAction(param.getJDBCWriteMode());
9494
holoConfig.setEnableDeduplication(param.isEnableDeduplication());
9595
holoConfig.setRemoveU0000InTextColumnValue(param.isJdbcEnableRemoveU0000InText());
9696
holoConfig.setEnableAggressive(param.isEnableAggressive());

hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/HologresJDBCReader.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
2527

2628
/** An IO reader implementation for JDBC. */
2729
public class HologresJDBCReader<T> extends HologresReader<T> {
2830
private static final transient Logger LOG = LoggerFactory.getLogger(HologresJDBCReader.class);
2931
private final HologresRecordConverter<T, Record> recordConverter;
3032
private transient HologresJDBCClientProvider clientProvider;
3133
protected final boolean insertIfNotExists;
34+
protected ExecutorService callbackPool;
3235

3336
public HologresJDBCReader(
3437
String[] primaryKeys,
@@ -71,6 +74,7 @@ public void open(Integer subtaskIdx, Integer numSubtasks) throws IOException {
7174
if (insertIfNotExists) {
7275
LOG.info("Hologres dim table will insert new record if primary key does not exist.");
7376
}
77+
this.callbackPool = Executors.newFixedThreadPool(1);
7478
this.clientProvider = new HologresJDBCClientProvider(param);
7579
LOG.info(
7680
"Successfully initiated connection to database [{}] / table[{}]",
@@ -121,7 +125,8 @@ public CompletableFuture<T> asyncGet(T in) throws IOException {
121125
result.completeExceptionally(e);
122126
}
123127
return null;
124-
});
128+
},
129+
callbackPool);
125130
} catch (HoloClientException e) {
126131
result.completeExceptionally(e);
127132
}
@@ -154,22 +159,28 @@ public CompletableFuture<List<T>> asyncGetMany(T in) throws IOException {
154159
.asyncScan(scanBuilder.build())
155160
.handleAsync(
156161
(scanner, throwable) -> {
157-
List<T> result = new ArrayList<>();
158-
while (true) {
159-
try {
160-
if (!scanner.next()) {
162+
// caught an error
163+
if (throwable != null) {
164+
scanResult.completeExceptionally(throwable);
165+
} else {
166+
List<T> result = new ArrayList<>();
167+
while (true) {
168+
try {
169+
if (!scanner.next()) {
170+
break;
171+
}
172+
Record resultRecord = scanner.getRecord();
173+
result.add(recordConverter.convertTo(resultRecord));
174+
} catch (HoloClientException e) {
175+
scanResult.completeExceptionally(e);
161176
break;
162177
}
163-
Record resultRecord = scanner.getRecord();
164-
result.add(recordConverter.convertTo(resultRecord));
165-
} catch (HoloClientException e) {
166-
scanResult.completeExceptionally(e);
167-
break;
168178
}
179+
scanResult.complete(result);
169180
}
170-
scanResult.complete(result);
171181
return null;
172-
});
182+
},
183+
callbackPool);
173184
} catch (HoloClientException e) {
174185
throw new IOException(e);
175186
}

hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/copy/HologresJDBCCopyWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
import java.util.stream.Collectors;
4646

4747
import static com.alibaba.hologres.client.copy.CopyUtil.buildCopyInSql;
48-
import static com.alibaba.hologres.client.model.WriteMode.INSERT_OR_IGNORE;
49-
import static com.alibaba.hologres.client.model.WriteMode.INSERT_OR_UPDATE;
48+
import static com.alibaba.hologres.client.model.OnConflictAction.INSERT_OR_IGNORE;
49+
import static com.alibaba.hologres.client.model.OnConflictAction.INSERT_OR_UPDATE;
5050
import static com.alibaba.ververica.connectors.hologres.utils.JDBCUtils.executeSql;
5151

5252
/** An IO writer implementation for JDBC. */

hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/sink/HologresOutputFormat.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ public void sync() throws IOException {
107107
hologresIOClient.flush();
108108
LOG.info("end to wait request to finish");
109109
} catch (IOException e) {
110+
// When checkpointing, this sync will be called(see OutputFormatSinkFunction
111+
// snapshotState).
112+
// If this sync(flush) throws an exception due to dirty data or other reasons, the
113+
// checkpoint will fail. However, checkpoint has a failure-ignoring configuration
114+
// "execution.checkpointing.tolerable-failed-checkpoints".
115+
// If this failure is ignored, this batch of data may be lost. so if checkpoint
116+
// failed because write
117+
// error, we throw exception later, make sure the job will fail over successfully.
118+
exception = e;
110119
throw new IOException(e);
111120
}
112121
}

0 commit comments

Comments
 (0)