Skip to content

Commit

Permalink
GH-45263: [MATLAB] Add ability to construct RecordBatchStreamReader
Browse files Browse the repository at this point in the history
… from `uint8` array (#45274)

### Rationale for this change

To enable more workflows using the IPC Stream format in the MATLAB interface, this pull request adds the ability to construct a `RecordBatchStreamReader` from a MATLAB `uint8` array.

This is helpful, for example, to enable Arrow-over-HTTP workflows in conjunction with the [MATLAB `webread` function](https://www.mathworks.com/help/matlab/ref/webread.html) (which can return a `uint8` array from an HTTP request).

This is a followup issue to #44923.

### What changes are included in this PR?

1. Added a new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes)`.
2. Added a new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.
3. Changed the signature of the `arrow.io.ipc.RecordBatchStreamReader` constructor to no longer directly accept a `filename` as an input. Instead, a `arrow.io.ipc.RecordBatchStreamReader` can now only be directly constructed from a `libmexclass.proxy.Proxy` instance. This mirrors the design of other MATLAB classes which wrap `Proxy` instances in the MATLAB interface. To construct `RecordBatchStreamReader` objects from an Arrow IPC Stream file on disk, users can instead use the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.

### Are these changes tested?

Yes.

1. Updated tests in `arrow/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m` to be parameterized over the `fromFile` and `fromBytes` "construction functions".
2. Added a new test to verify that an appropriate error is thrown if the `RecordBatchStreamReader` constructor is called directly with an input that is not a `libmexclass.proxy.Proxy` instance.

### Are there any user-facing changes?

Yes.

1. Users can now create `arrow.io.ipc.RecordBatchStreamReader` objects from an Arrow IPC Stream file on disk using the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.
2. Users can now create `arrow.io.ipc.RecordBatchStreamReader` objects from an in-memory MATLAB `uint8` "bytes" array using the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes)`.

**This PR includes breaking changes to public APIs.**

This PR changes the signature of the public `arrow.io.ipc.RecordBatchStreamReader` constructor to no longer directly accept a `filename` as an input. Instead, a `arrow.io.ipc.RecordBatchStreamReader` can now only be directly constructed from a `libmexclass.proxy.Proxy` instance. This mirrors the design of other MATLAB classes which wrap `Proxy` instances in the MATLAB interface. To construct `RecordBatchStreamReader` objects from an Arrow IPC Stream file on disk, users can instead use the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.

### Future Directions

1. Use the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes)` in an example to demonstrate how to read an Arrow IPC Stream from an HTTP endpoint as part of [apache/arrow-experiments](https://github.com/apache/arrow-experiments/tree/main/http/get_simple).

### Notes

1. Thank you @ sgilmore10 for your help with this pull request!
* GitHub Issue: #45263

Authored-by: Kevin Gurney <[email protected]>
Signed-off-by: Kevin Gurney <[email protected]>
  • Loading branch information
kevingurney authored Jan 17, 2025
1 parent c9f417f commit 1fe27fe
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 42 deletions.
2 changes: 2 additions & 0 deletions matlab/src/cpp/arrow/matlab/error/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ static const char* IPC_RECORD_BATCH_WRITE_FAILED =
static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed";
static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
"arrow:io:ipc:FailedToOpenRecordBatchReader";
static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE =
"arrow:io:ipc:InvalidConstructionType";
static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex";
static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/matlab/buffer/matlab_buffer.h"
#include "arrow/matlab/error/error.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/schema.h"
Expand All @@ -36,14 +38,13 @@ RecordBatchStreamReader::RecordBatchStreamReader(
REGISTER_METHOD(RecordBatchStreamReader, readTable);
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
using RecordBatchStreamReaderProxy =
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;

const mda::StructArray opts = constructor_arguments[0];

const mda::StringArray filename_mda = opts[0]["Filename"];
const auto filename_utf16 = std::u16string(filename_mda[0]);
MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
Expand All @@ -60,6 +61,43 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
using RecordBatchStreamReaderProxy =
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;

const mda::StructArray opts = constructor_arguments[0];
const ::matlab::data::TypedArray<uint8_t> bytes_mda = opts[0]["Bytes"];
const auto matlab_buffer =
std::make_shared<arrow::matlab::buffer::MatlabBuffer>(bytes_mda);
auto buffer_reader = std::make_shared<arrow::io::BufferReader>(matlab_buffer);
MATLAB_ASSIGN_OR_ERROR(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(buffer_reader),
error::IPC_RECORD_BATCH_READER_OPEN_FAILED);
return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
const mda::StructArray opts = constructor_arguments[0];

// Dispatch to the appropriate static "make" method depending
// on the input type.
const mda::StringArray type_mda = opts[0]["Type"];
const auto type_utf16 = std::u16string(type_mda[0]);
if (type_utf16 == u"Bytes") {
return RecordBatchStreamReader::fromBytes(constructor_arguments);
} else if (type_utf16 == u"File") {
return RecordBatchStreamReader::fromFile(constructor_arguments);
} else {
return libmexclass::error::Error{
"arrow:io:ipc:InvalidConstructionType",
"Invalid construction type for RecordBatchStreamReader."};
}
}

void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) {
namespace mda = ::matlab::data;
using SchemaProxy = arrow::matlab::tabular::proxy::Schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy {

static libmexclass::proxy::MakeResult make(
const libmexclass::proxy::FunctionArguments& constructor_arguments);
static libmexclass::proxy::MakeResult fromFile(
const libmexclass::proxy::FunctionArguments& constructor_arguments);
static libmexclass::proxy::MakeResult fromBytes(
const libmexclass::proxy::FunctionArguments& constructor_arguments);

protected:
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader;
Expand Down
28 changes: 24 additions & 4 deletions matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,34 @@
Schema
end

methods
function obj = RecordBatchStreamReader(filename)
methods (Static)
function obj = fromBytes(bytes)
arguments
bytes(:, 1) uint8
end
args = struct(Bytes=bytes, Type="Bytes");
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
proxy = arrow.internal.proxy.create(proxyName, args);
obj = arrow.io.ipc.RecordBatchStreamReader(proxy);
end

function obj = fromFile(filename)
arguments
filename(1, 1) string {mustBeNonzeroLengthText}
end
args = struct(Filename=filename);
args = struct(Filename=filename, Type="File");
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
obj.Proxy = arrow.internal.proxy.create(proxyName, args);
proxy = arrow.internal.proxy.create(proxyName, args);
obj = arrow.io.ipc.RecordBatchStreamReader(proxy);
end
end

methods
function obj = RecordBatchStreamReader(proxy)
arguments
proxy(1, 1) libmexclass.proxy.Proxy
end
obj.Proxy = proxy;
end

function schema = get.Schema(obj)
Expand Down
Loading

0 comments on commit 1fe27fe

Please sign in to comment.