Skip to content

Commit ba7fa69

Browse files
authored
Merge pull request #2437 from digitallyinduced/job-queue-pool-explicit
Decouple job queue from ModelContext
2 parents bd3ed18 + 0030577 commit ba7fa69

File tree

3 files changed

+131
-134
lines changed

3 files changed

+131
-134
lines changed

ihp/IHP/Job/Queue.hs

Lines changed: 117 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import qualified Database.PostgreSQL.Simple.ToField as PG
1313
import qualified Control.Concurrent.Async as Async
1414
import qualified Control.Concurrent as Concurrent
1515
import qualified Control.Exception.Safe as Exception
16-
import IHP.ModelSupport
16+
import IHP.ModelSupport (HasqlError(..), Table(..), isCachedPlanError, GetModelByTableName, InputValue(..))
17+
import IHP.ModelSupport.Types (Id'(..), PrimaryKey)
1718
import IHP.Controller.Param
1819
import qualified System.Random as Random
1920
import qualified IHP.PGListener as PGListener
@@ -28,11 +29,25 @@ import qualified Hasql.Pool as HasqlPool
2829
import qualified Hasql.Session as HasqlSession
2930
import qualified Hasql.Statement as Hasql
3031
import qualified Hasql.Decoders as Decoders
31-
import IHP.Hasql.Encoders ()
3232
import qualified Data.Text as Text
33-
import Data.Functor.Contravariant.Divisible (conquer)
3433
import Control.Concurrent.STM (TBQueue, atomically, writeTBQueue, STM)
3534
import Control.Concurrent.STM.TBQueue (isFullTBQueue)
35+
import Data.Functor.Contravariant (contramap)
36+
37+
-- | Run a hasql session against the pool, retrying once on cached plan errors.
38+
runPool :: HasqlPool.Pool -> HasqlSession.Session a -> IO a
39+
runPool pool session = do
40+
result <- HasqlPool.use pool session
41+
case result of
42+
Left err
43+
| isCachedPlanError err -> do
44+
HasqlPool.release pool
45+
retryResult <- HasqlPool.use pool session
46+
case retryResult of
47+
Left retryErr -> throwIO (HasqlError retryErr)
48+
Right a -> pure a
49+
| otherwise -> throwIO (HasqlError err)
50+
Right a -> pure a
3651

3752
-- | Lock and fetch the next available job. In case no job is available returns Nothing.
3853
--
@@ -43,38 +58,40 @@ import Control.Concurrent.STM.TBQueue (isFullTBQueue)
4358
-- __Example:__ Locking a SendMailJob
4459
--
4560
-- > let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
46-
-- > job <- fetchNextJob @SendMailJob workerId
61+
-- > job <- fetchNextJob @SendMailJob pool workerId
4762
--
4863
-- After you're done with the job, call 'jobDidFail' or 'jobDidSucceed' to make it available to the queue again.
4964
fetchNextJob :: forall job.
50-
( ?modelContext :: ModelContext
51-
, job ~ GetModelByTableName (GetTableName job)
65+
( job ~ GetModelByTableName (GetTableName job)
5266
, FromRowHasql job
5367
, Show (PrimaryKey (GetTableName job))
5468
, Table job
55-
) => UUID -> IO (Maybe job)
56-
fetchNextJob workerId = do
57-
let tn = tableName @job
58-
let cols = Text.intercalate ", " (columnNames @job)
59-
let sql = "UPDATE " <> tn
60-
<> " SET status = 'job_status_running', locked_at = NOW(), locked_by = $1"
69+
) => HasqlPool.Pool -> UUID -> IO (Maybe job)
70+
fetchNextJob pool workerId = do
71+
let tableNameText = tableName @job
72+
let returningColumns = Text.intercalate ", " (columnNames @job)
73+
let sql = "UPDATE " <> tableNameText
74+
<> " SET status = 'job_status_running'"
75+
<> ", locked_at = NOW(), locked_by = $1"
6176
<> ", attempts_count = attempts_count + 1"
62-
<> " WHERE id IN (SELECT id FROM " <> tn
77+
<> " WHERE id IN (SELECT id FROM " <> tableNameText
6378
<> " WHERE " <> pendingJobConditionSQL
6479
<> " ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED)"
65-
<> " RETURNING " <> cols
66-
let statement = Hasql.preparable sql (Encoders.param defaultParam) (Decoders.rowMaybe (hasqlRowDecoder @job))
67-
68-
pool <- getHasqlPool
69-
withoutQueryLogging (sqlStatementHasql pool workerId statement)
80+
<> " RETURNING " <> returningColumns
81+
let encoder = Encoders.param (Encoders.nonNullable Encoders.uuid)
82+
let decoder = Decoders.rowMaybe (hasqlRowDecoder @job)
83+
let statement = Hasql.unpreparable sql encoder decoder
84+
runPool pool (HasqlSession.statement workerId statement)
7085

7186
-- | Shared WHERE condition for fetching pending jobs as a SQL text fragment.
7287
-- Matches jobs that are either not started or in retry state,
7388
-- not locked, and whose run_at time has passed.
7489
-- Enum values are inlined as SQL string literals (PostgreSQL casts them to job_status).
7590
pendingJobConditionSQL :: Text
7691
pendingJobConditionSQL =
77-
"(status = 'job_status_not_started' OR status = 'job_status_retry') AND locked_by IS NULL AND run_at <= NOW()"
92+
"(status = 'job_status_not_started'"
93+
<> " OR status = 'job_status_retry'"
94+
<> ") AND locked_by IS NULL AND run_at <= NOW()"
7895

7996
-- | Calls a callback every time something is inserted, updated or deleted in a given database table.
8097
--
@@ -92,14 +109,13 @@ pendingJobConditionSQL =
92109
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
93110
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
94111
--
95-
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
96-
watchForJob pgListener tableName pollInterval onNewJob = do
112+
watchForJob :: (?context :: context, HasField "logger" context Log.Logger) => HasqlPool.Pool -> PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
113+
watchForJob pool pgListener tableName pollInterval onNewJob = do
97114
let tableNameBS = cs tableName
98115
liftIO do
99-
pool <- getHasqlPool
100-
withoutQueryLogging (runSessionHasql pool (HasqlSession.script (createNotificationTriggerSQL tableNameBS)))
116+
runPool pool (HasqlSession.script (createNotificationTriggerSQL tableNameBS))
101117

102-
poller <- pollForJob tableName pollInterval onNewJob
118+
poller <- pollForJob pool tableName pollInterval onNewJob
103119
subscription <- liftIO $ pgListener |> PGListener.subscribe (channelName tableNameBS) (const (do _ <- atomically $ tryWriteTBQueue onNewJob JobAvailable; pure ()))
104120

105121
pure (subscription, poller)
@@ -112,18 +128,16 @@ watchForJob pgListener tableName pollInterval onNewJob = do
112128
--
113129
-- This function returns a Async. Call 'cancel' on the async to stop polling the database.
114130
--
115-
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
116-
pollForJob tableName pollInterval onNewJob = do
131+
pollForJob :: (?context :: context, HasField "logger" context Log.Logger) => HasqlPool.Pool -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
132+
pollForJob pool tableName pollInterval onNewJob = do
117133
let sql = "SELECT COUNT(*) FROM " <> tableName
118134
<> " WHERE " <> pendingJobConditionSQL
119-
let statement = Hasql.preparable sql conquer (Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8)))
135+
let decoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))
136+
let statement = Hasql.unpreparable sql Encoders.noParams decoder
120137
let handler = do
121-
let ?context = ?modelContext
122-
pool <- getHasqlPool
123138
forever do
124139
result <- Exception.tryAny do
125-
-- We don't log the queries to the console as it's filling up the log entries with noise
126-
count :: Int <- fromIntegral <$> withoutQueryLogging (sqlStatementHasql pool () statement)
140+
count :: Int <- fromIntegral <$> runPool pool (HasqlSession.statement () statement)
127141

