Skip to content

Commit 66e454a

Browse files
committed
refactor(search): use errgroup for concurrent page fetch
Replaces manual goroutine and WaitGroup management in fetchAllPages with golang.org/x/sync/errgroup for improved error handling and concurrency control. Updates usage and warning messages to clarify concurrency implications. Adds errgroup dependency to go.mod and go.sum.
1 parent 9716cce commit 66e454a

File tree

4 files changed

+25
-32
lines changed

4 files changed

+25
-32
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ module github.com/Ajnasz/go-loggly-cli
33
go 1.25
44

55
require github.com/bitly/go-simplejson v0.5.1
6+
7+
require golang.org/x/sync v0.17.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow=
22
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
3+
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
4+
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=

loggly.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,8 @@ import (
1313
"github.com/Ajnasz/go-loggly-cli/search"
1414
)
1515

16-
// Version is the version string
1716
var version string
1817

19-
//
20-
// Usage information.
21-
//
22-
2318
const usage = `
2419
Usage: loggly [options] [query...]
2520
@@ -33,7 +28,7 @@ const usage = `
3328
-count print total event count
3429
-all print the entire loggly event instead of just the message
3530
-maxPages <count> maximum number of pages to query [3]
36-
-concurrency <count> number of concurrent page fetchers [3]
31+
-concurrency <count> number of concurrent page fetchers [3]. If loggly returns with http error consider reducing this value.
3732
-version print version information
3833
3934
Operators:
@@ -202,7 +197,7 @@ func warnInvalidFlagPlacement(args []string) {
202197

203198
func warnHighConcurrency(concurrency int) {
204199
if concurrency > 3 {
205-
fmt.Fprintf(os.Stderr, "Warning: High concurrency (%d) may lead to rate limiting or temporary blocking by Loggly. Consider reducing the concurrency level.\n", concurrency)
200+
fmt.Fprintf(os.Stderr, "Warning: High concurrency (%d) may lead to rate limiting or temporary blocking by Loggly. If loggly returns with error, consider reducing the concurrency level.\n", concurrency)
206201
}
207202
}
208203

search/search.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import (
66
"io"
77
"net/http"
88
"strconv"
9-
"sync"
109
"sync/atomic"
1110

1211
"github.com/Ajnasz/go-loggly-cli/orderedbuffer"
1312
"github.com/Ajnasz/go-loggly-cli/semaphore"
1413
"github.com/bitly/go-simplejson"
14+
"golang.org/x/sync/errgroup"
1515
)
1616

1717
// Client Loggly search client with user credentials, loggly
@@ -161,14 +161,12 @@ func shouldStopFetching(err error, res *Response, pageSize int) bool {
161161
return false
162162
}
163163

164-
func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Response, errChan chan error) {
164+
func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Response) error {
165165
defer close(resChan)
166-
defer close(errChan)
167166
j, err := c.CreateSearch(ctx, q.String())
168167

169168
if err != nil {
170-
errChan <- err
171-
return
169+
return err
172170
}
173171

174172
concurrent := min(q.maxPages, c.concurrency.Load())
@@ -177,37 +175,27 @@ func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Respon
177175
var page atomic.Int64
178176
page.Store(-1)
179177

180-
var wg sync.WaitGroup
181178
var hasMore atomic.Bool
182179
hasMore.Store(true)
183180
responsesStore := orderedbuffer.NewOrderedBuffer(resChan)
184181

182+
errg, ctx := errgroup.WithContext(ctx)
183+
185184
for {
186185
if err := sem.Acquire(ctx); err != nil {
187-
errChan <- err
188-
break
186+
return err
189187
}
190-
191-
wg.Add(1)
192-
go func(page int) {
193-
defer wg.Done()
188+
p := int(page.Add(1))
189+
errg.Go(func() error {
194190
defer sem.Release()
195191

196-
if ctx.Err() != nil {
197-
errChan <- ctx.Err()
198-
hasMore.Store(false)
199-
return
200-
}
201-
202-
res, err := c.fetchAndStorePage(ctx, j, responsesStore, page)
192+
res, err := c.fetchAndStorePage(ctx, j, responsesStore, p)
203193

204194
if shouldStopFetching(err, res, q.size) {
205195
hasMore.Store(false)
206-
if err != nil {
207-
errChan <- err
208-
}
209196
}
210-
}(int(page.Add(1)))
197+
return err
198+
})
211199

212200
shouldBreak := page.Load() >= q.maxPages || !hasMore.Load()
213201

@@ -216,7 +204,7 @@ func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Respon
216204
}
217205
}
218206

219-
wg.Wait()
207+
return errg.Wait()
220208
}
221209

222210
// Fetch Search response with total events, page number
@@ -229,7 +217,13 @@ func (c *Client) Fetch(ctx context.Context, q Query) (chan Response, chan error)
229217
resChan := make(chan Response)
230218
errChan := make(chan error)
231219

232-
go c.fetchAllPages(ctx, q, resChan, errChan)
220+
go func() {
221+
defer close(errChan)
222+
err := c.fetchAllPages(ctx, q, resChan)
223+
if err != nil {
224+
errChan <- err
225+
}
226+
}()
233227

234228
return resChan, errChan
235229
}

0 commit comments

Comments
 (0)