1+ // Copyright 2023 the Deno authors. All rights reserved. MIT license.
2+
13use std:: collections:: HashSet ;
24use std:: sync:: Arc ;
35use std:: time:: Duration ;
@@ -24,6 +26,8 @@ use rusqlite::OptionalExtension;
2426use rusqlite:: Transaction ;
2527use uuid:: Uuid ;
2628
29+ use crate :: time:: utc_now;
30+
2731const STATEMENT_INC_AND_GET_DATA_VERSION : & str =
2832 "update data_version set version = version + ? where k = 0 returning version" ;
2933const STATEMENT_KV_RANGE_SCAN : & str =
@@ -343,7 +347,7 @@ impl SqliteBackend {
343347
344348 pub fn queue_running_keepalive ( & mut self ) -> Result < ( ) , anyhow:: Error > {
345349 let running_messages = self . messages_running . clone ( ) ;
346- let now = Utc :: now ( ) ;
350+ let now = utc_now ( ) ;
347351 self . run_tx ( |tx, _| {
348352 let mut update_deadline_stmt =
349353 tx. prepare_cached ( STATEMENT_QUEUE_UPDATE_RUNNING_DEADLINE ) ?;
@@ -360,7 +364,7 @@ impl SqliteBackend {
360364 }
361365
362366 pub fn queue_cleanup ( & mut self ) -> Result < ( ) , anyhow:: Error > {
363- let now = Utc :: now ( ) ;
367+ let now = utc_now ( ) ;
364368 let queue_dequeue_waker = self . dequeue_notify . clone ( ) ;
365369 loop {
366370 let done = self . run_tx ( |tx, rng| {
@@ -389,7 +393,7 @@ impl SqliteBackend {
389393 & mut self ,
390394 ) -> Result < ( Option < DequeuedMessage > , Option < DateTime < Utc > > ) , anyhow:: Error >
391395 {
392- let now = Utc :: now ( ) ;
396+ let now = utc_now ( ) ;
393397
394398 let can_dispatch = self . messages_running . len ( ) < DISPATCH_CONCURRENCY_LIMIT ;
395399
@@ -476,7 +480,7 @@ impl SqliteBackend {
476480 id : & QueueMessageId ,
477481 success : bool ,
478482 ) -> Result < ( ) , anyhow:: Error > {
479- let now = Utc :: now ( ) ;
483+ let now = utc_now ( ) ;
480484 let requeued = self . run_tx ( |tx, rng| {
481485 let requeued = if success {
482486 let changed = tx
@@ -498,7 +502,7 @@ impl SqliteBackend {
498502 pub fn collect_expired (
499503 & mut self ,
500504 ) -> Result < Option < DateTime < Utc > > , anyhow:: Error > {
501- let now = Utc :: now ( ) ;
505+ let now = utc_now ( ) ;
502506 self . run_tx ( |tx, _| {
503507 tx. prepare_cached ( STATEMENT_DELETE_ALL_EXPIRED ) ?
504508 . execute ( params ! [ now. timestamp_millis( ) ] ) ?;
0 commit comments