Skip to content

Conversation

@Fariarx
Copy link

@Fariarx Fariarx commented Nov 30, 2025

Changes:

  • Replaced sequential ReplaceOneAsync calls with BulkWriteAsync in UpdateAsync method when processing multiple (>1) read model updates

Impact:
Significantly accelerates read model repopulation in MongoDB.​

Production proven:
This change has been running in production for over a year with excellent stability and 2-4x performance improvement for batches >1000 events during repopulation.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Fariarx seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@rasmus
Copy link
Member

rasmus commented Dec 6, 2025

Remove the need for approvals on builds as legacy v0 builds have been stopped (was using Windows Server).


var collection = _mongoDatabase.GetCollection<TReadModel>(readModelDescription.RootCollectionName.Value);
var filter = Builders<TReadModel>.Filter.In(readModel => readModel.Id, modelIds);
var results = await collection.Find(filter).ToListAsync(cancellationToken: cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var results = await collection.Find(filter).ToListAsync(cancellationToken: cancellationToken);
var results = await collection.Find(filter).ToListAsync(cancellationToken: cancellationToken).ConfigureAwait(false);


if (readModelContext.IsMarkedForDeletion)
{
await DeleteAsync(readModelUpdate.ReadModelId, c);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await DeleteAsync(readModelUpdate.ReadModelId, c);
await DeleteAsync(readModelUpdate.ReadModelId, c).ConfigureAwait(false);

x.ReadModelEnvelope.ReadModel) { IsUpsert = true };

return replaceOneModel;
}), cancellationToken: c);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}), cancellationToken: c);
}), cancellationToken: c).ConfigureAwait(false);

@eventflow eventflow deleted a comment from github-actions bot Dec 6, 2025
@rasmus
Copy link
Member

rasmus commented Dec 6, 2025

.review

@github-actions
Copy link

github-actions bot commented Dec 6, 2025

Findings

  • Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs:203 – The pre-fetch of results happens outside the _transientFaultHandler.TryAsync scope. If any write in the batch collides and the retry strategy replays the delegate, it reuses the stale snapshot and keeps sending the same OriginalVersion, so the retry can never succeed. Please move the load inside the retry to refresh state per attempt (matching the previous one-by-one behavior).
  • Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs:249 – When every ReadModelUpdateResult is IsModified == false, nonNullEnvelopes is empty and BulkWriteAsync throws because MongoDB forbids empty batches. The sequential implementation simply skipped writes in that case; this now hard-fails. Bail out early when there is nothing to persist.
  • Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs:271 – BulkWriteAsync raises MongoBulkWriteException<T> (not MongoWriteException). The catch therefore misses version conflicts, so callers stop receiving OptimisticConcurrencyException and the retry policy never triggers. Catch the bulk exception (or the general MongoException) and wrap it as before.

Next steps

  1. Move the Find(...).ToListAsync into the retry lambda and ensure retries fetch fresh state.
  2. Skip BulkWriteAsync if there are no modified envelopes.
  3. Catch MongoBulkWriteException<TReadModel> (or MongoException) and rethrow OptimisticConcurrencyException with the proper read model id in the message.

Your friendly neighborhood AI

@rasmus
Copy link
Member

rasmus commented Dec 6, 2025

@Fariarx let me know if the AI review has value and it seems there's a misalignment of your commit email and your GitHub user. Align those and submit the CLA ❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants