
[KAFKA-17049] fix Incremental rebalances assign too many tasks for the same connector together #16486


Open · wants to merge 13 commits into base: trunk

Conversation

@yazgoo commented Jun 28, 2024

see https://issues.apache.org/jira/browse/KAFKA-17049

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

example

  • 🟥: task for connector 1
  • 🟩: task for connector 2
  • 🟦: task for connector 3

Let's say we want to revoke 4 tasks from a worker with this task list (currentWorkerAllocation in the original code):

🟥 🟥 🟥 🟥 🟩 🟩 🟩 🟩 🟦 🟦 🟦 🟦

Right now, the first 4 are taken:

🟥 🟥 🟥 🟥

In our example, the first 4 tasks all belong to the same connector, so the revocation is unfair.

In the end this leads to an unbalanced distribution of tasks.

What this PR suggests is a fairer revocation, like so:

🟥 🟩 🟦 🟥

new algorithm detail

To do this, we first group the tasks by connector:

  • 🟥 🟥 🟥 🟥
  • 🟩 🟩 🟩 🟩
  • 🟦 🟦 🟦 🟦

and then we pick the first item of each list, one connector at a time. Iterating:

| iteration | state | result |
| --- | --- | --- |
| 0 | 🟥 🟥 🟥 🟥 / 🟩 🟩 🟩 🟩 / 🟦 🟦 🟦 🟦 | |
| 1 | 🟥 🟥 🟥 / 🟩 🟩 🟩 🟩 / 🟦 🟦 🟦 🟦 | 🟥 |
| 2 | 🟥 🟥 🟥 / 🟩 🟩 🟩 / 🟦 🟦 🟦 🟦 | 🟥 🟩 |
| 3 | 🟥 🟥 🟥 / 🟩 🟩 🟩 / 🟦 🟦 🟦 | 🟥 🟩 🟦 |
| 4 | 🟥 🟥 / 🟩 🟩 🟩 / 🟦 🟦 🟦 | 🟥 🟩 🟦 🟥 |

(in the state column, "/" separates the remaining tasks of each connector)
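
For illustration, here is a minimal, self-contained sketch of this round-robin selection. It is not the PR's actual code (the real change lives in IncrementalCooperativeAssignor); plain strings such as "connector1-0" stand in for ConnectorTaskId, and the class and method names below are made up for this example.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinRevocationSketch {

    // Picks numToRevoke tasks from the worker's current allocation, cycling over the
    // connectors so that no single connector is drained before the others are touched.
    static List<String> pickTasksToRevoke(List<String> currentWorkerAllocation, int numToRevoke) {
        // 1. Group the worker's tasks by connector, preserving encounter order.
        Map<String, Deque<String>> byConnector = new LinkedHashMap<>();
        for (String task : currentWorkerAllocation) {
            String connector = task.substring(0, task.lastIndexOf('-'));
            byConnector.computeIfAbsent(connector, k -> new ArrayDeque<>()).add(task);
        }

        // 2. Take one task from each connector group in turn until we have enough.
        List<String> revoked = new ArrayList<>();
        Iterator<Deque<String>> groups = byConnector.values().iterator();
        while (revoked.size() < numToRevoke) {
            if (!groups.hasNext()) {
                if (byConnector.isEmpty()) break;          // nothing left to revoke
                groups = byConnector.values().iterator();  // start another pass
            }
            Deque<String> tasks = groups.next();
            revoked.add(tasks.poll());
            if (tasks.isEmpty()) groups.remove();          // drop exhausted connectors
        }
        return revoked;
    }

    public static void main(String[] args) {
        List<String> allocation = List.of(
                "connector1-0", "connector1-1", "connector1-2", "connector1-3",
                "connector2-0", "connector2-1", "connector2-2", "connector2-3",
                "connector3-0", "connector3-1", "connector3-2", "connector3-3");
        // Prints [connector1-0, connector2-0, connector3-0, connector1-1]
        System.out.println(pickTasksToRevoke(allocation, 4));
    }
}

On the 12-task example above this selects one task from each connector before taking a second task from the first connector, i.e. 🟥 🟩 🟦 🟥.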

@gharris1727 (Contributor) left a comment:

Thanks @yazgoo for the bug report and fix! I found another source of unbalance that I didn't see before but that should also be addressed here.

performStandardRebalance();
performStandardRebalance();
assertEquals(3, memberAssignments.size());
memberAssignments.forEach((k, v) -> {
Contributor:

I added an assertion like this (but generic) to assertBalancedAllocation and saw that there are other unit tests which leave the tasks for individual connectors unbalanced.

For example, testTaskAssignmentWhenLeaderBounces has this allocation at one point:

"worker2" -> "{ connectorIds=[connector2], taskIds=[connector1-1, connector2-0, connector2-3, connector2-2]}"
"worker3" -> "{ connectorIds=[connector1], taskIds=[connector1-2, connector2-1, connector1-0, connector1-3]}"

I think this means that we also need to adjust the allocation logic with a strategy similar to the one you've implemented for revocation.

Otherwise the scale-up scenario is the only one fixed, and there could be other situations that cause unbalance.
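
For reference, a generic version of that per-connector check might look roughly like this inside assertBalancedAllocation (a sketch, not the reviewer's actual addition; it assumes the test's memberAssignments map and the set of connector names are in scope):

// For every connector, the per-worker task counts should differ by at most one.
for (String connector : connectors.keySet()) {
    List<Long> perWorkerCounts = memberAssignments.values().stream()
            .map(assignment -> assignment.tasks().stream()
                    .filter(taskId -> connector.equals(taskId.connector()))
                    .count())
            .collect(Collectors.toList());
    long min = Collections.min(perWorkerCounts);
    long max = Collections.max(perWorkerCounts);
    assertTrue(max - min <= 1,
            "Tasks for " + connector + " are unevenly allocated across workers: " + perWorkerCounts);
}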

Author:

thanks, I will have a look at it

Author:

I refactored everything in BalancedIterator and added it to assignTasks

Contributor:

I think there are still some scenarios that would lead to unbalanced per-connector task allocations across the cluster. We may need to perform an additional revocation to get to the state we want.

Demonstration

This is one possible scenario that could force us to perform an additional revocation in order to preserve balance in the allocation of total tasks per worker, and tasks per connector per worker.

Initial state

We start with three workers (W1, W2, and W3) and four connectors. Connector C1 is configured with 1 task, connector C2 is configured with 6 tasks, connector C3 is configured with 2 tasks, and connector C4 is configured with 1 task.

The exact distribution of tasks across workers is:

W1: C1 C2 C2 C4
W2: C2 C2 C3
W3: C2 C2 C3

RIP W3

Then, worker W3 dies.

Unallocated: C2 C2 C3
W1: C1 C2 C2 C4
W2: C2 C2 C3

We now have to choose how to re-allocate the tasks from W3 across our two remaining workers.

Option 1: Allocate one C3 task to W1, and two C2 tasks to W2

W1: C1 C2 C2 C4 | C3
W2: C2 C2 C3    | C2 C2

This is unbalanced for C2's tasks (2 on W1, 4 on W2)

Option 2: Allocate one C2 task to each worker, and one C3 task to W2

W1: C1 C2 C2 C4 | C2
W2: C2 C2 C3    | C2 C3

This is unbalanced for C3's tasks (0 on W1, 2 on W2)

Option 3: Allocate one C2 task to each worker, and one C3 task to W1

W1: C1 C2 C2 C4 | C2 C3
W2: C2 C2 C3    | C2

This is unbalanced for total task counts (6 on W1, 4 on W2)

Option 4: Revocation + re-allocation

First, we revoke a C1 task from W1:

W1:    C2 C2 C4
W2: C2 C2 C3

Then we allocate one C2 task to each worker, one C3 task to W1, and one C1 task to W2:

W1:    C2 C2 C4 | C2 C3
W2: C2 C2 C3    | C2 C1

This leads to a balanced allocation in total tasks and tasks per connector across workers.

Test case

The following test case can be copy+pasted into the IncrementalCooperativeAssignorTest suite and should demonstrate the above scenario:

@Test
public void testForceUnbalancedPerConnectorTaskAllocation() {
    // Customize assignor for this test case
    time = new MockTime();
    initAssignor();

    final String c1 = "connector1";
    final String c2 = "connector2";
    final String c3 = "connector3";
    final String c4 = "connector4";

    final String w1 = "worker1";
    final String w2 = "worker2";
    final String w3 = "worker3";

    connectors.clear();
    addNewConnector(c1, 1);
    addNewConnector(c2, 6);
    addNewConnector(c3, 2);
    addNewConnector(c4, 1);

    removeWorkers(w1);
    int c1Tasks = -1;
    int c2Tasks = -1;
    int c3Tasks = -1;
    int c4Tasks = -1;
    addNewWorker(
            w1,
            Arrays.asList(c1, c4),
            Arrays.asList(
                    new ConnectorTaskId(c1, ++c1Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c4, ++c4Tasks)
            )
    );
    addNewWorker(
            w2,
            Arrays.asList(c2),
            Arrays.asList(
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c3, ++c3Tasks)
            )
    );
    addNewWorker(
            w3,
            Arrays.asList(c3),
            Arrays.asList(
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c2, ++c2Tasks),
                    new ConnectorTaskId(c3, ++c3Tasks)
            )
    );

    // Initial rebalance: should result in no change in allocation
    performStandardRebalance();
    assertDelay(0);
    assertBalancedAndCompleteAllocation();

    // RIP W3
    // This round should result in no assignments since we need to wait for the delay to expire
    removeWorkers(w3);
    performStandardRebalance();
    assertDelay(rebalanceDelay);

    // After the delay has expired, W3's connectors and total tasks should be reallocated
    // evenly across the cluster
    time.sleep(rebalanceDelay);
    performStandardRebalance();
    assertDelay(0);
    assertBalancedAndCompleteAllocation();

    // However, the allocation of tasks per connector is not even;
    // at least one of the following assertions should fail
    for (String connector : Arrays.asList(c1, c2, c3, c4)) {
        long w1Tasks = memberAssignments.get(w1).tasks().stream()
                .filter(taskId -> connector.equals(taskId.connector()))
                .count();
        long w2Tasks = memberAssignments.get(w2).tasks().stream()
                .filter(taskId -> connector.equals(taskId.connector()))
                .count();
        long allocationDiff = Math.abs(w1Tasks - w2Tasks);
        assertTrue(
                allocationDiff <= 1,
                "Tasks are not allocated evenly across remaining workers for connector " + connector
                        + "; worker 1 was assigned " + w1Tasks + " tasks and worker 2 was assigned " + w2Tasks + " tasks"
        );
    }
}

Conclusion

Even if we do implement the logic to perform the additional revocation for the scenario above, I don't know if it's a good idea. More task revocations will lead to more task downtime and more rebalances (which can cause temporary unavailability for portions of the cluster's REST API). Those are not dealbreakers, but when coupled with the fact that connector loads may still vary greatly within a cluster, it's unclear that this improvement would buy us much, especially in cases where there are more workers than the number of tasks per connector.

@yazgoo @gharris1727 what are your thoughts?

Contributor:

Wow, thanks for finding that test case and rebalance situation! I suspected this wouldn't be so easy :)

This is a tradeoff between:

  • Job continuity (not being interrupted while running)
  • Job availability (running instead of being unassigned)
  • Global balance (workers each run approximately the same number of jobs)
  • Local balance (connectors have approximately the same number of jobs on each worker)

The existing algorithm prioritizes job continuity before the rebalance timeout expires, and global balance afterwards. I would probably prefer to follow this pattern for the local balance, and rely on the rebalance timeout to limit the impact on job continuity. I think local balance is a good property to have, especially because we don't have resource-aware-scheduling.

If we were to prioritize job continuity over local balance and not perform these additional revocations, what happens on an upgrade to a cluster with a very poor existing balance? I think if the rolling upgrade was performed within the rebalance timeout, the existing balance could remain after the upgrade.

Perhaps the workers could end up in a state like this:

W1: C1 C3 C4
W2: C2 C2 C2
W3: C2 C2 C2

After W3 dies, but we prioritize global balance + continuity over local balance:

W1: C1 C3 C4 C2 C2
W2: C2 C2 C2 C2

After W3 rejoins, it could end up in the original state, or like this, depending on the connector-name-ordering:

W1: C1 C3 C2
W2: C2 C2 C2
W3: C2 C2 C4

I think W2 will maintain that homogeneous allocation until it is offline for more than the rebalance timeout.

Contributor:

@gharris1727, just wanted to know why you think Local balance is a good property to have. I think it is slightly hard to predict how the tasks are going to behave, but with global balancing (as described above) we are trying to ensure that all workers run a more or less equal number of tasks. We would still guarantee the same with this new algorithm, but considering the heterogeneity of Connect tasks and how the load patterns can change over time for the tasks of the same or different connectors, what advantage do we get by achieving Local balance?

I am just asking because from what I have seen, the best assignment in terms of tasks running as of now might not hold true a few hours later if the load patterns change considering we don't have Resource aware scheduling. Just wanted to know your thoughts on this.

Contributor:

@vamossagar12 Local balance is a good property to have in situations where tasks of the same connector have similar resource requirements. For example, if connector A is memory-heavy, it makes sense to try to distribute it around the cluster evenly to avoid exhausting the memory capacity of any one worker.

A locally-balanced cluster should "average" the resource requirements of these heterogeneous connectors.

If the tasks of a connector are very heterogeneous (such as task 0 being high-load and all others being low-load) the local-balance strategy will be ineffective. I don't expect that it will be harmful, but I don't have any evidence for this.

Do you have an alternative property that you think we should use instead?

Contributor:

@gharris1727, thanks for the explanation. Yes, I agree that when the tasks have similar resource requirements, Local balancing might yield better results. Having said that, it might not hold true for all the connectors that are getting deployed on a given Connect cluster, so we might end up optimising for something which may or may not be true for all the connectors. I also don't think it would be harmful to always opt for Local balancing, but what might still end up happening is that resource-heavy tasks from 2 different heterogeneous connectors (let's say task 0 of two connectors, c1 and c2) get scheduled on the same worker, which puts load on that worker. This may or may not happen with the global balancing strategy.

The main point I am trying to make here is that the benefits of Local balancing vs Global balancing are hard to quantify and really depend upon the connectors. So if we go ahead and make the changes as suggested in this PR, we might solve the local balancing problem but in theory could lead to imbalances for clusters which need global balancing (connectors running heterogeneous tasks). Maybe we keep both strategies and let users choose based on the kind of connectors they run (might need a KIP), since they are best placed to make that decision. Do you think that would be overkill?

BTW, I am not at all against the changes in the PR. Just that I feel in the absence of any quantifiable way of assigning tasks (like metrics or load usage), we can pivot either way but the overall benefits might get evened out based upon the nature of tasks running and how their behaviour changes over time.

Contributor:

I also don't think it would be harmful to always opt for Local balancing, but what might still end up happening is that resource-heavy tasks from 2 different heterogeneous connectors (let's say task 0 of two connectors, c1 and c2) get scheduled on the same worker, which puts load on that worker.

I agree that this is problematic, but it isn't a problem that we can currently solve in resource-unaware scheduling. Before we have resource-aware-scheduling, we should take steps to improve the algorithm that we do have.

The main point I am trying to make here is that the benefits of Local balancing vs Global balancing are hard to quantify and really depend upon the connectors.

It isn't global vs local balancing, it's global + local balancing. Nobody is proposing that we move away from global balancing, that will still be in place. Additionally, the "local balancing" is not a totally new property. If you look at the assignTasks algorithm when run against an empty cluster, such as in testAssignTasksWhenBalanced(), it produces a locally-balanced assignment. The thing addressed here is that after successive generations, the assignment is no longer locally-balanced like it would be if there was just one round of assignments.

Maybe we keep both strategies and let users choose based on the kind of connectors they run (might need a KIP), since they are best placed to make that decision. Do you think that would be overkill?

Certainly. If we're introducing a KIP to change the rebalance protocol, I would want it to solve the problem holistically instead of just adding a configuration to use a different revocation algorithm. This is a change within the bounds of backwards-compatibility, because the revocation and balance algorithm was never officially specified.

@C0urante (Contributor) left a comment:

Thanks for the fix, @yazgoo. I left a few small stylistic comments that should be easy to address, but I've also noticed there are some test failures in the IncrementalCooperativeAssignorTest suite. You can see them in our CI here; can you take a look at those?


@Override
public E next() {
for (; k < k + this.keys.size(); ) {
Contributor:

  1. Can't this be a while loop instead of a for loop?
  2. Is the condition correct? Technically the loop will terminate eventually, but only if either this.keys is empty, or overflow occurs and the value of k + this.keys.size() becomes negative.
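
For example, a bounded rewrite of next() could look something like the following (a rough sketch; the surrounding class shape and field names are assumptions for illustration, not the PR's actual code):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

class BoundedBalancedIterator<K, E> implements Iterator<E> {
    private final List<K> keys;
    private final Map<K, Iterator<E>> grouped;
    private int k = 0; // index of the key to try next

    BoundedBalancedIterator(Map<K, Iterator<E>> grouped) {
        this.grouped = grouped;
        this.keys = new ArrayList<>(grouped.keySet());
    }

    @Override
    public boolean hasNext() {
        return grouped.values().stream().anyMatch(Iterator::hasNext);
    }

    @Override
    public E next() {
        // Visit at most keys.size() groups per call, so the loop always terminates
        // without relying on overflow of k + keys.size().
        for (int scanned = 0; scanned < keys.size(); scanned++) {
            Iterator<E> group = grouped.get(keys.get(k));
            k = (k + 1) % keys.size();
            if (group.hasNext()) {
                return group.next();
            }
        }
        throw new NoSuchElementException();
    }
}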

Comment on lines 136 to 147
connectors.clear();
addNewConnector("connector1", 12);
performStandardRebalance();
addNewConnector("connector2", 12);
performStandardRebalance();
addNewEmptyWorkers("worker2");
performStandardRebalance();
performStandardRebalance();
addNewEmptyWorkers("worker3");
performStandardRebalance();
performStandardRebalance();
assertEquals(3, memberAssignments.size());
Contributor:

Can you try to follow the style in the other tests, where each rebalance is split out into its own separate group of lines, and each has a comment explaining what should take place during that rebalance?

These tests get pretty hard to read if it's just a wall of adding connectors and rebalancing.

Contributor:

I'll take the blame for this one, I just spat out a unit test following the end-to-end reproduction case. Sorry @yazgoo that you're cleaning up after me :^)

yazgoo and others added 3 commits July 1, 2024 17:45
…/distributed/IncrementalCooperativeAssignor.java

Co-authored-by: Chris Egerton <[email protected]>
…/distributed/IncrementalCooperativeAssignor.java

Co-authored-by: Chris Egerton <[email protected]>
List::iterator
)
));
this.keys = new ArrayList<>(grouped.keySet());
@gharris1727 (Contributor) commented Jul 1, 2024:

Is it possible for this key set to coincide across different workers? For example, if connector "a" always showed up first in the listing.

If so, then connector "a" would be the first thing revoked from all of the existing workers, and "a" could be concentrated on the new worker.

Contributor:

Bump on this, as I think it's still an issue.

@yazgoo (Author) commented Jul 8, 2024:

  • any idea why the tests fail?
  • is this PR ok as a first improvement?

@C0urante (Contributor) commented Jul 8, 2024:

Most of our CI failures are flaky tests that can be ignored, but there are a few failures that are happening because this PR directly causes them. You can find those in the RebalanceSourceConnectorsIntegrationTest and IncrementalCooperativeAssignorTest test suites, and I'd strongly suggest running those test suites locally before pushing another commit, as it's much faster to run tests locally than to wait for CI to do them.

This PR might be acceptable (once the test failures are addressed) as-is, but there's an open question (that I raised here) that would have to be settled on first. Essentially, we might have to do an additional round of revocations if we want to fully address the goal of this PR.

Comment on lines +816 to +818
.map(allocationGrouper)
.distinct()
.collect(Collectors.toList());
Author:

@gharris1727 does this look better to you?

Contributor:

Maybe I don't understand, but I don't think this changed anything. The incoming Collection may still have an over-representation of a single connector first, leading that connector to be preferentially revoked.

For example, consider this situation

W1: C1 C2 C3 C4
W2: C1 C5 C6 C7
W3: C1 C8 C9 C10

If a new worker joins, C1 could be revoked because it appears the same number of times as all of the other connectors, but that would violate local balance later:

W1: C2 C3 C4
W2: C5 C6 C7
W3: C8 C9 C10
W4: C1 C1 C1

The BalancedIterator isn't fairly tie-breaking when two connectors have the same number of jobs assigned to the current worker. Picking a single job to revoke depends on the entire rest of the state, and some degree of predicting how the jobs will be distributed afterwards.

This is what I think the "ideal" state should be after that initial state:

W1: C1 C2 C3
W2: C1 C5 C6
W3: C1 C8 C9
W4: C4 C7 C10

At most one C1 should be revoked overall, because revoking two to put on W4 would break local balance.

@yazgoo (Author) commented Jul 11, 2024:

Thanks for the clarification!
The BalancedIterator is only used to group:

  • Collection<ConnectorTaskId> by connector
  • Collection<String> (each String being a connector name) by identity

So the structure is not aware of workers.
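
For what it's worth, those two groupings could be illustrated roughly like this (hypothetical variable names, not the PR's exact code):

import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GroupingSketch {
    public static void main(String[] args) {
        // Group a Collection<ConnectorTaskId> by the connector each task belongs to
        List<ConnectorTaskId> tasks = Arrays.asList(
                new ConnectorTaskId("connector1", 0),
                new ConnectorTaskId("connector1", 1),
                new ConnectorTaskId("connector2", 0));
        Map<String, List<ConnectorTaskId>> tasksByConnector = tasks.stream()
                .collect(Collectors.groupingBy(ConnectorTaskId::connector));
        System.out.println(tasksByConnector); // e.g. {connector1=[connector1-0, connector1-1], connector2=[connector2-0]}

        // Group a Collection<String> of connector names by identity
        List<String> connectorNames = Arrays.asList("connector1", "connector2", "connector1");
        Map<String, List<String>> connectorsByIdentity = connectorNames.stream()
                .collect(Collectors.groupingBy(Function.identity()));
        System.out.println(connectorsByIdentity); // e.g. {connector1=[connector1, connector1], connector2=[connector2]}
    }
}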

github-actions bot commented Jan 5, 2025:

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions bot added the 'stale' label Jan 5, 2025
@seankumar-tl commented:

@yazgoo - I just wanted to check if there has been any additional progress here and if you've been able to run your PR on a production environment to solve task distribution based on the connector. We're running into a similar problem where we have some heavy topics and corresponding connectors in a cluster, and over time, as the cluster scales up and down, we end up with a few hot nodes due to an unequal distribution of connectors.

github-actions bot removed the 'stale' label Jan 31, 2025
github-actions bot commented May 1, 2025:

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions bot added the 'stale' label May 1, 2025