forked from konflux-ci/pulp-tool
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpulp_client.py
More file actions
1908 lines (1604 loc) · 77.9 KB
/
pulp_client.py
File metadata and controls
1908 lines (1604 loc) · 77.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Pulp API client for managing repositories and content.
This module provides the main PulpClient class, which is composed using the
mixin pattern to provide specialized functionality:
Mixins:
- RpmRepositoryMixin, FileRepositoryMixin: Repository creation and management
- RpmDistributionMixin, FileDistributionMixin: Distribution management
- RpmPackageContentMixin, FileContentMixin: Content upload and management
- ArtifactMixin: Artifact operations
- TaskMixin: Task monitoring with exponential backoff
The PulpClient class combines all resource-based mixins to provide a complete Pulp API interface
organized by resource type, matching Pulp's API documentation structure.
Key Features:
- OAuth2 authentication with automatic token refresh
- Exponential backoff for task polling
- Context-based error handling with @with_error_handling decorator
- Type-safe operations using Pydantic models
- Proper resource cleanup with context managers
"""
# Standard library imports
import asyncio
import json
import logging
import os
import ssl
import tempfile
import time
import traceback
from functools import lru_cache, wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlencode
# Third-party imports
import httpx
# Local imports
from ..utils import create_session_with_retry
from ..utils.response_utils import content_find_results_from_json
from ..utils.artifact_detection import rpm_packages_letter_and_basename
from ..utils.constants import DEFAULT_CHUNK_SIZE, SUPPORTED_ARCHITECTURES
from ..utils.validation import sanitize_build_id_for_repository, validate_build_id
from ..utils.rpm_operations import calculate_sha256_checksum, parse_rpm_filename_to_nvr
from ..models.artifacts import ContentData, ExtraArtifactRef, FileInfoMap, PulpContentRow
from .auth import OAuth2ClientCredentialsAuth
# Resource-based mixins
from .repositories.rpm import RpmRepositoryMixin
from .repositories.file import FileRepositoryMixin
from .distributions.rpm import RpmDistributionMixin
from .distributions.file import FileDistributionMixin
from .content.rpm_packages import RpmPackageContentMixin
from .content.file_files import FileContentMixin
from .artifacts.operations import ArtifactMixin
from .tasks.operations import TaskMixin
import tomllib
# ============================================================================
# Constants
# ============================================================================
# Default timeout for HTTP requests (seconds).
# Increased to 120 seconds to handle slow operations like bulk content queries.
DEFAULT_TIMEOUT = 120
# Cache TTL (time-to-live) in seconds for GET request caching (see TTLCache).
CACHE_TTL = 300 # 5 minutes
# Placeholder request for synthetic responses (httpx.raise_for_status requires it)
_EMPTY_RESPONSE_REQUEST = httpx.Request("GET", "https://placeholder/")
def _dedupe_results_by_pulp_href(results: List[Any]) -> List[Any]:
"""Deduplicate RPM result dicts by pulp_href. Later entries overwrite earlier."""
seen: Dict[str, Any] = {}
for r in results:
href = r.get("pulp_href") if isinstance(r, dict) else None
if href:
seen[href] = r
return list(seen.values())
# ============================================================================
# Performance Metrics
# ============================================================================
class PerformanceMetrics:
    """Accumulate counters describing Pulp API usage for later reporting."""

    def __init__(self) -> None:
        """Start every counter at zero."""
        self.total_requests = 0
        self.cached_requests = 0
        self.chunked_requests = 0
        self.task_polls = 0

    def log_request(self, cached: bool = False) -> None:
        """Count one API request; ``cached=True`` also counts a cache hit."""
        self.total_requests += 1
        self.cached_requests += 1 if cached else 0

    def log_chunked_request(self, parallel: bool = True) -> None:
        """Count one chunked request (``parallel`` kept for API compatibility)."""
        self.chunked_requests += 1

    def log_task_poll(self) -> None:
        """Count one task-status poll."""
        self.task_polls += 1

    def get_summary(self) -> Dict[str, Any]:
        """
        Get metrics summary.

        Returns:
            Dictionary with all counters plus a formatted cache hit rate
            (``"0.0%"`` when no requests have been made yet).
        """
        total = self.total_requests
        hit_rate = self.cached_requests / total * 100 if total > 0 else 0
        return {
            "total_requests": total,
            "cached_requests": self.cached_requests,
            "cache_hit_rate": f"{hit_rate:.1f}%",
            "chunked_requests": self.chunked_requests,
            "task_polls": self.task_polls,
        }

    def log_summary(self) -> None:
        """Emit the counter snapshot via ``logging.info``."""
        stats = self.get_summary()
        logging.info("=== API Performance Metrics ===")
        logging.info("Total requests: %d", stats["total_requests"])
        logging.info("Cached requests: %d (%s)", stats["cached_requests"], stats["cache_hit_rate"])
        logging.info("Parallel chunked requests: %d", stats["chunked_requests"])
        logging.info("Task polls: %d", stats["task_polls"])
# ============================================================================
# Cache Implementation
# ============================================================================
class TTLCache:
    """A minimal time-to-live cache used to memoize GET responses."""

    def __init__(self, ttl: int = CACHE_TTL):
        """
        Create an empty cache.

        Args:
            ttl: Number of seconds an entry remains valid after insertion
        """
        self._ttl = ttl
        self._cache: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str) -> Optional[Any]:
        """
        Look up ``key``, evicting the entry (and missing) if it has expired.

        Args:
            key: Cache key

        Returns:
            The stored value, or None when absent or past its TTL
        """
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.time() - stored_at < self._ttl:
            return value
        # Stale entry: drop it so the cache does not grow unboundedly.
        del self._cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        """
        Store ``value`` under ``key``, stamped with the current time.

        Args:
            key: Cache key
            value: Value to cache
        """
        self._cache[key] = (value, time.time())

    def clear(self) -> None:
        """Drop every entry."""
        self._cache.clear()

    def size(self) -> int:
        """Number of entries currently stored (expired ones included until read)."""
        return len(self._cache)
def cached_get(method: Callable) -> Callable:
    """
    Decorator that memoizes GET-style methods through ``self._get_cache``.

    The cache key combines the method name with *all* arguments, not just the
    first one. Previously only ``args[0]`` was keyed, so calls such as
    ``_get_single_resource(endpoint, "repo-a")`` and
    ``_get_single_resource(endpoint, "repo-b")`` collided and returned each
    other's cached responses.

    Only successful responses are cached. Hits and misses are recorded on
    ``self._metrics`` when the client has one.
    """

    @wraps(method)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        # Only cache if the first argument is a URL/endpoint string
        if not args or not isinstance(args[0], str):
            return method(self, *args, **kwargs)
        url = args[0]
        # Include every remaining argument in the key so distinct calls
        # (e.g. same endpoint, different resource name) never collide.
        extra = ""
        if len(args) > 1 or kwargs:
            extra = f":{args[1:]!r}:{sorted(kwargs.items())!r}"
        cache_key = f"{method.__name__}:{url}{extra}"
        metrics = getattr(self, "_metrics", None)
        # Check cache
        cached_result = self._get_cache.get(cache_key)
        if cached_result is not None:
            logging.debug("Cache hit for %s", url)
            if metrics is not None:
                metrics.log_request(cached=True)
            return cached_result
        # Cache miss - make request
        result = method(self, *args, **kwargs)
        if metrics is not None:
            metrics.log_request(cached=False)
        # Only cache successful GET responses; failures should be retried.
        if hasattr(result, "is_success") and result.is_success:
            self._get_cache.set(cache_key, result)
            logging.debug("Cached response for %s", url)
        return result

    return wrapper
# ============================================================================
# Main Client Class
# ============================================================================
class PulpClient(
RpmRepositoryMixin,
FileRepositoryMixin,
RpmDistributionMixin,
FileDistributionMixin,
RpmPackageContentMixin,
FileContentMixin,
ArtifactMixin,
TaskMixin,
):
"""
A client for interacting with Pulp API.
API documentation:
- https://docs.pulpproject.org/pulp_rpm/restapi.html
- https://docs.pulpproject.org/pulpcore/restapi.html
A note regarding PUT vs PATCH:
- PUT changes all data and therefore all required fields need to be sent
- PATCH changes only the data that we are sending
Many methods require repository, distribution, publication, etc,
to be the full API endpoint (called "pulp_href"), not simply their name.
If method argument doesn't have "name" in its name, assume it expects
pulp_href. It looks like this:
/pulp/api/v3/publications/rpm/rpm/5e6827db-260f-4a0f-8e22-7f17d6a2b5cc/
"""
def __init__(
    self, config: Dict[str, Union[str, int]], domain: Optional[str] = None, config_path: Optional[Path] = None
) -> None:
    """Set up a Pulp client from a parsed configuration.

    Args:
        config: Configuration dictionary from the TOML file
        domain: Optional explicit domain override
        config_path: Path to config file for resolving relative cert/key paths
    """
    self.domain = domain
    self.config = config
    # Prefer the explicit override; otherwise fall back to the config's 'domain'.
    self.namespace = domain or config.get("domain")
    # Kept so relative cert/key paths can be resolved against the config dir later.
    self.config_path = config_path
    # Protocol mixins read this attribute for per-request timeouts.
    self.timeout = DEFAULT_TIMEOUT
    self._auth = None
    self._async_session: Optional[httpx.AsyncClient] = None
    self._cert_temp_dir: Optional[tempfile.TemporaryDirectory] = None
    self._cert_paths: Optional[Tuple[str, str]] = None
    # Sync session must be built after config/config_path are set (it may read the cert).
    self.session = self._create_session()
    # GET-request cache and performance counters.
    self._get_cache = TTLCache(ttl=CACHE_TTL)
    self._metrics = PerformanceMetrics()
    logging.debug("PulpClient initialized with request caching enabled (TTL: %ds)", CACHE_TTL)
def _create_session(self) -> httpx.Client:
    """Build the synchronous httpx client with retry strategy and pooling.

    The client certificate is bound to the Client itself when configured;
    otherwise authentication is attached per-request instead.
    """
    if self.config.get("cert"):
        return create_session_with_retry(cert=self.cert)
    return create_session_with_retry(cert=None)
def _get_async_session(self) -> httpx.AsyncClient:
    """Get or create async session with optimized configuration.

    Lazily builds (and caches on self._async_session) an httpx.AsyncClient
    configured to match the sync client: connection limits, timeouts,
    compression headers, optional HTTP/2, and either cert- or auth-based
    authentication.

    Returns:
        The cached httpx.AsyncClient, recreated if the previous one was closed.
    """
    if self._async_session is None or self._async_session.is_closed:
        cert = self.cert if self.config.get("cert") else None
        # Create async client with same configuration as sync client
        # Increased limits for concurrent chunked requests
        limits = httpx.Limits(
            max_keepalive_connections=20, max_connections=100  # Match sync client's connection pool
        )
        timeout = httpx.Timeout(self.timeout, connect=10.0)
        # Add compression headers
        default_headers = {
            "Accept-Encoding": "gzip, deflate, br",
        }
        # Try to enable HTTP/2 if available, but don't fail if not
        try:
            import importlib.util  # pylint: disable=import-outside-toplevel

            use_http2 = importlib.util.find_spec("h2") is not None
        except (ImportError, AttributeError):
            use_http2 = False
        if not use_http2:
            logging.debug("HTTP/2 support not available for async client (h2 package not installed)")
        # Configure SSL context for client certificates if provided
        verify: Union[bool, ssl.SSLContext] = True
        if cert:
            # Only create SSL context if certificate files actually exist
            # This allows tests to pass fake paths without FileNotFoundError
            if os.path.exists(cert[0]) and os.path.exists(cert[1]):
                ssl_context = ssl.create_default_context()
                ssl_context.load_cert_chain(certfile=cert[0], keyfile=cert[1])
                verify = ssl_context
            # If cert paths provided but files don't exist, just use default verification
            # (useful for testing where we mock the actual HTTP calls)
        # Prepare client kwargs
        client_kwargs: Dict[str, Any] = {
            "limits": limits,
            "timeout": timeout,
            "follow_redirects": True,
            "headers": default_headers,
            "http2": use_http2,
            "verify": verify,
        }
        # Add auth for non-cert authentication
        # NOTE: self.auth raises ValueError when no credentials are configured.
        if not self.config.get("cert"):
            client_kwargs["auth"] = self.auth
        self._async_session = httpx.AsyncClient(**client_kwargs)  # type: ignore[arg-type]
    return self._async_session
def close(self) -> None:
    """Release sync-session connections, flush the cache, and report metrics."""
    session = getattr(self, "session", None)
    if session:
        session.close()
        logging.debug("PulpClient session closed and connections released")
    cache = getattr(self, "_get_cache", None)
    if cache is not None:
        entries = cache.size()
        cache.clear()
        logging.debug("Cleared cache (%d entries)", entries)
    metrics = getattr(self, "_metrics", None)
    if metrics is not None:
        metrics.log_summary()
    # Remove any temp dir created for base64-decoded cert/key material.
    if getattr(self, "_cert_temp_dir", None) is not None:
        try:
            self._cert_temp_dir.cleanup()
        except OSError:
            # Best-effort cleanup; a leftover temp dir is harmless.
            pass
        self._cert_temp_dir = None
        self._cert_paths = None
async def async_close(self) -> None:
    """Close the cached async httpx client, if one is open."""
    session = self._async_session
    if session and not session.is_closed:
        await session.aclose()
        logging.debug("PulpClient async session closed and connections released")
def _run_async(self, coro) -> Any:
    """
    Execute ``coro`` via ``asyncio.run`` and discard the cached async session.

    Each ``asyncio.run()`` creates and closes its own event loop, so a cached
    ``httpx.AsyncClient`` would end up bound to a closed loop and the next
    sync wrapper call (e.g. search-by with checksums + filenames + signed_by)
    would fail with 'Event loop is closed'. Closing and dropping the session
    here forces a fresh client for the next loop.
    """

    async def _wrapped() -> Any:
        try:
            return await coro
        finally:
            session = self._async_session
            if session and not session.is_closed:
                await session.aclose()
            self._async_session = None

    return asyncio.run(_wrapped())
def __enter__(self) -> "PulpClient":
    """Context manager entry; yields this client unchanged."""
    return self
def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
    """Context manager exit - closes the sync session and releases resources."""
    self.close()
async def _chunked_get_async(
    self,
    url: str,
    params: Optional[Dict[str, Any]] = None,
    chunk_param: Optional[str] = None,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    **kwargs,
) -> httpx.Response:
    """
    Perform a GET request with chunking for large parameter lists using async.

    This is a workaround for the fact that requests with large parameter
    values using "GET" method fails with "Request Line is too large".
    Hence, this splits the parameter value into chunks of the given size,
    and makes separate async requests for each chunk concurrently.
    The results are aggregated into a single response.

    Note: - chunks are created on only one parameter at a time.
          - response object of the last chunk is returned with the aggregated results.
          - chunks are processed concurrently using asyncio for optimal performance

    Args:
        url: Full request URL
        params: Query parameters; only ``chunk_param`` is ever split
        chunk_param: Name of the parameter whose comma-separated value may be chunked
        chunk_size: Maximum number of values per chunk
        **kwargs: Passed through to ``httpx.AsyncClient.get``

    Returns:
        An httpx.Response whose JSON body contains the merged "results" list
        (with "count" set to its length) when chunking occurred, otherwise
        the plain single-request response.
    """
    import asyncio  # pylint: disable=import-outside-toplevel

    async_client = self._get_async_session()
    if not params or not chunk_param or chunk_param not in params:
        # No chunking needed, make regular request
        return await async_client.get(url, params=params, **kwargs)
    # Extract the parameter value and check if it needs chunking
    param_value = params[chunk_param]
    if not isinstance(param_value, str) or "," not in param_value:
        # Not a comma-separated list, make regular request
        return await async_client.get(url, params=params, **kwargs)
    values = [v.strip() for v in param_value.split(",")]
    if len(values) <= chunk_size:
        # Small list, make regular request
        return await async_client.get(url, params=params, **kwargs)
    # Need to chunk the request
    chunks = [values[i : i + chunk_size] for i in range(0, len(values), chunk_size)]
    logging.debug(
        "Chunking parameter '%s' with %d values into %d chunks (async concurrent)",
        chunk_param,
        len(values),
        len(chunks),
    )
    # Track metrics
    if hasattr(self, "_metrics"):
        self._metrics.log_chunked_request(parallel=True)

    # Create tasks for all chunks
    async def fetch_chunk(chunk: list, chunk_index: int) -> tuple:
        """Fetch a single chunk and return (response, parsed "results" list)."""
        chunk_params = params.copy()
        chunk_params[chunk_param] = ",".join(chunk)
        try:
            response = await async_client.get(url, params=chunk_params, **kwargs)
            self._check_response(response, f"chunked request {chunk_index}")
            # Parse results
            chunk_data = response.json()
            results = chunk_data.get("results", [])
            logging.debug("Completed chunk %d/%d with %d results", chunk_index, len(chunks), len(results))
            return response, results
        except Exception as e:
            # Any chunk failure aborts the whole aggregated request (gather propagates).
            logging.error("Failed to process chunk %d: %s", chunk_index, e)
            logging.error("Traceback: %s", traceback.format_exc())
            raise

    # Execute all chunks concurrently
    tasks = [fetch_chunk(chunk, i) for i, chunk in enumerate(chunks, 1)]
    results = await asyncio.gather(*tasks)
    # Aggregate results
    all_results = []
    last_response = None
    for response, chunk_results in results:
        last_response = response
        all_results.extend(chunk_results)
    # Create aggregated response
    if last_response:
        aggregated_data = {"count": len(all_results), "results": all_results}
        # Modify response content to return aggregated results from all chunks
        # intentionally modifying _content
        # NOTE(review): this relies on httpx internals — verify on httpx upgrades.
        # pylint: disable=W0212 (protected-access)
        last_response._content = json.dumps(aggregated_data).encode("utf-8")
        return last_response
    # Fallback: return empty response
    return await async_client.get(url, params={chunk_param: ""}, **kwargs)
def _chunked_get(
    self,
    url: str,
    params: Optional[Dict[str, Any]] = None,
    chunk_param: Optional[str] = None,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    **kwargs,
) -> httpx.Response:
    """
    Synchronous wrapper for _chunked_get_async.

    Provides a synchronous interface while using the async implementation
    underneath for better performance.

    Raises:
        RuntimeError: If called while an event loop is already running;
            callers in async code must use _chunked_get_async directly.
    """
    import asyncio  # pylint: disable=import-outside-toplevel

    # Detect an already-running event loop. The guard RuntimeError must be
    # raised *outside* the try block: previously it was raised inside and
    # then swallowed by the `except RuntimeError` handler, so the guard
    # never actually fired.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running, safe to create one via asyncio.run()
        in_running_loop = False
    else:
        in_running_loop = True
    if in_running_loop:
        raise RuntimeError("_chunked_get called from async context. Use _chunked_get_async instead.")
    return self._run_async(self._chunked_get_async(url, params, chunk_param, chunk_size, **kwargs))
@classmethod
def create_from_config_file(cls, path: Optional[str] = None, domain: Optional[str] = None) -> "PulpClient":
    """
    Build a client from the standard configuration file used by the
    `pulp` CLI tool.

    The namespace/domain is taken from the config file's 'domain' field
    unless an explicit ``domain`` override is supplied.

    Args:
        path: Path to config file or base64-encoded config content. If None, uses default path.
        domain: Optional domain override

    Raises:
        ValueError: When the configuration content is not valid TOML.
    """
    from ..utils.config_utils import load_config_content

    config_source = path if path else "~/.config/pulp/cli.toml"
    config_bytes, is_base64 = load_config_content(config_source)

    # Parse TOML from bytes
    try:
        config = tomllib.loads(config_bytes.decode("utf-8"))
    except tomllib.TOMLDecodeError as e:
        source_desc = "base64 config" if is_base64 else str(Path(config_source).expanduser())
        raise ValueError(f"Invalid TOML in configuration {source_desc}: {e}") from e

    # Base64 configs have no on-disk location to resolve cert/key paths against.
    config_path = Path(config_source).expanduser() if not is_base64 else None
    return cls(config["cli"], domain, config_path=config_path)
@property
def headers(self) -> Optional[Dict[str, str]]:
    """
    Get headers for requests.

    Returns:
        None — no custom headers are currently used; request_params omits
        the 'headers' key when this is falsy.
    """
    return None
@property
def auth(self) -> Union[OAuth2ClientCredentialsAuth, httpx.BasicAuth]:
    """
    Lazily build and cache the authentication object.

    OAuth2 (client_id/client_secret) takes precedence; Basic Auth
    (username/password) is the fallback.

    Returns:
        OAuth2ClientCredentialsAuth or BasicAuth instance for API authentication

    Raises:
        ValueError: When neither credential pair is fully configured.
    """
    if self._auth:
        return self._auth  # type: ignore[return-value]

    client_id = self.config.get("client_id")
    client_secret = self.config.get("client_secret")
    username = self.config.get("username")
    password = self.config.get("password")

    if client_id and client_secret:
        # OAuth2 client-credentials flow against Red Hat SSO.
        token_url = (
            "https://sso.redhat.com/auth/realms/redhat-external/"
            "protocol/openid-connect/token"  # nosec B105
        )
        self._auth = OAuth2ClientCredentialsAuth(  # type: ignore[assignment]
            client_id=str(client_id),
            client_secret=str(client_secret),
            token_url=token_url,
        )
    elif username is not None and password is not None:
        # Basic Auth (username/password), e.g. for packages.redhat.com.
        self._auth = httpx.BasicAuth(str(username), str(password))  # type: ignore[assignment]
    else:
        missing = []
        if not (client_id and client_secret):
            missing.append("client_id/client_secret (OAuth2)")
        if username is None or password is None:
            missing.append("username/password (Basic Auth)")
        raise ValueError(
            f"Authentication credentials missing. Provide either: {', or '.join(missing)}. "
            "See README for configuration."
        )
    return self._auth  # type: ignore[return-value]
@property
def cert(self) -> Tuple[str, str]:
    """
    Get client certificate information.

    If cert/key paths are not absolute and config_path is available,
    tries to resolve them relative to the config file's directory.
    If cert or key file content is base64-encoded, decodes it and uses
    temporary files so the SSL context receives valid paths.

    Returns:
        Tuple of (cert_path, key_path) for client certificate authentication.
        Note: when 'cert'/'key' are unset or the files don't exist, the raw
        (possibly "None") strings are returned unchanged for the caller to handle.
    """
    from ..utils.config_utils import load_file_content_maybe_base64

    # Return cached paths when we previously wrote base64-decoded content to temp files
    if self._cert_paths is not None:
        return self._cert_paths
    # str() turns a missing config key (None) into the literal "None", checked below.
    cert_path_str = str(self.config.get("cert"))
    key_path_str = str(self.config.get("key"))
    # Try to resolve relative paths if config_path is available
    if self.config_path and self.config_path.parent:
        cert_path = Path(cert_path_str)
        key_path = Path(key_path_str)
        # If cert path is not absolute and doesn't exist, try relative to config
        if not cert_path.is_absolute() and not cert_path.exists():
            potential_cert = self.config_path.parent / cert_path
            if potential_cert.exists():
                cert_path_str = str(potential_cert)
        # If key path is not absolute and doesn't exist, try relative to config
        if not key_path.is_absolute() and not key_path.exists():
            potential_key = self.config_path.parent / key_path
            if potential_key.exists():
                key_path_str = str(potential_key)
    cert_path = Path(cert_path_str)
    key_path = Path(key_path_str)
    # Unset or non-existent paths: hand back the strings as-is (no decoding attempt).
    if (
        cert_path_str in ("None", "")
        or key_path_str in ("None", "")
        or not cert_path.exists()
        or not key_path.exists()
    ):
        return (cert_path_str, key_path_str)
    cert_bytes, cert_was_base64 = load_file_content_maybe_base64(cert_path)
    key_bytes, key_was_base64 = load_file_content_maybe_base64(key_path)
    if cert_was_base64 or key_was_base64:
        # Write decoded material to a temp dir (cleaned up in close()) and
        # cache the resulting paths for subsequent property accesses.
        self._cert_temp_dir = tempfile.TemporaryDirectory()
        temp_dir = Path(self._cert_temp_dir.name)
        temp_cert = temp_dir / "cert.pem"
        temp_key = temp_dir / "key.pem"
        temp_cert.write_bytes(cert_bytes)
        temp_key.write_bytes(key_bytes)
        self._cert_paths = (str(temp_cert), str(temp_key))
        logging.debug("Using temp cert/key from base64-decoded content")
        return self._cert_paths
    return (cert_path_str, key_path_str)
@property
def request_params(self) -> Dict[str, Any]:
    """
    Default keyword arguments for outgoing requests.

    Includes custom headers when present, and auth unless cert-based
    authentication is in use.

    Returns:
        Dictionary containing default request parameters including
        authentication information (headers and auth, but not cert which is in Client)
    """
    params: Dict[str, Any] = {}
    custom_headers = self.headers
    if custom_headers:
        params["headers"] = custom_headers
    # cert is bound to the Client at construction time, never per-request.
    if not self.config.get("cert"):
        params["auth"] = self.auth
    return params
def _url(self, endpoint: str) -> str:
    """
    Build a fully qualified URL for a given API endpoint.

    Args:
        endpoint: API endpoint path (e.g., "api/v3/repositories/rpm/rpm/")

    Returns:
        Complete URL including base URL, API root, domain, and endpoint
    """
    import posixpath  # pylint: disable=import-outside-toplevel

    domain = self._get_domain()
    # Use posixpath (not os.path): URL paths are always '/'-separated, and
    # os.path.normpath would rewrite separators as backslashes on Windows.
    relative = posixpath.normpath(
        "/".join(
            [
                str(self.config["api_root"]),
                domain,
                endpoint,
            ]
        )
    )
    # Normpath removes the trailing slash. If it was there, put it back
    if endpoint.endswith("/"):
        relative += "/"
    return str(self.config["base_url"]) + relative
def _get_domain(self) -> str:
    """
    Resolve the effective Pulp domain name.

    Returns:
        The explicit override if set, else the config's 'domain', else "".
    """
    if self.domain:
        return self.domain
    configured = self.config.get("domain")
    return str(configured) if configured else ""
def get_domain(self) -> str:
    """Public accessor for the effective domain name (see _get_domain)."""
    return self._get_domain()
@cached_get
def _get_single_resource(self, endpoint: str, name: str) -> httpx.Response:
    """
    Fetch a single resource by exact name (first match only).

    Cached via @cached_get to avoid redundant lookups of
    repositories/distributions.

    Args:
        endpoint: API endpoint for the resource type
        name: Name of the resource to retrieve

    Returns:
        Response object containing the resource data
    """
    query = urlencode({"name": name, "offset": 0, "limit": 1})
    url = self._url(f"{endpoint}?") + query
    return self.session.get(url, timeout=self.timeout, **self.request_params)
def _log_request_headers(self, response: httpx.Response) -> None:
    """Log the originating request's headers with credentials redacted."""
    request = response.request
    if not (request and request.headers):
        return
    safe_headers = dict(request.headers)
    # Never leak tokens, cookies, or API keys into logs.
    for sensitive_key in ("authorization", "cookie", "x-api-key"):
        if sensitive_key in safe_headers:
            safe_headers[sensitive_key] = "[REDACTED]"
    logging.error(" Request Headers: %s", safe_headers)
