Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/env-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ Boolean `0` or `1`

`1`

### RUNAI_STREAMER_S3_UNSIGNED

Enables unsigned (anonymous) requests to S3. Use this when accessing public S3 buckets that do not require authentication.

#### Values accepted

Boolean `0` or `1`

#### Default value

`0`

### RUNAI_STREAMER_GCS_CREDENTIAL_FILE

Specifies the path to a credential file to use for GCS authentication.
Expand Down
10 changes: 10 additions & 0 deletions docs/src/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ The session token should be passed as an environment variable `AWS_SESSION_TOKEN`

To check whether IAM role assumption is needed, run `aws s3 ls s3://your-bucket-name --region your-region`. If you get a `403 Forbidden` error, you might need to assume a role.

###### Unsigned requests (public buckets)

To access public S3 buckets that do not require authentication, set:

```bash
export RUNAI_STREAMER_S3_UNSIGNED=1
```

This configures the boto3 client to send unsigned (anonymous) requests, bypassing credential resolution entirely.

#### Streaming from Azure Blob Storage

> **Note:** Streaming models from Azure Blob Storage requires the installation of the streamer Azure package, as can be found [here](#azureCapabilityInstallation).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import boto3

AWS_CA_BUNDLE_ENV = "AWS_CA_BUNDLE"
RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR = "RUNAI_STREAMER_S3_UNSIGNED"

class S3Credentials:
def __init__(
Expand All @@ -20,7 +21,7 @@ def __init__(
self.region_name = region_name
self.endpoint = endpoint

def get_credentials(credentials: Optional[S3Credentials] = None) -> Tuple[boto3.Session, S3Credentials]:
def get_credentials(credentials: Optional[S3Credentials] = None) -> Tuple[Optional[boto3.Session], S3Credentials]:
"""
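Returns a boto3 session with resolved credentials unless unsigned mode is enabled (RUNAI_STREAMER_S3_UNSIGNED=1) or the environment variable RUNAI_STREAMER_NO_BOTO3_SESSION is set.
In either of those cases, returns None for the session together with the original credentials (or an empty S3Credentials if none were passed).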
Expand All @@ -30,6 +31,14 @@ def get_credentials(credentials: Optional[S3Credentials] = None) -> Tuple[boto3.
- S3Credentials object with the resolved credentials (or original if session not created)
"""

if os.getenv(RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR, "0") == "1":
if AWS_CA_BUNDLE_ENV not in os.environ:
session = boto3.Session()
ca_bundle = session._session.get_config_variable("ca_bundle")
if ca_bundle is not None:
os.environ.setdefault(AWS_CA_BUNDLE_ENV, ca_bundle)
return None, credentials if credentials else S3Credentials()

if "RUNAI_STREAMER_NO_BOTO3_SESSION" in os.environ:
return None, credentials if credentials else S3Credentials()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import unittest
from unittest.mock import patch, MagicMock

from runai_model_streamer_s3.credentials.credentials import (
get_credentials,
AWS_CA_BUNDLE_ENV,
RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR,
)


def _env_without_unsigned():
    """Return a plain-dict copy of os.environ minus the variables read by get_credentials.

    The unsigned flag, the CA bundle override and RUNAI_STREAMER_NO_BOTO3_SESSION
    are left out so a test can start from a known baseline.
    """
    dropped = {
        RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR,
        AWS_CA_BUNDLE_ENV,
        "RUNAI_STREAMER_NO_BOTO3_SESSION",
    }
    return {name: value for name, value in os.environ.items() if name not in dropped}


class TestGetCredentialsUnsigned(unittest.TestCase):
    """Unit tests for get_credentials with RUNAI_STREAMER_S3_UNSIGNED on, off and unset.

    Every test runs against a scrubbed copy of the environment (clear=True) so that
    an ambient RUNAI_STREAMER_NO_BOTO3_SESSION or AWS_CA_BUNDLE on the machine
    running the tests cannot change the code path being exercised.
    """

    @patch("runai_model_streamer_s3.credentials.credentials.boto3")
    def test_unsigned_returns_no_session(self, mock_boto3):
        """Unsigned mode must hand back no session."""
        mock_boto3.Session.return_value._session.get_config_variable.return_value = None
        env = _env_without_unsigned()
        env[RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR] = "1"
        with patch.dict(os.environ, env, clear=True):
            session, _ = get_credentials(None)
        self.assertIsNone(session)

    @patch("runai_model_streamer_s3.credentials.credentials.boto3")
    def test_unsigned_sets_ca_bundle(self, mock_boto3):
        """Unsigned mode exports the configured CA bundle when AWS_CA_BUNDLE is unset."""
        mock_boto3.Session.return_value._session.get_config_variable.return_value = "/etc/ssl/custom.pem"
        env = _env_without_unsigned()
        env[RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR] = "1"
        with patch.dict(os.environ, env, clear=True):
            get_credentials(None)
            # Checked inside the patch: os.environ is restored on exit.
            self.assertEqual(os.environ.get(AWS_CA_BUNDLE_ENV), "/etc/ssl/custom.pem")

    @patch("runai_model_streamer_s3.credentials.credentials.boto3")
    def test_unsigned_disabled_resolves_credentials(self, mock_boto3):
        """An explicit "0" keeps the regular credential-resolution path."""
        mock_session = MagicMock()
        mock_session.get_credentials.return_value = None
        mock_session._session.get_config_variable.return_value = None
        mock_boto3.Session.return_value = mock_session
        # Start from the scrubbed environment: with clear=False an inherited
        # RUNAI_STREAMER_NO_BOTO3_SESSION would short-circuit get_credentials
        # and make this test fail for reasons unrelated to the unsigned flag.
        env = _env_without_unsigned()
        env[RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR] = "0"
        with patch.dict(os.environ, env, clear=True):
            session, _ = get_credentials(None)
        mock_session.get_credentials.assert_called_once()
        self.assertIsNotNone(session)

    @patch("runai_model_streamer_s3.credentials.credentials.boto3")
    def test_unsigned_absent_resolves_credentials(self, mock_boto3):
        """With the flag unset, credentials are resolved as before."""
        mock_session = MagicMock()
        mock_session.get_credentials.return_value = None
        mock_session._session.get_config_variable.return_value = None
        mock_boto3.Session.return_value = mock_session
        with patch.dict(os.environ, _env_without_unsigned(), clear=True):
            session, _ = get_credentials(None)
        mock_session.get_credentials.assert_called_once()
        self.assertIsNotNone(session)


if __name__ == "__main__":
unittest.main()
44 changes: 19 additions & 25 deletions py/runai_model_streamer_s3/runai_model_streamer_s3/files/files.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
from typing import Optional, List, Tuple
from runai_model_streamer_s3.credentials.credentials import get_credentials, S3Credentials
from runai_model_streamer_s3.credentials.credentials import get_credentials, S3Credentials, RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR
import fnmatch
import os
import boto3
from botocore import UNSIGNED
from botocore.config import Config
from pathlib import Path
import posixpath

def glob(path: str, allow_pattern: Optional[List[str]] = None, credentials: Optional[S3Credentials] = None) -> List[str]:
session, _ = get_credentials(credentials)
use_virtual_addressing = os.getenv("RUNAI_STREAMER_S3_USE_VIRTUAL_ADDRESSING", "1")

client_config = None
if use_virtual_addressing == "0":
client_config = Config(s3={'addressing_style': 'path'})

# Pass the config to the client constructor
def _build_client_config() -> Optional[Config]:
    """Build the botocore Config for S3 clients from environment variables.

    - RUNAI_STREAMER_S3_USE_VIRTUAL_ADDRESSING == "0" selects path-style addressing.
    - The unsigned flag == "1" sets the signature version to UNSIGNED.

    Returns:
        A Config carrying the active options, or None when neither is active.
    """
    path_style = os.getenv("RUNAI_STREAMER_S3_USE_VIRTUAL_ADDRESSING", "1") == "0"
    unsigned = os.getenv(RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR, "0") == "1"

    if not (path_style or unsigned):
        return None

    options = {}
    if path_style:
        options["s3"] = {"addressing_style": "path"}
    if unsigned:
        options["signature_version"] = UNSIGNED
    return Config(**options)

def _build_s3_client(credentials: Optional[S3Credentials]):
session, _ = get_credentials(credentials)
client_config = _build_client_config()
if session is None:
s3 = boto3.client("s3", config=client_config)
else:
s3 = session.client("s3", config=client_config)

return boto3.client("s3", config=client_config)
return session.client("s3", config=client_config)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment on lines +20 to +25

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 get_credentials called unconditionally in unsigned mode

_build_s3_client always calls get_credentials on line 22 before checking whether unsigned mode is active. In environments where no credentials are configured (the primary use-case for public-bucket access), get_credentials may raise or return unexpected values, defeating the purpose of the unsigned flag. The bundled test test_credentials_not_used_when_unsigned_enabled directly asserts mock_get_credentials.assert_not_called() when RUNAI_STREAMER_S3_UNSIGNED=1 — this assertion will fail with the current implementation, as will mock_boto3.client.assert_called_once() (the session returned by the unchecked get_credentials call causes session.client() to be used instead).

The fix is to short-circuit before credential resolution when unsigned mode is on:

def _build_s3_client(credentials: Optional[S3Credentials]):
    client_config = _build_client_config()
    if os.getenv(RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR, "0") == "1":
        return boto3.client("s3", config=client_config)
    session, _ = get_credentials(credentials)
    if session is None:
        return boto3.client("s3", config=client_config)
    return session.client("s3", config=client_config)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`get_credentials` checks the unsigned flag internally and, if it is set, skips credential resolution.


def glob(path: str, allow_pattern: Optional[List[str]] = None, credentials: Optional[S3Credentials] = None) -> List[str]:
s3 = _build_s3_client(credentials)
if not path.endswith("/"):
path = f"{path}/"
bucket_name, _, keys = list_files(s3,
Expand All @@ -33,18 +38,7 @@ def pull_files(model_path: str,
allow_pattern: Optional[List[str]] = None,
ignore_pattern: Optional[List[str]] = None,
credentials: Optional[S3Credentials] = None,) -> None:
session, _ = get_credentials(credentials)
use_virtual_addressing = os.getenv("RUNAI_STREAMER_S3_USE_VIRTUAL_ADDRESSING", "1")

client_config = None
if use_virtual_addressing == "0":
client_config = Config(s3={'addressing_style': 'path'})

# Pass the config to the client constructor
if session is None:
s3 = boto3.client("s3", config=client_config)
else:
s3 = session.client("s3", config=client_config)
s3 = _build_s3_client(credentials)

if not model_path.endswith("/"):
model_path = model_path + "/"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import os
import unittest
from unittest.mock import patch, MagicMock

from botocore import UNSIGNED

import runai_model_streamer_s3.files.files as files


Expand Down Expand Up @@ -33,5 +38,64 @@ def test_removeprefix_no(self):
self.assertEqual(res, "test_prefix_string")


def _env_without_unsigned():
    """Return a plain-dict copy of os.environ with the unsigned flag removed."""
    flag = files.RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR
    return {name: value for name, value in os.environ.items() if name != flag}


class TestBuildClientConfig(unittest.TestCase):
    """Checks how files._build_client_config reacts to the unsigned flag."""

    def test_unsigned_enabled_when_one(self):
        """A value of "1" yields a config whose signature version is UNSIGNED."""
        overrides = {files.RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "1"}
        with patch.dict(os.environ, overrides):
            built = files._build_client_config()
        self.assertIsNotNone(built)
        self.assertEqual(built.signature_version, UNSIGNED)

    def test_unsigned_disabled_when_zero(self):
        """A value of "0" must not produce an UNSIGNED signature version."""
        overrides = {files.RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "0"}
        with patch.dict(os.environ, overrides):
            built = files._build_client_config()
        # The builder may return None when no option is active.
        if built:
            signature = built.signature_version
        else:
            signature = None
        self.assertNotEqual(signature, UNSIGNED)

    def test_unsigned_disabled_when_absent(self):
        """With the flag unset, the signature version must not be UNSIGNED."""
        with patch.dict(os.environ, _env_without_unsigned(), clear=True):
            built = files._build_client_config()
        if built:
            signature = built.signature_version
        else:
            signature = None
        self.assertNotEqual(signature, UNSIGNED)


class TestBuildS3Client(unittest.TestCase):
    """Checks which client factory files._build_s3_client uses for each unsigned setting."""

    @patch("runai_model_streamer_s3.files.files.boto3")
    @patch("runai_model_streamer_s3.files.files.get_credentials")
    def test_credentials_used_when_unsigned_disabled(self, mock_get_credentials, mock_boto3):
        """Flag "0": the client comes from the session returned by get_credentials."""
        fake_session = MagicMock()
        mock_get_credentials.return_value = (fake_session, MagicMock())
        overrides = {files.RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "0"}
        with patch.dict(os.environ, overrides):
            files._build_s3_client(None)
        mock_get_credentials.assert_called_once()
        fake_session.client.assert_called_once()
        mock_boto3.client.assert_not_called()

    @patch("runai_model_streamer_s3.files.files.boto3")
    @patch("runai_model_streamer_s3.files.files.get_credentials")
    def test_credentials_used_when_unsigned_absent(self, mock_get_credentials, mock_boto3):
        """Flag unset: the client comes from the session returned by get_credentials."""
        fake_session = MagicMock()
        mock_get_credentials.return_value = (fake_session, MagicMock())
        with patch.dict(os.environ, _env_without_unsigned(), clear=True):
            files._build_s3_client(None)
        mock_get_credentials.assert_called_once()
        fake_session.client.assert_called_once()
        mock_boto3.client.assert_not_called()

    @patch("runai_model_streamer_s3.files.files.boto3")
    @patch("runai_model_streamer_s3.files.files.get_credentials")
    def test_credentials_not_used_when_unsigned_enabled(self, mock_get_credentials, mock_boto3):
        """Flag "1": get_credentials is still called once but yields no session.

        With no session returned, the client is expected to come from boto3.client.
        """
        mock_get_credentials.return_value = (None, MagicMock())
        overrides = {files.RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "1"}
        with patch.dict(os.environ, overrides):
            files._build_s3_client(None)
        mock_get_credentials.assert_called_once()
        mock_boto3.client.assert_called_once()


if __name__ == "__main__":
unittest.main()
90 changes: 90 additions & 0 deletions tests/s3/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import json
import shutil
import tempfile
import unittest
import os
import time
import boto3
from unittest.mock import patch

from botocore.exceptions import NoCredentialsError, ClientError
from safetensors.torch import safe_open

from tests.cases.interface import ObjectStoreBackend
from tests.cases.testcases import compatibility_test_cases
from tests.safetensors.generator import create_random_safetensors
from tests.safetensors.comparison import tensor_maps_are_equal
from runai_model_streamer.safetensors_streamer.safetensors_streamer import (
SafetensorsStreamer,
list_safetensors,
pull_files,
)
RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR = "RUNAI_STREAMER_S3_UNSIGNED"


class MinioServer(ObjectStoreBackend):
Expand Down Expand Up @@ -48,5 +61,82 @@ def upload_file(self, bucket, directory, file):
bucket_name = os.getenv("AWS_BUCKET")
)


class TestS3UnsignedPublicBucket(unittest.TestCase):
PUBLIC_BUCKET = "public-test-bucket"

@classmethod
def setUpClass(cls):
    """Prepare a publicly readable bucket holding one generated safetensors file.

    Waits for the MinioServer backend, creates PUBLIC_BUCKET (tolerating a bucket
    that already exists and is owned by us), attaches an allow-all read/list
    policy, then uploads a freshly generated safetensors file.
    """
    cls.server = MinioServer()
    cls.server.wait_for_startup()
    cls.temp_dir = tempfile.mkdtemp()

    bucket = cls.PUBLIC_BUCKET
    admin_client = boto3.client(
        "s3",
        endpoint_url=cls.server.url,
        aws_access_key_id=cls.server.key,
        aws_secret_access_key=cls.server.password,
    )

    try:
        admin_client.create_bucket(Bucket=bucket)
    except ClientError as err:
        # An already-owned bucket is fine; any other error is a real failure.
        if err.response["Error"]["Code"] != "BucketAlreadyOwnedByYou":
            raise

    # Anonymous principals may list the bucket and read its objects.
    public_read_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": "*",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{bucket}",
                f"arn:aws:s3:::{bucket}/*",
            ],
        }],
    }
    admin_client.put_bucket_policy(Bucket=bucket, Policy=json.dumps(public_read_policy))

    cls.file_path = create_random_safetensors(cls.temp_dir)
    cls.server.upload_file(bucket, "", cls.file_path)

@classmethod
def tearDownClass(cls):
    """Remove the temporary directory created in setUpClass."""
    scratch_dir = cls.temp_dir
    shutil.rmtree(scratch_dir)

def test_list_safetensors_unsigned(self):
    """Listing the public bucket in unsigned mode must include model.safetensors."""
    bucket_uri = f"s3://{self.PUBLIC_BUCKET}/"
    expected_entry = f"s3://{self.PUBLIC_BUCKET}/model.safetensors"
    with patch.dict(os.environ, {RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "1"}):
        listed = list_safetensors(bucket_uri)
    self.assertIn(expected_entry, listed)
Comment on lines +109 to +112

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 No test for RUNAI_STREAMER_S3_UNSIGNED=0 (opt-out case)

The test suite verifies the "1" (opt-in) path but never asserts that setting the variable to "0" keeps authenticated mode active. Given the presence-check bug above, a regression test like this would have caught it at review time. Consider adding a small test that sets the env var to "0" and confirms credentials are still used (or at minimum confirms listing a private bucket still works).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added


def test_pull_files_unsigned(self):
    """Pulling *.safetensors from the public bucket in unsigned mode fetches model.safetensors."""
    destination = tempfile.mkdtemp()
    source_uri = f"s3://{self.PUBLIC_BUCKET}/"
    unsigned_env = {RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "1"}
    try:
        with patch.dict(os.environ, unsigned_env):
            pull_files(source_uri, destination, allow_pattern=["*.safetensors"])
        downloaded = os.listdir(destination)
        self.assertIn("model.safetensors", downloaded)
    finally:
        # Always drop the scratch directory, even if the pull or assertion fails.
        shutil.rmtree(destination)

def test_stream_file_unsigned(self):
    """Tensors streamed in unsigned mode must match those read from the local file."""
    object_uri = f"s3://{self.PUBLIC_BUCKET}/model.safetensors"

    with patch.dict(os.environ, {RUNAI_STREAMER_S3_UNSIGNED_ENV_VAR: "1"}):
        with SafetensorsStreamer() as streamer:
            streamer.stream_file(object_uri, None, "cpu")
            streamed = {name: tensor for name, tensor in streamer.get_tensors()}

    # Reference copy read straight from the local file that was uploaded.
    with safe_open(self.file_path, framework="pt", device="cpu") as local_file:
        reference = {name: local_file.get_tensor(name) for name in local_file.keys()}

    equal, message = tensor_maps_are_equal(streamed, reference)
    if not equal:
        self.fail(f"Tensor mismatch: {message}")


if __name__ == "__main__":
unittest.main()
Loading