Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions ydb/core/tx/columnshard/columnshard__locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,31 @@ class TAbortWriteLockTransaction: public NTabletFlatExecutor::TTransactionBase<T
private:
using TBase = NTabletFlatExecutor::TTransactionBase<TColumnShard>;

// Returns true only when the lock identified by LockId is still registered in
// the operations manager and reports itself as aborted.
// GetLockOptional() may yield an empty handle (other call sites in this change
// test it before use), so a missing lock is treated as "not aborted" instead of
// being dereferenced.
bool IsLockAborted() const {
    auto lock = Self->GetOperationsManager().GetLockOptional(LockId);
    if (!lock) {
        // Lock is already gone: nothing left to abort.
        return false;
    }
    return lock->IsAborted();
}

public:
// Builds the transaction for the given tablet and remembers which lock it
// has to process.
//   self   - owning column shard, forwarded to the transaction base.
//   lockId - identifier of the lock handled by Execute()/Complete().
TAbortWriteLockTransaction(TColumnShard* self, const ui64 lockId)
    : TBase(self), LockId(lockId)
{
}

// Execute stage of the transaction: runs the operations manager's
// AbortLockOnExecute step for LockId, but only when IsLockAborted() reports
// the lock as aborted. Always returns true.
virtual bool Execute(TTransactionContext& txc, const TActorContext&) override {
    if (IsLockAborted()) {
        Self->GetOperationsManager().AbortLockOnExecute(*Self, LockId, txc);
    }
    return true;
}

// Complete stage of the transaction: runs the operations manager's
// AbortLockOnComplete step for LockId, but only when IsLockAborted() reports
// the lock as aborted; otherwise does nothing.
virtual void Complete(const TActorContext&) override {
    if (IsLockAborted()) {
        Self->GetOperationsManager().AbortLockOnComplete(*Self, LockId);
    }
}

Expand Down Expand Up @@ -63,4 +76,34 @@ void TColumnShard::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr&
}
}

// Compares the total memory reported by TPKRangeFilter::GetFiltersTotalMemorySize()
// against the InFlightLocksRangesBytesLimit value from the column shard config.
// Returns true (and emits a warning carrying both numbers) when the current
// usage is strictly greater than the limit; returns false otherwise.
bool TColumnShard::IsLocksMemoryLimitExceeded() const {
    const auto usedBytes = NOlap::TPKRangeFilter::GetFiltersTotalMemorySize();
    const auto limitBytes = AppDataVerified().ColumnShardConfig.GetInFlightLocksRangesBytesLimit();

    const bool limitExceeded = usedBytes > limitBytes;
    if (limitExceeded) {
        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "In flight locks ranges bytes limit exceeded")
            ("inFlightLocksRangesBytes", usedBytes)
            ("inFlightLocksRangesBytesLimit", limitBytes);
    }

    return limitExceeded;
}

// Marks the lock with the given id as deleted and then calls
// MaybeCleanupLock() for it. Does nothing when the operations manager has no
// lock registered under lockId.
void TColumnShard::DeleteLock(const ui64 lockId) {
    auto lock = OperationsManager->GetLockOptional(lockId);
    if (!lock) {
        return;
    }

    lock->SetDeleted();
    MaybeCleanupLock(lockId);
}

// Returns true only when a lock with the given id exists in the operations
// manager and its IsDeleted() reports true. An unknown lock id yields false.
bool TColumnShard::IsLockDeleted(const ui64 lockId) const {
    auto lock = OperationsManager->GetLockOptional(lockId);
    return lock && lock->IsDeleted();
}


} // namespace NKikimr::NColumnShard
14 changes: 0 additions & 14 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,20 +392,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
}
}

const auto inFlightLocksRangesBytes = NOlap::TPKRangeFilter::GetFiltersTotalMemorySize();
const ui64 inFlightLocksRangesBytesLimit = AppDataVerified().ColumnShardConfig.GetInFlightLocksRangesBytesLimit();
if (behaviour == EOperationBehaviour::WriteWithLock && inFlightLocksRangesBytes > inFlightLocksRangesBytesLimit) {
if (auto lock = OperationsManager->GetLockOptional(record.GetLockTxId()); lock) {
lock->SetDeleted();
MaybeCleanupLock(record.GetLockTxId());
}
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "In flight locks ranges bytes limit exceeded")
("inFlightLocksRangesBytes", inFlightLocksRangesBytes)
("inFlightLocksRangesBytesLimit", inFlightLocksRangesBytesLimit);
sendError("overloaded by in flight locks ranges memory limit", NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
return;
}

