-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathminio_client.py
More file actions
125 lines (106 loc) · 3.79 KB
/
minio_client.py
File metadata and controls
125 lines (106 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# MinIO client configuration and utilities.
import logging
from urllib.parse import urlparse
import certifi
import urllib3
from minio import Minio
from urllib3.util import Timeout as UrllibTimeout
from app.core.config import settings
logger = logging.getLogger(__name__)
def get_minio_client() -> Minio:
    """Create a MinIO client with explicit timeout, retry, and TLS settings.

    The endpoint is read from ``settings.minio_endpoint``. It may be a bare
    ``host`` / ``host:port`` (e.g. ``localhost:9000``) or a full URL such as
    ``http://localhost:9000``; only the ``host[:port]`` part is handed to
    MinIO. Whether TLS is used is controlled separately by
    ``settings.minio_use_ssl``.

    Returns:
        A ``Minio`` client whose HTTP layer is a urllib3 ``PoolManager`` with
        a 10s connect / 30s read timeout, a retry budget of 3 on
        500/502/503/504 responses, and certificate verification against the
        certifi CA bundle.

    Raises:
        ValueError: If no host can be extracted from the configured endpoint.
    """
    raw_endpoint = str(settings.minio_endpoint or "").strip()

    # urlparse only recognises a netloc when it is introduced by "//".
    # Without it, a bare "minio:9000" is parsed as scheme "minio" with path
    # "9000" (Python 3.9+), which would silently drop the host name. Prefix
    # "//" so bare endpoints are parsed as an authority instead.
    if "://" in raw_endpoint or raw_endpoint.startswith("//"):
        parsed = urlparse(raw_endpoint)
    else:
        parsed = urlparse(f"//{raw_endpoint}")
    endpoint = parsed.netloc.strip()

    # Reject configurations that yield no host at all.
    if not endpoint:
        error_msg = (
            f"Invalid MinIO endpoint configuration: '{settings.minio_endpoint}'. "
            "Endpoint must be a valid host or host:port (e.g., 'localhost:9000')"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    # Timeouts: 10s to establish the connection, 30s between reads.
    timeout = UrllibTimeout(connect=10, read=30)

    # Retry transient server errors with a short exponential backoff.
    retry = urllib3.Retry(
        total=3,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504],
    )

    # Pool manager carrying the timeout, retry policy, and CA bundle.
    http_client = urllib3.PoolManager(
        timeout=timeout,
        retries=retry,
        maxsize=10,
        cert_reqs="CERT_REQUIRED",
        ca_certs=certifi.where(),
    )

    return Minio(
        endpoint=endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=settings.minio_use_ssl,
        http_client=http_client,
    )
def download_object(
    object_name: str,
    bucket_name: str | None = None,
    minio_client: Minio | None = None,
) -> bytes:
    """Download an object from MinIO and return its content as bytes.

    The whole object is read into memory; for very large files consider
    streaming to disk instead. A warning is logged when the content exceeds
    100 MB.

    Args:
        object_name: Path/name of the object in the bucket.
        bucket_name: Name of the MinIO bucket (defaults to
            ``settings.minio_bucket``).
        minio_client: Optional MinIO client (one is created with
            ``get_minio_client()`` if not provided).

    Returns:
        The object content.

    Raises:
        ValueError: If ``object_name`` is empty or whitespace, or if fetching
            or reading the object fails (the underlying error is chained as
            ``__cause__``).
    """
    # Validate first: a blank name must be rejected before any settings are
    # read or a client (and its connection pool) is created.
    if not object_name or not object_name.strip():
        raise ValueError("object_name cannot be empty or whitespace")

    if bucket_name is None:
        bucket_name = settings.minio_bucket
    if minio_client is None:
        minio_client = get_minio_client()

    try:
        response = minio_client.get_object(bucket_name, object_name)
    except Exception as e:
        logger.error(
            "Failed to get object from MinIO - bucket: '%s', object: '%s': %s",
            bucket_name,
            object_name,
            e,
        )
        raise ValueError(
            f"Failed to retrieve '{object_name}' from bucket '{bucket_name}': {e}"
        ) from e

    try:
        content = response.read()
    except Exception as e:
        logger.error(
            "Failed to read content from MinIO - bucket: '%s', object: '%s': %s",
            bucket_name,
            object_name,
            e,
        )
        raise ValueError(
            f"Failed to read content of '{object_name}' from bucket '{bucket_name}': {e}"
        ) from e
    finally:
        # Nested so the connection is returned to the pool even if close()
        # itself raises.
        try:
            response.close()
        finally:
            response.release_conn()

    # Warn if the file is very large (> 100MB), since it is now fully in memory.
    file_size_mb = len(content) / (1024 * 1024)
    if file_size_mb > 100:
        logger.warning(
            "Large file loaded into memory: %.1f MB for '%s'",
            file_size_mb,
            object_name,
        )
    return content