Open
Description
Describe the usage question you have. Please include as many useful details as possible.
# cef_flight_sdk/client/cef_flight_data_client.py
import logging
import pyarrow.flight as flight
from cef_flight_sdk.exceptions.sdk_exceptions import CefFlightException
from cef_flight_sdk.model.object_store_details import ObjectStoreDetails
# NOTE(review): calling basicConfig at import time reconfigures the root logger for every
# consumer of this SDK module; the usual library pattern is a module-level
# `logger = logging.getLogger(__name__)` and leaving configuration to the application.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def build_ticket_str(details: ObjectStoreDetails) -> str:
    """
    Build the ticket string used to request data from the Flight server.

    The ticket is a pipe-separated string assembled from the ObjectStoreDetails:
    access key, secret key, S3 path, retrieval mode, and — in "batch" mode only —
    the batch size (defaulting to 100,000 rows when not supplied).

    :param details: ObjectStoreDetails carrying all required fields.
    :return: Formatted ticket string.
    :raises ValueError: If a batch size is supplied in 'full' mode, or the mode is unknown.
    """
    retrieval_mode = details.data_retrieval_mode.lower()
    ticket = f"{details.access_key}|{details.secret_key}|{details.s3_path}|{retrieval_mode}"
    if retrieval_mode == "full":
        # 'full' mode streams everything at once; a batch size is contradictory here.
        if details.batch_size is not None:
            raise ValueError("Batch size cannot be provided when mode is 'full'.")
        return ticket
    if retrieval_mode == "batch":
        # Fall back to 100k rows per batch when the caller did not set a size.
        return f"{ticket}|{details.batch_size or 100000}"
    raise ValueError(f"Invalid data retrieval mode: {retrieval_mode}")
class CefFlightDataClient:
    """
    A client for connecting to an Apache Arrow Flight server to retrieve data streams
    from an S3-compatible object store.

    A new FlightClient connection is created for each stream request and closed when
    the stream is exhausted (or the consuming generator is closed), so each retrieval
    uses an isolated, short-lived connection. This avoids lingering connections when
    handling multiple concurrent clients.

    Attributes:
        host (str): The Flight server hostname or IP address.
        port (int): The Flight server port number.
        use_tls (bool): Specifies if TLS should be used for secure connections.
        tls_roots_certs_path (str): Path to trusted TLS root certificates if TLS is enabled.
    """

    def __init__(self, host: str, port: int, use_tls: bool = False, tls_roots_certs_path: str = None):
        """
        Initialize the CefFlightDataClient with server host, port, and optional TLS settings.

        :param host: Hostname or IP address of the Flight server.
        :param port: Port number of the Flight server.
        :param use_tls: Whether to use TLS for secure connections.
        :param tls_roots_certs_path: Path to trusted TLS root certificates if TLS is enabled.
        """
        self.host = host
        self.port = port
        self.use_tls = use_tls
        self.tls_roots_certs_path = tls_roots_certs_path

    def _connect_to_flight_server(self):
        """
        Establish a connection to the Flight server, with optional TLS.

        :return: A new ``flight.FlightClient`` bound to the configured location.
        """
        # Build a grpc+tls or plain grpc+tcp Location depending on configuration.
        self.location = (
            flight.Location.for_grpc_tls(self.host, self.port) if self.use_tls
            else flight.Location.for_grpc_tcp(self.host, self.port)
        )
        connection_args = {}
        if self.tls_roots_certs_path:
            with open(self.tls_roots_certs_path, "rb") as root_certs_file:
                # BUG FIX: previously the certificate bytes were read into a dict that
                # was never passed to FlightClient (and under a non-existent keyword),
                # so custom root certs were silently ignored. FlightClient expects the
                # PEM bytes under the 'tls_root_certs' keyword argument.
                connection_args["tls_root_certs"] = root_certs_file.read()
        return flight.FlightClient(self.location, **connection_args)

    def get_stream_iterator(self, object_store_details: ObjectStoreDetails):
        """
        Retrieve a data stream from the Flight server as an iterator of RecordBatch objects.

        A new FlightClient instance is created for each call so connections are never
        reused across requests; the connection is closed in ``finally`` when the stream
        is exhausted or the generator is closed by the caller.

        :param object_store_details: ObjectStoreDetails with access key, secret key,
            S3 path, data retrieval mode, and optional batch size.
        :yield: pyarrow.RecordBatch from each FlightStreamChunk in the data stream.
        :raises CefFlightException: If there is an error in fetching the data stream.
        """
        ticket_str = build_ticket_str(object_store_details)
        ticket = flight.Ticket(ticket_str.encode('utf-8'))
        client = None
        try:
            client = self._connect_to_flight_server()  # fresh connection per request
            for chunk in client.do_get(ticket):
                yield chunk.data
        except Exception as e:
            # Lazy %-style args avoid formatting cost when the level is disabled.
            logging.error("Error processing stream: %s", e)
            # Chain the original cause so the underlying Flight error is preserved.
            raise CefFlightException("Error processing stream", e) from e
        finally:
            # Runs on normal exhaustion AND when the caller abandons the generator,
            # ensuring the connection does not linger.
            if client:
                client.close()
                logging.info("Flight client closed after streaming.")
