From 98660544aae502703363609efbfef6ade22f6704 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 30 Oct 2014 00:10:16 -0700 Subject: [PATCH 1/5] Verify that pmap has a readahead memory leak Pmap uses a thread to add tasks to the queue. That thread is not bounded by anything, so given lots of potential tasks, it can realize lots of lazy input. --- test/com/climate/claypoole_test.clj | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index c6b0250..7154d41 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -603,6 +603,35 @@ (deliver finish :done) @task-runner)) +(defn check-read-ahead + "Verify that this pmap function does not read too far ahead in the input + sequence, as that can cause unnecessary use of RAM." + [pmap-fn] + (let [a (atom nil) + indicator #(do (reset! a %) a) + finish (promise) + started (promise) + results (pmap-fn 4 deref + (concat ;; indicate we've started + (repeatedly 1 #(do (deliver started true) + started)) + ;; block the map + (repeat 10 finish) + ;; a long runway + (map atom (range 100)) + ;; an indicator for whether we've realized + ;; past the runway + (map indicator [:started])))] + ;; Let the tasks run + @started + ;; Let the threadpool run unchecked for a minute + (Thread/sleep 100) + ;; Verify that the indicator wasn't triggered + (is (= nil @a)) + ;; Complete the map + (deliver finish :done) + (dorun results))) + (defn check-shuts-off [pmap-like] (cp/with-shutdown! [pool 2] @@ -649,6 +678,8 @@ (when lazy? (testing (format "%s doesn't hold the head of lazy sequences" fn-name) (check-holding-thread pmap-like)) + (testing (format "%s doesn't read ahead in the input sequence" fn-name) + (check-read-ahead pmap-like)) (testing (format "%s can be chained in various threadpools" fn-name) (check-chaining pmap-like)) (testing (format "%s stops processing when an exception occurs" fn-name) From dce1e50f198901ddd5e332dc3cb4b2733adc142d Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 30 Oct 2014 00:12:32 -0700 Subject: [PATCH 2/5] Fix readahead memory leak --- src/clj/com/climate/claypoole.clj | 72 +++++++++++++----------- src/clj/com/climate/claypoole/impl.clj | 68 ++++++++++++++++------ test/com/climate/claypoole/impl_test.clj | 23 ++++++-- 3 files changed, 105 insertions(+), 58 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index 19d6e75..cdf99d0 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -271,7 +271,7 @@ "Creates a function to cancel a bunch of futures." [future-reader] (let [first-already-cancelled (atom Long/MAX_VALUE)] - (fn [i future-seq] + (fn [i later-tasks] (let [cancel-end @first-already-cancelled] ;; Don't re-kill futures we've already zapped to prevent an O(n^2) ;; explosion. @@ -280,13 +280,13 @@ ;; Kill the future reader. (future-cancel future-reader) ;; Stop the tasks above i before cancel-end. - (doseq [f (->> future-seq (take (- cancel-end i)))] + (doseq [f (->> later-tasks rest (take (- cancel-end i)))] (future-cancel f))))))) (defn- pmap-core "Given functions to customize for pmap or upmap, create a function that does the hard work of pmap." - [send-result read-result] + [ordered?] (fn [pool f arg-seqs] (let [[shutdown? pool] (impl/->threadpool pool) ;; Use map to handle the argument sequences. @@ -294,29 +294,34 @@ ;; Pre-declare the canceller because it needs the tasks but the tasks ;; need it too. canceller (promise) + ;; Set up queues of tasks and results + [task-q tasks] (impl/queue-seq) + [unordered-results-q unordered-results] (impl/queue-seq) + ;; This is how we'll actually make things go. start-task (fn [i a later-tasks] ;; We can't directly make a future add itself to a - ;; queue. Instead, we use a promise for indirection. + ;; queue. Instead, we use a promise for indirection. (let [p (promise)] (deliver p (future-call pool (with-meta - ;; Try to run the task, but - ;; definitely add the future to - ;; the queue. + ;; Try to run the task, but definitely + ;; add the future to the queue. #(try (let [result (apply f a)] - (send-result @p) + (impl/queue-seq-add! + unordered-results-q @p) result) - ;; Even if we had an error - ;; running the task, make sure the - ;; future shows up in the queue. + ;; Even if we had an error running + ;; the task, make sure the future + ;; shows up in the queue. (catch Throwable t ;; We've still got to send that ;; result, even if it was an ;; exception, and we have to do it ;; before we start the canceller. - (send-result @p) + (impl/queue-seq-add! + unordered-results-q @p) ;; If we've had an exception, kill ;; future and ongoing processes. (@canceller i later-tasks) @@ -326,19 +331,28 @@ ;; metadata for prioritization. {:args a}))) @p)) - futures (impl/map-indexed-with-rest start-task args) ;; Start all the tasks in a real future, so we don't block. - read-future (core/future - (try - ;; Force all those futures to start. - (dorun futures) - ;; If we created a temporary pool, shut it down. - (finally (when shutdown? (shutdown pool)))))] - (deliver canceller (make-canceller read-future)) + readahead (* 2 (or (impl/get-pool-size pool) 100)) + driver (core/future + (try + (doseq [[i a later-tasks _] + (map vector (range) args (impl/subseqs tasks) + ;; force the results so we don't get + ;; ahead of ourselves + (concat (repeat readahead nil) + unordered-results))] + (impl/queue-seq-add! task-q (start-task i a later-tasks))) + (finally + (impl/queue-seq-end! task-q) + (when shutdown? (shutdown pool)))))] + (deliver canceller (make-canceller driver)) ;; Read results as available. - (concat (map read-result futures) + (concat (map deref + (if ordered? + tasks + (map second (impl/lazy-co-read tasks unordered-results)))) ;; Deref the read-future to get its exceptions, if it has any. - (lazy-seq (try @read-future + (lazy-seq (try @driver ;; But if it was cancelled, the user doesn't care. (catch CancellationException e))))))) @@ -373,23 +387,13 @@ serial via (doall map). This may be helpful during profiling, for example. " [pool f & arg-seqs] - (pmap-boilerplate pool f arg-seqs - ;; pmap is easy--just deref the futures. - (let [send-result (constantly nil) - read-result deref] - (pmap-core send-result read-result)))) + (pmap-boilerplate pool f arg-seqs (pmap-core true))) (defn upmap "Like pmap, except that the return value is a sequence of results ordered by *completion time*, not by input order." [pool f & arg-seqs] - (pmap-boilerplate pool f arg-seqs - ;; upmap is a little complex; read data out of a queue to - ;; get the earliest-available data. - (let [q (LinkedBlockingQueue.) - send-result (fn [result] (.add q result)) - read-result (fn [_] (-> q .take deref))] - (pmap-core send-result read-result)))) + (pmap-boilerplate pool f arg-seqs (pmap-core false))) (defn pcalls "Like clojure.core.pcalls, except it takes a threadpool. For more detail on diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 6f9c5f2..7ff3d0b 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -29,6 +29,7 @@ Executors ExecutorService Future + LinkedBlockingQueue ScheduledExecutorService ThreadFactory TimeoutException @@ -214,23 +215,54 @@ (str "Claypoole functions require a threadpool, a " "number, :builtin, or :serial, not %s.") arg))))) -(defn map-indexed-with-rest - "Given f and xs, return a sequence of (f i x s) for every x in xs where - s is the results of the rest of the map. +(defn get-pool-size + "If the pool has a max size, get that; else, return nil." + [pool] + (cond + (instance? java.util.concurrent.ScheduledThreadPoolExecutor pool) + (.getCorePoolSize pool) - i.e. (map-indexed-with-rest #(do (prn [%1 %2 %3]) %2) [:x :y :z]) - would print - [0 :x (:y :z)] - [1 :y (:z)] - [2 :z ()] + (instance? java.util.concurrent.ThreadPoolExecutor pool) + (.getMaximumPoolSize pool) - This is handy for lazy sequences that might need to see later elements - in the sequence, e.g. for cancelling futures." - [f xs & [i]] - (lazy-seq - (when (not (empty? xs)) - (let [i (or i 0) - x (first xs) - more (rest xs) - results (map-indexed-with-rest f more (inc i))] - (cons (f i x results) results))))) + :else + nil)) + +(let [marker (Object.)] + (defn- queue-reader + "Make a lazy sequence from a queue." + [q] + (lazy-seq + (let [x (.take q)] + (when-not (identical? x marker) + (cons x (queue-reader q)))))) + + (defn queue-seq + "Create a queue and a lazy sequence that reads from that queue." + [] + (let [q (LinkedBlockingQueue.)] + [q (queue-reader q)])) + + (defn queue-seq-add! + "Add an item to a queue (and its lazy sequence)." + [q x] + (.add q x)) + + (defn queue-seq-end! + "End a lazy sequence reading from a queue." + [q] + (queue-seq-add! q marker))) + +(defn lazy-co-read + "Zip s1 and s2, stopping when s1 stops. This helps avoid potential stalls + when trying to read queue sequences." + [s1 s2] + (lazy-seq (when-not (empty? s1) + (cons [(first s1) (first s2)] + (lazy-co-read (rest s1) (rest s2)))))) + +(defn subseqs + "Given a sequence s, return a lazy, non-head-holding sequence of + (s (drop 1 s) (drop 2 s) ... '())" + [s] + (reductions (fn [l _] (rest l)) s s)) diff --git a/test/com/climate/claypoole/impl_test.clj b/test/com/climate/claypoole/impl_test.clj index f3cc20d..918f098 100644 --- a/test/com/climate/claypoole/impl_test.clj +++ b/test/com/climate/claypoole/impl_test.clj @@ -17,9 +17,20 @@ [com.climate.claypoole.impl :as impl])) -(deftest test-map-indexed-with-rest - (is (= (impl/map-indexed-with-rest vector [:x :y :z]) - '([0 :x ([1 :y ([2 :z ()])] - [2 :z ()])] - [1 :y ([2 :z ()])] - [2 :z ()])))) +(deftest test-queue-seq + (let [[q qs] (impl/queue-seq)] + (doseq [i (range 10)] + (impl/queue-seq-add! q i)) + (impl/queue-seq-end! q) + (is (= (range 10) qs)))) + +(deftest test-lazy-co-read + (let [s1 (range 10) + s2 (concat (range 10) (lazy-seq (deref (promise))))] + (is (= (map #(list % %) (range 10)) + (impl/lazy-co-read s1 s2))))) + +(deftest test-subseqs + (let [n 10] + (is (= (impl/subseqs (range n)) + (map #(drop % (range n)) (range (inc n))))))) From 5f5b2baa51b736bc0dfafe42512f360bba7a78ca Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 30 Oct 2014 10:53:26 -0700 Subject: [PATCH 3/5] Clearer comments & code for fix #17 This addresses Seba's comments. --- src/clj/com/climate/claypoole.clj | 29 +++++++++++++++++++++----- src/clj/com/climate/claypoole/impl.clj | 27 ++++++++++++++++++------ 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index cdf99d0..b2a5dd6 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -42,6 +42,16 @@ handy for testing." true) +(def ^:dynamic *default-pmap-buffer* + "This is an advanced configuration option. You probably don't need to set + this! + + When doing a pmap, Claypoole pushes input tasks into the threadpool. It + normally tries to keep the threadpool full, plus it adds a buffer of size + nthreads. If it can't find out the number of thread in the threadpool, it + just tries to keep *default-pmap-buffer* tasks in the pool." + 200) + (defn ncpus "Get the number of available CPUs." [] @@ -283,6 +293,15 @@ (doseq [f (->> later-tasks rest (take (- cancel-end i)))] (future-cancel f))))))) +(defn- buffer-blocking-seq + "Make a lazy sequence that blocks when the map's (imaginary) buffer is full." + [pool unordered-results] + (let [buffer-size (if-let [pool-size (impl/get-pool-size pool)] + (* 2 pool-size) + *default-pmap-buffer*)] + (concat (repeat buffer-size nil) + unordered-results))) + (defn- pmap-core "Given functions to customize for pmap or upmap, create a function that does the hard work of pmap." @@ -332,15 +351,15 @@ {:args a}))) @p)) ;; Start all the tasks in a real future, so we don't block. - readahead (* 2 (or (impl/get-pool-size pool) 100)) driver (core/future (try (doseq [[i a later-tasks _] (map vector (range) args (impl/subseqs tasks) - ;; force the results so we don't get - ;; ahead of ourselves - (concat (repeat readahead nil) - unordered-results))] + ;; The driver thread reads from this sequence + ;; and ignores the result, just to get the + ;; side effect of blocking when the map's + ;; (imaginary) buffer is full. + (buffer-blocking-seq pool unordered-results))] (impl/queue-seq-add! task-q (start-task i a later-tasks))) (finally (impl/queue-seq-end! task-q) diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 7ff3d0b..9f625dc 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -228,13 +228,18 @@ :else nil)) -(let [marker (Object.)] +;; Queue-seq needs a unique item that, when seen in a queue, indicates that the +;; sequence has ended. It uses this private object, and uses identical? to +;; check against this object's (unique) memory address. +(let [end-marker (Object.)] + (defn- queue-reader - "Make a lazy sequence from a queue." + "Make a lazy sequence from a queue, stopping upon reading the unique + end-marker object." [q] (lazy-seq (let [x (.take q)] - (when-not (identical? x marker) + (when-not (identical? x end-marker) (cons x (queue-reader q)))))) (defn queue-seq @@ -251,11 +256,21 @@ (defn queue-seq-end! "End a lazy sequence reading from a queue." [q] - (queue-seq-add! q marker))) + (queue-seq-add! q end-marker))) (defn lazy-co-read - "Zip s1 and s2, stopping when s1 stops. This helps avoid potential stalls - when trying to read queue sequences." + "Zip s1 and s2, stopping when s1 stops. This helps avoid potential blocking + when trying to read queue sequences. + + In particular, this will block: + (map vector + (range 10) + (concat (range 10) (lazy-seq (deref (promise))))) + even though we only can read 10 things. Lazy-co-read fixes that case by + checking the first sequence first, so this will not block: + (lazy-co-read + (range 10) + (concat (range 10) (lazy-seq (deref (promise)))))" [s1 s2] (lazy-seq (when-not (empty? s1) (cons [(first s1) (first s2)] From a70d88d4deca6e7844b8da9864c083c6d33c8f3a Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 30 Oct 2014 10:55:38 -0700 Subject: [PATCH 4/5] Simplify calling of pmap-boilerplate and pmap-core The other changes to pmap-core have made this simplification possible. --- src/clj/com/climate/claypoole.clj | 145 +++++++++++++++--------------- 1 file changed, 72 insertions(+), 73 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index b2a5dd6..d772050 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -305,85 +305,84 @@ (defn- pmap-core "Given functions to customize for pmap or upmap, create a function that does the hard work of pmap." - [ordered?] - (fn [pool f arg-seqs] - (let [[shutdown? pool] (impl/->threadpool pool) - ;; Use map to handle the argument sequences. - args (apply map vector (map impl/unchunk arg-seqs)) - ;; Pre-declare the canceller because it needs the tasks but the tasks - ;; need it too. - canceller (promise) - ;; Set up queues of tasks and results - [task-q tasks] (impl/queue-seq) - [unordered-results-q unordered-results] (impl/queue-seq) - ;; This is how we'll actually make things go. - start-task (fn [i a later-tasks] - ;; We can't directly make a future add itself to a - ;; queue. Instead, we use a promise for indirection. - (let [p (promise)] - (deliver p (future-call - pool - (with-meta - ;; Try to run the task, but definitely - ;; add the future to the queue. - #(try - (let [result (apply f a)] - (impl/queue-seq-add! - unordered-results-q @p) - result) - ;; Even if we had an error running - ;; the task, make sure the future - ;; shows up in the queue. - (catch Throwable t - ;; We've still got to send that - ;; result, even if it was an - ;; exception, and we have to do it - ;; before we start the canceller. - (impl/queue-seq-add! - unordered-results-q @p) - ;; If we've had an exception, kill - ;; future and ongoing processes. - (@canceller i later-tasks) - ;; Re-throw that throwable! - (throw t))) - ;; Add the args to the function's - ;; metadata for prioritization. - {:args a}))) - @p)) - ;; Start all the tasks in a real future, so we don't block. - driver (core/future - (try - (doseq [[i a later-tasks _] - (map vector (range) args (impl/subseqs tasks) - ;; The driver thread reads from this sequence - ;; and ignores the result, just to get the - ;; side effect of blocking when the map's - ;; (imaginary) buffer is full. - (buffer-blocking-seq pool unordered-results))] - (impl/queue-seq-add! task-q (start-task i a later-tasks))) - (finally - (impl/queue-seq-end! task-q) - (when shutdown? (shutdown pool)))))] - (deliver canceller (make-canceller driver)) - ;; Read results as available. - (concat (map deref - (if ordered? - tasks - (map second (impl/lazy-co-read tasks unordered-results)))) - ;; Deref the read-future to get its exceptions, if it has any. - (lazy-seq (try @driver - ;; But if it was cancelled, the user doesn't care. - (catch CancellationException e))))))) + [pool ordered? f arg-seqs] + (let [[shutdown? pool] (impl/->threadpool pool) + ;; Use map to handle the argument sequences. + args (apply map vector (map impl/unchunk arg-seqs)) + ;; Pre-declare the canceller because it needs the tasks but the tasks + ;; need it too. + canceller (promise) + ;; Set up queues of tasks and results + [task-q tasks] (impl/queue-seq) + [unordered-results-q unordered-results] (impl/queue-seq) + ;; This is how we'll actually make things go. + start-task (fn [i a later-tasks] + ;; We can't directly make a future add itself to a + ;; queue. Instead, we use a promise for indirection. + (let [p (promise)] + (deliver p (future-call + pool + (with-meta + ;; Try to run the task, but definitely + ;; add the future to the queue. + #(try + (let [result (apply f a)] + (impl/queue-seq-add! + unordered-results-q @p) + result) + ;; Even if we had an error running + ;; the task, make sure the future + ;; shows up in the queue. + (catch Throwable t + ;; We've still got to send that + ;; result, even if it was an + ;; exception, and we have to do it + ;; before we start the canceller. + (impl/queue-seq-add! + unordered-results-q @p) + ;; If we've had an exception, kill + ;; future and ongoing processes. + (@canceller i later-tasks) + ;; Re-throw that throwable! + (throw t))) + ;; Add the args to the function's + ;; metadata for prioritization. + {:args a}))) + @p)) + ;; Start all the tasks in a real future, so we don't block. + driver (core/future + (try + (doseq [[i a later-tasks _] + (map vector (range) args (impl/subseqs tasks) + ;; The driver thread reads from this sequence + ;; and ignores the result, just to get the side + ;; effect of blocking when the map's + ;; (imaginary) buffer is full. + (buffer-blocking-seq pool unordered-results))] + (impl/queue-seq-add! task-q (start-task i a later-tasks))) + (finally + (impl/queue-seq-end! task-q) + (when shutdown? (shutdown pool)))))] + (deliver canceller (make-canceller driver)) + ;; Read results as available. + (concat (map deref + (if ordered? + tasks + (map second (impl/lazy-co-read tasks unordered-results)))) + ;; Deref the read-future to get its exceptions, if it has any. + (lazy-seq (try @driver + ;; But if it was cancelled, the user doesn't care. + (catch CancellationException e)))))) (defn- pmap-boilerplate "Do boilerplate pmap checks, then call the real pmap function." - [pool f arg-seqs pmap-fn] + [pool ordered? f arg-seqs] (when (empty? arg-seqs) (throw (IllegalArgumentException. "pmap requires at least one sequence to map over"))) (if (serial? pool) (doall (apply map f arg-seqs)) - (pmap-fn pool f arg-seqs))) + (pmap-core pool ordered? f arg-seqs))) (defn pmap "Like clojure.core.pmap, except: @@ -406,13 +405,13 @@ serial via (doall map). This may be helpful during profiling, for example. " [pool f & arg-seqs] - (pmap-boilerplate pool f arg-seqs (pmap-core true))) + (pmap-boilerplate pool true f arg-seqs)) (defn upmap "Like pmap, except that the return value is a sequence of results ordered by *completion time*, not by input order." [pool f & arg-seqs] - (pmap-boilerplate pool f arg-seqs (pmap-core false))) + (pmap-boilerplate pool false f arg-seqs)) (defn pcalls "Like clojure.core.pcalls, except it takes a threadpool. For more detail on From 41d3c732f7abeffca6be78823bb27cc04c1e320b Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 30 Oct 2014 12:47:30 -0700 Subject: [PATCH 5/5] Slight readability/doc improvements --- src/clj/com/climate/claypoole.clj | 10 +++++----- src/clj/com/climate/claypoole/impl.clj | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index d772050..697be19 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -48,7 +48,7 @@ When doing a pmap, Claypoole pushes input tasks into the threadpool. It normally tries to keep the threadpool full, plus it adds a buffer of size - nthreads. If it can't find out the number of thread in the threadpool, it + nthreads. If it can't find out the number of threads in the threadpool, it just tries to keep *default-pmap-buffer* tasks in the pool." 200) @@ -278,8 +278,8 @@ `(future-call ~pool (^{:once true} fn future-body [] ~@body))) (defn- make-canceller - "Creates a function to cancel a bunch of futures." - [future-reader] + "Creates a function to cancel a pmap." + [driver] (let [first-already-cancelled (atom Long/MAX_VALUE)] (fn [i later-tasks] (let [cancel-end @first-already-cancelled] @@ -287,8 +287,8 @@ ;; explosion. (when (< i cancel-end) (swap! first-already-cancelled min i) - ;; Kill the future reader. - (future-cancel future-reader) + ;; Kill the pmap driver thread. + (future-cancel driver) ;; Stop the tasks above i before cancel-end. (doseq [f (->> later-tasks rest (take (- cancel-end i)))] (future-cancel f))))))) diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 9f625dc..06a30e6 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -229,8 +229,8 @@ nil)) ;; Queue-seq needs a unique item that, when seen in a queue, indicates that the -;; sequence has ended. It uses this private object, and uses identical? to -;; check against this object's (unique) memory address. +;; sequence has ended. It uses the private object end-marker, and uses +;; identical? to check against this object's (unique) memory address. (let [end-marker (Object.)] (defn- queue-reader