File tree 1 file changed +4
-4
lines changed
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka
1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -151,8 +151,8 @@ public boolean start() throws IOException {
151
151
@ Override
152
152
public boolean advance () throws IOException {
153
153
/* Read first record (if any). we need to loop here because :
154
- * - (b ) if curBatch is empty, we want to fetch next batch and then advance.
155
- * - (c ) curBatch is an iterator of iterators. we interleave the records from each.
154
+ * - (a ) if curBatch is empty, we want to fetch next batch and then advance.
155
+ * - (b ) curBatch is an iterator of iterators. we interleave the records from each.
156
156
* curBatch.next() might return an empty iterator.
157
157
*/
158
158
while (true ) {
@@ -162,7 +162,7 @@ public boolean advance() throws IOException {
162
162
163
163
PartitionState <K , V > pState = curBatch .next ();
164
164
165
- if (!pState .recordIter .hasNext ()) { // -- (c )
165
+ if (!pState .recordIter .hasNext ()) { // -- (b )
166
166
pState .recordIter = Collections .emptyIterator (); // drop ref
167
167
curBatch .remove ();
168
168
continue ;
@@ -211,7 +211,7 @@ public boolean advance() throws IOException {
211
211
212
212
kafkaResults .flushBufferedMetrics ();
213
213
return true ;
214
- } else { // -- (b )
214
+ } else { // -- (a )
215
215
kafkaResults = KafkaSinkMetrics .kafkaMetrics ();
216
216
nextBatch ();
217
217
kafkaResults .flushBufferedMetrics ();
You can’t perform that action at this time.
0 commit comments