Skip to content

Commit 9716cce

Browse files
committed
refactor: add semaphore
Replaced channel-based concurrency control with a custom semaphore implementation to improve resource management in fetchAllPages. Added semaphore package for handling concurrent operations.
1 parent 22ee1c0 commit 9716cce

File tree

4 files changed

+40
-9
lines changed

4 files changed

+40
-9
lines changed

loggly.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ var count = flags.Bool("count", false, "")
6666
var concurrency = flags.Int("concurrency", 3, "")
6767
var versionQuery = flags.Bool("version", false, "")
6868
var account = flags.String("account", "", "")
69-
var maxPages = flags.Int("maxPages", 3, "")
69+
var maxPages = flags.Int64("maxPages", 3, "")
7070
var token = flags.String("token", "", "")
7171
var size = flags.Int("size", 100, "")
7272
var from = flags.String("from", "-24h", "")
@@ -155,7 +155,7 @@ func sendQuery(
155155
size int,
156156
from string,
157157
to string,
158-
maxPages int,
158+
maxPages int64,
159159
concurrency int,
160160
) {
161161
doneChan := make(chan error)

search/query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type Query struct {
1212
until string
1313
order string
1414
size int
15-
maxPages int
15+
maxPages int64
1616
}
1717

1818
// Create a new query
@@ -50,7 +50,7 @@ func (q *Query) From(str string) *Query {
5050
}
5151

5252
// MaxPage sets the max page
53-
func (q *Query) MaxPage(maxPages int) *Query {
53+
func (q *Query) MaxPage(maxPages int64) *Query {
5454
q.maxPages = maxPages
5555
return q
5656
}

search/search.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync/atomic"
1111

1212
"github.com/Ajnasz/go-loggly-cli/orderedbuffer"
13+
"github.com/Ajnasz/go-loggly-cli/semaphore"
1314
"github.com/bitly/go-simplejson"
1415
)
1516

@@ -170,8 +171,8 @@ func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Respon
170171
return
171172
}
172173

173-
concurrent := min(q.maxPages, int(c.concurrency.Load()))
174-
pool := make(chan struct{}, concurrent)
174+
concurrent := min(q.maxPages, c.concurrency.Load())
175+
sem := semaphore.New(concurrent)
175176

176177
var page atomic.Int64
177178
page.Store(-1)
@@ -182,11 +183,15 @@ func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Respon
182183
responsesStore := orderedbuffer.NewOrderedBuffer(resChan)
183184

184185
for {
185-
pool <- struct{}{}
186+
if err := sem.Acquire(ctx); err != nil {
187+
errChan <- err
188+
break
189+
}
190+
186191
wg.Add(1)
187192
go func(page int) {
188193
defer wg.Done()
189-
defer func() { <-pool }()
194+
defer sem.Release()
190195

191196
if ctx.Err() != nil {
192197
errChan <- ctx.Err()
@@ -204,7 +209,7 @@ func (c *Client) fetchAllPages(ctx context.Context, q Query, resChan chan Respon
204209
}
205210
}(int(page.Add(1)))
206211

207-
shouldBreak := int(page.Load()) >= q.maxPages || !hasMore.Load()
212+
shouldBreak := page.Load() >= q.maxPages || !hasMore.Load()
208213

209214
if shouldBreak {
210215
break

semaphore/semaphore.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package semaphore
2+
3+
import "context"
4+
5+
type Semaphore struct {
6+
ch chan struct{}
7+
}
8+
9+
func New(n int64) *Semaphore {
10+
return &Semaphore{
11+
ch: make(chan struct{}, n),
12+
}
13+
}
14+
15+
func (s *Semaphore) Acquire(ctx context.Context) error {
16+
select {
17+
case <-ctx.Done():
18+
return ctx.Err()
19+
case s.ch <- struct{}{}:
20+
return nil
21+
}
22+
}
23+
24+
func (s *Semaphore) Release() {
25+
<-s.ch
26+
}

0 commit comments

Comments
 (0)