Skip to content
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -3139,10 +3139,9 @@ static int queue_session_event(zhandle_t *zh, int state)
}
//#endif

completion_list_t *dequeue_completion(completion_head_t *list)
{
completion_list_t *dequeue_completion_nolock(completion_head_t *list) {

completion_list_t *cptr;
lock_completion_list(list);
cptr = list->head;
if (cptr) {
list->head = cptr->next;
Expand All @@ -3151,6 +3150,14 @@ completion_list_t *dequeue_completion(completion_head_t *list)
list->last = 0;
}
}
return cptr;
}

completion_list_t *dequeue_completion(completion_head_t *list)
{
completion_list_t *cptr;
lock_completion_list(list);
cptr = dequeue_completion_nolock(list);
unlock_completion_list(list);
return cptr;
}
Expand All @@ -3159,7 +3166,7 @@ completion_list_t *dequeue_completion(completion_head_t *list)
static void cleanup_failed_multi(zhandle_t *zh, int xid, int rc, completion_list_t *cptr) {
completion_list_t *entry;
completion_head_t *clist = &cptr->c.clist;
while ((entry = dequeue_completion(clist)) != NULL) {
while ((entry = dequeue_completion_nolock(clist)) != NULL) {
// Fake failed response for all sub-requests
deserialize_response(zh, entry->c.type, xid, 1, rc, entry, NULL);
destroy_completion_entry(entry);
Expand All @@ -3174,7 +3181,7 @@ static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, st
assert(clist);
deserialize_MultiHeader(ia, "multiheader", &mhdr);
while (!mhdr.done) {
completion_list_t *entry = dequeue_completion(clist);
completion_list_t *entry = dequeue_completion_nolock(clist);
assert(entry);

if (mhdr.type == -1) {
Expand Down Expand Up @@ -3596,7 +3603,10 @@ static completion_list_t* do_create_completion_entry(zhandle_t *zh, int xid,
watcher_registration_t* wo, completion_head_t *clist,
watcher_deregistration_t* wdo)
{
completion_list_t *c = calloc(1, sizeof(completion_list_t));

completion_list_t *c = NULL;

c = calloc(1, sizeof(completion_list_t));
if (!c) {
LOG_ERROR(LOGCALLBACK(zh), "out of memory");
return 0;
Expand Down Expand Up @@ -4651,10 +4661,10 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
struct MultiHeader mh = {-1, 1, -1};
struct oarchive *oa = create_buffer_oarchive();
completion_head_t clist = { 0 };
int rc, index;

int rc = serialize_RequestHeader(oa, "header", &h);
rc = serialize_RequestHeader(oa, "header", &h);

int index = 0;
for (index=0; index < count; index++) {
const zoo_op_t *op = ops+index;
zoo_op_result_t *result = results+index;
Expand Down Expand Up @@ -4728,7 +4738,7 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
return ZUNIMPLEMENTED;
}

queue_completion(&clist, entry, 0);
queue_completion_nolock(&clist, entry, 0);
}

rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
Expand Down