Skip to content

Commit 585e531

Browse files
committed
support s3's fallback
1 parent f0cdfb9 commit 585e531

File tree

3 files changed

+35
-29
lines changed

3 files changed

+35
-29
lines changed

alluxiofs/client/core.py

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ def _check_response(self, response):
234234
self.logger.debug(
235235
f"Alluxio server returned {response.status_code}: {message}"
236236
)
237-
238-
if response.status_code == 404:
237+
if response.status_code == 104:
238+
raise ConnectionResetError("Connection reset by peer")
239+
elif response.status_code == 404:
239240
raise FileNotFoundError(message)
240241
elif response.status_code == 400:
241242
raise ValueError(message)
@@ -826,23 +827,14 @@ def get_ufs_info_from_worker(self):
826827
worker_host = self.config.load_balance_domain
827828
else:
828829
worker_host = self.loadbalancer.get_worker().host
829-
try:
830-
url = GET_UFS_SECRET_INFO.format(
831-
domain=worker_host,
832-
http_port=self.config.worker_http_port,
833-
)
834-
response = requests.get(url)
835-
response.raise_for_status()
836-
info = response.content
837-
return info
838-
except Exception as e:
839-
raise Exception(
840-
EXCEPTION_CONTENT.format(
841-
worker_host=worker_host,
842-
http_port=self.config.worker_http_port,
843-
error=f"Error when get ufsInfo, {e}",
844-
)
845-
)
830+
url = GET_UFS_SECRET_INFO.format(
831+
domain=worker_host,
832+
http_port=self.config.worker_http_port,
833+
)
834+
response = requests.get(url)
835+
self._check_response(response)
836+
info = response.content
837+
return info
846838

847839
def _all_chunk_generator(
848840
self, worker_host, worker_http_port, path_id, file_path, chunk_size
@@ -881,10 +873,7 @@ def _all_chunk_generator(
881873
url_chunk, headers=headers, stream=True
882874
) as response:
883875
# Check for connection reset error (status code 104)
884-
if response.status_code == 104:
885-
raise ConnectionResetError("Connection reset by peer")
886-
887-
response.raise_for_status()
876+
self._check_response(response)
888877
for chunk in response.iter_content(chunk_size=chunk_size):
889878
if chunk:
890879
out.write(chunk)
@@ -1727,7 +1716,7 @@ def _get_preferred_worker_address(self, full_ufs_path):
17271716
)
17281717
try:
17291718
response = requests.get(url)
1730-
response.raise_for_status()
1719+
self._check_response(response)
17311720
data = json.loads(response.content)
17321721
ip = data["Host"]
17331722
port = data["HttpServerPort"]
@@ -2228,7 +2217,7 @@ def _get_preferred_worker_host(self, full_ufs_path):
22282217
)
22292218
try:
22302219
response = requests.get(url)
2231-
response.raise_for_status()
2220+
self._check_response(response)
22322221
data = json.loads(response.content)
22332222
ip = data["Host"]
22342223
except HTTPError:

alluxiofs/client/ufs_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,10 @@ def must_get_ufs_from_path(self, path: str):
223223
ufs = self.get_ufs_from_cache(path)
224224
if ufs is None:
225225
self.logger.error(
226-
f"No registered UFS found in alluxio for path: {path}"
226+
f"No registered UFS found in alluxio python sdk for path: {path}"
227227
)
228228
raise ValueError(
229-
f"No registered UFS found in alluxio for path: {path}"
229+
f"No registered UFS found in alluxio python sdk for path: {path}"
230230
)
231231
return ufs
232232

alluxiofs/client/utils.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,22 @@
2424
"endpoint": "endpoint",
2525
}
2626

27+
S3_SETUP_OPTIONS_MAP = {
28+
"access_key": "key",
29+
"secret_key": "secret",
30+
"endpoint": "endpoint_url",
31+
}
32+
2733

2834
def convert_ufs_info_to(ufs, info):
2935
if ufs == "oss":
3036
res = {OSS_SETUP_OPTIONS_MAP[k]: info[k] for k in info}
37+
elif ufs == "s3" or ufs == "s3a":
38+
res = {
39+
S3_SETUP_OPTIONS_MAP[k]: info[k]
40+
for k in info
41+
if k in S3_SETUP_OPTIONS_MAP
42+
}
3143
else:
3244
res = info
3345
return res
@@ -37,13 +49,14 @@ def parameters_adapter(fs, fs_method, final_kwargs):
3749
"""Adapts parameters for different filesystems and methods."""
3850
adapted_kwargs = final_kwargs.copy()
3951
method_name = getattr(fs_method, "__name__", str(fs_method))
52+
protocols = (fs.protocol,) if isinstance(fs.protocol, str) else fs.protocol
4053

41-
if fs.protocol == "bos":
54+
if "bos" in protocols:
4255
if method_name in ["cp_file"]:
4356
adapted_kwargs["src_path"] = final_kwargs.pop("path1")
4457
adapted_kwargs["target_path"] = final_kwargs.pop("path2")
4558

46-
if fs.protocol == "oss":
59+
if "oss" in protocols:
4760
if method_name in ["pipe_file"]:
4861
headers = {}
4962
mode = adapted_kwargs.pop("mode", "overwrite")
@@ -54,6 +67,10 @@ def parameters_adapter(fs, fs_method, final_kwargs):
5467
# This will cause the upload to fail if the file exists
5568
headers["x-oss-forbid-overwrite"] = "true"
5669

70+
if any(p in ["s3", "s3a"] for p in protocols):
71+
if method_name in ["_pipe_file", "pipe_file"]:
72+
adapted_kwargs["data"] = final_kwargs.pop("value")
73+
5774
return adapted_kwargs
5875

5976

0 commit comments

Comments
 (0)