Skip to content

Commit 40b5b32

Browse files
thread design
1 parent b633ab1 commit 40b5b32

File tree

5 files changed

+479
-302
lines changed

5 files changed

+479
-302
lines changed

cmd_in_second.c

Lines changed: 149 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#define LOG_LENGTH 400
1414
#define IP_LENGTH 16
1515
#define KEY_LENGTH 256
16+
#define CMD_STR_LEN 30
1617

1718
typedef enum cmd_in_second_state {
1819
NOT_STARTED,
@@ -41,14 +42,24 @@ typedef struct cmd_in_second_timer {
4142
int32_t circular_counter;
4243
} timertype;
4344

45+
typedef struct cmd_in_second_flush_thread {
46+
pthread_t tid;
47+
pthread_attr_t attr;
48+
pthread_cond_t cond;
49+
pthread_mutex_t lock;
50+
bool sleep;
51+
} flush_thread;
52+
4453
struct cmd_in_second {
45-
int cmd;
54+
int operation;
4655
char cmd_str[50];
4756
struct cmd_in_second_buffer buffer;
4857
timertype timer;
4958
int32_t bulk_limit;
5059
int32_t log_per_timer;
5160
state cur_state;
61+
flush_thread flusher;
62+
EXTENSION_LOGGER_DESCRIPTOR *mc_logger;
5263
pthread_mutex_t lock;
5364
};
5465

@@ -65,9 +76,6 @@ static bool is_bulk_cmd()
6576
const struct timeval* front_time = &this.timer.ring[this.timer.front];
6677
const struct timeval* last_time = &this.timer.ring[this.timer.last_elem_idx];
6778

68-
//printf("%d\n", this.timer.last_elem_idx);
69-
//assert(0);
70-
7179
return last_time->tv_sec - front_time->tv_sec <= 1;
7280
}
7381

@@ -76,71 +84,124 @@ static bool buffer_empty()
7684
return this.buffer.front == this.buffer.rear;
7785
}
7886

79-
static void* buffer_flush_thread()
80-
{
81-
const int32_t fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644);
87+
static bool buffer_full() {
88+
return (this.buffer.rear+1) % this.buffer.capacity == this.buffer.front;
89+
}
8290

