Skip to content

Commit 3a15d09

Browse files
committed
Revert "OF-3180: Refactor ConnectionCloseListener implementations for async dispatcher"
The improvement that was introduced by OF-3180 is redundant, now that a fix for OF-3176 is in place. This reverts commit 562ba69.
1 parent d93a643 commit 3a15d09

File tree

3 files changed

+33
-51
lines changed

3 files changed

+33
-51
lines changed

xmppserver/src/main/java/org/jivesoftware/openfire/SessionManager.java

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2005-2008 Jive Software, 2017-2026 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2005-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,7 +34,6 @@
3434
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
3535
import org.jivesoftware.openfire.session.*;
3636
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
37-
import org.jivesoftware.openfire.spi.ConnectionManagerImpl;
3837
import org.jivesoftware.openfire.spi.ConnectionType;
3938
import org.jivesoftware.openfire.streammanagement.TerminationDelegate;
4039
import org.jivesoftware.util.JiveGlobals;
@@ -1404,29 +1403,27 @@ public CompletableFuture<Void> onConnectionClosing(Object handback)
14041403
return CompletableFuture.completedFuture(null);
14051404
}
14061405

1407-
Log.debug("Closing client session with address {} and streamID {} that does not have SM resume.", session.getAddress(), session.getStreamID());
1406+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing client session with address {} and streamID {} that does not have SM resume.", session.getAddress(), session.getStreamID()));
14081407

1409-
try
1410-
{
1411-
if ((session.getPresence().isAvailable() || !session.wasAvailable()) && routingTable.hasClientRoute(session.getAddress()))
1412-
{
1413-
// Send an unavailable presence to the user's subscribers. This gives us a chance to send an
1414-
// unavailable presence to the entities that the user sent directed presences
1415-
final Presence presence = new Presence();
1416-
presence.setType(Presence.Type.unavailable);
1417-
presence.setFrom(session.getAddress());
1418-
router.route(presence);
1419-
}
1420-
} finally {
1408+
if ((session.getPresence().isAvailable() || !session.wasAvailable()) && routingTable.hasClientRoute(session.getAddress())) {
1409+
// Send an unavailable presence to the user's subscribers. This gives us a chance to send an
1410+
// unavailable presence to the entities that the user sent directed presences
1411+
final Presence presence = new Presence();
1412+
presence.setType(Presence.Type.unavailable);
1413+
presence.setFrom(session.getAddress());
1414+
1415+
result = result.thenRunAsync(() -> router.route(presence));
1416+
}
1417+
1418+
// In the completion stage remove the session (which means it'll be removed no matter if the previous stage had exceptions).
1419+
return result.whenComplete((v,t) -> {
14211420
try {
14221421
session.getStreamManager().onClose(router, serverAddress);
14231422
} finally {
1424-
// Ensure that the session is removed (no matter if the previous stage had exceptions).
1423+
// Note that the session can't be removed before the unavailable presence has been sent (as session-provided data is used by the broadcast).
14251424
removeSession(session);
14261425
}
1427-
}
1428-
1429-
return CompletableFuture.completedFuture(null);
1426+
});
14301427
}
14311428

14321429
@Override
@@ -1449,21 +1446,15 @@ public CompletableFuture<Void> onConnectionClosing(Object handback)
14491446
{
14501447
final LocalIncomingServerSession session = (LocalIncomingServerSession)handback;
14511448

1452-
Log.debug("Closing incoming server session with address {} and streamID {}.", session.getAddress(), session.getStreamID());
1453-
1454-
final ConnectionManagerImpl connectionManager = (ConnectionManagerImpl) XMPPServer.getInstance().getConnectionManager();
1455-
if (connectionManager == null) {
1456-
Log.info("No ConnectionManager available (server is likely shutting down). Skipping execution of close listener for incoming server session with address {} and streamID {}.", session.getAddress(), session.getStreamID());
1457-
return CompletableFuture.failedFuture(new IllegalStateException("No ConnectionManager available (server is likely shutting down)."));
1458-
}
1449+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing incoming server session with address {} and streamID {}.", session.getAddress(), session.getStreamID()));
14591450

14601451
// Remove all the domains that were registered for this server session.
14611452
final Collection<CompletableFuture<Void>> tasks = new ArrayList<>();
1462-
for (final String domain : session.getValidatedDomains()) {
1463-
tasks.add(connectionManager.runConnectionEventTaskAsync(() -> unregisterIncomingServerSession(domain, session)));
1453+
for (String domain : session.getValidatedDomains()) {
1454+
tasks.add(CompletableFuture.runAsync(() -> unregisterIncomingServerSession(domain, session)));
14641455
}
14651456

1466-
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
1457+
return result.thenCompose(e -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])));
14671458
}
14681459

