-
Notifications
You must be signed in to change notification settings - Fork 1.1k
SIRI-ET updater via MQTT #6851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev-2.x
Are you sure you want to change the base?
SIRI-ET updater via MQTT #6851
Conversation
…ogging the password
# Conflicts: # application/src/main/java/org/opentripplanner/updater/trip/siri/ModifiedTripBuilder.java
This reverts commit e647b5c.
I've done some performance tests. Bottleneck is however the XML Parsing with about 1000 messages per second on my local machine and 420 messages per second on our cloud machine. The reason to change the library to HiveMQ would be that it's a newer library that still receives updates. It is faster, but right now we would not be able to profit from that. |
We decided in the dev meeting to go forward with the HiveMq library. I will check, if all current MQTT implementations work with HiveMq, and then substitute Paho with HiveMQ. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## dev-2.x #6851 +/- ##
=============================================
- Coverage 72.14% 71.98% -0.17%
- Complexity 19772 19789 +17
=============================================
Files 2151 2156 +5
Lines 79955 80274 +319
Branches 8058 8073 +15
=============================================
+ Hits 57687 57786 +99
- Misses 19423 19643 +220
Partials 2845 2845 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
{ | ||
"updaters" : [ | ||
{ | ||
"type" : "siri-et-mqtt-updater", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a reviewer but I just want to drop this: I find the suffix -updater
in these type
values strange because they are all updaters. In the ones I have added I didn't use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some comments you might want to consider.
parameters.user() == null || | ||
parameters.user().isBlank() || | ||
parameters.password() == null || | ||
parameters.password().isBlank() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a utility method for this at StringUtils.hasValue()
that you can use if you want.
primingFutures.add(f); | ||
} | ||
LOG.info("Started {} priming workers", parameters.numberOfPrimingWorkers()); | ||
liveExecutor.submit(new LiveRunner()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to run this in parallel with your priming? This could cause a live ET message be overwritten by a retained message. In our implementation we apply all the full history before we start consuming live messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, thanks!
private void onMessage(Mqtt5Publish message) { | ||
boolean offer; | ||
if (message.isRetain() && !primed) { | ||
offer = primingMessageQueue.offer(message.getPayloadAsBytes()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there could be a race condition here if a message is put on the primingMessageQueue at the same time as the last RetainRunner times out. Then this message won't be processed. That might not be a catastrophe, but worth to consider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a new client connects to the broker, only old messages will be marked as retained for that client. All messages that are processed immediately after they are sent to the broker will never have the retained flag. So the idea is that the fixed amount of retained messages get processed, and when the runners idle long enough (maxPrimingIdleTime
), then it is assumed that all retained messages are processed so the runners can get shut down.
List<CompletableFuture<Void>> primingFutures = new ArrayList<>(); | ||
|
||
for (int i = 0; i < parameters.numberOfPrimingWorkers(); i++) { | ||
CompletableFuture<Void> f = CompletableFuture.runAsync(new RetainRunner(i), primingExecutor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing to consider about parallelizing your ET processing is that you might apply your ET messages out of order. If you have multiple messages for the same trip (for example a time update followed by a cancellation) then you will get a different state depending on the order that these are applied. If you don't have duplicate messages for the same trip in your retained messages this won't be a problem i think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only have one message per trip, so the order shouldn't matter:
- One mqtt topic per trip, in every topic is always only one message
- the only way to get 2 messages for a trip is if a new live message comes in (which is exactly the problem you mentioned in your other comment about the LiveRunner starting too early)
Summary
This PR adds the support to import SIRI-ET realtime updates via MQTT. It's implemented as a sandbox feature. When no SIRI MQTT updater is configured in the
router-config.json
the sandbox code is not executed.Issue
Closes #6639
Unit tests
Without an MQTT this is hard to test. Existing tests all run successfully and changes to non-sandbox code are minimal.
Documentation
Documentation has been updated.
Changelog
Added to changelog
Bumping the serialization version id
Not necessary.