Skip to content

Commit e1a7006

Browse files
committed
Implements audit data retention policy
Implements data retention policy for audit messages and saga snapshots using a background service. This change introduces a base `RetentionCleaner` class that handles the logic for deleting expired audit data in batches. Database-specific implementations are provided for SQL Server and PostgreSQL, leveraging their respective locking mechanisms (sp_getapplock and advisory locks) to prevent concurrent executions of the cleanup process. Removes the registration of the `RetentionCleaner` from the base class and registers it on specific implementations. The cleanup process deletes processed messages and saga snapshots older than the configured retention period, optimizing database space and improving query performance.
1 parent 38b2f2f commit e1a7006

6 files changed

Lines changed: 118 additions & 17 deletions

File tree

src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,5 @@ protected static void RegisterDataStores(IServiceCollection services, AuditSqlPe
2626
services.AddSingleton<IFailedAuditStorage, EFFailedAuditStorage>();
2727
services.AddSingleton<IAuditIngestionUnitOfWorkFactory, AuditIngestionUnitOfWorkFactory>();
2828
services.AddSingleton(TimeProvider.System);
29-
services.AddHostedService<RetentionCleaner>();
3029
}
3130
}

src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
88
using Microsoft.Extensions.Hosting;
99
using Microsoft.Extensions.Logging;
1010

11-
public class RetentionCleaner(
12-
ILogger<RetentionCleaner> logger,
11+
public abstract class RetentionCleaner(
12+
ILogger logger,
1313
TimeProvider timeProvider,
1414
IServiceScopeFactory serviceScopeFactory,
1515
AuditSqlPersisterSettings settings,
1616
IBodyStoragePersistence bodyPersistence) : BackgroundService
1717
{
18+
protected const int BatchSize = 250;
19+
1820
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
1921
{
20-
// NO-OPS per requirements
2122
logger.LogInformation("Starting {ServiceName}", nameof(RetentionCleaner));
2223

2324
try
@@ -46,25 +47,53 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4647

4748
async Task Clean(CancellationToken stoppingToken)
4849
{
49-
var stopwatch = Stopwatch.StartNew();
50-
const int batchSize = 250;
51-
5250
using var scope = serviceScopeFactory.CreateScope();
5351
var dbContext = scope.ServiceProvider.GetRequiredService<AuditDbContextBase>();
5452

53+
await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);
54+
55+
if (!await TryAcquireLock(dbContext, stoppingToken))
56+
{
57+
logger.LogDebug("Another instance is running retention cleanup, skipping this cycle");
58+
return;
59+
}
60+
61+
var stopwatch = Stopwatch.StartNew();
5562
var cutoff = timeProvider.GetUtcNow().UtcDateTime - settings.AuditRetentionPeriod;
5663

5764
var totalDeletedMessages = 0;
65+
var totalDeletedSnapshots = 0;
66+
67+
try
68+
{
69+
totalDeletedMessages = await CleanProcessedMessages(dbContext, cutoff, stoppingToken);
70+
totalDeletedSnapshots = await CleanSagaSnapshots(dbContext, cutoff, stoppingToken);
71+
await transaction.CommitAsync(stoppingToken);
72+
}
73+
catch
74+
{
75+
await transaction.RollbackAsync(stoppingToken);
76+
throw;
77+
}
78+
79+
logger.LogInformation("Retention cleanup removed {Messages} messages and {Snapshots} saga snapshots in {Elapsed}",
80+
totalDeletedMessages, totalDeletedSnapshots, stopwatch.Elapsed.ToString(@"hh\:mm\:ss"));
81+
}
82+
83+
async Task<int> CleanProcessedMessages(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken stoppingToken)
84+
{
85+
var totalDeleted = 0;
5886
int deleted;
5987

6088
do
6189
{
6290
// Get a batch of IDs to delete so we can clean up body files
91+
// Note: No OrderBy - we just need any expired records, not necessarily the oldest first.
92+
// Ordering would require sorting potentially millions of rows and cause timeouts.
6393
var messageIdsToDelete = await dbContext.ProcessedMessages
6494
.Where(m => m.ProcessedAt < cutoff)
65-
.OrderBy(m => m.ProcessedAt)
6695
.Select(m => m.UniqueMessageId)
67-
.Take(batchSize)
96+
.Take(BatchSize)
6897
.ToListAsync(stoppingToken);
6998

7099
if (messageIdsToDelete.Count == 0)
@@ -80,18 +109,23 @@ async Task Clean(CancellationToken stoppingToken)
80109
.Where(m => messageIdsToDelete.Contains(m.UniqueMessageId))
81110
.ExecuteDeleteAsync(stoppingToken);
82111

83-
totalDeletedMessages += deleted;
84-
} while (deleted == batchSize);
112+
totalDeleted += deleted;
113+
} while (deleted == BatchSize);
85114

86-
var totalDeletedSnapshots = 0;
115+
return totalDeleted;
116+
}
117+
118+
async Task<int> CleanSagaSnapshots(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken stoppingToken)
119+
{
120+
var totalDeleted = 0;
121+
int deleted;
87122

88123
do
89124
{
90125
var snapshotIdsToDelete = await dbContext.SagaSnapshots
91126
.Where(s => s.ProcessedAt < cutoff)
92-
.OrderBy(s => s.ProcessedAt)
93127
.Select(s => s.Id)
94-
.Take(batchSize)
128+
.Take(BatchSize)
95129
.ToListAsync(stoppingToken);
96130

97131
if (snapshotIdsToDelete.Count == 0)
@@ -103,9 +137,11 @@ async Task Clean(CancellationToken stoppingToken)
103137
.Where(s => snapshotIdsToDelete.Contains(s.Id))
104138
.ExecuteDeleteAsync(stoppingToken);
105139

106-
totalDeletedSnapshots += deleted;
107-
} while (deleted == batchSize);
140+
totalDeleted += deleted;
141+
} while (deleted == BatchSize);
108142

109-
logger.LogInformation("Retention cleanup removed {Messages} messages and {Snapshots} saga snapshots in {Elapsed}", totalDeletedMessages, totalDeletedSnapshots, stopwatch.Elapsed.ToString(@"hh\:mm\:ss"));
143+
return totalDeleted;
110144
}
145+
146+
protected abstract Task<bool> TryAcquireLock(AuditDbContextBase dbContext, CancellationToken stoppingToken);
111147
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
2+
3+
using Core.Abstractions;
4+
using Core.DbContexts;
5+
using Core.Infrastructure;
6+
using Microsoft.EntityFrameworkCore;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Logging;
9+
10+
class RetentionCleaner(
11+
ILogger<RetentionCleaner> logger,
12+
TimeProvider timeProvider,
13+
IServiceScopeFactory serviceScopeFactory,
14+
AuditSqlPersisterSettings settings,
15+
IBodyStoragePersistence bodyPersistence)
16+
: Core.Infrastructure.RetentionCleaner(logger, timeProvider, serviceScopeFactory, settings, bodyPersistence)
17+
{
18+
protected override async Task<bool> TryAcquireLock(AuditDbContextBase dbContext, CancellationToken stoppingToken)
19+
{
20+
// Use PostgreSQL's advisory lock for distributed locking
21+
// pg_try_advisory_xact_lock returns true if lock acquired, false otherwise
22+
// The lock is automatically released when the transaction ends
23+
var sql = "SELECT pg_try_advisory_xact_lock(hashtext('retention_cleaner'))";
24+
25+
var result = await dbContext.Database.SqlQueryRaw<bool>(sql).FirstOrDefaultAsync(stoppingToken);
26+
return result;
27+
}
28+
}

