@@ -37,6 +37,7 @@ typedef struct cmd_in_second_timer {
3737 int32_t front ;
3838 int32_t rear ;
3939 int32_t capacity ;
40+ int32_t last_elem_idx ;
4041 int32_t circular_counter ;
4142} timertype ;
4243
@@ -48,6 +49,7 @@ struct cmd_in_second {
4849 int32_t bulk_limit ;
4950 int32_t log_per_timer ;
5051 state cur_state ;
52+ pthread_mutex_t lock ;
5153};
5254
5355static struct cmd_in_second this ;
@@ -61,9 +63,12 @@ static bool is_bulk_cmd()
6163 }
6264
6365 const struct timeval * front_time = & this .timer .ring [this .timer .front ];
64- const struct timeval * rear_time = & this .timer .ring [this .timer .rear ];
66+ const struct timeval * last_time = & this .timer .ring [this .timer .last_elem_idx ];
6567
66- return rear_time -> tv_sec - front_time -> tv_sec <= 1 ;
68+ //printf("%d\n", this.timer.last_elem_idx);
69+ //assert(0);
70+
71+ return last_time -> tv_sec - front_time -> tv_sec <= 1 ;
6772}
6873
6974static void get_whole_cmd (char * whole_cmd )
@@ -161,7 +166,6 @@ static void* buffer_flush_thread()
161166
162167static int32_t buffer_flush ()
163168{
164- this .cur_state = ON_FLUSHING ;
165169
166170 pthread_t tid ;
167171 pthread_attr_t attr ;
@@ -191,12 +195,12 @@ static void buffer_add(const logtype* log)
191195
192196 if (buffer_full ) {
193197 if (is_bulk_cmd ()) {
198+ this .cur_state = ON_FLUSHING ;
194199 buffer_flush ();
195200 return ;
196201 }
197202 buffer -> front = (buffer -> front + 1 ) % buffer -> capacity ;
198203 }
199-
200204}
201205
202206static void timer_add ()
@@ -214,6 +218,7 @@ static void timer_add()
214218 return ;
215219 };
216220
221+ timer -> last_elem_idx = timer -> rear ;
217222 timer -> rear = (timer -> rear + 1 ) % timer -> capacity ;
218223}
219224
@@ -226,7 +231,9 @@ static bool is_cmd_to_log(const char* collection_name, const char* cmd)
226231bool cmd_in_second_write (const char * collection_name , const char * cmd ,
227232 const char * key , const char * client_ip )
228233{
234+ pthread_mutex_lock (& this .lock );
229235 if (this .cur_state != ON_LOGGING || !is_cmd_to_log (collection_name , cmd )) {
236+ pthread_mutex_unlock (& this .lock );
230237 return false;
231238 }
232239
@@ -241,38 +248,57 @@ bool cmd_in_second_write(const char* collection_name, const char* cmd,
241248 buffer_add (& log );
242249 this .timer .circular_counter = (this .timer .circular_counter + 1 ) % this .log_per_timer ;
243250
251+ pthread_mutex_unlock (& this .lock );
244252 return true;
245253}
246254
255+ void cmd_in_second_init ()
256+ {
257+ assert ("test" );
258+ this .cur_state = NOT_STARTED ;
259+ pthread_mutex_init (& this .lock , NULL );
260+
261+ this .buffer .front = 0 ;
262+ this .buffer .rear = 0 ;
263+ this .buffer .ring = NULL ;
264+
265+ this .timer .front = 0 ;
266+ this .timer .rear = 0 ;
267+ this .timer .capacity = 0 ;
268+ this .timer .circular_counter = 0 ;
269+ this .timer .last_elem_idx = 0 ;
270+ this .timer .ring = NULL ;
271+ }
272+
247273int32_t cmd_in_second_start (const char * collection_name , const char * cmd ,
248274 const int32_t bulk_limit )
249275{
250276
277+ pthread_mutex_lock (& this .lock );
278+
251279 if (this .cur_state != NOT_STARTED ) {
280+ pthread_mutex_unlock (& this .lock );
252281 return CMD_IN_SECOND_STARTED_ALREADY ;
253282 }
254283
255284 this .bulk_limit = bulk_limit ;
256285
257286 this .buffer .capacity = bulk_limit + 1 ;
258- this .buffer .front = 0 ;
259- this .buffer .rear = 0 ;
260287 this .buffer .ring = (logtype * )malloc (this .buffer .capacity * sizeof (logtype ));
261288
262289 if (this .buffer .ring == NULL ) {
290+ pthread_mutex_unlock (& this .lock );
263291 return CMD_IN_SECOND_NO_MEM ;
264292 }
265293
266294 this .log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0 );
267295 this .timer .capacity = this .log_per_timer + 1 ;
268- this .timer .front = 0 ;
269- this .timer .rear = 0 ;
270- this .timer .circular_counter = 0 ;
271296
272297 this .timer .ring = (struct timeval * )malloc (this .timer .capacity * sizeof (struct timeval ));
273298
274299 if (this .timer .ring == NULL ) {
275300 free (this .buffer .ring );
301+ pthread_mutex_unlock (& this .lock );
276302 return CMD_IN_SECOND_NO_MEM ;
277303 }
278304
@@ -281,5 +307,7 @@ int32_t cmd_in_second_start(const char* collection_name, const char* cmd,
281307
282308 this .cur_state = ON_LOGGING ;
283309
310+ pthread_mutex_unlock (& this .lock );
311+
284312 return CMD_IN_SECOND_START ;
285313}
0 commit comments