Skip to content

Move TCP connection to thread, fully unregister completed search #3348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

kasemir
Copy link
Collaborator

@kasemir kasemir commented Apr 2, 2025

Next step after #3345

@kasemir
Copy link
Collaborator Author

kasemir commented Apr 2, 2025

@abrahamwolk This is basically what you had, moving the connection to a thread. Plus the searches are all cancelled, and we'll have a shorter timeout.

If you test this, it might "work". It performs alright when I try it.
But I think there's still a problem.

Assume you have channels A and B both on the same server.
Client searches for A, then B.
Client gets replies:
"Server with GUID X has channel A, talk to me on TCP 10.10...".
"Server with GUID X has channel B, talk to me on TCP 10.10...".

Client now handles BOTH replies on separate threads.
The tcp_handlers.computeIfAbsent(... is in there to assert that we only create ONE TCP handler per GUID.
Now the threads for A and B are both calling tcp_handlers.computeIfAbsent(server for GUID X, ... and might hang in there waiting for the timeout:

// Atomically create one ClientTCPHanlder per server
final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr ->
{
    try
    {    // Hung in here when TCP connection is slow!!
         return new ClientTCPHandler(this, addr, guid, tls);
    }
    catch (Exception ex)
    {
        logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
    }
    return null;
});

Is that fine, the computeIfAbsent works exactly as intended?
Or does that mean that other threads which don't have a timeout are now also blocked at this point??
Do we need to change into something like this?

// Atomically create one ClientTCPHanlder per server
final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr ->
{
    try
    {    // ClientTCPHandler constructor does NOT connect
         return new ClientTCPHandler(this, addr, guid, tls);
    }
    catch (Exception ex)
    {
        logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
    }
    return null;
});
// Quickly and atomically fetched the one and only ClientTCPHandler for that server.
// Now connect, which might take longer
tcp.assertConnection();
// ^^ new method in ClientTCPHandler that performs the connection which used to be in the constructor..
// Both threads call that, but it will only connect once.
// New problem: If that fails, need to remove from tcp_handlers

@kasemir
Copy link
Collaborator Author

kasemir commented Apr 2, 2025

Playing with a simplified test setup, it looks like the concurrent hash map and computeIfAbsent do exactly what we need:

Thread 1 calling computeIfAbsent for server A and then stuck there waiting for an eventual timeout
Thread 2 calling computeIfAbsent for that same server A: It's stuck in computeIfAbsent, will eventually get the same result as thread 1, so no duplication.
Thread 3 calling computeIfAbsent for a different server B: It's not stuck but gets to create the TCP handler for server B right away
Thread 4 later calling computeIfAbsent for server B: Gets it immediately.

So being stuck inside the computeIfAbsent lambda during a connection issue will only affect those threads that are looking for that server, while threads for other servers can continue.

@abrahamwolk
Copy link
Collaborator

abrahamwolk commented Apr 3, 2025

Playing with a simplified test setup, it looks like the concurrent hash map and computeIfAbsent do exactly what we need:

Very nice! The official documentation [1] also states (I have emphasized by making part of the text bold):

If the specified key is not already associated with a value, attempts to compute its value using the given mapping function and enters it into this map unless null. The entire method invocation is performed atomically. The supplied function is invoked exactly once per invocation of this method if the key is absent, else not at all. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple.

The mapping function must not modify this map during computation.

which sounds correct.

(I edited this comment to update to the documentation for Java 17, the version of Java we are using when writing this.)

[1] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent(K,java.util.function.Function)

