Skip to content

Commit f276198

Browse files
committed
fix: harden EfJobStore claim with atomic CAS and add Postgres integration test infra
1 parent 320c840 commit f276198

6 files changed

Lines changed: 251 additions & 15 deletions

File tree

api/API/JobRuntime/EfJobStore.cs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,6 @@ namespace API.JobRuntime;
88
/// EF-backed <see cref="IJobStore"/>. Scoped (one per dispatcher tick). Claim selection reuses
99
/// <see cref="JobReadySelection"/> so it matches the in-memory store the DF tests pin.
1010
/// </summary>
11-
/// <remarks>
12-
/// The claim loads the active/ready set and selects in memory. Under high concurrency two ticks could
13-
/// race the same row; hardening the claim with <c>SELECT … FOR UPDATE SKIP LOCKED</c> is deferred to the
14-
/// first real handler (step 2), where it is verified against Postgres. The runtime is not wired to real
15-
/// work yet, so no production claim race is exercised in step 1.
16-
/// </remarks>
1711
public class EfJobStore(JobsContext context) : IJobStore
1812
{
1913
public async Task<Job> EnqueueAsync(Job job, CancellationToken ct = default)
@@ -35,16 +29,45 @@ public async Task<Job> EnqueueAsync(Job job, CancellationToken ct = default)
3529
/// <summary>
/// Picks one claimable job from the queued/running set and marks it as running.
/// </summary>
/// <param name="now">Reference time handed to the selection logic and used as the start of the lease.</param>
/// <param name="leaseDuration">Length of the lease; on the relational path the row's
/// <c>ScheduledFor</c> is set to <paramref name="now"/> plus this duration.</param>
/// <param name="globalCap">Global concurrency cap forwarded to <see cref="JobReadySelection"/>.</param>
/// <param name="perResourceCap">Per-resource concurrency cap forwarded to <see cref="JobReadySelection"/>.</param>
/// <param name="ct">Cancellation token flowed to every database call.</param>
/// <returns>
/// The claimed job, or <c>null</c> when nothing is claimable or when the selected row was no longer
/// <see cref="JobStatus.Queued"/> by the time the claim was attempted.
/// </returns>
public async Task<Job?> ClaimNextReadyAsync(DateTime now, TimeSpan leaseDuration, int globalCap, int perResourceCap,
    CancellationToken ct = default)
{
    // Ask EF whether the provider is relational instead of comparing against the InMemory provider's
    // name: the bulk-update path below needs a relational engine, and a capability check keeps working
    // if another non-relational provider is ever plugged in.
    bool isRelational = context.Database.IsRelational();

    // Read-only snapshot; the row that wins selection is re-targeted by key below.
    List<Job> active = await context.JobQueue
        .Where(j => j.Status == JobStatus.Queued || j.Status == JobStatus.Running)
        .AsNoTracking()
        .ToListAsync(ct);

    if (JobReadySelection.PickClaimable(active, now, globalCap, perResourceCap) is not { } candidate)
        return null;

    if (isRelational)
    {
        DateTime claimedUntil = now.Add(leaseDuration);

        // Compare-and-swap: the UPDATE only matches while the row is still Queued, so when two
        // callers select the same candidate only one of them sees a non-zero row count.
        // NOTE(review): this writes Status and ScheduledFor directly instead of going through
        // JobReadySelection.MarkClaimed — confirm those are the only fields MarkClaimed sets.
        // NOTE(review): a lost race returns null rather than trying the next candidate — confirm
        // that callers simply retry on their next tick.
        int updated = await context.JobQueue
            .Where(j => j.Key == candidate.Key && j.Status == JobStatus.Queued)
            .ExecuteUpdateAsync(s => s
                .SetProperty(j => j.Status, JobStatus.Running)
                .SetProperty(j => j.ScheduledFor, claimedUntil), ct);

        if (updated == 0)
            return null;

        // The snapshot entity is untracked, so mirror the persisted values onto it by hand.
        candidate.Status = JobStatus.Running;
        candidate.ScheduledFor = claimedUntil;
        return candidate;
    }

    // InMemory provider (tests): reload as tracked and mutate via SaveChanges.
    Job? tracked = await context.JobQueue
        .Where(j => j.Key == candidate.Key && j.Status == JobStatus.Queued)
        .FirstOrDefaultAsync(ct);

    if (tracked is null)
        return null;

    JobReadySelection.MarkClaimed(tracked, now, leaseDuration);
    await context.SaveChangesAsync(ct);
    return tracked;
}
4972

5073
public async Task UpdateAsync(Job job, CancellationToken ct = default)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
using API.JobRuntime.Interfaces;
2+
using API.Schema.JobsContext;
3+
using API.Schema.SeriesContext;
4+
using Microsoft.EntityFrameworkCore;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Npgsql;
7+
using Xunit;
8+
using JobEntity = API.Schema.JobsContext.Job;
9+
10+
namespace API.Tests.Integration;
11+
12+
/// <summary>
/// Marker type that declares the xUnit test collection named "postgres"; it has no members of its own.
/// </summary>
[CollectionDefinition("postgres")]
public class PostgresCollectionDefinition
{
}
14+
15+
/// <summary>
/// Verifies that two concurrent Dispatcher instances claiming from the same Postgres database
/// each get at most one job — no double-execution.
/// Requires postgres from docker-compose.test.yml. When it is not reachable the tests return early
/// without asserting anything (they are reported as passed, not skipped).
/// </summary>
[Collection("postgres")]
public class EfJobStoreConcurrencyTests : IAsyncLifetime
{
    private readonly PostgresFixture _pg = new();
    private string? _dbName;
    private KenkuApplicationFactory? _app1;
    private KenkuApplicationFactory? _app2;

    // Bumped by CountingHandler every time either host executes the job.
    private int _executions;

    /// <summary>
    /// Creates an isolated database and two application factories pointing at it. Leaves both
    /// factories null when Postgres cannot be reached, which the tests treat as "skip".
    /// </summary>
    public async Task InitializeAsync()
    {
        if (!await PostgresReachableAsync())
            return;

        _dbName = await _pg.CreateDatabaseAsync();
        string cs = _pg.GetConnectionString(_dbName);

        // Both hosts share one handler instance so executions are counted across them.
        var handler = new CountingHandler(() => Interlocked.Increment(ref _executions));
        _app1 = new KenkuApplicationFactory { PostgresConnectionString = cs, ExtraJobHandlers = [handler] };
        _app2 = new KenkuApplicationFactory { PostgresConnectionString = cs, ExtraJobHandlers = [handler] };
    }

    /// <summary>Disposes both factories and drops the per-test database.</summary>
    public async Task DisposeAsync()
    {
        try
        {
            // WebApplicationFactory supports asynchronous disposal; use it rather than the
            // synchronous Dispose() from inside an async teardown.
            if (_app1 is not null)
                await _app1.DisposeAsync();
            if (_app2 is not null)
                await _app2.DisposeAsync();
        }
        finally
        {
            // Drop the database even when host shutdown throws, so failed runs do not leak databases.
            if (_dbName is not null)
                await _pg.DropDatabaseAsync(_dbName);
        }
    }

    [Fact]
    public async Task ConcurrentDispatchers_ClaimSameJob_ExactlyOnce()
    {
        if (_app1 is not { } app1 || _app2 is not { } app2)
            return; // Postgres not available — skip

        using (var scope = app1.Services.CreateScope())
        {
            await scope.ServiceProvider.GetRequiredService<IJobStore>().EnqueueAsync(
                new JobEntity("CountingJob", "{}", DateTime.UtcNow));
        }

        // Touch the second factory's services before the race starts. Its host is otherwise built
        // lazily inside the second task, which would let the first dispatcher finish long before
        // the second one even attempts a claim.
        _ = app2.Services;

        Task<bool> first = Task.Run(() => RunDispatcherOnceAsync(app1));
        Task<bool> second = Task.Run(() => RunDispatcherOnceAsync(app2));

        bool[] claimed = await Task.WhenAll(first, second);

        Assert.Equal(1, claimed.Count(c => c));
        Assert.Equal(1, _executions);
    }

    [Fact]
    public async Task MigrationsApplied_AllContextsHaveSchema()
    {
        if (_app1 is null)
            return; // Postgres not available — skip

        using var scope = _app1.Services.CreateScope();
        var sp = scope.ServiceProvider;

        var jobsCtx = sp.GetRequiredService<global::API.Schema.JobsContext.JobsContext>();
        Assert.Empty(await jobsCtx.Database.GetPendingMigrationsAsync());

        var seriesCtx = sp.GetRequiredService<SeriesContext>();
        Assert.Empty(await seriesCtx.Database.GetPendingMigrationsAsync());
    }

    // Resolves a Dispatcher from a fresh scope of the given host and runs a single tick.
    private static async Task<bool> RunDispatcherOnceAsync(KenkuApplicationFactory app)
    {
        using var scope = app.Services.CreateScope();
        var dispatcher = scope.ServiceProvider.GetRequiredService<API.JobRuntime.Dispatcher>();
        return await dispatcher.RunOnceAsync();
    }

    // Probes the admin connection with a 3-second budget; true only when the connection opens.
    private static async Task<bool> PostgresReachableAsync()
    {
        // Dispose the timeout source so its timer is released as soon as the probe finishes.
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(3));
        try
        {
            await using var conn = new NpgsqlConnection(PostgresFixture.AdminConnectionString);
            await conn.OpenAsync(timeout.Token);
            return true;
        }
        catch
        {
            // Deliberately broad: any failure to open the connection means "not reachable".
            return false;
        }
    }

    // Job handler for the "CountingJob" type that only reports that it was executed.
    private sealed class CountingHandler(Action onExecute) : IJobHandler
    {
        public string JobType => "CountingJob";

        public Task ExecuteAsync(JobEntity job, CancellationToken ct)
        {
            onExecute();
            return Task.CompletedTask;
        }
    }
}

