Skip to content

Commit aac6c7c

Browse files
committed
feat(search): add concurrency for page fetching
Introduce concurrency control for Loggly page fetching via the new `-concurrency` flag. Add OrderedBuffer to ensure ordered delivery of results. Update search logic to support concurrent requests and ordered output. Includes tests for OrderedBuffer.
1 parent 9d6697a commit aac6c7c

File tree

4 files changed

+183
-28
lines changed

4 files changed

+183
-28
lines changed

loggly.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const usage = `
3030
-count print total event count
3131
-all print the entire loggly event instead of just the message
3232
-maxPages <count> maximum number of pages to query [3]
33+
-concurrency <count> number of concurrent page fetchers [3]
3334
-version print version information
3435
3536
Operators:
@@ -59,6 +60,7 @@ const usage = `
5960
// Command options.
6061
var flags = flag.NewFlagSet("loggly", flag.ExitOnError)
6162
var count = flags.Bool("count", false, "")
63+
var concurrency = flags.Int("concurrency", 3, "")
6264
var versionQuery = flags.Bool("version", false, "")
6365
var account = flags.String("account", "", "")
6466
var maxPages = flags.Int("maxPages", 3, "")
@@ -143,10 +145,10 @@ func printRes(res search.Response) {
143145
}
144146
}
145147

