Skip to content

Commit 3949e31

Browse files
authored
Merge pull request #298 from hangfire-postgres/features/266-use-jsonb-where-possible
Change columns to jsonb where possible
2 parents 5cc7f1d + bb365d8 commit 3949e31

13 files changed

+156
-119
lines changed
+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using System;
2+
using System.Data;
3+
using System.Text.Json;
4+
using Dapper;
5+
using Hangfire.Annotations;
6+
using Npgsql;
7+
using NpgsqlTypes;
8+
9+
namespace Hangfire.PostgreSql;
10+
11+
internal class JsonParameter : SqlMapper.ICustomQueryParameter
12+
{
13+
[CanBeNull] private readonly object _value;
14+
private readonly ValueType _type;
15+
16+
public JsonParameter([CanBeNull] object value) : this(value, ValueType.Object)
17+
{
18+
}
19+
20+
public JsonParameter([CanBeNull] object value, ValueType type)
21+
{
22+
_value = value;
23+
_type = type;
24+
}
25+
26+
public void AddParameter(IDbCommand command, string name)
27+
{
28+
string value = _value switch {
29+
string { Length: > 0 } stringValue => stringValue,
30+
string { Length: 0 } or null => GetDefaultValue(),
31+
var _ => JsonSerializer.Serialize(_value),
32+
};
33+
command.Parameters.Add(new NpgsqlParameter(name, NpgsqlDbType.Jsonb) { Value = value });
34+
}
35+
36+
private string GetDefaultValue()
37+
{
38+
return _type switch
39+
{
40+
ValueType.Object => "{}",
41+
ValueType.Array => "[]",
42+
var _ => throw new ArgumentOutOfRangeException(),
43+
};
44+
}
45+
46+
public enum ValueType
47+
{
48+
Object,
49+
Array,
50+
}
51+
}