14691460
@Override
@@ -1486,21 +1477,15 @@ public CompletableFuture<Void> onConnectionClosing(Object handback)
14861477
{
14871478
final OutgoingServerSession session = (OutgoingServerSession)handback;
14881479

1489-
Log.debug("Closing outgoing server session with address {} and streamID {}.", session.getAddress(), session.getStreamID());
1490-
1491-
final ConnectionManagerImpl connectionManager = (ConnectionManagerImpl) XMPPServer.getInstance().getConnectionManager();
1492-
if (connectionManager == null) {
1493-
Log.info("No ConnectionManager available (server is likely shutting down). Skipping execution of close listener for outgoing server session with address {} and streamID {}.", session.getAddress(), session.getStreamID());
1494-
return CompletableFuture.failedFuture(new IllegalStateException("No ConnectionManager available (server is likely shutting down)."));
1495-
}
1480+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing outgoing server session with address {} and streamID {}.", session.getAddress(), session.getStreamID()));
14961481

14971482
// Remove all the domains that were registered for this server session.
14981483
final Collection<CompletableFuture<Void>> tasks = new ArrayList<>();
14991484
for (DomainPair domainPair : session.getOutgoingDomainPairs()) {
1500-
tasks.add(connectionManager.runConnectionEventTaskAsync(() -> server.getRoutingTable().removeServerRoute(domainPair)));
1485+
tasks.add(CompletableFuture.runAsync(() -> server.getRoutingTable().removeServerRoute(domainPair)));
15011486
}
15021487

1503-
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
1488+
return result.thenCompose(e -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])));
15041489
}
15051490

15061491
@Override
@@ -1524,21 +1509,21 @@ public CompletableFuture<Void> onConnectionClosing(Object handback)
15241509
final ConnectionMultiplexerSession session = (ConnectionMultiplexerSession)handback;
15251510
final String domain = session.getAddress().getDomain();
15261511

1527-
Log.debug("Closing multiplexer session with address {} and streamID {}.", session.getAddress(), session.getStreamID());
1512+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing multiplexer session with address {} and streamID {}.", session.getAddress(), session.getStreamID()));
15281513

15291514
// Remove all the domains that were registered for this server session
1530-
localSessionManager.getConnnectionManagerSessions().remove(session.getAddress().toString());
1515+
result = result.thenRunAsync(() -> localSessionManager.getConnnectionManagerSessions().remove(session.getAddress().toString()));
15311516

15321517
// Remove track of the cluster node hosting the CM connection
1533-
multiplexerSessionsCache.remove(session.getAddress().toString());
1518+
result = result.thenRunAsync(() -> multiplexerSessionsCache.remove(session.getAddress().toString()));
15341519

15351520
if (getConnectionMultiplexerSessions(domain).isEmpty()) {
15361521
// Terminate ClientSessions originated from this connection manager
15371522
// that are still active since the connection manager has gone down
1538-
ConnectionMultiplexerManager.getInstance().multiplexerUnavailable(domain);
1523+
result = result.thenRunAsync(() -> ConnectionMultiplexerManager.getInstance().multiplexerUnavailable(domain));
15391524
}
15401525

1541-
return CompletableFuture.completedFuture(null);
1526+
return result;
15421527
}
15431528

15441529
@Override

xmppserver/src/main/java/org/jivesoftware/openfire/net/ComponentStanzaHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2005-2008 Jive Software, 2017-2026 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2005-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -109,8 +109,7 @@ else if (extraDomain.endsWith(initialDomain)) {
109109
componentSession.getConnection().registerCloseListener(new ConnectionCloseListener() {
110110
@Override
111111
public CompletableFuture<Void> onConnectionClosing(@Nullable Object handback) {
112-
InternalComponentManager.getInstance().removeComponent(subdomain, (ComponentSession.ExternalComponent) handback);
113-
return CompletableFuture.completedFuture(null);
112+
return CompletableFuture.runAsync(() -> InternalComponentManager.getInstance().removeComponent(subdomain, (ComponentSession.ExternalComponent) handback));
114113
}
115114
@Override
116115
public int getPriority() {

xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalComponentSession.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2005-2008 Jive Software, 2017-2026 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2005-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -136,8 +136,7 @@ public static LocalComponentSession createSession(String serverName, XmlPullPars
136136
connection.registerCloseListener(new ConnectionCloseListener() {
137137
@Override
138138
public CompletableFuture<Void> onConnectionClosing(@Nullable Object handback) {
139-
SessionManager.getInstance().removeComponentSession((LocalComponentSession) handback);
140-
return CompletableFuture.completedFuture(null);
139+
return CompletableFuture.runAsync(() -> SessionManager.getInstance().removeComponentSession((LocalComponentSession) handback));
141140
}
142141
@Override
143142
public int getPriority() {
@@ -245,8 +244,7 @@ public boolean authenticate(String digest) {
245244
conn.registerCloseListener(new ConnectionCloseListener() {
246245
@Override
247246
public CompletableFuture<Void> onConnectionClosing(@Nullable Object handback) {
248-
InternalComponentManager.getInstance().removeComponent(defaultSubdomain, (ExternalComponent) handback);
249-
return CompletableFuture.completedFuture(null);
247+
return CompletableFuture.runAsync(() -> InternalComponentManager.getInstance().removeComponent(defaultSubdomain, (ExternalComponent) handback));
250248
}
251249
@Override
252250
public int getPriority() {

0 commit comments

Comments
 (0)