Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<NoWarn>1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618</NoWarn>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>5.9.1</Version>
<Version>5.9.2</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
110 changes: 110 additions & 0 deletions src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,113 @@ private async Task<WolverineNode> readNodeAsync(DbDataReader reader)
return null;
}
}

internal class AdvisoryLock : IAdvisoryLock
{
private readonly string _databaseName;
private readonly List<int> _locks = new();
private readonly ILogger _logger;
private readonly NpgsqlDataSource _source;
private NpgsqlConnection _conn;

public AdvisoryLock(NpgsqlDataSource source, ILogger logger, string databaseName)
{
_source = source;
_logger = logger;
_databaseName = databaseName;
}

public bool HasLock(int lockId)
{
return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
}

public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
{
if (_conn == null)
{
_conn = _source.CreateConnection();
await _conn.OpenAsync(token).ConfigureAwait(false);
}

if (_conn.State == ConnectionState.Closed)
{
try
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to clean up and restart an advisory lock connection");
}
finally
{
_conn = null;
}

return false;
}


var attained = await _conn.TryGetGlobalLock(lockId, token).ConfigureAwait(false);
if (attained == AttainLockResult.Success)
{
_locks.Add(lockId);
return true;
}

return false;
}

public async Task ReleaseLockAsync(int lockId)
{
if (!_locks.Contains(lockId))
{
return;
}

if (_conn == null || _conn.State == ConnectionState.Closed)
{
_locks.Remove(lockId);
return;
}

var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(1.Seconds());

await _conn.ReleaseGlobalLock(lockId, cancellation.Token).ConfigureAwait(false);
_locks.Remove(lockId);

if (!_locks.Any())
{
await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
_conn = null;
}
}

public async ValueTask DisposeAsync()
{
if (_conn == null)
{
return;
}

try
{
foreach (var i in _locks) await _conn.ReleaseGlobalLock(i, CancellationToken.None).ConfigureAwait(false);

await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to dispose of advisory locks for database {Identifier}",
_databaseName);
}
finally
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
}
}
Loading