@@ -48,6 +48,7 @@ struct cmd_in_second {
4848 int32_t bulk_limit ;
4949 int32_t log_per_timer ;
5050 state cur_state ;
51+ pthread_mutex_t lock ;
5152};
5253
5354static struct cmd_in_second this ;
@@ -196,7 +197,6 @@ static void buffer_add(const logtype* log)
196197 }
197198 buffer -> front = (buffer -> front + 1 ) % buffer -> capacity ;
198199 }
199-
200200}
201201
202202static void timer_add ()
@@ -226,7 +226,9 @@ static bool is_cmd_to_log(const char* collection_name, const char* cmd)
226226bool cmd_in_second_write (const char * collection_name , const char * cmd ,
227227 const char * key , const char * client_ip )
228228{
229+ pthread_mutex_lock (& this .lock );
229230 if (this .cur_state != ON_LOGGING || !is_cmd_to_log (collection_name , cmd )) {
231+ pthread_mutex_unlock (& this .lock );
230232 return false;
231233 }
232234
@@ -241,38 +243,56 @@ bool cmd_in_second_write(const char* collection_name, const char* cmd,
241243 buffer_add (& log );
242244 this .timer .circular_counter = (this .timer .circular_counter + 1 ) % this .log_per_timer ;
243245
246+ pthread_mutex_unlock (& this .lock );
244247 return true;
245248}
246249
250+ void cmd_in_second_init ()
251+ {
252+ assert ("test" );
253+ this .cur_state = NOT_STARTED ;
254+ pthread_mutex_init (& this .lock , NULL );
255+
256+ this .buffer .front = 0 ;
257+ this .buffer .rear = 0 ;
258+ this .buffer .ring = NULL ;
259+
260+ this .timer .front = 0 ;
261+ this .timer .rear = 0 ;
262+ this .timer .capacity = 0 ;
263+ this .timer .circular_counter = 0 ;
264+ this .timer .ring = NULL ;
265+ }
266+
247267int32_t cmd_in_second_start (const char * collection_name , const char * cmd ,
248268 const int32_t bulk_limit )
249269{
250270
271+ pthread_mutex_lock (& this .lock );
272+
251273 if (this .cur_state != NOT_STARTED ) {
274+ pthread_mutex_unlock (& this .lock );
252275 return CMD_IN_SECOND_STARTED_ALREADY ;
253276 }
254277
255278 this .bulk_limit = bulk_limit ;
256279
257280 this .buffer .capacity = bulk_limit + 1 ;
258- this .buffer .front = 0 ;
259- this .buffer .rear = 0 ;
260281 this .buffer .ring = (logtype * )malloc (this .buffer .capacity * sizeof (logtype ));
261282
262283 if (this .buffer .ring == NULL ) {
284+ pthread_mutex_unlock (& this .lock );
263285 return CMD_IN_SECOND_NO_MEM ;
264286 }
265287
266288 this .log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0 );
267289 this .timer .capacity = this .log_per_timer + 1 ;
268- this .timer .front = 0 ;
269- this .timer .rear = 0 ;
270- this .timer .circular_counter = 0 ;
271290
272291 this .timer .ring = (struct timeval * )malloc (this .timer .capacity * sizeof (struct timeval ));
273292
274293 if (this .timer .ring == NULL ) {
275294 free (this .buffer .ring );
295+ pthread_mutex_unlock (& this .lock );
276296 return CMD_IN_SECOND_NO_MEM ;
277297 }
278298
@@ -281,5 +301,7 @@ int32_t cmd_in_second_start(const char* collection_name, const char* cmd,
281301
282302 this .cur_state = ON_LOGGING ;
283303
304+ pthread_mutex_unlock (& this .lock );
305+
284306 return CMD_IN_SECOND_START ;
285307}
0 commit comments