Skip to content

Commit 86cd450

Browse files
authored
Refactored consumer group interface (#51)
1. StreamConsumerGroup is now a stateless object with which all access to the persistency is performed (can be used to retreive information about a consumer group) 2. All "stateful" components of the StreamConsumerGroup moved to the "Member" - a new object introduced that holds the sequence number handler, state handler, etc
1 parent 85ac1ab commit 86cd450

File tree

9 files changed

+483
-397
lines changed

9 files changed

+483
-397
lines changed

pkg/dataplane/http/context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ type context struct {
4141
}
4242

4343
type NewClientInput struct {
44-
TlsConfig *tls.Config
44+
TLSConfig *tls.Config
4545
DialTimeout time.Duration
4646
MaxConnsPerHost int
4747
}
4848

4949
func NewClient(newClientInput *NewClientInput) *fasthttp.Client {
50-
tlsConfig := newClientInput.TlsConfig
50+
tlsConfig := newClientInput.TLSConfig
5151
if tlsConfig == nil {
5252
tlsConfig = &tls.Config{InsecureSkipVerify: true}
5353
}

pkg/dataplane/streamconsumergroup/claim.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,19 @@ import (
1515

1616
type claim struct {
1717
logger logger.Logger
18-
streamConsumerGroup *streamConsumerGroup
18+
member *member
1919
shardID int
2020
recordBatchChan chan *RecordBatch
2121
stopRecordBatchFetchChan chan struct{}
2222
currentShardLocation string
2323
}
2424

25-
func newClaim(streamConsumerGroup *streamConsumerGroup, shardID int) (*claim, error) {
25+
func newClaim(member *member, shardID int) (*claim, error) {
2626
return &claim{
27-
logger: streamConsumerGroup.logger.GetChild(fmt.Sprintf("claim-%d", shardID)),
28-
streamConsumerGroup: streamConsumerGroup,
27+
logger: member.streamConsumerGroup.logger.GetChild(fmt.Sprintf("claim-%d", shardID)),
28+
member: member,
2929
shardID: shardID,
30-
recordBatchChan: make(chan *RecordBatch, streamConsumerGroup.config.Claim.RecordBatchChanSize),
30+
recordBatchChan: make(chan *RecordBatch, member.streamConsumerGroup.config.Claim.RecordBatchChanSize),
3131
stopRecordBatchFetchChan: make(chan struct{}, 1),
3232
}, nil
3333
}
@@ -37,7 +37,7 @@ func (c *claim) start() error {
3737

3838
go func() {
3939
err := c.fetchRecordBatches(c.stopRecordBatchFetchChan,
40-
c.streamConsumerGroup.config.Claim.RecordBatchFetch.Interval)
40+
c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.Interval)
4141

4242
if err != nil {
4343
c.logger.WarnWith("Failed to fetch record batches", "err", errors.GetErrorStackString(err, 10))
@@ -48,7 +48,7 @@ func (c *claim) start() error {
4848

4949
// tell the consumer group handler to consume the claim
5050
c.logger.DebugWith("Calling ConsumeClaim on handler")
51-
if err := c.streamConsumerGroup.handler.ConsumeClaim(c.streamConsumerGroup.session, c); err != nil {
51+
if err := c.member.handler.ConsumeClaim(c.member.session, c); err != nil {
5252
c.logger.WarnWith("ConsumeClaim returned with error", "err", errors.GetErrorStackString(err, 10))
5353
}
5454

@@ -73,7 +73,7 @@ func (c *claim) stop() error {
7373
}
7474

7575
func (c *claim) GetStreamPath() string {
76-
return c.streamConsumerGroup.streamPath
76+
return c.member.streamConsumerGroup.streamPath
7777
}
7878

7979
func (c *claim) GetShardID() int {
@@ -120,12 +120,12 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
120120

121121
func (c *claim) fetchRecordBatch(location string) (string, error) {
122122
getRecordsInput := v3io.GetRecordsInput{
123-
Path: path.Join(c.streamConsumerGroup.streamPath, strconv.Itoa(c.shardID)),
123+
Path: path.Join(c.member.streamConsumerGroup.streamPath, strconv.Itoa(c.shardID)),
124124
Location: location,
125-
Limit: c.streamConsumerGroup.config.Claim.RecordBatchFetch.NumRecordsInBatch,
125+
Limit: c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.NumRecordsInBatch,
126126
}
127127

128-
response, err := c.streamConsumerGroup.container.GetRecordsSync(&getRecordsInput)
128+
response, err := c.member.streamConsumerGroup.container.GetRecordsSync(&getRecordsInput)
129129
if err != nil {
130130
return "", errors.Wrapf(err, "Failed fetching record batch: %s", location)
131131
}
@@ -168,23 +168,23 @@ func (c *claim) fetchRecordBatch(location string) (string, error) {
168168
func (c *claim) getCurrentShardLocation(shardID int) (string, error) {
169169

170170
// get the location from persistency
171-
currentShardLocation, err := c.streamConsumerGroup.sequenceNumberHandler.getShardLocationFromPersistency(shardID)
172-
if err != nil && errors.RootCause(err) != errShardNotFound {
171+
currentShardLocation, err := c.member.streamConsumerGroup.getShardLocationFromPersistency(shardID)
172+
if err != nil && errors.RootCause(err) != ErrShardNotFound {
173173
return "", errors.Wrap(err, "Failed to get shard location")
174174
}
175175

176176
// if shard wasn't found, try again periodically
177-
if errors.RootCause(err) == errShardNotFound {
177+
if errors.RootCause(err) == ErrShardNotFound {
178178
for {
179179
select {
180180

181181
// TODO: from configuration
182-
case <-time.After(c.streamConsumerGroup.config.SequenceNumber.ShardWaitInterval):
182+
case <-time.After(c.member.streamConsumerGroup.config.SequenceNumber.ShardWaitInterval):
183183

184184
// get the location from persistency
185-
currentShardLocation, err = c.streamConsumerGroup.sequenceNumberHandler.getShardLocationFromPersistency(shardID)
185+
currentShardLocation, err = c.member.streamConsumerGroup.getShardLocationFromPersistency(shardID)
186186
if err != nil {
187-
if errors.RootCause(err) == errShardNotFound {
187+
if errors.RootCause(err) == ErrShardNotFound {
188188

189189
// shard doesn't exist yet, try again
190190
continue
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package streamconsumergroup
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/nuclio/errors"
7+
"github.com/nuclio/logger"
8+
"github.com/rs/xid"
9+
)
10+
11+
type member struct {
12+
id string
13+
logger logger.Logger
14+
streamConsumerGroup *streamConsumerGroup
15+
stateHandler *stateHandler
16+
sequenceNumberHandler *sequenceNumberHandler
17+
handler Handler
18+
session Session
19+
}
20+
21+
func NewMember(streamConsumerGroupInterface StreamConsumerGroup, name string) (Member, error) {
22+
var err error
23+
24+
streamConsumerGroupInstance, ok := streamConsumerGroupInterface.(*streamConsumerGroup)
25+
if !ok {
26+
return nil, errors.Errorf("Expected streamConsumerGroupInterface of type streamConsumerGroup, got %T", streamConsumerGroupInterface)
27+
}
28+
29+
// add uniqueness
30+
id := fmt.Sprintf("%s-%s", name, xid.New().String())
31+
32+
newMember := member{
33+
logger: streamConsumerGroupInstance.logger.GetChild(id),
34+
id: id,
35+
streamConsumerGroup: streamConsumerGroupInstance,
36+
}
37+
38+
// create & start a state handler for the stream
39+
newMember.stateHandler, err = newStateHandler(&newMember)
40+
if err != nil {
41+
return nil, errors.Wrap(err, "Failed creating stream consumer group state handler")
42+
}
43+
44+
err = newMember.stateHandler.start()
45+
if err != nil {
46+
return nil, errors.Wrap(err, "Failed starting stream consumer group state handler")
47+
}
48+
49+
// create & start an location handler for the stream
50+
newMember.sequenceNumberHandler, err = newSequenceNumberHandler(&newMember)
51+
if err != nil {
52+
return nil, errors.Wrap(err, "Failed creating stream consumer group location handler")
53+
}
54+
55+
// if there's no member name, just observe
56+
err = newMember.sequenceNumberHandler.start()
57+
if err != nil {
58+
return nil, errors.Wrap(err, "Failed starting stream consumer group state handler")
59+
}
60+
61+
return &newMember, nil
62+
}
63+
64+
func (m *member) Consume(handler Handler) error {
65+
m.logger.DebugWith("Starting consumption of consumer group")
66+
67+
m.handler = handler
68+
69+
// get the state (holding our shards)
70+
sessionState, err := m.stateHandler.getOrCreateSessionState(m.id)
71+
if err != nil {
72+
return errors.Wrap(err, "Failed getting stream consumer group member state")
73+
}
74+
75+
// create a session object from our state
76+
m.session, err = newSession(m, sessionState)
77+
if err != nil {
78+
return errors.Wrap(err, "Failed creating stream consumer group session")
79+
}
80+
81+
// start it
82+
return m.session.start()
83+
}
84+
85+
func (m *member) Close() error {
86+
m.logger.DebugWith("Closing consumer group")
87+
88+
if err := m.stateHandler.stop(); err != nil {
89+
return errors.Wrapf(err, "Failed stopping state handler")
90+
}
91+
if err := m.sequenceNumberHandler.stop(); err != nil {
92+
return errors.Wrapf(err, "Failed stopping location handler")
93+
}
94+
95+
if m.session != nil {
96+
if err := m.session.stop(); err != nil {
97+
return errors.Wrap(err, "Failed stopping member session")
98+
}
99+
}
100+
101+
return nil
102+
}

pkg/dataplane/streamconsumergroup/sequencenumberhandler.go

Lines changed: 9 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,33 @@
11
package streamconsumergroup
22

33
import (
4-
"fmt"
5-
"net/http"
64
"sync"
75
"time"
86

97
"github.com/v3io/v3io-go/pkg/common"
10-
"github.com/v3io/v3io-go/pkg/dataplane"
11-
"github.com/v3io/v3io-go/pkg/errors"
128

139
"github.com/nuclio/errors"
1410
"github.com/nuclio/logger"
1511
)
1612

17-
var errShardNotFound = errors.New("Shard not found")
18-
var errShardSequenceNumberAttributeNotFound = errors.New("Shard sequenceNumber attribute")
13+
var ErrShardNotFound = errors.New("Shard not found")
14+
var ErrShardSequenceNumberAttributeNotFound = errors.New("Shard sequenceNumber attribute")
1915

2016
type sequenceNumberHandler struct {
2117
logger logger.Logger
22-
streamConsumerGroup *streamConsumerGroup
18+
member *member
2319
markedShardSequenceNumbers []uint64
2420
markedShardSequenceNumbersLock sync.RWMutex
2521
stopMarkedShardSequenceNumberCommitterChan chan struct{}
2622
lastCommittedShardSequenceNumbers []uint64
2723
}
2824

29-
func newSequenceNumberHandler(streamConsumerGroup *streamConsumerGroup) (*sequenceNumberHandler, error) {
25+
func newSequenceNumberHandler(member *member) (*sequenceNumberHandler, error) {
3026

3127
return &sequenceNumberHandler{
32-
logger: streamConsumerGroup.logger.GetChild("sequenceNumberHandler"),
33-
streamConsumerGroup: streamConsumerGroup,
34-
markedShardSequenceNumbers: make([]uint64, streamConsumerGroup.totalNumShards),
28+
logger: member.logger.GetChild("sequenceNumberHandler"),
29+
member: member,
30+
markedShardSequenceNumbers: make([]uint64, member.streamConsumerGroup.totalNumShards),
3531
stopMarkedShardSequenceNumberCommitterChan: make(chan struct{}, 1),
3632
}, nil
3733
}
@@ -40,7 +36,7 @@ func (snh *sequenceNumberHandler) start() error {
4036
snh.logger.DebugWith("Starting sequenceNumber handler")
4137

4238
// stopped on stop()
43-
go snh.markedShardSequenceNumbersCommitter(snh.streamConsumerGroup.config.SequenceNumber.CommitInterval,
39+
go snh.markedShardSequenceNumbersCommitter(snh.member.streamConsumerGroup.config.SequenceNumber.CommitInterval,
4440
snh.stopMarkedShardSequenceNumberCommitterChan)
4541

4642
return nil
@@ -68,112 +64,6 @@ func (snh *sequenceNumberHandler) markShardSequenceNumber(shardID int, sequenceN
6864
return nil
6965
}
7066

71-
func (snh *sequenceNumberHandler) getShardLocationFromPersistency(shardID int) (string, error) {
72-
snh.logger.DebugWith("Getting shard sequenceNumber from persistency", "shardID", shardID)
73-
74-
shardPath, err := snh.streamConsumerGroup.getShardPath(shardID)
75-
if err != nil {
76-
return "", errors.Wrapf(err, "Failed getting shard path: %v", shardID)
77-
}
78-
79-
seekShardInput := v3io.SeekShardInput{
80-
Path: shardPath,
81-
}
82-
83-
// get the shard sequenceNumber from the item
84-
shardSequenceNumber, err := snh.getShardSequenceNumberFromItemAttributes(shardPath)
85-
if err != nil {
86-
87-
// if the error is that the attribute wasn't found, but the shard was found - seek the shard
88-
// according to the configuration
89-
if err != errShardSequenceNumberAttributeNotFound {
90-
return "", errors.Wrap(err, "Failed to get shard sequenceNumber from item attributes")
91-
}
92-
93-
seekShardInput.Type = snh.streamConsumerGroup.config.Claim.RecordBatchFetch.InitialLocation
94-
} else {
95-
96-
// use sequence number
97-
seekShardInput.Type = v3io.SeekShardInputTypeSequence
98-
seekShardInput.StartingSequenceNumber = shardSequenceNumber + 1
99-
}
100-
101-
return snh.getShardLocationWithSeek(shardPath, &seekShardInput)
102-
}
103-
104-
// returns the sequenceNumber, an error re: the shard itself and an error re: the attribute in the shard
105-
func (snh *sequenceNumberHandler) getShardSequenceNumberFromItemAttributes(shardPath string) (uint64, error) {
106-
response, err := snh.streamConsumerGroup.container.GetItemSync(&v3io.GetItemInput{
107-
Path: shardPath,
108-
AttributeNames: []string{snh.getShardCommittedSequenceNumberAttributeName()},
109-
})
110-
111-
if err != nil {
112-
errWithStatusCode, errHasStatusCode := err.(v3ioerrors.ErrorWithStatusCode)
113-
if !errHasStatusCode {
114-
return 0, errors.Wrap(err, "Got error without status code")
115-
}
116-
117-
if errWithStatusCode.StatusCode() != http.StatusNotFound {
118-
return 0, errors.Wrap(err, "Failed getting shard item")
119-
}
120-
121-
// TODO: remove after errors.Is support added
122-
snh.logger.DebugWith("Could not find shard, probably doesn't exist yet", "path", shardPath)
123-
124-
return 0, errShardNotFound
125-
}
126-
127-
defer response.Release()
128-
129-
getItemOutput := response.Output.(*v3io.GetItemOutput)
130-
131-
// return the attribute name
132-
sequenceNumber, err := getItemOutput.Item.GetFieldUint64(snh.getShardCommittedSequenceNumberAttributeName())
133-
if err != nil && err == v3ioerrors.ErrNotFound {
134-
return 0, errShardSequenceNumberAttributeNotFound
135-
}
136-
137-
// return the sequenceNumber we found
138-
return sequenceNumber, nil
139-
}
140-
141-
func (snh *sequenceNumberHandler) getShardLocationWithSeek(shardPath string, seekShardInput *v3io.SeekShardInput) (string, error) {
142-
143-
snh.logger.DebugWith("Seeking shard", "shardPath", shardPath, "seekShardInput", seekShardInput)
144-
145-
response, err := snh.streamConsumerGroup.container.SeekShardSync(seekShardInput)
146-
if err != nil {
147-
return "", errors.Wrap(err, "Failed to seek shard")
148-
}
149-
defer response.Release()
150-
151-
location := response.Output.(*v3io.SeekShardOutput).Location
152-
153-
snh.logger.DebugWith("Seek shard succeeded", "shardPath", shardPath, "location", location)
154-
155-
return location, nil
156-
}
157-
158-
func (snh *sequenceNumberHandler) getShardCommittedSequenceNumberAttributeName() string {
159-
return fmt.Sprintf("__%s_committed_sequence_number", snh.streamConsumerGroup.name)
160-
}
161-
162-
func (snh *sequenceNumberHandler) setShardSequenceNumberInPersistency(shardID int, sequenceNumber uint64) error {
163-
snh.logger.DebugWith("Setting shard sequenceNumber in persistency", "shardID", shardID, "sequenceNumber", sequenceNumber)
164-
shardPath, err := snh.streamConsumerGroup.getShardPath(shardID)
165-
if err != nil {
166-
return errors.Wrapf(err, "Failed getting shard path: %v", shardID)
167-
}
168-
169-
return snh.streamConsumerGroup.container.UpdateItemSync(&v3io.UpdateItemInput{
170-
Path: shardPath,
171-
Attributes: map[string]interface{}{
172-
snh.getShardCommittedSequenceNumberAttributeName(): sequenceNumber,
173-
},
174-
})
175-
}
176-
17767
func (snh *sequenceNumberHandler) markedShardSequenceNumbersCommitter(interval time.Duration, stopChan chan struct{}) {
17868
for {
17969
select {
@@ -218,7 +108,7 @@ func (snh *sequenceNumberHandler) commitMarkedShardSequenceNumbers() error {
218108
continue
219109
}
220110

221-
if err := snh.setShardSequenceNumberInPersistency(shardID, sequenceNumber); err != nil {
111+
if err := snh.member.streamConsumerGroup.setShardSequenceNumberInPersistency(shardID, sequenceNumber); err != nil {
222112
snh.logger.WarnWith("Failed committing shard sequenceNumber", "shardID", shardID,
223113
"sequenceNumber", sequenceNumber,
224114
"err", errors.GetErrorStackString(err, 10))

0 commit comments

Comments
 (0)