Skip to content

Commit 63f9ed3

Browse files
wenhaizhumetafacebook-github-bot
authored andcommitted
CloudFileUploader - GCS
Summary: Diff for supporting multipart upload in GCP. Interestingly, the GCP client library doesn't support multipart upload (source: https://cloud.google.com/storage/docs/uploads-downloads#support_per_tool). A Stack Overflow answer (https://stackoverflow.com/a/71698884) mentions that "You can do Multipart upload to GCS using the S3 library, as the GCS XML API is compatible with this. There is a guide to using the S3 library with GCS here." So I'm following that approach here. Differential Revision: D37490665 fbshipit-source-id: 66ba1c24d0f991b39c228a619cd82d175c5512af
1 parent d18f8ed commit 63f9ed3

8 files changed

Lines changed: 224 additions & 8 deletions

File tree

fbpcf/aws/S3Util.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ std::unique_ptr<Aws::S3::S3Client> createS3Client(
8484
const S3ClientOption& option) {
8585
Aws::Client::ClientConfiguration config;
8686

87+
if (option.endpointOverride.has_value()) {
88+
config.endpointOverride = option.endpointOverride.value();
89+
} else if (std::getenv("AWS_ENDPOINT_OVERRIDE")) {
90+
config.endpointOverride = std::getenv("AWS_ENDPOINT_OVERRIDE");
91+
}
92+
8793
if (option.region.has_value()) {
8894
config.region = option.region.value();
8995
} else if (std::getenv("AWS_DEFAULT_REGION")) {

fbpcf/aws/S3Util.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ namespace fbpcf::aws {
1717
// reference for environment variables:
1818
// https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
1919
struct S3ClientOption {
20+
// AWS_ENDPOINT_OVERRIDE
21+
std::optional<std::string> endpointOverride;
2022
// AWS_DEFAULT_REGION
2123
std::optional<std::string> region;
2224
// AWS_ACCESS_KEY_ID

fbpcf/gcp/GCSUtil.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
namespace fbpcf::gcp {
1919
// Format:
2020
// 1. https://storage.cloud.google.com/bucket-name/key-name
21-
// 2. gs://bucket-name/key-name
21+
// 2. https://bucket-name.storage.googleapis.com/key-name
22+
// 3. https://storage.googleapis.com/bucket-name/key-name
23+
// 4. gs://bucket-name/key-name
2224
GCSObjectReference uriToObjectReference(std::string url) {
2325
std::string bucket;
2426
std::string key;
@@ -37,6 +39,8 @@ GCSObjectReference uriToObjectReference(std::string url) {
3739

3840
if (boost::iequals(scheme, "gs")) {
3941
bucket = host;
42+
} else if (host.ends_with(".storage.googleapis.com")) {
43+
bucket = host.substr(0, host.find_first_of("."));
4044
} else {
4145
// Remove the first character '/' in path
4246
path = path.substr(1);

fbpcf/gcp/test/GCSUtilTest.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,22 @@ TEST(GCSUtil, uriToObjectReference) {
2323
EXPECT_EQ("key", ref.key);
2424
}
2525

26+
// Virtual-hosted-style URL: the bucket name is carried in the host, in front
// of ".storage.googleapis.com", and the path is the object key.
TEST(GCSUtil, uriToObjectReference_virtualHostStyle) {
  const auto objectRef = fbpcf::gcp::uriToObjectReference(
      "https://bucket-name.storage.googleapis.com/key-name");

  EXPECT_EQ("bucket-name", objectRef.bucket);
  EXPECT_EQ("key-name", objectRef.key);
}
33+
34+
// Path-style URL: the host is storage.googleapis.com and the bucket name is
// the first path segment, followed by the object key.
TEST(GCSUtil, uriToObjectReference_pathStyle) {
  const auto objectRef = fbpcf::gcp::uriToObjectReference(
      "https://storage.googleapis.com/bucket-name/key-name");

  EXPECT_EQ("bucket-name", objectRef.bucket);
  EXPECT_EQ("key-name", objectRef.key);
}
41+
2642
TEST(GCSUtil, uriToObjectReference_Subfolder) {
2743
auto uri = "https://storage.cloud.google.com/bucket/folder/key";
2844
auto ref = fbpcf::gcp::uriToObjectReference(uri);

fbpcf/io/cloud_util/CloudFileUtil.cpp

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
*/
77

88
#include "fbpcf/io/cloud_util/CloudFileUtil.h"
9+
#include <aws/s3/S3Client.h>
910
#include <re2/re2.h>
1011
#include "fbpcf/aws/S3Util.h"
1112
#include "fbpcf/exception/PcfException.h"
13+
#include "fbpcf/gcp/GCSUtil.h"
14+
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
1215
#include "fbpcf/io/cloud_util/S3Client.h"
1316
#include "fbpcf/io/cloud_util/S3FileReader.h"
1417
#include "fbpcf/io/cloud_util/S3FileUploader.h"
@@ -17,12 +20,14 @@ namespace fbpcf::cloudio {
1720

1821
CloudFileType getCloudFileType(const std::string& filePath) {
1922
// S3 file format:
20-
// 1. https://bucket-name.s3.Region.amazonaws.com/key-name
21-
// 2. https://bucket-name.s3-Region.amazonaws.com/key-name
23+
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
24+
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
2225
// 3. s3://bucket-name/key-name
2326
// GCS file format:
2427
// 1. https://storage.cloud.google.com/bucket-name/key-name
25-
// 2. gs://bucket-name/key-name
28+
// 2. https://bucket-name.storage.googleapis.com/key-name
29+
// 3. https://storage.googleapis.com/bucket-name/key-name
30+
// 4. gs://bucket-name/key-name
2631
static const re2::RE2 s3Regex1(
2732
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
2833
static const re2::RE2 s3Regex2(
@@ -34,9 +39,14 @@ CloudFileType getCloudFileType(const std::string& filePath) {
3439
return CloudFileType::S3;
3540
}
3641

37-
static const re2::RE2 gcsRegex("https://storage.cloud.google.com/.*");
38-
bool isGCSFile =
39-
re2::RE2::FullMatch(filePath, gcsRegex) || filePath.find("gs://", 0) == 0;
42+
static const re2::RE2 gcsRegex1("https://storage.cloud.google.com/.*");
43+
static const re2::RE2 gcsRegex2(
44+
"https://[a-z0-9.-]+.storage.googleapis.com/.+");
45+
static const re2::RE2 gcsRegex3("https://storage.googleapis.com/.*");
46+
bool isGCSFile = re2::RE2::FullMatch(filePath, gcsRegex1) ||
47+
re2::RE2::FullMatch(filePath, gcsRegex2) ||
48+
re2::RE2::FullMatch(filePath, gcsRegex3) ||
49+
filePath.find("gs://", 0) == 0;
4050
if (isGCSFile) {
4151
return CloudFileType::GCS;
4252
}
@@ -64,6 +74,14 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
6474
fbpcf::aws::S3ClientOption{.region = ref.region})
6575
.getS3Client(),
6676
filePath);
77+
} else if (fileType == CloudFileType::GCS) {
78+
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath);
79+
return std::make_unique<GCSFileUploader>(
80+
fbpcf::cloudio::S3Client::getInstance(
81+
fbpcf::aws::S3ClientOption{
82+
.endpointOverride = "https://storage.googleapis.com/"})
83+
.getS3Client(),
84+
filePath);
6785
} else {
6886
throw fbpcf::PcfException("Not supported yet.");
6987
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
9+
#include <aws/s3/model/AbortMultipartUploadRequest.h>
10+
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
11+
#include <aws/s3/model/CompletedMultipartUpload.h>
12+
#include <aws/s3/model/CreateMultipartUploadRequest.h>
13+
#include <aws/s3/model/UploadPartRequest.h>
14+
#include <folly/logging/xlog.h>
15+
#include "fbpcf/aws/S3Util.h"
16+
#include "fbpcf/exception/AwsException.h"
17+
#include "fbpcf/gcp/GCSUtil.h"
18+
19+
namespace fbpcf::cloudio {
20+
21+
static const std::string FILE_TYPE = "text/csv";
22+
static const int MAX_RETRY_COUNT = 3;
23+
24+
void GCSFileUploader::init() {
25+
XLOG(INFO) << "Start multipart upload initialization. ";
26+
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
27+
bucket_ = ref.bucket;
28+
key_ = ref.key;
29+
Aws::S3::Model::CreateMultipartUploadRequest request;
30+
request.SetBucket(bucket_);
31+
request.SetKey(key_);
32+
request.SetContentType(FILE_TYPE);
33+
34+
XLOG(INFO) << "Bucket: " << bucket_ << ", Key: " << key_;
35+
36+
auto createMultipartUploadOutcome =
37+
gcsClient_->CreateMultipartUpload(request);
38+
39+
if (createMultipartUploadOutcome.IsSuccess()) {
40+
uploadId_ = createMultipartUploadOutcome.GetResult().GetUploadId();
41+
XLOG(INFO) << "Multipart upload initialization succeed. Upload id is: "
42+
<< uploadId_;
43+
} else {
44+
XLOG(ERR) << createMultipartUploadOutcome.GetError();
45+
throw AwsException{
46+
"Multipart upload initialization failed: " +
47+
createMultipartUploadOutcome.GetError().GetMessage()};
48+
}
49+
}
50+
51+
int GCSFileUploader::upload(std::vector<char>& buf) {
52+
XLOG(INFO) << "Start uploading part:"
53+
<< "Part number: " << partNumber_ << "\nBucket: " << bucket_
54+
<< "\nKey: " << key_;
55+
Aws::S3::Model::UploadPartRequest request;
56+
request.SetBucket(bucket_);
57+
request.SetKey(key_);
58+
request.SetUploadId(uploadId_);
59+
request.SetPartNumber(partNumber_);
60+
request.SetContentLength(buf.size());
61+
62+
Aws::String str(buf.begin(), buf.end());
63+
auto inputData = Aws::MakeShared<Aws::StringStream>("UploadPartStream", str);
64+
request.SetBody(inputData);
65+
XLOG(INFO) << "Upload stream size: " << str.size();
66+
67+
auto uploadPartResult = gcsClient_->UploadPart(request);
68+
int retryCount = 0;
69+
while (!uploadPartResult.IsSuccess() && retryCount < MAX_RETRY_COUNT) {
70+
XLOG(INFO) << "Upload part " << partNumber_ << " failed. Retrying...";
71+
uploadPartResult = gcsClient_->UploadPart(request);
72+
retryCount++;
73+
}
74+
75+
if (uploadPartResult.IsSuccess()) {
76+
XLOG(INFO) << "Upload part " << partNumber_ << " succeeed.";
77+
Aws::S3::Model::CompletedPart part;
78+
part.SetPartNumber(request.GetPartNumber());
79+
part.SetETag(uploadPartResult.GetResult().GetETag());
80+
completedParts_.push_back(part);
81+
partNumber_++;
82+
return str.size();
83+
} else {
84+
XLOG(INFO) << "Upload part " << partNumber_ << " failed. Aborting...";
85+
abortUpload();
86+
return 0;
87+
}
88+
}
89+
90+
int GCSFileUploader::complete() {
91+
Aws::S3::Model::CompleteMultipartUploadRequest request;
92+
request.SetBucket(bucket_);
93+
request.SetKey(key_);
94+
request.SetUploadId(uploadId_);
95+
request.SetMultipartUpload(
96+
Aws::S3::Model::CompletedMultipartUpload().WithParts(completedParts_));
97+
98+
auto completeMultipartUploadResult =
99+
gcsClient_->CompleteMultipartUpload(request);
100+
if (completeMultipartUploadResult.IsSuccess()) {
101+
XLOG(INFO) << "File " << filePath_ << " uploaded successfully.";
102+
return 0;
103+
} else {
104+
XLOG(ERR) << "File " << filePath_ << " failed to upload.";
105+
XLOG(ERR) << "Error: " << completeMultipartUploadResult.GetError();
106+
abortUpload();
107+
return -1;
108+
}
109+
}
110+
111+
void GCSFileUploader::abortUpload() {
112+
Aws::S3::Model::AbortMultipartUploadRequest abortRequest;
113+
abortRequest.SetBucket(bucket_);
114+
abortRequest.SetKey(key_);
115+
abortRequest.SetUploadId(uploadId_);
116+
auto abortMultipartUploadResult =
117+
gcsClient_->AbortMultipartUpload(abortRequest);
118+
if (abortMultipartUploadResult.IsSuccess()) {
119+
XLOG(INFO) << "Abort upload successed. ";
120+
} else {
121+
XLOG(ERR) << "Abort upload failed. Upload ID: " + uploadId_;
122+
}
123+
}
124+
125+
} // namespace fbpcf::cloudio
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
#pragma once
9+
#include <aws/s3/S3Client.h>
10+
#include <aws/s3/model/CompletedPart.h>
11+
#include <memory>
12+
#include "fbpcf/io/cloud_util/IFileUploader.h"
13+
14+
namespace fbpcf::cloudio {
15+
class GCSFileUploader : public IFileUploader {
16+
public:
17+
explicit GCSFileUploader(
18+
std::shared_ptr<Aws::S3::S3Client> client,
19+
const std::string& filePath)
20+
: gcsClient_{std::move(client)}, filePath_{filePath} {
21+
init();
22+
}
23+
int upload(std::vector<char>& buf) override;
24+
int complete() override;
25+
26+
private:
27+
void init() override;
28+
void abortUpload();
29+
std::shared_ptr<Aws::S3::S3Client> gcsClient_;
30+
const std::string filePath_;
31+
std::string bucket_;
32+
std::string key_;
33+
std::string uploadId_;
34+
std::size_t partNumber_ = 1;
35+
Aws::Vector<Aws::S3::Model::CompletedPart> completedParts_;
36+
};
37+
} // namespace fbpcf::cloudio

fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,17 @@ TEST(FileManagerUtilTest, TestGetCloudFileType) {
2626
getCloudFileType("https://storage.cloud.google.com/bucket-name/key-name");
2727
EXPECT_EQ(CloudFileType::GCS, gcsType1);
2828

29-
auto gcsType2 = getCloudFileType("gs://bucket-name/key-name");
29+
auto gcsType2 =
30+
getCloudFileType("https://bucket-name.storage.googleapis.com/key-name");
3031
EXPECT_EQ(CloudFileType::GCS, gcsType2);
3132

33+
auto gcsType3 =
34+
getCloudFileType("https://storage.googleapis.com/bucket-name/key-name");
35+
EXPECT_EQ(CloudFileType::GCS, gcsType3);
36+
37+
auto gcsType4 = getCloudFileType("gs://bucket-name/key-name");
38+
EXPECT_EQ(CloudFileType::GCS, gcsType4);
39+
3240
auto unkonwnType =
3341
getCloudFileType("https://storage.test.com/bucket-name/key-name");
3442
EXPECT_EQ(CloudFileType::UNKNOWN, unkonwnType);

0 commit comments

Comments
 (0)