Skip to content

Added more use of ProgressLogger (#8504) #8509

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 50 additions & 14 deletions src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class FullPruningDb : IDb, IFullPruningDb, ITunableDb
private readonly DbSettings _settings;
private readonly IDbFactory _dbFactory;
private readonly Action? _updateDuplicateWriteMetrics;
private readonly ILogManager _logManager;
private ProgressLogger? _progressLogger;

// current main DB, will be written to and will be main source for reading
private IDb _currentDb;
Expand All @@ -32,11 +34,12 @@ public class FullPruningDb : IDb, IFullPruningDb, ITunableDb
// this will be null if no full pruning is in progress
private PruningContext? _pruningContext;

public FullPruningDb(DbSettings settings, IDbFactory dbFactory, Action? updateDuplicateWriteMetrics = null)
public FullPruningDb(DbSettings settings, IDbFactory dbFactory, ILogManager logManager, Action? updateDuplicateWriteMetrics = null)
{
_settings = settings;
_dbFactory = dbFactory;
_updateDuplicateWriteMetrics = updateDuplicateWriteMetrics;
_logManager = logManager;
_currentDb = CreateDb(_settings).WithEOACompressed();
}

Expand Down Expand Up @@ -182,6 +185,8 @@ DbSettings ClonedDbSettings()
context = pruningContext ?? newContext;
if (pruningContext is null)
{
_progressLogger = new ProgressLogger("Pruning", _logManager);
_progressLogger.Reset(0, 0); // Initialize with no upper bound
PruningStarted?.Invoke(this, new PruningEventArgs(context, true));
return true;
}
Expand All @@ -203,6 +208,8 @@ private void FinishPruning()
_pruningContext?.CloningDb?.Flush();
IDb oldDb = Interlocked.Exchange(ref _currentDb, _pruningContext?.CloningDb);
ClearOldDb(oldDb);
_progressLogger?.MarkEnd();
_progressLogger = null;
}

protected virtual void ClearOldDb(IDb oldDb)
Expand All @@ -218,50 +225,50 @@ private void FinishPruning(PruningContext pruningContext, bool success)

private class PruningContext : IPruningContext
{
private readonly FullPruningDb _db;
private long _processedKeys;
private bool _committed = false;
private bool _disposed = false;
public IDb CloningDb { get; }
public bool DuplicateReads { get; }
private readonly FullPruningDb _db;

public PruningContext(FullPruningDb db, IDb cloningDb, bool duplicateReads)
{
CloningDb = cloningDb;
DuplicateReads = duplicateReads;
_db = db;
_db._progressLogger?.Reset(0, 0); // Initialize with no upper bound
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_db.Duplicate(CloningDb, key, value, flags);
CloningDb.Set(key, value, flags);
long currentCount = Interlocked.Increment(ref _processedKeys);
_db._progressLogger?.Update(currentCount);
if (currentCount % 100000 == 0)
{
_db._progressLogger?.LogProgress();
}
}

public IWriteBatch StartWriteBatch()
{
return CloningDb.StartWriteBatch();
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return CloningDb.Get(key, flags);
return new ProgressTrackingWriteBatch(CloningDb.StartWriteBatch(), this);
}

/// <inheritdoc />
public void Commit()
{
_db.FinishPruning();
_committed = true; // we mark the context as committed.
_committed = true;
}

/// <inheritdoc />
public void MarkStart()
{
Metrics.StateDbPruning = 1;
}

public CancellationTokenSource CancellationTokenSource { get; } = new();

/// <inheritdoc />
public void Dispose()
{
if (!_disposed)
Expand All @@ -272,7 +279,6 @@ public void Dispose()
// if the context was not committed, then pruning failed and we delete the cloned DB
CloningDb.Clear();
}

CancellationTokenSource.Dispose();
Metrics.StateDbPruning = 0;
_disposed = true;
Expand Down Expand Up @@ -312,6 +318,36 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteF
}
}

private class ProgressTrackingWriteBatch : IWriteBatch
{
private readonly IWriteBatch _writeBatch;
private readonly PruningContext _context;
private long _batchProcessedKeys;

public ProgressTrackingWriteBatch(IWriteBatch writeBatch, PruningContext context)
{
_writeBatch = writeBatch;
_context = context;
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_writeBatch.Set(key, value, flags);
_batchProcessedKeys++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interlocked.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait. Probably fine here.

if (_batchProcessedKeys % 100000 == 0)
{
_context._db._progressLogger?.Update(_context._processedKeys + _batchProcessedKeys);
_context._db._progressLogger?.LogProgress();
}
}

public void Dispose()
{
_context._processedKeys += _batchProcessedKeys;
_writeBatch.Dispose();
}
}

public void Tune(ITunableDb.TuneType type)
{
if (_currentDb is ITunableDb tunableDb)
Expand Down