Skip to content

Commit 9f2fc74

Browse files
committed
merge issue #5496 , 修复mysql rds oss订阅本地binlog过期需要再次订阅oss
1 parent 556d998 commit 9f2fc74

File tree

3 files changed

+12
-1
lines changed

3 files changed

+12
-1
lines changed

parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerLogPurgedException.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package com.alibaba.otter.canal.parse.exception;import com.alibaba.otter.canal.common.CanalException;public class ServerLogPurgedException extends CanalException { public ServerLogPurgedException(String errorCode){ super("ServerLogPurged by " + errorCode); }}

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import java.net.SocketTimeoutException;
66
import java.nio.channels.ClosedByInterruptException;
77

8+
import org.apache.commons.lang.StringUtils;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

1112
import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
13+
import com.alibaba.otter.canal.parse.exception.ServerLogPurgedException;
1214
import com.taobao.tddl.dbsync.binlog.LogFetcher;
1315

1416
/**
@@ -99,6 +101,13 @@ public boolean fetch() throws IOException {
99101
final int errno = getInt16();
100102
String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
101103
String errmsg = getFixString(limit - position);
104+
if (StringUtils.containsIgnoreCase(errmsg, "not find first log file name")
105+
|| StringUtils.containsIgnoreCase(errmsg, "purged binary logs")) {
106+
// 开始 dump 后,server 位点过期,DUMP 和 DUMP_GTID 两种错误信息
107+
throw new ServerLogPurgedException(
108+
" errno = " + errno + ", sqlstate = " + sqlstate + " errmsg = " + errmsg);
109+
}
110+
102111
throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
103112
+ " errmsg = " + errmsg);
104113
} else if (mark == 254) {

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.commons.lang.StringUtils;
77

88
import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
9+
import com.alibaba.otter.canal.parse.exception.ServerLogPurgedException;
910
import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
1011
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
1112

@@ -92,7 +93,7 @@ public void start() {
9293
}
9394

9495
private void handleMysqlParserException(Throwable throwable) {
95-
if (throwable instanceof PositionNotFoundException) {
96+
if (throwable instanceof PositionNotFoundException || throwable instanceof ServerLogPurgedException) {
9697
logger.info("remove rds not found position, try download rds binlog!");
9798
executorService.execute(() -> {
9899
try {

0 commit comments

Comments
 (0)