api/Tests/Integration/KenkuApplicationFactory.cs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Microsoft.EntityFrameworkCore;
1414
using Microsoft.Extensions.DependencyInjection;
1515
using Microsoft.Extensions.DependencyInjection.Extensions;
16+
using Microsoft.Extensions.Hosting;
1617

1718
namespace API.Tests.Integration;
1819

@@ -33,7 +34,12 @@ public sealed class KenkuApplicationFactory : WebApplicationFactory<Program>
3334
new ServiceCollection().AddEntityFrameworkInMemoryDatabase().BuildServiceProvider();
3435

3536
/// <summary>Base URL of the local server outbound metadata requests should be redirected to.</summary>
36-
public required string OutboundHttpTarget { get; init; }
37+
public string OutboundHttpTarget { get; init; } = "http://localhost:1";
38+
39+
/// <summary>When set, the three core contexts (Series, Jobs, Actions) are backed by this Postgres
40+
/// database instead of the default InMemory stores. Used by concurrency and migration tests that
41+
/// need a real relational engine.</summary>
42+
public string? PostgresConnectionString { get; init; }
3743

3844
/// <summary>Optional stub for the connectors' HTTP edge, so a connector flow can be driven without
3945
/// real network access. When set, it replaces the registered <see cref="IHttpRequester"/>.</summary>
@@ -70,11 +76,22 @@ protected override void ConfigureWebHost(IWebHostBuilder builder)
7076
builder.UseSetting("Kenku:AppData", Path.Combine(Path.GetTempPath(), "kenku-test-" + _id));
7177
builder.ConfigureTestServices(services =>
7278
{
73-
UseInMemory<SeriesContext>(services);
74-
UseInMemory<NotificationsContext>(services);
75-
UseInMemory<LibraryContext>(services);
76-
UseInMemory<ActionsContext>(services);
77-
UseInMemory<global::API.Schema.JobsContext.JobsContext>(services);
79+
if (PostgresConnectionString is not null)
80+
{
81+
UseNpgsql<SeriesContext>(services, PostgresConnectionString);
82+
UseNpgsql<global::API.Schema.JobsContext.JobsContext>(services, PostgresConnectionString);
83+
UseNpgsql<ActionsContext>(services, PostgresConnectionString);
84+
UseInMemory<NotificationsContext>(services);
85+
UseInMemory<LibraryContext>(services);
86+
}
87+
else
88+
{
89+
UseInMemory<SeriesContext>(services);
90+
UseInMemory<NotificationsContext>(services);
91+
UseInMemory<LibraryContext>(services);
92+
UseInMemory<ActionsContext>(services);
93+
UseInMemory<global::API.Schema.JobsContext.JobsContext>(services);
94+
}
7895

7996
RouteOutboundHttp<MangaDexVolumeResolver>(services);
8097
RouteOutboundHttp<MangaDexSearchService>(services);
@@ -118,6 +135,20 @@ protected override void ConfigureWebHost(IWebHostBuilder builder)
118135
});
119136
}
120137

