Skip to content

Commit e893530

Browse files
committed
fix(tools.console): include snapshot oslo projection when waiting to process + add activity logging
1 parent 6f9a4d4 commit e893530

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

src/BuildingRegistry.Tools.Console/Infrastructure/ProjectionRepository.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ public async Task<long> GetProducerPosition()
2323
{
2424
await using var sql = new Microsoft.Data.SqlClient.SqlConnection(_eventsConnectionString);
2525
return await sql.ExecuteScalarAsync<long>($@"
26-
SELECT TOP 1 COALESCE([Position], 0)
27-
FROM [{Schema.Producer}].[ProjectionStates]
28-
ORDER BY [Position] ASC");
26+
SELECT TOP 1 COALESCE([Position], 0)
27+
FROM (
28+
SELECT [Position]
29+
FROM [{Schema.Producer}].[ProjectionStates]
30+
UNION
31+
SELECT [Position]
32+
FROM [{Schema.ProducerSnapshotOslo}].[ProjectionStates]
33+
) as q
34+
ORDER BY [Position] ASC");
2935
}
3036
}
3137
}

src/BuildingRegistry.Tools.Console/RepairBuilding/RepairBuildingService.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,24 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4040
{
4141
await _repairBuildingRepository.EnsureSchemaAndTablesExist();
4242
await _repairBuildingRepository.FillBuildingToProcess();
43-
var idsToProcess = await _repairBuildingRepository.GetBuildingsToProcess();
43+
var idsToProcess = (await _repairBuildingRepository.GetBuildingsToProcess()).ToList();
44+
_logger.LogInformation("Found {idsToProcess} buildings to process.", idsToProcess.Count);
4445

4546
_ = ScheduleBuildingRepairUpdates(stoppingToken);
4647
var ticketId = await _ticketing.CreateTicket(null, stoppingToken);
4748

4849
await _sqsRateLimiter.Handle<int>(
49-
idsToProcess.ToList(),
50+
idsToProcess,
5051
id => new RepairBuildingSqsRequest { BuildingPersistentLocalId = id, TicketId = ticketId },
5152
async processedId =>
5253
{
5354
await _repairBuildingRepository.DeleteBuilding(processedId);
54-
if(processedId % 100 == 0)
55+
_logger.LogInformation("Processed building {processedId}", processedId);
56+
57+
if (processedId % 100 == 0)
58+
{
5559
await WaitIfProducerProjectionBehindAsync(stoppingToken);
60+
}
5661
},
5762
stoppingToken);
5863

@@ -68,7 +73,10 @@ private async Task WaitIfProducerProjectionBehindAsync(CancellationToken cancell
6873

6974
var lag = headPosition - producerPosition;
7075
if (lag <= 100)
76+
{
77+
_logger.LogInformation("Resume processing, lag is {Lag} messages.", lag);
7178
break;
79+
}
7280

7381
_logger.LogInformation("Producer projection is behind by {Lag} messages, waiting 3 seconds...", lag);
7482
await Task.Delay(3000, cancellationToken);

0 commit comments

Comments
 (0)