2020import org .apache .kafka .common .Uuid ;
2121import org .apache .kafka .common .protocol .Errors ;
2222import org .apache .kafka .common .requests .DeleteShareGroupStateResponse ;
23+ import org .apache .kafka .common .requests .InitializeShareGroupStateResponse ;
2324import org .apache .kafka .common .requests .ReadShareGroupStateResponse ;
2425import org .apache .kafka .common .requests .ReadShareGroupStateSummaryResponse ;
2526import org .apache .kafka .common .requests .WriteShareGroupStateResponse ;
3334import java .util .List ;
3435import java .util .Map ;
3536import java .util .concurrent .CompletableFuture ;
36- import java .util .stream .Collectors ;
3737
3838/**
3939 * The default implementation of the {@link Persister} interface which is used by the
@@ -70,7 +70,48 @@ public void stop() {
7070 * @return A completable future of InitializeShareGroupStateResult
7171 */
7272 public CompletableFuture <InitializeShareGroupStateResult > initializeState (InitializeShareGroupStateParameters request ) {
73- throw new RuntimeException ("not implemented" );
73+ try {
74+ validate (request );
75+ } catch (Exception e ) {
76+ log .error ("Unable to validate initialize state request" , e );
77+ return CompletableFuture .failedFuture (e );
78+ }
79+ GroupTopicPartitionData <PartitionStateData > gtp = request .groupTopicPartitionData ();
80+ String groupId = gtp .groupId ();
81+
82+ Map <Uuid , Map <Integer , CompletableFuture <InitializeShareGroupStateResponse >>> futureMap = new HashMap <>();
83+ List <PersisterStateManager .InitializeStateHandler > handlers = new ArrayList <>();
84+
85+ gtp .topicsData ().forEach (topicData -> {
86+ topicData .partitions ().forEach (partitionData -> {
87+ CompletableFuture <InitializeShareGroupStateResponse > future = futureMap
88+ .computeIfAbsent (topicData .topicId (), k -> new HashMap <>())
89+ .computeIfAbsent (partitionData .partition (), k -> new CompletableFuture <>());
90+
91+ handlers .add (
92+ stateManager .new InitializeStateHandler (
93+ groupId ,
94+ topicData .topicId (),
95+ partitionData .partition (),
96+ partitionData .stateEpoch (),
97+ partitionData .startOffset (),
98+ future ,
99+ null
100+ )
101+ );
102+ });
103+ });
104+
105+ for (PersisterStateManager .PersisterStateManagerHandler handler : handlers ) {
106+ stateManager .enqueue (handler );
107+ }
108+
109+ CompletableFuture <Void > combinedFuture = CompletableFuture .allOf (
110+ handlers .stream ()
111+ .map (PersisterStateManager .InitializeStateHandler ::result )
112+ .toArray (CompletableFuture []::new ));
113+
114+ return combinedFuture .thenApply (v -> initializeResponsesToResult (futureMap ));
74115 }
75116
76117 /**
@@ -125,6 +166,51 @@ stateManager.new WriteStateHandler(
125166 return combinedFuture .thenApply (v -> writeResponsesToResult (futureMap ));
126167 }
127168
169+ /**
170+ * Takes in a list of COMPLETED futures and combines the results,
171+ * taking care of errors if any, into a single InitializeShareGroupStateResult
172+ *
173+ * @param futureMap - HashMap of {topic -> {partition -> future}}
174+ * @return Object representing combined result of type InitializeShareGroupStateResult
175+ */
176+ // visible for testing
177+ InitializeShareGroupStateResult initializeResponsesToResult (
178+ Map <Uuid , Map <Integer , CompletableFuture <InitializeShareGroupStateResponse >>> futureMap
179+ ) {
180+ List <TopicData <PartitionErrorData >> topicsData = futureMap .keySet ().stream ()
181+ .map (topicId -> {
182+ List <PartitionErrorData > partitionErrData = futureMap .get (topicId ).entrySet ().stream ()
183+ .map (partitionFuture -> {
184+ int partition = partitionFuture .getKey ();
185+ CompletableFuture <InitializeShareGroupStateResponse > future = partitionFuture .getValue ();
186+ try {
187+ // already completed because of allOf application in the caller
188+ InitializeShareGroupStateResponse partitionResponse = future .join ();
189+ return partitionResponse .data ().results ().get (0 ).partitions ().stream ()
190+ .map (partitionResult -> PartitionFactory .newPartitionErrorData (
191+ partitionResult .partition (),
192+ partitionResult .errorCode (),
193+ partitionResult .errorMessage ()))
194+ .toList ();
195+ } catch (Exception e ) {
196+ log .error ("Unexpected exception while initializing data in share coordinator" , e );
197+ return List .of (PartitionFactory .newPartitionErrorData (
198+ partition ,
199+ Errors .UNKNOWN_SERVER_ERROR .code (), // No specific public error code exists for InterruptedException / ExecutionException
200+ "Error initializing state in share coordinator: " + e .getMessage ())
201+ );
202+ }
203+ })
204+ .flatMap (List ::stream )
205+ .toList ();
206+ return new TopicData <>(topicId , partitionErrData );
207+ })
208+ .toList ();
209+ return new InitializeShareGroupStateResult .Builder ()
210+ .setTopicsData (topicsData )
211+ .build ();
212+ }
213+
128214 /**
129215 * Takes in a list of COMPLETED futures and combines the results,
130216 * taking care of errors if any, into a single WriteShareGroupStateResult
@@ -150,21 +236,21 @@ WriteShareGroupStateResult writeResponsesToResult(
150236 partitionResult .partition (),
151237 partitionResult .errorCode (),
152238 partitionResult .errorMessage ()))
153- .collect ( Collectors . toList () );
239+ .toList ();
154240 } catch (Exception e ) {
155241 log .error ("Unexpected exception while writing data to share coordinator" , e );
156- return Collections . singletonList (PartitionFactory .newPartitionErrorData (
242+ return List . of (PartitionFactory .newPartitionErrorData (
157243 partition ,
158244 Errors .UNKNOWN_SERVER_ERROR .code (), // No specific public error code exists for InterruptedException / ExecutionException
159245 "Error writing state to share coordinator: " + e .getMessage ())
160246 );
161247 }
162248 })
163249 .flatMap (List ::stream )
164- .collect ( Collectors . toList () );
250+ .toList ();
165251 return new TopicData <>(topicId , partitionErrData );
166252 })
167- .collect ( Collectors . toList () );
253+ .toList ();
168254 return new WriteShareGroupStateResult .Builder ()
169255 .setTopicsData (topicsData )
170256 .build ();
@@ -248,12 +334,12 @@ ReadShareGroupStateResult readResponsesToResult(
248334 partitionResult .startOffset (),
249335 partitionResult .errorCode (),
250336 partitionResult .errorMessage (),
251- partitionResult .stateBatches ().stream ().map (PersisterStateBatch ::from ).collect ( Collectors . toList () )
337+ partitionResult .stateBatches ().stream ().map (PersisterStateBatch ::from ).toList ()
252338 ))
253- .collect ( Collectors . toList () );
339+ .toList ();
254340 } catch (Exception e ) {
255341 log .error ("Unexpected exception while getting data from share coordinator" , e );
256- return Collections . singletonList (PartitionFactory .newPartitionAllData (
342+ return List . of (PartitionFactory .newPartitionAllData (
257343 partition ,
258344 -1 ,
259345 -1 ,
@@ -264,10 +350,10 @@ ReadShareGroupStateResult readResponsesToResult(
264350 }
265351 })
266352 .flatMap (List ::stream )
267- .collect ( Collectors . toList () );
353+ .toList ();
268354 return new TopicData <>(topicId , partitionAllData );
269355 })
270- .collect ( Collectors . toList () );
356+ .toList ();
271357 return new ReadShareGroupStateResult .Builder ()
272358 .setTopicsData (topicsData )
273359 .build ();
@@ -403,10 +489,10 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
403489 partitionResult .startOffset (),
404490 partitionResult .errorCode (),
405491 partitionResult .errorMessage ()))
406- .collect ( Collectors . toList () );
492+ .toList ();
407493 } catch (Exception e ) {
408494 log .error ("Unexpected exception while getting data from share coordinator" , e );
409- return Collections . singletonList (PartitionFactory .newPartitionStateSummaryData (
495+ return List . of (PartitionFactory .newPartitionStateSummaryData (
410496 partition ,
411497 -1 ,
412498 -1 ,
@@ -415,10 +501,10 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
415501 }
416502 })
417503 .flatMap (List ::stream )
418- .collect ( Collectors . toList () );
504+ .toList ();
419505 return new TopicData <>(topicId , partitionStateErrorData );
420506 })
421- .collect ( Collectors . toList () );
507+ .toList ();
422508 return new ReadShareGroupStateSummaryResult .Builder ()
423509 .setTopicsData (topicsData )
424510 .build ();
@@ -464,15 +550,27 @@ DeleteShareGroupStateResult deleteResponsesToResult(
464550 }
465551 })
466552 .flatMap (List ::stream )
467- .collect ( Collectors . toList () );
553+ .toList ();
468554 return new TopicData <>(topicId , partitionErrorData );
469555 })
470- .collect ( Collectors . toList () );
556+ .toList ();
471557 return new DeleteShareGroupStateResult .Builder ()
472558 .setTopicsData (topicsData )
473559 .build ();
474560 }
475561
562+ private static void validate (InitializeShareGroupStateParameters params ) {
563+ String prefix = "Initialize share group parameters" ;
564+ if (params == null ) {
565+ throw new IllegalArgumentException (prefix + " cannot be null." );
566+ }
567+ if (params .groupTopicPartitionData () == null ) {
568+ throw new IllegalArgumentException (prefix + " data cannot be null." );
569+ }
570+
571+ validateGroupTopicPartitionData (prefix , params .groupTopicPartitionData ());
572+ }
573+
476574 private static void validate (WriteShareGroupStateParameters params ) {
477575 String prefix = "Write share group parameters" ;
478576 if (params == null ) {
0 commit comments