Skip to content

Commit 4957083

Browse files
committed
fixed issue #1897 , memory event store ack null
1 parent adccc0e commit 4957083

File tree

6 files changed

+50
-11
lines changed

6 files changed

+50
-11
lines changed

protocol/src/main/java/com/alibaba/otter/canal/protocol/position/PositionRange.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public class PositionRange<T extends Position> implements Serializable {
1919
// add by ljh at 2012-09-05,用于记录一个可被ack的位置,保证每次提交到cursor中的位置是一个完整事务的结束
2020
private T ack;
2121
private T end;
22+
// add by ljh at 2019-06-25,用于精确记录ringbuffer中的位点
23+
private Long endSeq = -1L;
2224

2325
public PositionRange(){
2426
}
@@ -52,6 +54,14 @@ public void setAck(T ack) {
5254
this.ack = ack;
5355
}
5456

57+
public Long getEndSeq() {
58+
return endSeq;
59+
}
60+
61+
public void setEndSeq(Long endSeq) {
62+
this.endSeq = endSeq;
63+
}
64+
5565
@Override
5666
public String toString() {
5767
return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);

server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,8 +432,7 @@ public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerE
432432
}
433433

434434
// 可定时清理数据
435-
canalInstance.getEventStore().ack(positionRanges.getEnd());
436-
435+
canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
437436
}
438437

439438
/**

store/src/main/java/com/alibaba/otter/canal/store/CanalEventStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ Events<T> get(Position start, int batchSize, long timeout, TimeUnit unit) throws
7676
*/
7777
void ack(Position position) throws CanalStoreException;
7878

79+
/**
80+
* 删除指定seqId之前的数据
81+
*
82+
* @Since 1.1.4
83+
*/
84+
void ack(Position position, Long seqId) throws CanalStoreException;
85+
7986
/**
8087
* 出错时执行回滚操作(未提交ack的所有状态信息重新归位,减少出错时数据全部重来的成本)
8188
*/

store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ public static LogPosition createPosition(Event event, boolean included) {
7070
position.setPosition(event.getPosition());
7171
position.setTimestamp(event.getExecuteTime());
7272
position.setIncluded(included);
73+
// add serverId at 2016-06-28
74+
position.setServerId(event.getServerId());
75+
// add gtid
76+
position.setGtid(event.getGtid());
7377

7478
LogPosition logPosition = new LogPosition();
7579
logPosition.setPostion(position);

store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ private Events<Event> doGet(Position start, int batchSize) throws CanalStoreExce
334334

335335
range.setStart(CanalEventUtils.createPosition(entrys.get(0)));
336336
range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size() - 1)));
337+
range.setEndSeq(end);
337338
// 记录一下是否存在可以被ack的点
338339

339340
for (int i = entrys.size() - 1; i >= 0; i--) {
@@ -369,9 +370,9 @@ public LogPosition getFirstPosition() throws CanalStoreException {
369370
return CanalEventUtils.createPosition(event, false);
370371
} else if (firstSeqeuence > INIT_SEQUENCE && firstSeqeuence < putSequence.get()) {
371372
// ack未追上put操作
372-
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack的位置数据
373-
// + 1
374-
return CanalEventUtils.createPosition(event, true);
373+
Event event = entries[getIndex(firstSeqeuence)]; // 最后一次ack的位置数据,需要移动到下一条,included
374+
// = false
375+
return CanalEventUtils.createPosition(event, false);
375376
} else if (firstSeqeuence > INIT_SEQUENCE && firstSeqeuence == putSequence.get()) {
376377
// 已经追上,store中没有数据
377378
Event event = entries[getIndex(firstSeqeuence)]; // 最后一次ack的位置数据,和last为同一条,included
@@ -410,10 +411,19 @@ public LogPosition getLatestPosition() throws CanalStoreException {
410411
}
411412

412413
public void ack(Position position) throws CanalStoreException {
413-
cleanUntil(position);
414+
cleanUntil(position, -1L);
414415
}
415416

417+
public void ack(Position position, Long seqId) throws CanalStoreException {
418+
cleanUntil(position, seqId);
419+
}
420+
421+
@Override
416422
public void cleanUntil(Position position) throws CanalStoreException {
423+
cleanUntil(position, -1L);
424+
}
425+
426+
public void cleanUntil(Position position, Long seqId) throws CanalStoreException {
417427
final ReentrantLock lock = this.lock;
418428
lock.lock();
419429
try {
@@ -425,15 +435,18 @@ public void cleanUntil(Position position) throws CanalStoreException {
425435
// ack没有list,但有已存在的foreach,还是节省一下list的开销
426436
long localExecTime = 0L;
427437
int deltaRows = 0;
438+
if (seqId > 0) {
439+
maxSequence = seqId;
440+
}
428441
for (long next = sequence + 1; next <= maxSequence; next++) {
429442
Event event = entries[getIndex(next)];
430443
if (localExecTime == 0 && event.getExecuteTime() > 0) {
431444
localExecTime = event.getExecuteTime();
432445
}
433446
deltaRows += event.getRowsCount();
434447
memsize += calculateSize(event);
435-
boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
436-
if (match) {// 找到对应的position,更新ack seq
448+
if ((seqId < 0 || next == seqId) && CanalEventUtils.checkPosition(event, (LogPosition) position)) {
449+
// 找到对应的position,更新ack seq
437450
hasMatch = true;
438451

439452
if (batchMode.isMemSize()) {
@@ -442,6 +455,12 @@ public void cleanUntil(Position position) throws CanalStoreException {
442455
for (long index = sequence + 1; index < next; index++) {
443456
entries[getIndex(index)] = null;// 设置为null
444457
}
458+
459+
// 考虑getFirstPosition/getLastPosition会获取最后一次ack的position信息
460+
// ack清理的时候只处理entry=null,释放内存
461+
Event lastEvent = entries[getIndex(next)];
462+
lastEvent.setEntry(null);
463+
lastEvent.setRawEntry(null);
445464
}
446465

447466
if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
@@ -681,5 +700,4 @@ public AtomicLong getAckTableRows() {
681700
return ackTableRows;
682701
}
683702

684-
685703
}

store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMemBatchTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,11 @@ public void testAck() {
300300
first = eventStore.getFirstPosition();
301301
lastest = eventStore.getLatestPosition();
302302
List<Event> entrys = new ArrayList<Event>(entrys2.getEvents());
303-
Assert.assertEquals(first, entrys2.getPositionRange().getStart());
303+
// Assert.assertEquals(first, entrys2.getPositionRange().getStart());
304304
Assert.assertEquals(lastest, entrys2.getPositionRange().getEnd());
305305

306-
Assert.assertEquals(first, CanalEventUtils.createPosition(entrys.get(0)));
306+
// Assert.assertEquals(first,
307+
// CanalEventUtils.createPosition(entrys.get(0)));
307308
Assert.assertEquals(lastest, CanalEventUtils.createPosition(entrys.get(entrys.size() - 1)));
308309

309310
// 全部ack掉

0 commit comments

Comments
 (0)