Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion examples/s3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def index():
s3_client.upload_fileobj(
BytesIO(b"abcdef"),
"bucket",
"some-directory/some-file",
"some-directory/some-file-2",
ExtraArgs={"ContentType": "text/plain"},
)

Expand All @@ -66,6 +66,16 @@ def index():
)
)

admin.add_view(
S3FileAdmin(
bucket_name=bucket_name,
s3_client=s3_client,
name="S3 with Prefix",
prefix="some-directory/",
endpoint="s3-with-prefix",
)
)

# Add Local Directory view
admin.add_view(FileAdmin("localdir", name="Local Dir"))

Expand Down
10 changes: 6 additions & 4 deletions examples/s3/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 33 additions & 17 deletions flask_admin/contrib/fileadmin/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ class MyS3Admin(BaseFileAdmin):
fileadmin_view = MyS3Admin(storage=S3Storage(...))
"""

def __init__(self, s3_client: BaseClient, bucket_name: str) -> None:
def __init__(
self, s3_client: BaseClient, bucket_name: str, prefix: str = ""
) -> None:
"""
Constructor

Expand All @@ -85,41 +87,47 @@ def __init__(self, s3_client: BaseClient, bucket_name: str) -> None:
self.s3_client = s3_client
self.bucket_name = bucket_name
self.separator = "/"
prefix = self.normpath(prefix).lstrip("/")
prefix = "" if prefix in [".", "./"] else prefix
if prefix and not prefix.endswith(self.separator):
prefix += self.separator
self.prefix = prefix

@_strip_leading_slash_from("path")
def get_files(self, path: str, directory: str) -> list[t.Any]:
def _strip_path(name: str, path: str) -> str:
@_strip_leading_slash_from("directory")
def get_files(self, directory: str, path: str) -> list[t.Any]:
def _strip_prefix(name: str) -> str:
if name.startswith(path):
return name.replace(path, "", 1)
return name

def _remove_trailing_slash(name: str) -> str:
return name[:-1]
def _rcoat(x: str) -> str:
return x.ljust(len(x) + 1, "/") if not x.endswith("/") else x

files = []
directories = []
if path and not path.endswith(self.separator):
path += self.separator
directory = _rcoat(directory) if directory else ""
path = _rcoat(path) if path else ""

