@@ -53,48 +53,70 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
5353 grouped := client .groupBundles (bundles )
5454
5555 for key , bundles := range grouped {
56+ client .addBundle (key , bundles )
57+ }
58+
59+ return nil
60+ }
5661
62+ func (client * DataSetClient ) addBundle (key string , bundles []* add_events.EventBundle ) {
63+ // this function has to be called from AddEvents - inner loop
64+ // it assumes that all bundles have the same key
65+ getBuffer := func (key string ) * buffer.Buffer {
5766 buf := client .Buffer (key , client .SessionInfo )
67+ // change state to mark that bundles are being added
68+ buf .SetStatus (buffer .AddingBundles )
69+ return buf
70+ }
71+
72+ publish := func (key string , buf * buffer.Buffer ) * buffer.Buffer {
73+ client .PublishBuffer (buf )
74+ return getBuffer (key )
75+ }
76+
77+ buf := getBuffer (key )
78+ for _ , bundle := range bundles {
79+ added , err := buf .AddBundle (bundle )
80+ if err != nil {
81+ if errors .Is (err , & buffer.NotAcceptingError {}) {
82+ buf = getBuffer (key )
83+ } else {
84+ client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
85+ // TODO: what to do? For now, lets skip it
86+ continue
87+ }
88+ }
89+
90+ if buf .ShouldSendSize () || added == buffer .TooMuch && buf .HasEvents () {
91+ buf = publish (key , buf )
92+ }
5893
59- for _ , bundle := range bundles {
60- added , err : = buf .AddBundle (bundle )
94+ if added == buffer . TooMuch {
95+ added , err = buf .AddBundle (bundle )
6196 if err != nil {
6297 if errors .Is (err , & buffer.NotAcceptingError {}) {
63- buf = client . Buffer (key , client . SessionInfo )
98+ buf = getBuffer (key )
6499 } else {
65100 client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
66- // TODO: what to do? For now, lets skip it
67101 continue
68102 }
69103 }
70-
71- if buf .ShouldSendSize () || added == buffer .TooMuch && buf .HasEvents () {
72- client .PublishBuffer (buf )
73- buf = client .Buffer (key , client .SessionInfo )
104+ if buf .ShouldSendSize () {
105+ buf = publish (key , buf )
74106 }
75-
76107 if added == buffer .TooMuch {
77- added , err = buf .AddBundle (bundle )
78- if err != nil {
79- if errors .Is (err , & buffer.NotAcceptingError {}) {
80- buf = client .Buffer (key , client .SessionInfo )
81- } else {
82- client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
83- continue
84- }
85- }
86- if buf .ShouldSendSize () {
87- client .PublishBuffer (buf )
88- buf = client .Buffer (key , client .SessionInfo )
89- }
90- if added == buffer .TooMuch {
91- client .Logger .Fatal ("Bundle was not added for second time!" , buf .ZapStats ()... )
92- }
108+ client .Logger .Fatal ("Bundle was not added for second time!" , buf .ZapStats ()... )
93109 }
94110 }
95111 }
112+ buf .SetStatus (buffer .Ready )
96113
97- return nil
114+ // it could happen that the buffer could have been published
115+ // by buffer sweeper, but it was skipped, because we have been
116+ // adding events, so lets check it and publish it if needed
117+ if buf .PublishAsap .Load () {
118+ client .PublishBuffer (buf )
119+ }
98120}
99121
100122// IsProcessingData returns True if there are still some unprocessed data.
@@ -223,14 +245,19 @@ func (client *DataSetClient) apiCall(req *http.Request, response response.SetRes
223245}
224246
225247func (client * DataSetClient ) SendAllAddEventsBuffers () {
248+ buffers := client .getBuffers ()
226249 client .Logger .Debug ("Send all AddEvents buffers" )
227- client .buffer .Range (func (k , v interface {}) bool {
228- buf , ok := v .(* buffer.Buffer )
229- if ok {
230- client .PublishBuffer (buf )
231- } else {
232- client .Logger .Error ("Unable to convert message to buffer" )
233- }
234- return true
235- })
250+ for _ , buf := range buffers {
251+ client .PublishBuffer (buf )
252+ }
253+ }
254+
255+ func (client * DataSetClient ) getBuffers () []* buffer.Buffer {
256+ client .buffersMutex .Lock ()
257+ defer client .buffersMutex .Unlock ()
258+ buffers := make ([]* buffer.Buffer , 0 )
259+ for _ , buf := range client .buffer {
260+ buffers = append (buffers , buf )
261+ }
262+ return buffers
236263}
0 commit comments