Skip to content

Move TCP connection to thread, fully unregister completed search #3348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 58 additions & 34 deletions core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -99,6 +101,22 @@ private class SearchedChannel
// Otherwise run risk of getting reply without being able
// to handle it
}

// Hash by channel name
@Override
public int hashCode()
{
return channel.getName().hashCode();
}

// Compare by channel name
@Override
public boolean equals(Object obj)
{
if (obj instanceof SearchedChannel other)
return other.channel.getName().equals(channel.getName());
return false;
}
}

// SearchedChannels are tracked in two data structures
Expand All @@ -119,7 +137,7 @@ private class SearchedChannel

/** Search buckets
*
* <p>The {@link #current_search_bucket} selects the list
* <p>The {@link #current_search_bucket} selects the set
* of channels to be searched by {@link #runSearches()},
* which runs roughly once per second, each time moving to
* the next search bucket in a ring buffer fashion.
Expand All @@ -138,7 +156,7 @@ private class SearchedChannel
* <p>Access to either {@link #search_buckets} or {@link #current_search_bucket}
* must SYNC on {@link #search_buckets}.
*/
private final ArrayList<LinkedList<SearchedChannel>> search_buckets = new ArrayList<>();
private final ArrayList<Set<SearchedChannel>> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2);

/** Index of current search bucket, i.e. the one about to be searched.
*
Expand All @@ -157,7 +175,7 @@ private class SearchedChannel
private final ClientUDPHandler udp;

/** Create ClientTCPHandler from IP address and 'tls' flag */
private final BiFunction<InetSocketAddress, Boolean, ClientTCPHandler> tcp_provider;
private final BiFunction<InetSocketAddress, Boolean, Future<ClientTCPHandler>> tcp_provider;

/** Buffer for assembling search messages */
private final ByteBuffer send_buffer = ByteBuffer.allocate(PVASettings.MAX_UDP_UNFRAGMENTED_SEND);
Expand All @@ -179,7 +197,7 @@ private class SearchedChannel
*/
public ChannelSearch(final ClientUDPHandler udp,
final List<AddressInfo> udp_addresses,
final BiFunction<InetSocketAddress, Boolean, ClientTCPHandler> tcp_provider,
final BiFunction<InetSocketAddress, Boolean, Future<ClientTCPHandler>> tcp_provider,
final List<AddressInfo> name_server_addresses) throws Exception
{
this.udp = udp;
Expand All @@ -188,7 +206,7 @@ public ChannelSearch(final ClientUDPHandler udp,
synchronized (search_buckets)
{
for (int i=0; i<MAX_SEARCH_PERIOD+2; ++i)
search_buckets.add(new LinkedList<>());
search_buckets.add(new HashSet<>());
}

// Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server
Expand Down Expand Up @@ -274,11 +292,11 @@ public PVAChannel unregister(final int channel_id)
if (searched != null)
{
logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id);
// NOT removing `searched` from all `search_buckets`.
// Removal would be a slow, linear operation.
// `runSearches()` will drop the channel from `search_buckets`
// because it's no longer listed in `searched_channels`

synchronized (search_buckets)
{
for (Set<SearchedChannel> bucket : search_buckets)
bucket.remove(searched);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LinkedList.remove() only removes the first occurrence of an element. Can an element occur multiple times in a bucket, and if so, should all the occurrences of the element be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boost only adds it once, so needs to be like this in all places that add?

if (! bucket.contains(searched))
                        bucket.add(searched);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I wasn't aware it was checked when it was added.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's checked in one place, need to see if it's enforced in all places.

Copy link
Collaborator

@abrahamwolk abrahamwolk Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is perhaps outside the scope of this pull request, but if we change the type of bucket to be a Set [1], then we can likely get both

  1. More efficient search and removal of elements.
  2. Guarantee that elements occur only at most once.

If the order of insertion is important, an implementation like, e.g., LinkedHashSet could be used.

(Edited to update the link to point to the documentation of Java version 17.)

[1] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/Set.html

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's now a Set

}
return searched.channel;
}
return null;
Expand All @@ -301,9 +319,8 @@ public void boost()
logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'");
synchronized (search_buckets)
{
final LinkedList<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
if (! bucket.contains(searched))
bucket.add(searched);
final Set<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
bucket.add(searched);
}
}
// Not sending search right now:
Expand All @@ -327,12 +344,11 @@ private void runSearches()
{
// Determine current search bucket
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
final LinkedList<SearchedChannel> bucket = search_buckets.get(current);
final Set<SearchedChannel> bucket = search_buckets.get(current);
logger.log(Level.FINEST, () -> "Search bucket " + current);

// Remove searched channels from the current bucket
SearchedChannel sc;
while ((sc = bucket.poll()) != null)
for (SearchedChannel sc : bucket)
{
if (sc.channel.getState() == ClientChannelState.SEARCHING &&
searched_channels.containsKey(sc.channel.getCID()))
Expand All @@ -349,8 +365,8 @@ private void runSearches()
// in case that search bucket is quite full
final int i_n = (current + period) % search_buckets.size();
final int i_n_n = (i_n + 1) % search_buckets.size();
final LinkedList<SearchedChannel> next = search_buckets.get(i_n);
final LinkedList<SearchedChannel> next_next = search_buckets.get(i_n_n);
final Set<SearchedChannel> next = search_buckets.get(i_n);
final Set<SearchedChannel> next_next = search_buckets.get(i_n_n);
if (i_n == current || i_n_n == current)
throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " +
current + ", " + i_n + ", " + i_n_n);
Expand All @@ -362,6 +378,7 @@ private void runSearches()
else
logger.log(Level.FINE, "Dropping channel from search: " + sc.channel);
}
bucket.clear();
}


