Skip to content

Commit 8f5df43

Browse files
committed
fix: serialise Postgres claims with FOR UPDATE SKIP LOCKED and seed reconciler test data before factory start
1 parent a86755a commit 8f5df43

2 files changed

Lines changed: 61 additions & 41 deletions

File tree

api/API/JobRuntime/EfJobStore.cs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,45 +29,51 @@ public async Task<Job> EnqueueAsync(Job job, CancellationToken ct = default)
2929
public async Task<Job?> ClaimNextReadyAsync(DateTime now, TimeSpan leaseDuration, int globalCap, int perResourceCap,
3030
CancellationToken ct = default)
3131
{
32-
bool isRelational = context.Database.ProviderName != "Microsoft.EntityFrameworkCore.InMemory";
32+
if (context.Database.ProviderName != "Microsoft.EntityFrameworkCore.InMemory")
33+
return await ClaimRelationalAsync(now, leaseDuration, globalCap, perResourceCap, ct);
3334

3435
List<Job> active = await context.JobQueue
3536
.Where(j => j.Status == JobStatus.Queued || j.Status == JobStatus.Running)
36-
.AsNoTracking()
3737
.ToListAsync(ct);
3838

3939
if (JobReadySelection.PickClaimable(active, now, globalCap, perResourceCap) is not { } candidate)
4040
return null;
4141

42-
DateTime claimedUntil = now.Add(leaseDuration);
42+
JobReadySelection.MarkClaimed(candidate, now, leaseDuration);
43+
await context.SaveChangesAsync(ct);
44+
return candidate;
45+
}
4346

44-
if (isRelational)
47+
private async Task<Job?> ClaimRelationalAsync(DateTime now, TimeSpan leaseDuration, int globalCap, int perResourceCap,
48+
CancellationToken ct)
49+
{
50+
await using var txn = await context.Database.BeginTransactionAsync(ct);
51+
try
4552
{
46-
int updated = await context.JobQueue
47-
.Where(j => j.Key == candidate.Key && j.Status == JobStatus.Queued)
48-
.ExecuteUpdateAsync(s => s
49-
.SetProperty(j => j.Status, JobStatus.Running)
50-
.SetProperty(j => j.ScheduledFor, claimedUntil), ct);
51-
52-
if (updated == 0)
53+
List<Job> active = await context.JobQueue
54+
.FromSqlRaw("""
55+
SELECT * FROM "JobQueue"
56+
WHERE "Status" IN (0, 1)
57+
FOR UPDATE SKIP LOCKED
58+
""")
59+
.ToListAsync(ct);
60+
61+
if (JobReadySelection.PickClaimable(active, now, globalCap, perResourceCap) is not { } candidate)
62+
{
63+
await txn.RollbackAsync(ct);
5364
return null;
65+
}
5466

55-
candidate.Status = JobStatus.Running;
56-
candidate.ScheduledFor = claimedUntil;
67+
JobReadySelection.MarkClaimed(candidate, now, leaseDuration);
68+
await context.SaveChangesAsync(ct);
69+
await txn.CommitAsync(ct);
5770
return candidate;
5871
}
59-
60-
// InMemory provider (tests): reload as tracked and mutate via SaveChanges.
61-
Job? tracked = await context.JobQueue
62-
.Where(j => j.Key == candidate.Key && j.Status == JobStatus.Queued)
63-
.FirstOrDefaultAsync(ct);
64-
65-
if (tracked is null)
66-
return null;
67-
68-
JobReadySelection.MarkClaimed(tracked, now, leaseDuration);
69-
await context.SaveChangesAsync(ct);
70-
return tracked;
72+
catch
73+
{
74+
await txn.RollbackAsync(ct);
75+
throw;
76+
}
7177
}
7278

7379
public async Task UpdateAsync(Job job, CancellationToken ct = default)

api/Tests/Integration/ReconcilerLoopTests.cs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using API.JobRuntime.Handlers;
2+
using API.Schema.ActionsContext;
23
using API.Schema.SeriesContext;
34
using Microsoft.EntityFrameworkCore;
45
using Xunit;
@@ -22,13 +23,40 @@ public async Task InitializeAsync()
2223
{
2324
if (!await _postgres.IsReachableAsync()) return;
2425
_dbName = await _postgres.CreateDatabaseAsync();
26+
string cs = _postgres.GetConnectionString(_dbName);
27+
Directory.CreateDirectory(_libDir);
28+
29+
await MigrateAsync<SeriesContext>(cs);
30+
await MigrateAsync<global::API.Schema.JobsContext.JobsContext>(cs);
31+
await MigrateAsync<ActionsContext>(cs);
32+
33+
var opts = new DbContextOptionsBuilder<SeriesContext>().UseNpgsql(cs).Options;
34+
await using (var ctx = new SeriesContext(opts))
35+
{
36+
var library = new FileLibrary(_libDir, "Lib");
37+
ctx.FileLibraries.Add(library);
38+
var manga = new Series("Test", "", "http://x/c.jpg", SeriesReleaseStatus.Continuing, [], [], [], [], library);
39+
ctx.Series.Add(manga);
40+
var chapter = new Chapter(manga, "1", null, null);
41+
ctx.Chapters.Add(chapter);
42+
var sourceId = new SourceId<Chapter>(chapter, "StubConnector", "site-id-1", "http://stub.test/1", true);
43+
ctx.MangaConnectorToChapter.Add(sourceId);
44+
await ctx.SaveChangesAsync();
45+
}
46+
2547
_app = new KenkuApplicationFactory
2648
{
27-
PostgresConnectionString = _postgres.GetConnectionString(_dbName),
49+
PostgresConnectionString = cs,
2850
RunStartup = true,
2951
DispatcherCaps = (0, 0),
3052
};
31-
Directory.CreateDirectory(_libDir);
53+
}
54+
55+
private static async Task MigrateAsync<TContext>(string cs) where TContext : DbContext
56+
{
57+
var opts = new DbContextOptionsBuilder<TContext>().UseNpgsql(cs).Options;
58+
await using var ctx = (TContext)Activator.CreateInstance(typeof(TContext), opts)!;
59+
await ctx.Database.MigrateAsync();
3260
}
3361

3462
public async Task DisposeAsync()
@@ -44,20 +72,6 @@ public async Task DownloadReconciler_OnFirstTick_EnqueuesDownloadJobForRequested
4472
{
4573
if (_app is null) return; // skip: Postgres not available
4674

47-
await _app.WithSeriesContext(async ctx =>
48-
{
49-
var library = new FileLibrary(_libDir, "Lib");
50-
ctx.FileLibraries.Add(library);
51-
var manga = new Series("Test", "", "http://x/c.jpg", SeriesReleaseStatus.Continuing, [], [], [], [], library);
52-
ctx.Series.Add(manga);
53-
var chapter = new Chapter(manga, "1", null, null);
54-
ctx.Chapters.Add(chapter);
55-
var sourceId = new SourceId<Chapter>(chapter, "StubConnector", "site-id-1", "http://stub.test/1", true);
56-
ctx.MangaConnectorToChapter.Add(sourceId);
57-
await ctx.SaveChangesAsync();
58-
return 0;
59-
});
60-
6175
bool appeared = false;
6276
var deadline = DateTime.UtcNow.AddSeconds(10);
6377
while (DateTime.UtcNow < deadline)

0 commit comments

Comments
 (0)