src/Hangfire.PostgreSql/PostgreSqlConnection.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ public override string CreateExpiredJob(
124124
return _storage.UseTransaction(_dedicatedConnection, (connection, transaction) => {
125125
string jobId = connection.QuerySingle<long>(createJobSql,
126126
new {
127-
InvocationData = SerializationHelper.Serialize(invocationData),
128-
invocationData.Arguments,
127+
InvocationData = new JsonParameter(SerializationHelper.Serialize(invocationData)),
128+
Arguments = new JsonParameter(invocationData.Arguments, JsonParameter.ValueType.Array),
129129
CreatedAt = createdAt,
130130
ExpireAt = createdAt.Add(expireIn),
131131
}).ToString(CultureInfo.InvariantCulture);
@@ -443,7 +443,7 @@ SELECT 1
443443
";
444444

445445
_storage.UseConnection(_dedicatedConnection, connection => connection
446-
.Execute(sql, new { Id = serverId, Data = SerializationHelper.Serialize(data) }));
446+
.Execute(sql, new { Id = serverId, Data = new JsonParameter(SerializationHelper.Serialize(data)) }));
447447
}
448448

449449
public override void RemoveServer(string serverId)

src/Hangfire.PostgreSql/PostgreSqlObjectsInstaller.cs

+12-12
Original file line numberDiff line numberDiff line change
@@ -68,20 +68,20 @@ public static void Install(NpgsqlConnection connection, string schemaName = "han
6868

6969
if (!VersionAlreadyApplied(connection, schemaName, version))
7070
{
71+
string commandText = $@"{script}; UPDATE ""{schemaName}"".""schema"" SET ""version"" = @Version WHERE ""version"" = @PreviousVersion";
7172
using NpgsqlTransaction transaction = connection.BeginTransaction(IsolationLevel.Serializable);
72-
#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
73-
using NpgsqlCommand command = new(script, connection, transaction);
74-
command.CommandTimeout = 120;
73+
using NpgsqlCommand command = new(commandText, connection, transaction)
74+
{
75+
CommandTimeout = 120,
76+
Parameters =
77+
{
78+
new NpgsqlParameter("Version", version),
79+
new NpgsqlParameter("PreviousVersion", previousVersion),
80+
},
81+
};
7582
try
7683
{
77-
#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
78-
command.CommandText += $@"; UPDATE ""{schemaName}"".""schema"" SET ""version"" = @Version WHERE ""version"" = @PreviousVersion";
79-
#pragma warning restore CA2100 // Review SQL queries for security vulnerabilities
80-
command.Parameters.AddWithValue("Version", version);
81-
command.Parameters.AddWithValue("PreviousVersion", previousVersion);
82-
8384
command.ExecuteNonQuery();
84-
8585
transaction.Commit();
8686
}
8787
catch (PostgresException ex)
@@ -112,8 +112,8 @@ private static bool VersionAlreadyApplied(NpgsqlConnection connection, string sc
112112
{
113113
try
114114
{
115-
using NpgsqlCommand command = new($@"SELECT true :: boolean ""VersionAlreadyApplied"" FROM ""{schemaName}"".""schema"" WHERE ""version""::integer >= @Version", connection);
116-
command.Parameters.AddWithValue("Version", version);
115+
using NpgsqlCommand command = new($@"SELECT true ""VersionAlreadyApplied"" FROM ""{schemaName}"".""schema"" WHERE ""version"" >= $1", connection);
116+
command.Parameters.Add(new NpgsqlParameter { Value = version });
117117
object result = command.ExecuteScalar();
118118
if (true.Equals(result))
119119
{

src/Hangfire.PostgreSql/PostgreSqlWriteOnlyTransaction.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public override void SetJobState(string jobId, IState state)
106106
state.Name,
107107
state.Reason,
108108
CreatedAt = DateTime.UtcNow,
109-
Data = SerializationHelper.Serialize(state.SerializeData()),
109+
Data = new JsonParameter(SerializationHelper.Serialize(state.SerializeData())),
110110
Id = Convert.ToInt64(jobId, CultureInfo.InvariantCulture),
111111
}));
112112
}
@@ -124,7 +124,7 @@ public override void AddJobState(string jobId, IState state)
124124
state.Name,
125125
state.Reason,
126126
CreatedAt = DateTime.UtcNow,
127-
Data = SerializationHelper.Serialize(state.SerializeData()),
127+
Data = new JsonParameter(SerializationHelper.Serialize(state.SerializeData())),
128128
}));
129129
}
130130

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
SET search_path = 'hangfire';
2+
3+
DO $$
4+
BEGIN
5+
IF EXISTS(SELECT 1 FROM "schema" WHERE "version"::integer >= 20) THEN
6+
RAISE EXCEPTION 'version-already-applied';
7+
END IF;
8+
END $$;
9+
10+
-- Update existing jobs, if any have empty values first
11+
UPDATE "job" SET "invocationdata" = '{}' WHERE "invocationdata" = '';
12+
UPDATE "job" SET "arguments" = '[]' WHERE "arguments" = '';
13+
14+
-- Change the type
15+
16+
ALTER TABLE "job" ALTER COLUMN "invocationdata" TYPE jsonb USING "invocationdata"::jsonb;
17+
ALTER TABLE "job" ALTER COLUMN "arguments" TYPE jsonb USING "arguments"::jsonb;
18+
ALTER TABLE "server" ALTER COLUMN "data" TYPE jsonb USING "data"::jsonb;
19+
ALTER TABLE "state" ALTER COLUMN "data" TYPE jsonb USING "data"::jsonb;
20+
21+
RESET search_path;

tests/Hangfire.PostgreSql.Tests/ExpirationManagerFacts.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void Execute_Processes_JobTable()
125125
// Arrange
126126
string createSql = $@"
127127
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""createdat"", ""expireat"")
128-
VALUES ('', '', NOW(), @ExpireAt)
128+
VALUES ('{{}}', '[]', NOW(), @ExpireAt)
129129
";
130130
connection.Execute(createSql, new { ExpireAt = DateTime.UtcNow.AddMonths(-1) });
131131

tests/Hangfire.PostgreSql.Tests/PostgreSqlConnectionFacts.cs

+18-38
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Globalization;
44
using System.Linq;
5+
using System.Text.Json;
56
using System.Threading;
67
using System.Threading.Tasks;
78
using System.Transactions;
@@ -199,9 +200,9 @@ public void GetJobData_ReturnsResult_WhenJobExists()
199200

