fix: handle redelivered messages in map stream#3418
Conversation
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3418 +/- ##
==========================================
+ Coverage 82.65% 82.75% +0.10%
==========================================
Files 307 307
Lines 76082 76165 +83
==========================================
+ Hits 62882 63027 +145
+ Misses 12645 12580 -65
- Partials 555 558 +3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
…n cancellation token in execute loop Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
…message Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
| _ = self.shared_ctx.hard_shutdown_token.cancelled() => { | ||
| let e = Error::Mapper("Map Stream cancelled, current message will be nacked".into()); | ||
| warn!(parent_id = ?self.msg_handle.message.id, ?e, "failed to map message"); |
There was a problem hiding this comment.
If downstream is handling the cancellation, we don't need a check here.
| match senders_guard.map.entry(key.clone()) { | ||
| Entry::Occupied(_) => Some(format!( | ||
| "duplicate in-flight request id: {key}; refusing to overwrite existing sender" | ||
| )), | ||
| Entry::Vacant(slot) => { | ||
| slot.insert(tx.clone()); | ||
| None | ||
| } |
There was a problem hiding this comment.
do we need this check in unary as well?
There was a problem hiding this comment.
Ideally yes, but the case for unary and batch is interesting because we're saved by the fact that they use oneshot sender to get their responses back from the receiver task.
So, even if this sender overwriting behaviour happens, one of the unary/batch tasks will still receive the full response.
Correct fix would be to handle deduplication in the reader/source
There was a problem hiding this comment.
Do you want to handle dedup at the source/reader? because once you have that you will have to remove this logic from the map stream.
There was a problem hiding this comment.
I don't think we need to remove this defensive check in the map stream irrespective of whether we have the handling present in the reader/source.
I can create a separate PR for reader/source
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
|
redelivered messages are being handled in reader/source as part of this change: github.com//pull/3429 Will close this PR. |
What this PR does / why we need it
Changes:
streammethod, when mapper is closed/duplicate ID exists in senders map.Related issues
Fixes: #3423
Testing
Special notes for reviewers
Anything notable for review (risk, rollout, follow-ups).