Skip to content

Commit 12aa8fe

Browse files
bpamiriclaude
andcommitted
Fix DATETIME fractional-second rounding and detect DB type from datasource
MySQL/H2 DATETIME columns round fractional seconds >= 0.5 to the next second. When enqueue() stores runAt=Now() and processNext() checks runAt <= Now() milliseconds later, the rounded-up value appears in the future, making the job invisible. Fix: $now() helper truncates to whole seconds. Also replace application.wheels.adapterName with $detectDatabaseType() using cfdbinfo on the actual datasource — the adapter may have been detected from a different datasource in CI. Add H2-specific TIMESTAMP type in $ensureJobTable(). Test now asserts enqueue persisted before checking processNext result. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 15b8126 commit 12aa8fe

File tree

3 files changed

+79
-33
lines changed

3 files changed

+79
-33
lines changed

vendor/wheels/Job.cfc

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ component {
7272
data = arguments.data,
7373
queue = arguments.queue,
7474
priority = arguments.priority,
75-
runAt = Now()
75+
runAt = $now()
7676
);
7777
}
7878

@@ -94,7 +94,7 @@ component {
9494
data = arguments.data,
9595
queue = arguments.queue,
9696
priority = arguments.priority,
97-
runAt = DateAdd("s", arguments.seconds, Now())
97+
runAt = DateAdd("s", arguments.seconds, $now())
9898
);
9999
}
100100