83-
if (fd < 0) {
84-
perror("Can't open cmd_in_second log file: cmd_in_second.log");
85-
return NULL;
91+
static void put_flusher_to_sleep() {
92+
struct timeval now;
93+
struct timespec timeout;
94+
95+
pthread_mutex_lock(&this.flusher.lock);
96+
gettimeofday(&now, NULL);
97+
98+
now.tv_usec += 50000;
99+
100+
if (now.tv_usec >= 1000000) {
101+
now.tv_sec += 1;
102+
now.tv_usec -= 1000000;
86103
}
87104

88-
buffertype* const buffer = &this.buffer;
89-
timertype* const timer = &this.timer;
105+
timeout.tv_sec = now.tv_sec;
106+
timeout.tv_nsec = now.tv_usec * 1000;
107+
108+
flush_thread* const flusher = &this.flusher;
90109

91-
char* log_str = (char*)malloc(LOG_LENGTH * this.bulk_limit * sizeof(char));
110+
flusher->sleep = true;
111+
pthread_cond_timedwait(&flusher->cond, &flusher->lock, &timeout);
112+
flusher->sleep = false;
92113

93-
if (log_str == NULL) {
94-
perror("Can't allocate memory");
95-
return NULL;
114+
pthread_mutex_unlock(&this.flusher.lock);
115+
}
116+
117+
static void wake_flusher_up(){
118+
printf("in\n");
119+
pthread_mutex_lock(&this.flusher.lock);
120+
if (this.flusher.sleep) {
121+
printf("signaled\n");
122+
pthread_cond_signal(&this.flusher.cond);
96123
}
124+
pthread_mutex_unlock(&this.flusher.lock);
125+
}
97126

98-
const size_t cmd_len = strlen(this.cmd_str);
99-
const int32_t whitespaces = 3;
127+
static void* flush_buffer()
128+
{
129+
const int32_t fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644);
130+
131+
while(1) {
132+
if (fd < 0) {
133+
perror("Can't open cmd_in_second log file: cmd_in_second.log");
134+
break;
135+
}
100136

101-
size_t expected_write_length = 0;
102-
int32_t circular_log_counter = 0;
137+
if (this.cur_state != ON_FLUSHING) {
138+
put_flusher_to_sleep();
139+
continue;
140+
}
103141

104-
while (!buffer_empty()) {
142+
buffertype* const buffer = &this.buffer;
143+
timertype* const timer = &this.timer;
105144

106-
const logtype front = buffer->ring[buffer->front];
107-
buffer->front = (buffer->front+1) % buffer->capacity;
145+
char* log_str = (char*)malloc(LOG_LENGTH * this.bulk_limit * sizeof(char));
146+
147+
if (log_str == NULL) {
148+
break;
149+
}
108150

109-
char time_str[50] = "";
151+
const size_t cmd_len = strlen(this.cmd_str);
152+
const int32_t whitespaces = 3;
110153

111-
if (circular_log_counter == 0) {
154+
size_t expected_write_length = 0;
155+
int32_t circular_log_counter = 0;
112156

113-
const struct timeval* front_time = &timer->ring[timer->front];
114-
const struct tm* lt = localtime((time_t*)&front_time->tv_sec);
157+
while (!buffer_empty()) {
115158

116-
timer->front = (timer->front+1) % timer->capacity;
159+
const logtype front = buffer->ring[buffer->front];
160+
buffer->front = (buffer->front+1) % buffer->capacity;
117161

118-
if (lt == NULL) {
119-
perror("localtime failed");
120-
continue;
162+
char time_str[50] = "";
163+
164+
if (circular_log_counter == 0) {
165+
166+
const struct timeval* front_time = &timer->ring[timer->front];
167+
const struct tm* lt = localtime((time_t*)&front_time->tv_sec);
168+
169+
timer->front = (timer->front+1) % timer->capacity;
170+
171+
if (lt == NULL) {
172+
perror("localtime failed");
173+
continue;
174+
}
175+
176+
sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday,
177+
lt->tm_hour, lt->tm_min, lt->tm_sec, (int32_t)front_time->tv_usec);
178+
expected_write_length += 27;
121179
}
122180

123-
sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday,
124-
lt->tm_hour, lt->tm_min, lt->tm_sec, (int32_t)front_time->tv_usec);
125-
expected_write_length += 27;
181+
char log[LOG_LENGTH] = "";
182+
snprintf(log, LOG_LENGTH, "%s%s %s %s\n", time_str, this.cmd_str, front.key, front.client_ip);
183+
strncat(log_str, log, LOG_LENGTH);
184+
185+
expected_write_length += cmd_len + strlen(front.key) + strlen(front.client_ip) + whitespaces;
186+
circular_log_counter = (circular_log_counter+1) % this.log_per_timer;
126187
}
127188

128-
char log[LOG_LENGTH] = "";
129-
snprintf(log, LOG_LENGTH, "%s%s %s %s\n", time_str, this.cmd_str, front.key, front.client_ip);
130-
strncat(log_str, log, LOG_LENGTH);
189+
const int written_length = write(fd, log_str, expected_write_length);
190+
printf("%s", log_str);
191+
free(log_str);
131192

132-
expected_write_length += cmd_len + strlen(front.key) + strlen(front.client_ip) + whitespaces;
133-
circular_log_counter = (circular_log_counter+1) % this.log_per_timer;
134-
}
193+
if (written_length != expected_write_length) {
194+
perror("write length is difference to expectation.");
195+
break;
196+
}
135197

136-
if (write(fd, log_str, expected_write_length) != expected_write_length) {
137-
perror("write length is difference to expectation.");
198+
break;
138199
}
139200

140-
close(fd);
141-
142-
free(log_str);
143-
201+
if (fd >= 0) {
202+
close(fd);
203+
}
204+
144205
free(this.timer.ring);
145206
this.timer.ring = NULL;
146207

@@ -152,49 +213,32 @@ static void* buffer_flush_thread()
152213
return NULL;
153214
}
154215

155-
static int32_t buffer_flush()
156-
{
157-
158-
pthread_t tid;
159-
pthread_attr_t attr;
160-
161-
int32_t ret = 0;
162-
163-
if (pthread_attr_init(&attr) != 0 ||
164-
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
165-
(ret = pthread_create(&tid, &attr, buffer_flush_thread, NULL)) != 0)
166-
{
167-
perror("Can't create buffer flush thread");
168-
return ret;
169-
}
170-
171-
return ret;
172-
}
173-
174216
static void buffer_add(const logtype* log)
175217
{
176-
218+
printf("add ready\n");
177219
struct cmd_in_second_buffer* const buffer = &this.buffer;
178220

179221
buffer->ring[buffer->rear] = *log;
180222
buffer->rear = (buffer->rear+1) % buffer->capacity;
223+
printf("enqueued\n");
181224

182-
const bool buffer_full = (buffer->rear+1) % buffer->capacity == buffer->front;
183-
184-
if (buffer_full) {
225+
if (buffer_full()) {
226+
printf("buffer full\n");
185227
if (is_bulk_cmd()) {
228+
printf("bulk_cmd\n");
186229
this.cur_state = ON_FLUSHING;
187-
buffer_flush();
230+
wake_flusher_up();
231+
printf("flush woked up\n");
188232
return;
189233
}
190234
buffer->front = (buffer->front+1) % buffer->capacity;
235+
printf("buffer popped\n");
191236
}
192237
}
193238

194239
static void timer_add()
195240
{
196241
struct cmd_in_second_timer* const timer = &this.timer;
197-
198242
const bool timer_full = (timer->rear+1) % timer->capacity == timer->front;
199243

200244
if (timer_full) {
@@ -210,17 +254,19 @@ static void timer_add()
210254
timer->rear = (timer->rear+1) % timer->capacity;
211255
}
212256

213-
static bool is_cmd_to_log(const char* collection_name, const char* cmd)
257+
static bool is_cmd_to_log(const int operation)
214258
{
215-
return strcmp(this.collection_name, collection_name) == 0 &&
216-
strcmp(this.cmd, cmd) == 0;
259+
return this.operation == operation;
217260
}
218261

219-
bool cmd_in_second_write(const int cmd, const char* key, const char* client_ip)
262+
bool cmd_in_second_write(const int operation, const char* key, const char* client_ip)
220263
{
221264
pthread_mutex_lock(&this.lock);
222-
if (this.cur_state != ON_LOGGING || !is_cmd_to_log(collection_name, cmd)) {
265+
printf("locked %s %d \n", key, this.cur_state);
266+
267+
if (this.cur_state != ON_LOGGING || !is_cmd_to_log(operation)) {
223268
pthread_mutex_unlock(&this.lock);
269+
printf("already locked\n");
224270
return false;
225271
}
226272

@@ -233,17 +279,19 @@ bool cmd_in_second_write(const int cmd, const char* key, const char* client_ip)
233279
}
234280

235281
buffer_add(&log);
282+
printf("buffer add finished\n");
236283
this.timer.circular_counter = (this.timer.circular_counter+1) % this.log_per_timer;
237284

238285
pthread_mutex_unlock(&this.lock);
286+
printf("unlocked\n");
239287
return true;
240288
}
241289

242290
/* TODO mc_logger */
243-
void cmd_in_second_init()
291+
void cmd_in_second_init(EXTENSION_LOGGER_DESCRIPTOR *mc_logger)
244292
{
293+
this.mc_logger = mc_logger;
245294
this.cur_state = NOT_STARTED;
246-
pthread_mutex_init(&this.lock, NULL);
247295

248296
this.buffer.front = 0;
249297
this.buffer.rear = 0;
@@ -255,9 +303,16 @@ void cmd_in_second_init()
255303
this.timer.circular_counter = 0;
256304
this.timer.last_elem_idx = 0;
257305
this.timer.ring = NULL;
306+
307+
pthread_mutex_init(&this.lock, NULL);
308+
309+
flush_thread* const flusher = &this.flusher;
310+
pthread_attr_init(&flusher->attr);
311+
pthread_mutex_init(&flusher->lock, NULL);
312+
pthread_cond_init(&flusher->cond, NULL);
258313
}
259314

260-
int cmd_in_second_start(const int cmd, const int bulk_limit)
315+
int cmd_in_second_start(const int operation, const char cmd_str[], const int bulk_limit)
261316
{
262317

263318
pthread_mutex_lock(&this.lock);
@@ -267,8 +322,21 @@ int cmd_in_second_start(const int cmd, const int bulk_limit)
267322
return CMD_IN_SECOND_STARTED_ALREADY;
268323
}
269324

270-
this.cmd = cmd;
325+
int thread_created = -1;
326+
327+
flush_thread* const flusher = &this.flusher;
328+
329+
if (pthread_attr_init(&flusher->attr) != 0 ||
330+
pthread_attr_setdetachstate(&flusher->attr, PTHREAD_CREATE_DETACHED) != 0 ||
331+
(thread_created = pthread_create(&flusher->tid, &flusher->attr,
332+
flush_buffer, NULL)) != 0)
333+
{
334+
return CMD_IN_SECOND_THREAD_FAILED;
335+
}
336+
337+
this.operation = operation;
271338
this.bulk_limit = bulk_limit;
339+
snprintf(this.cmd_str, CMD_STR_LEN, cmd_str);
272340

273341
this.buffer.capacity = bulk_limit+1;
274342
this.buffer.ring = (logtype*)malloc(this.buffer.capacity * sizeof(logtype));
@@ -280,7 +348,6 @@ int cmd_in_second_start(const int cmd, const int bulk_limit)
280348

281349
this.log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0);
282350
this.timer.capacity = this.log_per_timer + 1;
283-
284351
this.timer.ring = (struct timeval*)malloc(this.timer.capacity * sizeof(struct timeval));
285352

286353
if (this.timer.ring == NULL) {
@@ -291,6 +358,7 @@ int cmd_in_second_start(const int cmd, const int bulk_limit)
291358

292359
this.cur_state = ON_LOGGING;
293360

361+
printf("log start\n");
294362
pthread_mutex_unlock(&this.lock);
295363

296364
return CMD_IN_SECOND_START;

cmd_in_second.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
#include <stdbool.h>
66
#include <stdint.h>
7-
#include <sys/time.h>
7+
#include <sys/time.h>
8+
#include <include/memcached/extension.h>
89

910
#define CMD_IN_SECOND_START 0
1011
#define CMD_IN_SECOND_STARTED_ALREADY 1
1112
#define CMD_IN_SECOND_NO_MEM 2
13+
#define CMD_IN_SECOND_THREAD_FAILED 3
1214

13-
void cmd_in_second_init(void);
14-
int32_t cmd_in_second_start(const char* collection_name, const char* cmd, const int32_t bulk_limit);
15-
bool cmd_in_second_write(const char* collection_name, const char* cmd, const char* key, const char* client_ip);
15+
void cmd_in_second_init(EXTENSION_LOGGER_DESCRIPTOR *mc_logger);
16+
int32_t cmd_in_second_start(const int operation, const char cmd[], const int32_t bulk_limit);
17+
bool cmd_in_second_write(const int operation, const char* key, const char* client_ip);

0 commit comments

Comments
 (0)