if (behaviour == EOperationBehaviour::CommitWriteLock) {
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
auto conclusionParse = commitOperation->Parse(*ev->Get());
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) {
// !!!!!! MUST BE FIRST THROUGH DATA HAVE TO BE SAME IN SESSIONS AFTER TABLET RESTART
SharingSessionsManager->Start(*this);

OperationsManager->DebugInfo();

SetupCompaction({});
SetupCleanupSchemas();
SetupCleanupPortions();
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
EOverloadStatus CheckOverloadedImmediate(const TInternalPathId tableId) const;
EOverloadStatus CheckOverloadedWait(const TInternalPathId tableId) const;

// True when the total memory of in-flight lock range filters is above the
// configured InFlightLocksRangesBytesLimit; a warning is logged in that case.
bool IsLocksMemoryLimitExceeded() const;
// Marks the lock as deleted and calls MaybeCleanupLock() for it; no-op when
// no lock is registered under lockId.
void DeleteLock(const ui64 lockId);
// True only when a lock with this id exists and reports IsDeleted().
bool IsLockDeleted(const ui64 lockId) const;

protected:
STFUNC(StateInit) {
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
TMemoryProfileGuard mpg("TTxInternalScan::Complete");

auto& request = *InternalScanEvent->Get();
auto lockId = request.GetLockId();

if (lockId.has_value() && Self->IsLocksMemoryLimitExceeded()) {
Self->DeleteLock(lockId.value());

SendError("Overloaded", "flight locks ranges memory limit exceeded", ctx);
return;
}

if (lockId.has_value()) {
if (Self->IsLockDeleted(lockId.value())) {
SendError("Lock invalidated", "lock is already delted", ctx);
return;
}
}

auto scanComputeActor = InternalScanEvent->Sender;
const TSnapshot snapshot = request.GetSnapshot();
const NActors::TLogContextGuard gLogging =
Expand All @@ -52,7 +68,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
read.TableMetadataAccessor = accConclusion.DetachResult();
}
}
read.LockId = request.GetLockId();
read.LockId = lockId;
read.DeduplicationPolicy = EDeduplicationPolicy::PREVENT_DUPLICATES;
std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(context));
read.ColumnIds = request.GetColumnIds();
Expand Down Expand Up @@ -90,7 +106,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {

const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(readMetadataRange, index);
auto scanActorId = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
Self->DataAccessorsManager.GetObjectPtrVerified(), Self->ColumnDataManager.GetObjectPtrVerified(), TComputeShardingPolicy(), ScanId, request.GetLockId().value_or(0), ScanGen, requestCookie,
Self->DataAccessorsManager.GetObjectPtrVerified(), Self->ColumnDataManager.GetObjectPtrVerified(), TComputeShardingPolicy(), ScanId, lockId.value_or(0), ScanGen, requestCookie,
Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters(), {}));

Self->InFlightReadsTracker.AddScanActorId(requestCookie, scanActorId);
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/
void TTxScan::Complete(const TActorContext& ctx) {
TMemoryProfileGuard mpg("TTxScan::Complete");
auto& request = Ev->Get()->Record;
auto lockId = request.GetLockTxId();

if (Self->IsLocksMemoryLimitExceeded()) {
Self->DeleteLock(lockId);

SendError("Overloaded", "flight locks ranges memory limit exceeded", ctx);
return;
}

if (Self->IsLockDeleted(lockId)) {
SendError("Lock invalidated", "lock is already delted", ctx);
return;
}

auto scanComputeActor = Ev->Sender;
TSnapshot snapshot = TSnapshot(request.GetSnapshot().GetStep(), request.GetSnapshot().GetTxId());
if (snapshot.IsZero()) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TLockSharingInfo: TMoveOnly {
std::atomic<bool> Broken = false;
std::atomic<bool> Writes = false;
std::atomic<LockState> State{LockState::Created};
std::atomic<bool> IsExplicitlyDeleted = false;
friend class TLockFeatures;

public:
Expand Down Expand Up @@ -54,7 +55,7 @@ class TLockSharingInfo: TMoveOnly {
}

// Reports whether this lock counts as deleted.
// NOTE(review): the two consecutive return statements below look like the
// removed/added pair of a diff rendered without +/- markers. As written only
// the first one (State is Deleted or Aborted) is reachable; the second one
// (the IsExplicitlyDeleted flag) is presumably the intended replacement —
// confirm and drop the other.
bool IsDeleted() const {
return State == LockState::Deleted || State == LockState::Aborted;
return IsExplicitlyDeleted;
}

bool IsAborted() const {
Expand Down Expand Up @@ -119,6 +120,7 @@ class TLockFeatures: TMoveOnly {
// Marks the lock as deleted in its shared info.
// The state moves to Deleted only when it is currently Created (any other
// state is left as it is), while the IsExplicitlyDeleted flag is raised
// unconditionally.
void SetDeleted() {
    auto expectedState = TLockSharingInfo::LockState::Created;
    // The exchange result is deliberately ignored: a failed exchange means the
    // lock has already left the Created state.
    SharingInfo->State.compare_exchange_strong(expectedState, TLockSharingInfo::LockState::Deleted);
    SharingInfo->IsExplicitlyDeleted = true;
}

bool IsDeleted() const {
Expand Down Expand Up @@ -181,6 +183,18 @@ class TOperationsManager {

public:

// Logs the sizes of the manager's bookkeeping containers together with the
// sum of GetOperationsInProgress() over all entries of LockFeatures.
// NOTE(review): the event tag and the WARN level look like temporary
// debugging output — confirm whether this should be kept, renamed or lowered
// to a debug level before merging.
void DebugInfo() const {
    size_t inProgressTotal = 0;
    for (const auto& entry : LockFeatures) {
        inProgressTotal += entry.second.GetOperationsInProgress();
    }

    AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "!!!VLAD_DebugInfo")
        ("Tx2Lock.size()", Tx2Lock.size())
        ("LockFeatures.size()", LockFeatures.size())
        ("operationsInProgress", inProgressTotal)
        ("Operations.size()", Operations.size())
        ("InsertWriteIdToOpWriteId.size()", InsertWriteIdToOpWriteId.size());
}

void StopWriting(const TString& errorMessage) {
for (auto&& i : Operations) {
i.second->StopWriting(errorMessage);
Expand Down
Loading