Expand Down Expand Up @@ -437,28 +454,35 @@ private void search(final Collection<SearchRequest.Channel> channels)
{
// For search via TCP, do we use plain TCP or do we send the search itself via TLS?
// This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas://
final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS());
final Future<ClientTCPHandler> create_tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS());
final ClientTCPHandler tcp;
try
{
tcp = create_tcp.get();
}
catch (Exception ex)
{
logger.log(Level.WARNING, "Cannot obtain TCP handler to search " + name_server, ex);
continue;
}

// In case of connection errors (TCP connection blocked by firewall),
// tcp will be null
if (tcp != null)
final RequestEncoder search_request = (version, buffer) ->
{
final RequestEncoder search_request = (version, buffer) ->
{
logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress());
logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress());

// Search sequence identifies the potentially repeated UDP.
// TCP search is once only, so PVXS always sends 0x66696E64 = "find".
// We send "look" ("kool" for little endian).
final int seq = 0x6C6F6F6B;
// Search sequence identifies the potentially repeated UDP.
// TCP search is once only, so PVXS always sends 0x66696E64 = "find".
// We send "look" ("kool" for little endian).
final int seq = 0x6C6F6F6B;

// Use 'any' reply address since reply will be via this TCP socket
final InetSocketAddress response_address = new InetSocketAddress(0);
// Use 'any' reply address since reply will be via this TCP socket
final InetSocketAddress response_address = new InetSocketAddress(0);

SearchRequest.encode(true, seq, channels, response_address, tls , buffer);
};
tcp.submit(search_request);
}
SearchRequest.encode(true, seq, channels, response_address, tls , buffer);
};
tcp.submit(search_request);
}

// Shortcut UDP search, avoid log messages when lists are empty
Expand Down
110 changes: 77 additions & 33 deletions core/pva/src/main/java/org/epics/pva/client/PVAClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -58,7 +60,7 @@ public class PVAClient implements AutoCloseable
private final ConcurrentHashMap<Integer, PVAChannel> channels_by_id = new ConcurrentHashMap<>();

/** TCP handlers by server address */
private final ConcurrentHashMap<InetSocketAddress, ClientTCPHandler> tcp_handlers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<InetSocketAddress, Future<ClientTCPHandler>> tcp_handlers = new ConcurrentHashMap<>();

private final AtomicInteger request_ids = new AtomicInteger();

Expand Down Expand Up @@ -89,20 +91,24 @@ public PVAClient() throws Exception

// TCP traffic is handled by one ClientTCPHandler per address (IP, socket).
// Pass helper to channel search for getting such a handler.
final BiFunction<InetSocketAddress, Boolean, ClientTCPHandler> tcp_provider = (the_addr, use_tls) ->
final BiFunction<InetSocketAddress, Boolean, Future<ClientTCPHandler>> tcp_provider = (the_addr, use_tls) ->
tcp_handlers.computeIfAbsent(the_addr, addr ->
{
try
{
// If absent, create with initial empty GUID
return new ClientTCPHandler(this, addr, Guid.EMPTY, use_tls);
}
catch (Exception ex)
// If absent, create with initial empty GUID
final CompletableFuture<ClientTCPHandler> create_tcp = new CompletableFuture<>();
create_tcp.completeAsync(() ->
{
logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
}
return null;

try
{
return new ClientTCPHandler(this, addr, Guid.EMPTY, use_tls);
}
catch (Exception ex)
{
logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
}
return null;
});
return create_tcp;
});
search = new ChannelSearch(udp, udp_search_addresses, tcp_provider, name_server_addresses);

Expand Down Expand Up @@ -250,32 +256,52 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server,
channel.setState(ClientChannelState.FOUND);
logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid);

