"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import logging
import os
import threading
from urllib.parse import unquote, urlsplit, urlunsplit
import google.auth
from django.conf import settings
from django.contrib.staticfiles.storage import ManifestStaticFilesStorage
from storages.backends.azure_storage import AzureStorage
from storages.backends.gcloud import GoogleCloudStorage, _quote, clean_name
from storages.backends.s3boto3 import S3Boto3Storage
logger = logging.getLogger(__name__)


class SkipMissedManifestStaticFilesStorage(ManifestStaticFilesStorage):
    """We need this class to escape missing files from
    django.contrib.staticfiles.finders.FileSystemFinder:
    this class tries to find js/css/png/jpg/... inside of your js/css/... files
    """

    # Disable strict cache manifest checking
    manifest_strict = False

    def hashed_name(self, name, content=None, filename=None):
        # `filename` is the name of file to hash if `content` isn't given.
        # `name` is the base name to construct the new hashed filename from.
        parsed_name = urlsplit(unquote(name))
        clean_name = parsed_name.path.strip()
        filename = (filename and urlsplit(unquote(filename)).path.strip()) or clean_name
        opened = content is None
        if opened:
            if not self.exists(filename):
                return ''
            try:
                content = self.open(filename)
            except IOError:
                # Handle directory paths and fragments
                return name
        try:
            file_hash = self.file_hash(clean_name, content)
        finally:
            if opened:
                content.close()
        path, filename = os.path.split(clean_name)
        root, ext = os.path.splitext(filename)
        if file_hash is not None:
            file_hash = '.%s' % file_hash
        hashed_name = os.path.join(path, '%s%s%s' % (root, file_hash, ext))
        unparsed_name = list(parsed_name)
        unparsed_name[2] = hashed_name
        # Special casing for a @font-face hack, like url(myfont.eot?#iefix")
        # http://www.fontspring.com/blog/the-new-bulletproof-font-face-syntax
        if '?#' in name and not unparsed_name[3]:
            unparsed_name[2] += '?'
        return urlunsplit(unparsed_name)
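
# A rough sketch of SkipMissedManifestStaticFilesStorage.hashed_name behaviour
# (the hash value below is illustrative, not real):
#   hashed_name('css/app.css')     -> 'css/app.55e7cbb9ba48.css'
#   hashed_name('css/missing.css') -> ''   (missing files are skipped instead of
#                                           raising ValueError, as Django's strict
#                                           manifest storage would)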


class StorageProxyMixin:
    def url(self, name, storage_url=False, *args, **kwargs):
        if storage_url is True:
            return super().url(name, *args, **kwargs)
        return f'{settings.HOSTNAME}/storage-data/uploaded/?filepath={name}'
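
# Sketch of StorageProxyMixin.url (hostname and filepath are illustrative):
#   url('upload/1/cat.jpg')                    -> '<HOSTNAME>/storage-data/uploaded/?filepath=upload/1/cat.jpg'
#   url('upload/1/cat.jpg', storage_url=True)  -> whatever the wrapped storage backend's url() returns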


class CustomS3Boto3Storage(StorageProxyMixin, S3Boto3Storage):
    def url(self, name, storage_url=False, *args, **kwargs):
        if storage_url is True:
            return super().url(name, *args, **kwargs)
        return f"s3://{settings.AWS_STORAGE_BUCKET_NAME}/{name}"


class CustomAzureStorage(StorageProxyMixin, AzureStorage):
    pass


class AlternativeGoogleCloudStorageBase(GoogleCloudStorage):
    """A subclass to force the use of the IAM signBlob API

    This allows the signing of blob URLs without having to use a credential file.
    The service account must have the iam.serviceAccounts.signBlob permission."""

    def __init__(self, **settings):
        super().__init__(**settings)
        self._signing_credentials = None
        self._signing_credentials_lock = threading.Lock()

    def url(self, name):
        """
        Return a public url or a signed url for the Blob.
        This DOES NOT check for the existence of the Blob - that makes the code too slow
        for many use cases.
        Overridden to force the use of the IAM signBlob API.
        See https://github.com/googleapis/python-storage/blob/519074112775c19742522158f612b467cf590219/google/cloud/storage/_signing.py#L628 # NOQA
        """
        name = self._normalize_name(clean_name(name))
        blob = self.bucket.blob(name)
        blob_params = self.get_object_parameters(name)
        no_signed_url = blob_params.get('acl', self.default_acl) == 'publicRead' or not self.querystring_auth

        if not self.custom_endpoint and no_signed_url:
            return blob.public_url
        elif no_signed_url:
            return '{storage_base_url}/{quoted_name}'.format(
                storage_base_url=self.custom_endpoint,
                quoted_name=_quote(name, safe=b'/~'),
            )
        elif not self.custom_endpoint:
            return blob.generate_signed_url(expiration=self.expiration, version='v4', **self._get_signing_kwargs())
        else:
            return blob.generate_signed_url(
                bucket_bound_hostname=self.custom_endpoint,
                expiration=self.expiration,
                version='v4',
                **self._get_signing_kwargs(),
            )
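
    # In short, url() resolves as follows:
    #   public ACL or querystring_auth disabled, no custom endpoint -> blob.public_url
    #   public ACL or querystring_auth disabled, custom endpoint    -> '<custom_endpoint>/<name>'
    #   otherwise                                                    -> V4 signed URL, signed with the
    #                                                                   IAM credentials from _get_signing_kwargs()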

    def _get_signing_credentials(self):
        with self._signing_credentials_lock:
            if self._signing_credentials is None or self._signing_credentials.expired:
                credentials, _ = google.auth.default(['https://www.googleapis.com/auth/cloud-platform'])
                auth_req = google.auth.transport.requests.Request()
                credentials.refresh(auth_req)
                self._signing_credentials = credentials
        return self._signing_credentials

    def _get_signing_kwargs(self):
        credentials = self._get_signing_credentials()
        return {
            'service_account_email': credentials.service_account_email,
            'access_token': credentials.token,
            'credentials': credentials,
        }


class AlternativeGoogleCloudStorage(StorageProxyMixin, AlternativeGoogleCloudStorageBase):
    pass
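
# A minimal wiring sketch (the module path and values below are assumptions - adjust
# them to your own settings module and deployment):
#   STATICFILES_STORAGE = 'core.storage.SkipMissedManifestStaticFilesStorage'
#   DEFAULT_FILE_STORAGE = 'core.storage.CustomS3Boto3Storage'
#   AWS_STORAGE_BUCKET_NAME = 'my-bucket'
#   HOSTNAME = 'https://label-studio.example.com'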