128142
-- For every job we send one signal to the job workers
129143
-- This way we use full concurrency when we find multiple jobs
@@ -178,99 +192,94 @@ channelName tableName = "job_available_" <> tableName
178192

179193
-- | Called when a job failed. Sets the job status to 'JobStatusFailed' or 'JobStatusRetry' (if more attempts are possible) and resets 'lockedBy'
180194
jobDidFail :: forall job context.
181-
( job ~ GetModelByTableName (GetTableName job)
182-
, SetField "lockedBy" job (Maybe UUID)
183-
, SetField "status" job JobStatus
184-
, SetField "updatedAt" job UTCTime
185-
, SetField "runAt" job UTCTime
195+
( Table job
196+
, HasField "id" job (Id' (GetTableName job))
197+
, PrimaryKey (GetTableName job) ~ UUID
186198
, HasField "attemptsCount" job Int
187-
, SetField "lastError" job (Maybe Text)
199+
, HasField "runAt" job UTCTime
188200
, Job job
189-
, CanUpdate job
190-
, Show job
191-
, ?modelContext :: ModelContext
192201
, ?context :: context
193202
, HasField "logger" context Log.Logger
194-
) => job -> SomeException -> IO ()
195-
jobDidFail job exception = do
203+
) => HasqlPool.Pool -> job -> SomeException -> IO ()
204+
jobDidFail pool job exception = do
196205
now <- getCurrentTime
197206

198207
Log.warn ("Failed job with exception: " <> tshow exception)
199208

200209
let ?job = job
201210
let canRetry = job.attemptsCount < maxAttempts
202211
let status = if canRetry then JobStatusRetry else JobStatusFailed
203-
let nextRunAt = addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
204-
job
205-
|> set #status status
206-
|> set #lockedBy Nothing
207-
|> set #updatedAt now
208-
|> set #lastError (Just (tshow exception))
209-
|> (if canRetry then set #runAt nextRunAt else id)
210-
|> updateRecord
211-
212-
pure ()
212+
let nextRunAt = if canRetry
213+
then addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
214+
else job.runAt
215+
let Id jobId = job.id
216+
let tableNameText = tableName @job
217+
let sql = "UPDATE " <> tableNameText
218+
<> " SET status = $1::public.job_status, locked_by = NULL, updated_at = $2, last_error = $3, run_at = $4 WHERE id = $5"
219+
let encoder =
220+
contramap (\(s,_,_,_,_) -> s) (Encoders.param (Encoders.nonNullable Encoders.text))
221+
<> contramap (\(_,u,_,_,_) -> u) (Encoders.param (Encoders.nonNullable Encoders.timestamptz))
222+
<> contramap (\(_,_,e,_,_) -> e) (Encoders.param (Encoders.nonNullable Encoders.text))
223+
<> contramap (\(_,_,_,r,_) -> r) (Encoders.param (Encoders.nonNullable Encoders.timestamptz))
224+
<> contramap (\(_,_,_,_,i) -> i) (Encoders.param (Encoders.nonNullable Encoders.uuid))
225+
let statement = Hasql.unpreparable sql encoder Decoders.noResult
226+
runPool pool (HasqlSession.statement (inputValue status, now, tshow exception, nextRunAt, jobId) statement)
213227

214228
jobDidTimeout :: forall job context.
215-
( job ~ GetModelByTableName (GetTableName job)
216-
, SetField "lockedBy" job (Maybe UUID)
217-
, SetField "status" job JobStatus
218-
, SetField "updatedAt" job UTCTime
219-
, SetField "runAt" job UTCTime
229+
( Table job
230+
, HasField "id" job (Id' (GetTableName job))
231+
, PrimaryKey (GetTableName job) ~ UUID
220232
, HasField "attemptsCount" job Int
221-
, SetField "lastError" job (Maybe Text)
233+
, HasField "runAt" job UTCTime
222234
, Job job
223-
, CanUpdate job
224-
, Show job
225-
, ?modelContext :: ModelContext
226235
, ?context :: context
227236
, HasField "logger" context Log.Logger
228-
) => job -> IO ()
229-
jobDidTimeout job = do
237+
) => HasqlPool.Pool -> job -> IO ()
238+
jobDidTimeout pool job = do
230239
now <- getCurrentTime
231240

232241
Log.warn ("Job timed out" :: Text)
233242

234243
let ?job = job
235244
let canRetry = job.attemptsCount < maxAttempts
236245
let status = if canRetry then JobStatusRetry else JobStatusTimedOut
237-
let nextRunAt = addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
238-
job
239-
|> set #status status
240-
|> set #lockedBy Nothing
241-
|> set #updatedAt now
242-
|> setJust #lastError "Timeout reached"
243-
|> (if canRetry then set #runAt nextRunAt else id)
244-
|> updateRecord
245-
246-
pure ()
246+
let nextRunAt = if canRetry
247+
then addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
248+
else job.runAt
249+
let Id jobId = job.id
250+
let tableNameText = tableName @job
251+
let sql = "UPDATE " <> tableNameText
252+
<> " SET status = $1::public.job_status, locked_by = NULL, updated_at = $2, last_error = $3, run_at = $4 WHERE id = $5"
253+
let encoder =
254+
contramap (\(s,_,_,_,_) -> s) (Encoders.param (Encoders.nonNullable Encoders.text))
255+
<> contramap (\(_,u,_,_,_) -> u) (Encoders.param (Encoders.nonNullable Encoders.timestamptz))
256+
<> contramap (\(_,_,e,_,_) -> e) (Encoders.param (Encoders.nonNullable Encoders.text))
257+
<> contramap (\(_,_,_,r,_) -> r) (Encoders.param (Encoders.nonNullable Encoders.timestamptz))
258+
<> contramap (\(_,_,_,_,i) -> i) (Encoders.param (Encoders.nonNullable Encoders.uuid))
259+
let statement = Hasql.unpreparable sql encoder Decoders.noResult
260+
runPool pool (HasqlSession.statement (inputValue status, now, "Timeout reached" :: Text, nextRunAt, jobId) statement)
247261

248262

249263
-- | Called when a job succeeded. Sets the job status to 'JobStatusSucceded' and resets 'lockedBy'
250264
jobDidSucceed :: forall job context.
251-
( job ~ GetModelByTableName (GetTableName job)
252-
, SetField "lockedBy" job (Maybe UUID)
253-
, SetField "status" job JobStatus
254-
, SetField "updatedAt" job UTCTime
255-
, HasField "attemptsCount" job Int
256-
, SetField "lastError" job (Maybe Text)
257-
, Job job
258-
, CanUpdate job
259-
, Show job
260-
, ?modelContext :: ModelContext
265+
( Table job
266+
, HasField "id" job (Id' (GetTableName job))
267+
, PrimaryKey (GetTableName job) ~ UUID
261268
, ?context :: context
262269
, HasField "logger" context Log.Logger
263-
) => job -> IO ()
264-
jobDidSucceed job = do
270+
) => HasqlPool.Pool -> job -> IO ()
271+
jobDidSucceed pool job = do
265272
Log.info ("Succeeded job" :: Text)
266273
updatedAt <- getCurrentTime
267-
job
268-
|> set #status JobStatusSucceeded
269-
|> set #lockedBy Nothing
270-
|> set #updatedAt updatedAt
271-
|> updateRecord
272-
273-
pure ()
274+
let Id jobId = job.id
275+
let tableNameText = tableName @job
276+
let sql = "UPDATE " <> tableNameText
277+
<> " SET status = 'job_status_succeeded', locked_by = NULL, updated_at = $1 WHERE id = $2"
278+
let encoder =
279+
contramap fst (Encoders.param (Encoders.nonNullable Encoders.timestamptz))
280+
<> contramap snd (Encoders.param (Encoders.nonNullable Encoders.uuid))
281+
let statement = Hasql.unpreparable sql encoder Decoders.noResult
282+
runPool pool (HasqlSession.statement (updatedAt, jobId) statement)
274283

275284
-- | Compute the delay before the next retry attempt.
276285
--
@@ -288,33 +297,32 @@ backoffDelay (ExponentialBackoff {delayInSeconds}) attempts =
288297
-- - Recently stale jobs (within 24h) are set back to retry
289298
-- - Ancient stale jobs (older than 24h) are marked as failed
290299
recoverStaleJobs :: forall job.
291-
( ?modelContext :: ModelContext
292-
, Table job
293-
) => NominalDiffTime -> IO ()
294-
recoverStaleJobs staleThreshold = do
295-
let tn = tableName @job
296-
297-
-- Tier 1: Recently stale jobs (threshold..24h) → retry
298-
let retrySql = "UPDATE " <> tn
300+
( Table job
301+
) => HasqlPool.Pool -> NominalDiffTime -> IO ()
302+
recoverStaleJobs pool staleThreshold = do
303+
let tableNameText = tableName @job
304+
-- Tier 1: Recently stale jobs (threshold..24h) -> retry
305+
let retrySql =
306+
"UPDATE " <> tableNameText
299307
<> " SET status = 'job_status_retry', locked_by = NULL, locked_at = NULL, run_at = NOW()"
300308
<> " WHERE status = 'job_status_running'"
301309
<> " AND locked_at < NOW() - interval '1 second' * $1"
302310
<> " AND locked_at > NOW() - interval '1 day'"
303-
let retryStatement = Hasql.preparable retrySql
304-
(Encoders.param (Encoders.nonNullable Encoders.int4))
305-
Decoders.noResult
311+
let retryEncoder = Encoders.param (Encoders.nonNullable (contramap (fromIntegral :: Int -> Int64) Encoders.int8))
312+
let retryStatement = Hasql.unpreparable retrySql retryEncoder Decoders.noResult
306313

307-
-- Tier 2: Ancient stale jobs (>24h) → mark failed
308-
let failSql = "UPDATE " <> tn
314+
-- Tier 2: Ancient stale jobs (>24h) -> mark failed
315+
let failSql =
316+
"UPDATE " <> tableNameText
309317
<> " SET status = 'job_status_failed', locked_by = NULL, locked_at = NULL"
310318
<> ", last_error = 'Stale job: worker likely crashed'"
311319
<> " WHERE status = 'job_status_running'"
312320
<> " AND locked_at < NOW() - interval '1 day'"
313-
let failStatement = Hasql.preparable failSql conquer Decoders.noResult
321+
let failStatement = Hasql.unpreparable failSql Encoders.noParams Decoders.noResult
314322

315-
pool <- getHasqlPool
316-
withoutQueryLogging (sqlExecStatement pool (round staleThreshold :: Int32) retryStatement)
317-
withoutQueryLogging (sqlExecStatement pool () failStatement)
323+
let thresholdSeconds = round staleThreshold :: Int
324+
runPool pool (HasqlSession.statement thresholdSeconds retryStatement)
325+
runPool pool (HasqlSession.statement () failStatement)
318326

319327
-- | Mapping for @JOB_STATUS@:
320328
--
@@ -379,9 +387,6 @@ instance DefaultParamEncoder JobStatus where
379387
instance DefaultParamEncoder [JobStatus] where
380388
defaultParam = Encoders.nonNullable $ Encoders.foldableArray $ Encoders.nonNullable (Encoders.enum (Just "public") "job_status" inputValue)
381389

382-
getHasqlPool :: (?modelContext :: ModelContext) => IO HasqlPool.Pool
383-
getHasqlPool = pure ?modelContext.hasqlPool
384-
385390
-- | Non-blocking write to a TBQueue. Returns True if the value was written,
386391
-- False if the queue was full.
387392
tryWriteTBQueue :: TBQueue a -> a -> STM Bool

0 commit comments

Comments
 (0)