Skip to content

Commit 19b0040

Browse files
authored
Merge pull request #329 from Barenboim/master
remove std::tuple in kafka client
2 parents 67888a2 + fd54c3f commit 19b0040

2 files changed

Lines changed: 29 additions & 28 deletions

File tree

src/client/WFKafkaClient.cc

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -678,13 +678,20 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
678678
WFTaskFactory::count_by_name(name, (unsigned int)-1);
679679
}
680680

681+
struct __broker_status
682+
{
683+
KafkaBroker *broker;
684+
int state;
685+
int error;
686+
};
687+
681688
void ComplexKafkaTask::kafka_broker_api_callback(__WFKafkaTask *task)
682689
{
683-
using KafkaTuple = std::tuple<int, int, void *>;
684-
KafkaTuple * state_error_broker = new KafkaTuple{task->get_state(),
685-
task->get_error(),
686-
task->user_data};
687-
series_of(task)->set_context(state_error_broker);
690+
struct __broker_status *status = new struct __broker_status;
691+
status->broker = (KafkaBroker *)task->user_data;
692+
status->state = task->get_state();
693+
status->error = task->get_error();
694+
series_of(task)->set_context(status);
688695
}
689696

690697
void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork)
@@ -694,21 +701,20 @@ void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork)
694701
t->error = 0;
695702

696703
t->lock_status.get_mutex()->lock();
697-
using KafkaTuple = std::tuple<int, int, KafkaBroker *>;
698-
KafkaTuple *state_error_broker;
704+
struct __broker_status *status;
699705

700706
for (size_t i = 0; i < pwork->size(); i++)
701707
{
702-
state_error_broker = (KafkaTuple *)pwork->series_at(i)->get_context();
703-
if (std::get<0>(*state_error_broker) != WFT_STATE_SUCCESS)
708+
status = (struct __broker_status *)pwork->series_at(i)->get_context();
709+
if (status->state != WFT_STATE_SUCCESS)
704710
{
705-
t->state = std::get<0>(*state_error_broker);
706-
t->error = std::get<1>(*state_error_broker);
711+
t->state = status->state;
712+
t->error = status->error;
707713
}
708714
else
709-
t->client_broker_map.add_item(*std::get<2>(*state_error_broker));
715+
t->client_broker_map.add_item(*status->broker);
710716

711-
delete state_error_broker;
717+
delete status;
712718
}
713719

714720
if (t->state == WFT_STATE_SUCCESS)

src/protocol/kafka_parser.c

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
#include <ctype.h>
2424
#include "kafka_parser.h"
2525

26-
27-
#define MIN(a, b) ((x) <= (y) ? (x) : (y))
28-
2926
static kafka_api_version_t kafka_api_version_queryable[] = {
3027
{ Kafka_ApiVersions, 0, 0 }
3128
};
@@ -207,8 +204,8 @@ static int kafka_get_legacy_api_version(const char *broker_version,
207204
{ "", kafka_api_version_queryable, 1 },
208205
{ NULL, NULL, 0 }
209206
};
210-
211207
int i;
208+
212209
for (i = 0 ; vermap[i].pfx ; i++)
213210
{
214211
if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx)))
@@ -263,7 +260,7 @@ unsigned kafka_get_features(kafka_api_version_t *api, size_t api_cnt)
263260
int i, fails, r;
264261
const kafka_api_version_t *match;
265262

266-
for (i = 0 ; kafka_feature_map[i].feature != 0 ; i++)
263+
for (i = 0; kafka_feature_map[i].feature != 0; i++)
267264
{
268265
fails = 0;
269266
for (match = &kafka_feature_map[i].depends[0];
@@ -574,6 +571,7 @@ void kafka_block_deinit(kafka_block_t *block)
574571
int kafka_parser_append_message(const void *buf, size_t *size,
575572
kafka_parser_t *parser)
576573
{
574+
size_t s = *size;
577575
int totaln;
578576

579577
if (parser->complete)
@@ -582,9 +580,7 @@ int kafka_parser_append_message(const void *buf, size_t *size,
582580
return 1;
583581
}
584582

585-
size_t s = *size;
586-
587-
if (parser->hsize + *size < 4)
583+
if (parser->hsize + s < 4)
588584
{
589585
memcpy(parser->headbuf + parser->hsize, buf, s);
590586
parser->hsize += s;
@@ -672,15 +668,12 @@ int kafka_record_header_set_kv(const void *key, size_t key_len,
672668
kafka_record_header_t *header)
673669
{
674670
void *k = malloc(key_len);
675-
676-
if (!k)
677-
return -1;
678-
679671
void *v = malloc(val_len);
680672

681-
if (!v)
673+
if (!k || !v)
682674
{
683675
free(k);
676+
free(v);
684677
return -1;
685678
}
686679

@@ -728,11 +721,12 @@ int kafka_sasl_plain_client_new(void *p)
728721
size_t ulen = strlen(conf->sasl.username);
729722
size_t plen = strlen(conf->sasl.passwd);
730723
size_t blen = ulen + plen + 3;
731-
char *buf = malloc(blen);
724+
size_t off = 0;
725+
char *buf = (char *)malloc(blen);
726+
732727
if (!buf)
733728
return -1;
734729

735-
size_t off = 0;
736730
buf[off++] = '\0';
737731

738732
memcpy(buf + off, conf->sasl.username, ulen);
@@ -783,3 +777,4 @@ int kafka_sasl_set_passwd(const char *passwd, kafka_config_t *conf)
783777
conf->sasl.passwd = t;
784778
return 0;
785779
}
780+

0 commit comments

Comments
 (0)