diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 6d8c89415f..2def8dd82a 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -13,11 +13,13 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -99,6 +101,22 @@ private class SearchedChannel // Otherwise run risk of getting reply without being able // to handle it } + + // Hash by channel name + @Override + public int hashCode() + { + return channel.getName().hashCode(); + } + + // Compare by channel name + @Override + public boolean equals(Object obj) + { + if (obj instanceof SearchedChannel other) + return other.channel.getName().equals(channel.getName()); + return false; + } } // SearchedChannels are tracked in two data structures @@ -115,11 +133,11 @@ private class SearchedChannel // up to MAX_SEARCH_PERIOD. /** Map of searched channels by channel ID */ - private ConcurrentHashMap searched_channels = new ConcurrentHashMap<>(); + private HashMap searched_channels = new HashMap<>(); /** Search buckets * - *

The {@link #current_search_bucket} selects the list + *

The {@link #current_search_bucket} selects the set * of channels to be searched by {@link #runSearches()}, * which runs roughly once per second, each time moving to * the next search bucket in a ring buffer fashion. @@ -136,13 +154,13 @@ private class SearchedChannel * which would result in an endless loop. * *

Access to either {@link #search_buckets} or {@link #current_search_bucket} - * must SYNC on {@link #search_buckets}. + * must only occur in a 'synchronized' method. */ - private final ArrayList> search_buckets = new ArrayList<>(); + private final ArrayList> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2); /** Index of current search bucket, i.e. the one about to be searched. * - *

Access must SYNC on {@link #search_buckets}. + *

Access must only occur in a 'synchronized' method. */ private final AtomicInteger current_search_bucket = new AtomicInteger(); @@ -157,7 +175,7 @@ private class SearchedChannel private final ClientUDPHandler udp; /** Create ClientTCPHandler from IP address and 'tls' flag */ - private final BiFunction tcp_provider; + private final BiFunction> tcp_provider; /** Buffer for assembling search messages */ private final ByteBuffer send_buffer = ByteBuffer.allocate(PVASettings.MAX_UDP_UNFRAGMENTED_SEND); @@ -179,17 +197,15 @@ private class SearchedChannel */ public ChannelSearch(final ClientUDPHandler udp, final List udp_addresses, - final BiFunction tcp_provider, + final BiFunction> tcp_provider, final List name_server_addresses) throws Exception { this.udp = udp; this.tcp_provider = tcp_provider; - synchronized (search_buckets) - { - for (int i=0; i()); - } + + for (int i = 0; i < MAX_SEARCH_PERIOD + 2; ++i) + search_buckets.add(new HashSet<>()); // Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server // on that multicast group or bcast subnet. @@ -242,7 +258,7 @@ public void start() /** @param channel Channel that should be searched * @param now Start searching as soon as possible, or delay? */ - public void register(final PVAChannel channel, final boolean now) + public synchronized void register(final PVAChannel channel, final boolean now) { logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon")); @@ -252,13 +268,11 @@ public void register(final PVAChannel channel, final boolean now) final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel)); - synchronized (search_buckets) - { - int bucket = current_search_bucket.get(); - if (!now) - bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); - search_buckets.get(bucket).add(sc); - } + int bucket = current_search_bucket.get(); + if (!now) + bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); + search_buckets.get(bucket).add(sc); + // Jumpstart search instead of waiting up to ~1 second for current bucket to be handled if (now) timer.execute(this::runSearches); @@ -268,17 +282,14 @@ public void register(final PVAChannel channel, final boolean now) * @param channel_id * @return {@link PVAChannel}, null when channel wasn't searched any more */ - public PVAChannel unregister(final int channel_id) + public synchronized PVAChannel unregister(final int channel_id) { final SearchedChannel searched = searched_channels.remove(channel_id); if (searched != null) { logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id); - // NOT removing `searched` from all `search_buckets`. - // Removal would be a slow, linear operation. - // `runSearches()` will drop the channel from `search_buckets` - // because it's no longer listed in `searched_channels` - + for (Set bucket : search_buckets) + bucket.remove(searched); return searched.channel; } return null; @@ -288,7 +299,7 @@ public PVAChannel unregister(final int channel_id) * *

Resets their search counter so they're searched "real soon". */ - public void boost() + public synchronized void boost() { for (SearchedChannel searched : searched_channels.values()) { @@ -299,12 +310,9 @@ public void boost() if (period == MIN_SEARCH_PERIOD) { logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'"); - synchronized (search_buckets) - { - final LinkedList bucket = search_buckets.get(current_search_bucket.get()); - if (! bucket.contains(searched)) - bucket.add(searched); - } + + final Set bucket = search_buckets.get(current_search_bucket.get()); + bucket.add(searched); } // Not sending search right now: // search(channel); @@ -320,50 +328,44 @@ public void boost() /** Invoked by timer: Check searched channels for the next one to handle */ @SuppressWarnings("unchecked") - private void runSearches() + private synchronized void runSearches() { to_search.clear(); - synchronized (search_buckets) - { - // Determine current search bucket - final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); - final LinkedList bucket = search_buckets.get(current); - logger.log(Level.FINEST, () -> "Search bucket " + current); - - // Remove searched channels from the current bucket - SearchedChannel sc; - while ((sc = bucket.poll()) != null) - { - if (sc.channel.getState() == ClientChannelState.SEARCHING && - searched_channels.containsKey(sc.channel.getCID())) - { - // Collect channels in 'to_search' for handling outside of sync. section - to_search.add(sc.channel); - - // Determine next search period - final int period = sc.search_period.updateAndGet(sec -> sec < MAX_SEARCH_PERIOD - ? sec + 1 - : MAX_SEARCH_PERIOD); - - // Add to corresponding search bucket, or delay by one second - // in case that search bucket is quite full - final int i_n = (current + period) % search_buckets.size(); - final int i_n_n = (i_n + 1) % search_buckets.size(); - final LinkedList next = search_buckets.get(i_n); - final LinkedList next_next = search_buckets.get(i_n_n); - if (i_n == current || i_n_n == current) - throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " + - current + ", " + i_n + ", " + i_n_n); - if (next_next.size() < next.size()) - next_next.add(sc); - else - next.add(sc); - } + + // Determine current search bucket + final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); + final Set bucket = search_buckets.get(current); + logger.log(Level.FINEST, () -> "Search bucket " + current); + + // Remove searched channels from the current bucket + for (SearchedChannel sc : bucket) { + if (sc.channel.getState() == ClientChannelState.SEARCHING && + searched_channels.containsKey(sc.channel.getCID())) { + // Collect channels in 'to_search' for handling outside of sync. section + to_search.add(sc.channel); + + // Determine next search period + final int period = sc.search_period.updateAndGet(sec -> sec < MAX_SEARCH_PERIOD + ? sec + 1 + : MAX_SEARCH_PERIOD); + + // Add to corresponding search bucket, or delay by one second + // in case that search bucket is quite full + final int i_n = (current + period) % search_buckets.size(); + final int i_n_n = (i_n + 1) % search_buckets.size(); + final Set next = search_buckets.get(i_n); + final Set next_next = search_buckets.get(i_n_n); + if (i_n == current || i_n_n == current) + throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " + + current + ", " + i_n + ", " + i_n_n); + if (next_next.size() < next.size()) + next_next.add(sc); else - logger.log(Level.FINE, "Dropping channel from search: " + sc.channel); - } + next.add(sc); + } else + logger.log(Level.FINE, "Dropping channel from search: " + sc.channel); } - + bucket.clear(); // Search batch.. // Size of a search request is close to 50 bytes @@ -437,28 +439,35 @@ private void search(final Collection channels) { // For search via TCP, do we use plain TCP or do we send the search itself via TLS? // This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas:// - final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS()); + final Future create_tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS()); + final ClientTCPHandler tcp; + try + { + tcp = create_tcp.get(); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot obtain TCP handler to search " + name_server, ex); + continue; + } // In case of connection errors (TCP connection blocked by firewall), // tcp will be null - if (tcp != null) + final RequestEncoder search_request = (version, buffer) -> { - final RequestEncoder search_request = (version, buffer) -> - { - logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress()); + logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress()); - // Search sequence identifies the potentially repeated UDP. - // TCP search is once only, so PVXS always sends 0x66696E64 = "find". - // We send "look" ("kool" for little endian). - final int seq = 0x6C6F6F6B; + // Search sequence identifies the potentially repeated UDP. + // TCP search is once only, so PVXS always sends 0x66696E64 = "find". + // We send "look" ("kool" for little endian). + final int seq = 0x6C6F6F6B; - // Use 'any' reply address since reply will be via this TCP socket - final InetSocketAddress response_address = new InetSocketAddress(0); + // Use 'any' reply address since reply will be via this TCP socket + final InetSocketAddress response_address = new InetSocketAddress(0); - SearchRequest.encode(true, seq, channels, response_address, tls , buffer); - }; - tcp.submit(search_request); - } + SearchRequest.encode(true, seq, channels, response_address, tls , buffer); + }; + tcp.submit(search_request); } // Shortcut UDP search, avoid log messages when lists are empty @@ -524,7 +533,7 @@ private void sendSearch(final int seq, final Collection c } /** Stop searching channels */ - public void close() + public synchronized void close() { searched_channels.clear(); diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index 4f22ad140a..3f349b708f 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -12,7 +12,9 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -58,7 +60,7 @@ public class PVAClient implements AutoCloseable private final ConcurrentHashMap channels_by_id = new ConcurrentHashMap<>(); /** TCP handlers by server address */ - private final ConcurrentHashMap tcp_handlers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> tcp_handlers = new ConcurrentHashMap<>(); private final AtomicInteger request_ids = new AtomicInteger(); @@ -89,20 +91,24 @@ public PVAClient() throws Exception // TCP traffic is handled by one ClientTCPHandler per address (IP, socket). // Pass helper to channel search for getting such a handler. - final BiFunction tcp_provider = (the_addr, use_tls) -> + final BiFunction> tcp_provider = (the_addr, use_tls) -> tcp_handlers.computeIfAbsent(the_addr, addr -> { - try - { - // If absent, create with initial empty GUID - return new ClientTCPHandler(this, addr, Guid.EMPTY, use_tls); - } - catch (Exception ex) + // If absent, create with initial empty GUID + final CompletableFuture create_tcp = new CompletableFuture<>(); + create_tcp.completeAsync(() -> { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); - } - return null; - + try + { + return new ClientTCPHandler(this, addr, Guid.EMPTY, use_tls); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + } + return null; + }); + return create_tcp; }); search = new ChannelSearch(udp, udp_search_addresses, tcp_provider, name_server_addresses); @@ -250,32 +256,47 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, channel.setState(ClientChannelState.FOUND); logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid); - final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr -> - { - try - { - return new ClientTCPHandler(this, addr, guid, tls); - } - catch (Exception ex) - { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); - } - return null; - }); - // In case of connection errors, tcp will be null - if (tcp == null) - { // Cannot connect to server on provided port? Likely a server or firewall problem. - // On the next search, that same server might reply and then we fail the same way on connect. - // Still, no way around re-registering the search so we succeed once the server is fixed. - search.register(channel, false /* not "now" but eventually */); - } - else - { - if (tcp.updateGuid(guid)) - logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); - - channel.registerWithServer(tcp); - } + Thread.ofVirtual().name("Get TCP connection to " + server) + .start(() -> { + final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> + { + final CompletableFuture new_tcp_future = new CompletableFuture<>(); + + // Trying to establish a TCP connection is blocking and can be slow, + // especially when blocked by firewall. Therefore, attempt the TCP + // connection on a separate virtual thread: + Thread.ofVirtual().name("Establish TCP connection to " + server) + .start(() -> + { + try { + var client_tcp_handler = new ClientTCPHandler(this, addr, guid, tls); + new_tcp_future.complete(client_tcp_handler); + } catch (Exception ex) { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + // Cannot connect to server on provided port? Likely a server or firewall problem. + // On the next search, that same server might reply and then we fail the same way on connect. + // Still, no way around re-registering the search so we succeed once the server is fixed. + search.register(channel, false /* not "now" but eventually */); + new_tcp_future.complete(null); + } + }); + + return new_tcp_future; + }); + ClientTCPHandler tcp; + try { + tcp = tcp_future.get(); + } catch (Exception ex) { + logger.log(Level.WARNING, "Cannot connect to " + server, ex); + tcp = null; + } + // In case of connection errors, tcp will be null + if (tcp != null) { + if (tcp.updateGuid(guid)) + logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); + channel.registerWithServer(tcp); + } + }); } /** Called by {@link ClientTCPHandler} when connection is lost or closed because unused @@ -288,7 +309,18 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, void shutdownConnection(final ClientTCPHandler tcp) { // Forget this connection - final ClientTCPHandler removed = tcp_handlers.remove(tcp.getRemoteAddress()); + final Future tcp_future = tcp_handlers.remove(tcp.getRemoteAddress()); + final ClientTCPHandler removed; + try + { + removed = tcp_future == null ? null : tcp_future.get(); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot obtain TCP client to close for " + tcp, ex); + return; + } + if (removed != tcp) logger.log(Level.WARNING, "Closed unknown " + tcp, new Exception("Call stack")); @@ -352,8 +384,15 @@ public void close() } // Stop TCP and UDP threads - for (ClientTCPHandler handler : tcp_handlers.values()) - handler.close(true); + for (Future handler : tcp_handlers.values()) + try + { + handler.get().close(true); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "PVA Client error getting channel to close", ex); + } udp.close(); }