Skip to content

Move TCP connection to thread, fully unregister completed search #3348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
191 changes: 100 additions & 91 deletions core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -99,6 +101,22 @@ private class SearchedChannel
// Otherwise run risk of getting reply without being able
// to handle it
}

// Hash by channel name
@Override
public int hashCode()
{
return channel.getName().hashCode();
}

// Compare by channel name
@Override
public boolean equals(Object obj)
{
if (obj instanceof SearchedChannel other)
return other.channel.getName().equals(channel.getName());
return false;
}
}

// SearchedChannels are tracked in two data structures
Expand All @@ -115,11 +133,11 @@ private class SearchedChannel
// up to MAX_SEARCH_PERIOD.

/** Map of searched channels by channel ID */
private ConcurrentHashMap<Integer, SearchedChannel> searched_channels = new ConcurrentHashMap<>();
private HashMap<Integer, SearchedChannel> searched_channels = new HashMap<>();

/** Search buckets
*
* <p>The {@link #current_search_bucket} selects the list
* <p>The {@link #current_search_bucket} selects the set
* of channels to be searched by {@link #runSearches()},
* which runs roughly once per second, each time moving to
* the next search bucket in a ring buffer fashion.
Expand All @@ -136,13 +154,13 @@ private class SearchedChannel
* which would result in an endless loop.
*
* <p>Access to either {@link #search_buckets} or {@link #current_search_bucket}
* must SYNC on {@link #search_buckets}.
* must only occur in a 'synchronized' method.
*/
private final ArrayList<LinkedList<SearchedChannel>> search_buckets = new ArrayList<>();
private final ArrayList<Set<SearchedChannel>> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2);

/** Index of current search bucket, i.e. the one about to be searched.
*
* <p>Access must SYNC on {@link #search_buckets}.
* <p>Access must only occur in a 'synchronized' method.
*/
private final AtomicInteger current_search_bucket = new AtomicInteger();

Expand All @@ -157,7 +175,7 @@ private class SearchedChannel
private final ClientUDPHandler udp;

/** Create ClientTCPHandler from IP address and 'tls' flag */
private final BiFunction<InetSocketAddress, Boolean, ClientTCPHandler> tcp_provider;
private final BiFunction<InetSocketAddress, Boolean, Future<ClientTCPHandler>> tcp_provider;

/** Buffer for assembling search messages */
private final ByteBuffer send_buffer = ByteBuffer.allocate(PVASettings.MAX_UDP_UNFRAGMENTED_SEND);
Expand All @@ -179,17 +197,15 @@ private class SearchedChannel
*/
public ChannelSearch(final ClientUDPHandler udp,
final List<AddressInfo> udp_addresses,
final BiFunction<InetSocketAddress, Boolean, ClientTCPHandler> tcp_provider,
final BiFunction<InetSocketAddress, Boolean, Future<ClientTCPHandler>> tcp_provider,
final List<AddressInfo> name_server_addresses) throws Exception
{
this.udp = udp;
this.tcp_provider = tcp_provider;

synchronized (search_buckets)
{
for (int i=0; i<MAX_SEARCH_PERIOD+2; ++i)
search_buckets.add(new LinkedList<>());
}

for (int i = 0; i < MAX_SEARCH_PERIOD + 2; ++i)
search_buckets.add(new HashSet<>());

// Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server
// on that multicast group or bcast subnet.
Expand Down Expand Up @@ -242,7 +258,7 @@ public void start()
/** @param channel Channel that should be searched
* @param now Start searching as soon as possible, or delay?
*/
public void register(final PVAChannel channel, final boolean now)
public synchronized void register(final PVAChannel channel, final boolean now)
{
logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon"));

Expand All @@ -252,13 +268,11 @@ public void register(final PVAChannel channel, final boolean now)

final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel));

synchronized (search_buckets)
{
int bucket = current_search_bucket.get();
if (!now)
bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size();
search_buckets.get(bucket).add(sc);
}
int bucket = current_search_bucket.get();
if (!now)
bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size();
search_buckets.get(bucket).add(sc);

