Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ struct TEvPQ {
EvRunCompaction,
EvMirrorTopicDescription,
EvBroadcastPartitionError,
EvForceCompaction,
EvEnd
};

Expand Down Expand Up @@ -1278,6 +1279,15 @@ struct TEvPQ {

ui64 BlobsCount = 0;
};

struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> {
explicit TEvForceCompaction(const ui32 partitionId) :
PartitionId(partitionId)
{
}

ui32 PartitionId = 0;
};
};

} //NKikimr
44 changes: 0 additions & 44 deletions ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2966,50 +2966,6 @@ void TPartition::CommitWriteOperations(TTransaction& t)
BlobEncoder.NewHead.Offset = Parameters->CurOffset;
}

//if (!t.WriteInfo->BlobsFromHead.empty()) {
// auto& first = t.WriteInfo->BlobsFromHead.front();
// // In one operation, a partition can write blocks of several transactions. Some of them can be broken down
// // into parts. We need to take this division into account.
// BlobEncoder.NewHead.PartNo += first.GetPartNo();

// Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();

// BlobEncoder.NewPartitionedBlob(Partition,
// BlobEncoder.NewHead.Offset,
// first.SourceId,
// first.SeqNo,
// first.GetTotalParts(),
// first.GetTotalSize(),
// Parameters->HeadCleared, // headCleared
// false, // needCompactHead
// MaxBlobSize,
// first.GetPartNo());

// for (auto& blob : t.WriteInfo->BlobsFromHead) {
// TWriteMsg msg{Max<ui64>(), Nothing(), TEvPQ::TEvWrite::TMsg{
// .SourceId = blob.SourceId,
// .SeqNo = blob.SeqNo,
// .PartNo = (ui16)(blob.PartData ? blob.PartData->PartNo : 0),
// .TotalParts = (ui16)(blob.PartData ? blob.PartData->TotalParts : 1),
// .TotalSize = (ui32)(blob.PartData ? blob.PartData->TotalSize : blob.UncompressedSize),
// .CreateTimestamp = blob.CreateTimestamp.MilliSeconds(),
// .ReceiveTimestamp = blob.CreateTimestamp.MilliSeconds(),
// .DisableDeduplication = false,
// .WriteTimestamp = blob.WriteTimestamp.MilliSeconds(),
// .Data = blob.Data,
// .UncompressedSize = blob.UncompressedSize,
// .PartitionKey = blob.PartitionKey,
// .ExplicitHashKey = blob.ExplicitHashKey,
// .External = false,
// .IgnoreQuotaDeadline = true,
// .HeartbeatVersion = std::nullopt,
// }, std::nullopt};
// msg.Internal = true;

// WriteInflightSize += msg.Msg.Data.size();
// ExecRequest(msg, *Parameters, PersistRequest.Get());
// }
//}
for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) {
auto& sourceIdBatch = Parameters->SourceIdBatch;
auto sourceId = sourceIdBatch.GetSource(srcId);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/pqtablet/partition/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class TPartition : public TBaseActor<TPartition> {
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev);
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -598,6 +599,7 @@ class TPartition : public TBaseActor<TPartition> {
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
if (!Initializer.Handle(ev)) {
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
Expand Down Expand Up @@ -666,6 +668,7 @@ class TPartition : public TBaseActor<TPartition> {
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
break;
Expand Down Expand Up @@ -1118,7 +1121,7 @@ class TPartition : public TBaseActor<TPartition> {
const TEvPQ::TEvBlobResponse* blobResponse,
const TActorContext& ctx);

void TryRunCompaction();
void TryRunCompaction(bool force = false);
void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs);
void BlobsForCompactionWereWrite();
ui64 NextReadCookie();
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void TPartition::DumpKeysForBlobsCompaction() const
LOG_D("===================================");
}

void TPartition::TryRunCompaction()
void TPartition::TryRunCompaction(bool force)
{
if (StopCompaction) {
LOG_D("Blobs compaction is stopped");
Expand All @@ -186,7 +186,7 @@ void TPartition::TryRunCompaction()
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();

if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) {
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit()) && !force) {
LOG_D("No data for blobs compaction");
return;
}
Expand All @@ -207,13 +207,19 @@ void TPartition::TryRunCompaction()
LOG_D("Blob key for rename " << k.Key.ToString());
}
}

LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes");

CompactionInProgress = true;

Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount));
}

