@@ -22,9 +22,13 @@ import org.apache.paimon.catalog.Identifier
2222import org .apache .paimon .fs .Path
2323import org .apache .paimon .spark .PaimonHiveTestBase
2424import org .apache .paimon .table .FormatTable
25+ import org .apache .paimon .utils .{FileIOUtils , FileUtils }
2526
2627import org .apache .spark .sql .Row
2728
29+ import java .io .{File , FileInputStream , FileOutputStream }
30+ import java .util .zip .GZIPOutputStream
31+
2832abstract class FormatTableTestBase extends PaimonHiveTestBase {
2933
3034 override protected def beforeEach (): Unit = {
@@ -55,4 +59,46 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
5559 }
5660 }
5761 }
62+
63+ test(" Format table: read compressed files" ) {
64+ for (format <- Seq (" csv" , " json" )) {
65+ withTable(" compress_t" ) {
66+ sql(s " CREATE TABLE compress_t (a INT, b INT, c INT) USING $format" )
67+ sql(" INSERT INTO compress_t VALUES (1, 2, 3)" )
68+ val table =
69+ paimonCatalog
70+ .getTable(Identifier .create(" default" , " compress_t" ))
71+ .asInstanceOf [FormatTable ]
72+ val fileIO = table.fileIO()
73+ val file = fileIO
74+ .listStatus(new Path (table.location()))
75+ .filter(file => ! file.getPath.getName.startsWith(" ." ))
76+ .head
77+ .getPath
78+ .toUri
79+ .getPath
80+ compressGzipFile(file, file + " .gz" )
81+ fileIO.deleteQuietly(new Path (file))
82+ checkAnswer(sql(" SELECT * FROM compress_t" ), Row (1 , 2 , 3 ))
83+ }
84+ }
85+ }
86+
87+ def compressGzipFile (src : String , dest : String ): Unit = {
88+ val fis = new FileInputStream (src)
89+ val fos = new FileOutputStream (dest)
90+ val gzipOs = new GZIPOutputStream (fos)
91+ val buffer = new Array [Byte ](1024 )
92+ var bytesRead = 0
93+ while (true ) {
94+ bytesRead = fis.read(buffer)
95+ if (bytesRead == - 1 ) {
96+ fis.close()
97+ gzipOs.close()
98+ return
99+ }
100+
101+ gzipOs.write(buffer, 0 , bytesRead)
102+ }
103+ }
58104}
0 commit comments