Skip to content

Commit 33df52f

Browse files
author
HappyUncle
committed
feat(storage): support rclone
1 parent 1fc8cc7 commit 33df52f

File tree

2 files changed

+351
-0
lines changed

2 files changed

+351
-0
lines changed

pghoard/rohmu/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ def get_class_for_transfer(obj_store):
2626
elif storage_type == "s3":
2727
from .object_storage.s3 import S3Transfer
2828
return S3Transfer
29+
elif storage_type == "rclone":
30+
from .object_storage.rclone import RCloneTransfer
31+
return RCloneTransfer
2932
elif storage_type == "swift":
3033
from .object_storage.swift import SwiftTransfer
3134
return SwiftTransfer
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
"""
2+
rohmu - rclone
3+
4+
Copyright (c) 2016 Ohmu Ltd
5+
See LICENSE for details
6+
"""
7+
from ..errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError
8+
from .base import BaseTransfer, get_total_memory, KEY_TYPE_PREFIX, KEY_TYPE_OBJECT, IterKeyItem
9+
import json
10+
import subprocess
11+
from io import BytesIO, StringIO
12+
import datetime # for general datetime object handling
13+
# import rfc3339 # for date object -> date string
14+
import iso8601 # for date string -> date object
15+
16+
17+
def calculate_chunk_size():
    """Return the streaming chunk size in bytes, scaled by host memory.

    get_total_memory() reports MiB (or None, treated as 0 here).
    """
    total_mem_mib = get_total_memory() or 0
    # At least 120 MiB, at most 524 MiB. Max block size used for hosts with ~300+ GB of memory
    return max(min(int(total_mem_mib / 600), 524), 120) * 1024 * 1024
21+
22+
23+
# Computed once at import time; uploads and downloads stream in chunks of this many bytes.
MULTIPART_CHUNK_SIZE = calculate_chunk_size()
24+
25+
26+
def exec_cmd(cmd):
    """Run *cmd* to completion and return (stdout, stderr) as bytes.

    Raises Exception when the command exits with a non-zero status.
    """
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate()
    if process.returncode == 0:
        return out, err
    raise Exception("cmd [%s], stdout [%s], error [%s], code [%s]" % (cmd, out, err, process.returncode))
32+
33+
34+
def exec_cmd_to_stdout(cmd):
    """Start *cmd* and return its stdout pipe (a file object) for streaming reads.

    NOTE(review): the child process is never wait()ed on here, so the caller must
    read the returned pipe to EOF; stderr is also a PIPE that nobody drains, so a
    command writing a lot to stderr could block on a full pipe buffer — confirm
    callers tolerate both.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return proc.stdout
37+
38+
39+
def exec_cmd_from_stdout(cmd, src_fd, progress_fn=None):
    """Stream *src_fd* into *cmd*'s stdin in MULTIPART_CHUNK_SIZE chunks.

    Calls progress_fn(bytes_sent) after every chunk written.  Returns
    (stdout, stderr); raises Exception on a non-zero exit status.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    sent = 0
    while True:
        chunk = src_fd.read(MULTIPART_CHUNK_SIZE)
        if not chunk:
            break
        proc.stdin.write(chunk)
        sent += len(chunk)
        if progress_fn:
            progress_fn(sent)
    # communicate() flushes and closes stdin, then reaps the child.
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise Exception("cmd [%s], stdout [%s], error [%s], code [%s]" % (cmd, stdout, stderr, proc.returncode))
    return stdout, stderr
54+
55+
56+
def new_rclone_dest_key(dest, dest_path, key):
    """Build an rclone target of the form ``<remote>:<path>/<key>``."""
    return f"{dest}:{dest_path}/{key}"
