Skip to content

Commit 5afaf42

Browse files
authored
Stream Consumer Group - Shard retention mechanism (#125)
1 parent ca33b7b commit 5afaf42

File tree

10 files changed

+519
-22
lines changed

10 files changed

+519
-22
lines changed

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
2828
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2929
github.com/rs/xid v1.1.0 h1:9Z322kTPrDR5GpxTH+1yl7As6tEHIH9aGsRccl20ELk=
3030
github.com/rs/xid v1.1.0/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
31+
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
3132
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
3233
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
3334
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=

pkg/dataplane/streamconsumergroup/claim.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func newClaim(member *member, shardID int) (*claim, error) {
4343
}
4444

4545
func (c *claim) start() error {
46-
c.logger.DebugWith("Starting claim")
46+
c.logger.DebugWith("Starting claim", "shardID", c.shardID)
4747

4848
go func() {
4949
err := c.fetchRecordBatches(c.stopRecordBatchFetchChan,

pkg/dataplane/streamconsumergroup/member.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type member struct {
1616
sequenceNumberHandler *sequenceNumberHandler
1717
handler Handler
1818
session Session
19+
retainShards bool
20+
shardGroupToRetain []int
1921
}
2022

2123
func NewMember(streamConsumerGroupInterface StreamConsumerGroup, name string) (Member, error) {
@@ -41,21 +43,14 @@ func NewMember(streamConsumerGroupInterface StreamConsumerGroup, name string) (M
4143
return nil, errors.Wrap(err, "Failed creating stream consumer group state handler")
4244
}
4345

44-
err = newMember.stateHandler.start()
45-
if err != nil {
46-
return nil, errors.Wrap(err, "Failed starting stream consumer group state handler")
47-
}
48-
49-
// create & start an location handler for the stream
46+
// create a location (sequence number) handler for the stream - it is started by Start() below
5047
newMember.sequenceNumberHandler, err = newSequenceNumberHandler(&newMember)
5148
if err != nil {
5249
return nil, errors.Wrap(err, "Failed creating stream consumer group location handler")
5350
}
5451

55-
// if there's no member name, just observe
56-
err = newMember.sequenceNumberHandler.start()
57-
if err != nil {
58-
return nil, errors.Wrap(err, "Failed starting stream consumer group state handler")
52+
if err := newMember.Start(); err != nil {
53+
return nil, errors.Wrap(err, "Failed starting new member")
5954
}
6055

6156
return &newMember, nil
@@ -100,3 +95,27 @@ func (m *member) Close() error {
10095

10196
return nil
10297
}
98+
99+
func (m *member) Start() error {
100+
if err := m.stateHandler.start(); err != nil {
101+
return errors.Wrap(err, "Failed starting stream consumer group state handler")
102+
}
103+
104+
if err := m.sequenceNumberHandler.start(); err != nil {
105+
return errors.Wrap(err, "Failed starting stream consumer group state handler")
106+
}
107+
108+
return nil
109+
}
110+
111+
func (m *member) GetID() string {
112+
return m.id
113+
}
114+
115+
func (m *member) GetRetainShardFlag() bool {
116+
return m.retainShards
117+
}
118+
119+
func (m *member) GetShardsToRetain() []int {
120+
return m.shardGroupToRetain
121+
}

pkg/dataplane/streamconsumergroup/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func newSession(member *member,
2525
}
2626

2727
func (s *session) start() error {
28-
s.logger.DebugWith("Starting session")
28+
s.logger.DebugWith("Starting session", "shards", s.state.Shards)
2929

3030
// for each shard we need to handle, create a StreamConsumerGroupClaim object and start it
3131
for _, shardID := range s.state.Shards {

pkg/dataplane/streamconsumergroup/statehandler.go

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import (
1212

1313
const stateContentsAttributeKey string = "state"
1414

15-
var errNoFreeShardGroups = errors.New("No free shard groups")
15+
var (
16+
errNoFreeShardGroups = errors.New("No free shard groups")
17+
errShardRetention = errors.New("Could not retain shard group")
18+
)
1619

1720
type stateHandler struct {
1821
logger logger.Logger
@@ -33,8 +36,16 @@ func newStateHandler(member *member) (*stateHandler, error) {
3336
func (sh *stateHandler) start() error {
3437

3538
// stops on stop()
36-
go sh.refreshStatePeriodically()
39+
go func() {
40+
if err := sh.refreshStatePeriodically(); err != nil {
41+
if errors.RootCause(err) == errShardRetention {
3742

43+
// signal that the Handler needs to be restarted
44+
sh.logger.ErrorWith("Aborting member", "memberID", sh.member.id)
45+
sh.member.handler.Abort(sh.member.session) // nolint: errcheck
46+
}
47+
}
48+
}()
3849
return nil
3950
}
4051

@@ -77,7 +88,7 @@ func (sh *stateHandler) getSessionState(state *State, memberID string) (*Session
7788
return nil, errors.Errorf("Member state not found: %s", memberID)
7889
}
7990

80-
func (sh *stateHandler) refreshStatePeriodically() {
91+
func (sh *stateHandler) refreshStatePeriodically() error {
8192
var err error
8293

8394
// guaranteed to only be REPLACED by a new instance - not edited. as such, once this is initialized
@@ -94,6 +105,13 @@ func (sh *stateHandler) refreshStatePeriodically() {
94105
} else {
95106
lastState, err = sh.refreshState()
96107
if err != nil {
108+
109+
// in case of shard retention error we want to signal the member to restart
110+
if errors.RootCause(err) == errShardRetention {
111+
sh.logger.WarnWith("Failed getting state on shard retention (requested by member)",
112+
"err", errors.GetErrorStackString(err, 10))
113+
return errors.Wrap(err, "Failed refreshing state by demand")
114+
}
97115
sh.logger.WarnWith("Failed getting state", "err", errors.GetErrorStackString(err, 10))
98116
}
99117

@@ -105,14 +123,21 @@ func (sh *stateHandler) refreshStatePeriodically() {
105123
case <-time.After(sh.member.streamConsumerGroup.config.Session.HeartbeatInterval):
106124
lastState, err = sh.refreshState()
107125
if err != nil {
126+
127+
// in case of shard retention error we want to signal the member to restart
128+
if errors.RootCause(err) == errShardRetention {
129+
sh.logger.WarnWith("Failed getting state on shard retention (periodic refresh)",
130+
"err", errors.GetErrorStackString(err, 10))
131+
return errors.Wrap(err, "Failed refreshing state periodically")
132+
}
108133
sh.logger.WarnWith("Failed refreshing state", "err", errors.GetErrorStackString(err, 10))
109134
continue
110135
}
111136

112137
// if we're told to stop, exit the loop
113138
case <-sh.stopChan:
114139
sh.logger.Debug("Stopping")
115-
return
140+
return nil
116141
}
117142
}
118143
}
@@ -142,6 +167,14 @@ func (sh *stateHandler) refreshState() (*State, error) {
142167
}
143168

144169
return state, nil
170+
171+
}, func() error {
172+
173+
// set retainShards flag to true only after the new state has been saved in persistency
174+
// (meaning the shards have been assigned successfully)
175+
sh.member.retainShards = true
176+
177+
return nil
145178
})
146179
}
147180

@@ -150,16 +183,42 @@ func (sh *stateHandler) createSessionState(state *State) error {
150183
state.SessionStates = []*SessionState{}
151184
}
152185

153-
// assign shards
154-
shards, err := sh.assignShards(sh.member.streamConsumerGroup.maxReplicas, sh.member.streamConsumerGroup.totalNumShards, state)
155-
if err != nil {
156-
return errors.Wrap(err, "Failed resolving shards for session")
186+
var shards []int
187+
var err error
188+
189+
if sh.member.retainShards {
190+
191+
// try to retain the originally assigned shard group
192+
shards, err = sh.retainShards(sh.member.shardGroupToRetain, sh.member.id, state)
193+
194+
// shards were "stolen" - set retainShards flag to false and return the error so that the member gets aborted
195+
if err != nil {
196+
sh.logger.ErrorWith("Failed to retain shards",
197+
"memberID", sh.member.id,
198+
"shardsToRetain", sh.member.shardGroupToRetain,
199+
"state", state,
200+
"error", err.Error())
201+
sh.member.retainShards = false
202+
return err
203+
}
204+
} else {
205+
206+
// assign shards
207+
shards, err = sh.assignShards(sh.member.streamConsumerGroup.maxReplicas,
208+
sh.member.streamConsumerGroup.totalNumShards,
209+
state)
210+
if err != nil {
211+
return errors.Wrap(err, "Failed resolving shards for session")
212+
}
157213
}
158214

159215
sh.logger.DebugWith("Assigned shards",
160216
"shards", shards,
161217
"state", state)
162218

219+
// save shards to retain on the member itself
220+
sh.member.shardGroupToRetain = shards
221+
163222
state.SessionStates = append(state.SessionStates, &SessionState{
164223
MemberID: sh.member.id,
165224
LastHeartbeat: time.Now(),
@@ -209,6 +268,23 @@ func (sh *stateHandler) assignShards(maxReplicas int, numShards int, state *Stat
209268
return nil, errNoFreeShardGroups
210269
}
211270

271+
func (sh *stateHandler) retainShards(memberShardGroup []int, memberID string, state *State) ([]int, error) {
272+
273+
for _, sessionState := range state.SessionStates {
274+
if common.IntSlicesEqual(memberShardGroup, sessionState.Shards) {
275+
if sessionState.MemberID == memberID {
276+
return memberShardGroup, nil
277+
}
278+
279+
// original shard group was taken
280+
return nil, errShardRetention
281+
}
282+
}
283+
284+
// shard group to retain is not taken by any member - original member can retain it
285+
return memberShardGroup, nil
286+
}
287+
212288
func (sh *stateHandler) getReplicaShardGroups(maxReplicas int, numShards int) ([][]int, error) {
213289
var replicaShardGroups [][]int
214290
shards := common.MakeRange(0, numShards)

pkg/dataplane/streamconsumergroup/statehandler_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package streamconsumergroup
22

33
import (
4+
"strconv"
45
"testing"
56

67
"github.com/stretchr/testify/suite"
@@ -77,6 +78,58 @@ func (suite *stateHandlerSuite) TestAssignShards() {
7778
}
7879
}
7980

81+
func (suite *stateHandlerSuite) TestRetainShards() {
82+
for _, testCase := range []struct {
83+
name string
84+
memberID string
85+
existingShardGroups [][]int
86+
expectedShardGroup []int
87+
expectedError bool
88+
}{
89+
{
90+
name: "successfulRetention",
91+
memberID: "1",
92+
existingShardGroups: [][]int{{0, 1}, {2, 3}},
93+
expectedShardGroup: []int{2, 3},
94+
expectedError: false,
95+
},
96+
{
97+
name: "failedRetention",
98+
memberID: "2",
99+
existingShardGroups: [][]int{{0, 1}, {2, 3}, {4, 5}},
100+
expectedShardGroup: []int{0, 1},
101+
expectedError: true,
102+
},
103+
{
104+
name: "unexpectedBehaviour",
105+
memberID: "0",
106+
existingShardGroups: [][]int{{0, 1}, {2, 3}},
107+
expectedShardGroup: []int{4, 5},
108+
expectedError: true,
109+
},
110+
} {
111+
suite.Run(testCase.name, func() {
112+
113+
// make state from shard groups
114+
state := State{}
115+
for i, existingShardGroup := range testCase.existingShardGroups {
116+
state.SessionStates = append(state.SessionStates, &SessionState{
117+
Shards: existingShardGroup,
118+
MemberID: strconv.Itoa(i),
119+
})
120+
}
121+
122+
shards, err := suite.stateHandler.retainShards(testCase.expectedShardGroup, testCase.memberID, &state)
123+
if testCase.expectedError {
124+
suite.Require().Error(err)
125+
} else {
126+
suite.Require().NoError(err)
127+
suite.Require().ElementsMatch(testCase.expectedShardGroup, shards)
128+
}
129+
})
130+
}
131+
}
132+
80133
func TestBinaryTestSuite(t *testing.T) {
81134
suite.Run(t, new(stateHandlerSuite))
82135
}

pkg/dataplane/streamconsumergroup/streamconsumergroup.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ func (scg *streamConsumerGroup) getTotalNumberOfShards() (int, error) {
8686
return response.Output.(*v3io.DescribeStreamOutput).ShardCount, nil
8787
}
8888

89-
func (scg *streamConsumerGroup) setState(modifier stateModifier) (*State, error) {
89+
func (scg *streamConsumerGroup) setState(modifier stateModifier,
90+
handlePostSetStateInPersistency postSetStateInPersistencyHandler) (*State, error) {
9091
var previousState, modifiedState *State
9192

9293
backoff := scg.config.State.ModifyRetry.Backoff
@@ -110,6 +111,11 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier) (*State, error)
110111

111112
modifiedState, err = modifier(state)
112113
if err != nil {
114+
if errors.RootCause(err) == errShardRetention {
115+
116+
// if shard retention failed the member needs to be aborted, so we can stop retrying
117+
return false, errors.Wrap(err, "Failed modifying state")
118+
}
113119
return true, errors.Wrap(err, "Failed modifying state")
114120
}
115121

@@ -131,6 +137,10 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier) (*State, error)
131137
return true, errors.Wrap(err, "Failed setting state in persistency state")
132138
}
133139

140+
if err := handlePostSetStateInPersistency(); err != nil {
141+
return false, errors.Wrap(err, "Failed handling post set state in persistency")
142+
}
143+
134144
return false, nil
135145
})
136146

pkg/dataplane/streamconsumergroup/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
type stateModifier func(*State) (*State, error)
1010

11+
type postSetStateInPersistencyHandler func() error
12+
1113
type SessionState struct {
1214
MemberID string `json:"member_id"`
1315
LastHeartbeat time.Time `json:"last_heartbeat_time"`
@@ -27,6 +29,9 @@ type Handler interface {
2729
// Once the Messages() channel is closed, the Handler must finish its processing
2830
// loop and exit.
2931
ConsumeClaim(Session, Claim) error
32+
33+
// Abort signals the handler to start its abort procedure (the state handler calls it when shard retention fails)
34+
Abort(Session) error
3035
}
3136

3237
type RecordBatch struct {
@@ -45,6 +50,10 @@ type StreamConsumerGroup interface {
4550
type Member interface {
4651
Consume(Handler) error
4752
Close() error
53+
Start() error
54+
GetID() string
55+
GetRetainShardFlag() bool
56+
GetShardsToRetain() []int
4857
}
4958

5059
type Session interface {

0 commit comments

Comments
 (0)