Skip to content

Commit 032e729

Browse files
committed
add kafka_process_broker_api
1 parent 3f6a7c2 commit 032e729

1 file changed

Lines changed: 89 additions & 85 deletions

File tree

src/client/WFKafkaClient.cc

Lines changed: 89 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ class ComplexKafkaTask : public WFKafkaTask
272272

273273
static void kafka_meta_callback(__WFKafkaTask *task);
274274

275+
static void kafka_process_broker_api(ComplexKafkaTask *t, __WFKafkaTask *task);
276+
275277
void kafka_broker_api_callback(__WFKafkaTask *task);
276278

277279
static void kafka_broker_callback(const ParallelWork *pwork);
@@ -302,7 +304,10 @@ class ComplexKafkaTask : public WFKafkaTask
302304

303305
int arrange_commit();
304306

305-
KafkaBroker *get_broker(int node_id);
307+
inline KafkaBroker *get_broker(int node_id)
308+
{
309+
return this->client_broker_map.find_item(node_id);
310+
}
306311

307312
int get_node_id(const KafkaToppar *toppar);
308313

@@ -320,11 +325,6 @@ class ComplexKafkaTask : public WFKafkaTask
320325
friend class WFKafkaClient;
321326
};
322327

323-
KafkaBroker *ComplexKafkaTask::get_broker(int node_id)
324-
{
325-
return this->client_broker_map.find_item(node_id);
326-
}
327-
328328
int ComplexKafkaTask::get_node_id(const KafkaToppar *toppar)
329329
{
330330
bool flag = false;
@@ -561,102 +561,106 @@ void ComplexKafkaTask::kafka_merge_broker_list(KafkaBrokerList *dst,
561561
}
562562
}
563563