58+
59+
60+
class RCloneClient:
    """Thin wrapper around the ``rclone`` command line tool.

    Every operation shells out to ``rclone --config <conf_path> ...`` and
    parses its output; no rclone state is kept in-process.
    """

    def __init__(self, conf_path):
        # Path to the rclone configuration file that defines the remotes.
        self.config_path = conf_path
        self.base_cmd = ["rclone", "--config", self.config_path]

    def new_cmd(self, keys):
        """Return a fresh argv list: the base command followed by *keys*."""
        # Concatenation already builds a new list; no explicit copies needed.
        return self.base_cmd + keys

    def head_object(self, key):
        """Return the lsjson entry for *key*, or None when it does not exist."""
        info = self.list_objects(key)
        if len(info) == 0:
            return None
        return info[0]

    def get_object_size(self, key):
        """Return the object size in bytes; 0 when the object does not exist."""
        info = self.head_object(key)
        if info is None:
            return 0
        return int(info['Size'])

    def delete_object(self, key):
        """Remove the single remote file *key* (``rclone deletefile``)."""
        cmd = self.new_cmd(["deletefile", key])
        exec_cmd(cmd)

    def list_objects(self, key, deep=False):
        """List directories and objects under *key* as parsed ``lsjson`` output.

        deep -- when True, recurse into subdirectories (--recursive).
        """
        if deep:
            cmd = self.new_cmd(["lsjson", key, "--recursive"])
        else:
            cmd = self.new_cmd(["lsjson", key])
        stdout, _stderr = exec_cmd(cmd)
        return json.loads(stdout)

    def put_object(self, src_fd, dest, progress_fn=None):
        """Copy the stream *src_fd* to the remote file *dest* (``rclone rcat``)."""
        cmd = self.new_cmd(["rcat", dest])
        exec_cmd_from_stdout(cmd, src_fd, progress_fn)

    def copy_object(self, src, dest):
        """Copy *src* to *dest* on the remote, skipping already copied data."""
        cmd = self.new_cmd(["copyto", src, dest])
        exec_cmd(cmd)

    def get_object_stream(self, key):
        """Return (stdout_pipe, size_in_bytes) for streaming *key* (``rclone cat``).

        Size is looked up with a separate listing; 0 when the object is missing.
        """
        cmd = self.new_cmd(["cat", key])
        fd = exec_cmd_to_stdout(cmd)
        return fd, self.get_object_size(key)

    def get_object_content(self, key):
        """Return (content_bytes, size) of *key*, or (None, 0) when it is missing."""
        info = self.head_object(key)
        if info is None:
            return None, 0
        length = int(info['Size'])

        cmd = self.new_cmd(["cat", key])
        stdout, _stderr = exec_cmd(cmd)
        return stdout, length
