Skip to content

Commit 96f2417

Browse files
pradeepvakameta-codesync[bot]
authored and committed
feat(serializer): Add Arrow IPC format for RemoteFunctionPage (#16981)
Summary: Pull Request resolved: #16981 Add ArrowVectorSerde implementing Arrow IPC stream format for the remote function framework. Adds ARROW_IPC as a third PageFormat alongside PRESTO_PAGE and SPARK_UNSAFE_ROW. Serialize: Velox Bridge export (zero-copy) -> Arrow C++ RecordBatch -> IPC stream write via VeloxArrowOutputStream adapter (~2x bandwidth vs PrestoPage ~5x). No CRC, no null bitmap inversion. Deserialize: IOBufArrowBuffer wraps thrift IOBuf directly as arrow::Buffer (zero-copy for contiguous IOBufs) -> IPC parse with SliceBuffer (zero-copy) -> importFromArrowAsOwner (zero-copy for fixed-width). ~0x bandwidth for fixed-width vs PrestoPage ~3x. Adds VectorSerde::deserialize(const folly::IOBuf&, ...) virtual method so IOBufToRowVector() bypasses ByteInputStream conversion. Differential Revision: D98584648
1 parent cf7d5a7 commit 96f2417

File tree

9 files changed

+816
-23
lines changed

9 files changed

+816
-23
lines changed

velox/functions/remote/client/tests/RemoteFunctionTest.cpp

Lines changed: 133 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <stdlib.h>
2222
#include <numeric>
2323

24+
#include "velox/buffer/StringViewBufferHolder.h"
2425
#include "velox/common/base/Exceptions.h"
2526
#include "velox/common/base/tests/GTestUtils.h"
2627
#include "velox/functions/Registerer.h"
@@ -229,10 +230,8 @@ TEST_P(RemoteFunctionTest, tryErrorCode) {
229230
}
230231

231232
TEST_P(RemoteFunctionTest, opaque) {
232-
// TODO: Support opaque type serialization in SPARK_UNSAFE_ROW
233-
if (GetParam() == remote::PageFormat::SPARK_UNSAFE_ROW) {
234-
LOG(WARNING)
235-
<< "opaque type serialization not supported in SPARK_UNSAFE_ROW";
233+
// Opaque type serialization not supported in SPARK_UNSAFE_ROW or ARROW_IPC.
234+
if (GetParam() != remote::PageFormat::PRESTO_PAGE) {
236235
return;
237236
}
238237
auto inputVector = makeFlatVector<std::shared_ptr<void>>(
@@ -267,6 +266,134 @@ TEST_P(RemoteFunctionTest, connectionError) {
267266
}
268267
}
269268

269+
// Arrow IPC-specific tests that exercise type coverage and edge cases beyond
270+
// the parameterized suite. These use TEST_P but skip non-ARROW_IPC formats.
271+
// Verifies that nulls in a nullable int64_t column survive the round trip when
// the page format is ARROW_IPC: remote_plus(c0, c0) must double the non-null
// rows and leave rows 1 and 3 null.
// NOTE(review): for other page formats the test returns early, so those
// instantiations are reported as passed rather than skipped; GTEST_SKIP()
// would make that visible — confirm whether that is desired.
TEST_P(RemoteFunctionTest, arrowIpcNullsInBigint) {
272+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
273+
return;
274+
}
275+
auto inputVector =
276+
makeNullableFlatVector<int64_t>({1, std::nullopt, 3, std::nullopt, 5});
277+
auto results = evaluate<SimpleVector<int64_t>>(
278+
"remote_plus(c0, c0)", makeRowVector({inputVector}));
279+
280+
auto expected =
281+
makeNullableFlatVector<int64_t>({2, std::nullopt, 6, std::nullopt, 10});
282+
assertEqualVectors(expected, results);
283+
}
284+
285+
// Verifies null handling for double columns under ARROW_IPC. Only the
// numerator contains a null (row 1); remote_divide(c0, c1) is expected to
// return null for that row and the quotient for every other row. Returns
// early for any other page format.
TEST_P(RemoteFunctionTest, arrowIpcNullsInDouble) {
286+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
287+
return;
288+
}
289+
auto numerator =
290+
makeNullableFlatVector<double>({1.0, std::nullopt, 9.0, 16.0});
291+
auto denominator = makeNullableFlatVector<double>({1.0, 2.0, 3.0, 4.0});
292+
auto results = evaluate<SimpleVector<double>>(
293+
"remote_divide(c0, c1)", makeRowVector({numerator, denominator}));
294+
295+
auto expected = makeNullableFlatVector<double>({1.0, std::nullopt, 3.0, 4.0});
296+
assertEqualVectors(expected, results);
297+
}
298+
299+
// Verifies null handling for string columns under ARROW_IPC. The string
// column has nulls at rows 1 and 3 while the position column has none;
// remote_substr(c0, c1) is expected to yield null for the null-string rows
// and the suffix starting at the given position otherwise ("hello", 2 ->
// "ello"). Returns early for any other page format.
TEST_P(RemoteFunctionTest, arrowIpcNullsInString) {
300+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
301+
return;
302+
}
303+
auto inputStr = makeNullableFlatVector<StringView>(
304+
{"hello"_sv, std::nullopt, "world"_sv, std::nullopt});
305+
auto inputPos = makeNullableFlatVector<int32_t>({2, 1, 3, 1});
306+
auto results = evaluate<SimpleVector<StringView>>(
307+
"remote_substr(c0, c1)", makeRowVector({inputStr, inputPos}));
308+
309+
auto expected = makeNullableFlatVector<StringView>(
310+
{"ello"_sv, std::nullopt, "rld"_sv, std::nullopt});
311+
assertEqualVectors(expected, results);
312+
}
313+
314+
// Exercises ARROW_IPC with a 10,000-row batch: the input holds the values
// 0..kSize-1 (filled with std::iota) and remote_plus(c0, c0) must return each
// value doubled. Returns early for any other page format.
TEST_P(RemoteFunctionTest, arrowIpcLargeBatch) {
315+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
316+
return;
317+
}
318+
constexpr int kSize = 10'000;
319+
std::vector<int64_t> values(kSize);
320+
std::iota(values.begin(), values.end(), 0);
321+
322+
auto inputVector = makeFlatVector<int64_t>(values);
323+
auto results = evaluate<SimpleVector<int64_t>>(
324+
"remote_plus(c0, c0)", makeRowVector({inputVector}));
325+
326+
std::vector<int64_t> expectedValues(kSize);
327+
for (int i = 0; i < kSize; i++) {
328+
expectedValues[i] = values[i] * 2;
329+
}
330+
auto expected = makeFlatVector<int64_t>(expectedValues);
331+
assertEqualVectors(expected, results);
332+
}
333+
334+
// Edge case for ARROW_IPC: a three-row int64_t column in which every value is
// null. remote_plus(c0, c0) is expected to return three nulls. Returns early
// for any other page format.
TEST_P(RemoteFunctionTest, arrowIpcAllNulls) {
335+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
336+
return;
337+
}
338+
auto inputVector = makeNullableFlatVector<int64_t>(
339+
{std::nullopt, std::nullopt, std::nullopt});
340+
auto results = evaluate<SimpleVector<int64_t>>(
341+
"remote_plus(c0, c0)", makeRowVector({inputVector}));
342+
343+
auto expected = makeNullableFlatVector<int64_t>(
344+
{std::nullopt, std::nullopt, std::nullopt});
345+
assertEqualVectors(expected, results);
346+
}
347+
348+
// Verifies that per-column nulls are kept separate under ARROW_IPC: the two
// input columns have nulls at complementary positions, so every row has
// exactly one null operand and remote_plus(c0, c1) must return null for all
// four rows. Returns early for any other page format.
TEST_P(RemoteFunctionTest, arrowIpcMixedNullsAcrossColumns) {
349+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
350+
return;
351+
}
352+
// Column a has nulls at positions 1,3; column b has nulls at positions 0,2.
353+
auto colA =
354+
makeNullableFlatVector<int64_t>({1, std::nullopt, 3, std::nullopt});
355+
auto colB =
356+
makeNullableFlatVector<int64_t>({std::nullopt, 20, std::nullopt, 40});
357+
auto results = evaluate<SimpleVector<int64_t>>(
358+
"remote_plus(c0, c1)", makeRowVector({colA, colB}));
359+
360+
// plus propagates nulls: null if either input is null.
361+
auto expected = makeNullableFlatVector<int64_t>(
362+
{std::nullopt, std::nullopt, std::nullopt, std::nullopt});
363+
assertEqualVectors(expected, results);
364+
}
365+
366+
// Exercises ARROW_IPC with strings of 256 and 4,096 bytes alongside a short
// one. remote_substr(c0, 1) must hand each string back unchanged. Returns
// early for any other page format.
TEST_P(RemoteFunctionTest, arrowIpcLongStrings) {
367+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
368+
return;
369+
}
370+
// Strings longer than 12 bytes (non-inline in Velox StringView).
371+
const std::string longStr(256, 'x');
372+
const std::string longerStr(4'096, 'y');
373+
auto inputStr = makeFlatVector<StringView>(
374+
{StringView(longStr), StringView(longerStr), "short"_sv});
375+
auto inputPos = makeFlatVector<int32_t>({1, 1, 1});
376+
auto results = evaluate<SimpleVector<StringView>>(
377+
"remote_substr(c0, c1)", makeRowVector({inputStr, inputPos}));
378+
379+
// substr(s, 1) returns the full string.
380+
auto expected = makeFlatVector<StringView>(
381+
{StringView(longStr), StringView(longerStr), "short"_sv});
382+
assertEqualVectors(expected, results);
383+
}
384+
385+
// Edge case for ARROW_IPC: a batch containing a single row (42).
// remote_plus(c0, c0) must return 84. Returns early for any other page
// format.
TEST_P(RemoteFunctionTest, arrowIpcSingleRow) {
386+
if (GetParam() != remote::PageFormat::ARROW_IPC) {
387+
return;
388+
}
389+
auto inputVector = makeFlatVector<int64_t>({42});
390+
auto results = evaluate<SimpleVector<int64_t>>(
391+
"remote_plus(c0, c0)", makeRowVector({inputVector}));
392+
393+
auto expected = makeFlatVector<int64_t>({84});
394+
assertEqualVectors(expected, results);
395+
}
396+
270397
/// Mock implementation of IRemoteFunctionClient for testing without a real
271398
/// thrift server.
272399
class MockRemoteFunctionClient : public IRemoteFunctionClient {
@@ -498,7 +625,8 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
498625
RemoteFunctionTest,
499626
::testing::Values(
500627
remote::PageFormat::PRESTO_PAGE,
501-
remote::PageFormat::SPARK_UNSAFE_ROW));
628+
remote::PageFormat::SPARK_UNSAFE_ROW,
629+
remote::PageFormat::ARROW_IPC));
502630

