@@ -39,8 +39,14 @@ import (
3939 "go.uber.org/zap"
4040)
4141
42+ const (
43+ MaxLifeTimeMultiplier = 5
44+ PurgeOlderThanMultiplier = 15
45+ )
46+
4247func main () {
4348 eventsCount := flag .Int ("events" , 1e5 , "number of events" )
49+ bucketsCount := flag .Int ("buckets" , 1e5 , "number of buckets" )
4450 sleep := flag .Duration ("sleep" , 10 * time .Millisecond , "sleep between sending two events" )
4551 logFile := flag .String ("log" , fmt .Sprintf ("log-%s-%d.log" , version .Version , time .Now ().UnixMilli ()), "log file for stats" )
4652 logEvery := flag .Duration ("log-every" , time .Second , "how often log statistics" )
@@ -51,8 +57,23 @@ func main() {
5157 logger := zap .Must (zap .NewDevelopment ())
5258
5359 // log input parameters
54- logger .Info ("Running stress test with:" ,
60+ logger .Info ("Running stress test - input:" ,
61+ zap .Int ("events" , * eventsCount ),
62+ zap .Int ("buckets" , * bucketsCount ),
63+ zap .Duration ("sleep" , * sleep ),
64+ zap .String ("log" , * logFile ),
65+ zap .Duration ("log-every" , * logEvery ),
66+ zap .Bool ("pprof" , * enablePProf ),
67+ zap .String ("version" , version .Version ),
68+ )
69+
70+ if * bucketsCount == - 1 {
71+ * bucketsCount = PurgeOlderThanMultiplier
72+ }
73+
74+ logger .Info ("Running stress test - adjusted:" ,
5575 zap .Int ("events" , * eventsCount ),
76+ zap .Int ("buckets" , * bucketsCount ),
5677 zap .Duration ("sleep" , * sleep ),
5778 zap .String ("log" , * logFile ),
5879 zap .Duration ("log-every" , * logEvery ),
@@ -61,6 +82,8 @@ func main() {
6182 )
6283
6384 if * enablePProf {
85+ runtime .SetBlockProfileRate (1 )
86+ runtime .SetMutexProfileFraction (1 )
6487 go func () {
6588 http .ListenAndServe ("localhost:8080" , nil )
6689 }()
@@ -82,7 +105,11 @@ func main() {
82105 defer server .Close ()
83106
84107 cfg := config .NewDefaultDataSetConfig ()
85- bufferCfg , err := cfg .BufferSettings .WithOptions (buffer_config .WithGroupBy ([]string {"body.str" }))
108+ bufferCfg , err := cfg .BufferSettings .WithOptions (
109+ buffer_config .WithGroupBy ([]string {"body.str" }),
110+ buffer_config .WithMaxLifetime (MaxLifeTimeMultiplier * * sleep ),
111+ buffer_config .WithPurgeOlderThan (PurgeOlderThanMultiplier * * sleep ),
112+ )
86113 check (err )
87114 cfgUpdated , err := cfg .WithOptions (
88115 config .WithBufferSettings (* bufferCfg ),
@@ -102,9 +129,20 @@ func main() {
102129
103130 go logStats (dataSetClient , & apiCalls , * logFile , * logEvery )
104131
132+ // start sending events
133+ logger .Info (
134+ "STRESS - Start adding events" ,
135+ )
105136 for i := 0 ; i < * eventsCount ; i ++ {
106137 batch := make ([]* add_events.EventBundle , 0 )
107- key := fmt .Sprintf ("%d" , i )
138+ key := fmt .Sprintf ("%d" , i % * bucketsCount )
139+
140+ logger .Debug (
141+ "STRESS - Creating event" ,
142+ zap .Int ("i" , i ),
143+ zap .String ("key" , key ),
144+ )
145+
108146 attrs := make (map [string ]interface {})
109147 attrs ["body.str" ] = key
110148 attrs ["attributes.p1" ] = strings .Repeat ("A" , rand .Intn (2000 ))
@@ -135,6 +173,9 @@ func main() {
135173 }
136174
137175 // wait until everything is processed
176+ logger .Info (
177+ "STRESS - Wait for everything to finish" ,
178+ )
138179 for {
139180 processed := uint64 (0 )
140181 stats := dataSetClient .Statistics ()
@@ -152,6 +193,9 @@ func main() {
152193 }
153194
154195 // wait for extra 1 minute to see how the memory will behave
196+ logger .Info (
197+ "STRESS - Extra sleep at the end" ,
198+ )
155199 extraSleepFor := 60
156200 for i := 0 ; i <= extraSleepFor ; i ++ {
157201 time .Sleep (time .Second )
@@ -175,27 +219,53 @@ func logStats(client *client.DataSetClient, apiCalls *atomic.Uint64, logFile str
175219 f , err := os .Create (logFile )
176220 check (err )
177221
178- _ , err = f .WriteString ("i\t Time\t Enqueued \t Processed \t Calls\t HeapAlloc\t HeapSys\t Mallocs\t Frees\t HeapObjects\t Version\n " )
222+ _ , err = f .WriteString ("i\t Time\t EvEnqueued \t EvProcessed \t EvBroken \t EvDropped \t BufEnqueued \t BufProcessed \t BufBroken \t BufDropped \t SesOpened \t SesClosed \t Calls\t HeapAlloc\t HeapSys\t Mallocs\t Frees\t HeapObjects\t Version\n " )
179223 check (err )
180224
181225 for i := 0 ; ; i ++ {
182226 var memStats runtime.MemStats
183227 runtime .ReadMemStats (& memStats )
184- enqueued := uint64 (0 )
185- processed := uint64 (0 )
228+ evEnqueued := uint64 (0 )
229+ evProcessed := uint64 (0 )
230+ evDropped := uint64 (0 )
231+ evBroken := uint64 (0 )
232+ bufEnqueued := uint64 (0 )
233+ bufProcessed := uint64 (0 )
234+ bufDropped := uint64 (0 )
235+ bufBroken := uint64 (0 )
236+ sesOpened := uint64 (0 )
237+ sesClosed := uint64 (0 )
186238 clientStats := client .Statistics ()
187239 if clientStats != nil {
188- enqueued = clientStats .Events .Enqueued ()
189- processed = clientStats .Events .Processed ()
240+ evEnqueued = clientStats .Events .Enqueued ()
241+ evProcessed = clientStats .Events .Processed ()
242+ evDropped = clientStats .Events .Dropped ()
243+ evBroken = clientStats .Events .Broken ()
244+
245+ bufEnqueued = clientStats .Buffers .Enqueued ()
246+ bufProcessed = clientStats .Buffers .Processed ()
247+ bufDropped = clientStats .Buffers .Dropped ()
248+ bufBroken = clientStats .Buffers .Broken ()
249+
250+ sesOpened = clientStats .Sessions .SessionsOpened ()
251+ sesClosed = clientStats .Sessions .SessionsClosed ()
190252 }
191253
192254 _ , err := f .WriteString (
193255 fmt .Sprintf (
194- "%d\t %d\t %d\t %d\t %d\t %d\t %d\t %d\t %d\t %d\t %s\n " ,
256+ "%d\t %d\t %d\t %d\t %d\t %d\t %d\t %d\t %d\t %d\t %d \t %d \t %d \t %d \t %d \t %d \t %d \t %d \t % s\n " ,
195257 i ,
196258 time .Now ().Unix (),
197- enqueued ,
198- processed ,
259+ evEnqueued ,
260+ evProcessed ,
261+ evDropped ,
262+ evBroken ,
263+ bufEnqueued ,
264+ bufProcessed ,
265+ bufDropped ,
266+ bufBroken ,
267+ sesOpened ,
268+ sesClosed ,
199269 apiCalls .Load (),
200270 memStats .HeapAlloc ,
201271 memStats .HeapSys ,
0 commit comments