{
try
final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the documentation [1] of ConcurrentHashMap.computeIfAbsent() more carefully, it seems that it is not guaranteed that all other updates are not blocked:

Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple.

In fact, I seem to get connections that are blocked until a timeout occurs on the establishment of one connection, before the establishment of another connection.

This comment suggests the same: https://stackoverflow.com/a/78230808

I think it's worth thinking about in more detail if we can make the implementation entirely non-blocking.

[1] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent(K,java.util.function.Function)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
I'm thinking that tcp_handlers could change from ConcurrentHashMap<server, ClientTCPHandler> to ConcurrentHashMap<server, Future<ClientTCPHandler>>. So it would return the Future<ClientTCPHandler> without delay. The thread might then time out while waiting for that Future to complete, but the computeIfAbsent is immediate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that sounds like a good idea!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now using ConcurrentHashMap<server, Future<ClientTCPHandler>>, need to test that a little

synchronized (search_buckets)
{
for (LinkedList<SearchedChannel> bucket : search_buckets)
bucket.remove(searched);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LinkedList.remove() only removes the first occurrence of an element. Can an element occur multiple times in a bucket, and if so, should all the occurrences of the element be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boost only adds it once, so needs to be like this in all places that add?

if (! bucket.contains(searched))
                        bucket.add(searched);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I wasn't aware it was checked when it was added.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's checked in one place, need to see if it's enforced in all places.

Copy link
Collaborator

@abrahamwolk abrahamwolk Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is perhaps outside the scope of this pull request, but if we change the type of bucket to be a Set [1], then we can likely get both

  1. More efficient search and removal of elements.
  2. Guarantee that elements occur only at most once.

If the order of insertion is important, an implementation like, e.g., LinkedHashSet could be used.

(Edited to update the link to point to the documentation of Java version 17.)

[1] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/Set.html

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's now a Set

@@ -250,32 +250,40 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server,
channel.setState(ClientChannelState.FOUND);
logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid);

final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr ->
// TCP connection can be slow, especially when blocked by firewall, so move to thread
// TODO Lightweight thread? Thread pool?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be hesitant to create OS-level threads for each connection attempt. I think it's not unreasonable for an OPI to contain on the order of 100 or even 1000 PVs, and I am not sure creating that many OS-level threads is a good idea. I suggest that we wait until we have adopted Java 21 and then create Virtual Threads instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think adding a thread pool would be good.

I'm not actually sure it would be 1000 threads per OPI. I believe there is some tcp connection sharing, the case that did the crash was more the number of IOCs per OPI, which I think approaches closer to 100 max than 1000.

The archiver however.... that's a lot of threads.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not an upgrade to Java 21, saves us discussions on thread pools...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a thread per TCP connection, and one TCP connection per IP:port.
Now using virtual thread.
Runs with JDK 20 when using --enable-preview or of course JDK 21

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. but the GitHub build now fails because it's not using JDK 21:

 /home/runner/work/phoebus/phoebus/core/pva/src/main/java/org/epics/pva/client/PVAClient.java:[260,14] error: cannot find symbol
Error:    symbol:   method ofVirtual()

kasemir added 3 commits April 3, 2025 06:58
This way, `tcp_handlers` can provide the `Future` without delays, while
the slower TCP connection is then awaited when getting the future's
value
@@ -250,32 +256,52 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server,
channel.setState(ClientChannelState.FOUND);
logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid);

final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr ->
// TCP connection can be slow, especially when blocked by firewall, so move to thread
Thread.ofVirtual()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One alternative to this way of implementing the functionality, would be to create the thread only for the computation that computes the value of the Future. That way, there will be fewer threads.

I'm thinking of something along the lines of:

final Future<ClientTCPHandler> tcp_future = tcp_handlers.computeIfAbsent(server, addr ->
            {
                final CompletableFuture<ClientTCPHandler> create_tcp = new CompletableFuture<>();

                // Attempt the TCP connection on a separate virtual thread:
                Thread.ofVirtual().name("TCP connect " + server)
                                  .start(() ->
                        {
                            try {
                                var client_tcp_handler = new ClientTCPHandler(this, addr, guid, tls);
                                create_tcp.complete(client_tcp_handler);
                            }
                            catch (Exception ex)
                            {
                                logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex);
                            }
                            create_tcp.complete(null);
                        });
                
                return create_tcp;
            });