final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr ->
// TCP connection can be slow, especially when blocked by firewall, so move to thread
Thread.ofVirtual()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One alternative to this way of implementing the functionality, would be to create the thread only for the computation that computes the value of the Future. That way, there will be fewer threads.

I'm thinking of something along the lines of:

final Future<ClientTCPHandler> tcp_future = tcp_handlers.computeIfAbsent(server, addr ->
            {
                final CompletableFuture<ClientTCPHandler> create_tcp = new CompletableFuture<>();

                // Attempt the TCP connection on a separate virtual thread:
                Thread.ofVirtual().name("TCP connect " + server)
                                  .start(() ->
                        {
                            try {
                                var client_tcp_handler = new ClientTCPHandler(this, addr, guid, tls);
                                create_tcp.complete(client_tcp_handler);
                            }
                            catch (Exception ex)
                            {
                                logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
                            }
                            create_tcp.complete(null);
                        });
                
                return create_tcp;
            });

This is just an idea to discuss. (Also, I have not compiled this code, it's just a sketch.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Go ahead and update the branch like that, since this also addresses your other concern about CompletableFuture.completeAsync using OS threads

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented this idea now and pushed the implementation to the branch of this pull request. (Commit: f982551)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My commit f982551 was based on wrong reasoning: the outer thread is still needed, since otherwise the logic blocks when calling tcp_future.get(). I have fixed this with the commit 97dc5f7 by adding back the outer thread again.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the conclusion is that it doesn't seem that we can avoid creating many threads this way, but at least we can create virtual threads instead of OS-level threads.

.name("TCP connect " + server)
.start(() ->
{
final Future<ClientTCPHandler> tcp_future = tcp_handlers.computeIfAbsent(server, addr ->
{
final CompletableFuture<ClientTCPHandler> create_tcp = new CompletableFuture<>();
create_tcp.completeAsync(() ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, will this not spawn an OS-level thread? The documentation of CompletableFuture.completeAsync() [1] states:

Completes this CompletableFuture with the result of the given Supplier function invoked from an asynchronous task using the default executor.

[1] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/CompletableFuture.html#completeAsync(java.util.function.Supplier)

{
try
{
return new ClientTCPHandler(this, addr, guid, tls);
}
catch (Exception ex)
{
logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
}
return null;
});
return create_tcp;
});
ClientTCPHandler tcp;
try
{
return new ClientTCPHandler(this, addr, guid, tls);
tcp = tcp_future.get();
}
catch (Exception ex)
{
logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
logger.log(Level.WARNING, "Cannot connect to " + server, ex);
tcp = null;
}
// In case of connection errors, tcp will be null
if (tcp == null)
{ // Cannot connect to server on provided port? Likely a server or firewall problem.
// On the next search, that same server might reply and then we fail the same way on connect.
// Still, no way around re-registering the search so we succeed once the server is fixed.
search.register(channel, false /* not "now" but eventually */);
}
else
{
if (tcp.updateGuid(guid))
logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp);
channel.registerWithServer(tcp);
}
return null;
});
// In case of connection errors, tcp will be null
if (tcp == null)
{ // Cannot connect to server on provided port? Likely a server or firewall problem.
// On the next search, that same server might reply and then we fail the same way on connect.
// Still, no way around re-registering the search so we succeed once the server is fixed.
search.register(channel, false /* not "now" but eventually */);
}
else
{
if (tcp.updateGuid(guid))
logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp);

channel.registerWithServer(tcp);
}
}

/** Called by {@link ClientTCPHandler} when connection is lost or closed because unused
Expand All @@ -288,7 +314,18 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server,
void shutdownConnection(final ClientTCPHandler tcp)
{
// Forget this connection
final ClientTCPHandler removed = tcp_handlers.remove(tcp.getRemoteAddress());
final Future<ClientTCPHandler> tcp_future = tcp_handlers.remove(tcp.getRemoteAddress());
final ClientTCPHandler removed;
try
{
removed = tcp_future == null ? null : tcp_future.get();
}
catch (Exception ex)
{
logger.log(Level.WARNING, "Cannot obtain TCP client to close for " + tcp, ex);
return;
}

if (removed != tcp)
logger.log(Level.WARNING, "Closed unknown " + tcp, new Exception("Call stack"));

Expand Down Expand Up @@ -352,8 +389,15 @@ public void close()
}

// Stop TCP and UDP threads
for (ClientTCPHandler handler : tcp_handlers.values())
handler.close(true);
for (Future<ClientTCPHandler> handler : tcp_handlers.values())
try
{
handler.get().close(true);
}
catch (Exception ex)
{
logger.log(Level.WARNING, "PVA Client error getting channel to close", ex);
}

udp.close();
}
Expand Down
Loading