Skip to content

Commit 5ae5e87

Browse files
author
Ravjot Brar
committed
Update example to return reader to give user more flexibility. Also fix client auth middleware returning null bearer token error
1 parent 8e54411 commit 5ae5e87

7 files changed

Lines changed: 20 additions & 31 deletions

File tree

python/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,4 @@ _description:_ The specific engine to run against. Only applicable to Dremio Clo
180180
181181
This lightweight Python client application connects to the Dremio Arrow Flight server endpoint. Developers can use token based or regular user credentials (username/password) for authentication. Please note username/password is not supported for Dremio Cloud. Dremio Cloud requires a token. Any datasets in Dremio that are accessible by the provided Dremio user can be queried. Developers can change settings by providing options in a config yaml file before running the client.
182182
183-
Moreover, the tls option can be provided to establish an encrypted connection.
183+
The example includes a function called get_reader, which returns a FlightStreamReader. Users can choose to read the data based on the methods available in the [FlightStreamReader class](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). In our example, we've decided to read the data into a Pandas dataframe.

python/dremio-flight/dremio/flight/connection.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ def _connect_to_software(
9999
middleware=[client_auth_middleware, client_cookie_middleware],
100100
**tls_args,
101101
)
102-
103102
# Authenticate with the server endpoint.
104103
password_or_token = self.password if self.password else self.token
105104
bearer_token = client.authenticate_basic_token(

python/dremio-flight/dremio/flight/endpoint.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ def __init__(self, connection_args: dict) -> None:
1212
def connect(self) -> flight.FlightClient:
1313
return self.dremio_flight_conn.connect()
1414

15-
def execute_query(self, flight_client: flight.FlightClient) -> DataFrame:
15+
def get_reader(self, client: flight.FlightClient) -> flight.FlightStreamReader:
1616
dremio_flight_query = DremioFlightEndpointQuery(
17-
self.connection_args.get("query"), flight_client, self.dremio_flight_conn
17+
self.connection_args.get("query"), client, self.dremio_flight_conn
1818
)
19-
return dremio_flight_query.execute_query()
19+
return dremio_flight_query.get_reader()

python/dremio-flight/dremio/flight/query.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(
3232
self.client = client
3333
self.headers = getattr(connection, "headers")
3434

35-
def execute_query(self) -> DataFrame:
35+
def get_reader(self) -> flight.FlightStreamReader:
3636
try:
3737
options = flight.FlightCallOptions(headers=self.headers)
3838
# Get the FlightInfo message to retrieve the Ticket corresponding
@@ -43,25 +43,11 @@ def execute_query(self) -> DataFrame:
4343
logging.info("GetFlightInfo was successful")
4444
logging.debug("Ticket: %s", flight_info.endpoints[0].ticket)
4545

46-
# Retrieve the result set as pandas DataFrame
47-
reader = self.client.do_get(flight_info.endpoints[0].ticket, options)
48-
return self._get_chunks(reader)
46+
# Retrieve the reader
47+
return self.client.do_get(flight_info.endpoints[0].ticket, options)
4948

5049
except Exception:
5150
logging.exception(
5251
"There was an error trying to get the data from the flight endpoint"
5352
)
5453
raise
55-
56-
def _get_chunks(self, reader: flight.FlightStreamReader) -> DataFrame:
57-
dataframe = DataFrame()
58-
while True:
59-
try:
60-
flight_batch = reader.read_chunk()
61-
record_batch = flight_batch.data
62-
data_to_pandas = record_batch.to_pandas()
63-
dataframe = concat([dataframe, data_to_pandas])
64-
except StopIteration:
65-
break
66-
67-
return dataframe

python/dremio-flight/dremio/middleware/auth.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ def __init__(self, factory):
4848
self.factory = factory
4949

5050
def received_headers(self, headers):
51+
if self.factory.call_credential:
52+
return
53+
5154
auth_header_key = "authorization"
55+
5256
authorization_header = reduce(
5357
lambda result, header: header[1]
5458
if header[0] == auth_header_key

python/dremio-flight/tests/test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"""
1616
from argparse import Namespace
1717
from numpy import array, array_equal
18-
from pyarrow.flight import FlightUnauthenticatedError, FlightUnavailableError
18+
from pyarrow.flight import FlightUnauthenticatedError, FlightInternalError
1919
from dotenv import load_dotenv
2020
import certifi
2121
import os
@@ -79,7 +79,7 @@ def test_simple_query():
7979
dremio_flight_query = DremioFlightEndpointQuery(
8080
args_dict["query"], flight_client, dremio_flight_conn
8181
)
82-
dataframe = dremio_flight_query.execute_query()
82+
dataframe = dremio_flight_query.get_reader().read_pandas()
8383
dataframe_arr = dataframe.to_numpy()
8484
expected_arr = array([[1, 2, 3]])
8585
assert array_equal(dataframe_arr, expected_arr)
@@ -96,7 +96,7 @@ def test_tls():
9696
dremio_flight_query = DremioFlightEndpointQuery(
9797
args_dict_ssl["query"], flight_client, dremio_flight_conn
9898
)
99-
dataframe = dremio_flight_query.execute_query()
99+
dataframe = dremio_flight_query.get_reader().read_pandas()
100100
dataframe_arr = dataframe.to_numpy()
101101
expected_arr = array([[1, 2, 3]])
102102
assert array_equal(dataframe_arr, expected_arr)
@@ -110,7 +110,7 @@ def test_bad_hostname():
110110
args_dict_bad_hostname["hostname"] = "ha-ha!"
111111

112112
dremio_flight_conn = DremioFlightEndpointConnection(args_dict_bad_hostname)
113-
with pytest.raises(FlightUnavailableError):
113+
with pytest.raises(FlightInternalError):
114114
dremio_flight_conn.connect()
115115

116116

@@ -122,7 +122,7 @@ def test_bad_port():
122122
args_dict_bad_port["port"] = 12345
123123

124124
dremio_flight_conn = DremioFlightEndpointConnection(args_dict_bad_port)
125-
with pytest.raises(FlightUnavailableError):
125+
with pytest.raises(FlightInternalError):
126126
dremio_flight_conn.connect()
127127

128128

python/example.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
# Connect to Dremio Arrow Flight server endpoint.
2727
flight_client = dremio_flight_endpoint.connect()
2828

29-
# Execute query
30-
dataframe = dremio_flight_endpoint.execute_query(flight_client)
29+
# Get reader
30+
reader = dremio_flight_endpoint.get_reader(flight_client)
3131

32-
# Print out the data
33-
print(dataframe)
32+
# Print out the data as a dataframe
33+
print(reader.read_pandas())

0 commit comments

Comments
 (0)