Skip to content

GH-46481: [C++][Python] Allow nullable schema in FlightInfo #46489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ TEST(FlightTypes, FlightInfo) {
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true, ""),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false,
"\xDE\xAD\xC0\xDE"),
MakeFlightInfo(desc1, {}, -1, -1, false, ""),
};
std::vector<std::string> reprs = {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
Expand All @@ -257,6 +258,8 @@ TEST(FlightTypes, FlightInfo) {
"locations=[grpc+tcp://localhost:1234] expiration_time=null "
"app_metadata='CAFED00D'>] "
"total_records=64 total_bytes=-1 ordered=false app_metadata='DEADC0DE'>",
"<FlightInfo schema=(empty) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false app_metadata=''>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descript
return info;
}

// Test helper: builds a FlightInfo without a schema by handing a null schema
// pointer to FlightInfo::Make. A failing Make is reported through
// EXPECT_OK_AND_ASSIGN.
FlightInfo MakeFlightInfo(const FlightDescriptor& descriptor,
                          const std::vector<FlightEndpoint>& endpoints,
                          int64_t total_records, int64_t total_bytes, bool ordered,
                          std::string app_metadata) {
  // Evaluate the factory call first, then unwrap the result.
  auto maybe_info = FlightInfo::Make(nullptr, descriptor, endpoints, total_records,
                                     total_bytes, ordered, std::move(app_metadata));
  EXPECT_OK_AND_ASSIGN(auto info, std::move(maybe_info));
  return info;
}

// Construct a NumberingStream around `stream`: ownership of the wrapped
// stream is moved into stream_ and counter_ starts at zero.
NumberingStream::NumberingStream(std::unique_ptr<FlightDataStream> stream)
: counter_(0), stream_(std::move(stream)) {}

Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descript
int64_t total_records, int64_t total_bytes, bool ordered,
std::string app_metadata);

/// \brief Test helper that builds a FlightInfo with no schema attached.
///
/// Counterpart of the MakeFlightInfo overload that takes a Schema; this one
/// omits the schema argument entirely.
ARROW_FLIGHT_EXPORT
FlightInfo MakeFlightInfo(const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes, bool ordered,
std::string app_metadata);

ARROW_FLIGHT_EXPORT
Status ExampleTlsCertificates(std::vector<CertKeyPair>* out);

Expand Down
25 changes: 24 additions & 1 deletion cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,31 @@ arrow::Result<FlightInfo> FlightInfo::Make(const Schema& schema,
return FlightInfo(std::move(data));
}

/// Build a FlightInfo from an optional schema.
///
/// When `schema` is null the schema field of the underlying Data is left as
/// default-constructed; otherwise the schema is serialized with
/// internal::SchemaToString and a serialization failure is returned to the
/// caller.
arrow::Result<FlightInfo> FlightInfo::Make(const std::shared_ptr<Schema>& schema,
                                           const FlightDescriptor& descriptor,
                                           const std::vector<FlightEndpoint>& endpoints,
                                           int64_t total_records, int64_t total_bytes,
                                           bool ordered, std::string app_metadata) {
  FlightInfo::Data payload;

  // Serialize up front: this is the only step that can fail.
  if (schema != nullptr) {
    RETURN_NOT_OK(internal::SchemaToString(*schema, &payload.schema));
  }

  payload.descriptor = descriptor;
  payload.endpoints = endpoints;
  payload.total_records = total_records;
  payload.total_bytes = total_bytes;
  payload.ordered = ordered;
  payload.app_metadata = std::move(app_metadata);

  return FlightInfo(std::move(payload));
}