def _log_request_body(self, response: httpx.Response) -> None:
    """Log request body, handling different content types.

    Decodes the body as UTF-8 (replacing bad bytes) and truncates long
    payloads. Streaming/multipart requests whose content was already
    consumed are logged with a placeholder instead.
    """
    try:
        if response.request and response.request.content:
            try:
                # Try to decode as text for logging
                content = response.request.content.decode("utf-8", errors="replace")
                # Truncate if very long
                if len(content) > 1000:
                    logging.error(" Request Body (truncated): %s...", content[:1000])
                else:
                    logging.error(" Request Body: %s", content)
            except Exception:
                # Decoding failed entirely: report size only.
                logging.error(" Request Body: <binary data, %d bytes>", len(response.request.content))
    except (httpx.RequestNotRead, AttributeError):
        # For streaming/multipart requests, content has already been consumed
        content_type = response.request.headers.get("content-type", "") if response.request else ""
        if "multipart" in content_type:
            logging.error(" Request Body: <multipart/form-data - file upload>")
        else:
            logging.error(" Request Body: <streaming request - content already consumed>")
def _log_response_details(self, response: httpx.Response) -> None:
    """Log status, headers, and (possibly truncated) body of a response."""
    logging.error("RESPONSE DETAILS:")
    logging.error(" Status Code: %s", response.status_code)
    logging.error(" Response Headers: %s", dict(response.headers))
    try:
        # Structured error payloads are the most useful form; prefer them.
        logging.error(" Error Data: %s", response.json())
    except (ValueError, json.JSONDecodeError):
        body = response.text
        # Fall back to raw text, truncated to keep logs readable.
        if len(body) > 500:
            logging.error(" Response Body (truncated): %s...", body[:500])
        else:
            logging.error(" Response Body: %s", body)
def _log_server_error(self, response: httpx.Response, operation: str) -> None:
    """Log detailed request/response information for server errors (5xx)."""
    separator = "=" * 80
    logging.error(separator)
    # Report the actual status code; this was hard-coded to "(500)" before,
    # which was misleading for 502/503/504 responses.
    logging.error("SERVER ERROR (%s) during %s", response.status_code, operation)
    logging.error(separator)
    # Request details
    logging.error("REQUEST DETAILS:")
    logging.error(" Method: %s", response.request.method if response.request else "Unknown")
    logging.error(" URL: %s", response.url)
    self._log_request_headers(response)
    self._log_request_body(response)
    self._log_response_details(response)
    logging.error(separator)
def _check_response(self, response: httpx.Response, operation: str = "request") -> None:
    """
    Raise httpx.HTTPError when ``response`` is not successful.

    5xx responses get detailed ERROR-level logging; all other failures are
    logged at debug level only.
    """
    if response.is_success:
        return
    status = response.status_code
    if status >= 500:
        # Server-side failures are critical; dump full diagnostics.
        self._log_server_error(response, operation)
    elif status >= 400:
        # Client errors (4xx) are logged at debug level
        logging.debug("Client error during %s: %s - %s", operation, status, response.text)
    else:
        # Other non-success responses (e.g. unfollowed redirects)
        logging.debug("Failed to %s: %s - %s", operation, status, response.text)
    raise httpx.HTTPError(f"Failed to {operation}: {status} - {response.text}")
