Skip to content

Commit b021c53

Browse files
xiaoxmengmeta-codesync[bot]
authored andcommitted
feat: Add RowRange and ChunkHeader utilities (facebookincubator#586)
Summary: Pull Request resolved: facebookincubator#586 CONTEXT: NimbleIndexProjector needs lightweight value types for representing row ranges and parsing chunk headers from raw tablet stream data. WHAT: - Add RowRange struct for representing [offset, length) row ranges with merge/overlap detection and fmt::formatter support. - Add ChunkHeader for parsing chunk headers from raw tablet stream bytes, providing header size, data size, and total chunk size accessors. Reviewed By: HuamengJiang, kewang1024, tanjialiang Differential Revision: D97251759 fbshipit-source-id: 665f09aaf1a23a66fae3a7187891d84739a5f69a
1 parent b6be38b commit b021c53

14 files changed

+266
-78
lines changed

dwio/nimble/common/ChunkHeader.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <cstdint>
19+
20+
#include "dwio/nimble/common/EncodingPrimitives.h"
21+
#include "dwio/nimble/common/Types.h"
22+
23+
namespace facebook::nimble {
24+
25+
/// Chunk header: 4 bytes for compressed chunk length + 1 byte for compression
26+
/// type. This constant must remain 5 to maintain backward compatibility with
27+
/// existing Nimble files.
28+
constexpr int kChunkHeaderSize = 5;
29+
30+
struct ChunkHeader {
31+
uint32_t length;
32+
CompressionType compressionType;
33+
};
34+
35+
/// Reads a chunk header from 'pos', advancing 'pos' by kChunkHeaderSize bytes.
36+
inline ChunkHeader readChunkHeader(const char*& pos) {
37+
return {
38+
encoding::readUint32(pos),
39+
static_cast<CompressionType>(encoding::readChar(pos))};
40+
}
41+
42+
/// Writes a chunk header to 'pos', advancing 'pos' by kChunkHeaderSize bytes.
43+
inline void
44+
writeChunkHeader(uint32_t length, CompressionType compressionType, char*& pos) {
45+
encoding::writeUint32(length, pos);
46+
encoding::write(compressionType, pos);
47+
}
48+
49+
} // namespace facebook::nimble

dwio/nimble/common/Constants.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,4 @@ constexpr uint64_t kChunkingWriterMaxChunkSize{20 << 20}; // 20MB
4545
/// Used in place of kChunkingWriterMaxChunkSize for tables with large schemas.
4646
constexpr uint64_t kChunkingWriterWideSchemaMaxChunkSize{2 << 20}; // 2MB
4747

48-
// Chunk header size: 4 bytes for length + 1 byte for compression type.
49-
constexpr int kChunkHeaderSize = 5;
5048
} // namespace facebook::nimble

