Skip to content

Commit

Permalink
ASYNC-256: Implemented thread-call, thread, and io-thread in terms of…
Browse files Browse the repository at this point in the history
… thread-call, providing the proper workflow flag to a new arity that takes a keyword representing the expected workload - one of :io, :compute, :mixed that utilizes the proper ExecutorService (:mixed by default). Merged concurrency and exec.threadpool nses into dispatch. Added functionality to dispatch to allow user definition via sysprop -> qualified var name of executor service factory taking flags above and returning instance or nil.
  • Loading branch information
fogus authored and puredanger committed Feb 19, 2025
1 parent 6689b56 commit fb61cdc
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 100 deletions.
4 changes: 1 addition & 3 deletions build.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
(b/delete {:path "target"})
(b/compile-clj {:basis basis, :src-dirs ["src/main/clojure"], :class-dir class-dir,
:filter-nses '[clojure.core.async]
:ns-compile '[clojure.core.async.impl.exec.threadpool
clojure.core.async.impl.protocols
:ns-compile '[clojure.core.async.impl.protocols
clojure.core.async.impl.mutex
clojure.core.async.impl.concurrent
clojure.core.async.impl.dispatch
clojure.core.async.impl.ioc-macros
clojure.core.async.impl.buffers
Expand Down
74 changes: 54 additions & 20 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,33 @@ to validate go blocks do not invoke core.async blocking operations.
Property is read once, at namespace load time. Recommended for use
primarily during development. Invalid blocking calls will throw in
go block threads - use Thread.setDefaultUncaughtExceptionHandler()
to catch and handle."
to catch and handle.
Use the Java system property `clojure.core.async.executor-factory`
to specify a function that will provide ExecutorServices for
application-wide use by core.async in lieu of its defaults. The
property value should name a fully qualified var. The function
will be passed a keyword indicating the context of use of the
executor, and should return either an ExecutorService, or nil to
use the default. Results per keyword will be cached and used for
the remainder of the application. Possible context arguments are:
:io - used in async/io-thread, for :io workloads in flow/process,
and for dispatch handling if no explicit dispatch handler is
provided (see below)
:mixed - used by async/thread and for :mixed workloads in
flow/process
:compute - used for :compute workloads in flow/process
:core-async-dispatch - used for completion fn handling (e.g. in put!
and take!, as well as go block IOC thunk processing) throughout
core.async. If not supplied the ExecutorService for :io will be
used instead.
The set of contexts may grow in the future so the function should
return nil for unexpected contexts."
(:refer-clojure :exclude [reduce transduce into merge map take partition
partition-by bounded-count])
(:require [clojure.core.async.impl.protocols :as impl]
Expand All @@ -29,7 +55,6 @@ to catch and handle."
[clojure.core.async.impl.ioc-macros :as ioc]
clojure.core.async.impl.go ;; TODO: make conditional
[clojure.core.async.impl.mutex :as mutex]
[clojure.core.async.impl.concurrent :as conc]
)
(:import [java.util.concurrent.atomic AtomicLong]
[java.util.concurrent.locks Lock]
Expand Down Expand Up @@ -468,33 +493,42 @@ to catch and handle."
[& body]
(#'clojure.core.async.impl.go/go-impl &env body))

(defonce ^:private ^Executor thread-macro-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))

(defn thread-call
"Executes f in another thread, returning immediately to the calling
thread. Returns a channel which will receive the result of calling
f when completed, then close."
[f]
(let [c (chan 1)]
(let [binds (Var/getThreadBindingFrame)]
(.execute thread-macro-executor
(fn []
(Var/resetThreadBindingFrame binds)
(try
(let [ret (f)]
(when-not (nil? ret)
(>!! c ret)))
(finally
(close! c))))))
c))
f when completed, then close. workload is a keyword that describes
the work performed by f, where:
:io - may do blocking I/O but must not do extended computation
:compute - must not ever block
:mixed - anything else (default)
when workload not supplied, defaults to :mixed"
([f] (thread-call f :mixed))
([f workload]
(let [c (chan 1)
returning-to-chan (fn [bf]
#(try
(when-some [ret (bf)]
(>!! c ret))
(finally (close! c))))]
(-> f bound-fn* returning-to-chan (dispatch/exec workload))
c)))

(defmacro thread
"Executes the body in another thread, returning immediately to the
calling thread. Returns a channel which will receive the result of
the body when completed, then close."
[& body]
`(thread-call (^:once fn* [] ~@body)))
`(thread-call (^:once fn* [] ~@body) :mixed))

(defmacro io-thread
"Executes the body in a thread, returning immediately to the calling
thread. The body may do blocking I/O but must not do extended computation.
Returns a channel which will receive the result of the body when completed,
then close."
[& body]
`(thread-call (^:once fn* [] ~@body) :io))

;;;;;;;;;;;;;;;;;;;; ops ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

Expand Down
38 changes: 0 additions & 38 deletions src/main/clojure/clojure/core/async/impl/concurrent.clj

This file was deleted.

71 changes: 66 additions & 5 deletions src/main/clojure/clojure/core/async/impl/dispatch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,45 @@

(ns ^{:skip-wiki true}
clojure.core.async.impl.dispatch
(:require [clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.exec.threadpool :as tp]))
(:require [clojure.core.async.impl.protocols :as impl])
(:import [java.util.concurrent Executors ExecutorService ThreadFactory]))

(set! *warn-on-reflection* true)

(defonce ^:private in-dispatch (ThreadLocal.))

(defonce executor
(delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true))))
(defonce executor nil)

(defn counted-thread-factory
"Create a ThreadFactory that maintains a counter for naming Threads.
name-format specifies thread names - use %d to include counter
daemon is a flag for whether threads are daemons or not
opts is an options map:
init-fn - function to run when thread is created"
([name-format daemon]
(counted-thread-factory name-format daemon nil))
([name-format daemon {:keys [init-fn] :as opts}]
(let [counter (atom 0)]
(reify
ThreadFactory
(newThread [_this runnable]
(let [body (if init-fn
(fn [] (init-fn) (.run ^Runnable runnable))
runnable)
t (Thread. ^Runnable body)]
(doto t
(.setName (format name-format (swap! counter inc)))
(.setDaemon daemon))))))))

(defonce
^{:doc "Number of processors reported by the JVM"}
processors (.availableProcessors (Runtime/getRuntime)))

(def ^:private pool-size
"Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a
delay so property can be set from code after core.async namespace is loaded but before
any use of the async thread pool."
(delay (or (Long/getLong "clojure.core.async.pool-size") 8)))

(defn in-dispatch-thread?
"Returns true if the current thread is a go block dispatch pool thread"
Expand All @@ -37,9 +67,40 @@
(.uncaughtException (Thread/currentThread) ex))
nil)

(defn- make-ctp-named
[workflow]
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workflow) "-%d") true)))

(defn ^:private create-default-executor
[workload]
(case workload
:compute (make-ctp-named :compute)
:io (make-ctp-named :io)
:mixed (make-ctp-named :mixed)))

(def executor-for
"Given a workload tag, returns an ExecutorService instance and memoizes the result. By
default, core.async will defer to a user factory (if provided via sys prop) or construct
a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When
given the tag :core-async-dispatch it will default to the executor service for :io."
(memoize
(fn ^ExecutorService [workload]
(let [sysprop-factory (when-let [esf (System/getProperty "clojure.core.async.executor-factory")]
(requiring-resolve (symbol esf)))
sp-exec (and sysprop-factory (sysprop-factory workload))]
(or sp-exec
(if (= workload :core-async-dispatch)
(executor-for :io)
(create-default-executor workload)))))))

(defn exec
[^Runnable r workload]
(let [^ExecutorService e (executor-for workload)]
(.execute e r)))

(defn run
"Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread."
[^Runnable r]
(if (-> r meta :on-caller?)
(try (.run r) (catch Throwable t (ex-handler t)))
(impl/exec @executor r)))
(exec r :core-async-dispatch)))
32 changes: 0 additions & 32 deletions src/main/clojure/clojure/core/async/impl/exec/threadpool.clj

This file was deleted.

4 changes: 2 additions & 2 deletions src/test/clojure/clojure/core/async/concurrent_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

(ns clojure.core.async.concurrent-test
(:require [clojure.test :refer :all]
[clojure.core.async.impl.concurrent :as conc])
[clojure.core.async.impl.dispatch :as dispatch])
(:import [java.util.concurrent ThreadFactory]))

(deftest test-counted-thread-factory
(testing "Creates numbered threads"
(let [^ThreadFactory factory (conc/counted-thread-factory "foo-%d" true)
(let [^ThreadFactory factory (dispatch/counted-thread-factory "foo-%d" true)
threads (repeatedly 3 #(.newThread factory (constantly nil)))]
(is (= ["foo-1" "foo-2" "foo-3"] (map #(.getName ^Thread %) threads))))))

17 changes: 17 additions & 0 deletions src/test/clojure/clojure/core/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,23 @@
(binding [test-dyn true]
(is (<!! (thread test-dyn))))))

(deftest io-thread-tests
(testing "io-thread blocking ops"
(let [c1 (chan)
c2 (chan)
c3 (chan)]
(io-thread (>!! c2 (clojure.string/upper-case (<!! c1))))
(io-thread (>!! c3 (clojure.string/reverse (<!! c2))))
(>!! c1 "loop")
(is (= "POOL" (<!! c3)))))
(testing "io-thread parking op should fail"
(let [c1 (chan)]
(io-thread
(try
(>! c1 :no)
(catch AssertionError _
(>!! c1 :yes))))
(is (= :yes (<!! c1))))))

(deftest ops-tests
(testing "map<"
Expand Down

0 comments on commit fb61cdc

Please sign in to comment.