Skip to content

Minor: Streams thread safe position. #19480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from

Conversation

cjf2xn
Copy link

@cjf2xn cjf2xn commented Apr 16, 2025

Problem

The Position class uses multi-threaded data structures and safe publication which suggests to users that it's thread safe. The Position#merge method is not thread-safe due to the check-then-act logic:

if (!partitionMap.containsKey(partition) || partitionMap.get(partition) < offset) {
  partitionMap.put(partition, offset);
}

The partitionMap's entry for partition can change between the conditional check and the put call.

The multi-threaded unit test reproduces the problem and fails ~80% of the time without the fix.

Solution

Delegate mutation to withComponent which is thread-safe since it relies on ConcurrentHashMap#compute which uses synchronization internally to atomically execute the remapping function.

@github-actions github-actions bot added triage PRs from the community streams labels Apr 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
streams triage PRs from the community
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant