Skip to content

Commit c46490b

Browse files
committed
WIP: Implements vthreads sysprop but the error modes and messages need cleaning up. defparkingop macro needs thinking, especially how it operates at various times. Need to rethink throwing when parking ops used in io-thread.
1 parent 7047022 commit c46490b

File tree

3 files changed

+40
-8
lines changed

3 files changed

+40
-8
lines changed

src/main/clojure/clojure/core/async.clj

+18-5
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,17 @@ return nil for unexpected contexts."
143143
[^long msecs]
144144
(timers/timeout msecs))
145145

146+
(defmacro defparkingop
147+
[op doc arglist & body]
148+
(let [as (mapv #(list 'quote %) arglist)
149+
delegate (-> op name (str "!") symbol)]
150+
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
151+
(if (dispatch/targetting-vthreads?)
152+
(fn [~'& ~'args]
153+
~(list* apply delegate '[args]))
154+
(fn ~arglist
155+
~@body)))))
156+
146157
(defmacro defblockingop
147158
[op doc arglist & body]
148159
(let [as (mapv #(list 'quote %) arglist)]
@@ -167,11 +178,11 @@ return nil for unexpected contexts."
167178
@ret
168179
(deref p))))
169180

170-
(defn <!
181+
(defparkingop <!
171182
"takes a val from port. Must be called inside a (go ...) block. Will
172183
return nil if closed. Will park if nothing is available."
173184
[port]
174-
(assert nil "<! used not in (go ...) block"))
185+
(assert nil ">! used not in (go ...) block"))
175186

176187
(defn take!
177188
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -206,7 +217,7 @@ return nil for unexpected contexts."
206217
@ret
207218
(deref p))))
208219

209-
(defn >!
220+
(defparkingop >!
210221
"puts a val into port. nil values are not allowed. Must be called
211222
inside a (go ...) block. Will park if no buffer space is available.
212223
Returns true unless port is already closed."
@@ -349,7 +360,7 @@ return nil for unexpected contexts."
349360
@ret
350361
(deref p))))
351362

352-
(defn alts!
363+
(defparkingop alts!
353364
"Completes at most one of several channel operations. Must be called
354365
inside a (go ...) block. ports is a vector of channel endpoints,
355366
which can be either a channel to take from or a vector of
@@ -492,7 +503,9 @@ return nil for unexpected contexts."
492503
Returns a channel which will receive the result of the body when
493504
completed"
494505
[& body]
495-
(#'clojure.core.async.impl.go/go-impl &env body))
506+
(if (dispatch/targetting-vthreads?)
507+
`(thread-call (^:once fn* [] ~@body) :io)
508+
(#'clojure.core.async.impl.go/go-impl &env body)))
496509

497510
(defn thread-call
498511
"Executes f in another thread, returning immediately to the calling

src/main/clojure/clojure/core/async/impl/dispatch.clj

+20-1
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,28 @@
7878
(catch ClassNotFoundException _
7979
false)))
8080

81+
(def aot-compiling? clojure.core/*compile-files*)
82+
83+
(defn vthreads-directive-of
84+
"Returns the value of the sysprop clojure.core.async.vthreads, that can be one
85+
of three values:
86+
- \"unset\" = default to ioc when aot, always
87+
- \"target\" = target vthreads when compiling go and require them at runtime
88+
use vthreads in io-thread when available
89+
- \"avoid\" = use ioc when compiling go (will work regardless), do not use
90+
vthreads for io-thread"
91+
[s]
92+
(= s (System/getProperty "clojure.core.async.vthreads")))
93+
94+
(defn targetting-vthreads? []
95+
(or (and aot-compiling? (vthreads-directive-of "target"))
96+
(and (not aot-compiling?)
97+
(not (vthreads-directive-of "avoid"))
98+
virtual-threads-available?)))
99+
81100
(defn- make-io-executor
82101
[]
83-
(if virtual-threads-available?
102+
(if (targetting-vthreads?)
84103
(-> (.getDeclaredMethod Executors "newVirtualThreadPerTaskExecutor" (make-array Class 0))
85104
(.invoke nil (make-array Class 0)))
86105
(make-ctp-named :io)))

src/test/clojure/clojure/core/async_test.clj

+2-2
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@
194194
(io-thread (>!! c3 (clojure.string/reverse (<!! c2))))
195195
(>!! c1 "loop")
196196
(is (= "POOL" (<!! c3)))))
197-
(testing "io-thread parking op should fail"
197+
#_(testing "io-thread parking op should fail"
198198
(let [c1 (chan)]
199199
(io-thread
200200
(try
@@ -488,4 +488,4 @@
488488
(thrown? AssertionError
489489
(let [c1 (a/chan)
490490
c2 (a/chan)]
491-
(a/alts!! [c1 [c2 nil]])))))
491+
(a/alts!! [c1 [c2 nil]])))))

0 commit comments

Comments
 (0)