146-
func sendQuery(query string, size int, from string, to string, maxPages int) {
148+
func sendQuery(query string, size int, from string, to string, maxPages int, concurrency int) {
147149
doneChan := make(chan error)
148150

149-
c := search.New(*account, *token)
151+
c := search.New(*account, *token).SetConcurrency(concurrency)
150152
res, err := c.Query(query).Size(size).From(from).To(to).MaxPage(maxPages).Fetch()
151153

152154
go func() {
@@ -182,6 +184,12 @@ func warnInvalidFlagPlacement(args []string) {
182184
}
183185
}
184186

187+
func warnHighConcurrency(concurrency int) {
188+
if concurrency > 3 {
189+
fmt.Fprintf(os.Stderr, " Warning: High concurrency (%d) may lead to rate limiting or temporary blocking by Loggly. Consider reducing the concurrency level.\n", concurrency)
190+
}
191+
}
192+
185193
func main() {
186194
flags.Usage = printUsage
187195
flags.Parse(os.Args[1:])
@@ -196,12 +204,13 @@ func main() {
196204

197205
args := flags.Args()
198206
warnInvalidFlagPlacement(args)
207+
warnHighConcurrency(*concurrency)
199208
query := strings.Join(args, " ")
200209

201210
if *count {
202211
execCount(query, *from, *to)
203212
return
204213
}
205214

206-
sendQuery(query, *size, *from, *to, *maxPages)
215+
sendQuery(query, *size, *from, *to, *maxPages, *concurrency)
207216
}

orderedbuffer/orderedbuffer.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package orderedbuffer
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"sync"
7+
)
8+
9+
type OrderedBuffer[T any] struct {
10+
responses map[int]T
11+
mu *sync.RWMutex
12+
lastSentIdx int
13+
ch chan T
14+
}
15+
16+
func NewOrderedBuffer[T any](ch chan T) *OrderedBuffer[T] {
17+
fmt.Fprintln(os.Stderr, "Creating new ResponsesStore")
18+
var mu sync.RWMutex
19+
return &OrderedBuffer[T]{
20+
responses: make(map[int]T),
21+
ch: ch,
22+
lastSentIdx: -1,
23+
mu: &mu,
24+
}
25+
}
26+
27+
func (s *OrderedBuffer[T]) send() {
28+
s.mu.Lock()
29+
newIdx := s.lastSentIdx + 1
30+
fmt.Fprintln(os.Stderr, "Checking for page", newIdx)
31+
if resp, ok := s.responses[newIdx]; ok {
32+
fmt.Fprintln(os.Stderr, "Sending page", newIdx)
33+
s.ch <- resp
34+
s.lastSentIdx = newIdx
35+
delete(s.responses, newIdx)
36+
s.mu.Unlock()
37+
s.send()
38+
return
39+
} else {
40+
fmt.Fprintln(os.Stderr, "Page", newIdx, "not ready yet")
41+
}
42+
s.mu.Unlock()
43+
}
44+
45+
func (s *OrderedBuffer[T]) Store(i int, r T) {
46+
fmt.Fprintf(os.Stderr, "Storing page %d\n", i)
47+
s.mu.Lock()
48+
s.responses[i] = r
49+
s.mu.Unlock()
50+
s.send()
51+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package orderedbuffer
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestOrderedBufferOrderedDelivery(t *testing.T) {
9+
ch := make(chan int, 3)
10+
buf := NewOrderedBuffer(ch)
11+
12+
buf.Store(0, 10)
13+
buf.Store(1, 20)
14+
buf.Store(2, 30)
15+
16+
got := []int{<-ch, <-ch, <-ch}
17+
want := []int{10, 20, 30}
18+
for i := range want {
19+
if got[i] != want[i] {
20+
t.Errorf("expected %d at index %d, got %d", want[i], i, got[i])
21+
}
22+
}
23+
}
24+
25+
func TestOrderedBufferOutOfOrder(t *testing.T) {
26+
ch := make(chan string, 3)
27+
buf := NewOrderedBuffer(ch)
28+
29+
buf.Store(2, "c")
30+
buf.Store(0, "a")
31+
buf.Store(1, "b")
32+
33+
got := []string{<-ch, <-ch, <-ch}
34+
want := []string{"a", "b", "c"}
35+
for i := range want {
36+
if got[i] != want[i] {
37+
t.Errorf("expected %s at index %d, got %s", want[i], i, got[i])
38+
}
39+
}
40+
}
41+
42+
func TestOrderedBufferConcurrent(t *testing.T) {
43+
ch := make(chan int, 3)
44+
buf := NewOrderedBuffer(ch)
45+
46+
go buf.Store(1, 100)
47+
go buf.Store(0, 50)
48+
go buf.Store(2, 150)
49+
50+
time.Sleep(100 * time.Millisecond)
51+
got := []int{<-ch, <-ch, <-ch}
52+
want := []int{50, 100, 150}
53+
for i := range want {
54+
if got[i] != want[i] {
55+
t.Errorf("expected %d at index %d, got %d", want[i], i, got[i])
56+
}
57+
}
58+
}

search/search.go

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,32 @@ package search
22

33
import (
44
"fmt"
5-
"io/ioutil"
5+
"io"
66
"net/http"
77
"net/url"
88
"strconv"
9+
"sync"
10+
"sync/atomic"
911

12+
"github.com/Ajnasz/go-loggly-cli/orderedbuffer"
1013
"github.com/bitly/go-simplejson"
1114
)
1215

1316
// Client Loggly search client with user credentials, loggly
1417
// does not seem to support tokens right now.
1518
type Client struct {
16-
Token string
17-
Account string
18-
Endpoint string
19+
Token string
20+
Account string
21+
Endpoint string
22+
concurrency int
1923
}
2024

2125
// Response Search response with total events, page number
2226
// and the events array.
2327
type Response struct {
2428
Total int64
2529
Page int64
26-
Events []interface{}
30+
Events []any
2731
}
2832

2933
// Query builder struct
@@ -60,6 +64,14 @@ func New(account string, token string) *Client {
6064
return c
6165
}
6266

67+
func (c *Client) SetConcurrency(n int) *Client {
68+
if n < 1 {
69+
n = 1
70+
}
71+
c.concurrency = n
72+
return c
73+
}
74+
6375
// URL Return the base api url.
6476
func (c *Client) URL() string {
6577
return fmt.Sprintf("https://%s.%s", c.Account, c.Endpoint)
@@ -89,10 +101,11 @@ func (c *Client) GetJSON(path string) (j *simplejson.Json, err error) {
89101
defer res.Body.Close()
90102

91103
if res.StatusCode >= 400 {
92-
return nil, fmt.Errorf("go-loggly-search: %q", res.Status)
104+
body, _ := io.ReadAll(res.Body)
105+
return nil, fmt.Errorf("go-loggly-search: %q, %s", res.Status, body)
93106
}
94107

95-
body, err := ioutil.ReadAll(res.Body)
108+
body, err := io.ReadAll(res.Body)
96109

97110
if err != nil {
98111
return
@@ -187,7 +200,12 @@ func (q *Query) Fetch() (chan Response, chan error) {
187200
resChan := make(chan Response)
188201
errChan := make(chan error)
189202

190-
page := 0
203+
concurrent := min(q.maxPages, q.client.concurrency)
204+
205+
pool := make(chan struct{}, concurrent)
206+
207+
var page atomic.Int64
208+
page.Store(-1)
191209
go func() {
192210
defer close(resChan)
193211
defer close(errChan)
@@ -198,26 +216,45 @@ func (q *Query) Fetch() (chan Response, chan error) {
198216
return
199217
}
200218

219+
var wg sync.WaitGroup
220+
var hasMore atomic.Bool
221+
hasMore.Store(true)
222+
var lastPrintedPage atomic.Int32
223+
lastPrintedPage.Store(-1)
224+
responsesStore := orderedbuffer.NewOrderedBuffer(resChan)
201225
for {
202-
res, err := q.client.Search(j, page)
203-
204-
if err != nil {
205-
errChan <- err
206-
return
226+
pool <- struct{}{}
227+
wg.Add(1)
228+
go func(page int) {
229+
defer wg.Done()
230+
defer func() { <-pool }()
231+
232+
res, err := q.client.Search(j, page)
233+
if err != nil {
234+
errChan <- err
235+
hasMore.Store(false)
236+
return
237+
}
238+
239+
if res != nil {
240+
// resChan <- *res
241+
responsesStore.Store(page, *res)
242+
243+
if len(res.Events) < q.size {
244+
hasMore.Store(false)
245+
return
246+
}
247+
} else {
248+
hasMore.Store(false)
249+
return
250+
}
251+
}(int(page.Add(1)))
252+
if int(page.Load()) >= q.maxPages || !hasMore.Load() {
253+
break
207254
}
208-
209-
resChan <- *res
210-
211-
if page+1 == q.maxPages {
212-
return
213-
}
214-
215-
if len(res.Events) < q.size {
216-
return
217-
}
218-
219-
page++
220255
}
256+
257+
wg.Wait()
221258
}()
222259

223260
return resChan, errChan

0 commit comments

Comments
 (0)