Skip to content

Commit f5494e1

Browse files
authored
Fix potential panics caused by an invalid regexp by returning an error (#53)
- Fix `streaming.NewStream` and `streaming.NewSink` to error instead of panicking when an invalid regexp is passed in through `options.WithReaderTopicPattern` or `options.WithSinkTopicPattern`. - Move usage of `regexp.MustParse` in `testing.CleanupRedis` to a global.
1 parent d79f24b commit f5494e1

File tree

6 files changed

+30
-13
lines changed

6 files changed

+30
-13
lines changed

streaming/reader.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,16 @@ type (
8686
)
8787

8888
// newReader creates a new reader.
89-
func newReader(ctx context.Context, stream *Stream, opts ...options.Reader) (*Reader, error) {
89+
func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) {
9090
o := options.ParseReaderOptions(opts...)
9191
var eventFilter eventFilterFunc
9292
if o.Topic != "" {
9393
eventFilter = func(e *Event) bool { return e.Topic == o.Topic }
9494
} else if o.TopicPattern != "" {
95-
topicPatternRegexp := regexp.MustCompile(o.TopicPattern)
95+
topicPatternRegexp, err := regexp.Compile(o.TopicPattern)
96+
if err != nil {
97+
return nil, fmt.Errorf("topic pattern must be a valid regex: %w", err)
98+
}
9699
eventFilter = func(e *Event) bool { return topicPatternRegexp.MatchString(e.Topic) }
97100
}
98101

streaming/reader_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ func TestNewReader(t *testing.T) {
2424
assert.NoError(t, err)
2525
reader, err := s.NewReader(ctx, options.WithReaderBlockDuration(testBlockDuration))
2626
assert.NoError(t, err)
27-
assert.NotNil(t, reader)
28-
defer cleanupReader(t, ctx, s, reader)
27+
if assert.NotNil(t, reader) {
28+
defer cleanupReader(t, ctx, s, reader)
29+
}
30+
31+
_, err = s.NewReader(ctx, options.WithReaderTopicPattern("("))
32+
assert.EqualError(t, err, "topic pattern must be a valid regex: error parsing regexp: missing closing ): `(`")
2933
}
3034

3135
func TestReaderReadOnce(t *testing.T) {

streaming/sink.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,10 @@ func newSink(ctx context.Context, name string, stream *Stream, opts ...options.S
115115
if o.Topic != "" {
116116
eventMatcher = func(e *Event) bool { return e.Topic == o.Topic }
117117
} else if o.TopicPattern != "" {
118-
topicPatternRegexp := regexp.MustCompile(o.TopicPattern)
118+
topicPatternRegexp, err := regexp.Compile(o.TopicPattern)
119+
if err != nil {
120+
return nil, fmt.Errorf("topic pattern must be a valid regex: %w", err)
121+
}
119122
eventMatcher = func(e *Event) bool { return topicPatternRegexp.MatchString(e.Topic) }
120123
}
121124

streaming/sink_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ func TestNewSink(t *testing.T) {
2828
assert.NoError(t, err)
2929
sink, err := s.NewSink(ctx, "sink", options.WithSinkBlockDuration(testBlockDuration))
3030
assert.NoError(t, err)
31-
assert.NotNil(t, sink)
32-
cleanupSink(t, ctx, s, sink)
31+
if assert.NotNil(t, sink) {
32+
defer cleanupSink(t, ctx, s, sink)
33+
}
34+
35+
_, err = s.NewSink(ctx, "sink", options.WithSinkTopicPattern("("))
36+
assert.EqualError(t, err, "topic pattern must be a valid regex: error parsing regexp: missing closing ): `(`")
3337
}
3438

3539
func TestReadOnce(t *testing.T) {

streaming/streams.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,9 @@ func NewStream(name string, rdb *redis.Client, opts ...options.Stream) (*Stream,
7777
// - from the event added on or after the timestamp provided via
7878
// WithReaderStartAt if still in the stream, oldest event otherwise
7979
func (s *Stream) NewReader(ctx context.Context, opts ...options.Reader) (*Reader, error) {
80-
reader, err := newReader(ctx, s, opts...)
80+
reader, err := newReader(s, opts...)
8181
if err != nil {
82-
err := fmt.Errorf("failed to create reader: %w", err)
83-
s.logger.Error(err)
82+
s.logger.Error(fmt.Errorf("failed to create reader: %w", err))
8483
return nil, err
8584
}
8685
s.logger.Info("create reader", "start", reader.startID)

testing/redis.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16-
// redisPwd is the default test redis password, overridden by REDIS_PASSWORD env var
17-
var redisPwd = "redispassword"
16+
var (
17+
// redisPwd is the default test redis password, overridden by REDIS_PASSWORD env var
18+
redisPwd = "redispassword"
19+
// streamRegexp is a regular expression that matches valid stream keys
20+
streamRegexp = regexp.MustCompile(`^pulse:stream:[^:]+:node:.*`)
21+
)
1822

1923
func init() {
2024
if p := os.Getenv("REDIS_PASSWORD"); p != "" {
@@ -45,7 +49,7 @@ func CleanupRedis(t *testing.T, rdb *redis.Client, checkClean bool, testName str
4549
// Sinks content is cleaned up asynchronously, so ignore it
4650
continue
4751
}
48-
if regexp.MustCompile(`^pulse:stream:[^:]+:node:.*`).MatchString(k) {
52+
if streamRegexp.MatchString(k) {
4953
// Node streams are cleaned up asynchronously, so ignore them
5054
continue
5155
}

0 commit comments

Comments
 (0)