src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/PostgreSqlAuditPersistence.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public void AddPersistence(IServiceCollection services)
2222
RegisterDataStores(services, settings);
2323
services.AddSingleton<IAuditFullTextSearchProvider, PostgreSqlAuditFullTextSearchProvider>();
2424
services.AddHostedService<KnownEndpointsReconciler>();
25+
services.AddHostedService<RetentionCleaner>();
2526
}
2627

2728
public void AddInstaller(IServiceCollection services)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
namespace ServiceControl.Audit.Persistence.Sql.SqlServer.Infrastructure;
2+
3+
using Core.Abstractions;
4+
using Core.DbContexts;
5+
using Core.Infrastructure;
6+
using Microsoft.EntityFrameworkCore;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Logging;
9+
10+
class RetentionCleaner(
11+
ILogger<RetentionCleaner> logger,
12+
TimeProvider timeProvider,
13+
IServiceScopeFactory serviceScopeFactory,
14+
AuditSqlPersisterSettings settings,
15+
IBodyStoragePersistence bodyPersistence)
16+
: Core.Infrastructure.RetentionCleaner(logger, timeProvider, serviceScopeFactory, settings, bodyPersistence)
17+
{
18+
protected override async Task<bool> TryAcquireLock(AuditDbContextBase dbContext, CancellationToken stoppingToken)
19+
{
20+
// Use SQL Server's sp_getapplock for distributed locking
21+
// LockTimeout = 0 means return immediately if lock cannot be acquired
22+
// Returns >= 0 on success, < 0 on failure
23+
var sql = @"
24+
DECLARE @lockResult INT;
25+
EXEC @lockResult = sp_getapplock
26+
@Resource = 'retention_cleaner',
27+
@LockMode = 'Exclusive',
28+
@LockOwner = 'Transaction',
29+
@LockTimeout = 0;
30+
SELECT @lockResult;
31+
";
32+
33+
var result = await dbContext.Database.SqlQueryRaw<int>(sql).FirstOrDefaultAsync(stoppingToken);
34+
return result >= 0;
35+
}
36+
}

src/ServiceControl.Audit.Persistence.Sql.SqlServer/SqlServerAuditPersistence.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public void AddPersistence(IServiceCollection services)
2222
RegisterDataStores(services, settings);
2323
services.AddSingleton<IAuditFullTextSearchProvider, SqlServerAuditFullTextSearchProvider>();
2424
services.AddHostedService<KnownEndpointsReconciler>();
25+
services.AddHostedService<RetentionCleaner>();
2526
}
2627

2728
public void AddInstaller(IServiceCollection services)

0 commit comments

Comments
 (0)