2525import java .time .ZoneId ;
2626import java .time .ZonedDateTime ;
2727import java .time .format .DateTimeFormatter ;
28- import java .util .*;
28+ import java .util .ArrayList ;
29+ import java .util .Arrays ;
30+ import java .util .Base64 ;
31+ import java .util .Collection ;
32+ import java .util .Collections ;
33+ import java .util .HashMap ;
34+ import java .util .Iterator ;
35+ import java .util .Map ;
36+ import java .util .Objects ;
37+ import java .util .Random ;
2938import java .util .zip .GZIPInputStream ;
3039
31- import com .amazonaws .services .s3 .model .S3Object ;
32- import com .amazonaws .services .s3 .model .S3ObjectInputStream ;
3340import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
3441import org .apache .kafka .common .TopicPartition ;
3542import org .apache .kafka .common .record .TimestampType ;
4552import com .amazonaws .client .builder .AwsClientBuilder .EndpointConfiguration ;
4653import com .amazonaws .services .s3 .AmazonS3 ;
4754import com .amazonaws .services .s3 .AmazonS3ClientBuilder ;
55+ import com .amazonaws .services .s3 .model .S3Object ;
56+ import com .amazonaws .services .s3 .model .S3ObjectInputStream ;
4857import io .findify .s3mock .S3Mock ;
4958import org .junit .After ;
5059import org .junit .AfterClass ;
@@ -132,7 +141,7 @@ public void testAivenKafkaConnectS3SinkTaskTest() throws IOException {
132141 // * Simulate periodical flush() cycle - ensure that data files are written
133142
134143 // Push batch of records
135- Collection <SinkRecord > sinkRecords = createBatchOfRecord (0 , 100 );
144+ final Collection <SinkRecord > sinkRecords = createBatchOfRecord (0 , 100 );
136145 task .put (sinkRecords );
137146
138147 assertFalse (s3Client .doesObjectExist (TEST_BUCKET , "test-topic-0-0000000000.gz" ));
@@ -142,16 +151,19 @@ public void testAivenKafkaConnectS3SinkTaskTest() throws IOException {
142151 offsets .put (tp , new OffsetAndMetadata (100 ));
143152 task .flush (offsets );
144153
145- ConnectHeaders extectedConnectHeaders = createTestHeaders ();
154+ final ConnectHeaders extectedConnectHeaders = createTestHeaders ();
146155
147156 assertTrue (s3Client .doesObjectExist (TEST_BUCKET , "test-topic-0-0000000000.gz" ));
148157
149- S3Object s3Object = s3Client .getObject (TEST_BUCKET , "test-topic-0-0000000000.gz" );
150- S3ObjectInputStream s3ObjectInputStream = s3Object .getObjectContent ();
151- try (BufferedReader br = new BufferedReader (new InputStreamReader (new GZIPInputStream (s3ObjectInputStream )))) {
158+ final S3Object s3Object = s3Client .getObject (TEST_BUCKET , "test-topic-0-0000000000.gz" );
159+ final S3ObjectInputStream s3ObjectInputStream = s3Object .getObjectContent ();
160+ try (final BufferedReader br =
161+ new BufferedReader (
162+ new InputStreamReader (
163+ new GZIPInputStream (s3ObjectInputStream )))) {
152164 for (String line ; (line = br .readLine ()) != null ;) {
153- String [] parts = line .split ("," );
154- ConnectHeaders actualConnectHeaders = readHeaders (parts [4 ]);
165+ final String [] parts = line .split ("," );
166+ final ConnectHeaders actualConnectHeaders = readHeaders (parts [4 ]);
155167 assertTrue (headersEquals (actualConnectHeaders , extectedConnectHeaders ));
156168 }
157169 }
@@ -252,7 +264,7 @@ public void testS3LocalDatePrefix() {
252264 private Collection <SinkRecord > createBatchOfRecord (final int offsetFrom , final int offsetTo ) {
253265 final ArrayList <SinkRecord > records = new ArrayList <>();
254266 for (int offset = offsetFrom ; offset < offsetTo ; offset ++) {
255- ConnectHeaders connectHeaders = createTestHeaders ();
267+ final ConnectHeaders connectHeaders = createTestHeaders ();
256268 final SinkRecord record = new SinkRecord (
257269 "test-topic" ,
258270 0 ,
@@ -270,13 +282,13 @@ private Collection<SinkRecord> createBatchOfRecord(final int offsetFrom, final i
270282 }
271283
272284 private ConnectHeaders createTestHeaders () {
273- ConnectHeaders connectHeaders = new ConnectHeaders ();
285+ final ConnectHeaders connectHeaders = new ConnectHeaders ();
274286 connectHeaders .addBytes ("test-header-key-1" , "test-header-value-1" .getBytes (StandardCharsets .UTF_8 ));
275287 connectHeaders .addBytes ("test-header-key-2" , "test-header-value-2" .getBytes (StandardCharsets .UTF_8 ));
276288 return connectHeaders ;
277289 }
278290
279- private ConnectHeaders readHeaders (String s ) {
291+ private ConnectHeaders readHeaders (final String s ) {
280292 final ConnectHeaders connectHeaders = new ConnectHeaders ();
281293 final String [] headers = s .split (";" );
282294 for (final String header : headers ) {
0 commit comments