def check_response(self, response: httpx.Response, operation: str = "request") -> None:
    """Public wrapper around _check_response; raises httpx.HTTPError on failure."""
    self._check_response(response, operation)
# ============================================================================
# Async Methods for Repository Setup
# ============================================================================
def _prepare_async_kwargs(self, **kwargs: Any) -> Dict[str, Any]:
"""Prepare kwargs for async requests with auth if configured."""
if self.auth:
kwargs.setdefault("auth", self.auth)
return kwargs
async def async_get(self, url: str, **kwargs) -> httpx.Response:
    """Issue an asynchronous GET against *url* via the shared async client."""
    request_kwargs = self._prepare_async_kwargs(**kwargs)
    return await self._get_async_session().get(url, **request_kwargs)
async def async_post(self, url: str, **kwargs) -> httpx.Response:
    """Issue an asynchronous POST against *url* via the shared async client."""
    request_kwargs = self._prepare_async_kwargs(**kwargs)
    return await self._get_async_session().post(url, **request_kwargs)
# ============================================================================
# Content Management Methods (migrated from ContentManagerMixin)
# ============================================================================
def upload_content(
    self, file_path: str, labels: Dict[str, str], *, file_type: str, arch: Optional[str] = None
) -> str:
    """
    Generic file upload function with validation and error handling.

    Args:
        file_path: Path to the file to upload
        labels: Labels to attach to the uploaded content
        file_type: Type of file (e.g., 'rpm', 'file') - determines upload method
        arch: Architecture for the uploaded content (required for RPM uploads)

    Returns:
        Pulp href of the uploaded content

    Raises:
        FileNotFoundError: If the file does not exist
        PermissionError: If the file cannot be read
        ValueError: If the file is empty or arch is missing for RPMs
    """
    from ..utils import validate_file_path

    # Validate file before upload
    validate_file_path(file_path, file_type)
    try:
        # Dispatch on file_type: RPMs go straight to the RPM upload
        # endpoint, everything else through FileContentMixin.
        if file_type.lower() == "rpm":
            response = self._upload_rpm_package(file_path, labels, arch)
        else:
            # For non-RPM files, use create_file_content from FileContentMixin
            response = self.create_file_content(
                "", file_path, build_id=labels.get("build_id", ""), pulp_label=labels, arch=arch
            )
        # Include filename in operation for better error context
        operation_context = f"upload {file_type} ({os.path.basename(file_path)})"
        self._check_response(response, operation_context)
        return response.json()["pulp_href"]
    except httpx.HTTPError:
        logging.error("Request failed for %s %s", file_type, file_path, exc_info=True)
        raise
    except Exception:
        logging.error("Unexpected error uploading %s %s", file_type, file_path, exc_info=True)
        raise

