Skip to content

Commit 21324eb

Browse files
DSET-3930: Log number of processed buffers/events (#22)
1 parent dafada0 commit 21324eb

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

pkg/client/client.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge
111111
addEventsChannels: make(map[string]chan interface{}),
112112
}
113113

114+
// run buffer sweeper if requested
114115
if cfg.BufferSettings.MaxLifetime > 0 {
115116
dataClient.Logger.Info("Buffer.MaxLifetime is positive => send buffers regularly",
116117
zap.Duration("Buffer.MaxLifetime", cfg.BufferSettings.MaxLifetime),
@@ -123,6 +124,9 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge
123124
)
124125
}
125126

127+
// run statistics sweeper
128+
go dataClient.statisticsSweeper()
129+
126130
dataClient.Logger.Info("DataSetClient was created",
127131
zap.String("id", dataClient.Id.String()),
128132
)
@@ -290,6 +294,35 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch
290294
}
291295
}
292296

297+
func (client *DataSetClient) statisticsSweeper() {
298+
for i := uint64(0); ; i++ {
299+
// log buffer stats
300+
bProcessed := client.buffersProcessed.Load()
301+
bEnqueued := client.buffersEnqueued.Load()
302+
bDropped := client.buffersDropped.Load()
303+
client.Logger.Info(
304+
"Buffers' Queue Stats:",
305+
zap.Uint64("processed", bProcessed),
306+
zap.Uint64("enqueued", bEnqueued),
307+
zap.Uint64("dropped", bDropped),
308+
zap.Uint64("waiting", bEnqueued-bProcessed),
309+
)
310+
311+
// log events stats
312+
eProcessed := client.eventsProcessed.Load()
313+
eEnqueued := client.eventsEnqueued.Load()
314+
client.Logger.Info(
315+
"Events' Queue Stats:",
316+
zap.Uint64("processed", eProcessed),
317+
zap.Uint64("enqueued", eEnqueued),
318+
zap.Uint64("waiting", eEnqueued-eProcessed),
319+
)
320+
321+
// wait for some time before new sweep
322+
time.Sleep(time.Minute)
323+
}
324+
}
325+
293326
func (client *DataSetClient) bufferSweeper(lifetime time.Duration) {
294327
client.Logger.Info("Starting buffer sweeper with lifetime", zap.Duration("lifetime", lifetime))
295328
totalKept := atomic.Uint64{}

0 commit comments

Comments
 (0)