Skip to content

Commit be69b0c

Browse files
FEATURE: bulk command logging for bop insert
1 parent 91a67b5 commit be69b0c

File tree

6 files changed

+372
-7
lines changed

6 files changed

+372
-7
lines changed

Makefile.am

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ memcached_SOURCES = \
8585
cmdlog.h \
8686
lqdetect.c \
8787
lqdetect.h \
88-
trace.h
88+
trace.h \
89+
cmd_in_second.c \
90+
cmd_in_second.h
91+
8992
memcached_LDFLAGS =-R '$(libdir)'
9093
memcached_CFLAGS = @PROFILER_FLAGS@ ${AM_CFLAGS}
9194
memcached_DEPENDENCIES = libmcd_util.la

cmd_in_second.c

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
#include "cmd_in_second.h"
2+
#include "include/memcached/extension.h"
3+
#include <string.h>
4+
#include <stdint.h>
5+
#include <assert.h>
6+
#include <unistd.h>
7+
#include <stdio.h>
8+
#include <sys/uio.h>
9+
#include <pthread.h>
10+
#include <stdlib.h>
11+
#include <fcntl.h>
12+
13+
#define LOG_PER_TIMER 500
14+
#define LOG_LENGTH 400
15+
#define IP_LENGTH 16
16+
#define KEY_LENGTH 256
17+
18+
typedef enum cmd_in_second_state{
19+
NOT_STARTED,
20+
ON_LOGGING,
21+
ON_FLUSHING
22+
}state;
23+
24+
typedef struct cmd_in_second_log{
25+
char key[256];
26+
char client_ip[16];
27+
int32_t timer_idx;
28+
}logtype;
29+
30+
typedef struct cmd_in_second_buffer {
31+
logtype *ring;
32+
int32_t front;
33+
int32_t rear;
34+
int32_t capacity;
35+
}buffertype;
36+
37+
typedef struct cmd_in_second_timer{
38+
struct timeval* times;
39+
int32_t size;
40+
int32_t counter;
41+
}timertype;
42+
43+
struct cmd_in_second {
44+
char cmd[10];
45+
char collection_name[4];
46+
struct cmd_in_second_buffer buffer;
47+
int32_t bulk_limit;
48+
timertype timer;
49+
state cur_state;
50+
};
51+
52+
static EXTENSION_LOGGER_DESCRIPTOR *mc_logger;
53+
static struct cmd_in_second this;
54+
55+
static bool is_bulk_cmd()
56+
{
57+
const logtype* front = &this.buffer.ring[this.buffer.front];
58+
const logtype* rear = &this.buffer.ring[this.buffer.rear];
59+
60+
struct timeval front_time = this.timer.times[front->timer_idx];
61+
struct timeval rear_time = this.timer.times[rear->timer_idx];
62+
63+
return rear_time.tv_sec - front_time.tv_sec <= 1;
64+
}
65+
66+
static void get_whole_cmd(char* whole_cmd)
67+
{
68+
if (this.collection_name) {
69+
sprintf(whole_cmd, "%s %s", this.collection_name, this.cmd);
70+
return;
71+
}
72+
sprintf(whole_cmd, "%s", this.cmd);
73+
}
74+
75+
static bool buffer_empty()
76+
{
77+
return this.buffer.front == this.buffer.rear;
78+
}
79+
80+
static void* buffer_flush_thread()
81+
{
82+
int32_t fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644);
83+
84+
if (fd < 0) {
85+
mc_logger->log(EXTENSION_LOG_WARNING, NULL,
86+
"Can't open cmd_in_second log file: %s\n", "cmd_in_second.log");
87+
return NULL;
88+
}
89+
90+
char whole_cmd[20] = {0};
91+
get_whole_cmd(whole_cmd);
92+
93+
buffertype* buffer = &this.buffer;
94+
timertype* timer = &this.timer;
95+
96+
int32_t timer_idx = -1;
97+
98+
char* log_str = (char*)malloc(LOG_LENGTH * this.bulk_limit * sizeof(char));
99+
100+
if (log_str == NULL) {
101+
mc_logger->log(EXTENSION_LOG_WARNING, NULL, "Can't allocate memory");
102+
return NULL;
103+
}
104+
105+
while (!buffer_empty()) {
106+
107+
logtype front = buffer->ring[buffer->front++];
108+
109+
char time_str[50] = "";
110+
111+
if (front.timer_idx != timer_idx) {
112+
113+
const struct timeval* front_time = &timer->times[front.timer_idx];
114+
const struct tm *lt = localtime((time_t*)&front_time->tv_sec);
115+
116+
sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec, (int32_t)front_time->tv_usec);
117+
timer_idx = front.timer_idx;
118+
}
119+
char log[LOG_LENGTH] = "";
120+
sprintf(log, "%s%s %s %s\n", time_str, whole_cmd, front.key, front.client_ip);
121+
strncat(log_str, log, LOG_LENGTH);
122+
123+
}
124+
125+
write(fd, log_str, strlen(log_str));
126+
127+
close(fd);
128+
129+
free(log_str);
130+
131+
free(this.timer.times);
132+
this.timer.times = NULL;
133+
134+
free(this.buffer.ring);
135+
this.buffer.ring = NULL;
136+
137+
this.cur_state = NOT_STARTED;
138+
139+
return NULL;
140+
}
141+
142+
static int32_t buffer_flush()
143+
{
144+
145+
this.cur_state = ON_FLUSHING;
146+
147+
int32_t ret = 0;
148+
pthread_t tid;
149+
pthread_attr_t attr;
150+
151+
if (pthread_attr_init(&attr) != 0 ||
152+
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
153+
(ret = pthread_create(&tid, &attr, buffer_flush_thread, NULL)) != 0)
154+
{
155+
mc_logger->log(EXTENSION_LOG_WARNING, NULL,
156+
"Can't create buffer flush thread: %s\n", strerror(ret));
157+
}
158+
159+
160+
return ret;
161+
}
162+
163+
static void buffer_add(const logtype* log)
164+
{
165+
166+
struct cmd_in_second_buffer* buffer = &this.buffer;
167+
168+
const bool buffer_full = (buffer->rear+1) % buffer->capacity == buffer->front;
169+
170+
if (buffer_full) {
171+
if (is_bulk_cmd()) {
172+
buffer_flush();
173+
return;
174+
}
175+
buffer->front = (buffer->front+1) % buffer->capacity;
176+
}
177+
178+
buffer->ring[buffer->rear] = *log;
179+
buffer->rear = (buffer->rear+1) % buffer->capacity;
180+
181+
}
182+
183+
static bool is_cmd_to_log(const char* collection_name, const char* cmd)
184+
{
185+
return strcmp(this.collection_name, collection_name) == 0 && strcmp(this.cmd, cmd) == 0;
186+
}
187+
188+
bool cmd_in_second_write(const char* collection_name, const char* cmd, const char* key, const char* client_ip)
189+
{
190+
if (this.cur_state != ON_LOGGING || !is_cmd_to_log(collection_name, cmd)) {
191+
return false;
192+
}
193+
194+
timertype *timer = &this.timer;
195+
196+
logtype log = { {0}, {0} };
197+
198+
snprintf(log.client_ip, IP_LENGTH, "%s", client_ip);
199+
snprintf(log.key, KEY_LENGTH, "%s", key);
200+
201+
if (timer->counter == 0) {
202+
timer->size++;
203+
gettimeofday(&timer->times[timer->size-1], NULL);
204+
}
205+
206+
log.timer_idx = timer->size-1;
207+
208+
buffer_add(&log);
209+
timer->counter = (timer->counter+1) % LOG_PER_TIMER;
210+
211+
return true;
212+
}
213+
214+
int32_t cmd_in_second_start(const char* collection_name, const char* cmd, const int32_t bulk_limit)
215+
{
216+
217+
if (this.cur_state != NOT_STARTED) {
218+
return CMD_IN_SECOND_STARTED_ALREADY;
219+
}
220+
221+
this.bulk_limit = bulk_limit;
222+
223+
this.buffer.capacity = bulk_limit+1;
224+
this.buffer.front = 0;
225+
this.buffer.rear = 0;
226+
this.buffer.ring = (logtype*)malloc(this.buffer.capacity * sizeof(logtype));
227+
228+
if (this.buffer.ring == NULL) {
229+
return CMD_IN_SECOND_NO_MEM;
230+
}
231+
232+
this.timer.size = 0;
233+
this.timer.counter = 0;
234+
this.timer.times = (struct timeval*)malloc(bulk_limit * sizeof(struct timeval));
235+
236+
if (this.timer.times == NULL) {
237+
free(this.buffer.ring);
238+
return CMD_IN_SECOND_NO_MEM;
239+
}
240+
241+
sprintf(this.collection_name, "%s", collection_name);
242+
sprintf(this.cmd, "%s", cmd);
243+
244+
this.cur_state = ON_LOGGING;
245+
246+
return CMD_IN_SECOND_START;
247+
}

cmd_in_second.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#ifndef __CMD_IN_SECOND_LOG__
2+
#define __CMD_IN_SECOND_LOG__
3+
#endif
4+
5+
#include <stdbool.h>
6+
#include <stdint.h>
7+
#include <sys/time.h>
8+
9+
#define CMD_IN_SECOND_START 0
10+
#define CMD_IN_SECOND_STARTED_ALREADY 1
11+
#define CMD_IN_SECOND_NO_MEM 2
12+
13+
int32_t cmd_in_second_start(const char* collection_name, const char* cmd, const int32_t bulk_limit);
14+
bool cmd_in_second_write(const char*collection_name, const char* cmd, const char* key, const char* client_ip);

0 commit comments

Comments
 (0)