138+
/// <summary>
/// Builds the host through the base implementation and, when <see cref="PostgresConnectionString"/>
/// is set, applies the EF migrations of the Series, Jobs and Actions contexts before returning it.
/// </summary>
/// <param name="builder">Host builder supplied by the test infrastructure.</param>
/// <returns>The created host.</returns>
protected override IHost CreateHost(IHostBuilder builder)
{
    var host = base.CreateHost(builder);
    if (PostgresConnectionString is null)
        return host;

    try
    {
        using var scope = host.Services.CreateScope();
        var sp = scope.ServiceProvider;

        // This override is synchronous, so use the synchronous Migrate() API instead of blocking
        // on MigrateAsync() with GetAwaiter().GetResult().
        sp.GetRequiredService<SeriesContext>().Database.Migrate();
        sp.GetRequiredService<global::API.Schema.JobsContext.JobsContext>().Database.Migrate();
        sp.GetRequiredService<ActionsContext>().Database.Migrate();
    }
    catch
    {
        // The host never reaches the caller when a migration fails, so release it here.
        host.Dispose();
        throw;
    }

    return host;
}
151+
121152
private void UseInMemory<TContext>(IServiceCollection services) where TContext : DbContext
122153
{
123154
services.RemoveAll<DbContextOptions<TContext>>();
@@ -127,6 +158,13 @@ private void UseInMemory<TContext>(IServiceCollection services) where TContext :
127158
.UseInternalServiceProvider(_efProvider));
128159
}
129160