503631
} // namespace
504632
} // namespace facebook::velox::functions

velox/functions/remote/if/GetSerde.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include "velox/functions/remote/if/GetSerde.h"
18+
#include "velox/serializers/ArrowSerializer.h"
1819
#include "velox/serializers/PrestoSerializer.h"
1920
#include "velox/serializers/UnsafeRowSerializer.h"
2021

@@ -27,6 +28,9 @@ std::unique_ptr<VectorSerde> getSerde(const remote::PageFormat& format) {
2728

2829
case remote::PageFormat::SPARK_UNSAFE_ROW:
2930
return std::make_unique<serializer::spark::UnsafeRowVectorSerde>();
31+
32+
case remote::PageFormat::ARROW_IPC:
33+
return std::make_unique<serializer::arrow::ArrowVectorSerde>();
3034
}
3135
VELOX_UNREACHABLE();
3236
}
@@ -42,6 +46,7 @@ std::unique_ptr<VectorSerde::Options> getSerdeOptions(
4246
return options;
4347
}
4448
case remote::PageFormat::SPARK_UNSAFE_ROW:
49+
case remote::PageFormat::ARROW_IPC:
4550
return std::make_unique<VectorSerde::Options>();
4651
}
4752
VELOX_UNREACHABLE();

velox/functions/remote/if/RemoteFunction.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ typedef binary IOBuf
3232
enum PageFormat {
3333
PRESTO_PAGE = 1,
3434
SPARK_UNSAFE_ROW = 2,
35+
ARROW_IPC = 3, // Arrow IPC stream format.
3536
}
3637

3738
/// Identifies the remote function being called.

0 commit comments

Comments
 (0)