564-
void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
564+
void ComplexKafkaTask::kafka_process_broker_api(ComplexKafkaTask *t, __WFKafkaTask *task)
565565
{
566-
ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data;
567-
t->lock_status.get_mutex()->lock();
568-
if (task->get_state() == WFT_STATE_SUCCESS)
566+
if (t->config.get_broker_version())
569567
{
570-
kafka_merge_meta_list(&t->client_meta_list,
571-
task->get_resp()->get_meta_list());
572-
kafka_merge_broker_list(&t->client_broker_list,
573-
task->get_resp()->get_broker_list());
574-
575-
if (t->config.get_broker_version())
568+
t->client_broker_list.rewind();
569+
KafkaBroker *broker;
570+
while ((broker = t->client_broker_list.get_next()) != NULL)
576571
{
577-
t->client_broker_list.rewind();
578-
KafkaBroker *broker;
579-
while ((broker = t->client_broker_list.get_next()) != NULL)
580-
{
581-
kafka_api_version_t *api;
582-
size_t api_cnt;
583-
const char *brk_ver = t->config.get_broker_version();
584-
int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt);
572+
kafka_api_version_t *api;
573+
size_t api_cnt;
574+
const char *brk_ver = t->config.get_broker_version();
575+
int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt);
585576

586-
if (ret == 0)
587-
{
588-
if (!broker->allocate_api_version(api_cnt))
589-
{
590-
t->state = WFT_STATE_TASK_ERROR;
591-
t->error = errno;
592-
t->lock_status.get_mutex()->unlock();
593-
return;
594-
}
595-
596-
memcpy(broker->get_api(), api,
597-
sizeof(kafka_api_version_t) * api_cnt);
598-
599-
t->client_broker_map.add_item(*broker);
600-
}
601-
else
577+
if (ret == 0)
578+
{
579+
if (!broker->allocate_api_version(api_cnt))
602580
{
603581
t->state = WFT_STATE_TASK_ERROR;
604-
t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED;
582+
t->error = errno;
605583
t->lock_status.get_mutex()->unlock();
606584
return;
607585
}
608-
}
609586

610-
*t->lock_status.get_status() |= KAFKA_META_DONE;
611-
*t->lock_status.get_status() &= (~(KAFKA_META_INIT|KAFKA_META_DOING));
587+
memcpy(broker->get_api(), api,
588+
sizeof(kafka_api_version_t) * api_cnt);
612589

613-
t->state = WFT_STATE_SUCCESS;
614-
t->error = 0;
590+
t->client_broker_map.add_item(*broker);
591+
}
592+
else
593+
{
594+
t->state = WFT_STATE_TASK_ERROR;
595+
t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED;
596+
t->lock_status.get_mutex()->unlock();
597+
return;
598+
}
615599
}
616-
else
617-
{
618-
SeriesWork *series;
619-
ParallelWork *parallel = Workflow::create_parallel_work(kafka_broker_callback);
620-
parallel->set_context(t);
621-
t->client_broker_list.rewind();
622600

623-
KafkaBroker *broker;
624-
while ((broker = t->client_broker_list.get_next()) != NULL)
625-
{
626-
auto cb = std::bind(&ComplexKafkaTask::kafka_broker_api_callback, t,
627-
std::placeholders::_1);
628-
__WFKafkaTask *ntask;
629-
if (broker->is_to_addr())
630-
{
631-
const struct sockaddr *addr;
632-
socklen_t socklen;
633-
broker->get_broker_addr(&addr, &socklen);
601+
*t->lock_status.get_status() |= KAFKA_META_DONE;
602+
*t->lock_status.get_status() &= (~(KAFKA_META_INIT|KAFKA_META_DOING));
634603

635-
ntask = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
636-
t->retry_max,
637-
nullptr);
638-
}
639-
else
640-
{
641-
ntask = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
642-
broker->get_port(),
643-
t->retry_max,
644-
nullptr);
645-
}
604+
t->state = WFT_STATE_SUCCESS;
605+
t->error = 0;
606+
}
607+
else
608+
{
609+
SeriesWork *series;
610+
ParallelWork *parallel = Workflow::create_parallel_work(kafka_broker_callback);
611+
parallel->set_context(t);
612+
t->client_broker_list.rewind();
613+
614+
KafkaBroker *broker;
615+
while ((broker = t->client_broker_list.get_next()) != NULL)
616+
{
617+
auto cb = std::bind(&ComplexKafkaTask::kafka_broker_api_callback, t,
618+
std::placeholders::_1);
619+
__WFKafkaTask *ntask;
620+
if (broker->is_to_addr())
621+
{
622+
const struct sockaddr *addr;
623+
socklen_t socklen;
624+
broker->get_broker_addr(&addr, &socklen);
646625

647-
ntask->get_req()->set_config(t->config);
648-
ntask->get_req()->set_broker(*broker);
649-
ntask->get_req()->set_api(Kafka_ApiVersions);
650-
ntask->user_data = broker;
651-
KafkaComplexTask *ctask = static_cast<KafkaComplexTask *>(ntask);
652-
*ctask->get_mutable_ctx() = cb;
653-
series = Workflow::create_series_work(ntask, nullptr);
654-
parallel->add_series(series);
626+
ntask = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
627+
t->retry_max,
628+
nullptr);
655629
}
656-
series_of(task)->push_front(parallel);
657-
t->lock_status.get_mutex()->unlock();
658-
return;
630+
else
631+
{
632+
ntask = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
633+
broker->get_port(),
634+
t->retry_max,
635+
nullptr);
636+
}
637+
638+
ntask->get_req()->set_config(t->config);
639+
ntask->get_req()->set_broker(*broker);
640+
ntask->get_req()->set_api(Kafka_ApiVersions);
641+
ntask->user_data = broker;
642+
KafkaComplexTask *ctask = static_cast<KafkaComplexTask *>(ntask);
643+
*ctask->get_mutable_ctx() = cb;
644+
series = Workflow::create_series_work(ntask, nullptr);
645+
parallel->add_series(series);
659646
}
647+
series_of(task)->push_front(parallel);
648+
t->lock_status.get_mutex()->unlock();
649+
}
650+
}
651+
652+
void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
653+
{
654+
ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data;
655+
t->lock_status.get_mutex()->lock();
656+
if (task->get_state() == WFT_STATE_SUCCESS)
657+
{
658+
kafka_merge_meta_list(&t->client_meta_list,
659+
task->get_resp()->get_meta_list());
660+
kafka_merge_broker_list(&t->client_broker_list,
661+
task->get_resp()->get_broker_list());
662+
663+
kafka_process_broker_api(t, task);
660664
}
661665
else
662666
{

0 commit comments

Comments
 (0)