161+
/// <summary>
/// Removes the existing registrations for <typeparamref name="TContext"/> and its options, then
/// registers the context again backed by Npgsql with the given connection string.
/// </summary>
private static void UseNpgsql<TContext>(IServiceCollection services, string connectionString)
    where TContext : DbContext
{
    // Clear the options first, then the context itself, so AddDbContext starts from a clean slate.
    services.RemoveAll(typeof(DbContextOptions<TContext>));
    services.RemoveAll(typeof(TContext));

    services.AddDbContext<TContext>(options =>
    {
        options.UseNpgsql(connectionString);
    });
}
167+
130168
// Replaces the typed client's primary handler so its (absolute) requests are redirected to the
131169
// local server. The resolver's own configuration (e.g. the User-Agent) is left intact.
132170
private void RouteOutboundHttp<TClient>(IServiceCollection services) where TClient : class =>
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using Npgsql;
2+
3+
namespace API.Tests.Integration;
4+
5+
/// <summary>
/// Manages per-test Postgres database lifecycle. Each test creates an isolated database
/// (kenku_test_&lt;guid&gt;) and drops it in teardown. Requires postgres running via docker-compose.test.yml.
/// Set TEST_POSTGRES_CONNECTION_STRING to override the default localhost:5433 endpoint.
/// </summary>
public sealed class PostgresFixture : IAsyncLifetime
{
    /// <summary>
    /// Connection string used for administrative commands (create/drop database). Taken from the
    /// TEST_POSTGRES_CONNECTION_STRING environment variable when set, otherwise the local default.
    /// </summary>
    public static readonly string AdminConnectionString =
        Environment.GetEnvironmentVariable("TEST_POSTGRES_CONNECTION_STRING")
        ?? "Host=localhost;Port=5433;Username=kenku;Password=kenku_test;Database=postgres";