void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&)
{
TryRunCompaction(true);
}

void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev)
{
const ui64 blobsCount = ev->Get()->BlobsCount;
Expand Down Expand Up @@ -337,7 +343,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k,

if (!CompactionBlobEncoder.PartitionedBlob.IsInited()) {
CompactionBlobEncoder.NewPartitionedBlob(Partition,
CompactionBlobEncoder.NewHead.Offset,
parameters.CurOffset,
"", // SourceId
0, // SeqNo
0, // TotalParts
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pqtablet/partition/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,8 @@ void TInitDataRangeStep::FormHeadAndProceed() {

cz.Head.Offset = headKey.GetOffset();
cz.Head.PartNo = headKey.GetPartNo();

Partition()->WasTheLastBlobBig = false;
}

// FastWrite Body
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/persqueue/pqtablet/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents()
}
}

void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)");

const auto& event = *ev->Get();
const TPartitionId partitionId(event.PartitionId);

if (!Partitions.contains(partitionId)) {
PQ_LOG_D("Unknown partition id " << event.PartitionId);
return;
}

auto p = Partitions.find(partitionId);
ctx.Send(p->second.Actor,
new TEvPQ::TEvForceCompaction(event.PartitionId));
}

bool TPersQueue::HandleHook(STFUNC_SIG)
{
TRACE_EVENT(NKikimrServices::PERSQUEUE);
Expand Down Expand Up @@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
HFuncTraced(TEvPQ::TEvForceCompaction, Handle);
default:
return false;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/pqtablet/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

void ResendSplitMergeRequests(const TActorContext& ctx);

void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx);

TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
NWilson::TSpan WriteTxsSpan;
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1250,4 +1250,19 @@ THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(T
return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2));
}

void CmdRunCompaction(TTestActorRuntime& runtime,
ui64 tabletId,
const TActorId& sender,
const ui32 partition)
{
auto event = MakeHolder<TEvPQ::TEvForceCompaction>(partition);
runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries());
}

void CmdRunCompaction(const ui32 partition,
TTestContext& tc)
{
CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition);
}

} // namespace NKikimr::NPQ
7 changes: 7 additions & 0 deletions ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ struct TCmdWriteOptions {
};
void CmdWrite(const TCmdWriteOptions&);

void CmdRunCompaction(TTestActorRuntime& runtime,
ui64 tabletId,
const TActorId& sender,
const ui32 partition);
void CmdRunCompaction(const ui32 partition,
TTestContext& tc);

THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);

} // namespace NKikimr::NPQ
46 changes: 46 additions & 0 deletions ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,52 @@ TMaybe<ui64> PQGetStartOffset(TTestContext& tc)
return Nothing();
}

Y_UNIT_TEST(TestCompaction) {
TTestContext tc;
tc.EnableDetailedPQLog = true;
RunTestWithReboots(tc.TabletIds, [&]() {
return tc.InitialEventsFilter.Prepare();
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
activeZone = false;
TFinalizer finalizer(tc);
tc.Prepare(dispatchName, setup, activeZone);
activeZone = false;
tc.Runtime->SetScheduledLimit(1000);

ui32 sourceIdx = 0;
auto cmdWrite = [&](const TVector<size_t>& sizes) {
TVector<std::pair<ui64, TString>> data;
for (size_t k = 1; k <= sizes.size(); ++k) {
data.emplace_back(k, TString(sizes[k - 1], 'x'));
}
TString sourceId = "sourceid_" + ToString(sourceIdx++);
CmdWrite(0, sourceId, data, tc, false, {}, false, "", -1, -1, false, false, true);
};
auto cmdCompaction = [&]() {
CmdRunCompaction(0, tc);
};

PQTabletPrepare({.partitions = 1, .writeSpeed = 50_MB}, {{"user1", true}}, tc);

cmdWrite({17400_KB});
cmdCompaction();

cmdWrite({16800_KB});
cmdCompaction();

PQTabletRestart(tc);

cmdWrite({7000_KB, 13300_KB});
cmdCompaction();

cmdWrite({1_KB});

PQTabletRestart(tc);

PQGetPartInfo(0, 4 + 1, tc);
});
}

Y_UNIT_TEST(TestCmdReadWithLastOffset) {
TTestContext tc;
tc.EnableDetailedPQLog = true;
Expand Down
Loading