Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Add changes here for all PR submitted to the 2.x branch.

### feature:

- [[#7674](https://github.com/apache/incubator-seata/pull/7674)] implement early rollback of global transactions when TM disconnects (#4422)
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http request filter for seata-server
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse connection to merge branch transactions
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP client in common module to support HTTP/2
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

### feature:

- [[#7674](https://github.com/apache/incubator-seata/pull/7674)] 实现TM断开连接时全局事务的提前回滚功能 (#4422)
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] 给seata-server端的http请求添加过滤器
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common 模块中的 HTTP 客户端以支持 HTTP/2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,11 @@ public interface ConfigurationKeys {
*/
String ENABLE_BRANCH_ASYNC_REMOVE = SERVER_PREFIX + SESSION_PREFIX + "enableBranchAsyncRemove";

/**
* The constant ENABLE_ROLLBACK_WHEN_DISCONNECT
*/
String ENABLE_ROLLBACK_WHEN_DISCONNECT = SERVER_PREFIX + "enableRollbackWhenDisconnect";

/**
* The constant SERVER_RAFT.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ public interface DefaultValues {
*/
boolean DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE = false;

/**
* the constant DEFAULT_ENABLE_ROLLBACK_WHEN_DISCONNECT
*/
boolean DEFAULT_ENABLE_ROLLBACK_WHEN_DISCONNECT = false;

/**
* The constant DEFAULT_DB_MAX_CONN.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc;

/**
* Handler for TM disconnection events
* to enable early rollback of global transactions
*/
public interface TMDisconnectHandler {

/**
* Handle TM disconnection and perform early rollback if enabled
*
* @param rpcContext the rpc context of the disconnected TM
*/
void handleTMDisconnect(RpcContext rpcContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.TMDisconnectHandler;
import org.apache.seata.core.rpc.processor.Pair;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
Expand All @@ -55,6 +56,8 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting

private final NettyServerBootstrap serverBootstrap;

private TMDisconnectHandler tmDisconnectHandler;

@Override
public void init() {
super.init();
Expand All @@ -67,6 +70,15 @@ public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServ
serverBootstrap.setChannelHandlers(new ServerHandler());
}

/**
* Set TM disconnect handler
*
* @param tmDisconnectHandler the TM disconnect handler
*/
public void setTmDisconnectHandler(TMDisconnectHandler tmDisconnectHandler) {
this.tmDisconnectHandler = tmDisconnectHandler;
}

@Override
public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp)
throws TimeoutException, IOException {
Expand Down Expand Up @@ -211,6 +223,15 @@ private void handleDisconnect(ChannelHandlerContext ctx) {
LOGGER.info(ipAndPort + " to server channel inactive.");
}
if (rpcContext != null && rpcContext.getClientRole() != null) {
// Handle TM disconnection for early rollback if it's a TM role
if (rpcContext.getClientRole() == NettyPoolKey.TransactionRole.TMROLE && tmDisconnectHandler != null) {
try {
tmDisconnectHandler.handleTMDisconnect(rpcContext);
} catch (Exception e) {
LOGGER.error("Error handling TM disconnect for channel: " + ctx.channel(), e);
}
}

rpcContext.release();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import org.apache.seata.core.rpc.TMDisconnectHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;

/**
* Test for TM disconnect handling in AbstractNettyRemotingServer
*/
public class AbstractNettyRemotingServerTMDisconnectTest {

private NettyRemotingServer server;

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);

// Use real NettyRemotingServer like other tests
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
server = new NettyRemotingServer(executor);
}

@Test
void testTMDisconnectHandlerSetup() {
// Test that the TM disconnect handler can be set and retrieved
TMDisconnectHandler testHandler = mock(TMDisconnectHandler.class);

// Verify no exceptions are thrown when setting the handler
assertDoesNotThrow(() -> server.setTmDisconnectHandler(testHandler));
assertDoesNotThrow(() -> server.setTmDisconnectHandler(null));
assertDoesNotThrow(() -> server.setTmDisconnectHandler(testHandler));
}

@Test
void testTMDisconnectHandlerIntegration() {
// Test that handler is properly integrated in the AbstractNettyRemotingServer
// This verifies the new method exists and works
assertDoesNotThrow(() -> {
TMDisconnectHandler handler = mock(TMDisconnectHandler.class);
server.setTmDisconnectHandler(handler);
});
}
}
3 changes: 3 additions & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ server.ratelimit.bucketTokenNumPerSecond = 999999
server.ratelimit.bucketTokenMaxNum = 999999
server.ratelimit.bucketTokenInitialNum = 999999

#Early rollback configuration
server.enableRollbackWhenDisconnect=false

server.http.filter.xss.keywords=["<script>", "</script>", "javascript:", "vbscript:"]

#Metrics configuration, only for the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.TransactionMessageHandler;
import org.apache.seata.core.rpc.netty.AbstractNettyRemotingServer;
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
import org.apache.seata.server.AbstractTCInboundHandler;
Expand Down Expand Up @@ -756,6 +757,13 @@ private void endSchedule(long delay) {
* Init.
*/
public void init() {
// Setup TM disconnect handler for early rollback
if (remotingServer instanceof AbstractNettyRemotingServer) {
DefaultTMDisconnectHandler tmDisconnectHandler = new DefaultTMDisconnectHandler(core);
((AbstractNettyRemotingServer) remotingServer).setTmDisconnectHandler(tmDisconnectHandler);
LOGGER.info("TM disconnect handler initialized for early rollback feature");
}

retryRollbacking.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking),
0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.server.coordinator;

import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.TMDisconnectHandler;
import org.apache.seata.server.session.GlobalSession;
import org.apache.seata.server.session.SessionHolder;
import org.apache.seata.server.session.SessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Objects;

/**
* Default implementation of TMDisconnectHandler.
* Handles TM disconnection and performs early rollback of global transactions
* when enabled via configuration.
*/
public class DefaultTMDisconnectHandler implements TMDisconnectHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTMDisconnectHandler.class);

private final Core core;

/**
* Constructor with core dependency
*
* @param core the transaction core
*/
public DefaultTMDisconnectHandler(Core core) {
this.core = core;
}

@Override
public void handleTMDisconnect(RpcContext rpcContext) {
if (rpcContext == null || rpcContext.getTransactionServiceGroup() == null) {
return;
}

// Check if early rollback is enabled
boolean enableRollbackWhenDisconnect = ConfigurationFactory.getInstance()
.getBoolean(
ConfigurationKeys.ENABLE_ROLLBACK_WHEN_DISCONNECT,
DefaultValues.DEFAULT_ENABLE_ROLLBACK_WHEN_DISCONNECT);

if (!enableRollbackWhenDisconnect) {
LOGGER.debug("Early rollback on TM disconnect is disabled");
return;
}

String transactionServiceGroup = rpcContext.getTransactionServiceGroup();
String applicationId = rpcContext.getApplicationId();

LOGGER.info(
"TM disconnected: transactionServiceGroup={}, applicationId={}, performing early rollback check",
transactionServiceGroup,
applicationId);

try {
// Find all global sessions in BEGIN status
Collection<GlobalSession> globalSessions =
SessionHolder.getRootSessionManager().allSessions();

int rollbackCount = 0;
for (GlobalSession globalSession : globalSessions) {
if (shouldRollbackSession(globalSession, rpcContext)) {
try {
LOGGER.info(
"Early rollback for TM disconnect: xid={}, transactionServiceGroup={}, applicationId={}",
globalSession.getXid(),
globalSession.getTransactionServiceGroup(),
globalSession.getApplicationId());

// Change status to TimeoutRollbacking
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacking);

// Perform rollback
core.doGlobalRollback(globalSession, false);
rollbackCount++;

} catch (TransactionException e) {
LOGGER.error(
"Failed to rollback transaction [{}] {} {}",
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message format is unclear with three consecutive placeholders without descriptive text. Consider a more descriptive format like 'Failed to rollback transaction [{}]: code={}, message={}'

Suggested change
"Failed to rollback transaction [{}] {} {}",
"Failed to rollback transaction [{}]: code={}, message={}",

Copilot uses AI. Check for mistakes.
globalSession.getXid(),
e.getCode(),
e.getMessage());
}
}
}

if (rollbackCount > 0) {
LOGGER.info(
"Early rollback completed for TM disconnect: transactionServiceGroup={}, rollbackCount={}",
transactionServiceGroup,
rollbackCount);
}

} catch (Exception e) {
LOGGER.error(
"Error during TM disconnect handling for transactionServiceGroup={}", transactionServiceGroup, e);
}
}

/**
* Get the session manager instance. Made public for testing purposes.
*
* @return the session manager
*/
public SessionManager getSessionManager() {
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is exposed solely for testing but exists in production code. Consider using package-private visibility instead of public, or use dependency injection to make the code more testable.

Suggested change
public SessionManager getSessionManager() {
SessionManager getSessionManager() {

Copilot uses AI. Check for mistakes.
return SessionHolder.getRootSessionManager();
}

/**
* Determine if a global session should be rolled back based on the disconnected TM context.
* Uses community-approved matching algorithm:
* 1. Primary: TransactionServiceGroup matching (vgroup approach)
* 2. Secondary: ApplicationId matching for additional safety
* 3. Only rollback sessions in BEGIN status
*
* @param globalSession the global session to check
* @param tmContext the disconnected TM context
* @return true if the session should be rolled back
*/
private boolean shouldRollbackSession(GlobalSession globalSession, RpcContext tmContext) {
// Only rollback sessions in BEGIN status
if (globalSession.getStatus() != GlobalStatus.Begin) {
return false;
}

// Primary identifier: TransactionServiceGroup (vgroup) matching
// This is the community consensus approach from Issue #4422
if (!Objects.equals(globalSession.getTransactionServiceGroup(), tmContext.getTransactionServiceGroup())) {
return false;
}

// Secondary safety check: ApplicationId matching when both are available
// This provides additional confidence to prevent false positives
if (globalSession.getApplicationId() != null && tmContext.getApplicationId() != null) {
return Objects.equals(globalSession.getApplicationId(), tmContext.getApplicationId());
}

// If applicationId is not available on both sides, trust the vgroup match
// This follows the community approach that vgroup is the primary identifier
return true;
}
}
Loading
Loading