Skip to content

Commit e90f157

Browse files
authored
feat: add unit test and convering client-side issues (#160)
1 parent c4a3cf2 commit e90f157

19 files changed

+887
-74
lines changed

pom.xml

+11-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
<maven.compiler.source>1.7</maven.compiler.source>
6060
<maven.compiler.target>1.7</maven.compiler.target>
6161
<hadoop.version>3.4.0</hadoop.version>
62-
<cos_api.version>5.6.228</cos_api.version>
62+
<cos_api.version>5.6.240</cos_api.version>
6363
<junit.version>4.13.2</junit.version>
6464
<google.guava.version>24.1.1-jre</google.guava.version>
6565
<commons_lang3.version>3.1</commons_lang3.version>
@@ -224,6 +224,16 @@
224224
</execution>
225225
</executions>
226226
</plugin>
227+
<plugin>
228+
<groupId>org.apache.maven.plugins</groupId>
229+
<artifactId>maven-surefire-plugin</artifactId>
230+
<version>2.22.2</version>
231+
<configuration>
232+
<includes>
233+
<include>**/*</include>
234+
</includes>
235+
</configuration>
236+
</plugin>
227237
</plugins>
228238
</build>
229239
</project>

src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
2323
String path = "META-INF/maven/com.qcloud.cos/hadoop-cos/pom.properties";
2424
Properties properties = new Properties();
2525
try (InputStream in = CosNConfigKeys.class.getClassLoader().getResourceAsStream(path)) {
26-
properties.load(in);
27-
version = properties.getProperty("version");
26+
if (in == null) {
27+
version = "unknown";
28+
} else {
29+
properties.load(in);
30+
version = properties.getProperty("version");
31+
}
2832
} catch (IOException e) {
2933
version = "unknown";
3034
}

src/main/java/org/apache/hadoop/fs/CosNFileSystem.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.hadoop.util.Progressable;
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
22-
2322
import java.io.FileNotFoundException;
2423
import java.io.IOException;
2524
import java.io.InputStream;
@@ -190,7 +189,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
190189
new RejectedExecutionHandler() {
191190
@Override
192191
public void rejectedExecution(Runnable r,
193-
ThreadPoolExecutor executor) {
192+
ThreadPoolExecutor executor) {
194193
if (!executor.isShutdown()) {
195194
try {
196195
executor.getQueue().put(r);
@@ -221,7 +220,7 @@ public void rejectedExecution(Runnable r,
221220
new RejectedExecutionHandler() {
222221
@Override
223222
public void rejectedExecution(Runnable r,
224-
ThreadPoolExecutor executor) {
223+
ThreadPoolExecutor executor) {
225224
if (!executor.isShutdown()) {
226225
try {
227226
executor.getQueue().put(r);
@@ -257,7 +256,7 @@ public void rejectedExecution(Runnable r,
257256
new RejectedExecutionHandler() {
258257
@Override
259258
public void rejectedExecution(Runnable r,
260-
ThreadPoolExecutor executor) {
259+
ThreadPoolExecutor executor) {
261260
if (!executor.isShutdown()) {
262261
try {
263262
executor.getQueue().put(r);
@@ -1537,6 +1536,7 @@ protected Path resolveLink(Path f) throws IOException {
15371536

15381537
@Override
15391538
public void close() throws IOException {
1539+
LOG.debug("Filesystem {} is closed", uri);
15401540
try {
15411541
// 先释放掉 IO 线程池以及相关的 IO 资源。
15421542
try {

src/main/java/org/apache/hadoop/fs/CosNUtils.java

+24
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.lang.reflect.Method;
1717
import java.lang.reflect.Modifier;
1818
import java.net.URI;
19+
import java.nio.file.Paths;
1920
import java.util.HashMap;
2021
import java.util.Map;
2122
import java.util.concurrent.TimeUnit;
@@ -329,4 +330,27 @@ public static Path keyToPath(String key, String pathDelimiter) {
329330
return new Path(key);
330331
}
331332
}
333+
334+
public static boolean checkDirectoryRWPermissions(String directoryPath) {
335+
java.nio.file.Path path = Paths.get(directoryPath);
336+
337+
// check dir is existed
338+
if (!java.nio.file.Files.exists(path)) {
339+
LOG.error("the directory {} is not exist", directoryPath);
340+
return false;
341+
}
342+
343+
// check the input is a dir
344+
if (!java.nio.file.Files.isDirectory(path)) {
345+
LOG.error("the {} is not a directory", directoryPath);
346+
return false;
347+
}
348+
349+
// check read permission
350+
boolean isReadable = java.nio.file.Files.isReadable(path);
351+
// check write permission
352+
boolean isWritable = java.nio.file.Files.isWritable(path);
353+
354+
return isReadable && isWritable;
355+
}
332356
}

src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java

+9
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,15 @@ private <X> Object innerCallWithRetry(X request) throws CosServiceException, IOE
20182018
} else {
20192019
throw cse;
20202020
}
2021+
} catch (IllegalStateException e) {
2022+
String message = e.getMessage();
2023+
if (message.contains("Connection pool shut down")) {
2024+
throw new IOException("call cos happen error which http connection pool has shutdown,"
2025+
+ "please check whether the file system is closed or the program has an OOM, , exception:"
2026+
+ " {}", e);
2027+
} else {
2028+
throw new IOException(e);
2029+
}
20212030
} catch (Exception e) {
20222031
throw new IOException(e);
20232032
}

src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.hadoop.fs.cosn;
22

33
import org.apache.hadoop.conf.Configuration;
4+
import org.apache.hadoop.fs.CosNUtils;
45
import org.apache.hadoop.fs.cosn.buffer.*;
56
import org.apache.hadoop.fs.CosNConfigKeys;
67
import org.slf4j.Logger;
@@ -121,6 +122,12 @@ public synchronized void initialize(Configuration conf)
121122
} else if (this.bufferType == CosNBufferType.MAPPED_DISK) {
122123
String tmpDir = conf.get(CosNConfigKeys.COSN_TMP_DIR,
123124
CosNConfigKeys.DEFAULT_TMP_DIR);
125+
// Check whether you have read and write permissions for the directory during initialization
126+
if (!CosNUtils.checkDirectoryRWPermissions(tmpDir)) {
127+
String exceptionMsg = String.format("The tmp dir does not have read or write permissions." +
128+
"dir: %s", tmpDir);
129+
throw new IllegalArgumentException(exceptionMsg);
130+
}
124131
boolean deleteOnExit = conf.getBoolean(CosNConfigKeys.COSN_MAPDISK_DELETEONEXIT_ENABLED,
125132
CosNConfigKeys.DEFAULT_COSN_MAPDISK_DELETEONEXIT_ENABLED);
126133
String[] tmpDirList = tmpDir.split(",");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package org.apache.hadoop.fs;
2+
3+
import org.apache.commons.lang3.RandomStringUtils;
4+
import org.apache.hadoop.conf.Configuration;
5+
import org.junit.After;
6+
import org.junit.AfterClass;
7+
import org.junit.Before;
8+
import org.junit.BeforeClass;
9+
10+
import java.io.IOException;
11+
import java.util.Arrays;
12+
13+
public class CosNFileSystemTestBase extends CosNFileSystemTestWithTimeout {
14+
protected static Configuration configuration;
15+
protected static FileSystem fs;
16+
17+
protected static final Path unittestDirPath = new Path("/unittest-dir" + RandomStringUtils.randomAlphanumeric(8));
18+
protected final Path testDirPath = new Path(unittestDirPath, "test-dir");
19+
protected final Path testFilePath = new Path(unittestDirPath, "test-file");
20+
21+
@BeforeClass
22+
public static void beforeClass() throws IOException {
23+
String configFilePath = System.getProperty("config.file");
24+
configuration = new Configuration();
25+
// 初始化文件系统对象,因为 core-site.xml 是在 test 的 resource 下面,因此应该能够正确加载到。
26+
if (configFilePath != null) {
27+
// 使用 addResource 方法加载配置文件
28+
configuration.addResource(new Path(configFilePath));
29+
}
30+
// 考虑到是针对 CosNFileSystem 测试,因此强制设置为 CosNFileSystem。
31+
configuration.set("fs.cosn.impl", "org.apache.hadoop.fs.CosNFileSystem");
32+
fs = FileSystem.get(configuration);
33+
34+
if (null != fs && !fs.exists(unittestDirPath)) {
35+
fs.mkdirs(unittestDirPath);
36+
}
37+
}
38+
39+
@AfterClass
40+
public static void afterClass() throws IOException {
41+
if (null != fs && fs.exists(unittestDirPath)) {
42+
fs.delete(unittestDirPath, true);
43+
}
44+
if (null != fs) {
45+
fs.close();
46+
}
47+
}
48+
49+
@Before
50+
public void before() throws IOException {
51+
if (!fs.exists(testDirPath)) {
52+
fs.mkdirs(testDirPath);
53+
}
54+
if (!fs.exists(testFilePath)) {
55+
try (FSDataOutputStream fsDataOutputStream = fs.create(testFilePath)) {
56+
fsDataOutputStream.write("Hello, World!".getBytes());
57+
fsDataOutputStream.write("\n".getBytes());
58+
fsDataOutputStream.write("Hello, COS!".getBytes());
59+
}
60+
}
61+
}
62+
63+
@After
64+
public void after() throws IOException {
65+
if (fs.exists(testFilePath)) {
66+
fs.delete(testFilePath, true);
67+
}
68+
if (fs.exists(testDirPath)) {
69+
fs.delete(testDirPath, true);
70+
}
71+
}
72+
73+
/**
74+
* Return a path bonded to this method name, unique to this fork during
75+
* parallel execution.
76+
*
77+
* @return a method name unique to (fork, method).
78+
* @throws IOException IO problems
79+
*/
80+
protected Path methodPath() throws IOException {
81+
return new Path(unittestDirPath, methodName.getMethodName());
82+
}
83+
84+
/*
85+
* Helper method that creates test data of size provided by the
86+
* "size" parameter.
87+
*/
88+
protected static byte[] getTestData(int size) {
89+
byte[] testData = new byte[size];
90+
System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
91+
return testData;
92+
}
93+
94+
// Helper method to create file and write fileSize bytes of data on it.
95+
protected byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
96+
97+
try (FSDataOutputStream createStream = fs.create(testPath)) {
98+
byte[] fileData = null;
99+
100+
if (fileSize != 0) {
101+
fileData = getTestData(fileSize);
102+
createStream.write(fileData);
103+
}
104+
return fileData;
105+
}
106+
}
107+
108+
/*
109+
* Helper method to verify a testFile data.
110+
*/
111+
protected boolean verifyFile(byte[] testData, Path testFile) {
112+
113+
try (FSDataInputStream srcStream = fs.open(testFile)) {
114+
115+
int baseBufferSize = 2048;
116+
int testDataSize = testData.length;
117+
int testDataIndex = 0;
118+
119+
while (testDataSize > baseBufferSize) {
120+
121+
if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
122+
return false;
123+
}
124+
testDataIndex += baseBufferSize;
125+
testDataSize -= baseBufferSize;
126+
}
127+
128+
if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
129+
return false;
130+
}
131+
132+
return true;
133+
} catch (Exception ex) {
134+
return false;
135+
}
136+
}
137+
138+
/*
139+
* Helper method to verify a file data equal to "dataLength" parameter
140+
*/
141+
protected boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex, FSDataInputStream srcStream) {
142+
143+
try {
144+
145+
byte[] fileBuffer = new byte[dataLength];
146+
byte[] testDataBuffer = new byte[dataLength];
147+
148+
int fileBytesRead = srcStream.read(fileBuffer);
149+
150+
if (fileBytesRead < dataLength) {
151+
return false;
152+
}
153+
154+
System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
155+
156+
if (!Arrays.equals(fileBuffer, testDataBuffer)) {
157+
return false;
158+
}
159+
160+
return true;
161+
162+
} catch (Exception ex) {
163+
return false;
164+
}
165+
166+
}
167+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.apache.hadoop.fs;
2+
3+
import org.junit.Assert;
4+
import org.junit.Before;
5+
import org.junit.BeforeClass;
6+
import org.junit.Rule;
7+
import org.junit.rules.TestName;
8+
import org.junit.rules.Timeout;
9+
10+
public class CosNFileSystemTestWithTimeout extends Assert {
11+
/**
12+
* The name of the current method.
13+
*/
14+
@Rule
15+
public TestName methodName = new TestName();
16+
/**
17+
* Set the timeout for every test.
18+
* This is driven by the value returned by {@link #getTestTimeoutMillis()}.
19+
*/
20+
@Rule
21+
public Timeout testTimeout = new Timeout(getTestTimeoutMillis());
22+
23+
/**
24+
* Name the junit thread for the class. This will overridden
25+
* before the individual test methods are run.
26+
*/
27+
@BeforeClass
28+
public static void nameTestThread() {
29+
Thread.currentThread().setName("JUnit");
30+
}
31+
32+
/**
33+
* Name the thread to the current test method.
34+
*/
35+
@Before
36+
public void nameThread() {
37+
Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
38+
}
39+
40+
/**
41+
* Override point: the test timeout in milliseconds.
42+
* @return a timeout in milliseconds
43+
*/
44+
protected int getTestTimeoutMillis() {
45+
return 60 * 10 * 1000;
46+
}
47+
48+
}

0 commit comments

Comments
 (0)