Skip to content

Commit d5d8a8f

Browse files
authored
fix: use overwrite instead of delete+append when updating offset backing store (memiiso#690)
* Use overwrite instead of delete+append * Formatting * Formatting
1 parent 81c1980 commit d5d8a8f

1 file changed

Lines changed: 95 additions & 86 deletions

File tree

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java

Lines changed: 95 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
package io.debezium.server.iceberg.offset;
1010

11+
import static org.apache.iceberg.types.Types.NestedField.optional;
12+
import static org.apache.iceberg.types.Types.NestedField.required;
13+
1114
import com.fasterxml.jackson.core.JsonProcessingException;
1215
import com.fasterxml.jackson.core.type.TypeReference;
1316
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -16,10 +19,26 @@
1619
import io.debezium.server.iceberg.IcebergUtil;
1720
import io.debezium.util.Strings;
1821
import jakarta.enterprise.context.Dependent;
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import java.nio.charset.StandardCharsets;
26+
import java.nio.file.Files;
27+
import java.time.OffsetDateTime;
28+
import java.time.ZoneOffset;
29+
import java.util.Arrays;
30+
import java.util.Collection;
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
import java.util.Set;
34+
import java.util.UUID;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Future;
37+
import java.util.concurrent.TimeUnit;
1938
import org.apache.iceberg.FileFormat;
39+
import org.apache.iceberg.OverwriteFiles;
2040
import org.apache.iceberg.Schema;
2141
import org.apache.iceberg.Table;
22-
import org.apache.iceberg.Transaction;
2342
import org.apache.iceberg.catalog.Catalog;
2443
import org.apache.iceberg.catalog.TableIdentifier;
2544
import org.apache.iceberg.data.GenericAppenderFactory;
@@ -42,38 +61,16 @@
4261
import org.slf4j.Logger;
4362
import org.slf4j.LoggerFactory;
4463

45-
import java.io.File;
46-
import java.io.IOException;
47-
import java.nio.ByteBuffer;
48-
import java.nio.charset.StandardCharsets;
49-
import java.nio.file.Files;
50-
import java.time.OffsetDateTime;
51-
import java.time.ZoneOffset;
52-
import java.util.Arrays;
53-
import java.util.Collection;
54-
import java.util.HashMap;
55-
import java.util.Map;
56-
import java.util.Set;
57-
import java.util.UUID;
58-
import java.util.concurrent.ExecutorService;
59-
import java.util.concurrent.Future;
60-
import java.util.concurrent.TimeUnit;
61-
62-
import static org.apache.iceberg.types.Types.NestedField.optional;
63-
import static org.apache.iceberg.types.Types.NestedField.required;
64-
65-
/**
66-
* Implementation of OffsetBackingStore that saves data to Iceberg table.
67-
*/
64+
/** Implementation of OffsetBackingStore that saves data to Iceberg table. */
6865
@Dependent
69-
public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implements OffsetBackingStore {
70-
71-
static final Schema OFFSET_STORAGE_TABLE_SCHEMA = new Schema(
72-
required(1, "id", Types.StringType.get()),
73-
optional(2, "offset_data", Types.StringType.get()),
74-
optional(3, "record_insert_ts", Types.TimestampType.withZone()
75-
)
76-
);
66+
public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore
67+
implements OffsetBackingStore {
68+
69+
static final Schema OFFSET_STORAGE_TABLE_SCHEMA =
70+
new Schema(
71+
required(1, "id", Types.StringType.get()),
72+
optional(2, "offset_data", Types.StringType.get()),
73+
optional(3, "record_insert_ts", Types.TimestampType.withZone()));
7774
protected static final ObjectMapper mapper = new ObjectMapper();
7875
public static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.";
7976
private static final Logger LOG = LoggerFactory.getLogger(IcebergOffsetBackingStore.class);
@@ -87,14 +84,15 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen
8784
GenericAppenderFactory appenderFactory;
8885
OutputFileFactory fileFactory;
8986

90-
public IcebergOffsetBackingStore() {
91-
}
87+
public IcebergOffsetBackingStore() {}
9288

9389
@Override
9490
public void configure(WorkerConfig config) {
9591
super.configure(config);
9692

97-
storageConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()), CONFIGURATION_FIELD_PREFIX_STRING);
93+
storageConfig =
94+
new IcebergOffsetBackingStoreConfig(
95+
Configuration.from(config.originalsStrings()), CONFIGURATION_FIELD_PREFIX_STRING);
9896
icebergCatalog = storageConfig.icebergCatalog();
9997
tableFullName = storageConfig.tableFullName();
10098
tableId = storageConfig.tableIdentifier();
@@ -117,17 +115,17 @@ public synchronized void stop() {
117115
LOG.info("Stopped IcebergOffsetBackingStore table:{}", tableFullName);
118116
}
119117

120-
121118
/**
122-
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks,
123-
* and then calling shutdownNow, if necessary, to cancel any lingering tasks.
124-
* After the timeout/on interrupt, the service is forcefully closed.
119+
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming
120+
* tasks, and then calling shutdownNow, if necessary, to cancel any lingering tasks. After the
121+
* timeout/on interrupt, the service is forcefully closed.
122+
*
125123
* @param executorService The service to shut down.
126124
* @param timeout The timeout of the shutdown.
127125
* @param timeUnit The time unit of the shutdown timeout.
128126
*/
129-
public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
130-
long timeout, TimeUnit timeUnit) {
127+
public static void shutdownExecutorServiceQuietly(
128+
ExecutorService executorService, long timeout, TimeUnit timeUnit) {
131129
executorService.shutdown(); // Disable new tasks from being submitted
132130
try {
133131
// Wait a while for existing tasks to terminate
@@ -151,7 +149,8 @@ private void initializeTable() {
151149
offsetTable = icebergCatalog.loadTable(tableId);
152150
} else {
153151
LOG.debug("Creating table {} to store offset", tableFullName);
154-
offsetTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, OFFSET_STORAGE_TABLE_SCHEMA);
152+
offsetTable =
153+
IcebergUtil.createIcebergTable(icebergCatalog, tableId, OFFSET_STORAGE_TABLE_SCHEMA);
155154
if (!icebergCatalog.tableExists(tableId)) {
156155
throw new DebeziumException("Failed to create table " + tableId + " to store offset");
157156
}
@@ -168,7 +167,8 @@ private void initializeTable() {
168167
}
169168

170169
private void loadFileOffset(File file) {
171-
try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
170+
try (SafeObjectInputStream is =
171+
new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
172172
Object obj = is.readObject();
173173

174174
if (!(obj instanceof HashMap))
@@ -178,7 +178,8 @@ private void loadFileOffset(File file) {
178178
Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
179179
for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
180180
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
181-
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
181+
ByteBuffer value =
182+
(mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
182183
data.put(fromByteBuffer(key), fromByteBuffer(value));
183184
}
184185
} catch (IOException | ClassNotFoundException e) {
@@ -197,21 +198,29 @@ protected void save() {
197198
OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);
198199

199200
GenericRecord record = GenericRecord.create(OFFSET_STORAGE_TABLE_SCHEMA);
200-
Record row = record.copy(
201-
"id", UUID.randomUUID().toString(),
202-
"offset_data", dataJson,
203-
"record_insert_ts", currentTs);
204-
205-
try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
206-
offsetTable.spec(), format, appenderFactory, fileFactory, offsetTable.io(), Long.MAX_VALUE)) {
201+
Record row =
202+
record.copy(
203+
"id", UUID.randomUUID().toString(),
204+
"offset_data", dataJson,
205+
"record_insert_ts", currentTs);
206+
207+
try (BaseTaskWriter<Record> writer =
208+
new UnpartitionedWriter<>(
209+
offsetTable.spec(),
210+
format,
211+
appenderFactory,
212+
fileFactory,
213+
offsetTable.io(),
214+
Long.MAX_VALUE)) {
207215
writer.write(row);
208216
writer.close();
209217
WriteResult files = writer.complete();
210218

211-
Transaction t = offsetTable.newTransaction();
212-
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
213-
Arrays.stream(files.dataFiles()).forEach(f -> t.newAppend().appendFile(f).commit());
214-
t.commitTransaction();
219+
OverwriteFiles overwrite = offsetTable.newOverwrite();
220+
overwrite.overwriteByRowFilter(Expressions.alwaysTrue());
221+
Arrays.stream(files.dataFiles()).forEach(overwrite::addFile);
222+
overwrite.commit();
223+
215224
LOG.debug("Successfully saved offset data to iceberg table");
216225
}
217226

@@ -225,8 +234,7 @@ private void load() {
225234
String dataJsonString = null;
226235

227236
int rowNum = 0;
228-
try (CloseableIterable<Record> rs = IcebergGenerics.read(offsetTable)
229-
.build()) {
237+
try (CloseableIterable<Record> rs = IcebergGenerics.read(offsetTable).build()) {
230238
for (Record row : rs) {
231239
dataJsonString = (String) row.getField("offset_data");
232240
rowNum++;
@@ -235,51 +243,54 @@ private void load() {
235243
throw new RuntimeException(e);
236244
}
237245
if (rowNum > 1) {
238-
throw new DebeziumException("Failed recover offset data from iceberg, Found multiple offset row!");
246+
throw new DebeziumException(
247+
"Failed recover offset data from iceberg, Found multiple offset row!");
239248
}
240249

241250
if (dataJsonString != null) {
242-
this.data = mapper.readValue(dataJsonString, new TypeReference<>() {
243-
});
251+
this.data = mapper.readValue(dataJsonString, new TypeReference<>() {});
244252
LOG.debug("Loaded offset data {}", dataJsonString);
245253
}
246254
} catch (JsonProcessingException e) {
247-
// e.printStackTrace();
255+
// e.printStackTrace();
248256
throw new DebeziumException("Failed recover offset data from iceberg", e);
249257
}
250258
}
251259

252260
@Override
253-
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
254-
final Callback<Void> callback) {
255-
return executor.submit(() -> {
256-
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
257-
if (entry.getKey() == null) {
258-
continue;
259-
}
260-
data.put(fromByteBuffer(entry.getKey()), fromByteBuffer(entry.getValue()));
261-
}
262-
save();
263-
if (callback != null) {
264-
callback.onCompletion(null, null);
265-
}
266-
return null;
267-
});
261+
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
262+
return executor.submit(
263+
() -> {
264+
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
265+
if (entry.getKey() == null) {
266+
continue;
267+
}
268+
data.put(fromByteBuffer(entry.getKey()), fromByteBuffer(entry.getValue()));
269+
}
270+
save();
271+
if (callback != null) {
272+
callback.onCompletion(null, null);
273+
}
274+
return null;
275+
});
268276
}
269277

270278
@Override
271279
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
272-
return executor.submit(() -> {
273-
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
274-
for (ByteBuffer key : keys) {
275-
result.put(key, toByteBuffer(data.get(fromByteBuffer(key))));
276-
}
277-
return result;
278-
});
280+
return executor.submit(
281+
() -> {
282+
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
283+
for (ByteBuffer key : keys) {
284+
result.put(key, toByteBuffer(data.get(fromByteBuffer(key))));
285+
}
286+
return result;
287+
});
279288
}
280289

281290
public static String fromByteBuffer(ByteBuffer data) {
282-
return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null;
291+
return (data != null)
292+
? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()))
293+
: null;
283294
}
284295

285296
public static ByteBuffer toByteBuffer(String data) {
@@ -289,6 +300,4 @@ public static ByteBuffer toByteBuffer(String data) {
289300
public Set<Map<String, Object>> connectorPartitions(String connectorName) {
290301
return null;
291302
}
292-
293-
294303
}

0 commit comments

Comments
 (0)