dwio/nimble/common/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ add_executable(
2424
nimble_common_tests
2525
BitEncoderTests.cpp
2626
BitsTests.cpp
27+
ChunkHeaderTest.cpp
2728
ConstantsTest.cpp
2829
ExceptionHelperTest.cpp
2930
ExceptionTests.cpp
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <gtest/gtest.h>
18+
19+
#include "dwio/nimble/common/ChunkHeader.h"
20+
21+
namespace facebook::nimble::test {
22+
23+
TEST(ChunkHeaderTest, size) {
24+
EXPECT_EQ(kChunkHeaderSize, 5);
25+
EXPECT_EQ(kChunkHeaderSize, sizeof(uint32_t) + sizeof(CompressionType));
26+
}
27+
28+
TEST(ChunkHeaderTest, writeAndRead) {
29+
char buffer[kChunkHeaderSize];
30+
auto* writePos = buffer;
31+
writeChunkHeader(1'234, CompressionType::Zstd, writePos);
32+
EXPECT_EQ(writePos - buffer, kChunkHeaderSize);
33+
34+
const char* readPos = buffer;
35+
const auto header = readChunkHeader(readPos);
36+
EXPECT_EQ(readPos - buffer, kChunkHeaderSize);
37+
EXPECT_EQ(header.length, 1'234);
38+
EXPECT_EQ(header.compressionType, CompressionType::Zstd);
39+
}
40+
41+
TEST(ChunkHeaderTest, uncompressed) {
42+
char buffer[kChunkHeaderSize];
43+
auto* writePos = buffer;
44+
writeChunkHeader(42, CompressionType::Uncompressed, writePos);
45+
46+
const char* readPos = buffer;
47+
const auto header = readChunkHeader(readPos);
48+
EXPECT_EQ(header.length, 42);
49+
EXPECT_EQ(header.compressionType, CompressionType::Uncompressed);
50+
}
51+
52+
TEST(ChunkHeaderTest, structuredBinding) {
53+
char buffer[kChunkHeaderSize];
54+
auto* writePos = buffer;
55+
writeChunkHeader(100'000, CompressionType::Zstd, writePos);
56+
57+
const char* readPos = buffer;
58+
const auto [length, compressionType] = readChunkHeader(readPos);
59+
EXPECT_EQ(length, 100'000);
60+
EXPECT_EQ(compressionType, CompressionType::Zstd);
61+
}
62+
63+
} // namespace facebook::nimble::test

dwio/nimble/common/tests/ConstantsTest.cpp

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17-
#include <gtest/gtest.h>
18-
19-
#include "dwio/nimble/common/Constants.h"
20-
2117
namespace facebook::nimble::test {
2218

23-
TEST(ConstantsTest, kChunkHeaderSize) {
24-
// Chunk header format:
25-
// - 4 bytes (uint32_t) for compressed chunk length
26-
// - 1 byte for compression type
27-
// This constant must remain 5 to maintain backward compatibility with
28-
// existing Nimble files.
29-
EXPECT_EQ(kChunkHeaderSize, 5);
30-
EXPECT_EQ(kChunkHeaderSize, sizeof(uint32_t) + 1);
31-
}
19+
// kChunkHeaderSize tests moved to ChunkHeaderTest.cpp.
3220

3321
} // namespace facebook::nimble::test

dwio/nimble/velox/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
add_library(nimble_velox_common SchemaUtils.cpp)
15-
target_link_libraries(nimble_velox_common nimble_common velox_type)
15+
target_link_libraries(nimble_velox_common nimble_common velox_type Folly::folly fmt::fmt)
1616

1717
add_library(nimble_velox_schema SchemaTypes.cpp)
1818
target_link_libraries(nimble_velox_schema nimble_common Folly::folly)

dwio/nimble/velox/ChunkedStream.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616
#include "dwio/nimble/velox/ChunkedStream.h"
17-
#include "dwio/nimble/common/EncodingPrimitives.h"
17+
#include "dwio/nimble/common/ChunkHeader.h"
1818
#include "dwio/nimble/common/Exceptions.h"
1919
#include "dwio/nimble/tablet/Compression.h"
2020
#include "folly/io/Cursor.h"
@@ -37,11 +37,10 @@ std::string_view InMemoryChunkedStream::nextChunk() {
3737
ensureLoaded();
3838
uncompressed_.clear();
3939
NIMBLE_CHECK_LE(
40-
sizeof(uint32_t) + sizeof(char),
40+
kChunkHeaderSize,
4141
stream_.size() - (pos_ - stream_.data()),
4242
"Read beyond end of stream");
43-
auto length = encoding::readUint32(pos_);
44-
auto compressionType = static_cast<CompressionType>(encoding::readChar(pos_));
43+
const auto [length, compressionType] = readChunkHeader(pos_);
4544
NIMBLE_CHECK_LE(
4645
length,
4746
stream_.size() - (pos_ - stream_.data()),
@@ -70,10 +69,10 @@ std::string_view InMemoryChunkedStream::nextChunk() {
7069
CompressionType InMemoryChunkedStream::peekCompressionType() {
7170
ensureLoaded();
7271
NIMBLE_CHECK_LE(
73-
sizeof(uint32_t) + sizeof(char),
72+
kChunkHeaderSize,
7473
stream_.size() - (pos_ - stream_.data()),
7574
"Read beyond end of stream");
76-
auto pos = pos_ + sizeof(uint32_t);
75+
auto* pos = pos_ + sizeof(uint32_t);
7776
return static_cast<CompressionType>(encoding::readChar(pos));
7877
}
7978

dwio/nimble/velox/ChunkedStreamWriter.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616
#include "dwio/nimble/velox/ChunkedStreamWriter.h"
17-
#include "dwio/nimble/common/EncodingPrimitives.h"
17+
#include "dwio/nimble/common/ChunkHeader.h"
1818
#include "dwio/nimble/common/Exceptions.h"
1919
#include "dwio/nimble/tablet/Compression.h"
2020

@@ -34,25 +34,22 @@ ChunkedStreamWriter::ChunkedStreamWriter(
3434

3535
std::vector<std::string_view> ChunkedStreamWriter::encode(
3636
std::string_view chunk) {
37-
constexpr uint8_t headerSize = sizeof(uint32_t) + sizeof(CompressionType);
38-
auto* header = buffer_.reserve(headerSize);
37+
auto* header = buffer_.reserve(kChunkHeaderSize);
3938
auto* pos = header;
4039

4140
if (compressionParams_.type == CompressionType::Zstd) {
4241
auto compressed = ZstdCompression::compress(
4342
buffer_.getMemoryPool(), chunk, compressionParams_.zstdLevel);
4443
if (compressed.has_value()) {
45-
encoding::writeUint32(compressed->size(), pos);
46-
encoding::write(CompressionType::Zstd, pos);
44+
writeChunkHeader(compressed->size(), CompressionType::Zstd, pos);
4745
return {
48-
{header, headerSize},
46+
{header, kChunkHeaderSize},
4947
buffer_.takeOwnership(compressed->releaseOwnership())};
5048
}
5149
}
5250

53-
encoding::writeUint32(chunk.size(), pos);
54-
encoding::write(CompressionType::Uncompressed, pos);
55-
return {{header, headerSize}, chunk};
51+
writeChunkHeader(chunk.size(), CompressionType::Uncompressed, pos);
52+
return {{header, kChunkHeaderSize}, chunk};
5653
}
5754

5855
} // namespace facebook::nimble

dwio/nimble/velox/RowRange.h

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <fmt/format.h>
20+
#include <folly/hash/Hash.h>
21+
22+
#include "velox/vector/TypeAliases.h"
23+
24+
namespace facebook::nimble {
25+
26+
/// Row range within a stripe [startRow, endRow).
27+
struct RowRange {
28+
velox::vector_size_t startRow{0};
29+
velox::vector_size_t endRow{0};
30+
31+
RowRange() = default;
32+
RowRange(velox::vector_size_t start, velox::vector_size_t end)
33+
: startRow(start), endRow(end) {}
34+
35+
inline velox::vector_size_t numRows() const {
36+
return endRow - startRow;
37+
}
38+
39+
inline bool empty() const {
40+
return startRow >= endRow;
41+
}
42+
43+
inline bool contains(const RowRange& other) const {
44+
return startRow <= other.startRow && other.endRow <= endRow;
45+
}
46+
47+
inline bool operator==(const RowRange& other) const {
48+
return startRow == other.startRow && endRow == other.endRow;
49+
}
50+
51+
inline std::string toString() const {
52+
return fmt::format("[{}, {})", startRow, endRow);
53+
}
54+
};
55+
56+
struct RowRangeHash {
57+
inline size_t operator()(const RowRange& range) const {
58+
return folly::hash::hash_combine(range.startRow, range.endRow);
59+
}
60+
};
61+
62+
} // namespace facebook::nimble

dwio/nimble/velox/selective/ChunkedDecoder.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
#include "dwio/nimble/velox/selective/ChunkedDecoder.h"
1818

19-
#include "dwio/nimble/common/Constants.h"
20-
#include "dwio/nimble/common/EncodingPrimitives.h"
19+
#include "dwio/nimble/common/ChunkHeader.h"
2120
#include "dwio/nimble/common/Types.h"
2221
#include "dwio/nimble/encodings/EncodingFactory.h"
2322
#include "velox/common/testutil/TestValue.h"
@@ -33,9 +32,7 @@ using namespace facebook::velox;
3332
void ChunkedDecoder::loadNextChunk() {
3433
auto ret = ensureInput(kChunkHeaderSize);
3534
NIMBLE_CHECK(ret, "Failed to read chunk header");
36-
auto length = encoding::readUint32(inputData_);
37-
const auto compressionType =
38-
static_cast<CompressionType>(encoding::readChar(inputData_));
35+
const auto [length, compressionType] = readChunkHeader(inputData_);
3936
inputSize_ -= kChunkHeaderSize;
4037
ret = ensureInput(length);
4138
NIMBLE_CHECK(ret);

0 commit comments

Comments
 (0)