    public Task InitializeAsync() => Task.CompletedTask;
    public Task DisposeAsync() => Task.CompletedTask;

    /// <summary>Creates a new database named <c>kenku_test_&lt;guid&gt;</c>.</summary>
    /// <returns>The name of the created database.</returns>
    public async Task<string> CreateDatabaseAsync()
    {
        string name = $"kenku_test_{Guid.NewGuid():N}";
        await using var conn = new NpgsqlConnection(AdminConnectionString);
        await conn.OpenAsync();
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = $"CREATE DATABASE {QuoteIdentifier(name)}";
        await cmd.ExecuteNonQueryAsync();
        return name;
    }

    /// <summary>
    /// Terminates the sessions connected to the given database and then drops it if it exists.
    /// </summary>
    /// <param name="name">Name of the database to drop.</param>
    /// <exception cref="ArgumentException"><paramref name="name"/> is empty.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
    public async Task DropDatabaseAsync(string name)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);

        await using var conn = new NpgsqlConnection(AdminConnectionString);
        await conn.OpenAsync();

        // The database name is a value in this query, so pass it as a parameter rather than splicing
        // it into the SQL text. The admin session's own backend is excluded so that the DROP below
        // can still run if the admin connection string ever targets the same database.
        await using var kill = conn.CreateCommand();
        kill.CommandText =
            "SELECT pg_terminate_backend(pid) FROM pg_stat_activity " +
            "WHERE datname = @name AND pid <> pg_backend_pid()";
        kill.Parameters.AddWithValue("name", name);
        await kill.ExecuteNonQueryAsync();

        // Identifiers cannot be sent as parameters; quote and escape the name instead.
        await using var drop = conn.CreateCommand();
        drop.CommandText = $"DROP DATABASE IF EXISTS {QuoteIdentifier(name)}";
        await drop.ExecuteNonQueryAsync();
    }

    /// <summary>Returns the admin connection string with its database replaced by <paramref name="dbName"/>.</summary>
    public string GetConnectionString(string dbName)
    {
        var b = new NpgsqlConnectionStringBuilder(AdminConnectionString) { Database = dbName };
        return b.ToString();
    }

    // Wraps an identifier in double quotes, doubling any embedded double quote.
    private static string QuoteIdentifier(string identifier) =>
        "\"" + identifier.Replace("\"", "\"\"") + "\"";
}

api/Tests/Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
<ItemGroup>
1111
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="10.0.8" />
12+
<PackageReference Include="Npgsql" Version="10.0.1" />
1213
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="10.0.2" />
1314
<PackageReference Include="coverlet.collector" Version="6.0.4">
1415
<PrivateAssets>all</PrivateAssets>

api/docker-compose.test.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
services:
2+
postgres:
3+
image: postgres:15-alpine
4+
environment:
5+
POSTGRES_USER: kenku
6+
POSTGRES_PASSWORD: kenku_test
7+
POSTGRES_DB: postgres
8+
ports:
9+
- "5433:5432"
10+
healthcheck:
11+
test: ["CMD-SHELL", "pg_isready -U kenku"]
12+
interval: 5s
13+
timeout: 5s
14+
retries: 5

0 commit comments

Comments
 (0)