Skip to content

Commit 55b5da4

Browse files
committed
chore: debug logs
1 parent dc2319f commit 55b5da4

File tree

3 files changed

+34
-4
lines changed

3 files changed

+34
-4
lines changed

router/batchrouter/handle.go

+1
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ func (brt *Handle) mainLoop(ctx context.Context) {
168168
return
169169
case <-time.After(mainLoopSleep):
170170
for _, partition := range brt.activePartitions(ctx) {
171+
brt.logger.Infof("Pinging worker for partition: %s", partition)
171172
pool.PingWorker(partition)
172173
}
173174
mainLoopSleep = brt.mainLoopFreq.Load()

router/batchrouter/worker.go

+14
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package batchrouter
33
import (
44
"context"
55
"fmt"
6+
"runtime"
67
"slices"
8+
"strconv"
79
"strings"
810
"time"
911

@@ -32,6 +34,17 @@ type worker struct {
3234
brt *Handle
3335
}
3436

37+
func goid() int {
38+
var buf [64]byte
39+
n := runtime.Stack(buf[:], false)
40+
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine"))[0]
41+
id, err := strconv.Atoi(idField)
42+
if err != nil {
43+
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
44+
}
45+
return id
46+
}
47+
3548
// Work retrieves jobs from batch router for the worker's partition and processes them,
3649
// grouped by destination and in parallel.
3750
// The function returns when processing completes and the return value is true if at least 1 job was processed,
@@ -40,6 +53,7 @@ func (w *worker) Work() bool {
4053
brt := w.brt
4154
workerJobs := brt.getWorkerJobs(w.partition)
4255
brt.logger.Infof("Worker %s has %v jobs to process", w.partition, len(workerJobs))
56+
brt.logger.Infof("Goroutine ID: %d", goid())
4357
if len(workerJobs) == 0 {
4458
return false
4559
}

utils/workerpool/internal_worker.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ package workerpool
22

33
import (
44
"context"
5+
"fmt"
6+
"runtime"
7+
"strconv"
8+
"strings"
59
"sync"
610
"time"
711

@@ -62,10 +66,10 @@ func (w *internalWorker) start() {
6266

6367
w.setIdleSince(time.Time{})
6468
if w.delegate.Work() {
65-
w.logger.Infof("worker %q produced work", w.partition)
69+
w.logger.Infof("worker %q produced work %d", w.partition, goid())
6670
exponentialSleep.Reset()
6771
} else {
68-
w.logger.Infof("worker %q didn't produce any work", w.partition)
72+
w.logger.Infof("worker %q didn't produce any work %d", w.partition, goid())
6973
if err := misc.SleepCtx(w.lifecycle.ctx, exponentialSleep.Next(w.delegate.SleepDurations())); err != nil {
7074
w.logger.Debugf("worker %q sleep interrupted: %v", w.partition, err)
7175
return
@@ -84,6 +88,17 @@ func (w *internalWorker) setIdleSince(t time.Time) {
8488
}
8589
}
8690

91+
func goid() int {
92+
var buf [64]byte
93+
n := runtime.Stack(buf[:], false)
94+
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine"))[0]
95+
id, err := strconv.Atoi(idField)
96+
if err != nil {
97+
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
98+
}
99+
return id
100+
}
101+
87102
// Ping triggers the worker to pick more jobs
88103
func (w *internalWorker) Ping() {
89104
w.logger.Debugf("worker %q pinged", w.partition)
@@ -94,9 +109,9 @@ func (w *internalWorker) Ping() {
94109
}
95110
select {
96111
case w.ping <- struct{}{}:
97-
w.logger.Infof("worker %q pinged", w.partition)
112+
w.logger.Infof("worker %q pinged %d", w.partition, goid())
98113
default:
99-
w.logger.Infof("worker %q pinged but channel was full", w.partition)
114+
w.logger.Infof("worker %q pinged but channel was full %d", w.partition, goid())
100115
}
101116
}
102117

0 commit comments

Comments
 (0)