Resolve concurrency issues that were discovered when upgrading the ws package to the latest version. #7068
+30
−12
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR contains:
Describe the problem you have without this PR
After update ws to 18.81 version, it will make test case
doing insert after subscribe should end with the correct results
error.Through my investigation, this is a concurrency issue that has existed since the beginning. The upgrade of the ws package only changed the message transfer speed, making the concurrency problem more likely to surface. When I manually slowed down the query execution to an extremely low speed, the original version of ws also exhibited the same concurrency issue, causing the client to lose necessary data when processing the changesStream.
The cause of the problem
So by the time the client obtained the query results and began processing the changesStream, it was already too late. The query results we obtained were from version 2, but I started getting the changesStream from the version 3 database.
What I do
The most ideal situation is that we get the changeStream starting from v2. However, the client and the server are two separate entities, so we can only find a checkPoint close to v2 to start merging data.
When rxQuery mustReExec, it records the changePoint. Let changePoint = _changeEventBuffer.getCounter(), which records the current database version. When the query results are obtained, the changesEvent is obtained starting from the recorded changePoint. Of course, this will cause data redundancy, but data redundancy is much easier to handle than data loss. All we need to do is to implement idempotency checks.
And then the issue has been resolved, and the CI is currently functioning normally. CI Result