try:
paginator = self.s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(
Bucket=self.bucket_name, Prefix=path, Delimiter=self.separator
):
for common_prefix in page.get("CommonPrefixes", []):
name = _remove_trailing_slash(
_strip_path(common_prefix["Prefix"], path)
name = common_prefix["Prefix"].removeprefix(path).rstrip("/")
rel_path = (
common_prefix["Prefix"].removeprefix(self.prefix).rstrip("/")
)
key_name = _remove_trailing_slash(common_prefix["Prefix"])
directories.append((name, key_name, True, 0, 0))
directories.append((name, rel_path, True, 0, 0))

for obj in page.get("Contents", []):
if obj["Key"] == path:
continue

last_modified = int(obj["LastModified"].timestamp())
name = _strip_path(obj["Key"], path)
files.append((name, obj["Key"], False, obj["Size"], last_modified))
name = obj["Key"].removeprefix(path)
rel_path = obj["Key"].removeprefix(self.prefix)
files.append((name, rel_path, False, obj["Size"], last_modified))

except ClientError as e:
raise ValueError(f"Failed to list files: {e}") from e
Expand Down Expand Up @@ -166,10 +174,10 @@ def path_exists(self, path: str) -> bool:
if path == "":
return True
keys = self._get_path_keys(path)
return path in keys or (path + self.separator) in keys
return any([k.startswith(path) for k in keys])

def get_base_path(self) -> str:
return ""
return self.prefix

@_strip_leading_slash_from("path")
def get_breadcrumbs(self, path: str) -> list[tuple[str, str]]:
Expand Down Expand Up @@ -288,6 +296,12 @@ class S3FileAdmin(BaseFileAdmin):
:param bucket_name:
Name of the bucket that the files are on.

:param prefix:
Optional prefix to use within the bucket. Note that this is different
from the `base_path` parameter of the BaseFileAdmin, which is handled
internally by the S3Storage. Prefix must be specified without leading
slash and will be normalized to end with a slash.

Sample usage::

from flask_admin import Admin
Expand All @@ -305,8 +319,10 @@ def __init__(
self,
s3_client: BaseClient,
bucket_name: str,
prefix: str = "",
*args: t.Any,
**kwargs: t.Any,
) -> None:
storage = S3Storage(s3_client, bucket_name)
storage = S3Storage(s3_client, bucket_name, prefix=prefix)

super().__init__(*args, storage=storage, **kwargs) # type: ignore[misc]
142 changes: 142 additions & 0 deletions flask_admin/tests/fileadmin/test_fileadmin_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,145 @@ class MyFileAdmin(fileadmin_class): # type: ignore[valid-type, misc]
# Test access to deep path
rv = client.get("/admin/myfileadmin/b/xx/yy/zz")
assert rv.status_code == 200

def test_prefix(self, app, admin, mock_s3_client):
fileadmin_class = self.fileadmin_class()
fileadmin_args, fileadmin_kwargs = self.fileadmin_args()

class MyFileAdmin(fileadmin_class): # type: ignore[valid-type, misc]
pass

view_kwargs = dict(fileadmin_kwargs)
view_kwargs["prefix"] = "xx/yy/"
view_kwargs.setdefault("name", "Files")
view = MyFileAdmin(*fileadmin_args, **view_kwargs)

# create deep path
view.storage.make_dir("", "xx/yy/zz/")

admin.add_view(view)

client = app.test_client()

# upload to deep path
rv = client.post(
"/admin/myfileadmin/upload/zz/",
data=dict(upload=(BytesIO(b"test content"), "test_upload.txt")),
follow_redirects=True,
)
assert rv.status_code == 200
assert "Successfully saved file: test_upload.txt" in rv.text

rv = client.get("/admin/myfileadmin/b/zz/")
assert rv.status_code == 200
assert "path=zz/test_upload.txt" in rv.text

# rename in deep path
rv = client.post(
"/admin/myfileadmin/rename/?path=zz/test_upload.txt",
data=dict(name="dummy.txt", path="zz/test_upload.txt"),
)
assert rv.status_code == 302

rv = client.get("/admin/myfileadmin/b/zz/")
assert rv.status_code == 200
assert "path=zz/dummy.txt" in rv.text
assert "path=test_upload.txt" not in rv.text

# download from deep path
rv = client.get("/admin/myfileadmin/download/zz/dummy.txt")
assert rv.status_code == 302
assert rv.headers["Location"].startswith(
"https://my-bucket.s3.amazonaws.com/xx/yy/zz/dummy.txt?AWSAccessKeyId=FOOBARKEY"
)

# delete
rv = client.post(
"/admin/myfileadmin/delete",
data=dict(path="zz/dummy.txt"),
follow_redirects=True,
)
assert rv.status_code == 200
assert "successfully deleted" in rv.text

@pytest.mark.parametrize(
"prefix, res_code",
[
("xx/yy", 200),
("xx/yy/", 200),
("/xx/yy", 200),
("/xx/yy/", 200),
("/xx//yy/", 200),
("xx\\yy", 404),
("/xx\\yy/", 404),
("xx/../xx/yy", 200),
("xx/../xx//yy", 200),
],
)
def test_base_path(self, app, admin, mock_s3_client, prefix, res_code):
fileadmin_class = self.fileadmin_class()
fileadmin_args, fileadmin_kwargs = self.fileadmin_args()

s3 = fileadmin_args[0]
s3.upload_fileobj(BytesIO(b""), _bucket_name, "xx/yy/zz/dummy2.txt")

class MyFileAdmin(fileadmin_class): # type: ignore[valid-type, misc]
pass

view_kwargs = dict(fileadmin_kwargs)
view_kwargs["prefix"] = prefix
view_kwargs.setdefault("name", "Files")
view = MyFileAdmin(*fileadmin_args, **view_kwargs)

admin.add_view(view)

client = app.test_client()

# actual s3 prefix is xx/yy/
rv = client.get("/admin/myfileadmin/")
assert rv.status_code == res_code
if res_code == 200:
assert "dummy2.txt" not in rv.data.decode("utf-8")

rv = client.get("/admin/myfileadmin/b/zz/")
assert rv.status_code == res_code
if res_code == 200:
assert "dummy2.txt" in rv.data.decode("utf-8")

rv = client.get("/admin/myfileadmin/b/xx/zz/")
assert rv.status_code == 404

@pytest.mark.parametrize(
"prefix",
["", ".", "/", "./"],
)
def test_base_path_root(self, app, admin, mock_s3_client, prefix):
fileadmin_class = self.fileadmin_class()
fileadmin_args, fileadmin_kwargs = self.fileadmin_args()

s3 = fileadmin_args[0]
s3.upload_fileobj(BytesIO(b""), _bucket_name, "xx/yy/zz/dummy2.txt")

class MyFileAdmin(fileadmin_class): # type: ignore[valid-type, misc]
pass

view_kwargs = dict(fileadmin_kwargs)
view_kwargs["prefix"] = prefix
view_kwargs.setdefault("name", "Files")
view = MyFileAdmin(*fileadmin_args, **view_kwargs)

admin.add_view(view)

client = app.test_client()

rv = client.get("/admin/myfileadmin/")
assert rv.status_code == 200
assert "dummy2.txt" not in rv.data.decode("utf-8")

rv = client.get("/admin/myfileadmin/b/xx/yy/")
assert rv.status_code == 200
assert "dummy2.txt" not in rv.data.decode("utf-8")

rv = client.get("/admin/myfileadmin/b/xx/yy/zz")
assert rv.status_code == 200
assert "dummy2.txt" in rv.data.decode("utf-8")