Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbcon/joblist/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ set(joblist_LIB_SRCS
jobstepassociation.cpp
lbidlist.cpp
limitedorderby.cpp
disk-based-topnorderby.cpp
passthrucommand-jl.cpp
passthrustep.cpp
pcolscan.cpp
Expand Down
72 changes: 72 additions & 0 deletions dbcon/joblist/disk-based-topnorderby.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/* Copyright (C) 2025 MariaDB Corp.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */

#include <vector>

#include "dumper.h"
#include "disk-based-topnorderby.h"
namespace joblist
{

// The caller ensures lifetime of dl and rg
void DiskBasedTopNOrderBy::flushCurrentToDisk(RowGroupDL& dl, rowgroup::RowGroup rg, const size_t numberOfRGs, const bool firstFlush)
{
size_t rgid = (firstFlush) ? numberOfRGs : 0;
rowgroup::RGData rgData;

size_t generation = (firstFlush) ? getGenerationCounter() : 0; // WIP

bool more = dl.next(0, &rgData);
while (more)
{
saveRG(rgid, generation, rg, &rgData);
more = dl.next(0, &rgData);
rgid = (firstFlush) ? rgid - 1 : rgid + 1;
}

if (firstFlush)
{
incrementGenerationCounter();
}
else
{

}
}
void DiskBasedTopNOrderBy::diskBasedMergePhaseIfNeeded(std::vector<RowGroupDLSPtr>& /*dataLists*/)
{
}

std::vector<std::string> DiskBasedTopNOrderBy::getGenerationFileNamesNextBatch(const size_t batchSize)
{
// assert(getGenerationFilesNumber() > batchSize);
auto totalNumberOfFilesYetToMerge = getGenerationFilesNumber() - batchSize;
auto batchSizeOrFilesLeftNumber = std::max(getGenerationFilesNumber(), batchSize);
auto actualBatchSize = std::min(totalNumberOfFilesYetToMerge, batchSizeOrFilesLeftNumber);
Comment on lines +57 to +59

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This logic for calculating actualBatchSize is confusing and contains a bug. totalNumberOfFilesYetToMerge will underflow if getGenerationFilesNumber() is less than batchSize. Given that getGenerationFilesNumber() is a stub returning 0, this is a definite issue. Please revise this logic for correctness and clarity. A safer way to calculate totalNumberOfFilesYetToMerge would be size_t totalNumberOfFilesYetToMerge = (num_files > batchSize) ? num_files - batchSize : 0;. The overall logic might need simplification depending on the intended behavior.

// add state for the starting offset + wraparound
size_t startOffset = 0;
std::vector<std::string> res;
res.reserve(actualBatchSize);
for (size_t i = 0; i < startOffset + actualBatchSize; ++i)
{
res.push_back(makeRGFilePrefix(i));
}

return res;


} // namespace joblist

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This closing brace appears to close the getGenerationFileNamesNextBatch function, but the comment suggests it's for the joblist namespace. The joblist namespace, opened on line 22, is missing its closing brace. This will cause a compilation error.

Suggested change
} // namespace joblist
}
} // namespace joblist

77 changes: 77 additions & 0 deletions dbcon/joblist/disk-based-topnorderby.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/* Copyright (C) 2025 MariaDB Corp.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */

#pragma once

#include <cstdint>
#include <queue>
#include <string>
#include <vector>