def _upload_rpm_package(
    self, file_path: str, labels: Dict[str, str], arch: Optional[str]
) -> httpx.Response:
    """POST a single RPM to the Pulp RPM package upload endpoint.

    Args:
        file_path: Path to the RPM file on disk.
        labels: Pulp labels to attach to the uploaded content.
        arch: RPM architecture (required by this upload path).

    Returns:
        The raw upload response; status checking is left to the caller.

    Raises:
        ValueError: If ``arch`` is not provided.
    """
    if not arch:
        raise ValueError("arch parameter is required for RPM uploads")
    url = self._url("api/v3/content/rpm/packages/upload/")
    with open(file_path, "rb") as fp:
        file_name = os.path.basename(file_path)
        build_id = labels.get("build_id", "")
        # Build relative_path for RPMs
        # RPMs use only the filename as the relative_path (no build_id, no arch prefix)
        # The distribution base_path contains namespace/parent_package/rpms
        relative_path = file_name
        data = {
            "pulp_labels": json.dumps(labels),
            "relative_path": relative_path,
        }
        files = {"file": fp}
        # Log upload attempt details for debugging
        logging.debug("Attempting RPM upload:")
        logging.debug(" URL: %s", url)
        logging.debug(" File: %s", file_name)
        logging.debug(" Relative Path: %s", relative_path)
        logging.debug(" Build ID: %s", build_id)
        logging.debug(" Arch: %s", arch)
        logging.debug(" Labels: %s", labels)
        return self.session.post(
            url, data=data, files=files, timeout=self.timeout, **self.request_params
        )
