Skip to content

Commit c45592d

Browse files
authored
feat(buffer_object): Add resize method to BufferIO (#57)
Fixes #40
1 parent 7075ce4 commit c45592d

File tree

9 files changed

+465
-0
lines changed

9 files changed

+465
-0
lines changed

src/ml_flashpoint/checkpoint_object_manager/buffer_io.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,46 @@ def close(self, truncate: bool = True) -> None:
404404
# Set position to -1, a common convention for a closed stream.
405405
self._pos = -1
406406

407+
def resize(self, new_size: int) -> None:
408+
"""Resizes the buffer to the new size (including metadata size).
409+
410+
Args:
411+
new_size: The new size of the buffer in bytes. Must be >= METADATA_SIZE.
412+
"""
413+
self._check_validity("write")
414+
_LOGGER.info("Resizing BufferIO from %d to %d bytes.", len(self._mv), new_size)
415+
416+
if new_size < METADATA_SIZE:
417+
raise ValueError(f"New size must be at least {METADATA_SIZE} bytes, got {new_size}.")
418+
419+
# 1. Release the memoryview
420+
if self._mv:
421+
self._mv.release()
422+
self._mv = None
423+
424+
# 2. Call C++ resize
425+
try:
426+
self.buffer_obj.resize(new_size)
427+
except Exception:
428+
_LOGGER.exception("Failed to resize underlying BufferObject.")
429+
raise
430+
431+
# 3. Recreate the memoryview
432+
try:
433+
self._mv = memoryview(self.buffer_obj)
434+
except Exception:
435+
_LOGGER.exception("Failed to recreate memoryview after resize.")
436+
raise ValueError("Failed to recreate memoryview after resize.")
437+
438+
# 4. Re-map metadata
439+
# Since the buffer might have moved in memory, we need to refresh the metadata view.
440+
try:
441+
# We are always in writable mode if we are resizing (resize checks !readonly)
442+
self._metadata = BufferMetadataType.from_buffer(self._mv[:METADATA_SIZE])
443+
except Exception as e:
444+
_LOGGER.exception("Failed to recreate metadata object from buffer slice.")
445+
raise IOError(f"Could not initialize metadata from buffer: {e}") from e
446+
407447
# --- Properties and Context Manager ---
408448

409449
@property

src/ml_flashpoint/checkpoint_object_manager/buffer_object/bindings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ PYBIND11_MODULE(buffer_object_ext, m) {
5353
"Gets the buffer object capacity.",
5454
py::call_guard<py::gil_scoped_release>())
5555

56+
.def("resize", &BufferObject::resize, "Resizes the buffer object.",
57+
py::arg("new_capacity"), py::call_guard<py::gil_scoped_release>())
58+
5659
.def(
5760
"get_data_ptr",
5861
[](const BufferObject& self) -> std::uintptr_t {

src/ml_flashpoint/checkpoint_object_manager/buffer_object/buffer_helper.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,5 +270,48 @@ absl::Status unmap_and_close(int fd, void* data_ptr, size_t data_size,
270270
return absl::OkStatus();
271271
}
272272

273+
absl::Status resize_mmap(int fd, size_t new_size, void*& data_ptr,
274+
size_t& curr_size) {
275+
// @brief Resizes the memory map.
276+
//
277+
// This function assumes `data_ptr` is currently mapped with
278+
// `curr_size`. It unmaps it, ftruncates the file, and remaps it with
279+
// `new_size`.
280+
281+
if (new_size == curr_size) {
282+
return absl::OkStatus();
283+
}
284+
if (fd == -1) {
285+
return absl::InvalidArgumentError("Invalid file descriptor for resize.");
286+
}
287+
if (data_ptr == MAP_FAILED || curr_size == 0) {
288+
return absl::InvalidArgumentError(
289+
"Invalid data pointer or size for resize.");
290+
}
291+
292+
// 1. Unmap existing
293+
if (munmap(data_ptr, curr_size) == -1) {
294+
return ErrnoToStatus("munmap() failed during resize");
295+
}
296+
data_ptr = nullptr;
297+
298+
// 2. Truncate
299+
if (ftruncate(fd, new_size) == -1) {
300+
return ErrnoToStatus("ftruncate() failed during resize");
301+
}
302+
303+
// 3. Mmap new size
304+
void* ptr = mmap(NULL, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
305+
if (ptr == MAP_FAILED) {
306+
return ErrnoToStatus("mmap() failed during resize");
307+
}
308+
309+
data_ptr = ptr;
310+
curr_size = new_size;
311+
312+
LOG(INFO) << "Successfully resized mmap to " << new_size;
313+
return absl::OkStatus();
314+
}
315+
273316
} // namespace
274317
// ml_flashpoint::checkpoint_object_manager::buffer_object::internal

src/ml_flashpoint/checkpoint_object_manager/buffer_object/buffer_helper.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ absl::Status open_file_and_mmap_ro(const std::string& object_id, int& out_fd,
4545
// Unmaps memory, optionally truncates the file, and closes the file descriptor.
4646
absl::Status unmap_and_close(int fd, void* data_ptr, size_t data_size,
4747
std::optional<size_t> truncate_size);
48+
// Resizes the file to `new_size` and remaps it into memory,
49+
// updating `data_ptr` to point to the resized memory buffer.
50+
// If new_size == curr_size, this is a no-op and `data_ptr` is returned as is.
51+
absl::Status resize_mmap(int fd, size_t new_size, void*& data_ptr,
52+
size_t& curr_size);
4853

4954
}; // namespace
5055
// ml_flashpoint::checkpoint_object_manager::buffer_object::internal

src/ml_flashpoint/checkpoint_object_manager/buffer_object/buffer_object.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,27 @@ void BufferObject::close(std::optional<size_t> truncate_size) noexcept {
172172
fd_ = -1;
173173
data_ptr_ = nullptr;
174174
capacity_ = 0;
175+
}
176+
177+
void BufferObject::resize(size_t new_capacity) {
178+
if (is_closed()) {
179+
throw std::runtime_error("Cannot resize a closed buffer.");
180+
}
181+
if (is_readonly()) {
182+
throw std::runtime_error("Cannot resize a read-only buffer.");
183+
}
184+
if (new_capacity == 0) {
185+
throw std::runtime_error("Cannot resize buffer to 0.");
186+
}
187+
188+
LOG(INFO) << "BufferObject::resize: Resizing object_id=" << object_id_
189+
<< " from " << capacity_ << " to " << new_capacity;
190+
191+
absl::Status status =
192+
ml_flashpoint::checkpoint_object_manager::buffer_object::internal::
193+
resize_mmap(fd_, new_capacity, data_ptr_, capacity_);
194+
195+
if (!status.ok()) {
196+
throw std::runtime_error(status.ToString());
197+
}
175198
}

src/ml_flashpoint/checkpoint_object_manager/buffer_object/buffer_object.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ class BufferObject {
7171
// close the buffer object, optionally truncates the file before closing
7272
void close(std::optional<size_t> truncate_size = std::nullopt) noexcept;
7373

74+
// Resizes the buffer to the new capacity.
75+
void resize(size_t new_capacity);
76+
7477
private:
7578
std::string object_id_; // The unique identifier of the buffer, typically the
7679
// filename.

tests/checkpoint_object_manager/buffer_object/buffer_helper_test.cpp

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,5 +611,120 @@ TEST_F(BufferHelperTest, UnmapAndCloseFailsWithValidPtrAndZeroSize) {
611611
ASSERT_EQ(errno, EBADF);
612612
}
613613

614+
// --- Tests for resize_mmap ---
615+
616+
// Verifies that resizing to a larger size succeeds and preserves data.
617+
struct ResizeParams {
618+
size_t initial_size;
619+
size_t new_size;
620+
};
621+
622+
class ResizeMmapCombinedTest
623+
: public BufferHelperTest,
624+
public ::testing::WithParamInterface<ResizeParams> {};
625+
626+
TEST_P(ResizeMmapCombinedTest, ResizeMmapSucceeds) {
627+
// Given
628+
const ResizeParams& params = GetParam();
629+
const size_t initial_size = params.initial_size;
630+
const size_t new_size = params.new_size;
631+
const std::string content = "Some data to preserve";
632+
633+
int fd = -1;
634+
size_t out_size = 0;
635+
void* ptr = MAP_FAILED;
636+
absl::Status status = create_file_and_mmap(test_path_, initial_size, fd,
637+
out_size, ptr, /*overwrite=*/true);
638+
ASSERT_TRUE(status.ok());
639+
640+
// Ensure content fits in initial size
641+
ASSERT_LE(content.size(), initial_size);
642+
std::memcpy(ptr, content.c_str(), content.size());
643+
644+
// When
645+
status = resize_mmap(fd, new_size, ptr, out_size);
646+
647+
// Then
648+
ASSERT_TRUE(status.ok());
649+
EXPECT_EQ(out_size, new_size);
650+
EXPECT_NE(ptr, nullptr);
651+
EXPECT_NE(ptr, MAP_FAILED);
652+
653+
// Verify content preserved (up to the new size)
654+
size_t expected_len = std::min(content.size(), new_size);
655+
EXPECT_EQ(std::memcmp(ptr, content.c_str(), expected_len), 0);
656+
657+
// Cleanup
658+
SafeUnmapAndClose(fd, ptr, out_size);
659+
}
660+
661+
INSTANTIATE_TEST_SUITE_P(
662+
ResizeMmapCombinedTests, ResizeMmapCombinedTest,
663+
::testing::Values(
664+
// Larger
665+
ResizeParams{1024, 2048}, // Aligned -> Aligned
666+
ResizeParams{1024, 2011}, // Aligned -> Unaligned
667+
ResizeParams{1025, 2048}, // Unaligned -> Aligned
668+
ResizeParams{1025, 2011}, // Unaligned -> Unaligned
669+
ResizeParams{4096, 8192}, // Page -> Page
670+
ResizeParams{4096, 4097}, // Page -> Unaligned
671+
// Smaller
672+
ResizeParams{2048, 1024}, // Aligned -> Aligned
673+
ResizeParams{2048, 1011}, // Aligned -> Unaligned
674+
ResizeParams{2049, 1024}, // Unaligned -> Aligned
675+
ResizeParams{2049, 1011}, // Unaligned -> Unaligned
676+
ResizeParams{8192, 4096}, // Page -> Page
677+
ResizeParams{8192, 4097}, // Page -> Unaligned
678+
// Same
679+
ResizeParams{1024, 1024}, // Aligned -> Aligned (Same)
680+
ResizeParams{4096, 4096}, // Page -> Page (Same)
681+
ResizeParams{1025, 1025} // Unaligned -> Unaligned (Same)
682+
));
683+
684+
// Verifies that resize fails with invalid fd
685+
TEST_F(BufferHelperTest, ResizeMmapFailsOnInvalidFd) {
686+
// Given
687+
void* ptr = nullptr;
688+
size_t size = 1024;
689+
690+
// When
691+
absl::Status status = resize_mmap(-1, 2048, ptr, size);
692+
693+
// Then
694+
EXPECT_FALSE(status.ok());
695+
EXPECT_EQ(status.code(), absl::StatusCode::kInvalidArgument);
696+
}
697+
698+
// Verifies failure when ftruncate fails (e.g., read-only fd)
699+
TEST_F(BufferHelperTest, ResizeMmapFailsOnFtruncateFailure) {
700+
// Given
701+
CreateEmptyFile(test_path_);
702+
CreateFileWithContent(test_path_, "data");
703+
704+
// Open as Read-Only
705+
int fd = open(test_path_.c_str(), O_RDONLY);
706+
ASSERT_NE(fd, -1);
707+
708+
// Map it (read-only map)
709+
struct stat sb;
710+
fstat(fd, &sb);
711+
size_t size = sb.st_size;
712+
void* ptr = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
713+
ASSERT_NE(ptr, MAP_FAILED);
714+
715+
// When
716+
absl::Status status = resize_mmap(fd, size * 2, ptr, size);
717+
718+
// Then
719+
EXPECT_FALSE(status.ok());
720+
EXPECT_THAT(status.message(), testing::HasSubstr("ftruncate() failed"));
721+
722+
// Cleanup
723+
if (ptr != nullptr && ptr != MAP_FAILED) {
724+
munmap(ptr, size);
725+
}
726+
close(fd);
727+
}
728+
614729
} // namespace
615730
// ml_flashpoint::checkpoint_object_manager::buffer_object::internal

tests/checkpoint_object_manager/buffer_object/buffer_object_test.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,3 +407,60 @@ TEST_F(BufferObjectTest, MoveAssignmentToSelfIsSafe) {
407407
EXPECT_EQ(obj.get_id(), original_id);
408408
EXPECT_EQ(obj.is_readonly(), original_readonly);
409409
}
410+
// --- Tests for resize ---
411+
412+
TEST_F(BufferObjectTest, ResizeSucceeds) {
413+
const size_t initial_capacity = 1024;
414+
const size_t new_capacity = 2048;
415+
416+
BufferObject buffer(test_path_, initial_capacity);
417+
ASSERT_EQ(buffer.get_capacity(), initial_capacity);
418+
419+
ASSERT_NO_THROW(buffer.resize(new_capacity));
420+
EXPECT_EQ(buffer.get_capacity(), new_capacity);
421+
EXPECT_FALSE(buffer.is_closed());
422+
423+
// Verify size on disk
424+
EXPECT_EQ(std::filesystem::file_size(test_path_), new_capacity);
425+
}
426+
427+
TEST_F(BufferObjectTest, ResizePreservesData) {
428+
const size_t initial_capacity = 1024;
429+
const size_t new_capacity = 2048;
430+
const std::string content = "Important Data";
431+
432+
BufferObject buffer(test_path_, initial_capacity);
433+
std::memcpy(buffer.get_data_ptr(), content.c_str(), content.size());
434+
435+
buffer.resize(new_capacity);
436+
437+
// Verify data is still there
438+
EXPECT_EQ(std::memcmp(buffer.get_data_ptr(), content.c_str(), content.size()),
439+
0);
440+
}
441+
442+
TEST_F(BufferObjectTest, ResizeFailsOnClosedBuffer) {
443+
BufferObject buffer(test_path_, 1024);
444+
buffer.close();
445+
ASSERT_TRUE(buffer.is_closed());
446+
447+
ASSERT_THROW(buffer.resize(2048), std::runtime_error);
448+
}
449+
450+
TEST_F(BufferObjectTest, ResizeFailsOnReadOnlyBuffer) {
451+
// Create file
452+
{
453+
BufferObject writer(test_path_, 1024);
454+
}
455+
456+
// Open RO
457+
BufferObject reader(test_path_);
458+
ASSERT_TRUE(reader.is_readonly());
459+
460+
ASSERT_THROW(reader.resize(2048), std::runtime_error);
461+
}
462+
463+
TEST_F(BufferObjectTest, ResizeFailsOnZeroCapacity) {
464+
BufferObject buffer(test_path_, 1024);
465+
ASSERT_THROW(buffer.resize(0), std::runtime_error);
466+
}

0 commit comments

Comments
 (0)