
Commit 1b79d31

extract RemoteStore from S3Store so it and SFTP store can use the same logic
1 parent 1446e20 commit 1b79d31

File tree

provenance/blobstores.py
provenance/sftp/__init__.py

2 files changed: +104 -68 lines


provenance/blobstores.py (+72 -23)
@@ -137,34 +137,26 @@ def delete(self, id):
         os.remove(self._filename(id))
 
 
-class S3Store(BaseBlobStore):
-    def __init__(self, cachedir, basepath, s3_config=None, s3fs=None,
-                 read=True, write=True, read_through_write=True,
+class RemoteStore(BaseBlobStore):
+    def __init__(self, cachedir, basepath, read=True, write=True, read_through_write=True,
                  delete=False, on_duplicate_key='skip', cleanup_cachedir=False,
-                 always_check_s3=False):
+                 always_check_remote=False):
         """
         Parameters
         ----------
-        always_check_s3 : bool
-            When True S3 will be checked with every __contains__ call. Otherwise it will
+        always_check_remote : bool
+            When True the remote store will be checked with every __contains__ call. Otherwise it will
             short-circuit if the blob is found in the cachedir. For performance reasons this
            should always be set to False. The only reason why you would want to use this
-            is if you are using a S3Store and a DiskStore in a ChainedStore together for
-            some reason. Since the S3Store basically doubles as a DiskStore with it's cachedir
+            is if you are using a RemoteStore and a DiskStore in a ChainedStore together for
+            some reason. Since the RemoteStore basically doubles as a DiskStore with it's cachedir
            chaining the two doesn't really make sense though.
        """
-        super(S3Store, self).__init__(
+        super(RemoteStore, self).__init__(
            read=read, write=write, read_through_write=read_through_write,
            delete=delete, on_duplicate_key=on_duplicate_key)
 
-        self.always_check_s3 = always_check_s3
-
-        if s3fs:
-            self.s3fs = s3fs
-        elif s3_config is not None:
-            self.s3fs = S3FileSystem(**s3_config)
-        else:
-            raise ValueError("You must provide either s3_config or s3fs for a S3Store")
+        self.always_check = always_check_remote
 
         self.cachedir = _abspath(cachedir)
         self.basepath = basepath
@@ -181,12 +173,25 @@ def _filename(self, id):
     def _path(self, id):
         return os.path.join(self.basepath, id)
 
+    def _exists(self, path):
+        raise NotImplementedError()
+
+    def _delete_remote(self, path):
+        raise NotImplementedError()
+
+    def _upload_file(self, filename, path):
+        raise NotImplementedError()
+
+    def _download_file(self, path, dest_filename):
+        raise NotImplementedError()
+
     def __contains__(self, id):
         cs.ensure_contains(self)
-        if self.always_check_s3:
-            return self.s3fs.exists(self._path(id))
+        path = self._path(id)
+        if self.always_check:
+            return self._exists(path)
         else:
-            return os.path.exists(self._filename(id)) or self.s3fs.exists(self._path(id))
+            return os.path.exists(self._filename(id)) or self._exists(path)
 
     def _put_overwrite(self, id, value, serializer, read_through):
         cs.ensure_put(self, id, read_through, check_contains=False)
@@ -195,25 +200,69 @@ def _put_overwrite(self, id, value, serializer, read_through):
         if not os.path.isfile(filename):
             with _atomic_write(filename) as temp:
                 serializer.dump(value, temp)
-        self.s3fs.put(filename, self._path(id))
+        self._upload_file(filename, self._path(id))
 
     def get(self, id, serializer=DEFAULT_VALUE_SERIALIZER, **_kargs):
         cs.ensure_read(self)
         cs.ensure_present(self, id)
         filename = self._filename(id)
         if not os.path.exists(filename):
             with _atomic_write(filename) as temp:
-                self.s3fs.get(self._path(id), temp)
+                self._download_file(self._path(id), temp)
         return serializer.load(filename)
 
     def delete(self, id):
         cs.ensure_delete(self, id)
         filename = self._filename(id)
         if os.path.exists(filename):
             os.remove(filename)
-        self.s3fs.rm(self._path(id))
+        self._delete_remote(self._path(id))
+
+
+class S3Store(RemoteStore):
+    def __init__(self, cachedir, basepath, s3_config=None, s3fs=None,
+                 read=True, write=True, read_through_write=True,
+                 delete=False, on_duplicate_key='skip', cleanup_cachedir=False,
+                 always_check_s3=False):
+        """
+        Parameters
+        ----------
+        always_check_s3 : bool
+            When True S3 will be checked with every __contains__ call. Otherwise it will
+            short-circuit if the blob is found in the cachedir. For performance reasons this
+            should always be set to False. The only reason why you would want to use this
+            is if you are using a S3Store and a DiskStore in a ChainedStore together for
+            some reason. Since the S3Store basically doubles as a DiskStore with it's cachedir
+            chaining the two doesn't really make sense though.
+        """
+        super(S3Store, self).__init__(always_check_remote=always_check_s3,
+                                      cachedir = cachedir,
+                                      basepath = basepath,
+                                      cleanup_cachedir = cleanup_cachedir,
+                                      read=read, write=write, read_through_write=read_through_write,
+                                      delete=delete, on_duplicate_key=on_duplicate_key)
+
+        if s3fs:
+            self.s3fs = s3fs
+        elif s3_config is not None:
+            self.s3fs = S3FileSystem(**s3_config)
+        else:
+            raise ValueError("You must provide either s3_config or s3fs for a S3Store")
+
+    def _exists(self, path):
+        return self.s3fs.exists(path)
+
+    def _delete_remote(self, path):
+        self.s3fs.rm(path)
+
+    def _upload_file(self, filename, path):
+        self.s3fs.put(filename, path)
+
+    def _download_file(self, remote_path, dest_filename):
+        self.s3fs.get(remote_path, dest_filename)
 
 
+
 class ChainedStore(BaseBlobStore):
     def __init__(self, stores, read=True, write=True, read_through_write=True,
                  delete=True, on_duplicate_key='skip'):
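With this refactor, RemoteStore owns the cachedir handling and the __contains__/get/put/delete flow, while S3Store only supplies the S3 primitives. A rough usage sketch follows; the cache directory, bucket path, and s3_config values are invented for illustration and are not part of this commit:

from provenance.blobstores import S3Store

# Hypothetical paths and config, for illustration only.
store = S3Store(cachedir='/tmp/provenance-cache',  # local read-through cache managed by RemoteStore
                basepath='my-bucket/artifacts',    # remote prefix joined with blob ids by _path()
                s3_config={'anon': False})         # forwarded to s3fs.S3FileSystem(**s3_config)

# Lookups short-circuit on the local cachedir unless always_check_s3=True,
# which routes every __contains__ call to S3 through the _exists() hook.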

provenance/sftp/__init__.py (+32 -45)
@@ -18,14 +18,30 @@ def _ssh_client(ssh_config):
     return client
 
 
-class SFTPStore(bs.BaseBlobStore):
-    def __init__(self, cachedir, basepath, ssh_config=None, ssh_client=None,
-                 sftp_client=None, read=True, write=True,
-                 read_through_write=True, delete=False,
-                 on_duplicate_key='skip', cleanup_cachedir=False):
-        super(SFTPStore, self).__init__(
-            read=read, write=write, read_through_write=read_through_write,
-            delete=delete, on_duplicate_key=on_duplicate_key)
+class SFTPStore(bs.RemoteStore):
+    def __init__(self, cachedir, basepath,
+                 ssh_config=None, ssh_client=None, sftp_client=None,
+                 read=True, write=True, read_through_write=True,
+                 delete=False, on_duplicate_key='skip', cleanup_cachedir=False,
+                 always_check_remote=False):
+        """
+        Parameters
+        ----------
+        always_check_remote : bool
+            When True the SFTP server will be checked with every __contains__ call. Otherwise it will
+            short-circuit if the blob is found in the cachedir. For performance reasons this
+            should always be set to False. The only reason why you would want to use this
+            is if you are using a SFTPStore and a DiskStore in a ChainedStore together for
+            some reason. Since the SFTPStore basically doubles as a DiskStore with it's cachedir
+            chaining the two doesn't really make sense though.
+        """
+        super(SFTPStore, self).__init__(always_check_remote=always_check_remote,
+                                        cachedir = cachedir,
+                                        basepath = basepath,
+                                        cleanup_cachedir = cleanup_cachedir,
+                                        read=read, write=write, read_through_write=read_through_write,
+                                        delete=delete, on_duplicate_key=on_duplicate_key)
+
 
         self.ssh_client = None
         if ssh_config is not None:
@@ -41,47 +57,18 @@ def __init__(self, cachedir, basepath, ssh_config=None, ssh_client=None,
            return
        raise ValueError('You must specify a SFTP client by passing in one of: sftp_client, ssh_config, ssh_client')
 
-        self.cachedir = bs._abspath(cachedir)
-        self.basepath = basepath
-        self.cleanup_cachedir = cleanup_cachedir
-        mkdirp(self.cachedir)
-
-    def __del__(self):
-        if self.cleanup_cachedir:
-            shutil.rmtree(self.cachedir)
-
-    def _filename(self, id):
-        return os.path.join(self.cachedir, id)
-
-    def _path(self, id):
-        return os.path.join(self.basepath, id)
-
-    def __contains__(self, id):
-        cs.ensure_contains(self)
+    def _exists(self, path):
         try:
-            self.sftp_client.stat(self._path(id))
+            self.sftp_client.stat(path)
             return True
         except FileNotFoundError:
             return False
 
-    def _put_overwrite(self, id, value, serializer, read_through):
-        cs.ensure_put(self, id, read_through, check_contains=False)
-        filename = self._filename(id)
-        # not already saved by DiskStore?
-        if not os.path.isfile(filename):
-            with bs._atomic_write(filename) as temp:
-                serializer.dump(value, temp)
-        self.sftp_client.put(filename, self._path(id))
+    def _delete_remote(self, path):
+        self.sftp_client.remove(path)
 
-    def get(self, id, serializer=DEFAULT_VALUE_SERIALIZER, **_kargs):
-        cs.ensure_read(self)
-        cs.ensure_present(self, id)
-        filename = self._filename(id)
-        if not os.path.exists(filename):
-            with bs._atomic_write(filename) as temp:
-                self.sftp_client.get(self._path(id), temp)
-        return serializer.load(filename)
+    def _upload_file(self, filename, path):
+        self.sftp_client.put(filename, path)
 
-    def delete(self, id):
-        cs.ensure_delete(self, id)
-        self.sftp_client.remove(self._path(id))
+    def _download_file(self, remote_path, dest_filename):
+        self.sftp_client.get(remote_path, dest_filename)
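Since both backends now delegate the shared caching logic to RemoteStore, adding another remote blob store reduces to implementing the four primitives. Below is a minimal sketch, not part of this commit, using a hypothetical LocalDirStore in which a second local directory plays the role of the remote:

import os
import shutil

from provenance import blobstores as bs


class LocalDirStore(bs.RemoteStore):
    """Hypothetical backend: a plain directory acts as the 'remote' side."""

    def __init__(self, cachedir, basepath, **kwargs):
        super(LocalDirStore, self).__init__(cachedir=cachedir, basepath=basepath, **kwargs)
        os.makedirs(basepath, exist_ok=True)  # make sure the 'remote' directory exists

    def _exists(self, path):
        return os.path.exists(path)

    def _delete_remote(self, path):
        os.remove(path)

    def _upload_file(self, filename, path):
        shutil.copy(filename, path)

    def _download_file(self, remote_path, dest_filename):
        shutil.copy(remote_path, dest_filename)

Everything else, the cachedir, the read-through get, and the __contains__ short-circuit, comes from the shared base class.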
