Skip to content

Commit 404d248

Browse files
committed
add some protection for consumer npe
1 parent bf129a8 commit 404d248

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

consumer/shard_worker.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,20 @@ func (c *ShardConsumerWorker) callProcess(logGroupList *sls.LogGroupList, plm *s
146146
}
147147
}
148148

149-
func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollBackCheckpoint string, err error) {
149+
func (c *ShardConsumerWorker) processInternal(logGroupList *sls.LogGroupList) (rollBackCheckpoint string, err error) {
150150
defer func() {
151151
if r := c.recoverIfPanic("panic in your process function"); r != nil {
152152
err = fmt.Errorf("panic when process: %v", r)
153153
}
154154
}()
155-
156-
return c.processor.Process(c.shardId, logGroup, c.consumerCheckPointTracker)
155+
// now we may have empty content(filter by query), we do this to avoid of npe inside user's processors
156+
if logGroupList == nil {
157+
logGroupList = &sls.LogGroupList{}
158+
}
159+
if logGroupList.LogGroups == nil {
160+
logGroupList.LogGroups = make([]*sls.LogGroup, 0)
161+
}
162+
return c.processor.Process(c.shardId, logGroupList, c.consumerCheckPointTracker)
157163
}
158164

159165
// call user shutdown func and flush checkpoint

0 commit comments

Comments
 (0)