[#27312] fix(JmsIO): create a session pool for JmsIO #27312 #27313
[apache#26203] fix(JmsIO): use a unused port for tests in JmsIO apache#26203 Fixes apache#27312, apache#26203 Co-Authored-By: Amrane Ait Zeouay <[email protected]>
Codecov Report
@@            Coverage Diff             @@
##           master   #27313      +/-   ##
==========================================
+ Coverage   71.49%   71.51%   +0.02%
==========================================
  Files         858      858
  Lines      104809   104809
==========================================
+ Hits        74935    74958      +23
+ Misses      28326    28303      -23
  Partials     1548     1548
10 files have indirect coverage changes.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
@johnjcasey could you take a look at this one? |
@@ -699,22 +701,24 @@ protected void finalize() {
   public abstract static class Write<EventT>
       extends PTransform<PCollection<EventT>, WriteJmsResult<EventT>> {

-    abstract @Nullable ConnectionFactory getConnectionFactory();
+    public abstract @Nullable ConnectionFactory getConnectionFactory();
why are these public?
If I remember correctly, I was trying to access it from the new pool package. We can move the pool classes to the main package so that these methods keep their original visibility.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({
We should not include a new blanket nullness suppression.
I will remove it
    return new DefaultPooledObject<>(session);
  }

  private <U> U callSessionMethod(Session session, String methodName, U defaultValue) {
Why are we doing this via reflection?
Because the javax API doesn't provide any function for closing the session/connection. However, the two JMS clients (ActiveMQ and Qpid) expose the same functions, getSession and getConnection. The main problem I found is that the connection is not closed after the session is closed.
Note: I didn't check other clients.
I'm using JmsIO with the Solace broker and this reflection usage will most definitely not work with the JCSMP API which Solace uses to implement JMS and probably will break JmsIO usage with Solace altogether... so please consider that checking two clients might not be enough...
@ppawel which client are you using with the JmsIO connector? We are using Solace as a message broker too, but with Qpid as the client (the Solace connector is not well maintained). It would be great to add the client that you are using to the test cases so we can make sure we don't introduce any regressions.
I am using the official Solace implementation of JMS (sol-jms) but it is giving us a lot of headaches combined with Apache Beam streaming. I might try the Qpid JMS client, but in any case I am looking forward to this PR being integrated as well because currently one of the problems we observe is connection count blowing up under load.
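To make the concern in this thread concrete, here is a minimal sketch of a reflective accessor lookup with a fallback value. It is not the PR's callSessionMethod: ReflectiveCalls, callOrDefault, ClientWithAccessor and ClientWithoutAccessor are hypothetical names, and the target is a plain Object rather than a javax.jms.Session so the example runs with the JDK alone. It shows that a client class lacking the expected method silently yields the default, which is the failure mode to watch for with clients that were not checked.

```java
import java.lang.reflect.Method;

/** Hypothetical helper (not the PR's code): calls a public no-arg method by name. */
final class ReflectiveCalls {
  private ReflectiveCalls() {}

  /**
   * Returns the result of target.methodName(), or defaultValue when the method
   * does not exist, cannot be invoked, throws, or returns null.
   * The cast to U is unchecked; a wrong type only fails at the call site.
   */
  @SuppressWarnings("unchecked")
  static <U> U callOrDefault(Object target, String methodName, U defaultValue) {
    try {
      Method method = target.getClass().getMethod(methodName);
      Object result = method.invoke(target);
      return result == null ? defaultValue : (U) result;
    } catch (ReflectiveOperationException e) {
      // Covers NoSuchMethodException, IllegalAccessException, InvocationTargetException.
      return defaultValue;
    }
  }
}

/** Hypothetical stand-in for a client class that exposes the accessor. */
final class ClientWithAccessor {
  public String getConnection() {
    return "connection-handle";
  }
}

/** Hypothetical stand-in for a client class that has no such accessor. */
final class ClientWithoutAccessor {}

final class ReflectiveCallsDemo {
  public static void main(String[] args) {
    System.out.println(
        ReflectiveCalls.callOrDefault(new ClientWithAccessor(), "getConnection", "none"));
    System.out.println(
        ReflectiveCalls.callOrDefault(new ClientWithoutAccessor(), "getConnection", "none"));
  }
}
```

With a fallback like this, an unsupported client does not fail loudly: the caller simply gets the default, so any cleanup that depends on the returned object would be skipped without an error.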
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.pool2.ObjectPool;

public abstract class SerializableSessionPool<T> implements ObjectPool<T>, Serializable, Closeable {
Can we add some tests specifically for this session pool? It seems like there is a theoretical risk of incorrectly borrowing/returning objects.
I did some tests using a Docker image of SolaceUI and closing the connection manually. Everything works fine. I tried to add some unit tests but I got blocked. I was thinking of passing in a SessionPoolFactory so that the tests are easier to write. What do you think?
Note: I couldn't run the pipeline and stop the session, because I don't have control over that. Also, using DirectRunner, I couldn't create multiple threads to test whether a session can be borrowed after reaching the maximum number of sessions.
I'm all for writing code that is easy to test
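As a rough illustration of the factory idea discussed above, the sketch below shows a bounded pool whose object factory is injected, so a test can pass a fake and drive the pool from several threads without a runner or a broker. It is an assumption-laden sketch, not the PR's SerializableSessionPool or the commons-pool2 API: BoundedPool, borrow, giveBack and BoundedPoolChecks are hypothetical names, and only JDK classes are used.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical bounded pool; the factory is injected so tests can supply a fake. */
final class BoundedPool<T> {
  private final Supplier<T> factory;
  private final Semaphore permits; // one permit per object that may be checked out
  private final Deque<T> idle = new ArrayDeque<>();

  BoundedPool(Supplier<T> factory, int maxTotal) {
    this.factory = factory;
    this.permits = new Semaphore(maxTotal);
  }

  /** Returns an object, or null if the limit is reached for the whole timeout. */
  T borrow(long timeout, TimeUnit unit) {
    try {
      if (!permits.tryAcquire(timeout, unit)) {
        return null;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    synchronized (idle) {
      T pooled = idle.pollFirst();
      if (pooled != null) {
        return pooled; // reuse an idle object before creating a new one
      }
    }
    try {
      return factory.get();
    } catch (RuntimeException e) {
      permits.release(); // do not leak the permit if creation fails
      throw e;
    }
  }

  /** Returns a previously borrowed object to the pool. */
  void giveBack(T object) {
    synchronized (idle) {
      idle.addFirst(object);
    }
    permits.release();
  }
}

final class BoundedPoolChecks {
  private BoundedPoolChecks() {}

  /** Starts `threads` borrowers that never return; reports how many got an object. */
  static int concurrentBorrowers(BoundedPool<?> pool, int threads) {
    AtomicInteger succeeded = new AtomicInteger();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        if (pool.borrow(100, TimeUnit.MILLISECONDS) != null) {
          succeeded.incrementAndGet();
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return succeeded.get();
  }
}
```

A unit test could then construct the pool with a counting fake, for example new BoundedPool<>(counter::incrementAndGet, 3) with an AtomicInteger counter, and check that only three of five concurrent borrowers succeed and that a returned object is reused instead of a new one being created.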
@Amraneze Kind ping on this PR
Sorry, I was on holiday. I will find time this week or next week to work on it.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
[#26203] fix(JmsIO): use a unused port for tests in JmsIO #26203
Fixes #27312, #26203
In this PR, I added a session pool that can be used for either reading or writing (for now it is only used for writing). This lets the user configure a maximum number of connections that will not be exceeded.