Skip to content

Commit

Permalink
Merge branch '1.7-dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
puredanger committed Feb 19, 2025
2 parents 88a7971 + d292e0e commit 78add58
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 104 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ Copyright © Rich Hickey and contributors

## Changelog

* next
* [ASYNC-256](https://clojure.atlassian.net/browse/ASYNC-256) (CLJ) Add io-thread and System property clojure.core.async.executor-factory
* [ASYNC-255](https://clojure.atlassian.net/browse/ASYNC-255) (CLJ) alts guards against put of nil message on entry
* Update tools.analyzer.jvm to 1.3.2
* Release 1.7.701 on 2024.12.17
* [ASYNC-254](https://clojure.atlassian.net/browse/ASYNC-254) (CLJ) Completions for blocking ops can be directly delivered without enqueuing for dispatch
* [ASYNC-252](https://clojure.atlassian.net/browse/ASYNC-252) (CLJ) Move go expander from ioc-macros to impl.go namespace
Expand Down
2 changes: 1 addition & 1 deletion VERSION_TEMPLATE
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.7.GENERATED_VERSION
1.8.GENERATED_VERSION-beta1
4 changes: 1 addition & 3 deletions build.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
(b/delete {:path "target"})
(b/compile-clj {:basis basis, :src-dirs ["src/main/clojure"], :class-dir class-dir,
:filter-nses '[clojure.core.async]
:ns-compile '[clojure.core.async.impl.exec.threadpool
clojure.core.async.impl.protocols
:ns-compile '[clojure.core.async.impl.protocols
clojure.core.async.impl.mutex
clojure.core.async.impl.concurrent
clojure.core.async.impl.dispatch
clojure.core.async.impl.ioc-macros
clojure.core.async.impl.buffers
Expand Down
2 changes: 1 addition & 1 deletion deps.edn
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{:paths ["src/main/clojure"]
:deps
{org.clojure/tools.analyzer.jvm {:mvn/version "1.3.1"}}
{org.clojure/tools.analyzer.jvm {:mvn/version "1.3.2"}}
:aliases
{:cljs-test {:extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"}}
:extra-paths ["src/main/clojure/cljs" "src/test/cljs"]}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>org.clojure</groupId>
<artifactId>tools.analyzer.jvm</artifactId>
<version>1.3.1</version>
<version>1.3.2</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:parent [org.clojure/pom.contrib "1.2.0"]
:dependencies [[org.clojure/clojure "1.11.4"]
[org.clojure/tools.analyzer.jvm "1.3.1"]
[org.clojure/tools.analyzer.jvm "1.3.2"]
[org.clojure/clojurescript "1.11.132" :scope "provided"]]
:global-vars {*warn-on-reflection* true}
:source-paths ["src/main/clojure"]
Expand Down
6 changes: 6 additions & 0 deletions src/main/clojure/cljs/core/async.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@
(let [flag (alt-flag)
ports (vec ports) ;; ensure vector for indexed nth
n (count ports)
_ (loop [i 0] ;; check for invalid write op
(when (< i n)
(let [port (nth ports i)]
(when (vector? port)
(assert (some? (port 1)) "can't put nil on channel")))
(recur (unchecked-inc i))))
idxs (random-array n)
priority (:priority opts)
ret
Expand Down
80 changes: 60 additions & 20 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,33 @@ to validate go blocks do not invoke core.async blocking operations.
Property is read once, at namespace load time. Recommended for use
primarily during development. Invalid blocking calls will throw in
go block threads - use Thread.setDefaultUncaughtExceptionHandler()
to catch and handle."
to catch and handle.
Use the Java system property `clojure.core.async.executor-factory`
to specify a function that will provide ExecutorServices for
application-wide use by core.async in lieu of its defaults. The
property value should name a fully qualified var. The function
will be passed a keyword indicating the context of use of the
executor, and should return either an ExecutorService, or nil to
use the default. Results per keyword will be cached and used for
the remainder of the application. Possible context arguments are:
:io - used in async/io-thread, for :io workloads in flow/process,
and for dispatch handling if no explicit dispatch handler is
provided (see below)
:mixed - used by async/thread and for :mixed workloads in
flow/process
:compute - used for :compute workloads in flow/process
:core-async-dispatch - used for completion fn handling (e.g. in put!
and take!, as well as go block IOC thunk processing) throughout
core.async. If not supplied the ExecutorService for :io will be
used instead.
The set of contexts may grow in the future so the function should
return nil for unexpected contexts."
(:refer-clojure :exclude [reduce transduce into merge map take partition
partition-by bounded-count])
(:require [clojure.core.async.impl.protocols :as impl]
Expand All @@ -29,7 +55,6 @@ to catch and handle."
[clojure.core.async.impl.ioc-macros :as ioc]
clojure.core.async.impl.go ;; TODO: make conditional
[clojure.core.async.impl.mutex :as mutex]
[clojure.core.async.impl.concurrent :as conc]
)
(:import [java.util.concurrent.atomic AtomicLong]
[java.util.concurrent.locks Lock]
Expand Down Expand Up @@ -281,6 +306,12 @@ to catch and handle."
(let [flag (alt-flag)
ports (vec ports) ;; ensure vector for indexed nth
n (count ports)
_ (loop [i 0] ;; check for invalid write op
(when (< i n)
(let [port (nth ports i)]
(when (vector? port)
(assert (some? (port 1)) "can't put nil on channel")))
(recur (unchecked-inc i))))
^ints idxs (random-array n)
priority (:priority opts)
ret
Expand Down Expand Up @@ -463,33 +494,42 @@ to catch and handle."
[& body]
(#'clojure.core.async.impl.go/go-impl &env body))

(defonce ^:private ^Executor thread-macro-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))

(defn thread-call
"Executes f in another thread, returning immediately to the calling
thread. Returns a channel which will receive the result of calling
f when completed, then close."
[f]
(let [c (chan 1)]
(let [binds (Var/getThreadBindingFrame)]
(.execute thread-macro-executor
(fn []
(Var/resetThreadBindingFrame binds)
(try
(let [ret (f)]
(when-not (nil? ret)
(>!! c ret)))
(finally
(close! c))))))
c))
f when completed, then close. workload is a keyword that describes
the work performed by f, where:
:io - may do blocking I/O but must not do extended computation
:compute - must not ever block
:mixed - anything else (default)
when workload not supplied, defaults to :mixed"
([f] (thread-call f :mixed))
([f workload]
(let [c (chan 1)
returning-to-chan (fn [bf]
#(try
(when-some [ret (bf)]
(>!! c ret))
(finally (close! c))))]
(-> f bound-fn* returning-to-chan (dispatch/exec workload))
c)))

(defmacro thread
"Executes the body in another thread, returning immediately to the
calling thread. Returns a channel which will receive the result of
the body when completed, then close."
[& body]
`(thread-call (^:once fn* [] ~@body)))
`(thread-call (^:once fn* [] ~@body) :mixed))

(defmacro io-thread
"Executes the body in a thread, returning immediately to the calling
thread. The body may do blocking I/O but must not do extended computation.
Returns a channel which will receive the result of the body when completed,
then close."
[& body]
`(thread-call (^:once fn* [] ~@body) :io))

;;;;;;;;;;;;;;;;;;;; ops ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

Expand Down
38 changes: 0 additions & 38 deletions src/main/clojure/clojure/core/async/impl/concurrent.clj

This file was deleted.

71 changes: 66 additions & 5 deletions src/main/clojure/clojure/core/async/impl/dispatch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,45 @@

(ns ^{:skip-wiki true}
clojure.core.async.impl.dispatch
(:require [clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.exec.threadpool :as tp]))
(:require [clojure.core.async.impl.protocols :as impl])
(:import [java.util.concurrent Executors ExecutorService ThreadFactory]))

(set! *warn-on-reflection* true)

(defonce ^:private in-dispatch (ThreadLocal.))

(defonce executor
(delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true))))
(defonce executor nil)

(defn counted-thread-factory
"Create a ThreadFactory that maintains a counter for naming Threads.
name-format specifies thread names - use %d to include counter
daemon is a flag for whether threads are daemons or not
opts is an options map:
init-fn - function to run when thread is created"
([name-format daemon]
(counted-thread-factory name-format daemon nil))
([name-format daemon {:keys [init-fn] :as opts}]
(let [counter (atom 0)]
(reify
ThreadFactory
(newThread [_this runnable]
(let [body (if init-fn
(fn [] (init-fn) (.run ^Runnable runnable))
runnable)
t (Thread. ^Runnable body)]
(doto t
(.setName (format name-format (swap! counter inc)))
(.setDaemon daemon))))))))

(defonce
^{:doc "Number of processors reported by the JVM"}
processors (.availableProcessors (Runtime/getRuntime)))

(def ^:private pool-size
"Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a
delay so property can be set from code after core.async namespace is loaded but before
any use of the async thread pool."
(delay (or (Long/getLong "clojure.core.async.pool-size") 8)))

(defn in-dispatch-thread?
"Returns true if the current thread is a go block dispatch pool thread"
Expand All @@ -37,9 +67,40 @@
(.uncaughtException (Thread/currentThread) ex))
nil)

(defn- make-ctp-named
[workflow]
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workflow) "-%d") true)))

(defn ^:private create-default-executor
[workload]
(case workload
:compute (make-ctp-named :compute)
:io (make-ctp-named :io)
:mixed (make-ctp-named :mixed)))

(def executor-for
"Given a workload tag, returns an ExecutorService instance and memoizes the result. By
default, core.async will defer to a user factory (if provided via sys prop) or construct
a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When
given the tag :core-async-dispatch it will default to the executor service for :io."
(memoize
(fn ^ExecutorService [workload]
(let [sysprop-factory (when-let [esf (System/getProperty "clojure.core.async.executor-factory")]
(requiring-resolve (symbol esf)))
sp-exec (and sysprop-factory (sysprop-factory workload))]
(or sp-exec
(if (= workload :core-async-dispatch)
(executor-for :io)
(create-default-executor workload)))))))

(defn exec
[^Runnable r workload]
(let [^ExecutorService e (executor-for workload)]
(.execute e r)))

(defn run
"Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread."
[^Runnable r]
(if (-> r meta :on-caller?)
(try (.run r) (catch Throwable t (ex-handler t)))
(impl/exec @executor r)))
(exec r :core-async-dispatch)))
32 changes: 0 additions & 32 deletions src/main/clojure/clojure/core/async/impl/exec/threadpool.clj

This file was deleted.

4 changes: 2 additions & 2 deletions src/test/clojure/clojure/core/async/concurrent_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

(ns clojure.core.async.concurrent-test
(:require [clojure.test :refer :all]
[clojure.core.async.impl.concurrent :as conc])
[clojure.core.async.impl.dispatch :as dispatch])
(:import [java.util.concurrent ThreadFactory]))

(deftest test-counted-thread-factory
(testing "Creates numbered threads"
(let [^ThreadFactory factory (conc/counted-thread-factory "foo-%d" true)
(let [^ThreadFactory factory (dispatch/counted-thread-factory "foo-%d" true)
threads (repeatedly 3 #(.newThread factory (constantly nil)))]
(is (= ["foo-1" "foo-2" "foo-3"] (map #(.getName ^Thread %) threads))))))

24 changes: 24 additions & 0 deletions src/test/clojure/clojure/core/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,23 @@
(binding [test-dyn true]
(is (<!! (thread test-dyn))))))

(deftest io-thread-tests
(testing "io-thread blocking ops"
(let [c1 (chan)
c2 (chan)
c3 (chan)]
(io-thread (>!! c2 (clojure.string/upper-case (<!! c1))))
(io-thread (>!! c3 (clojure.string/reverse (<!! c2))))
(>!! c1 "loop")
(is (= "POOL" (<!! c3)))))
(testing "io-thread parking op should fail"
(let [c1 (chan)]
(io-thread
(try
(>! c1 :no)
(catch AssertionError _
(>!! c1 :yes))))
(is (= :yes (<!! c1))))))

(deftest ops-tests
(testing "map<"
Expand Down Expand Up @@ -465,3 +482,10 @@
:ok
(catch AssertionError e
:ko))))))

(deftest test-alts-put-nil-invalid
(is
(thrown? AssertionError
(let [c1 (a/chan)
c2 (a/chan)]
(a/alts!! [c1 [c2 nil]])))))

0 comments on commit 78add58

Please sign in to comment.