1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414#
15+ import gc
1516import http .client
17+ import json
1618import queue
1719import socket
1820import ssl
21+ import sys
22+ import threading
23+ import time
24+ import traceback
1925
2026import v3io .dataplane .request
2127import v3io .dataplane .response
2228
2329from . import abstract
2430
2531
32+ def get_connection_pool_stats (transport ):
33+ """Collect comprehensive statistics about the connection pool state."""
34+ pool_stats = {
35+ "timestamp" : time .time (),
36+ "free_connections_size" : transport ._free_connections .qsize () if transport ._free_connections else 0 ,
37+ "free_connections_empty" : transport ._free_connections .empty () if transport ._free_connections else True ,
38+ "max_connections" : transport .max_connections ,
39+ "active_thread_count" : threading .active_count (),
40+ "ssl_version" : ssl .OPENSSL_VERSION ,
41+ "python_version" : sys .version ,
42+ }
43+
44+ # Get details about all HTTPConnection objects
45+ connection_objects = []
46+ for obj in gc .get_objects ():
47+ if isinstance (obj , http .client .HTTPConnection ) or isinstance (obj , http .client .HTTPSConnection ):
48+ conn_info = {
49+ "host" : getattr (obj , "host" , "unknown" ),
50+ "port" : getattr (obj , "port" , "unknown" ),
51+ "timeout" : getattr (obj , "timeout" , "unknown" ),
52+ "has_sock" : hasattr (obj , "sock" ) and obj .sock is not None ,
53+ }
54+
55+ # Capture SSL socket details if available
56+ if hasattr (obj , "sock" ) and obj .sock is not None and isinstance (obj .sock , ssl .SSLSocket ):
57+ try :
58+ sock = obj .sock
59+ conn_info ["ssl_socket" ] = {
60+ "cipher" : sock .cipher (),
61+ "version" : sock .version (),
62+ "compression" : sock .compression (),
63+ "pending" : sock .pending (),
64+ "fileno" : sock .fileno () if hasattr (sock , "fileno" ) else None ,
65+ }
66+ except Exception as e :
67+ conn_info ["ssl_socket_error" ] = str (e )
68+
69+ connection_objects .append (conn_info )
70+
71+ pool_stats ["connection_objects" ] = connection_objects
72+
73+ # Get OS resource info
74+ try :
75+ import resource
76+
77+ rusage = resource .getrusage (resource .RUSAGE_SELF )
78+ pool_stats ["resource_usage" ] = {
79+ "max_rss" : rusage .ru_maxrss ,
80+ "page_faults" : rusage .ru_minflt ,
81+ "block_input" : rusage .ru_inblock ,
82+ "block_output" : rusage .ru_oublock ,
83+ }
84+ except ImportError :
85+ pool_stats ["resource_usage" ] = "resource module not available"
86+
87+ # Get socket statistics if available
88+ try :
89+ pool_stats ["socket_count" ] = len (socket ._connection_list ) if hasattr (socket , "_connection_list" ) else "unknown"
90+ except Exception :
91+ pool_stats ["socket_count" ] = "error getting socket count"
92+
93+ return pool_stats
94+
95+
96+ # Add this function to dump request details directly to the log
97+ def log_full_request_details (request , logger ):
98+ """
99+ Log complete details of a request including headers and body directly to the logger
100+ """
101+ try :
102+ # Create log sections with clear separation
103+ logger .error (" ==================== SSL ERROR - FULL REQUEST DUMP ====================" )
104+
105+ # Basic request info
106+ logger .error (" REQUEST DETAILS:" )
107+ logger .error (f" - Method: { request .method } " )
108+ logger .error (f" - Path: { request .encode_path ()} " )
109+
110+ # All headers
111+ logger .error (" HEADERS:" )
112+ for header_name , header_value in request .headers .items ():
113+ # Mask sensitive headers
114+ if header_name .lower () in ["authorization" , "x-v3io-session-key" ]:
115+ logger .error (f" - { header_name } : [REDACTED]" )
116+ else :
117+ logger .error (f" - { header_name } : { header_value } " )
118+
119+ # Body content
120+ if request .body :
121+ # If body is bytes, decode if possible
122+ if isinstance (request .body , bytes ):
123+ # try:
124+ # body_str = request.body.decode("utf-8")
125+ # logger.error(f" {body_str}")
126+ # except UnicodeDecodeError:
127+ logger .error (f" [BODY data, length: { len (request .body )} bytes]" )
128+ import base64
129+
130+ hex_dump = base64 .b64encode (request .body [:4096 ])
131+ logger .error (f" Hex dump (first 4096 bytes): { hex_dump } " )
132+ elif isinstance (request .body , str ):
133+ logger .error (f" { request .body } " )
134+ else :
135+ # For file-like objects or other types
136+ logger .error (f" [Body of type { type (request .body )} , cannot display directly]" )
137+ else :
138+ logger .error (" [No body]" )
139+
140+ logger .error (" ====================== END OF REQUEST DUMP ======================" )
141+ except Exception as e :
142+ logger .error (f" Error logging request details: { str (e )} " )
143+
144+
26145class Transport (abstract .Transport ):
27146 _connection_timeout_seconds = 20
28147 _request_max_retries = 2
@@ -46,6 +165,10 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve
46165 )
47166 self ._get_status_and_headers = self ._get_status_and_headers_py3
48167
168+ # Log initial connection pool state
169+ pool_stats = get_connection_pool_stats (self )
170+ self ._log (f"Initial connection pool state: { json .dumps (pool_stats )} " )
171+
49172 def close (self ):
50173 # Ignore redundant calls to close
51174 if not self ._free_connections :
@@ -103,7 +226,6 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
103226 response = v3io .dataplane .response .Response (request .output , status_code , headers , response_body )
104227
105228 self ._free_connections .put (connection , block = True )
106-
107229 response .raise_for_status (request .raise_for_status or raise_for_status )
108230
109231 return response
@@ -142,6 +264,15 @@ def _send_request_on_connection(self, request, connection):
142264
143265 path = request .encode_path ()
144266
267+ # Log request details
268+ request_info = {
269+ "method" : request .method ,
270+ "path" : path ,
271+ "headers" : dict (request .headers ),
272+ "body_size" : len (request .body ) if request .body else 0 ,
273+ "connection" : {"host" : connection .host , "port" : connection .port , "timeout" : connection .timeout },
274+ }
275+
145276 self .log (
146277 "Tx" , connection = connection , method = request .method , path = path , headers = request .headers , body = request .body
147278 )
@@ -153,7 +284,16 @@ def _send_request_on_connection(self, request, connection):
153284
154285 retries_left = self ._request_max_retries
155286 while True :
287+ sock_info_before_request = {}
156288 try :
289+ if hasattr (connection , "sock" ) and connection .sock and isinstance (connection .sock , ssl .SSLSocket ):
290+ sock = connection .sock
291+ sock_info_before_request = {
292+ "cipher" : sock .cipher (),
293+ "version" : sock .version (),
294+ "compression" : sock .compression (),
295+ "pending" : sock .pending (),
296+ }
157297 connection .request (request .method , path , request .body , request .headers )
158298 break
159299 except self ._send_request_exceptions as e :
@@ -174,6 +314,41 @@ def _send_request_on_connection(self, request, connection):
174314 request .body .seek (starting_offset )
175315 connection = self ._create_connection (self ._host , self ._ssl_context )
176316 request .transport .connection_used = connection
317+ except ssl .SSLError as e :
318+ log_full_request_details (request , self ._logger )
319+ # Detailed SSL error logging
320+ ssl_error_info = {
321+ "error_type" : "SSLError" ,
322+ "error_message" : str (e ),
323+ "error_code" : e .errno if hasattr (e , "errno" ) else None ,
324+ "ssl_lib" : e .library if hasattr (e , "library" ) else None ,
325+ "ssl_func" : e .reason if hasattr (e , "reason" ) else None ,
326+ "traceback" : traceback .format_exc (),
327+ "sock_info_before_request" : json .dumps (sock_info_before_request ),
328+ }
329+
330+ # Get socket state if available
331+ if hasattr (connection , "sock" ) and connection .sock :
332+ try :
333+ sock = connection .sock
334+ ssl_error_info ["socket_state" ] = {
335+ "fileno" : sock .fileno () if hasattr (sock , "fileno" ) else None ,
336+ "blocking" : sock .getblocking () if hasattr (sock , "getblocking" ) else None ,
337+ "timeout" : sock .gettimeout () if hasattr (sock , "gettimeout" ) else None ,
338+ }
339+ except Exception as sock_e :
340+ ssl_error_info ["socket_state_error" ] = str (sock_e )
341+
342+ # Get connection pool stats
343+ try :
344+ ssl_error_info ["pool_stats" ] = get_connection_pool_stats (self )
345+ except Exception as pool_e :
346+ ssl_error_info ["pool_stats_error" ] = str (pool_e )
347+
348+ ssl_error_info ["request_info" ] = request_info
349+
350+ self ._logger .error (f" SSL Error: { json .dumps (ssl_error_info )} " )
351+ raise e
177352 except BaseException as e :
178353 self ._logger .error_with (
179354 "Unhandled exception while sending request" , e = type (e ), e_msg = e , connection = connection
0 commit comments