#include "dumper.h"
#include "elementtype.h"
#include "resourcemanager.h"
namespace joblist
{

class DiskBasedTopNOrderBy : public rowgroup::RGDumper
{
// std::string fTmpDir =
// config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
// std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression");
public:
// TODO Parametrize compression, tmpdir and memory manager (can be temp)
DiskBasedTopNOrderBy(ResourceManager* /*rm*/)
: RGDumper(compress::getCompressInterfaceByName("LZ4"), std::make_unique<rowgroup::MemManager>(),
config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Sorting),
"Sorting", reinterpret_cast<std::uintptr_t>(this))
{
}
~DiskBasedTopNOrderBy() = default;

void incrementGenerationCounter()
{
++fGenerationCounter;
uint64_t newGeneration = (fGenerations.empty()) ? 1 : fGenerations.back() + 1;
fGenerations.push(newGeneration);
}
uint64_t getGenerationCounter() const
{
return (fGenerations.empty()) ? 0 : fGenerations.back();
}

bool isDiskBased() const
{
return fGenerationCounter > 0;
}

size_t getGenerationFilesNumber() const
{
return 0;
}
std::vector<std::string> getGenerationFileNamesNextBatch(const size_t batchSize);

// The caller ensures lifetime of dl and rg
void flushCurrentToDisk(RowGroupDL& dl, rowgroup::RowGroup rg, const size_t numberOfRGs, const bool firstFlush);
void diskBasedMergePhaseIfNeeded(std::vector<RowGroupDLSPtr>& dataLists);

// private:
uint64_t fGenerationCounter{0};
std::queue<uint64_t> fGenerations;
Comment on lines +72 to +74

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These member variables are currently public, but the commented-out // private: suggests they are intended to be private. For better encapsulation, they should be made private.

Suggested change
// private:
uint64_t fGenerationCounter{0};
std::queue<uint64_t> fGenerations;
private:
uint64_t fGenerationCounter{0};
std::queue<uint64_t> fGenerations;

};

} // namespace joblist
15 changes: 3 additions & 12 deletions dbcon/joblist/elementtype.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-2025 MariaDB Corporation

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -238,16 +239,7 @@ extern std::istream& operator>>(std::istream& in, TupleType& rhs);
extern std::ostream& operator<<(std::ostream& out, const TupleType& rhs);
} // namespace joblist

#ifndef NO_DATALISTS

// #include "bandeddl.h"
// #include "wsdl.h"
#include "fifo.h"
// #include "bucketdl.h"
// #include "constantdatalist.h"
// #include "swsdl.h"
// #include "zdl.h"
// #include "deliverywsdl.h"

namespace joblist
{
Expand Down Expand Up @@ -327,7 +319,8 @@ typedef DataList<StringElementType> StrDataList;
// */
// typedef BucketDL<TupleType> TupleBucketDataList;

typedef FIFO<rowgroup::RGData> RowGroupDL;
using RowGroupDL = FIFO<rowgroup::RGData>;
using RowGroupDLSPtr = std::shared_ptr<RowGroupDL>;

} // namespace joblist

Expand Down Expand Up @@ -425,5 +418,3 @@ extern std::ostream& showOidInDL(std::ostream& strm);
extern std::ostream& omitOidInDL(std::ostream& strm);

} // namespace joblist

#endif
11 changes: 2 additions & 9 deletions dbcon/joblist/jlf_tuplejoblist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,6 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
deliverySteps[CNX_VTABLE_ID] = ws;
}

// TODO MCOL-894 we don't need to run sorting|distinct
// every time
// if ((jobInfo.limitCount != (uint64_t) - 1) ||
// (jobInfo.constantCol == CONST_COL_EXIST) ||
// (jobInfo.hasDistinct))
// {
if (jobInfo.annexStep.get() == NULL)
jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));

Expand All @@ -513,20 +507,19 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,

if (jobInfo.orderByColVec.size() > 0)
{
tas->addOrderBy(new LimitedOrderBy());
tas->addOrderBy(jobInfo.rm);
if (jobInfo.orderByThreads > 1)
tas->setParallelOp();
tas->setMaxThreads(jobInfo.orderByThreads);
}

// TODO decouple TCS from TNS
if (jobInfo.constantCol == CONST_COL_EXIST)
tas->addConstant(new TupleConstantStep(jobInfo));

if (jobInfo.hasDistinct)
tas->setDistinct();

// }

if (jobInfo.annexStep)
{
TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
Expand Down
Loading