Below is the example driver script that exercises `get_stream_iterator`:
from cef_flight_sdk.builder.object_store_details_builder import ObjectStoreDetailsBuilder
from cef_flight_sdk.client.cef_flight_client import CefFlightDataClient
from cef_flight_sdk.config.config_loader import ConfigLoader
from cef_flight_sdk.model.data_retrieval_mode import DataRetrievalMode
def main():
    """
    Main entry point for processing data from an Apache Arrow Flight server.

    This script:
    1. Loads configuration settings.
    2. Initializes a Flight client to connect to the Flight server.
    3. Builds ObjectStoreDetails for specific and wildcard S3 paths.
    4. Retrieves and displays data streams for both specific and wildcard paths.

    Important:
    - Ensure the configuration file is correctly set up with appropriate S3 and
      Flight server details.
    """
    # Load configuration and initialize the Flight client from it.
    ConfigLoader.load_config()
    client = CefFlightDataClient(
        host=ConfigLoader.get_property("flight_server.host"),
        port=int(ConfigLoader.get_property("flight_server.port")),
    )

    # Process each path kind in turn: schema and top 10 rows are printed per batch.
    for label, path_property in (
        ("Non-Wildcard", "s3.non_wildcard_path"),
        ("Wildcard", "s3.wildcard_path"),
    ):
        details = build_object_store_details(
            s3_path=ConfigLoader.get_property(path_property),
            mode=DataRetrievalMode.BATCH,
        )
        print(f"\n--- Processing {label} Path ---")
        process_stream(client, details)

    # Optionally, write the non-wildcard data to a local Parquet file:
    # output_file_path = ConfigLoader.get_property("output.file_path")
    # write_data_to_parquet(client, details, output_file_path=output_file_path)
def build_object_store_details(s3_path: str, mode: DataRetrievalMode):
    """
    Construct an ObjectStoreDetails via the ObjectStoreDetailsBuilder.

    Credentials are pulled from configuration; the caller supplies the target
    path and the retrieval mode.

    :param s3_path: Path in the S3-compatible storage for the target data.
    :param mode: Data retrieval mode, either 'batch' or 'full'.
    :return: Configured ObjectStoreDetails instance.
    """
    builder = ObjectStoreDetailsBuilder()
    builder = builder.with_access_key(ConfigLoader.get_property("s3.access_key"))
    builder = builder.with_secret_key(ConfigLoader.get_property("s3.secret_key"))
    builder = builder.with_s3_path(s3_path)
    builder = builder.with_data_retrieval_mode(mode)
    return builder.build()
def process_stream(client, object_store_details):
    """
    Fetch the data stream from the Flight server and display row counts and top 10 rows.

    :param client: CefFlightDataClient instance used to talk to the Flight server.
    :param object_store_details: ObjectStoreDetails with the access and retrieval details.
    """
    stream = client.get_stream_iterator(object_store_details)
    for record_batch in stream:
        print("Number of rows:", record_batch.num_rows)
        # Convert to pandas for a readable tabular preview.
        frame = record_batch.to_pandas()
        print("Top 10 rows:\n", frame.head(10))
# Run the demo only when executed as a script, not when this module is imported.
if __name__ == "__main__":
    main()
Hello developers and community members. I am seeing an issue where the Flight client very often gets stuck while reading streams. I have never seen this issue with the Scala client, but with the Python client it happens consistently.
Requesting your guidance.
Component(s)
FlightRPC