Skip to content

Commit dc81fe7

Browse files
authored
Merge pull request #54 from vintmd/master
add the 503 mpu retry, support get client from native store and fix t…
2 parents 3288399 + 052a511 commit dc81fe7

File tree

3 files changed

+31
-5
lines changed

3 files changed

+31
-5
lines changed

deploy.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ do
2121
# 外部maven 中央仓库
2222
deploy_repository_id="oss"
2323
deploy_repository_url="https://oss.sonatype.org/service/local/staging/deploy/maven2"
24-
elif ["$OPT" = "$INTER" ]; then
24+
elif [ "$OPT" = "$INTER" ]; then
2525
deploy_repository_id="cos-inner-maven-repository"
2626
deploy_repository_url="http://mirrors.tencent.com/repository/maven/QCLOUD_COS"
2727
fi

src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -1437,17 +1437,15 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
14371437
String errorCode = cse.getErrorCode();
14381438
LOG.debug("fail to retry statusCode {}, errorCode {}", statusCode, errorCode);
14391439
// 对5xx错误进行重试
1440-
if (request instanceof CopyObjectRequest && statusCode / 100 == 2
1441-
&& errorCode != null && !errorCode.isEmpty()) {
1440+
if (request instanceof CopyObjectRequest && hasErrorCode(statusCode, errorCode)) {
14421441
if (retryIndex <= this.maxRetryTimes) {
14431442
LOG.info(errMsg, cse);
14441443
++retryIndex;
14451444
} else {
14461445
LOG.error(errMsg, cse);
14471446
throw new IOException(errMsg);
14481447
}
1449-
} else if (request instanceof CompleteMultipartUploadRequest && statusCode / 100 ==2
1450-
&& errorCode != null && !errorCode.isEmpty()) {
1448+
} else if (request instanceof CompleteMultipartUploadRequest && hasErrorCode(statusCode, errorCode)) {
14511449
// complete mpu error code might be in body when status code is 200
14521450
// double check to head object only works in big data job case which key is not same.
14531451
String key = ((CompleteMultipartUploadRequest) request).getKey();
@@ -1507,6 +1505,21 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
15071505
throw cse;
15081506
}
15091507
}
1508+
1509+
// mpu might occur 503 access time out but already completed,
1510+
// if direct retry may occur 403 not found the upload id.
1511+
if (request instanceof CompleteMultipartUploadRequest && statusCode == 503) {
1512+
String key = ((CompleteMultipartUploadRequest) request).getKey();
1513+
FileMetadata fileMetadata = this.queryObjectMetadata(key);
1514+
if (null != fileMetadata) {
1515+
// if file exist direct return.
1516+
LOG.info("complete mpu error might access time out, " +
1517+
"but key {} already exist, length {}",
1518+
key, fileMetadata.getLength());
1519+
return new CompleteMultipartUploadResult();
1520+
}
1521+
}
1522+
15101523
Thread.sleep(
15111524
ThreadLocalRandom.current().nextLong(sleepLeast, sleepBound));
15121525
++retryIndex;
@@ -1529,6 +1542,15 @@ private <X> Object callCOSClientWithRetry(X request) throws CosServiceException,
15291542
private static String ensureValidAttributeName(String attributeName) {
15301543
return attributeName.replace('.', '-').toLowerCase();
15311544
}
1545+
1546+
private boolean hasErrorCode(int statusCode, String errCode) {
1547+
return statusCode / 100 == 2 && errCode != null && !errCode.isEmpty();
1548+
}
1549+
1550+
public COSClient getCOSClient(){
1551+
return this.cosClient;
1552+
}
1553+
15321554
private String getPluginVersionInfo() {
15331555
Properties versionProperties = new Properties();
15341556
InputStream inputStream= null;

src/main/java/org/apache/hadoop/fs/NativeFileSystemStore.java

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.hadoop.classification.InterfaceAudience;
77
import org.apache.hadoop.classification.InterfaceStability;
88
import org.apache.hadoop.conf.Configuration;
9+
import com.qcloud.cos.COSClient;
910

1011
import java.io.File;
1112
import java.io.IOException;
@@ -24,6 +25,9 @@ public interface NativeFileSystemStore {
2425

2526
void initialize(URI uri, Configuration conf) throws IOException;
2627

28+
// must init first
29+
COSClient getCOSClient();
30+
2731
HeadBucketResult headBucket(String bucketName) throws IOException;
2832

2933
void storeFile(String key, File file, byte[] md5Hash) throws IOException;

0 commit comments

Comments
 (0)