arrow::Result<std::shared_ptr<Schema>> FlightInfo::GetSchema(
ipc::DictionaryMemo* dictionary_memo) const {
if (reconstructed_schema_) {
return schema_;
} else if (data_.schema.empty()) {
reconstructed_schema_ = true;
return schema_;
}
// Create a non-owned Buffer to avoid copying
io::BufferReader schema_reader(std::make_shared<Buffer>(data_.schema));
Expand All @@ -305,7 +326,9 @@ arrow::Status FlightInfo::Deserialize(std::string_view serialized,
std::string FlightInfo::ToString() const {
std::stringstream ss;
ss << "<FlightInfo schema=";
if (schema_) {
if (data_.schema.empty()) {
ss << "(empty)";
} else if (schema_) {
ss << schema_->ToString();
} else {
ss << "(serialized)";
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -638,12 +638,21 @@ class ARROW_FLIGHT_EXPORT FlightInfo
bool ordered = false,
std::string app_metadata = "");

/// \brief Factory method to construct a FlightInfo from an optional schema.
///
/// \param[in] schema the schema of the flight data; may be nullptr, in which
/// case no schema is serialized into the resulting FlightInfo
/// \param[in] descriptor the descriptor for this flight
/// \param[in] endpoints the endpoints associated with this flight
/// \param[in] total_records value stored as the FlightInfo's total_records
/// \param[in] total_bytes value stored as the FlightInfo's total_bytes
/// \param[in] ordered value stored as the FlightInfo's ordered flag
/// \param[in] app_metadata application-defined metadata stored as-is
/// \return the constructed FlightInfo, or an error if the schema could not
/// be serialized
static arrow::Result<FlightInfo> Make(const std::shared_ptr<Schema>& schema,
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
bool ordered = false,
std::string app_metadata = "");

/// \brief Deserialize the Arrow schema of the dataset. Populate any
/// dictionary encoded fields into a DictionaryMemo for
/// bookkeeping
/// \param[in,out] dictionary_memo for dictionary bookkeeping, will
/// be modified
/// \return Arrow result with the reconstructed Schema
/// \return Arrow result with the reconstructed Schema. Note that the schema
/// may be nullptr, as the schema is optional.
arrow::Result<std::shared_ptr<Schema>> GetSchema(
ipc::DictionaryMemo* dictionary_memo) const;

Expand Down
4 changes: 3 additions & 1 deletion python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ cdef class FlightInfo(_Weakrefable):

Parameters
----------
schema : Schema
schema : Schema, optional
the schema of the data in this flight.
descriptor : FlightDescriptor
the descriptor for this flight.
Expand Down Expand Up @@ -961,6 +961,8 @@ cdef class FlightInfo(_Weakrefable):
CDictionaryMemo dummy_memo

check_flight_status(self.info.get().GetSchema(&dummy_memo).Value(&schema))
if schema.get() == NULL:
return None
return pyarrow_wrap_schema(schema)

@property
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/src/arrow/python/flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const std::string& app_metadata,
std::unique_ptr<arrow::flight::FlightInfo>* out) {
ARROW_ASSIGN_OR_RAISE(auto result, arrow::flight::FlightInfo::Make(
*schema, descriptor, endpoints, total_records,
schema, descriptor, endpoints, total_records,
total_bytes, ordered, app_metadata));
*out = std::unique_ptr<arrow::flight::FlightInfo>(
new arrow::flight::FlightInfo(std::move(result)));
Expand Down
24 changes: 21 additions & 3 deletions python/pyarrow/tests/test_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,10 @@ def __init__(self, factory):

def received_headers(self, headers):
auth_header = case_insensitive_header_lookup(headers, 'Authorization')
self.factory.set_call_credential([
b'authorization',
auth_header[0].encode("utf-8")])
if auth_header:
self.factory.set_call_credential([
b'authorization',
auth_header[0].encode("utf-8")])


class HeaderAuthServerMiddlewareFactory(ServerMiddlewareFactory):
Expand Down Expand Up @@ -916,6 +917,23 @@ def test_repr():
assert repr(flight.SchemaResult(pa.schema([("int", "int64")]))) == \
"<pyarrow.flight.SchemaResult schema=(int: int64)>"
assert repr(flight.Ticket(b"foo")) == ticket_repr
assert info.schema == pa.schema([])

info = flight.FlightInfo(
None, flight.FlightDescriptor.for_path(), [],
1, 42, True, b"test app metadata"
)
info_repr = (
"<pyarrow.flight.FlightInfo "
"schema=None "
"descriptor=<pyarrow.flight.FlightDescriptor path=[]> "
"endpoints=[] "
"total_records=1 "
"total_bytes=42 "
"ordered=True "
"app_metadata=b'test app metadata'>")
assert repr(info) == info_repr
assert info.schema is None

with pytest.raises(TypeError):
flight.Action("foo", None)
Expand Down
Loading