Skip to content

Commit e20b752

Browse files
committed
add test to flink
1 parent 6930863 commit e20b752

File tree

3 files changed

+80
-20
lines changed

3 files changed

+80
-20
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
import java.io.FileInputStream;
22+
import java.io.FileOutputStream;
23+
import java.io.IOException;
24+
import java.util.zip.GZIPOutputStream;
25+
26+
/** Compress utils. */
public class CompressUtils {

    private CompressUtils() {
        // Utility class: static methods only, no instances.
    }

    /**
     * Compresses the file at {@code src} into a new gzip file at {@code dest}.
     *
     * @param src path of the source file to compress
     * @param dest path of the gzip file to create (overwritten if it already exists)
     * @throws IOException if the source cannot be read or the destination cannot be written
     */
    public static void gzipCompressFile(String src, String dest) throws IOException {
        // try-with-resources closes both streams (and flushes the gzip trailer via
        // GZIPOutputStream.close) even when read/write throws; the previous version
        // leaked both streams on any IOException raised mid-copy.
        try (FileInputStream fis = new FileInputStream(src);
                GZIPOutputStream gzipOs = new GZIPOutputStream(new FileOutputStream(dest))) {
            byte[] buffer = new byte[1024];
            int bytesRead;
            while ((bytesRead = fis.read(buffer)) != -1) {
                gzipOs.write(buffer, 0, bytesRead);
            }
        }
    }
}

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818

1919
package org.apache.paimon.hive;
2020

21+
import org.apache.paimon.flink.FormatCatalogTable;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.fs.Path;
2124
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
25+
import org.apache.paimon.table.FormatTable;
26+
import org.apache.paimon.utils.CompressUtils;
2227

2328
import com.klarna.hiverunner.HiveShell;
2429
import com.klarna.hiverunner.annotations.HiveSQL;
2530
import org.apache.flink.table.api.EnvironmentSettings;
2631
import org.apache.flink.table.api.TableEnvironment;
2732
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
33+
import org.apache.flink.table.catalog.ObjectPath;
2834
import org.apache.flink.types.Row;
2935
import org.apache.flink.util.CloseableIterator;
3036
import org.junit.Rule;
@@ -40,6 +46,7 @@
4046
import java.lang.annotation.Target;
4147
import java.time.LocalDateTime;
4248
import java.util.ArrayList;
49+
import java.util.Arrays;
4350
import java.util.HashMap;
4451
import java.util.List;
4552
import java.util.Map;
@@ -215,6 +222,31 @@ private void doTestJSONFormatTable(String tableName) throws Exception {
215222
.containsExactlyInAnyOrder(
216223
Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
217224
Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
225+
226+
// test compression
227+
FormatCatalogTable catalogTable =
228+
(FormatCatalogTable)
229+
tEnv.getCatalog(tEnv.getCurrentCatalog())
230+
.get()
231+
.getTable(new ObjectPath(tEnv.getCurrentDatabase(), tableName));
232+
FormatTable table = catalogTable.table();
233+
FileIO fileIO = table.fileIO();
234+
String file =
235+
Arrays.stream(fileIO.listStatus(new Path(table.location())))
236+
.filter(status -> !status.getPath().getName().startsWith("."))
237+
.findFirst()
238+
.get()
239+
.getPath()
240+
.toUri()
241+
.getPath();
242+
CompressUtils.gzipCompressFile(file, file + ".gz");
243+
fileIO.deleteQuietly(new Path(file));
244+
245+
assertThat(collect(String.format("SELECT * FROM %s", tableName)))
246+
.containsExactlyInAnyOrder(
247+
Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
248+
Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
249+
218250
tEnv.executeSql(
219251
String.format(
220252
"INSERT INTO %s VALUES (3, CAST('2025-03-19 10:15:30' AS TIMESTAMP))",

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.paimon.catalog.Identifier
2222
import org.apache.paimon.fs.Path
2323
import org.apache.paimon.spark.PaimonHiveTestBase
2424
import org.apache.paimon.table.FormatTable
25-
import org.apache.paimon.utils.{FileIOUtils, FileUtils}
25+
import org.apache.paimon.utils.{CompressUtils, FileIOUtils, FileUtils}
2626

2727
import org.apache.spark.sql.Row
2828

@@ -77,28 +77,10 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
7777
.getPath
7878
.toUri
7979
.getPath
80-
compressGzipFile(file, file + ".gz")
80+
CompressUtils.gzipCompressFile(file, file + ".gz")
8181
fileIO.deleteQuietly(new Path(file))
8282
checkAnswer(sql("SELECT * FROM compress_t"), Row(1, 2, 3))
8383
}
8484
}
8585
}
86-
87-
def compressGzipFile(src: String, dest: String): Unit = {
88-
val fis = new FileInputStream(src)
89-
val fos = new FileOutputStream(dest)
90-
val gzipOs = new GZIPOutputStream(fos)
91-
val buffer = new Array[Byte](1024)
92-
var bytesRead = 0
93-
while (true) {
94-
bytesRead = fis.read(buffer)
95-
if (bytesRead == -1) {
96-
fis.close()
97-
gzipOs.close()
98-
return
99-
}
100-
101-
gzipOs.write(buffer, 0, bytesRead)
102-
}
103-
}
10486
}

0 commit comments

Comments
 (0)