@@ -34,6 +34,7 @@ typedef struct cmd_in_second_buffer {
3434
3535typedef struct cmd_in_second_timer {
3636 struct timeval * ring ;
37+ struct timeval last_elem ;
3738 int32_t front ;
3839 int32_t rear ;
3940 int32_t capacity ;
@@ -48,6 +49,7 @@ struct cmd_in_second {
4849 int32_t bulk_limit ;
4950 int32_t log_per_timer ;
5051 state cur_state ;
52+ pthread_mutex_t lock ;
5153};
5254
5355static struct cmd_in_second this ;
@@ -61,9 +63,9 @@ static bool is_bulk_cmd()
6163 }
6264
6365 const struct timeval * front_time = & this .timer .ring [this .timer .front ];
64- const struct timeval * rear_time = & this .timer .ring [ this . timer . rear ] ;
66+ const struct timeval * last_time = & this .timer .last_elem ;
6567
66- return rear_time -> tv_sec - front_time -> tv_sec <= 1 ;
68+ return last_time -> tv_sec - front_time -> tv_sec <= 1 ;
6769}
6870
6971static void get_whole_cmd (char * whole_cmd )
@@ -196,7 +198,6 @@ static void buffer_add(const logtype* log)
196198 }
197199 buffer -> front = (buffer -> front + 1 ) % buffer -> capacity ;
198200 }
199-
200201}
201202
202203static void timer_add ()
@@ -214,6 +215,7 @@ static void timer_add()
214215 return ;
215216 };
216217
218+ timer -> last_elem = timer -> ring [timer -> rear ];
217219 timer -> rear = (timer -> rear + 1 ) % timer -> capacity ;
218220}
219221
@@ -226,7 +228,9 @@ static bool is_cmd_to_log(const char* collection_name, const char* cmd)
226228bool cmd_in_second_write (const char * collection_name , const char * cmd ,
227229 const char * key , const char * client_ip )
228230{
231+ pthread_mutex_lock (& this .lock );
229232 if (this .cur_state != ON_LOGGING || !is_cmd_to_log (collection_name , cmd )) {
233+ pthread_mutex_unlock (& this .lock );
230234 return false;
231235 }
232236
@@ -241,38 +245,56 @@ bool cmd_in_second_write(const char* collection_name, const char* cmd,
241245 buffer_add (& log );
242246 this .timer .circular_counter = (this .timer .circular_counter + 1 ) % this .log_per_timer ;
243247
248+ pthread_mutex_unlock (& this .lock );
244249 return true;
245250}
246251
252+ void cmd_in_second_init ()
253+ {
254+ assert ("test" );
255+ this .cur_state = NOT_STARTED ;
256+ pthread_mutex_init (& this .lock , NULL );
257+
258+ this .buffer .front = 0 ;
259+ this .buffer .rear = 0 ;
260+ this .buffer .ring = NULL ;
261+
262+ this .timer .front = 0 ;
263+ this .timer .rear = 0 ;
264+ this .timer .capacity = 0 ;
265+ this .timer .circular_counter = 0 ;
266+ this .timer .ring = NULL ;
267+ }
268+
247269int32_t cmd_in_second_start (const char * collection_name , const char * cmd ,
248270 const int32_t bulk_limit )
249271{
250272
273+ pthread_mutex_lock (& this .lock );
274+
251275 if (this .cur_state != NOT_STARTED ) {
276+ pthread_mutex_unlock (& this .lock );
252277 return CMD_IN_SECOND_STARTED_ALREADY ;
253278 }
254279
255280 this .bulk_limit = bulk_limit ;
256281
257282 this .buffer .capacity = bulk_limit + 1 ;
258- this .buffer .front = 0 ;
259- this .buffer .rear = 0 ;
260283 this .buffer .ring = (logtype * )malloc (this .buffer .capacity * sizeof (logtype ));
261284
262285 if (this .buffer .ring == NULL ) {
286+ pthread_mutex_unlock (& this .lock );
263287 return CMD_IN_SECOND_NO_MEM ;
264288 }
265289
266290 this .log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0 );
267291 this .timer .capacity = this .log_per_timer + 1 ;
268- this .timer .front = 0 ;
269- this .timer .rear = 0 ;
270- this .timer .circular_counter = 0 ;
271292
272293 this .timer .ring = (struct timeval * )malloc (this .timer .capacity * sizeof (struct timeval ));
273294
274295 if (this .timer .ring == NULL ) {
275296 free (this .buffer .ring );
297+ pthread_mutex_unlock (& this .lock );
276298 return CMD_IN_SECOND_NO_MEM ;
277299 }
278300
@@ -281,5 +303,7 @@ int32_t cmd_in_second_start(const char* collection_name, const char* cmd,
281303
282304 this .cur_state = ON_LOGGING ;
283305
306+ pthread_mutex_unlock (& this .lock );
307+
284308 return CMD_IN_SECOND_START ;
285309}
0 commit comments