@@ -50,100 +50,177 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
5050 return fmt .Errorf ("AddEvents - reject batch: %w" , errR )
5151 }
5252
53- grouped := client .groupBundles (bundles )
53+ // first, figure out which keys are part of the batch
54+ seenKeys := make (map [string ]bool )
55+ for _ , bundle := range bundles {
56+ key := bundle .Key (client .Config .GroupBy )
57+ seenKeys [key ] = true
58+ }
5459
60+ // then create all subscribers
61+ // add subscriber for events by key
62+ // add subscriber for buffer by key
5563 client .addEventsMutex .Lock ()
5664 defer client .addEventsMutex .Unlock ()
57- for key , bundles := range grouped {
58- client .addBundle (key , bundles )
65+ for key := range seenKeys {
66+ _ , found := client .addEventsChannels [key ]
67+ if ! found {
68+ client .newBufferForEvents (key )
69+
70+ client .newChannelForEvents (key )
71+ }
72+ }
73+
74+ // and as last step - publish them
75+ for _ , bundle := range bundles {
76+ key := bundle .Key (client .Config .GroupBy )
77+ client .eventsEnqueued .Add (1 )
78+ client .addEventsPubSub .Pub (bundle , key )
5979 }
6080
6181 return nil
6282}
6383
64- func (client * DataSetClient ) addBundle (key string , bundles []* add_events.EventBundle ) {
84+ func (client * DataSetClient ) newChannelForEvents (key string ) {
85+ ch := client .addEventsPubSub .Sub (key )
86+ client .addEventsChannels [key ] = ch
87+ go (func (session string , ch chan interface {}) {
88+ client .ListenAndSendBundlesForKey (key , ch )
89+ })(key , ch )
90+ }
91+
92+ func (client * DataSetClient ) newBufferForEvents (key string ) {
93+ session := fmt .Sprintf ("%s-%s" , client .Id , key )
94+ buf := buffer .NewEmptyBuffer (session , client .Config .Tokens .WriteLog )
95+ client .initBuffer (buf , client .SessionInfo )
96+
97+ client .buffersAllMutex .Lock ()
98+ client .buffer [session ] = buf
99+ defer client .buffersAllMutex .Unlock ()
100+
101+ // create subscriber, so all the upcoming buffers are processed as well
102+ client .addEventsSubscriber (session )
103+ }
104+
105+ func (client * DataSetClient ) ListenAndSendBundlesForKey (key string , ch chan interface {}) {
106+ client .Logger .Info ("Listening to events with key" ,
107+ zap .String ("key" , key ),
108+ )
109+
65110 // this function has to be called from AddEvents - inner loop
66111 // it assumes that all bundles have the same key
67112 getBuffer := func (key string ) * buffer.Buffer {
68- buf := client .Buffer (key , client . SessionInfo )
113+ buf := client .getBuffer (key )
69114 // change state to mark that bundles are being added
70115 buf .SetStatus (buffer .AddingBundles )
71116 return buf
72117 }
73118
74119 publish := func (key string , buf * buffer.Buffer ) * buffer.Buffer {
75- client .PublishBuffer (buf )
120+ client .publishBuffer (buf )
76121 return getBuffer (key )
77122 }
78123
79- buf := getBuffer (key )
80- for _ , bundle := range bundles {
81- added , err := buf .AddBundle (bundle )
82- if err != nil {
83- if errors .Is (err , & buffer.NotAcceptingError {}) {
84- buf = getBuffer (key )
85- } else {
86- client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
87- // TODO: what to do? For now, lets skip it
88- continue
89- }
90- }
124+ for processedMsgCnt := 0 ; ; processedMsgCnt ++ {
125+ if msg , ok := <- ch ; ok {
126+ bundle , ok := msg .(* add_events.EventBundle )
127+ if ok {
128+ buf := getBuffer (key )
129+ added , err := buf .AddBundle (bundle )
130+ if err != nil {
131+ if errors .Is (err , & buffer.NotAcceptingError {}) {
132+ buf = getBuffer (key )
133+ } else {
134+ client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
135+ // TODO: what to do? For now, lets skip it
136+ continue
137+ }
138+ }
91139
92- if buf .ShouldSendSize () || added == buffer .TooMuch && buf .HasEvents () {
93- buf = publish (key , buf )
94- }
140+ if buf .ShouldSendSize () || added == buffer .TooMuch && buf .HasEvents () {
141+ buf = publish (key , buf )
142+ }
95143
96- if added == buffer .TooMuch {
97- added , err = buf .AddBundle (bundle )
98- if err != nil {
99- if errors .Is (err , & buffer.NotAcceptingError {}) {
100- buf = getBuffer (key )
101- } else {
102- client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
103- continue
144+ if added == buffer .TooMuch {
145+ added , err = buf .AddBundle (bundle )
146+ if err != nil {
147+ if errors .Is (err , & buffer.NotAcceptingError {}) {
148+ buf = getBuffer (key )
149+ } else {
150+ client .Logger .Error ("Cannot add bundle" , zap .Error (err ))
151+ continue
152+ }
153+ }
154+ if buf .ShouldSendSize () {
155+ buf = publish (key , buf )
156+ }
157+ if added == buffer .TooMuch {
158+ client .Logger .Fatal ("Bundle was not added for second time!" , buf .ZapStats ()... )
159+ }
160+ }
161+ client .eventsProcessed .Add (1 )
162+
163+ buf .SetStatus (buffer .Ready )
164+ // it could happen that the buffer could have been published
165+ // by buffer sweeper, but it was skipped, because we have been
166+ // adding events, so lets check it and publish it if needed
167+ if buf .PublishAsap .Load () {
168+ client .publishBuffer (buf )
104169 }
105- }
106- if buf .ShouldSendSize () {
107- buf = publish (key , buf )
108- }
109- if added == buffer .TooMuch {
110- client .Logger .Fatal ("Bundle was not added for second time!" , buf .ZapStats ()... )
111170 }
112171 }
113172 }
114- buf .SetStatus (buffer .Ready )
115-
116- // it could happen that the buffer could have been published
117- // by buffer sweeper, but it was skipped, because we have been
118- // adding events, so lets check it and publish it if needed
119- if buf .PublishAsap .Load () {
120- client .PublishBuffer (buf )
121- }
122173}
123174
124- // IsProcessingData returns True if there are still some unprocessed data.
175+ // IsProcessingBuffers returns True if there are still some unprocessed data.
125176// False otherwise.
126- func (client * DataSetClient ) IsProcessingData () bool {
177+ func (client * DataSetClient ) IsProcessingBuffers () bool {
127178 return client .buffersEnqueued .Load () > client .buffersProcessed .Load ()
128179}
129180
181+ // IsProcessingBuffers returns True if there are still some unprocessed data.
182+ // False otherwise.
183+ func (client * DataSetClient ) IsProcessingEvents () bool {
184+ return client .eventsEnqueued .Load () > client .eventsProcessed .Load ()
185+ }
186+
130187// Finish stops processing of new events and waits until all the data that are
131188// being processed are really processed.
132189func (client * DataSetClient ) Finish () {
133190 // mark as finished
134191 client .finished .Store (true )
135192
193+ // do wait for all events to be processed
194+ i := 1
195+ for client .IsProcessingEvents () {
196+ client .Logger .Info (
197+ "Not all events has been processed" ,
198+ zap .Uint64 ("eventsEnqueued" , client .eventsEnqueued .Load ()),
199+ zap .Uint64 ("eventsProcessed" , client .eventsProcessed .Load ()),
200+ )
201+ time .Sleep (client .Config .RetryBase )
202+ i ++
203+ if i > 50 {
204+ break
205+ }
206+ }
207+
136208 // send all buffers
137209 client .SendAllAddEventsBuffers ()
138210
139- // do wait for everything to be processed
140- for client .IsProcessingData () {
211+ // do wait for all buffers to be processed
212+ j := 0
213+ for client .IsProcessingBuffers () {
141214 client .Logger .Info (
142215 "Not all buffers has been processed" ,
143216 zap .Uint64 ("buffersEnqueued" , client .buffersEnqueued .Load ()),
144217 zap .Uint64 ("buffersProcessed" , client .buffersProcessed .Load ()),
145218 )
146219 time .Sleep (client .Config .RetryBase )
220+ j ++
221+ if j > 50 {
222+ break
223+ }
147224 }
148225 client .workers .Wait ()
149226
@@ -187,23 +264,23 @@ func (client *DataSetClient) SendAddEventsBuffer(buf *buffer.Buffer) (*add_event
187264 return response , err
188265}
189266
190- func (client * DataSetClient ) groupBundles (bundles []* add_events.EventBundle ) map [string ][]* add_events.EventBundle {
191- grouped := make (map [string ][]* add_events.EventBundle )
192-
193- // group batch
194- for _ , bundle := range bundles {
195- if bundle == nil {
196- continue
197- }
198- key := bundle .Key (client .Config .GroupBy )
199- grouped [key ] = append (grouped [key ], bundle )
200- }
201- client .Logger .Debug ("Batch was grouped" ,
202- zap .Int ("batchSize" , len (bundles )),
203- zap .Int ("distinctStreams" , len (grouped )),
204- )
205- return grouped
206- }
267+ // func (client *DataSetClient) groupBundles(bundles []*add_events.EventBundle) map[string][]*add_events.EventBundle {
268+ // grouped := make(map[string][]*add_events.EventBundle)
269+ //
270+ // // group batch
271+ // for _, bundle := range bundles {
272+ // if bundle == nil {
273+ // continue
274+ // }
275+ // key := bundle.Key(client.Config.GroupBy)
276+ // grouped[key] = append(grouped[key], bundle)
277+ // }
278+ // client.Logger.Debug("Batch was grouped",
279+ // zap.Int("batchSize", len(bundles)),
280+ // zap.Int("distinctStreams", len(grouped)),
281+ // )
282+ // return grouped
283+ // }
207284
208285func (client * DataSetClient ) apiCall (req * http.Request , response response.SetResponseObj ) error {
209286 resp , err := client .Client .Do (req )
@@ -250,14 +327,14 @@ func (client *DataSetClient) SendAllAddEventsBuffers() {
250327 buffers := client .getBuffers ()
251328 client .Logger .Debug ("Send all AddEvents buffers" )
252329 for _ , buf := range buffers {
253- client .PublishBuffer (buf )
330+ client .publishBuffer (buf )
254331 }
255332}
256333
257334func (client * DataSetClient ) getBuffers () []* buffer.Buffer {
258- client .buffersMutex .Lock ()
259- defer client .buffersMutex .Unlock ()
260335 buffers := make ([]* buffer.Buffer , 0 )
336+ client .buffersAllMutex .Lock ()
337+ defer client .buffersAllMutex .Unlock ()
261338 for _ , buf := range client .buffer {
262339 buffers = append (buffers , buf )
263340 }
0 commit comments