Skip to content

Commit 2bced75

Browse files
ziming-ai子茗
andauthored
修复mysql rds oss订阅本地binlog过期需要再次订阅oss (#5496)
* fix: fix rds oss server binlog expired * fix: fix rds oss server binlog expired * fix: fix rds oss server binlog expired --------- Co-authored-by: 子茗 <[email protected]>
1 parent 25501db commit 2bced75

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.alibaba.otter.canal.parse.exception;
2+
import com.alibaba.otter.canal.common.CanalException;
3+
4+
public class ServerLogPurgedException extends CanalException {
5+
public ServerLogPurgedException(String errorCode) {
6+
super("ServerLogPurged0: " + errorCode);
7+
}
8+
}

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import java.net.SocketTimeoutException;
66
import java.nio.channels.ClosedByInterruptException;
77

8+
import org.apache.commons.lang.StringUtils;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

1112
import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
1213
import com.taobao.tddl.dbsync.binlog.LogFetcher;
14+
import com.alibaba.otter.canal.parse.exception.ServerLogPurgedException;
1315

1416
/**
1517
* 基于socket的logEvent实现
@@ -99,6 +101,11 @@ public boolean fetch() throws IOException {
99101
final int errno = getInt16();
100102
String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
101103
String errmsg = getFixString(limit - position);
104+
if (StringUtils.containsIgnoreCase(errmsg, "not find first log file name")
105+
|| StringUtils.containsIgnoreCase(errmsg, "purged binary logs")) {
106+
// 开始 dump 后,server 位点过期,DUMP 和 DUMP_GTID 两种错误信息
107+
throw new ServerLogPurgedException(errmsg);
108+
}
102109
throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
103110
+ " errmsg = " + errmsg);
104111
} else if (mark == 254) {

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.commons.lang.StringUtils;
77

88
import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
9+
import com.alibaba.otter.canal.parse.exception.ServerLogPurgedException;
910
import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
1011
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
1112

@@ -92,8 +93,10 @@ public void start() {
9293
}
9394

9495
private void handleMysqlParserException(Throwable throwable) {
95-
if (throwable instanceof PositionNotFoundException) {
96-
logger.info("remove rds not found position, try download rds binlog!");
96+
if (throwable instanceof PositionNotFoundException
97+
|| throwable instanceof ServerLogPurgedException) {
98+
logger.info("remove rds not found position, try download rds binlog! {} : {}",
99+
throwable.getClass().getSimpleName(), throwable.getMessage());
97100
executorService.execute(() -> {
98101
try {
99102
logger.info("stop mysql parser!");

0 commit comments

Comments
 (0)