Skip to content

Commit dfedbc7

Browse files
authored
refactor: hq source package select statements (#579)
1 parent 944d40d commit dfedbc7

File tree

3 files changed

+36
-53
lines changed

3 files changed

+36
-53
lines changed

internal/pkg/source/hq/consumer.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,28 +44,23 @@ func (s *HQ) consumer() {
4444
go s.consumerSender(ctx, &wg, urlBuffer)
4545

4646
// Wait for shutdown signal
47-
for {
48-
select {
49-
case <-s.ctx.Done():
50-
logger.Debug("received done signal")
47+
<-s.ctx.Done()
48+
logger.Debug("received done signal")
5149

52-
// Cancel the context to stop all goroutines.
53-
cancel()
50+
// Cancel the context to stop all goroutines.
51+
cancel()
5452

55-
logger.Debug("waiting for goroutines to finish")
53+
logger.Debug("waiting for goroutines to finish")
5654

57-
// Wait for all goroutines to finish
58-
wg.Wait()
55+
// Wait for all goroutines to finish
56+
wg.Wait()
5957

60-
// Close the urlBuffer to signal consumerSenders to finish
61-
close(urlBuffer)
58+
// Close the urlBuffer to signal consumerSenders to finish
59+
close(urlBuffer)
6260

63-
s.wg.Done()
61+
s.wg.Done()
6462

65-
logger.Debug("closed")
66-
return
67-
}
68-
}
63+
logger.Debug("closed")
6964
}
7065

7166
func (s *HQ) consumerFetcher(ctx context.Context, wg *sync.WaitGroup, urlBuffer chan<- *gocrawlhq.URL, batchSize int) {
@@ -182,11 +177,9 @@ func (s *HQ) consumerSender(ctx context.Context, wg *sync.WaitGroup, urlBuffer <
182177
err = reactor.ReceiveInsert(newItem)
183178
if err != nil {
184179
if err == reactor.ErrReactorFrozen {
185-
select {
186-
case <-ctx.Done():
187-
logger.Debug("closed while sending to frozen reactor")
188-
return
189-
}
180+
<-ctx.Done()
181+
logger.Debug("closed while sending to frozen reactor")
182+
return
190183
}
191184
panic(err)
192185
}

internal/pkg/source/hq/finisher.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,28 +40,23 @@ func (s *HQ) finisher() {
4040
go s.finisherDispatcher(ctx, &wg, batchCh)
4141

4242
// Wait for the context to be canceled.
43-
for {
44-
select {
45-
case <-s.ctx.Done():
46-
logger.Debug("received done signal")
43+
<-s.ctx.Done()
44+
logger.Debug("received done signal")
4745

48-
// Cancel the context to stop all goroutines.
49-
cancel()
46+
// Cancel the context to stop all goroutines.
47+
cancel()
5048

51-
logger.Debug("waiting for goroutines to finish")
49+
logger.Debug("waiting for goroutines to finish")
5250

53-
// Wait for the finisher and dispatcher to finish.
54-
wg.Wait()
51+
// Wait for the finisher and dispatcher to finish.
52+
wg.Wait()
5553

56-
// Close the batch channel to signal the dispatcher to finish.
57-
close(batchCh)
54+
// Close the batch channel to signal the dispatcher to finish.
55+
close(batchCh)
5856

59-
s.wg.Done()
57+
s.wg.Done()
6058

61-
logger.Debug("closed")
62-
return
63-
}
64-
}
59+
logger.Debug("closed")
6560
}
6661

6762
// finisherReceiver reads URLs from finishCh, accumulates them into batches, and sends the batches to batchCh.

internal/pkg/source/hq/producer.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,23 @@ func (s *HQ) producer() {
3939
go s.producerDispatcher(ctx, &wg, batchCh)
4040

4141
// Wait for the context to be canceled.
42-
for {
43-
select {
44-
case <-s.ctx.Done():
45-
logger.Debug("received done signal")
42+
<-s.ctx.Done()
43+
logger.Debug("received done signal")
4644

47-
// Cancel the context to stop all goroutines.
48-
cancel()
45+
// Cancel the context to stop all goroutines.
46+
cancel()
4947

50-
logger.Debug("waiting for goroutines to finish")
48+
logger.Debug("waiting for goroutines to finish")
5149

52-
// Wait for the producer and dispatcher to finish.
53-
wg.Wait()
50+
// Wait for the producer and dispatcher to finish.
51+
wg.Wait()
5452

55-
// Close the batch channel to signal the dispatcher to finish.
56-
close(batchCh)
53+
// Close the batch channel to signal the dispatcher to finish.
54+
close(batchCh)
5755

58-
s.wg.Done()
56+
s.wg.Done()
5957

60-
logger.Debug("closed")
61-
return
62-
}
63-
}
58+
logger.Debug("closed")
6459
}
6560

6661
// producerReceiver reads URLs from produceCh, accumulates them into batches, and sends the batches to batchCh.

0 commit comments

Comments
 (0)