@@ -119,6 +119,7 @@ class KafkaMember
119119 this ->meta_list = new KafkaMetaList;
120120 this ->broker_list = new KafkaBrokerList;
121121 this ->lock_status = new KafkaLockStatus;
122+ this ->broker_map = new KafkaBrokerMap;
122123 }
123124
124125 ~KafkaMember ()
@@ -130,6 +131,7 @@ class KafkaMember
130131 delete this ->cgroup ;
131132 delete this ->meta_list ;
132133 delete this ->broker_list ;
134+ delete this ->broker_map ;
133135 delete this ->lock_status ;
134136 }
135137 }
@@ -138,6 +140,7 @@ class KafkaMember
138140 KafkaCgroup *cgroup;
139141 KafkaMetaList *meta_list;
140142 KafkaBrokerList *broker_list;
143+ KafkaBrokerMap *broker_map;
141144 KafkaLockStatus *lock_status;
142145
143146private:
@@ -231,6 +234,7 @@ class ComplexKafkaTask : public WFKafkaTask
231234 this ->cgroup = *client->member ->cgroup ;
232235 this ->client_meta_list = *client->member ->meta_list ;
233236 this ->client_broker_list = *client->member ->broker_list ;
237+ this ->client_broker_map = *client->member ->broker_map ;
234238 this ->query = query;
235239
236240 if (!client->member ->broker_hosts ->empty ())
@@ -268,6 +272,12 @@ class ComplexKafkaTask : public WFKafkaTask
268272
269273 static void kafka_meta_callback (__WFKafkaTask *task);
270274
275+ static void kafka_process_broker_api (ComplexKafkaTask *t, __WFKafkaTask *task);
276+
277+ void kafka_broker_api_callback (__WFKafkaTask *task);
278+
279+ static void kafka_broker_callback (const ParallelWork *pwork);
280+
271281 static void kafka_cgroup_callback (__WFKafkaTask *task);
272282
273283 static void kafka_offsetcommit_callback (__WFKafkaTask *task);
@@ -294,7 +304,10 @@ class ComplexKafkaTask : public WFKafkaTask
294304
295305 int arrange_commit ();
296306
297- KafkaBroker *get_broker (int node_id);
307+ inline KafkaBroker *get_broker (int node_id)
308+ {
309+ return this ->client_broker_map .find_item (node_id);
310+ }
298311
299312 int get_node_id (const KafkaToppar *toppar);
300313
@@ -303,6 +316,7 @@ class ComplexKafkaTask : public WFKafkaTask
303316 KafkaLockStatus lock_status;
304317 KafkaMetaList client_meta_list;
305318 KafkaBrokerList client_broker_list;
319+ KafkaBrokerMap client_broker_map;
306320 KafkaCgroup cgroup;
307321 std::map<int , KafkaTopparList> toppar_list_map;
308322 ParsedURI uri;
@@ -311,26 +325,6 @@ class ComplexKafkaTask : public WFKafkaTask
311325 friend class WFKafkaClient ;
312326};
313327
314- KafkaBroker *ComplexKafkaTask::get_broker (int node_id)
315- {
316- bool flag = false ;
317- this ->client_broker_list .rewind ();
318- KafkaBroker *broker;
319- while ((broker = this ->client_broker_list .get_next ()) != NULL )
320- {
321- if (broker->get_node_id () == node_id)
322- {
323- flag = true ;
324- break ;
325- }
326- }
327-
328- if (flag)
329- return broker;
330- else
331- return NULL ;
332- }
333-
334328int ComplexKafkaTask::get_node_id (const KafkaToppar *toppar)
335329{
336330 bool flag = false ;
@@ -567,6 +561,94 @@ void ComplexKafkaTask::kafka_merge_broker_list(KafkaBrokerList *dst,
567561 }
568562}
569563
564+ void ComplexKafkaTask::kafka_process_broker_api (ComplexKafkaTask *t, __WFKafkaTask *task)
565+ {
566+ if (t->config .get_broker_version ())
567+ {
568+ t->client_broker_list .rewind ();
569+ KafkaBroker *broker;
570+ while ((broker = t->client_broker_list .get_next ()) != NULL )
571+ {
572+ kafka_api_version_t *api;
573+ size_t api_cnt;
574+ const char *brk_ver = t->config .get_broker_version ();
575+ int ret = kafka_api_version_is_queryable (brk_ver, &api, &api_cnt);
576+
577+ if (ret == 0 )
578+ {
579+ if (!broker->allocate_api_version (api_cnt))
580+ {
581+ t->state = WFT_STATE_TASK_ERROR ;
582+ t->error = errno;
583+ t->lock_status .get_mutex ()->unlock ();
584+ return ;
585+ }
586+
587+ memcpy (broker->get_api (), api,
588+ sizeof (kafka_api_version_t ) * api_cnt);
589+
590+ t->client_broker_map .add_item (*broker);
591+ }
592+ else
593+ {
594+ t->state = WFT_STATE_TASK_ERROR ;
595+ t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED ;
596+ t->lock_status .get_mutex ()->unlock ();
597+ return ;
598+ }
599+ }
600+
601+ *t->lock_status .get_status () |= KAFKA_META_DONE ;
602+ *t->lock_status .get_status () &= (~(KAFKA_META_INIT |KAFKA_META_DOING ));
603+
604+ t->state = WFT_STATE_SUCCESS ;
605+ t->error = 0 ;
606+ }
607+ else
608+ {
609+ SeriesWork *series;
610+ ParallelWork *parallel = Workflow::create_parallel_work (kafka_broker_callback);
611+ parallel->set_context (t);
612+ t->client_broker_list .rewind ();
613+
614+ KafkaBroker *broker;
615+ while ((broker = t->client_broker_list .get_next ()) != NULL )
616+ {
617+ auto cb = std::bind (&ComplexKafkaTask::kafka_broker_api_callback, t,
618+ std::placeholders::_1);
619+ __WFKafkaTask *ntask;
620+ if (broker->is_to_addr ())
621+ {
622+ const struct sockaddr *addr;
623+ socklen_t socklen;
624+ broker->get_broker_addr (&addr, &socklen);
625+
626+ ntask = __WFKafkaTaskFactory::create_kafka_task (addr, socklen,
627+ t->retry_max ,
628+ nullptr );
629+ }
630+ else
631+ {
632+ ntask = __WFKafkaTaskFactory::create_kafka_task (broker->get_host (),
633+ broker->get_port (),
634+ t->retry_max ,
635+ nullptr );
636+ }
637+
638+ ntask->get_req ()->set_config (t->config );
639+ ntask->get_req ()->set_broker (*broker);
640+ ntask->get_req ()->set_api (Kafka_ApiVersions);
641+ ntask->user_data = broker;
642+ KafkaComplexTask *ctask = static_cast <KafkaComplexTask *>(ntask);
643+ *ctask->get_mutable_ctx () = cb;
644+ series = Workflow::create_series_work (ntask, nullptr );
645+ parallel->add_series (series);
646+ }
647+ series_of (task)->push_front (parallel);
648+ t->lock_status .get_mutex ()->unlock ();
649+ }
650+ }
651+
570652void ComplexKafkaTask::kafka_meta_callback (__WFKafkaTask *task)
571653{
572654 ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data ;
@@ -578,6 +660,59 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
578660 kafka_merge_broker_list (&t->client_broker_list ,
579661 task->get_resp ()->get_broker_list ());
580662
663+ kafka_process_broker_api (t, task);
664+ }
665+ else
666+ {
667+ *t->lock_status .get_status () |= KAFKA_META_INIT ;
668+ *t->lock_status .get_status () &= (~(KAFKA_META_DONE |KAFKA_META_DOING ));
669+
670+ t->state = WFT_STATE_TASK_ERROR ;
671+ t->error = WFT_ERR_KAFKA_META_FAILED ;
672+ t->finish = true ;
673+ }
674+
675+ char name[64 ];
676+ snprintf (name, 64 , " %p.meta" , t->client );
677+ t->lock_status .get_mutex ()->unlock ();
678+ WFTaskFactory::count_by_name (name, (unsigned int )-1 );
679+ }
680+
681+ void ComplexKafkaTask::kafka_broker_api_callback (__WFKafkaTask *task)
682+ {
683+ using KafkaTuple = std::tuple<int , int , void *>;
684+ KafkaTuple * state_error_broker = new KafkaTuple{task->get_state (),
685+ task->get_error (),
686+ task->user_data };
687+ series_of (task)->set_context (state_error_broker);
688+ }
689+
690+ void ComplexKafkaTask::kafka_broker_callback (const ParallelWork *pwork)
691+ {
692+ ComplexKafkaTask *t = (ComplexKafkaTask *)pwork->get_context ();
693+ t->state = WFT_STATE_SUCCESS ;
694+ t->error = 0 ;
695+
696+ t->lock_status .get_mutex ()->lock ();
697+ using KafkaTuple = std::tuple<int , int , KafkaBroker *>;
698+ KafkaTuple *state_error_broker;
699+
700+ for (size_t i = 0 ; i < pwork->size (); i++)
701+ {
702+ state_error_broker = (KafkaTuple *)pwork->series_at (i)->get_context ();
703+ if (std::get<0 >(*state_error_broker) != WFT_STATE_SUCCESS )
704+ {
705+ t->state = std::get<0 >(*state_error_broker);
706+ t->error = std::get<1 >(*state_error_broker);
707+ }
708+ else
709+ t->client_broker_map .add_item (*std::get<2 >(*state_error_broker));
710+
711+ delete state_error_broker;
712+ }
713+
714+ if (t->state == WFT_STATE_SUCCESS )
715+ {
581716 *t->lock_status .get_status () |= KAFKA_META_DONE ;
582717 *t->lock_status .get_status () &= (~(KAFKA_META_INIT |KAFKA_META_DOING ));
583718
@@ -596,8 +731,8 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
596731
597732 char name[64 ];
598733 snprintf (name, 64 , " %p.meta" , t->client );
599- WFTaskFactory::count_by_name (name, (unsigned int )-1 );
600734 t->lock_status .get_mutex ()->unlock ();
735+ WFTaskFactory::count_by_name (name, (unsigned int )-1 );
601736}
602737
603738void ComplexKafkaTask::kafka_cgroup_callback (__WFKafkaTask *task)
@@ -654,8 +789,8 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
654789
655790 char name[64 ];
656791 snprintf (name, 64 , " %p.cgroup" , t->client );
657- WFTaskFactory::count_by_name (name, (unsigned int )-1 );
658792 t->lock_status .get_mutex ()->unlock ();
793+ WFTaskFactory::count_by_name (name, (unsigned int )-1 );
659794}
660795
661796void ComplexKafkaTask::kafka_parallel_callback (const ParallelWork *pwork)
@@ -782,7 +917,18 @@ void ComplexKafkaTask::dispatch()
782917
783918 this ->lock_status .get_mutex ()->lock ();
784919
785- if (*this ->lock_status .get_status () & KAFKA_META_INIT )
920+ if (*this ->lock_status .get_status () & KAFKA_META_DOING )
921+ {
922+ char name[64 ];
923+ snprintf (name, 64 , " %p.meta" , this ->client );
924+ counter = WFTaskFactory::create_counter_task (name, 1 , nullptr );
925+ series_of (this )->push_front (this );
926+ series_of (this )->push_front (counter);
927+ this ->lock_status .get_mutex ()->unlock ();
928+ this ->subtask_done ();
929+ return ;
930+ }
931+ else if (*this ->lock_status .get_status () & KAFKA_META_INIT )
786932 {
787933 task = __WFKafkaTaskFactory::create_kafka_task (this ->uri ,
788934 this ->retry_max ,
@@ -798,27 +944,52 @@ void ComplexKafkaTask::dispatch()
798944 this ->subtask_done ();
799945 return ;
800946 }
801- else if (*this ->lock_status .get_status () & KAFKA_META_DOING )
947+
948+ if (*this ->lock_status .get_status () & KAFKA_CGROUP_DOING )
802949 {
803950 char name[64 ];
804- snprintf (name, 64 , " %p.meta " , this ->client );
951+ snprintf (name, 64 , " %p.cgroup " , this ->client );
805952 counter = WFTaskFactory::create_counter_task (name, 1 , nullptr );
806953 series_of (this )->push_front (this );
807954 series_of (this )->push_front (counter);
808955 this ->lock_status .get_mutex ()->unlock ();
809956 this ->subtask_done ();
810957 return ;
811958 }
812-
813- if ((this ->api_type == Kafka_Fetch || this ->api_type == Kafka_OffsetCommit) &&
814- (*this ->lock_status .get_status () & KAFKA_CGROUP_INIT ))
959+ else if ((this ->api_type == Kafka_Fetch || this ->api_type == Kafka_OffsetCommit) &&
960+ (*this ->lock_status .get_status () & KAFKA_CGROUP_INIT ))
815961 {
816- task = __WFKafkaTaskFactory::create_kafka_task (this ->uri ,
817- this ->retry_max ,
818- kafka_cgroup_callback);
962+ KafkaBroker *broker = this ->client_broker_map .get_first_entry ();
963+ if (!broker)
964+ {
965+ this ->state = WFT_STATE_TASK_ERROR ;
966+ this ->error = WFT_ERR_KAFKA_CGROUP_FAILED ;
967+ this ->finish = true ;
968+ return ;
969+ }
970+
971+ if (broker->is_to_addr ())
972+ {
973+ const struct sockaddr *addr;
974+ socklen_t socklen;
975+ broker->get_broker_addr (&addr, &socklen);
976+
977+ task = __WFKafkaTaskFactory::create_kafka_task (addr, socklen,
978+ this ->retry_max ,
979+ kafka_cgroup_callback);
980+ }
981+ else
982+ {
983+ task = __WFKafkaTaskFactory::create_kafka_task (broker->get_host (),
984+ broker->get_port (),
985+ this ->retry_max ,
986+ kafka_cgroup_callback);
987+ }
988+
819989 task->user_data = this ;
820990 task->get_req ()->set_config (this ->config );
821991 task->get_req ()->set_api (Kafka_FindCoordinator);
992+ task->get_req ()->set_broker (*broker);
822993 task->get_req ()->set_cgroup (this ->cgroup );
823994 task->get_req ()->set_meta_list (this ->client_meta_list );
824995 series_of (this )->push_front (this );
@@ -828,17 +999,6 @@ void ComplexKafkaTask::dispatch()
828999 this ->subtask_done ();
8291000 return ;
8301001 }
831- else if (*this ->lock_status .get_status () & KAFKA_CGROUP_DOING )
832- {
833- char name[64 ];
834- snprintf (name, 64 , " %p.cgroup" , this ->client );
835- counter = WFTaskFactory::create_counter_task (name, 1 , nullptr );
836- series_of (this )->push_front (this );
837- series_of (this )->push_front (counter);
838- this ->lock_status .get_mutex ()->unlock ();
839- this ->subtask_done ();
840- return ;
841- }
8421002
8431003 SeriesWork *series;
8441004 switch (this ->api_type )
0 commit comments