1818package org .apache .seatunnel .e2e .connector .hudi ;
1919
2020import org .apache .seatunnel .common .utils .FileUtils ;
21+ import org .apache .seatunnel .e2e .common .container .seatunnel .SeaTunnelContainer ;
2122import org .apache .seatunnel .e2e .common .util .ContainerUtil ;
22- import org .apache .seatunnel .engine .e2e .SeaTunnelContainer ;
2323
2424import org .apache .hadoop .conf .Configuration ;
2525import org .apache .hadoop .fs .LocalFileSystem ;
3232import org .junit .jupiter .api .Assertions ;
3333import org .junit .jupiter .api .BeforeAll ;
3434import org .junit .jupiter .api .Test ;
35+ import org .junit .jupiter .api .TestInstance ;
3536import org .testcontainers .containers .Container ;
3637import org .testcontainers .containers .MinIOContainer ;
3738
3839import io .minio .BucketExistsArgs ;
39- import io .minio .DownloadObjectArgs ;
40- import io .minio .ListObjectsArgs ;
4140import io .minio .MakeBucketArgs ;
4241import io .minio .MinioClient ;
43- import io .minio .Result ;
44- import io .minio .messages .Item ;
4542import lombok .extern .slf4j .Slf4j ;
4643
4744import java .io .IOException ;
5148import static org .awaitility .Awaitility .given ;
5249
5350@ Slf4j
51+ @ TestInstance (TestInstance .Lifecycle .PER_CLASS )
5452public class HudiSeatunnelS3MultiTableIT extends SeaTunnelContainer {
5553
5654 private static final String MINIO_DOCKER_IMAGE = "minio/minio:RELEASE.2024-06-13T22-53-53Z" ;
@@ -88,7 +86,6 @@ public void startUp() throws Exception {
8886
8987 String s3URL = container .getS3URL ();
9088
91- // configuringClient
9289 minioClient =
9390 MinioClient .builder ()
9491 .endpoint (s3URL )
@@ -111,11 +108,13 @@ protected String[] buildStartCommand() {
111108 "wget -P "
112109 + SEATUNNEL_HOME
113110 + "lib "
111+ + " --timeout=180 "
114112 + AWS_SDK_DOWNLOAD
115113 + " &&"
116114 + "wget -P "
117115 + SEATUNNEL_HOME
118116 + "lib "
117+ + " --timeout=180 "
119118 + HADOOP_AWS_DOWNLOAD
120119 + " &&"
121120 + ContainerUtil .adaptPathForWin (
@@ -132,90 +131,81 @@ public void tearDown() throws Exception {
132131 }
133132 }
134133
134+ @ Override
135+ protected boolean isIssueWeAlreadyKnow (String threadName ) {
136+ return super .isIssueWeAlreadyKnow (threadName )
137+ // hudi with s3
138+ || threadName .startsWith ("s3a-transfer" );
139+ }
140+
135141 @ Test
136142 public void testS3MultiWrite () throws IOException , InterruptedException {
137143 copyFileToContainer ("/hudi/core-site.xml" , "/tmp/seatunnel/config/core-site.xml" );
138- Container .ExecResult textWriteResult = executeSeaTunnelJob ("/hudi/s3_fake_to_hudi.conf" );
144+ Container .ExecResult textWriteResult = executeJob ("/hudi/s3_fake_to_hudi.conf" );
139145 Assertions .assertEquals (0 , textWriteResult .getExitCode ());
140146 Configuration configuration = new Configuration ();
141147 configuration .set ("fs.defaultFS" , LocalFileSystem .DEFAULT_FS );
142- given ().ignoreExceptions ()
148+ given ().pollDelay (10 , TimeUnit .SECONDS )
149+ .pollInterval (1 , TimeUnit .SECONDS )
143150 .await ()
144- .atMost (60000 , TimeUnit .MILLISECONDS )
151+ .atMost (300 , TimeUnit .SECONDS )
145152 .untilAsserted (
146153 () -> {
147154 // copy hudi to local
148- Path inputPath1 =
149- downloadNewestCommitFile (DATABASE_1 + "/" + TABLE_NAME_1 + "/" );
150- Path inputPath2 =
151- downloadNewestCommitFile (DATABASE_2 + "/" + TABLE_NAME_2 + "/" );
152- ParquetReader <Group > reader1 =
153- ParquetReader .builder (new GroupReadSupport (), inputPath1 )
154- .withConf (configuration )
155- .build ();
156- ParquetReader <Group > reader2 =
157- ParquetReader .builder (new GroupReadSupport (), inputPath2 )
158- .withConf (configuration )
159- .build ();
160-
161- long rowCount1 = 0 ;
162- long rowCount2 = 0 ;
163- // Read data and count rows
164- while (reader1 .read () != null ) {
165- rowCount1 ++;
166- }
167- // Read data and count rows
168- while (reader2 .read () != null ) {
169- rowCount2 ++;
155+ Path inputPath1 = null ;
156+ Path inputPath2 = null ;
157+ try {
158+ inputPath1 =
159+ new Path (
160+ MinIoUtils .downloadNewestCommitFile (
161+ minioClient ,
162+ BUCKET ,
163+ String .format (
164+ "%s/%s/" , DATABASE_1 , TABLE_NAME_1 ),
165+ DOWNLOAD_PATH ));
166+ log .info (
167+ "download from s3 success, the parquet file is at: {}" ,
168+ inputPath1 );
169+ inputPath2 =
170+ new Path (
171+ MinIoUtils .downloadNewestCommitFile (
172+ minioClient ,
173+ BUCKET ,
174+ String .format (
175+ "%s/%s/" , DATABASE_2 , TABLE_NAME_2 ),
176+ DOWNLOAD_PATH ));
177+ log .info (
178+ "download from s3 success, the parquet file is at: {}" ,
179+ inputPath2 );
180+ ParquetReader <Group > reader1 =
181+ ParquetReader .builder (new GroupReadSupport (), inputPath1 )
182+ .withConf (configuration )
183+ .build ();
184+ ParquetReader <Group > reader2 =
185+ ParquetReader .builder (new GroupReadSupport (), inputPath2 )
186+ .withConf (configuration )
187+ .build ();
188+
189+ long rowCount1 = 0 ;
190+ long rowCount2 = 0 ;
191+ // Read data and count rows
192+ while (reader1 .read () != null ) {
193+ rowCount1 ++;
194+ }
195+ // Read data and count rows
196+ while (reader2 .read () != null ) {
197+ rowCount2 ++;
198+ }
199+ Assertions .assertEquals (100 , rowCount1 );
200+ Assertions .assertEquals (240 , rowCount2 );
201+ } finally {
202+ if (inputPath1 != null ) {
203+ FileUtils .deleteFile (inputPath1 .toUri ().getPath ());
204+ }
205+ if (inputPath2 != null ) {
206+ FileUtils .deleteFile (inputPath2 .toUri ().getPath ());
207+ }
170208 }
171- FileUtils .deleteFile (inputPath1 .toUri ().getPath ());
172- FileUtils .deleteFile (inputPath2 .toUri ().getPath ());
173- Assertions .assertEquals (100 , rowCount1 );
174- Assertions .assertEquals (240 , rowCount2 );
175209 });
176210 }
177-
178- public Path downloadNewestCommitFile (String pathPrefix ) throws IOException {
179- Iterable <Result <Item >> listObjects =
180- minioClient .listObjects (
181- ListObjectsArgs .builder ().bucket (BUCKET ).prefix (pathPrefix ).build ());
182- String newestCommitFileabsolutePath = "" ;
183- String newestCommitFileName = "" ;
184- long newestCommitTime = 0L ;
185- for (Result <Item > listObject : listObjects ) {
186- Item item ;
187- try {
188- item = listObject .get ();
189- } catch (Exception e ) {
190- throw new IOException ("List minio file error." , e );
191- }
192- if (item .isDir () || !item .objectName ().endsWith (".parquet" )) {
193- continue ;
194- }
195- long fileCommitTime =
196- Long .parseLong (
197- item .objectName ()
198- .substring (
199- item .objectName ().lastIndexOf ("_" ) + 1 ,
200- item .objectName ().lastIndexOf (".parquet" )));
201- if (fileCommitTime > newestCommitTime ) {
202- newestCommitFileabsolutePath = item .objectName ();
203- newestCommitFileName =
204- newestCommitFileabsolutePath .substring (
205- item .objectName ().lastIndexOf ("/" ) + 1 );
206- newestCommitTime = fileCommitTime ;
207- }
208- }
209- try {
210- minioClient .downloadObject (
211- DownloadObjectArgs .builder ()
212- .bucket (BUCKET )
213- .object (newestCommitFileabsolutePath )
214- .filename (DOWNLOAD_PATH + newestCommitFileName )
215- .build ());
216- } catch (Exception e ) {
217- log .error ("Download file from minio error." );
218- }
219- return new Path (DOWNLOAD_PATH + newestCommitFileName );
220- }
221211}
0 commit comments