Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for v1.1 release #8

Merged
merged 6 commits into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
dist: trusty
language: clojure
services:
- mysql
jdk:
- openjdk7
- oraclejdk7
- oraclejdk8
- oraclejdk9
- oraclejdk11
script: "lein test-all"
before_script: "echo 'CREATE DATABASE IF NOT EXISTS clj_mysql_queue;' | mysql -uroot"

2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject mysql-queue "1.0.0"
(defproject mysql-queue "1.1.0"
:description "A durable queue with scheduled job support that is backed by MySQL."
:url "https://github.com/wildbit/mysql-queue"
:license {:name "Eclipse Public License"
Expand Down
43 changes: 31 additions & 12 deletions src/mysql_queue/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,21 @@
[clojure.core.async :as async :refer [chan >!! >! <! <!! go go-loop thread thread-call close! timeout alts! alts!!]]
[clojure.core.async.impl.protocols :as async-proto :refer [closed?]])
(:import (com.mysql.jdbc.exceptions.jdbc4 MySQLIntegrityConstraintViolationException)
(java.util Date)))
(java.util Date)
(java.util.concurrent TimeoutException)))

(defmacro with-timeout
"A macro that executes the body in a future with a specified
timeout. Returns nil if times out. Throws an exception if
body returns nil."
[timeout & body]
`(let [f# (future ~@body)
ret# (deref f# ~timeout ~::timed-out)]
(if (= ret# ~::timed-out)
(do
(future-cancel f#)
(throw (TimeoutException. (str "Execution took more than " ~timeout "ms. Terminating."))))
ret#)))

(def ultimate-job-states #{:canceled :failed :done})
(def max-retries 5)
Expand Down Expand Up @@ -42,7 +56,7 @@

(defprotocol Executable
(finished? [job])
(execute [job db-conn log-fn err-fn]))
(execute [job db-conn log-fn err-fn timeout]))

(defprotocol Fertile
(beget [parent] [parent status] [parent status parameters]))
Expand Down Expand Up @@ -103,7 +117,9 @@
(job-summary-string this))
Fertile
(beget [this]
(->Job user-fn nil scheduled-job-id id name status parameters (inc attempt)))
(if (< attempt max-retries)
(->Job user-fn nil scheduled-job-id id name status parameters (inc attempt))
(->Job user-fn nil scheduled-job-id id name :failed parameters attempt)))
(beget [this _] (beget this))
(beget [this _ _] (beget this)))

Expand Down Expand Up @@ -156,13 +172,13 @@
Job
(finished? [job]
(ultimate-job-states (:status job)))
(execute [{:as job job-fn :user-fn :keys [status parameters attempt]} db-conn log-fn err-fn]
(execute [{:as job job-fn :user-fn :keys [status parameters attempt]} db-conn log-fn err-fn timeout]
(profile-block [m]
(if (finished? job)
(cleanup job db-conn)
(try
(log-fn :info job "Executing job " job)
(let [[status params] (-> (meter m :user (job-fn status parameters))
(let [[status params] (-> (meter m :user (with-timeout timeout (job-fn status parameters)))
job-result-or-nil (or [:done nil]))]
(-> job (beget status params) (persist db-conn)))
(catch Exception e
Expand All @@ -172,14 +188,14 @@
(-> job (beget :failed) (persist db-conn))))))))
StuckJob
(finished? [job] false)
(execute [job db-conn log-fn err-fn]
(execute [job db-conn log-fn err-fn timeout]
(profile-block [_]
(log-fn :info job "Recovering job " job)
(-> job beget (persist db-conn))))
ScheduledJob
(finished? [job]
(throw (UnsupportedOperationException. "finished? is not implemented for ScheduledJob.")))
(execute [job db-conn log-fn _err-fn]
(execute [job db-conn log-fn err-fn timeout]
(profile-block [_]
(log-fn :info job "Executing job " job)
(-> job beget (persist db-conn)))))
Expand Down Expand Up @@ -246,7 +262,7 @@

(defn- consumer-thread
"Consumer loop. Automatically quits if the listen-chan is closed. Runs in a go-thread."
[id listen-chan status-chan db-conn log-fn err-fn]
[id listen-chan status-chan db-conn log-fn err-fn job-timeout]
(go
(while-let [job (<! listen-chan)]
(try
Expand All @@ -257,7 +273,7 @@
(if current-job
(do
(>! status-chan {:id id :state :running-job :job current-job})
(let [[next-job dmetrics] (<! (thread (execute current-job db-conn log-fn err-fn)))]
(let [[next-job dmetrics] (<! (thread (execute current-job db-conn log-fn err-fn job-timeout)))]
(recur next-job current-job (merge-with + metrics dmetrics))))
(do
(>! status-chan {:id id :state :finished-job :job last-job :metrics metrics})
Expand Down Expand Up @@ -449,6 +465,7 @@

* buffer-size - maximum number of jobs allowed into internal queue. Determines
when the publisher will block. Default 10.
* job-timeout-mins - the number of minutes after which the job times out. Default 20.
* prefetch - the number of jobs a publisher fetches from the database at once.
Default 10.
* num-stats-jobs - the number of jobs to keep in memory for statistical purpose.
Expand All @@ -463,14 +480,15 @@
sleep before querying the database for stuck jobs. Default 0 seconds.
* max-recovery-sleep-interval - the maximum time in seconds the recovery thread will
sleep before qerying the database for stuck jobs. Default 10 seconds.
* recovery-threshold-mins - the number of seconds after which a job is considered
stuck and will be picked up by the recovery thread.
* recovery-threshold-mins - the number of minutes after which a job is considered
stuck and will be picked up by the recovery thread. Default 20.
* log-fn - user-provided logging function of 3 arguments: level (keyword), job (record), message (msg).
* err-fn - user-provided error function of one argument: error (Exception)."
[db-conn
fn-bindings
&{:keys [buffer-size
prefetch
job-timeout-mins
num-stats-jobs
num-consumer-threads
min-scheduler-sleep-interval
Expand All @@ -481,6 +499,7 @@
log-fn
err-fn]
:or {buffer-size 10
job-timeout-mins 20
prefetch 10
num-stats-jobs 50
num-consumer-threads 2
Expand Down Expand Up @@ -519,7 +538,7 @@
[in-ch out-chs sieve] (deduplicate queue-chan num-consumer-threads)
consumer-threads (->> out-chs
(map-indexed
#(consumer-thread %1 %2 consumer-status-channel db-conn log-fn err-fn))
#(consumer-thread %1 %2 consumer-status-channel db-conn log-fn err-fn (* 60 1000 job-timeout-mins)))
(into [])
doall)
handler (partial publisher-error-handler log-fn err-fn)
Expand Down
41 changes: 41 additions & 0 deletions test/mysql_queue/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
[db-conn job-name]
(sql/delete! db-conn :scheduled_jobs ["name = ?" job-name]))

(defn count-jobs
[db-conn]
(sql/query db-conn ["SELECT COUNT(*) AS c FROM jobs"] {:result-set-fn (comp :c first)}))

(defn queue-size
[db-conn]
(sql/query db-conn ["SELECT COUNT(*) AS c FROM scheduled_jobs"] {:result-set-fn (comp :c first)}))

(defn setup-db
[f]
(initialize! db-conn)
Expand Down Expand Up @@ -202,6 +210,39 @@
(is (= num-jobs (count @check-ins))
"The number of executed jobs doesn't match the number of jobs queued."))))

(deftest stuck-job-max-attempts-test
(let [jobs {:test-foo #(throw (Exception. "This job should not have been executed, because it reached max attempts."))}
scheduled-id (schedule-job db-conn :test-foo :begin {} (java.util.Date. 0))]
(queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {}) 5 (java.util.Date. 0))
(is (= 1 (count (queries/select-n-stuck-jobs db-conn ultimate-job-states ["test-foo"] [0] 5 5))))
(is (= 1 (count-jobs db-conn)))
(with-worker [wrk (worker db-conn
jobs
:num-consumer-threads 1
:max-scheduler-sleep-interval 0.5
:max-recovery-sleep-interval 0.5)]
(Thread/sleep 1000)
(is (zero? (count (queries/select-n-stuck-jobs db-conn ultimate-job-states ["test-foo"] [0] 5 5))))
(is (zero? (count-jobs db-conn))))))

(deftest job-timeout-test
(let [attempt (atom 0)
jobs {:test-foo (fn [_ _]
(swap! attempt inc)
(when (= 1 @attempt)
(Thread/sleep 10000))
[:done {}])}]
(schedule-job db-conn :test-foo :begin {} (java.util.Date.))
(with-worker [wrk (worker db-conn
jobs
:num-consumer-threads 1
; run scheduler every 0.5s
:max-scheduler-sleep-interval 0.5
; terminate jobs that take over 1 second
:job-timeout-mins (/ 1 60))]
(Thread/sleep 3000)
(is (= 2 @attempt)))))

(deftest graceful-shutdown-test
(let [num-jobs 2
expected-set (->> num-jobs range (map inc) (into #{}))
Expand Down