Skip to content

Commit aac50e5

Browse files
authored
[server] Add logic to protect delete a not exists record for kv table (#31)
1 parent 96b108a commit aac50e5

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception {
458458
+ " primary key (a) not enforced"
459459
+ ")",
460460
tableName));
461+
// test delete without data.
462+
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE a = 5").await();
463+
461464
List<String> insertValues =
462465
Arrays.asList(
463466
"(1, 3501, 'Beijing')",

fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ public LogAppendInfo putAsLeader(
260260
ValueDecoder valueDecoder =
261261
new ValueDecoder(readContext.getRowDecoder(schemaId));
262262

263+
int appendedRecordCount = 0;
263264
for (KvRecord kvRecord : kvRecords.records(readContext)) {
264265
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
265266
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
@@ -277,11 +278,13 @@ public LogAppendInfo putAsLeader(
277278
// if newRow is null, it means the row should be deleted
278279
if (newRow == null) {
279280
walBuilder.append(RowKind.DELETE, oldRow);
281+
appendedRecordCount += 1;
280282
kvPreWriteBuffer.delete(key, logOffset++);
281283
} else {
282284
// otherwise, it's a partial update, should produce -U,+U
283285
walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
284286
walBuilder.append(RowKind.UPDATE_AFTER, newRow);
287+
appendedRecordCount += 2;
285288
kvPreWriteBuffer.put(
286289
key,
287290
ValueEncoder.encodeValue(schemaId, newRow),
@@ -299,6 +302,7 @@ public LogAppendInfo putAsLeader(
299302
updateRow(oldRow, kvRecord.getRow(), partialUpdater);
300303
walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
301304
walBuilder.append(RowKind.UPDATE_AFTER, newRow);
305+
appendedRecordCount += 2;
302306
// logOffset is for -U, logOffset + 1 is for +U, we need to use
303307
// the log offset for +U
304308
kvPreWriteBuffer.put(
@@ -312,6 +316,7 @@ public LogAppendInfo putAsLeader(
312316
// of the input row are set to null.
313317
BinaryRow newRow = kvRecord.getRow();
314318
walBuilder.append(RowKind.INSERT, newRow);
319+
appendedRecordCount += 1;
315320
kvPreWriteBuffer.put(
316321
key,
317322
ValueEncoder.encodeValue(schemaId, newRow),
@@ -320,8 +325,21 @@ public LogAppendInfo putAsLeader(
320325
}
321326
}
322327

323-
// now, we can build the full log
324-
return logTablet.appendAsLeader(walBuilder.build());
328+
// if appendedRecordCount is 0, it means there is no record to append, we
329+
// should not append.
330+
if (appendedRecordCount > 0) {
331+
// now, we can build the full log.
332+
return logTablet.appendAsLeader(walBuilder.build());
333+
} else {
334+
return new LogAppendInfo(
335+
logEndOffsetOfPrevBatch - 1,
336+
logEndOffsetOfPrevBatch - 1,
337+
0L,
338+
0L,
339+
0,
340+
0,
341+
false);
342+
}
325343
} catch (Throwable t) {
326344
// While encounter error here, the CDC logs may fail writing to disk,
327345
// and the client probably will resend the batch. If we do not remove the

0 commit comments

Comments
 (0)