129+
130+
131+
class RCloneTransfer(BaseTransfer):
    """rohmu object-storage backend that drives an external ``rclone`` binary.

    Objects live under ``<destination>:<destination_path>/<key>`` and every
    object gets a sibling ``<key>.metadata`` file holding its metadata as
    JSON, since rclone itself carries no per-object metadata.
    """

    def __init__(self,
                 remote_clone_config_path,
                 source,
                 destination,
                 destination_path,
                 prefix=None):
        """
        remote_clone_config_path -- path to the rclone configuration file
        source -- stored but not read anywhere in this class; TODO confirm use
        destination -- rclone remote name used for all operations
        destination_path -- path (e.g. bucket/dir) on the destination remote
        prefix -- optional key prefix, handled by BaseTransfer
        """
        super().__init__(prefix=prefix)
        self.remote_clone_client = RCloneClient(remote_clone_config_path)
        self.source = source
        self.destination = destination
        self.destination_path = destination_path
        self.log.debug("RCloneTransfer initialized")

    # Streams directly from the file object; rclone rcat handles chunking upstream.
    def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
        """Upload the stream *fd* under *key* and write its ``.metadata`` sidecar.

        cache_control and mimetype are accepted for API compatibility but ignored.
        """
        target_path = self.format_key_for_backend(key.strip("/"))
        metadata_path = target_path + ".metadata"
        self.log.debug("Save file: %r, %r", target_path, metadata_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, target_path)
        self.remote_clone_client.put_object(fd, k, upload_progress_fn)

        # Metadata sidecar is JSON; assumes sanitize_metadata accepts None — TODO confirm.
        bio = BytesIO(json.dumps(self.sanitize_metadata(metadata)).encode())
        k = new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        self.remote_clone_client.put_object(bio, k)

    # NOTE(review): marked "no use" by the author — kept for BaseTransfer API completeness.
    def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cache_control=None, mimetype=None):
        """Upload the local file at *filepath* under *key* (multipart is ignored)."""
        target_path = self.format_key_for_backend(key.strip("/"))
        self.log.debug("Save file from disk: %r", target_path)

        with open(filepath, "rb") as fp:
            self.store_file_object(key, fp, metadata=metadata, cache_control=cache_control, mimetype=mimetype)

    def store_file_from_memory(self, key, memstring, metadata=None, cache_control=None, mimetype=None):
        """Upload the in-memory *memstring* (assumed bytes — TODO confirm) under *key*."""
        target_path = self.format_key_for_backend(key.strip("/"))
        self.log.debug("Save file from memory: %r", target_path)

        bio = BytesIO(memstring)
        self.store_file_object(key, bio, metadata=metadata, cache_control=cache_control, mimetype=mimetype)

    @staticmethod
    def _skip_file_name(file_name):
        """True for names that are internal bookkeeping: dotfiles and metadata (tmp) files."""
        return file_name.startswith(".") or file_name.endswith(".metadata") or ".metadata_tmp" in file_name

    def iter_key(self, key, *, with_metadata=True, deep=False, include_key=False):
        """Yield IterKeyItem entries (prefixes and objects) under *key*.

        NOTE(review): any failure — including the initial listing of a
        nonexistent key — is swallowed by the broad except below and merely
        logged at debug level, ending the iteration early. Confirm callers
        rely on this "empty on error" behavior.
        """
        # Apply the configured prefix to the requested key.
        target_path = self.format_key_for_backend(key.strip("/"))
        try:
            # List both directories and objects at the target path.
            rclone_target_path = new_rclone_dest_key(self.destination, self.destination_path, target_path)
            response = self.remote_clone_client.list_objects(rclone_target_path, deep)

            for item in response:
                # Skip internal bookkeeping files.
                if self._skip_file_name(item['Path']):
                    continue

                # Build the full key for the entry.
                # When an object and a bucket share a name, rclone selects the object
                # first, so re-list the full key to see whether it really exists.
                file_key = (target_path + "/" if len(target_path) != 0 else "") + item["Path"]
                objs = self.remote_clone_client.list_objects(
                    new_rclone_dest_key(self.destination, self.destination_path, file_key)
                )
                if len(objs) == 0:
                    # Entry vanished (or key was the object itself); optionally
                    # fall back to yielding the listed key.
                    if include_key is False:
                        continue
                    file_key = target_path

                # Directories are yielded as prefixes.
                if item['IsDir'] is True:
                    yield IterKeyItem(
                        type=KEY_TYPE_PREFIX,
                        value=file_key,
                    )
                    continue

                # Plain objects: optionally fetch the metadata sidecar.
                if with_metadata:
                    try:
                        metadata_path = (key.strip("/") + "/" if len(key.strip("/")) != 0 else "") + item["Path"]
                        metadata = self.get_metadata_for_key(metadata_path)
                    except FileNotFoundFromStorageError as ex:
                        # Missing sidecar is tolerated: the object is yielded without metadata.
                        self.log.debug("get metadata file error %s", ex)
                        metadata = None
                        pass
                else:
                    metadata = None

                yield IterKeyItem(
                    type=KEY_TYPE_OBJECT,
                    value={
                        # rclone reports RFC3339 ModTime; normalize to UTC.
                        "last_modified": iso8601.parse_date(item["ModTime"]).astimezone(tz=datetime.timezone.utc),
                        "metadata": metadata,
                        "name": file_key,
                        "size": item["Size"],
                    },
                )
        except Exception as ex:
            self.log.debug("itr_key error %s", ex)
            return

    def get_metadata_for_key(self, key, trailing_slash=False):
        """Return the parsed JSON metadata for *key*.

        Raises FileNotFoundFromStorageError when the ``.metadata`` sidecar is absent.
        """
        source_path = self.format_key_for_backend(key.strip("/"), trailing_slash=trailing_slash)
        metadata_path = source_path + ".metadata"
        self.log.debug("Get metadata: %r", metadata_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        stdout, length = self.remote_clone_client.get_object_content(k)
        if stdout is None:
            raise FileNotFoundFromStorageError(key)
        return json.loads(stdout)

    def get_file_size(self, key):
        """Return the size of *key* in bytes; raises FileNotFoundFromStorageError.

        NOTE(review): lists the key twice — once for the existence check and once
        inside get_object_size — i.e. two rclone invocations per call.
        """
        key = self.format_key_for_backend(key, remove_slash_prefix=True)
        self.log.debug("Get file size: %r", key)

        k = new_rclone_dest_key(self.destination, self.destination_path, key)
        response = self.remote_clone_client.list_objects(k)
        if len(response) == 0:
            raise FileNotFoundFromStorageError(key)

        k = new_rclone_dest_key(self.destination, self.destination_path, key)
        response = self.remote_clone_client.get_object_size(k)
        return response

    def get_contents_to_stream(self, key):
        """Return (readable_stream, metadata) for *key*; raises when the key is absent."""
        target_key = self.format_key_for_backend(key, remove_slash_prefix=True)
        self.log.debug("Get content to stream: %r", target_key)

        # Existence check before starting the streaming child process.
        k = new_rclone_dest_key(self.destination, self.destination_path, target_key)
        response = self.remote_clone_client.list_objects(k)
        if len(response) == 0:
            raise FileNotFoundFromStorageError(key)

        k = new_rclone_dest_key(self.destination, self.destination_path, target_key)
        response, _ = self.remote_clone_client.get_object_stream(k)
        metadata = self.get_metadata_for_key(key)
        return response, metadata

    def get_contents_to_string(self, key):
        """Return (content_bytes, metadata) for *key* by draining the stream."""
        response, metadata = self.get_contents_to_stream(key)
        return response.read(), metadata

    def get_contents_to_fileobj(self, key, fileobj_to_store_to, *, progress_callback=None):
        """Copy *key*'s content into *fileobj_to_store_to*; returns the metadata."""
        stream, metadata = self.get_contents_to_stream(key)
        length = self.get_file_size(key)
        self._read_object_to_fileobj(fileobj_to_store_to, stream, length, cb=progress_callback)
        return metadata

    # NOTE(review): marked "no use" by the author — kept for BaseTransfer API completeness.
    def get_contents_to_file(self, key, filepath_to_store_to, *, progress_callback=None):
        """Download *key* into the local file at *filepath_to_store_to*."""
        with open(filepath_to_store_to, "wb") as fh:
            return self.get_contents_to_fileobj(key, fh)

    def delete_key(self, key):
        """Delete *key* and its ``.metadata`` sidecar; raises when *key* is absent."""
        target_path = self.format_key_for_backend(key, remove_slash_prefix=True)
        metadata_path = target_path + ".metadata"
        self.log.debug("Deleting key: %r, %r", target_path, metadata_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, target_path)
        infos = self.remote_clone_client.list_objects(k)
        if len(infos) == 0:
            raise FileNotFoundFromStorageError(key)

        self.remote_clone_client.delete_object(
            new_rclone_dest_key(self.destination, self.destination_path, target_path)
        )
        # Sidecar delete is unconditional; presumably errors surface from rclone itself.
        self.remote_clone_client.delete_object(
            new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        )

    # Server-side copy; suitable for small files (single rclone copyto call).
    def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
        """Copy *source_key* to *destination_key*, then write the destination metadata.

        When *metadata* is None the source's metadata sidecar is copied over.
        Raises FileNotFoundFromStorageError when the source does not exist.
        """
        source_path = self.format_key_for_backend(source_key.strip("/"))
        destination_path = self.format_key_for_backend(destination_key.strip("/"))
        self.log.debug("Copy file from %r -> %r", source_path, destination_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, source_path)
        infos = self.remote_clone_client.list_objects(k)
        if len(infos) == 0:
            raise FileNotFoundFromStorageError(source_key)

        self.remote_clone_client.copy_object(
            new_rclone_dest_key(self.destination, self.destination_path, source_path),
            new_rclone_dest_key(self.destination, self.destination_path, destination_path)
        )

        if metadata is None:
            metadata = self.get_metadata_for_key(source_key)

        metadata_path = destination_path + ".metadata"
        self.log.debug("Save metadata: %r", metadata_path)

        bio = BytesIO(json.dumps(self.sanitize_metadata(metadata)).encode())
        k = new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        self.remote_clone_client.put_object(bio, k)

    def _read_object_to_fileobj(self, fileobj, streaming_body, body_length, cb=None):
        """Copy exactly *body_length* bytes from *streaming_body* into *fileobj*.

        Reads in MULTIPART_CHUNK_SIZE chunks, invoking cb(read, total) after each
        chunk. Raises StorageError on a short read. Note the final cb call repeats
        the last progress value so cb always ends with (total, total).
        """
        data_read = 0
        while data_read < body_length:
            read_amount = body_length - data_read
            if read_amount > MULTIPART_CHUNK_SIZE:
                read_amount = MULTIPART_CHUNK_SIZE
            data = streaming_body.read(read_amount)
            if len(data) != read_amount:
                raise StorageError("Rclone read data error, need %d but %d" % (read_amount, len(data)))
            fileobj.write(data)
            data_read += len(data)
            if cb:
                cb(data_read, body_length)
        if cb:
            cb(data_read, body_length)

0 commit comments

Comments
 (0)