200201
long jobId = connection.QuerySingle<long>(arrangeSql,
201202
new {
202-
InvocationData = SerializationHelper.Serialize(InvocationData.SerializeJob(job)),
203+
InvocationData = new JsonParameter(SerializationHelper.Serialize(InvocationData.SerializeJob(job))),
203204
StateName = "Succeeded",
204-
Arguments = "[\"\\\"Arguments\\\"\"]",
205+
Arguments = new JsonParameter("[\"\\\"Arguments\\\"\"]", JsonParameter.ValueType.Array),
205206
});
206207

207208
JobData result = jobStorageConnection.GetJobData(jobId.ToString(CultureInfo.InvariantCulture));
@@ -239,7 +240,7 @@ public void GetStateData_ReturnsCorrectData()
239240
{
240241
string createJobSql = $@"
241242
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""statename"", ""createdat"")
242-
VALUES ('', '', '', NOW()) RETURNING ""id"";
243+
VALUES ('{{}}', '[]', '', NOW()) RETURNING ""id"";
243244
";
244245

245246
string createStateSql = $@"
@@ -265,7 +266,7 @@ public void GetStateData_ReturnsCorrectData()
265266
long jobId = connection.QuerySingle<long>(createJobSql);
266267

267268
long stateId = connection.QuerySingle<long>(createStateSql,
268-
new { JobId = jobId, Name = "Name", Reason = "Reason", Data = SerializationHelper.Serialize(data) });
269+
new { JobId = jobId, Name = "Name", Reason = "Reason", Data = new JsonParameter(SerializationHelper.Serialize(data)) });
269270

270271
connection.Execute(updateJobStateSql, new { JobId = jobId, StateId = stateId });
271272

@@ -278,29 +279,6 @@ public void GetStateData_ReturnsCorrectData()
278279
});
279280
}
280281

281-
[Fact]
282-
[CleanDatabase]
283-
public void GetJobData_ReturnsJobLoadException_IfThereWasADeserializationException()
284-
{
285-
string arrangeSql = $@"
286-
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""statename"", ""createdat"")
287-
VALUES (@InvocationData, @Arguments, @StateName, NOW()) RETURNING ""id""
288-
";
289-
290-
UseConnections((connection, jobStorageConnection) => {
291-
long jobId = connection.QuerySingle<long>(arrangeSql,
292-
new {
293-
InvocationData = SerializationHelper.Serialize(new InvocationData(null, null, null, null)),
294-
StateName = "Succeeded",
295-
Arguments = "['Arguments']",
296-
});
297-
298-
JobData result = jobStorageConnection.GetJobData(jobId.ToString(CultureInfo.InvariantCulture));
299-
300-
Assert.NotNull(result.LoadException);
301-
});
302-
}
303-
304282
[Fact]
305283
[CleanDatabase]
306284
public void SetParameter_ThrowsAnException_WhenJobIdIsNull()
@@ -329,7 +307,7 @@ public void SetParameters_CreatesNewParameter_WhenParameterWithTheGivenNameDoesN
329307
{
330308
string arrangeSql = $@"
331309
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""createdat"")
332-
VALUES ('', '', NOW()) RETURNING ""id""
310+
VALUES ('{{}}', '[]', NOW()) RETURNING ""id""
333311
";
334312

335313
UseConnections((connection, jobStorageConnection) => {
@@ -350,7 +328,7 @@ public void SetParameter_UpdatesValue_WhenParameterWithTheGivenName_AlreadyExist
350328
{
351329
string arrangeSql = $@"
352330
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""createdat"")
353-
VALUES ('', '', NOW()) RETURNING ""id""
331+
VALUES ('{{}}', '[]', NOW()) RETURNING ""id""
354332
";
355333

356334
UseConnections((connection, jobStorageConnection) => {
@@ -372,7 +350,7 @@ public void SetParameter_CanAcceptNulls_AsValues()
372350
{
373351
string arrangeSql = $@"
374352
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""createdat"")
375-
VALUES ('', '', NOW()) RETURNING ""id""
353+
VALUES ('{{}}', '[]', NOW()) RETURNING ""id""
376354
";
377355

378356
UseConnections((connection, jobStorageConnection) => {
@@ -426,7 +404,7 @@ public void GetParameter_ReturnsParameterValue_WhenJobExists()
426404
string arrangeSql = $@"
427405
WITH ""insertedjob"" AS (
428406
INSERT INTO ""{GetSchemaName()}"".""job"" (""invocationdata"", ""arguments"", ""createdat"")
429-
VALUES ('', '', NOW()) RETURNING ""id""
407+
VALUES ('{{}}', '[]', NOW()) RETURNING ""id""
430408
)
431409
INSERT INTO ""{GetSchemaName()}"".""jobparameter"" (""jobid"", ""name"", ""value"")
432410
SELECT ""insertedjob"".""id"", @Name, @Value
@@ -522,15 +500,17 @@ public void AnnounceServer_CreatesOrUpdatesARecord()
522500
{
523501
UseConnections((connection, jobStorageConnection) => {
524502
ServerContext context1 = new ServerContext {
525-
Queues = new[] { "critical", "default" },
526503
WorkerCount = 4,
504+
Queues = new[] { "critical", "default" },
527505
};
528506
jobStorageConnection.AnnounceServer("server", context1);
529507

530508
dynamic server = connection.Query($@"SELECT * FROM ""{GetSchemaName()}"".""server""").Single();
531509
Assert.Equal("server", server.id);
532-
Assert.True(((string)server.data).StartsWith("{\"WorkerCount\":4,\"Queues\":[\"critical\",\"default\"],\"StartedAt\":"),
533-
server.data);
510+
511+
ServerContext serverData = JsonSerializer.Deserialize<ServerContext>(server.data);
512+
Assert.Equal(4, serverData.WorkerCount);
513+
Assert.Equal(context1.Queues, serverData.Queues);
534514
Assert.NotNull(server.lastheartbeat);
535515

536516
ServerContext context2 = new ServerContext {
@@ -557,8 +537,8 @@ public void RemoveServer_RemovesAServerRecord()
557537
{
558538
string arrangeSql = $@"
559539
INSERT INTO ""{GetSchemaName()}"".""server"" (""id"", ""data"", ""lastheartbeat"")
560-
VALUES ('Server1', '', NOW()),
561-
('Server2', '', NOW())
540+
VALUES ('Server1', '{{}}', NOW()),
541+
('Server2', '{{}}', NOW())
562542
";
563543

564544
UseConnections((connection, jobStorageConnection) => {
@@ -593,7 +573,7 @@ public void Heartbeat_UpdatesLastHeartbeat_OfTheServerWithGivenId()
593573
{
594574
string arrangeSql = $@"
595575
INSERT INTO ""{GetSchemaName()}"".""server"" (""id"", ""data"", ""lastheartbeat"")
596-
VALUES ('server1', '', '2012-12-12 12:12:12'), ('server2', '', '2012-12-12 12:12:12')
576+
VALUES ('server1', '{{}}', '2012-12-12 12:12:12'), ('server2', '{{}}', '2012-12-12 12:12:12')
597577
";
598578

599579
UseConnections((connection, jobStorageConnection) => {
@@ -622,7 +602,7 @@ public void RemoveTimedOutServers_DoItsWorkPerfectly()
622602
{
623603
string arrangeSql = $@"
624604
INSERT INTO ""{GetSchemaName()}"".""server"" (""id"", ""data"", ""lastheartbeat"")
625-
VALUES (@Id, '', @Heartbeat)
605+
VALUES (@Id, '{{}}', @Heartbeat)
626606
";
627607

628608
UseConnections((connection, jobStorageConnection) => {

tests/Hangfire.PostgreSql.Tests/PostgreSqlInstallerFacts.cs

+4-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public void InstallingSchemaUpdatesVersionAndShouldNotThrowAnException()
1919
PostgreSqlObjectsInstaller.Install(connection, schemaName);
2020

2121
int lastVersion = connection.Query<int>($@"SELECT version FROM ""{schemaName}"".""schema""").Single();
22-
Assert.Equal(19, lastVersion);
22+
Assert.Equal(20, lastVersion);
2323

2424
connection.Execute($@"DROP SCHEMA ""{schemaName}"" CASCADE;");
2525
});
@@ -38,7 +38,7 @@ public void InstallingSchemaWithCapitalsUpdatesVersionAndShouldNotThrowAnExcepti
3838
PostgreSqlObjectsInstaller.Install(connection, schemaName);
3939

4040
int lastVersion = connection.Query<int>($@"SELECT version FROM ""{schemaName}"".""schema""").Single();
41-
Assert.Equal(19, lastVersion);
41+
Assert.Equal(20, lastVersion);
4242

4343
connection.Execute($@"DROP SCHEMA ""{schemaName}"" CASCADE;");
4444
});
@@ -49,10 +49,8 @@ public void InstallingSchemaWithCapitalsUpdatesVersionAndShouldNotThrowAnExcepti
4949

5050
private static void UseConnection(Action<NpgsqlConnection> action)
5151
{
52-
using (NpgsqlConnection connection = ConnectionUtils.CreateConnection())
53-
{
54-
action(connection);
55-
}
52+
using NpgsqlConnection connection = ConnectionUtils.CreateConnection();
53+
action(connection);
5654
}
5755
}
5856
}

tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ WITH i AS (
204204
// Arrange
205205
UseConnection((connection, storage) => {
206206
connection.Execute(arrangeSql,
207-
new { InvocationData = "", Arguments = "", Queue = "default" });
207+
new { InvocationData = new JsonParameter(""), Arguments = new JsonParameter("", JsonParameter.ValueType.Array), Queue = "default" });
208208
PostgreSqlJobQueue queue = CreateJobQueue(storage, useNativeDatabaseTransactions);
209209

210210
// Act
@@ -255,8 +255,8 @@ WITH i AS (
255255
new {
256256
Queue = "default",
257257
FetchedAt = DateTime.UtcNow.AddDays(-1),
258-
InvocationData = "",
259-
Arguments = "",
258+
InvocationData = new JsonParameter(""),
259+
Arguments = new JsonParameter("", JsonParameter.ValueType.Array),
260260
});
261261
PostgreSqlJobQueue queue = CreateJobQueue(storage, useNativeDatabaseTransactions);
262262

@@ -298,8 +298,8 @@ WITH i AS (
298298
UseConnection((connection, storage) => {
299299
connection.Execute(arrangeSql,
300300
new[] {
301-
new { Queue = "default", InvocationData = "", Arguments = "" },
302-
new { Queue = "default", InvocationData = "", Arguments = "" },
301+
new { Queue = "default", InvocationData = new JsonParameter(""), Arguments = new JsonParameter("", JsonParameter.ValueType.Array) },
302+
new { Queue = "default", InvocationData = new JsonParameter(""), Arguments = new JsonParameter("", JsonParameter.ValueType.Array) },
303303
});
304304
PostgreSqlJobQueue queue = CreateJobQueue(storage, useNativeDatabaseTransactions);
305305

@@ -345,7 +345,7 @@ WITH i AS (
345345
PostgreSqlJobQueue queue = CreateJobQueue(storage, useNativeDatabaseTransactions);
346346

347347
connection.Execute(arrangeSql,
348-
new { Queue = "critical", InvocationData = "", Arguments = "" });
348+
new { Queue = "critical", InvocationData = new JsonParameter(""), Arguments = new JsonParameter("", JsonParameter.ValueType.Array) });
349349

350350
Assert.Throws<OperationCanceledException>(() => queue.Dequeue(_defaultQueues,
351351
CreateTimingOutCancellationToken()));
@@ -383,8 +383,8 @@ WITH i AS (
383383
UseConnection((connection, storage) => {
384384
connection.Execute(arrangeSql,
385385
new[] {
386-
new { Queue = queueNames.First(), InvocationData = "", Arguments = "" },
387-
new { Queue = queueNames.Last(), InvocationData = "", Arguments = "" },
386+
new { Queue = queueNames.First(), InvocationData = new JsonParameter("") , Arguments = new JsonParameter("", JsonParameter.ValueType.Array) },
387+
new { Queue = queueNames.Last(), InvocationData = new JsonParameter(""), Arguments = new JsonParameter("", JsonParameter.ValueType.Array) },
388388
});
389389

390390
PostgreSqlJobQueue queue = CreateJobQueue(storage, useNativeDatabaseTransactions);

0 commit comments

Comments
 (0)