Commit 625be78

Added migration guide for KIP-848 (#1483)
2 parents 9a19915 + cc3d3f7 commit 625be78

2 files changed: +193 -0 lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -4,6 +4,12 @@

This is a feature release:

### [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) – General Availability
Starting with __confluent-kafka-go 2.12.0__, the next generation consumer group rebalance protocol defined in **[KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)** is **production-ready**. Please refer to the [migration guide](kafka/kip-848-migration-guide.md) for moving from the `classic` to the `consumer` protocol.

**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. A few contract changes associated with the new protocol might cause breaking changes. The `group.protocol` configuration property dictates whether to use the new `consumer` protocol or the older `classic` protocol. It defaults to `classic` if not provided.

### Enhancements
* OAuth OIDC method example for Kafka metadata based authentication with
  an Azure IMDS endpoint using an attached managed identity as principal (#1477).

kafka/kip-848-migration-guide.md

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
<!-- TODO: Move this guide to kafka package docs present in kafka.go -->

Starting with **confluent-kafka-go 2.12.0** (GA release), the next generation consumer group rebalance protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is **production-ready**.

**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. A few contract changes associated with the new protocol might cause breaking changes. The `group.protocol` configuration property dictates whether to use the new `consumer` protocol or the older `classic` protocol. It defaults to `classic` if not provided.

# Overview

- **What changed:**

  The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**.

- **Requirements:**

  - Broker version **4.0.0+**
  - confluent-kafka-go version **2.12.0+**: GA (production-ready)

- **Enablement (client-side):**

  - `group.protocol=consumer`
  - `group.remote.assignor=<assignor>` (optional; broker-controlled if unset; default broker assignor is `uniform`)

# Available Features

All [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) features are supported, including:

- Subscription to one or more topics, including **regular expression (regex) subscriptions**
- Rebalance callbacks (**incremental only**)
- Static group membership
- Configurable remote assignor
- Enforced max poll interval
- Upgrade from `classic` protocol or downgrade from `consumer` protocol
- AdminClient changes as per KIP

# Contract Changes

## Client Configuration Changes

| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement                        |
|--------------------------------------------------|-------------------------------------------------------|
| `partition.assignment.strategy`                  | `group.remote.assignor`                               |
| `session.timeout.ms`                             | Broker config: `group.consumer.session.timeout.ms`    |
| `heartbeat.interval.ms`                          | Broker config: `group.consumer.heartbeat.interval.ms` |
| `group.protocol.type`                            | Not used in the new protocol                          |

**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol.
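
The mapping in the table can be applied mechanically when porting an existing configuration. The following is a minimal sketch, not part of confluent-kafka-go: `migrateConfig` and `classicOnly` are hypothetical names, and a plain `map[string]string` stands in for the client's config map so the example runs with the standard library alone. It drops the classic-only keys, sets `group.protocol=consumer`, and reports what replaces each dropped key. It does not translate an assignment strategy into a `group.remote.assignor` value.

```go
package main

import (
	"fmt"
	"sort"
)

// classicOnly lists the client properties that the table above marks as
// unused under the consumer protocol, with a note on their replacement.
var classicOnly = map[string]string{
	"partition.assignment.strategy": "client config group.remote.assignor",
	"session.timeout.ms":            "broker config group.consumer.session.timeout.ms",
	"heartbeat.interval.ms":         "broker config group.consumer.heartbeat.interval.ms",
	"group.protocol.type":           "nothing (not used in the new protocol)",
}

// migrateConfig (hypothetical helper) returns a copy of cfg with
// group.protocol=consumer and the classic-only keys removed, plus a
// sorted list of notes, one per removed key.
func migrateConfig(cfg map[string]string) (map[string]string, []string) {
	out := make(map[string]string, len(cfg)+1)
	var notes []string
	for k, v := range cfg {
		if repl, ok := classicOnly[k]; ok {
			notes = append(notes, fmt.Sprintf("%s dropped; replaced by %s", k, repl))
			continue
		}
		out[k] = v
	}
	out["group.protocol"] = "consumer"
	sort.Strings(notes)
	return out, notes
}

func main() {
	classic := map[string]string{
		"bootstrap.servers":             "localhost:9092", // placeholder address
		"group.id":                      "example-group",  // placeholder group
		"group.protocol":                "classic",
		"partition.assignment.strategy": "range",
		"session.timeout.ms":            "45000",
	}
	migrated, notes := migrateConfig(classic)
	fmt.Println("group.protocol =", migrated["group.protocol"])
	for _, n := range notes {
		fmt.Println(n)
	}
}
```

Session and heartbeat timing then has to be reviewed on the broker side, since the client no longer carries those values.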

## Rebalance Callback Changes

- The **protocol is fully incremental** in KIP-848.
- In the **rebalance callbacks**, calling the assignment APIs is optional. If you do call them, you **must use only**:
  - `consumer.IncrementalAssign(e.Partitions)` to assign new partitions
  - `consumer.IncrementalUnassign(e.Partitions)` to revoke partitions
- **Do not** use `consumer.Assign()` or `consumer.Unassign()` after subscribing with `group.protocol='consumer'` (KIP-848).
- If you don't call `IncrementalAssign()`/`IncrementalUnassign()` inside rebalance callbacks, the client will automatically use `IncrementalAssign()`/`IncrementalUnassign()` internally.
- ⚠️ The `e.Partitions` list passed to `IncrementalAssign()` and `IncrementalUnassign()` contains only the **incremental changes** (partitions being **added** or **revoked**), **not the full assignment**, as was the case with `Assign()` in the classic protocol.
- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol.
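
Because the callbacks deliver only deltas, an application that needs the full current assignment has to accumulate it. The following is a minimal sketch of that bookkeeping using the standard library only: `partition` is a simplified stand-in for `kafka.TopicPartition`, and `owned` with its methods is a hypothetical helper, not a client API.

```go
package main

import (
	"fmt"
	"sort"
)

// partition is a simplified stand-in for kafka.TopicPartition.
type partition struct {
	Topic string
	ID    int32
}

// owned accumulates the full assignment from incremental deltas.
type owned map[partition]struct{}

// add records partitions delivered with an assign event.
func (o owned) add(delta []partition) {
	for _, p := range delta {
		o[p] = struct{}{}
	}
}

// remove drops partitions delivered with a revoke event.
func (o owned) remove(delta []partition) {
	for _, p := range delta {
		delete(o, p)
	}
}

// list returns the current assignment as sorted "topic-id" strings.
func (o owned) list() []string {
	names := make([]string, 0, len(o))
	for p := range o {
		names = append(names, fmt.Sprintf("%s-%d", p.Topic, p.ID))
	}
	sort.Strings(names)
	return names
}

func main() {
	current := owned{}
	current.add([]partition{{"orders", 0}, {"orders", 1}}) // first assign event
	current.add([]partition{{"orders", 2}})                // later assign: only the new partition
	current.remove([]partition{{"orders", 0}})             // revoke: only the lost partition
	fmt.Println(current.list())                            // prints [orders-1 orders-2]
}
```

In a real callback, `add` would run on the assigned event and `remove` on the revoked event, each fed from `e.Partitions`.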

## Static Group Membership

- Duplicate `group.instance.id` handling:
  - **Newly joining member** is fenced with **ErrUnreleasedInstanceID (fatal)**.
  - (Classic protocol fenced the **existing** member instead.)
- Implications:
  - Ensure only **one active instance per** `group.instance.id`.
  - Consumers must shut down cleanly to avoid blocking replacements until session timeout expires.
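
The difference in who gets fenced can be pictured with a toy model. This is a simplified sketch of the rule stated above, not broker or client code: `staticGroup` and `join` are hypothetical names, and the model ignores session timeouts and epochs.

```go
package main

import "fmt"

// staticGroup is a toy model of static membership bookkeeping.
type staticGroup struct {
	protocol string            // "classic" or "consumer"
	members  map[string]string // group.instance.id -> member name
}

// join registers member under instanceID and returns the name of the
// member that ends up fenced, or "" if the instance ID was free.
func (g *staticGroup) join(instanceID, member string) string {
	existing, taken := g.members[instanceID]
	if !taken {
		g.members[instanceID] = member
		return ""
	}
	if g.protocol == "consumer" {
		// KIP-848: the newcomer is rejected and the existing member keeps the ID.
		return member
	}
	// Classic: the existing member is fenced and the newcomer takes over.
	g.members[instanceID] = member
	return existing
}

func main() {
	for _, proto := range []string{"classic", "consumer"} {
		g := &staticGroup{protocol: proto, members: map[string]string{}}
		g.join("instance-1", "old-member")
		fenced := g.join("instance-1", "new-member")
		fmt.Printf("%s: fenced=%s owner=%s\n", proto, fenced, g.members["instance-1"])
	}
}
```

Under the consumer protocol the replacement cannot take over until the old member has left or timed out, which is why clean shutdown matters.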

## Session Timeout & Fetching

- **Session timeout is broker-controlled**:
  - If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets.
  - The consumer is fenced once a heartbeat response is received from the Coordinator.
  - In the classic protocol, the client stopped fetching when the session timeout expired.

## Closing / Auto-Commit

- On `Close()` or `Unsubscribe()` with auto-commit enabled:
  - Member retries committing offsets until a timeout expires.
  - Currently uses the **default remote session timeout**.
  - Future **KIP-1092** will allow custom commit timeouts.

## Error Handling Changes

- `ErrUnknownTopicOrPart` (**subscription case**):
  - No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds.
- `ErrTopicAuthorizationFailed`:
  - Reported once per heartbeat or subscription change, even if only one topic is unauthorized.

## Summary of Key Differences (Classic vs Next-Gen)

- **Assignment:** Classic: calculated by the **Group Leader (consumer)**; KIP-848: calculated by the **Group Coordinator (broker)**
- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range
- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs
- **Static membership fencing:** Classic: fences the **existing member** on duplicate `group.instance.id`; KIP-848: fences the **new member**
- **Session timeout:** Classic: enforced on client; KIP-848: enforced on broker
- **Auto-commit on close:** Classic: stops at client session timeout; KIP-848: retries until remote timeout
- **Unknown topics:** KIP-848 does not return an error on subscription if a topic is missing
- **Upgrade/Downgrade:** KIP-848 supports upgrading from the `classic` protocol and downgrading from the `consumer` protocol

# Minimal Example Config

## Classic Protocol

``` properties
# Optional; default is 'classic'
group.protocol=classic

partition.assignment.strategy=<range,roundrobin,sticky>
session.timeout.ms=45000
heartbeat.interval.ms=15000
```

## Next-Gen Protocol / KIP-848

``` properties
group.protocol=consumer

# Optional: select a remote assignor
# Valid options currently: 'uniform' or 'range'
#   group.remote.assignor=<uniform,range>
# If unset, broker chooses the assignor (default: 'uniform')

# Session & heartbeat now controlled by broker:
#   group.consumer.session.timeout.ms
#   group.consumer.heartbeat.interval.ms
```

# Rebalance Callback Migration

## Range Assignor (Classic)

```go
import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Classic protocol: Full partition list provided on assign.
// Returns error so it matches kafka.RebalanceCb and can be passed to SubscribeTopics.
func onRebalanceClassic(consumer *kafka.Consumer, ev kafka.Event) error {
	switch e := ev.(type) {
	case kafka.AssignedPartitions:
		fmt.Printf("[Classic] Assigned partitions: %v\n", e.Partitions)
		// Optional: client handles assign if not done manually
		return consumer.Assign(e.Partitions)

	case kafka.RevokedPartitions:
		fmt.Printf("[Classic] Revoked partitions: %v\n", e.Partitions)
		// Optional: client handles unassign if not done manually
		return consumer.Unassign()
	}
	return nil
}
```

## Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol)

```go
import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Incremental assignor: e.Partitions holds only the delta.
// Returns error so it matches kafka.RebalanceCb and can be passed to SubscribeTopics.
func onRebalanceCooperative(consumer *kafka.Consumer, ev kafka.Event) error {
	switch e := ev.(type) {
	case kafka.AssignedPartitions:
		fmt.Printf("[KIP-848] Incrementally assigning: %v\n", e.Partitions)
		// Optional, client handles if omitted
		return consumer.IncrementalAssign(e.Partitions)

	case kafka.RevokedPartitions:
		fmt.Printf("[KIP-848] Incrementally revoking: %v\n", e.Partitions)
		// Optional, client handles if omitted
		return consumer.IncrementalUnassign(e.Partitions)
	}
	return nil
}
```

**Note:** The `e.Partitions` list contains **only partitions being added or revoked**, not the full partition list as in the classic `consumer.Assign()`.

# Upgrade and Downgrade

- A group made up entirely of `classic` consumers runs under the classic protocol.
- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins.
- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain.
- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported.
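
The upgrade and downgrade rules above amount to a small state machine over the member mix. The following is a toy sketch of exactly those rules and nothing more: `mixedGroup`, `join`, and `leave` are hypothetical names, and behaviour for cases the guide does not describe (for example a group that becomes empty) is left unchanged here as an assumption.

```go
package main

import "fmt"

// mixedGroup counts members per client protocol and tracks the group protocol.
type mixedGroup struct {
	classic, consumer int
	protocol          string // "" until the first member joins
}

// join adds a member that uses the given client protocol.
func (g *mixedGroup) join(memberProtocol string) {
	if memberProtocol == "consumer" {
		g.consumer++
		g.protocol = "consumer" // upgrade as soon as one consumer member joins
		return
	}
	g.classic++
	if g.protocol == "" {
		g.protocol = "classic"
	}
}

// leave removes a member that uses the given client protocol.
func (g *mixedGroup) leave(memberProtocol string) {
	if memberProtocol == "consumer" {
		g.consumer--
		if g.consumer == 0 && g.classic > 0 {
			g.protocol = "classic" // downgrade: last consumer member left, classic remain
		}
		return
	}
	g.classic--
}

func main() {
	g := &mixedGroup{}
	g.join("classic")
	g.join("classic")
	fmt.Println("all classic:", g.protocol)
	g.join("consumer") // rolling upgrade begins
	fmt.Println("after first consumer member:", g.protocol)
	g.leave("consumer") // rolled back
	fmt.Println("after last consumer member leaves:", g.protocol)
}
```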

# Migration Checklist (Next-Gen Protocol / [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol))

1. Upgrade to **confluent-kafka-go ≥ 2.12.0** (GA release)
2. Run against **Kafka brokers ≥ 4.0.0**
3. Set `group.protocol=consumer`
4. Optionally set `group.remote.assignor` (valid options: `uniform` or `range`); leave it unset to let the broker choose (default: `uniform`)
5. Replace deprecated configs with new ones
6. Update rebalance callbacks to **incremental APIs only** (if used)
7. Review static membership handling (`group.instance.id`)
8. Ensure proper shutdown to avoid fencing issues
9. Adjust error handling for unknown topics and authorization failures