@@ -132,7 +132,7 @@ component {
132132
) {
133133
local.id = CreateUUID();
134134
local.serializedData = SerializeJSON(arguments.data);
135-
local.now = Now();
135+
local.now = $now();
136136

137137
try {
138138
queryExecute(
@@ -198,7 +198,7 @@ component {
198198
public struct function processQueue(string queue = "", numeric limit = 10) {
199199
local.result = {processed = 0, failed = 0, errors = []};
200200
local.params = {
201-
runAt = {value = Now(), cfsqltype = "cf_sql_timestamp"}
201+
runAt = {value = $now(), cfsqltype = "cf_sql_timestamp"}
202202
};
203203

204204
local.sql = "SELECT id, jobClass, queue, data, attempts, maxRetries
@@ -246,7 +246,7 @@ component {
246246
SET status = 'processing', attempts = attempts + 1, updatedAt = :updatedAt
247247
WHERE id = :id",
248248
{
249-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
249+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
250250
id = {value = arguments.jobRow.id, cfsqltype = "cf_sql_varchar"}
251251
},
252252
{datasource = variables.$datasource}
@@ -268,8 +268,8 @@ component {
268268
SET status = 'completed', completedAt = :completedAt, updatedAt = :updatedAt
269269
WHERE id = :id",
270270
{
271-
completedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
272-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
271+
completedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
272+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
273273
id = {value = arguments.jobRow.id, cfsqltype = "cf_sql_varchar"}
274274
},
275275
{datasource = variables.$datasource}
@@ -291,7 +291,7 @@ component {
291291
if (local.currentAttempts < local.maxRetries) {
292292
// Schedule retry with configurable exponential backoff, capped at maxDelay
293293
local.backoffSeconds = Min(this.baseDelay * (2 ^ local.currentAttempts), this.maxDelay);
294-
local.nextRunAt = DateAdd("s", local.backoffSeconds, Now());
294+
local.nextRunAt = DateAdd("s", local.backoffSeconds, $now());
295295

296296
queryExecute(
297297
"UPDATE _wheels_jobs
@@ -303,7 +303,7 @@ component {
303303
{
304304
lastError = {value = Left(e.message, 1000), cfsqltype = "cf_sql_longvarchar"},
305305
runAt = {value = local.nextRunAt, cfsqltype = "cf_sql_timestamp"},
306-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
306+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
307307
id = {value = arguments.jobRow.id, cfsqltype = "cf_sql_varchar"}
308308
},
309309
{datasource = variables.$datasource}
@@ -324,9 +324,9 @@ component {
324324
updatedAt = :updatedAt
325325
WHERE id = :id",
326326
{
327-
failedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
327+
failedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
328328
lastError = {value = Left(e.message, 1000), cfsqltype = "cf_sql_longvarchar"},
329-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
329+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
330330
id = {value = arguments.jobRow.id, cfsqltype = "cf_sql_varchar"}
331331
},
332332
{datasource = variables.$datasource}
@@ -388,8 +388,8 @@ component {
388388
runAt = :runAt, updatedAt = :updatedAt
389389
WHERE status = 'failed'";
390390
local.params = {
391-
runAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
392-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"}
391+
runAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
392+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"}
393393
};
394394

395395
if (Len(arguments.queue)) {
@@ -412,7 +412,7 @@ component {
412412
* @queue Optional queue name to filter by.
413413
*/
414414
public numeric function purgeCompleted(numeric days = 7, string queue = "") {
415-
local.cutoff = DateAdd("d", -arguments.days, Now());
415+
local.cutoff = DateAdd("d", -arguments.days, $now());
416416
local.sql = "DELETE FROM _wheels_jobs WHERE status = 'completed' AND completedAt < :cutoff";
417417
local.params = {
418418
cutoff = {value = local.cutoff, cfsqltype = "cf_sql_timestamp"}
@@ -447,21 +447,24 @@ component {
447447
}
448448

449449
try {
450-
// Detect database adapter for type compatibility
451-
local.adapterName = "";
452-
if (StructKeyExists(application, "wheels") && StructKeyExists(application.wheels, "adapterName")) {
453-
local.adapterName = application.wheels.adapterName;
454-
}
450+
// Detect actual database type from the datasource via JDBC metadata.
451+
// We query the datasource directly rather than using application.wheels.adapterName
452+
// because the adapter may have been detected from a different datasource.
453+
local.dbType = $detectDatabaseType();
455454

456455
// Use database-appropriate types
457-
if (local.adapterName == "OracleModel") {
456+
if (local.dbType == "oracle") {
458457
local.varcharType = "VARCHAR2";
459458
local.textType = "CLOB";
460459
local.datetimeType = "TIMESTAMP";
461-
} else if (local.adapterName == "PostgreSQLModel" || local.adapterName == "CockroachDBModel") {
460+
} else if (local.dbType == "postgresql") {
462461
local.varcharType = "VARCHAR";
463462
local.textType = "TEXT";
464463
local.datetimeType = "TIMESTAMP";
464+
} else if (local.dbType == "h2") {
465+
local.varcharType = "VARCHAR";
466+
local.textType = "CLOB";
467+
local.datetimeType = "TIMESTAMP";
465468
} else {
466469
local.varcharType = "VARCHAR";
467470
local.textType = "TEXT";
@@ -503,4 +506,34 @@ component {
503506
return false;
504507
}
505508
}
509+
510+
/**
511+
* Returns Now() truncated to whole seconds.
512+
* Prevents MySQL/H2 DATETIME rounding: when fractional seconds >= 0.5,
513+
* these databases round UP to the next second, making runAt appear in the future.
514+
*/
515+
private date function $now() {
516+
local.n = Now();
517+
return CreateDateTime(Year(local.n), Month(local.n), Day(local.n), Hour(local.n), Minute(local.n), Second(local.n));
518+
}
519+
520+
/**
521+
* Detect the database type from the actual datasource via JDBC metadata.
522+
* Returns: "oracle", "postgresql", "h2", "mysql", "sqlserver", "sqlite", or "default".
523+
*/
524+
private string function $detectDatabaseType() {
525+
try {
526+
cfdbinfo(type = "version", datasource = "#variables.$datasource#", name = "local.info");
527+
local.product = local.info.database_productname;
528+
if (FindNoCase("oracle", local.product)) return "oracle";
529+
if (FindNoCase("postgre", local.product)) return "postgresql";
530+
if (FindNoCase("h2", local.product)) return "h2";
531+
if (FindNoCase("mysql", local.product) || FindNoCase("mariadb", local.product)) return "mysql";
532+
if (FindNoCase("sql server", local.product)) return "sqlserver";
533+
if (FindNoCase("sqlite", local.product)) return "sqlite";
534+
} catch (any e) {
535+
// cfdbinfo not available — fall through to default
536+
}
537+
return "default";
538+
}
506539
}

vendor/wheels/JobWorker.cfc

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ component {
3535

3636
// Find the next candidate job
3737
local.params = {
38-
runAt = {value = Now(), cfsqltype = "cf_sql_timestamp"}
38+
runAt = {value = $now(), cfsqltype = "cf_sql_timestamp"}
3939
};
4040

4141
local.sql = "SELECT id, jobClass, queue, data, attempts, maxRetries
@@ -109,7 +109,7 @@ component {
109109
* @timeout Seconds after which a processing job is considered timed out. Default 300.
110110
*/
111111
public numeric function checkTimeouts(numeric timeout = 300) {
112-
local.cutoff = DateAdd("s", -arguments.timeout, Now());
112+
local.cutoff = DateAdd("s", -arguments.timeout, $now());
113113

114114
// Find timed-out jobs
115115
try {
@@ -202,7 +202,7 @@ component {
202202
worker = {id = this.workerId, startedAt = this.startedAt, processed = this.jobsProcessed, failed = this.jobsFailed}
203203
};
204204

205-
local.lookback = DateAdd("n", -arguments.minutes, Now());
205+
local.lookback = DateAdd("n", -arguments.minutes, $now());
206206
local.params = {lookback = {value = local.lookback, cfsqltype = "cf_sql_timestamp"}};
207207

208208
// Throughput — completed and failed in the window
@@ -270,7 +270,7 @@ component {
270270
* @limit Maximum number of jobs to retry. 0 = unlimited.
271271
*/
272272
public numeric function retryFailed(string queue = "", numeric limit = 0) {
273-
local.now = Now();
273+
local.now = $now();
274274

275275
// If limit specified, get the IDs first
276276
if (arguments.limit > 0) {
@@ -356,7 +356,7 @@ component {
356356
throw(type = "Wheels.InvalidArgument", message = "Purge status must be 'completed' or 'failed'.");
357357
}
358358

359-
local.cutoff = DateAdd("d", -arguments.days, Now());
359+
local.cutoff = DateAdd("d", -arguments.days, $now());
360360
local.dateColumn = (arguments.status == "completed") ? "completedAt" : "failedAt";
361361

362362
local.sql = "DELETE FROM _wheels_jobs WHERE status = :status AND #local.dateColumn# < :cutoff";
@@ -395,7 +395,7 @@ component {
395395
SET status = 'processing', attempts = attempts + 1, updatedAt = :updatedAt
396396
WHERE id = :id AND status = 'pending'",
397397
{
398-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
398+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
399399
id = {value = arguments.jobId, cfsqltype = "cf_sql_varchar"}
400400
},
401401
{datasource = variables.$datasource}
@@ -429,8 +429,8 @@ component {
429429
SET status = 'completed', completedAt = :completedAt, updatedAt = :updatedAt
430430
WHERE id = :id",
431431
{
432-
completedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
433-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
432+
completedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
433+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
434434
id = {value = arguments.jobRow.id, cfsqltype = "cf_sql_varchar"}
435435
},
436436
{datasource = variables.$datasource}
@@ -473,7 +473,7 @@ component {
473473
}
474474

475475
local.backoffSeconds = Min(local.baseDelay * (2 ^ arguments.currentAttempts), local.maxDelay);
476-
local.nextRunAt = DateAdd("s", local.backoffSeconds, Now());
476+
local.nextRunAt = DateAdd("s", local.backoffSeconds, $now());
477477

478478
queryExecute(
479479
"UPDATE _wheels_jobs
@@ -485,7 +485,7 @@ component {
485485
{
486486
lastError = {value = Left(arguments.errorMessage, 1000), cfsqltype = "cf_sql_longvarchar"},
487487
runAt = {value = local.nextRunAt, cfsqltype = "cf_sql_timestamp"},
488-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
488+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
489489
id = {value = arguments.jobId, cfsqltype = "cf_sql_varchar"}
490490
},
491491
{datasource = variables.$datasource}
@@ -515,9 +515,9 @@ component {
515515
updatedAt = :updatedAt
516516
WHERE id = :id",
517517
{
518-
failedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
518+
failedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
519519
lastError = {value = Left(arguments.errorMessage, 1000), cfsqltype = "cf_sql_longvarchar"},
520-
updatedAt = {value = Now(), cfsqltype = "cf_sql_timestamp"},
520+
updatedAt = {value = $now(), cfsqltype = "cf_sql_timestamp"},
521521
id = {value = arguments.jobId, cfsqltype = "cf_sql_varchar"}
522522
},
523523
{datasource = variables.$datasource}
@@ -530,6 +530,15 @@ component {
530530
);
531531
}
532532

533+
/**
534+
* Returns Now() truncated to whole seconds.
535+
* Prevents MySQL/H2 DATETIME rounding: fractional seconds >= 0.5 round UP.
536+
*/
537+
private date function $now() {
538+
local.n = Now();
539+
return CreateDateTime(Year(local.n), Month(local.n), Day(local.n), Hour(local.n), Minute(local.n), Second(local.n));
540+
}
541+
533542
/**
534543
* Ensure the _wheels_jobs table exists. Delegates to Job.cfc's implementation.
535544
*/

vendor/wheels/tests/specs/jobs/JobWorkerSpec.cfc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ component extends="wheels.WheelsTest" {
6969
local.testJob = new app.jobs.ProcessOrdersJob();
7070
local.enqueued = local.testJob.enqueue(data = {test: true}, queue = "test_claim");
7171

72+
// Verify the job was persisted (catches silent enqueue failures)
73+
expect(local.enqueued).toHaveKey("persisted");
74+
expect(local.enqueued.persisted).toBeTrue();
75+
7276
// Process it
7377
local.worker = new wheels.JobWorker();
7478
local.result = local.worker.processNext(queues = "test_claim");

0 commit comments

Comments
 (0)