This is just an idea to discuss. (Also, I have not compiled this code, it's just a sketch.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Go ahead and update the branch like that, since this also addresses your other concern about CompletableFuture.completeAsync using OS threads

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented this idea now and pushed the implementation to the branch of this pull request. (Commit: f982551)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My commit f982551 was based on wrong reasoning: the outer thread is still needed, since otherwise the logic blocks when calling tcp_future.get(). I have fixed this with the commit 97dc5f7 by adding back the outer thread again.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the conclusion is that it doesn't seem that we can avoid creating many threads this way, but at least we can create virtual threads instead of OS-level threads.

{
try
final CompletableFuture<ClientTCPHandler> create_tcp = new CompletableFuture<>();
create_tcp.completeAsync(() ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, will this not spawn an OS-level thread? The documentation of CompletableFuture.completeAsync() [1] states:

Completes this CompletableFuture with the result of the given Supplier function invoked from an asynchronous task using the default executor.

[1] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/CompletableFuture.html#completeAsync(java.util.function.Supplier)

@abrahamwolk
Copy link
Collaborator

There is still some issue present in ChannelSearch with searched_channels I think.

Unfortunately, I don't know how to reproduce this, but I have been able to end up with searched_channels in a state like this:
searched_channels

I am looking into how to reproduce this.

@kasemir
Copy link
Collaborator Author

kasemir commented Apr 4, 2025

You end up with several instances of "LocalPV2", each with a different client ID?
At first glance, PVPool.getPV should avoid that. If you open the menu Applications, Debug, PV List, does that also list the PV many times?
Do you get into this situation when closing and re-opening displays which contain problematic channels?
I assume the PV pool has at most one, maybe no reference to "LocalPV2", and the dangling entries in searched_channels are left over from channels that never connected and weren't then removed when the channel got closed.

Maybe try to trap that by adding something like this to ChannelSearch.register:

    public void register(final PVAChannel channel, final boolean now)
    {
        logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon"));
        
        for (var searched : searched_channels.values())
            if (searched.channel.getName().equals(channel.getName()))
                System.out.println("Channel " + channel + " already searched as " + searched + "?!?");

@abrahamwolk
Copy link
Collaborator

abrahamwolk commented Apr 4, 2025

One thing I'm doing is re-loading the OPI many times, so maybe it's related to that.

Are the data-structures for searching for PVs cleared when reloading OPIs?

I added assert statements similar to the code you posted, and the assertions fail sometimes. But I cannot easily determine the reason for failing from that.

@kasemir
Copy link
Collaborator Author

kasemir commented Apr 4, 2025

ChannelSearch.unregister would ordinarily be called to remove the channel. Maybe it isn't for some reason in that scenario

@abrahamwolk
Copy link
Collaborator

ChannelSearch.unregister would ordinarily be called to remove the channel. Maybe it isn't for some reason in that scenario

Yes. I will continue to debug this.

@abrahamwolk
Copy link
Collaborator

abrahamwolk commented Apr 8, 2025

I am able to reproduce the bug by:

  1. Running an OPI with a PV that is reachable over UDP but not TCP.
  2. Reloading the OPI very quickly by holding the key 'F5' pressed.
  3. The bug (sometimes or often) occurs when there is a timeout on trying to establish the TCP connection.

In order to trigger the bug, it is helpful to merge in the master-branch and setting the timeout on TCP connections to a low value using the feature implemented in #3345.

While I have not managed to determine the exact cause of the bug, I no longer encounter the bug if I add the keyword synchronized to the methods that work on search_buckets and searched_channels. I have implemented this in commit 31ccb76.

With the changes I added in 31ccb76, there were two separate approaches to locking search_buckets and searched_channels:

  1. The original locking mechanism that uses synchronized (search_buckets) { ... }-blocks and the fact that searched_channels is a ConcurrentHashMap<>.
  2. The synchronized keyword that I added to the methods.

While it seems that the two approaches can co-exist, I think it's not optimal, as it makes the code harder to read and reason about.

In order to facilitate reasoning, I therefore removed the old mechanism (i.e., the mechanism under point 1), so that the code instead uses only mechanism 2, which seems to me to be easier to reason about. I have implemented this in the two commits 971903c and bcbc14f.

@abrahamwolk
Copy link
Collaborator

abrahamwolk commented Apr 8, 2025

In the current pull request, on line 293 of PVAClient.java, a call to search.register() is made in order to re-register a search for a channel to which the establishment of a TCP connection was unsuccessful.

Should this call be moved into the computation that tries to establish the TCP connection? (I.e., into the "inner" virtual thread that computes the value of the Future<>?)

It seems to me that by being called in the "outer" virtual thread, the call to search.register() may occur more than once, since more than one thread may be waiting for the same TCP connection.

EDIT: Since I believe this is correct, I have implemented it in the commit 5df73f6.

@kasemir
Copy link
Collaborator Author

kasemir commented Apr 8, 2025

My leaning would be to avoid wholesale synchronized methods and instead only synchronize on a specific object as necessary. Using the synchronized methods, searched_channels.computeIfAbsent becomes somewhat meaningless since it's always inside a blocking call.
Still, if this fixes the superfluous search entries, that's of course very good.
Is it possible that your most recent update, moving the call to search.register() inside the connection thread, is the actual fix, and the synchronization can then be left as it was using synchronized (search_buckets) { ... }?

@abrahamwolk
Copy link
Collaborator

Is it possible that your most recent update, moving the call to search.register() inside the connection thread, is the actual fix, and the synchronization can then be left as it was using synchronized (search_buckets) { ... }?

No, unfortunately it doesn't solve the issue.

@kasemir
Copy link
Collaborator Author

kasemir commented Apr 21, 2025

OK, where are we now with this?

  • Using Set<SearchedChannel> for the search_buckets to avoid multiple registrations of the same channel
  • Unregister removes channel from all search buckets
  • synchronized methods instead of finer grained sync on search_buckets
  • Create TCP connection on virtual thread

As far as we can tell from looking at it and tests, this is an improvement, but the virtual threads require JDK 21, causing the automated build to fail.

Now what?
Use plain threads for the time being?
Remove spring boot so that #3350 can update to JDK 21?
Ignore the build failures?

@abrahamwolk
Copy link
Collaborator

To me, this pull request also looks like an improvement.

I propose to first upgrade Spring Boot and JDK, and then to merge this pull request.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants