Skip to content

Commit 83d6346

Browse files
authored
Merge pull request #1077 from Barenboim/master
Optimize communicator.
2 parents 87d95be + cbd5463 commit 83d6346

3 files changed

Lines changed: 100 additions & 41 deletions

File tree

src/kernel/Communicator.cc

Lines changed: 91 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,8 +1141,12 @@ void Communicator::handler_thread_routine(void *context)
11411141
Communicator *comm = (Communicator *)context;
11421142
struct poller_result *res;
11431143

1144-
while ((res = (struct poller_result *)msgqueue_get(comm->queue)) != NULL)
1144+
while (1)
11451145
{
1146+
res = (struct poller_result *)msgqueue_get(comm->msgqueue);
1147+
if (!res)
1148+
break;
1149+
11461150
switch (res->data.operation)
11471151
{
11481152
case PD_OP_READ:
@@ -1174,7 +1178,8 @@ void Communicator::handler_thread_routine(void *context)
11741178
}
11751179
}
11761180

1177-
int Communicator::append(const void *buf, size_t *size, poller_message_t *msg)
1181+
int Communicator::append_request(const void *buf, size_t *size,
1182+
poller_message_t *msg)
11781183
{
11791184
CommMessageIn *in = (CommMessageIn *)msg;
11801185
struct CommConnEntry *entry = in->entry;
@@ -1186,17 +1191,37 @@ int Communicator::append(const void *buf, size_t *size, poller_message_t *msg)
11861191
if (ret > 0)
11871192
{
11881193
entry->state = CONN_STATE_SUCCESS;
1189-
if (entry->service)
1190-
timeout = -1;
1191-
else
1194+
timeout = -1;
1195+
}
1196+
else if (ret == 0 && session->timeout != 0)
1197+
timeout = Communicator::next_timeout(session);
1198+
else
1199+
return ret;
1200+
1201+
/* This set_timeout() never fails, which is very important. */
1202+
mpoller_set_timeout(entry->sockfd, timeout, entry->mpoller);
1203+
return ret;
1204+
}
1205+
1206+
int Communicator::append_reply(const void *buf, size_t *size,
1207+
poller_message_t *msg)
1208+
{
1209+
CommMessageIn *in = (CommMessageIn *)msg;
1210+
struct CommConnEntry *entry = in->entry;
1211+
CommSession *session = entry->session;
1212+
int timeout;
1213+
int ret;
1214+
1215+
ret = in->append(buf, size);
1216+
if (ret > 0)
1217+
{
1218+
entry->state = CONN_STATE_SUCCESS;
1219+
timeout = session->keep_alive_timeout();
1220+
session->timeout = timeout; /* Reuse session's timeout field. */
1221+
if (timeout == 0)
11921222
{
1193-
timeout = session->keep_alive_timeout();
1194-
session->timeout = timeout; /* Reuse session's timeout field. */
1195-
if (timeout == 0)
1196-
{
1197-
mpoller_del(entry->sockfd, entry->mpoller);
1198-
return ret;
1199-
}
1223+
mpoller_del(entry->sockfd, entry->mpoller);
1224+
return ret;
12001225
}
12011226
}
12021227
else if (ret == 0 && session->timeout != 0)
@@ -1256,32 +1281,50 @@ int Communicator::create_service_session(struct CommConnEntry *entry)
12561281
return -1;
12571282
}
12581283

1259-
poller_message_t *Communicator::create_message(void *context)
1284+
poller_message_t *Communicator::create_request(struct CommConnEntry *entry)
12601285
{
1261-
struct CommConnEntry *entry = (struct CommConnEntry *)context;
12621286
CommSession *session;
12631287

12641288
if (entry->state == CONN_STATE_IDLE)
12651289
{
1266-
pthread_mutex_t *mutex;
1290+
pthread_mutex_lock(&entry->target->mutex);
1291+
/* do nothing */
1292+
pthread_mutex_unlock(&entry->target->mutex);
1293+
}
1294+
1295+
if (entry->state != CONN_STATE_KEEPALIVE &&
1296+
entry->state != CONN_STATE_CONNECTED)
1297+
{
1298+
errno = EBADMSG;
1299+
return NULL;
1300+
}
12671301

1268-
if (entry->service)
1269-
mutex = &entry->target->mutex;
1270-
else
1271-
mutex = &entry->mutex;
1302+
if (Communicator::create_service_session(entry) < 0)
1303+
return NULL;
12721304

1273-
pthread_mutex_lock(mutex);
1274-
/* do nothing */
1275-
pthread_mutex_unlock(mutex);
1305+
session = entry->session;
1306+
session->in = session->message_in();
1307+
if (session->in)
1308+
{
1309+
session->in->poller_message_t::append = Communicator::append_request;
1310+
session->in->entry = entry;
12761311
}
12771312

1278-
if (entry->state == CONN_STATE_CONNECTED ||
1279-
entry->state == CONN_STATE_KEEPALIVE)
1313+
return session->in;
1314+
}
1315+
1316+
poller_message_t *Communicator::create_reply(struct CommConnEntry *entry)
1317+
{
1318+
CommSession *session;
1319+
1320+
if (entry->state == CONN_STATE_IDLE)
12801321
{
1281-
if (Communicator::create_service_session(entry) < 0)
1282-
return NULL;
1322+
pthread_mutex_lock(&entry->mutex);
1323+
/* do nothing */
1324+
pthread_mutex_unlock(&entry->mutex);
12831325
}
1284-
else if (entry->state != CONN_STATE_RECEIVING)
1326+
1327+
if (entry->state != CONN_STATE_RECEIVING)
12851328
{
12861329
errno = EBADMSG;
12871330
return NULL;
@@ -1291,13 +1334,23 @@ poller_message_t *Communicator::create_message(void *context)
12911334
session->in = session->message_in();
12921335
if (session->in)
12931336
{
1294-
session->in->poller_message_t::append = Communicator::append;
1337+
session->in->poller_message_t::append = Communicator::append_reply;
12951338
session->in->entry = entry;
12961339
}
12971340

12981341
return session->in;
12991342
}
13001343

1344+
poller_message_t *Communicator::create_message(void *context)
1345+
{
1346+
struct CommConnEntry *entry = (struct CommConnEntry *)context;
1347+
1348+
if (entry->service)
1349+
return Communicator::create_request(entry);
1350+
else
1351+
return Communicator::create_reply(entry);
1352+
}
1353+
13011354
int Communicator::partial_written(size_t n, void *context)
13021355
{
13031356
struct CommConnEntry *entry = (struct CommConnEntry *)context;
@@ -1311,8 +1364,8 @@ int Communicator::partial_written(size_t n, void *context)
13111364

13121365
void Communicator::callback(struct poller_result *res, void *context)
13131366
{
1314-
Communicator *comm = (Communicator *)context;
1315-
msgqueue_put(res, comm->queue);
1367+
msgqueue_t *msgqueue = (msgqueue_t *)context;
1368+
msgqueue_put(res, msgqueue);
13161369
}
13171370

13181371
void *Communicator::accept(const struct sockaddr *addr, socklen_t addrlen,
@@ -1359,7 +1412,7 @@ int Communicator::create_handler_threads(size_t handler_threads)
13591412
if (i == handler_threads)
13601413
return 0;
13611414

1362-
msgqueue_set_nonblock(this->queue);
1415+
msgqueue_set_nonblock(this->msgqueue);
13631416
thrdpool_destroy(NULL, this->thrdpool);
13641417
}
13651418

@@ -1373,15 +1426,15 @@ int Communicator::create_poller(size_t poller_threads)
13731426
.create_message = Communicator::create_message,
13741427
.partial_written = Communicator::partial_written,
13751428
.callback = Communicator::callback,
1376-
.context = this
13771429
};
13781430

13791431
if ((ssize_t)params.max_open_files < 0)
13801432
return -1;
13811433

1382-
this->queue = msgqueue_create(4096, sizeof (struct poller_result));
1383-
if (this->queue)
1434+
this->msgqueue = msgqueue_create(4096, sizeof (struct poller_result));
1435+
if (this->msgqueue)
13841436
{
1437+
params.context = this->msgqueue;
13851438
this->mpoller = mpoller_create(&params, poller_threads);
13861439
if (this->mpoller)
13871440
{
@@ -1391,7 +1444,7 @@ int Communicator::create_poller(size_t poller_threads)
13911444
mpoller_destroy(this->mpoller);
13921445
}
13931446

1394-
msgqueue_destroy(this->queue);
1447+
msgqueue_destroy(this->msgqueue);
13951448
}
13961449

13971450
return -1;
@@ -1415,7 +1468,7 @@ int Communicator::init(size_t poller_threads, size_t handler_threads)
14151468

14161469
mpoller_stop(this->mpoller);
14171470
mpoller_destroy(this->mpoller);
1418-
msgqueue_destroy(this->queue);
1471+
msgqueue_destroy(this->msgqueue);
14191472
}
14201473

14211474
return -1;
@@ -1425,10 +1478,10 @@ void Communicator::deinit()
14251478
{
14261479
this->stop_flag = 1;
14271480
mpoller_stop(this->mpoller);
1428-
msgqueue_set_nonblock(this->queue);
1481+
msgqueue_set_nonblock(this->msgqueue);
14291482
thrdpool_destroy(NULL, this->thrdpool);
14301483
mpoller_destroy(this->mpoller);
1431-
msgqueue_destroy(this->queue);
1484+
msgqueue_destroy(this->msgqueue);
14321485
}
14331486

14341487
int Communicator::nonblock_connect(CommTarget *target)

src/kernel/Communicator.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class Communicator
280280

281281
private:
282282
struct __mpoller *mpoller;
283-
struct __msgqueue *queue;
283+
struct __msgqueue *msgqueue;
284284
struct __thrdpool *thrdpool;
285285
int stop_flag;
286286

@@ -341,10 +341,16 @@ class Communicator
341341
static int first_timeout_send(CommSession *session);
342342
static int first_timeout_recv(CommSession *session);
343343

344-
static int append(const void *buf, size_t *size, poller_message_t *msg);
344+
static int append_request(const void *buf, size_t *size,
345+
poller_message_t *msg);
346+
static int append_reply(const void *buf, size_t *size,
347+
poller_message_t *msg);
345348

346349
static int create_service_session(struct CommConnEntry *entry);
347350

351+
static poller_message_t *create_request(struct CommConnEntry *entry);
352+
static poller_message_t *create_reply(struct CommConnEntry *entry);
353+
348354
static poller_message_t *create_message(void *context);
349355

350356
static int partial_written(size_t n, void *context);

src/util/json_parser.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ struct __json_array
4343

4444
struct __json_value
4545
{
46-
int type;
4746
union
4847
{
4948
char *string;
5049
double number;
5150
json_object_t object;
5251
json_array_t array;
5352
} value;
53+
int type;
5454
};
5555

5656
struct __json_member

0 commit comments

Comments
 (0)