// Jumpstart search instead of waiting up to ~1 second for current bucket to be handled
if (now)
timer.execute(this::runSearches);
Expand All @@ -268,17 +282,14 @@ public void register(final PVAChannel channel, final boolean now)
* @param channel_id
* @return {@link PVAChannel}, <code>null</code> when channel wasn't searched any more
*/
public PVAChannel unregister(final int channel_id)
public synchronized PVAChannel unregister(final int channel_id)
{
final SearchedChannel searched = searched_channels.remove(channel_id);
if (searched != null)
{
logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id);
// NOT removing `searched` from all `search_buckets`.
// Removal would be a slow, linear operation.
// `runSearches()` will drop the channel from `search_buckets`
// because it's no longer listed in `searched_channels`

for (Set<SearchedChannel> bucket : search_buckets)
bucket.remove(searched);
return searched.channel;
}
return null;
Expand All @@ -288,7 +299,7 @@ public PVAChannel unregister(final int channel_id)
*
* <p>Resets their search counter so they're searched "real soon".
*/
public void boost()
public synchronized void boost()
{
for (SearchedChannel searched : searched_channels.values())
{
Expand All @@ -299,12 +310,9 @@ public void boost()
if (period == MIN_SEARCH_PERIOD)
{
logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'");
synchronized (search_buckets)
{
final LinkedList<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
if (! bucket.contains(searched))
bucket.add(searched);
}

final Set<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
bucket.add(searched);
}
// Not sending search right now:
// search(channel);
Expand All @@ -320,50 +328,44 @@ public void boost()

/** Invoked by timer: Check searched channels for the next one to handle */
@SuppressWarnings("unchecked")
private void runSearches()
private synchronized void runSearches()
{
to_search.clear();
synchronized (search_buckets)
{
// Determine current search bucket
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
final LinkedList<SearchedChannel> bucket = search_buckets.get(current);
logger.log(Level.FINEST, () -> "Search bucket " + current);

// Remove searched channels from the current bucket
SearchedChannel sc;
while ((sc = bucket.poll()) != null)
{
if (sc.channel.getState() == ClientChannelState.SEARCHING &&
searched_channels.containsKey(sc.channel.getCID()))
{
// Collect channels in 'to_search' for handling outside of sync. section
to_search.add(sc.channel);

// Determine next search period
final int period = sc.search_period.updateAndGet(sec -> sec < MAX_SEARCH_PERIOD
? sec + 1
: MAX_SEARCH_PERIOD);

// Add to corresponding search bucket, or delay by one second
// in case that search bucket is quite full
final int i_n = (current + period) % search_buckets.size();
final int i_n_n = (i_n + 1) % search_buckets.size();
final LinkedList<SearchedChannel> next = search_buckets.get(i_n);
final LinkedList<SearchedChannel> next_next = search_buckets.get(i_n_n);
if (i_n == current || i_n_n == current)
throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " +
current + ", " + i_n + ", " + i_n_n);
if (next_next.size() < next.size())
next_next.add(sc);
else
next.add(sc);
}

// Determine current search bucket
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
final Set<SearchedChannel> bucket = search_buckets.get(current);
logger.log(Level.FINEST, () -> "Search bucket " + current);

// Remove searched channels from the current bucket
for (SearchedChannel sc : bucket) {
if (sc.channel.getState() == ClientChannelState.SEARCHING &&
searched_channels.containsKey(sc.channel.getCID())) {
// Collect channels in 'to_search' for handling outside of sync. section
to_search.add(sc.channel);

// Determine next search period
final int period = sc.search_period.updateAndGet(sec -> sec < MAX_SEARCH_PERIOD
? sec + 1
: MAX_SEARCH_PERIOD);

// Add to corresponding search bucket, or delay by one second
// in case that search bucket is quite full
final int i_n = (current + period) % search_buckets.size();
final int i_n_n = (i_n + 1) % search_buckets.size();
final Set<SearchedChannel> next = search_buckets.get(i_n);
final Set<SearchedChannel> next_next = search_buckets.get(i_n_n);
if (i_n == current || i_n_n == current)
throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " +
current + ", " + i_n + ", " + i_n_n);
if (next_next.size() < next.size())
next_next.add(sc);
else
logger.log(Level.FINE, "Dropping channel from search: " + sc.channel);
}
next.add(sc);
} else
logger.log(Level.FINE, "Dropping channel from search: " + sc.channel);
}

bucket.clear();

// Search batch..
// Size of a search request is close to 50 bytes
Expand Down Expand Up @@ -437,28 +439,35 @@ private void search(final Collection<SearchRequest.Channel> channels)
{
// For search via TCP, do we use plain TCP or do we send the search itself via TLS?
// This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas://
final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS());
final Future<ClientTCPHandler> create_tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS());
final ClientTCPHandler tcp;
try
{
tcp = create_tcp.get();
}
catch (Exception ex)
{
logger.log(Level.WARNING, "Cannot obtain TCP handler to search " + name_server, ex);
continue;
}

// In case of connection errors (TCP connection blocked by firewall),
// tcp will be null
if (tcp != null)
final RequestEncoder search_request = (version, buffer) ->
{
final RequestEncoder search_request = (version, buffer) ->
{
logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress());
logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress());

// Search sequence identifies the potentially repeated UDP.
// TCP search is once only, so PVXS always sends 0x66696E64 = "find".
// We send "look" ("kool" for little endian).
final int seq = 0x6C6F6F6B;
// Search sequence identifies the potentially repeated UDP.
// TCP search is once only, so PVXS always sends 0x66696E64 = "find".
// We send "look" ("kool" for little endian).
final int seq = 0x6C6F6F6B;

// Use 'any' reply address since reply will be via this TCP socket
final InetSocketAddress response_address = new InetSocketAddress(0);
// Use 'any' reply address since reply will be via this TCP socket
final InetSocketAddress response_address = new InetSocketAddress(0);

SearchRequest.encode(true, seq, channels, response_address, tls , buffer);
};
tcp.submit(search_request);
}
SearchRequest.encode(true, seq, channels, response_address, tls , buffer);
};
tcp.submit(search_request);
}

// Shortcut UDP search, avoid log messages when lists are empty
Expand Down Expand Up @@ -524,7 +533,7 @@ private void sendSearch(final int seq, final Collection<SearchRequest.Channel> c
}

/** Stop searching channels */
public void close()
public synchronized void close()
{
searched_channels.clear();

Expand Down
Loading
Loading