Feat/disk based sorting#3514
Conversation
…lize with a lightweight + heavy getNextRGData
…ght + heavy getNextRGData
bddc3f4 to
6f09dec
Compare
6f09dec to
929289e
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a disk-based sorting mechanism, which is a significant feature for handling large datasets. The overall approach of spilling to disk when an OutOfMemoryExcept is caught is sound. The changes are spread across several files, introducing new classes like DiskBasedTopNOrderBy, Dumper, and MemManager, and modifying existing ones like LimitedOrderBy and TupleAnnexStep.
However, the implementation appears to be incomplete. The merge phase for the on-disk sorted runs is stubbed out, and some new functions contain placeholder logic and debugging statements. There are also several critical issues, including a syntax error that will prevent compilation and a potential integer underflow bug. Additionally, there are some code quality issues like public member variables that should be private and risky raw pointer ownership transfers.
I've provided specific comments on these issues. The PR needs further work to complete the feature and address the identified problems before it can be merged.
| return res; | ||
|
|
||
|
|
||
| } // namespace joblist No newline at end of file |
There was a problem hiding this comment.
This closing brace appears to close the getGenerationFileNamesNextBatch function, but the comment suggests it's for the joblist namespace. The joblist namespace, opened on line 22, is missing its closing brace. This will cause a compilation error.
| } // namespace joblist | |
| } | |
| } // namespace joblist |
| auto totalNumberOfFilesYetToMerge = getGenerationFilesNumber() - batchSize; | ||
| auto batchSizeOrFilesLeftNumber = std::max(getGenerationFilesNumber(), batchSize); | ||
| auto actualBatchSize = std::min(totalNumberOfFilesYetToMerge, batchSizeOrFilesLeftNumber); |
There was a problem hiding this comment.
This logic for calculating actualBatchSize is confusing and contains a bug. totalNumberOfFilesYetToMerge will underflow if getGenerationFilesNumber() is less than batchSize. Given that getGenerationFilesNumber() is a stub returning 0, this is a definite issue. Please revise this logic for correctness and clarity. A safer way to calculate totalNumberOfFilesYetToMerge would be size_t totalNumberOfFilesYetToMerge = (num_files > batchSize) ? num_files - batchSize : 0;. The overall logic might need simplification depending on the intended behavior.
| while (inputQueuesNumber < fOrderBy->getGenerationFilesNumber()) | ||
| { | ||
| auto fileNames = fOrderBy->getGenerationFileNamesNextBatch(inputQueuesNumber); | ||
| auto inputDLs = createInputDLs(fileNames.size()); | ||
| auto readers = startReaders(inputDLs, fileNames); | ||
| // create outputDLs or simplier atomic queues + readers threads | ||
| fOrderBy->diskBasedMergePhaseIfNeeded(inputDLs); | ||
| jobstepThreadPool.join(readers); | ||
| } |
There was a problem hiding this comment.
The disk-based merge logic appears to be incomplete. The loop while (inputQueuesNumber < fOrderBy->getGenerationFilesNumber()) will never execute because getGenerationFilesNumber() is a stub that returns 0. The body of the loop also consists of stubs and comments. This logic needs to be fully implemented for the feature to work correctly.
| Dumper(const compress::CompressInterface* comp, MemManager* mm) | ||
| : fCompressor(comp), fMM(std::unique_ptr<MemManager>(mm)) {} |
There was a problem hiding this comment.
This constructor takes a raw pointer MemManager* mm and creates a std::unique_ptr from it, which implies ownership transfer. This is risky as the caller might not be aware of this and could attempt to delete the pointer, leading to a double free. To make ownership transfer explicit and safer, consider removing this constructor and only using the one that accepts a std::unique_ptr.
| // private: | ||
| uint64_t fGenerationCounter{0}; | ||
| std::queue<uint64_t> fGenerations; |
There was a problem hiding this comment.
These member variables are currently public, but the commented-out // private: suggests they are intended to be private. For better encapsulation, they should be made private.
| // private: | |
| uint64_t fGenerationCounter{0}; | |
| std::queue<uint64_t> fGenerations; | |
| private: | |
| uint64_t fGenerationCounter{0}; | |
| std::queue<uint64_t> fGenerations; |
| std::cout << "getNextRGData rowsToRetrieve " << rowsToRetrieve << " orderedRowsQueue.size() " | ||
| << orderedRowsQueue.size() << std::endl; | ||
| std::cout << "getNextRGData rowsToRetrieveFromQueue " << rowsToRetrieveFromQueue << std::endl; |
| #include <fcntl.h> | ||
| #include <string> | ||
|
|
||
| #include <string.h> |
| void release(ssize_t amount = 0) | ||
| { | ||
| // in some cases it tries to release more memory than acquired, ie create | ||
| // new rowgroup, acquire maximum size (w/o strings), add some rows with | ||
| // strings and finally release the actual size of RG with strings | ||
| if (amount == 0 || amount > fMemUsed) | ||
| amount = fMemUsed; | ||
| releaseImpl(amount); | ||
| } |
There was a problem hiding this comment.
| } | ||
|
|
||
| private: | ||
| // private: |
| std::unique_ptr<SortingPQ> fOrderByQueue = nullptr; | ||
|
|
||
| protected: | ||
| // protected: |
No description provided.