diff --git a/pyproject.toml b/pyproject.toml index 4a9b13b..f76f668 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "api-to-dataframe" -version = "1.5.1" +version = "1.5.2" description = "A package to convert API responses to pandas dataframe" authors = ["IvanildoBarauna "] readme = "README.md" diff --git a/src/api_to_dataframe/controller/client_builder.py b/src/api_to_dataframe/controller/client_builder.py index ef52971..851898f 100644 --- a/src/api_to_dataframe/controller/client_builder.py +++ b/src/api_to_dataframe/controller/client_builder.py @@ -38,8 +38,8 @@ def __init__( # pylint: disable=too-many-positional-arguments,too-many-argument error_msg = "endpoint cannot be an empty string" logger.error(error_msg) telemetry.logs().new_log( - msg=error_msg, - tags={"component": "ClientBuilder", "method": "__init__"}, + msg=error_msg, + tags={"component": "ClientBuilder", "method": "__init__"}, level=40 # ERROR level ) raise ValueError @@ -47,8 +47,8 @@ def __init__( # pylint: disable=too-many-positional-arguments,too-many-argument error_msg = "retries must be a non-negative integer" logger.error(error_msg) telemetry.logs().new_log( - msg=error_msg, - tags={"component": "ClientBuilder", "method": "__init__"}, + msg=error_msg, + tags={"component": "ClientBuilder", "method": "__init__"}, level=40 # ERROR level ) raise ValueError @@ -56,8 +56,8 @@ def __init__( # pylint: disable=too-many-positional-arguments,too-many-argument error_msg = "initial_delay must be a non-negative integer" logger.error(error_msg) telemetry.logs().new_log( - msg=error_msg, - tags={"component": "ClientBuilder", "method": "__init__"}, + msg=error_msg, + tags={"component": "ClientBuilder", "method": "__init__"}, level=40 # ERROR level ) raise ValueError @@ -65,8 +65,8 @@ def __init__( # pylint: disable=too-many-positional-arguments,too-many-argument error_msg = "connection_timeout must be a non-negative integer" logger.error(error_msg) telemetry.logs().new_log( - msg=error_msg, - tags={"component": "ClientBuilder", "method": "__init__"}, + msg=error_msg, + tags={"component": "ClientBuilder", "method": "__init__"}, level=40 # ERROR level ) raise ValueError @@ -77,17 +77,17 @@ def __init__( # pylint: disable=too-many-positional-arguments,too-many-argument self.headers = headers self.retries = retries self.delay = initial_delay - + # Record client initialization metric telemetry.metrics().metric_increment( name="client.initialization", tags={ - "endpoint": endpoint, + "endpoint": endpoint, "retry_strategy": retry_strategy.name, "connection_timeout": str(connection_timeout) } ) - + # Log initialization telemetry.logs().new_log( msg=f"ClientBuilder initialized with endpoint {endpoint}", @@ -112,13 +112,14 @@ def get_api_data(self): Returns: dict: The JSON response from the API as a dictionary. """ - # Use the telemetry spans with context manager - with telemetry.traces().span_in_context("get_api_data") as (span, _): + # Use the telemetry spans with new API + span = telemetry.traces().new_span("get_api_data") + try: # Add span attributes span.set_attribute("endpoint", self.endpoint) span.set_attribute("retry_strategy", self.retry_strategy.name) span.set_attribute("connection_timeout", self.connection_timeout) - + # Log the API request telemetry.logs().new_log( msg=f"Making API request to {self.endpoint}", @@ -129,33 +130,33 @@ def get_api_data(self): }, level=20 # INFO level ) - + # Record the start time for response time measurement start_time = time.time() - + # Make the API request response = GetData.get_response( endpoint=self.endpoint, headers=self.headers, connection_timeout=self.connection_timeout, ) - + # Calculate response time response_time = time.time() - start_time - + # Record response time as histogram telemetry.metrics().record_histogram( name="api.response_time", tags={"endpoint": self.endpoint}, value=response_time ) - + # Record successful request metric telemetry.metrics().metric_increment( name="api.request.success", tags={"endpoint": self.endpoint} ) - + # Log success telemetry.logs().new_log( msg=f"API request to {self.endpoint} successful", @@ -169,7 +170,9 @@ def get_api_data(self): level=20 # INFO level ) - return response.json() + return response.json() + finally: + span.end() @staticmethod def api_to_dataframe(response: dict): @@ -186,11 +189,12 @@ def api_to_dataframe(response: dict): Returns: DataFrame: A pandas DataFrame containing the data from the API response. """ - # Use telemetry for this operation - with telemetry.traces().span_in_context("api_to_dataframe") as (span, _): + # Use telemetry with new API + span = telemetry.traces().new_span("api_to_dataframe") + try: response_size = len(response) if isinstance(response, list) else 1 span.set_attribute("response_size", response_size) - + # Log conversion start telemetry.logs().new_log( msg="Converting API response to DataFrame", @@ -202,17 +206,17 @@ def api_to_dataframe(response: dict): }, level=20 # INFO level ) - + try: # Convert to dataframe df = GetData.to_dataframe(response) - + # Record metrics telemetry.metrics().metric_increment( name="dataframe.conversion.success", tags={"size": len(df)} ) - + # Log success telemetry.logs().new_log( msg="Successfully converted API response to DataFrame", @@ -224,16 +228,16 @@ def api_to_dataframe(response: dict): }, level=20 # INFO level ) - + return df - + except Exception as e: # Record failure metric telemetry.metrics().metric_increment( name="dataframe.conversion.failure", tags={"error_type": type(e).__name__} ) - + # Log error error_msg = f"Failed to convert API response to DataFrame: {str(e)}" telemetry.logs().new_log( @@ -246,6 +250,8 @@ def api_to_dataframe(response: dict): }, level=40 # ERROR level ) - + # Re-raise the exception raise + finally: + span.end() diff --git a/src/api_to_dataframe/models/get_data.py b/src/api_to_dataframe/models/get_data.py index 880b333..449b1f8 100644 --- a/src/api_to_dataframe/models/get_data.py +++ b/src/api_to_dataframe/models/get_data.py @@ -7,11 +7,12 @@ class GetData: @staticmethod def get_response(endpoint: str, headers: dict, connection_timeout: int): # Start a span for the API request - with telemetry.traces().span_in_context("http_request") as (span, _): + span = telemetry.traces().new_span("http_request") + try: span.set_attribute("http.url", endpoint) span.set_attribute("http.method", "GET") span.set_attribute("http.timeout", connection_timeout) - + # Log the request telemetry.logs().new_log( msg=f"Sending HTTP GET request to {endpoint}", @@ -23,18 +24,18 @@ def get_response(endpoint: str, headers: dict, connection_timeout: int): }, level=20 # INFO level ) - + try: # Make the request response = requests.get(endpoint, timeout=connection_timeout, headers=headers) - + # Set response attributes on span span.set_attribute("http.status_code", response.status_code) span.set_attribute("http.response_content_length", len(response.content)) - + # Attempt to raise for status to catch errors response.raise_for_status() - + # Log successful response telemetry.logs().new_log( msg=f"Received HTTP {response.status_code} response from {endpoint}", @@ -47,7 +48,7 @@ def get_response(endpoint: str, headers: dict, connection_timeout: int): }, level=20 # INFO level ) - + # Record successful request metric telemetry.metrics().metric_increment( name="http.request.success", @@ -56,16 +57,16 @@ def get_response(endpoint: str, headers: dict, connection_timeout: int): "status_code": response.status_code } ) - + return response - + except requests.exceptions.RequestException as e: # Record the exception on the span span.record_exception(e) span.set_attribute("error", True) span.set_attribute("error.type", type(e).__name__) span.set_attribute("error.message", str(e)) - + # Log the error telemetry.logs().new_log( msg=f"HTTP request failed: {str(e)}", @@ -78,7 +79,7 @@ def get_response(endpoint: str, headers: dict, connection_timeout: int): }, level=40 # ERROR level ) - + # Record failure metric telemetry.metrics().metric_increment( name="http.request.failure", @@ -87,19 +88,22 @@ def get_response(endpoint: str, headers: dict, connection_timeout: int): "error_type": type(e).__name__ } ) - + # Re-raise the exception raise + finally: + span.end() @staticmethod def to_dataframe(response): # Start a span for dataframe conversion - with telemetry.traces().span_in_context("convert_to_dataframe") as (span, _): + span = telemetry.traces().new_span("convert_to_dataframe") + try: # Set attributes about the data data_size = len(response) if isinstance(response, list) else 1 span.set_attribute("data.size", data_size) span.set_attribute("data.type", type(response).__name__) - + # Log conversion attempt telemetry.logs().new_log( msg="Converting data to DataFrame", @@ -111,16 +115,16 @@ def to_dataframe(response): }, level=20 # INFO level ) - + try: # Convert to DataFrame df = pd.DataFrame(response) - + # Check if DataFrame is empty if df.empty: error_msg = "::: DataFrame is empty :::" logger.error(error_msg) - + # Log the error with OpenTelemetry telemetry.logs().new_log( msg=error_msg, @@ -132,20 +136,20 @@ def to_dataframe(response): }, level=40 # ERROR level ) - + # Record empty DataFrame metric telemetry.metrics().metric_increment( name="dataframe.empty", tags={"data_type": type(response).__name__} ) - + # Set span as error span.set_attribute("error", True) span.set_attribute("error.type", "ValueError") span.set_attribute("error.message", error_msg) - + raise ValueError(error_msg) - + # Log success telemetry.logs().new_log( msg="Successfully converted data to DataFrame", @@ -157,35 +161,35 @@ def to_dataframe(response): }, level=20 # INFO level ) - + # Record dataframe metrics telemetry.metrics().record_gauge( name="dataframe.rows", tags={"data_type": type(response).__name__}, value=float(len(df)) ) - + telemetry.metrics().record_gauge( name="dataframe.columns", tags={"data_type": type(response).__name__}, value=float(len(df.columns)) ) - + # Set additional span attributes span.set_attribute("dataframe.rows", len(df)) span.set_attribute("dataframe.columns", len(df.columns)) - + return df - + except ValueError: # Re-raise ValueErrors (like empty DataFrame) raise - + except Exception as err: # Log the error error_msg = f"Invalid response for transform in dataframe: {err}" logger.error(error_msg) - + # Log with OpenTelemetry telemetry.logs().new_log( msg=error_msg, @@ -198,18 +202,20 @@ def to_dataframe(response): }, level=40 # ERROR level ) - + # Record conversion failure metric telemetry.metrics().metric_increment( name="dataframe.conversion.error", tags={"error_type": type(err).__name__} ) - + # Record the exception on the span span.record_exception(err) span.set_attribute("error", True) span.set_attribute("error.type", type(err).__name__) span.set_attribute("error.message", str(err)) - + # Raise TypeError with original error as cause raise TypeError(error_msg) from err + finally: + span.end() diff --git a/src/api_to_dataframe/models/retainer.py b/src/api_to_dataframe/models/retainer.py index 3a7cfdd..bf7139c 100644 --- a/src/api_to_dataframe/models/retainer.py +++ b/src/api_to_dataframe/models/retainer.py @@ -15,23 +15,24 @@ class Strategies(Enum): def retry_strategies(func): def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements retry_number = 0 - + # Get the endpoint for better observability context endpoint = args[0].endpoint if hasattr(args[0], 'endpoint') else 'unknown' - + # Start a span for this entire retry operation - with telemetry.traces().span_in_context("retry_operation") as (span, _): + span = telemetry.traces().new_span("retry_operation") + try: span.set_attribute("endpoint", endpoint) span.set_attribute("retry_strategy", args[0].retry_strategy.name) span.set_attribute("max_retries", args[0].retries) - + while retry_number < args[0].retries: try: # Log retry attempt if not the first attempt if retry_number > 0: # Log using traditional logger logger.info(f"Trying for the {retry_number} of {Constants.MAX_OF_RETRIES} retries. Using {args[0].retry_strategy}") - + # Log using OpenTelemetry telemetry.logs().new_log( msg=f"Retry attempt {retry_number} of {args[0].retries}", @@ -44,7 +45,7 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements }, level=20 # INFO level ) - + # Record retry metric telemetry.metrics().metric_increment( name="api.request.retry", @@ -53,13 +54,13 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements "retry_strategy": args[0].retry_strategy.name } ) - + # Update span with current retry count span.set_attribute("current_retry", retry_number) - + # Execute the wrapped function result = func(*args, **kwargs) - + # If we got here, it succeeded - record success metric after retries if retry_number > 0: telemetry.metrics().metric_increment( @@ -70,7 +71,7 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements "retry_strategy": args[0].retry_strategy.name } ) - + telemetry.logs().new_log( msg=f"Request succeeded after {retry_number} retries", tags={ @@ -81,16 +82,16 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements }, level=20 # INFO level ) - + return result - + except RequestException as e: retry_number += 1 - + # Record exception in the span span.record_exception(e) span.set_attribute("failed_attempt", retry_number) - + # Handle different retry strategies if args[0].retry_strategy == Strategies.NO_RETRY_STRATEGY: # Log failure with OpenTelemetry @@ -104,7 +105,7 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements }, level=40 # ERROR level ) - + # Record failure metric telemetry.metrics().metric_increment( name="api.request.failure", @@ -114,9 +115,9 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements "retry_strategy": "none" } ) - + raise e - + # Apply delay based on strategy if args[0].retry_strategy == Strategies.LINEAR_RETRY_STRATEGY: delay = args[0].delay @@ -124,7 +125,7 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements elif args[0].retry_strategy == Strategies.EXPONENTIAL_RETRY_STRATEGY: delay = args[0].delay * retry_number strategy_name = "exponential" - + # Log retry delay telemetry.logs().new_log( msg=f"Request failed, retrying in {delay}s (strategy: {strategy_name})", @@ -139,7 +140,7 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements }, level=30 # WARNING level ) - + # Record the delay time in metrics telemetry.metrics().record_gauge( name="api.retry.delay", @@ -150,16 +151,16 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements }, value=float(delay) ) - + # Sleep for the calculated delay time.sleep(delay) - + # Check if we've reached max retries if retry_number in (args[0].retries, Constants.MAX_OF_RETRIES): # Log final failure error_msg = f"Failed after {retry_number} retries" logger.error(error_msg) - + telemetry.logs().new_log( msg=error_msg, tags={ @@ -172,7 +173,7 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements }, level=40 # ERROR level ) - + # Record final failure metric telemetry.metrics().metric_increment( name="api.request.retry.exhausted", @@ -182,7 +183,9 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements "error_type": type(e).__name__ } ) - + raise e + finally: + span.end() return wrapper