Skip to content

Remove exception-mangling in connect/close listeners #127954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.netty.channel.Channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;

Expand All @@ -25,7 +25,7 @@
public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();

Netty4HttpChannel(Channel channel) {
this.channel = channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.netty.channel.Channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.http.HttpServerChannel;

import java.net.InetSocketAddress;
Expand All @@ -22,7 +22,7 @@
public class Netty4HttpServerChannel implements HttpServerChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();

Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import io.netty.channel.ChannelOption;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.transport.TcpChannel;
Expand All @@ -30,16 +30,15 @@ public class Netty4TcpChannel implements TcpChannel {
private final Channel channel;
private final boolean isServer;
private final String profile;
private final ListenableFuture<Void> connectContext;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final SubscribableListener<Void> connectContext = new SubscribableListener<>();
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
private final ChannelStats stats = new ChannelStats();
private final boolean rstOnClose;

Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
this.channel = channel;
this.isServer = isServer;
this.profile = profile;
this.connectContext = new ListenableFuture<>();
this.rstOnClose = rstOnClose;
addListener(this.channel.closeFuture(), closeContext);
addListener(connectFuture, connectContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.netty.channel.Channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.transport.TcpServerChannel;

import java.net.InetSocketAddress;
Expand All @@ -22,7 +22,7 @@
public class Netty4TcpServerChannel implements TcpServerChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();

Netty4TcpServerChannel(Channel channel) {
this.channel = channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.core.AbstractRefCounted;

import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -20,8 +20,8 @@
*/
public abstract class CloseableConnection extends AbstractRefCounted implements Transport.Connection {

private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final ListenableFuture<Void> removeContext = new ListenableFuture<>();
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
private final SubscribableListener<Void> removeContext = new SubscribableListener<>();

private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean removed = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.AbstractRefCounted;
Expand All @@ -43,7 +43,7 @@ public class ClusterConnectionManager implements ConnectionManager {
private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class);

private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Transport.Connection>> pendingConnections = ConcurrentCollections
private final ConcurrentMap<DiscoveryNode, SubscribableListener<Transport.Connection>> pendingConnections = ConcurrentCollections
.newConcurrentMap();
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);

Expand Down Expand Up @@ -184,8 +184,8 @@ private void connectToNodeOrRetry(
return;
}

final ListenableFuture<Transport.Connection> currentListener = new ListenableFuture<>();
final ListenableFuture<Transport.Connection> existingListener = pendingConnections.putIfAbsent(node, currentListener);
final SubscribableListener<Transport.Connection> currentListener = new SubscribableListener<>();
final SubscribableListener<Transport.Connection> existingListener = pendingConnections.putIfAbsent(node, currentListener);
if (existingListener != null) {
try {
// wait on previous entry to complete connection attempt
Expand All @@ -203,7 +203,7 @@ private void connectToNodeOrRetry(
// extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
final Transport.Connection existingConnectionRecheck = connectedNodes.get(node);
if (existingConnectionRecheck != null) {
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
var future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
connectingRefCounter.decRef();
future.onResponse(existingConnectionRecheck);
Expand Down Expand Up @@ -257,7 +257,7 @@ private void connectToNodeOrRetry(
}
}
} finally {
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
var future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
managerRefs.decRef();
releaseOnce.run();
Expand Down Expand Up @@ -387,9 +387,9 @@ private void failConnectionListener(
DiscoveryNode node,
RunOnce releaseOnce,
Exception e,
ListenableFuture<Transport.Connection> expectedListener
SubscribableListener<Transport.Connection> expectedListener
) {
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
final var future = pendingConnections.remove(node);
releaseOnce.run();
if (future != null) {
assert future == expectedListener : "Listener in pending map is different than the expected listener";
Expand Down