# Note: create_file_content and add_content are inherited from FileContentMixin
# _build_file_relative_path is also inherited from FileContentMixin
# ============================================================================
# Repository Management Methods (migrated from RepositoryManagerMixin)
# ============================================================================
def _create_resource(self, endpoint: str, request_model: Any) -> httpx.Response:
    """
    Create a resource (repository or distribution).

    Args:
        endpoint: API endpoint for resource creation
        request_model: Request model (RepositoryRequest or DistributionRequest)

    Returns:
        Response object from the creation request
    """
    # Serialize the pydantic model, dropping unset optional fields.
    payload = request_model.model_dump(exclude_none=True)
    target_url = self._url(endpoint)
    return self.session.post(target_url, json=payload, timeout=self.timeout, **self.request_params)
def _create_repository(self, endpoint: str, new_repository: Any) -> httpx.Response:
    """Create a repository; thin wrapper over the generic resource creator."""
    return self._create_resource(endpoint, new_repository)
def _create_distribution(self, endpoint: str, new_distribution: Any) -> httpx.Response:
    """Create a distribution; thin wrapper over the generic resource creator."""
    return self._create_resource(endpoint, new_distribution)
def repository_operation(
self,
operation: str,
repo_type: str,
*,
name: Optional[str] = None,
repository_data: Optional[Any] = None,
distribution_data: Optional[Any] = None,