diff --git a/.release-notes/next-release.md b/.release-notes/next-release.md index 24b23edc75..36820e9eb1 100644 --- a/.release-notes/next-release.md +++ b/.release-notes/next-release.md @@ -110,3 +110,59 @@ Cross-compilation to Linux targets (RISC-V, ARM, ARMhf) now uses the embedded LL The embedded LLD path activates automatically when cross-compiling to a Linux target without `--linker` set. To use an external linker instead, pass `--linker=` as an escape hatch to the legacy linking path. +## Add capability security and multi-subscriber support to signal handling + +The `signals` package now provides capability security and signal number validation. `SignalHandler` requires a `SignalAuth` capability (derived from `AmbientAuth`) and a `ValidSignal` constrained type that enforces platform-specific whitelists, preventing registration of fatal signals like `SIGSEGV` or uncatchable signals like `SIGKILL`. Multiple actors can now subscribe to the same signal — up to 16 subscribers per signal number, with all subscribers notified when the signal fires. + +```pony +use "constrained_types" +use "signals" + +actor Main + new create(env: Env) => + let auth = SignalAuth(env.root) + match MakeValidSignal(Sig.int()) + | let sig: ValidSignal => + let handler = SignalHandler(auth, MyNotify, sig) + end +``` + +## Signal handling API requires `SignalAuth` and `ValidSignal` + +The `SignalHandler` constructor now requires a `SignalAuth` capability and a `ValidSignal` constrained type instead of a raw `U32` signal number. `ANSITerm.create` also requires a `SignalAuth` parameter as its first argument. + +Before: + +```pony +use "signals" + +let handler = SignalHandler(MyNotify, Sig.int()) +``` + +```pony +use "term" + +let term = ANSITerm(handler, env.input) +``` + +After: + +```pony +use "constrained_types" +use "signals" + +let auth = SignalAuth(env.root) +match MakeValidSignal(Sig.int()) +| let sig: ValidSignal => + let handler = SignalHandler(auth, MyNotify, sig) +end +``` + +```pony +use "signals" +use "term" + +let auth = SignalAuth(env.root) +let term = ANSITerm(auth, handler, env.input) +``` + diff --git a/CHANGELOG.md b/CHANGELOG.md index abe78b3d34..547a888df3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,12 +18,15 @@ All notable changes to the Pony compiler and standard library will be documented - Compile-time string literal concatenation ([PR #4900](https://github.com/ponylang/ponyc/pull/4900)) - Add `--sysroot` option for cross-compilation ([PR #4964](https://github.com/ponylang/ponyc/pull/4964)) +- Add capability security and multi-subscriber support to signal handling ([PR #4984](https://github.com/ponylang/ponyc/pull/4984)) ### Changed - Update to LLVM 21.1.8 ([PR #4876](https://github.com/ponylang/ponyc/pull/4876)) - Exempt unsplittable string literals from line length rule ([PR #4923](https://github.com/ponylang/ponyc/pull/4923)) - Use embedded LLD for cross-compilation to Linux targets ([PR #4964](https://github.com/ponylang/ponyc/pull/4964)) +- Signal handling API requires `SignalAuth` and `ValidSignal` ([PR #4984](https://github.com/ponylang/ponyc/pull/4984)) +- `ANSITerm` constructor requires `SignalAuth` parameter ([PR #4984](https://github.com/ponylang/ponyc/pull/4984)) ## [0.61.0] - 2026-02-28 diff --git a/examples/README.md b/examples/README.md index b205731844..0e91212107 100644 --- a/examples/README.md +++ b/examples/README.md @@ -92,7 +92,7 @@ Floods a single `Receiver` actor with messages from multiple senders to demonstr Establishes a TCP connection and applies explicit backpressure when the send buffer fills up. Demonstrates the `backpressure` package's `Backpressure` primitive, `ApplyReleaseBackpressureAuth`, and throttled/unthrottled callbacks for detecting when the scheduler responds to pressure. -## File I/O and Terminal +## File I/O, Terminal, and Signals ### [files](files/) @@ -102,6 +102,10 @@ Reads a file specified as a command-line argument and prints its path and conten An interactive command-line prompt with tab completion and command history. Demonstrates the `term` package's `Readline` and `ReadlineNotify` interfaces, and `Promise`-based prompt control where rejecting the promise exits the loop. +### [signals](signals/) + +Registers two handlers for SIGINT, raises the signal programmatically, and disposes one handler. Demonstrates the `signals` package's capability-secured API: `SignalAuth` for authorization, `MakeValidSignal` for signal validation via constrained types, `SignalHandler` for subscription, and `SignalNotify` for callbacks. Shows that multiple handlers can subscribe to the same signal. + ## C FFI ### [ffi-callbacks](ffi-callbacks/) diff --git a/examples/readline/main.pony b/examples/readline/main.pony index a30bd5ea78..fc93fe1c7b 100644 --- a/examples/readline/main.pony +++ b/examples/readline/main.pony @@ -1,3 +1,4 @@ +use "signals" use "term" use "promises" @@ -44,7 +45,8 @@ actor Main env.out.print("Use 'quit' to exit.") // Building a delegate manually - let term = ANSITerm(Readline(recover Handler end, env.out), env.input) + let term = ANSITerm(SignalAuth(env.root), + Readline(recover Handler end, env.out), env.input) term.prompt("0 > ") let notify = object iso diff --git a/examples/signals/main.pony b/examples/signals/main.pony new file mode 100644 index 0000000000..0a7a1b08ee --- /dev/null +++ b/examples/signals/main.pony @@ -0,0 +1,50 @@ +use "constrained_types" +use "signals" + +class Handler is SignalNotify + let _env: Env + let _name: String + + new iso create(env: Env, name: String) => + _env = env + _name = name + + fun ref apply(count: U32): Bool => + _env.out.print(_name + " received signal (count: " + count.string() + ")") + true + + fun ref dispose() => + _env.out.print(_name + " disposed") + +actor Main + """ + Demonstrates the signals package: + - Creating a SignalAuth capability from AmbientAuth + - Validating signal numbers with MakeValidSignal + - Registering multiple handlers for the same signal + - Raising a signal programmatically + - Disposing a handler + + Run the program and press Ctrl-C to send SIGINT, or wait for the + programmatic raise to fire both handlers. + """ + new create(env: Env) => + let auth = SignalAuth(env.root) + + match MakeValidSignal(Sig.int()) + | let sig: ValidSignal => + let h1 = SignalHandler(auth, Handler(env, "handler-1"), sig) + let h2 = SignalHandler(auth, Handler(env, "handler-2"), sig) + + env.out.print("Two handlers registered for SIGINT.") + env.out.print("Raising SIGINT programmatically...") + h1.raise(auth) + + // Dispose handler-2 to show cleanup + h2.dispose(auth) + | let f: ValidationFailure => + env.out.print("Failed to validate SIGINT") + for e in f.errors().values() do + env.out.print(" " + e) + end + end diff --git a/packages/bureaucracy/custodian.pony b/packages/bureaucracy/custodian.pony index d72157bd5a..45c77b034d 100644 --- a/packages/bureaucracy/custodian.pony +++ b/packages/bureaucracy/custodian.pony @@ -13,6 +13,7 @@ actor Custodian ```pony use "bureaucracy" + use "constrained_types" use "signals" actor Actor1 @@ -35,7 +36,11 @@ actor Custodian custodian(actor2) custodian(actor3) - SignalHandler(TermHandler(custodian), Sig.term()) + let auth = SignalAuth(env.root) + match MakeValidSignal(Sig.term()) + | let sig: ValidSignal => + SignalHandler(auth, TermHandler(custodian), sig) + end class TermHandler is SignalNotify let _custodian: Custodian diff --git a/packages/signals/_test.pony b/packages/signals/_test.pony index 019f4fb09b..4aa4acdf0f 100644 --- a/packages/signals/_test.pony +++ b/packages/signals/_test.pony @@ -1,3 +1,4 @@ +use "constrained_types" use "pony_test" actor \nodoc\ Main is TestList @@ -5,9 +6,138 @@ actor \nodoc\ Main is TestList new make() => None fun tag tests(test: PonyTest) => + test(_TestValidSignalAcceptsHandleable) + test(_TestValidSignalRejectsFatal) + test(_TestValidSignalRejectsUncatchable) + test(_TestValidSignalRejectsUnknown) test(_TestSignalINT) + test(_TestMultipleHandlers) + test(_TestDispose) + test(_TestNotifyReturnsFalse) -class \nodoc\ _TestSighupNotify is SignalNotify +class \nodoc\ iso _TestValidSignalAcceptsHandleable is UnitTest + """ + Verify that all handleable signals pass SignalValidator. + """ + fun name(): String => "signals/ValidSignal accepts handleable" + + fun apply(h: TestHelper) => + ifdef linux then + _assert_valid(h, Sig.hup()) + _assert_valid(h, Sig.int()) + _assert_valid(h, Sig.quit()) + _assert_valid(h, Sig.pipe()) + _assert_valid(h, Sig.alrm()) + _assert_valid(h, Sig.term()) + _assert_valid(h, Sig.urg()) + _assert_valid(h, Sig.stkflt()) + _assert_valid(h, Sig.tstp()) + _assert_valid(h, Sig.cont()) + _assert_valid(h, Sig.chld()) + _assert_valid(h, Sig.ttin()) + _assert_valid(h, Sig.ttou()) + _assert_valid(h, Sig.io()) + _assert_valid(h, Sig.xcpu()) + _assert_valid(h, Sig.xfsz()) + _assert_valid(h, Sig.vtalrm()) + _assert_valid(h, Sig.prof()) + _assert_valid(h, Sig.winch()) + _assert_valid(h, Sig.pwr()) + _assert_valid(h, Sig.usr1()) + _assert_valid(h, Sig.sys()) + elseif bsd or osx then + _assert_valid(h, Sig.hup()) + _assert_valid(h, Sig.int()) + _assert_valid(h, Sig.quit()) + _assert_valid(h, Sig.emt()) + _assert_valid(h, Sig.pipe()) + _assert_valid(h, Sig.alrm()) + _assert_valid(h, Sig.term()) + _assert_valid(h, Sig.urg()) + _assert_valid(h, Sig.tstp()) + _assert_valid(h, Sig.cont()) + _assert_valid(h, Sig.chld()) + _assert_valid(h, Sig.ttin()) + _assert_valid(h, Sig.ttou()) + _assert_valid(h, Sig.io()) + _assert_valid(h, Sig.xcpu()) + _assert_valid(h, Sig.xfsz()) + _assert_valid(h, Sig.vtalrm()) + _assert_valid(h, Sig.prof()) + _assert_valid(h, Sig.winch()) + _assert_valid(h, Sig.info()) + _assert_valid(h, Sig.usr1()) + _assert_valid(h, Sig.sys()) + end + + fun _assert_valid(h: TestHelper, sig: U32) => + match MakeValidSignal(sig) + | let _: ValidSignal => None + | let _: ValidationFailure => + h.fail("signal " + sig.string() + " should be valid") + end + +class \nodoc\ iso _TestValidSignalRejectsFatal is UnitTest + """ + Verify that fatal signals are rejected by SignalValidator. + """ + fun name(): String => "signals/ValidSignal rejects fatal" + + fun apply(h: TestHelper) => + ifdef linux or bsd or osx then + _assert_invalid(h, Sig.ill()) + _assert_invalid(h, Sig.trap()) + _assert_invalid(h, Sig.abrt()) + _assert_invalid(h, Sig.fpe()) + _assert_invalid(h, Sig.bus()) + _assert_invalid(h, Sig.segv()) + end + + fun _assert_invalid(h: TestHelper, sig: U32) => + match MakeValidSignal(sig) + | let _: ValidSignal => + h.fail("signal " + sig.string() + " should be rejected (fatal)") + | let _: ValidationFailure => None + end + +class \nodoc\ iso _TestValidSignalRejectsUncatchable is UnitTest + """ + Verify that uncatchable signals are rejected by SignalValidator. + """ + fun name(): String => "signals/ValidSignal rejects uncatchable" + + fun apply(h: TestHelper) => + ifdef linux or bsd or osx then + _assert_invalid(h, Sig.kill()) + _assert_invalid(h, Sig.stop()) + end + + fun _assert_invalid(h: TestHelper, sig: U32) => + match MakeValidSignal(sig) + | let _: ValidSignal => + h.fail("signal " + sig.string() + " should be rejected (uncatchable)") + | let _: ValidationFailure => None + end + +class \nodoc\ iso _TestValidSignalRejectsUnknown is UnitTest + """ + Verify that arbitrary unknown signal numbers are rejected. + """ + fun name(): String => "signals/ValidSignal rejects unknown" + + fun apply(h: TestHelper) => + _assert_invalid(h, 0) + _assert_invalid(h, 200) + _assert_invalid(h, U32.max_value()) + + fun _assert_invalid(h: TestHelper, sig: U32) => + match MakeValidSignal(sig) + | let _: ValidSignal => + h.fail("signal " + sig.string() + " should be rejected (unknown)") + | let _: ValidationFailure => None + end + +class \nodoc\ _TestSignalNotify is SignalNotify let _h: TestHelper new iso create(h: TestHelper) => @@ -23,15 +153,148 @@ class \nodoc\ iso _TestSignalINT is UnitTest fun name(): String => "signals/INT" fun ref apply(h: TestHelper) => - let signal = SignalHandler(_TestSighupNotify(h), Sig.int()) - signal.raise() - _signal = signal - h.long_test(2_000_000_000) // 2 second timeout + let auth = SignalAuth(h.env.root) + match MakeValidSignal(Sig.int()) + | let sig: ValidSignal => + let signal = SignalHandler(auth, _TestSignalNotify(h), sig) + signal.raise(auth) + _signal = signal + h.long_test(10_000_000_000) + | let _: ValidationFailure => + h.fail("SIGINT should be a valid signal") + end fun timed_out(h: TestHelper) => + let auth = SignalAuth(h.env.root) try - (_signal as SignalHandler).dispose() + (_signal as SignalHandler).dispose(auth) + end + h.fail("timeout") + h.complete(false) + +class \nodoc\ _TestMultiHandlerNotify is SignalNotify + let _h: TestHelper + let _action: String + + new iso create(h: TestHelper, action: String) => + _h = h + _action = action + + fun ref apply(count: U32): Bool => + _h.complete_action(_action) + true + +class \nodoc\ iso _TestMultipleHandlers is UnitTest + """ + Verify that multiple handlers for the same signal all get notified. + Handlers return true to stay registered; the test disposes them on + completion to avoid auto-dispose interfering with the second handler. + """ + var _signal1: (SignalHandler | None) = None + var _signal2: (SignalHandler | None) = None + + fun name(): String => "signals/multiple handlers" + + fun ref apply(h: TestHelper) => + let auth = SignalAuth(h.env.root) + h.expect_action("handler1") + h.expect_action("handler2") + match MakeValidSignal(Sig.int()) + | let sig: ValidSignal => + let s1 = SignalHandler(auth, + _TestMultiHandlerNotify(h, "handler1"), sig) + let s2 = SignalHandler(auth, + _TestMultiHandlerNotify(h, "handler2"), sig) + // Each handler raises so that same-actor message ordering guarantees + // its constructor (and thus subscription) has completed before the + // raise executes. + s1.raise(auth) + s2.raise(auth) + _signal1 = s1 + _signal2 = s2 + h.long_test(10_000_000_000) + | let _: ValidationFailure => + h.fail("SIGINT should be a valid signal") end + fun timed_out(h: TestHelper) => + let auth = SignalAuth(h.env.root) + try (_signal1 as SignalHandler).dispose(auth) end + try (_signal2 as SignalHandler).dispose(auth) end + h.fail("timeout") + h.complete(false) + +class \nodoc\ _TestDisposeNotify is SignalNotify + let _h: TestHelper + + new iso create(h: TestHelper) => + _h = h + + fun ref apply(count: U32): Bool => + true + + fun ref dispose() => + // Dispose callback was invoked — test passes + _h.complete(true) + +class \nodoc\ iso _TestDispose is UnitTest + """ + Verify that disposing a SignalHandler calls the notify's dispose method. + """ + + fun name(): String => "signals/dispose" + + fun ref apply(h: TestHelper) => + let auth = SignalAuth(h.env.root) + match MakeValidSignal(Sig.int()) + | let sig: ValidSignal => + let signal = SignalHandler(auth, _TestDisposeNotify(h), sig) + signal.dispose(auth) + h.long_test(10_000_000_000) + | let _: ValidationFailure => + h.fail("SIGINT should be a valid signal") + end + + fun timed_out(h: TestHelper) => + h.fail("dispose callback was not called") + h.complete(false) + +class \nodoc\ _TestReturnsFalseNotify is SignalNotify + let _h: TestHelper + + new iso create(h: TestHelper) => + _h = h + + fun ref apply(count: U32): Bool => + false + + fun ref dispose() => + _h.complete(true) + +class \nodoc\ iso _TestNotifyReturnsFalse is UnitTest + """ + Verify that a notify returning false auto-disposes the handler. + """ + var _signal: (SignalHandler | None) = None + + fun name(): String => "signals/notify returns false" + + fun ref apply(h: TestHelper) => + let auth = SignalAuth(h.env.root) + match MakeValidSignal(Sig.int()) + | let sig: ValidSignal => + let signal = SignalHandler(auth, _TestReturnsFalseNotify(h), sig) + signal.raise(auth) + _signal = signal + h.long_test(10_000_000_000) + | let _: ValidationFailure => + h.fail("SIGINT should be a valid signal") + end + + fun timed_out(h: TestHelper) => + let auth = SignalAuth(h.env.root) + try + (_signal as SignalHandler).dispose(auth) + end h.fail("timeout") h.complete(false) diff --git a/packages/signals/auth.pony b/packages/signals/auth.pony new file mode 100644 index 0000000000..104e1736f2 --- /dev/null +++ b/packages/signals/auth.pony @@ -0,0 +1,10 @@ +primitive SignalAuth + """ + Authority to create signal handlers, raise signals, and dispose of signal + handlers. + + This is derived directly from `AmbientAuth`. There is no intermediate + grouping — signals are a distinct resource category. + """ + new create(from: AmbientAuth) => + None diff --git a/packages/signals/signal_handler.pony b/packages/signals/signal_handler.pony index 05fe769c98..5d9126153e 100644 --- a/packages/signals/signal_handler.pony +++ b/packages/signals/signal_handler.pony @@ -10,31 +10,41 @@ use @pony_asio_event_destroy[None](event: AsioEventID) actor SignalHandler is AsioEventNotify """ Listen for a specific signal. - If the wait parameter is true, the program will not terminate until the SignalHandler's dispose method is called, or if the SignalNotify returns false, after handling the signal as this also disposes the SignalHandler and unsubscribes it. + Multiple SignalHandlers can be registered for the same signal. All + registered handlers will be notified when the signal is received, in + no particular order. + + If the wait parameter is true, the program will not terminate until + the SignalHandler's dispose method is called, or if the SignalNotify + returns false after handling the signal. Disposing a SignalHandler + unsubscribes it from the signal and is required to allow the runtime + to garbage collect the handler. """ let _notify: SignalNotify let _sig: U32 var _event: AsioEventID - new create(notify: SignalNotify iso, sig: U32, wait: Bool = false) => + new create(auth: SignalAuth, notify: SignalNotify iso, sig: ValidSignal, + wait: Bool = false) + => """ - Create a signal handler. + Create a signal handler for a validated signal number. """ _notify = consume notify - _sig = sig + _sig = sig() _event = - @pony_asio_event_create(this, 0, AsioEvent.signal(), sig.u64(), wait) + @pony_asio_event_create(this, 0, AsioEvent.signal(), _sig.u64(), wait) - be raise() => + be raise(auth: SignalAuth) => """ Raise the signal. """ - SignalRaise(_sig) + SignalRaise(auth, _sig) - be dispose() => + be dispose(auth: SignalAuth) => """ - Dispose of the signal handler. + Dispose of the signal handler, unsubscribing from the signal. """ _dispose() diff --git a/packages/signals/signal_notify.pony b/packages/signals/signal_notify.pony index 1744d747cd..a77a732e04 100644 --- a/packages/signals/signal_notify.pony +++ b/packages/signals/signal_notify.pony @@ -23,8 +23,13 @@ interface SignalNotify primitive SignalRaise """ Raise a signal. + + Unlike SignalHandler, this accepts a raw signal number rather than a + ValidSignal. Raising fatal signals (e.g. SIGABRT to intentionally crash) + is a legitimate operation — it is only handling them via the ASIO + mechanism that is prevented. """ - fun apply(sig: U32) => + fun apply(auth: SignalAuth, sig: U32) => ifdef osx then // On Darwin, @raise delivers the signal to the current thread, not the // process, but kqueue EVFILT_SIGNAL will only see signals delivered to diff --git a/packages/signals/signals.pony b/packages/signals/signals.pony index b1aa1d4a16..3d798975d1 100644 --- a/packages/signals/signals.pony +++ b/packages/signals/signals.pony @@ -1,36 +1,67 @@ """ # Signals package -The Signals package provides support for handling Unix style signals. +The Signals package provides support for handling Unix style signals with +capability security and support for multiple handlers per signal. + +## Overview + For each signal that you want to handle, you need to create a `SignalHandler` -and a corresponding `SignalNotify` object. Each SignalHandler runs as it own -actor and upon receiving the signal will call its corresponding -`SignalNotify`'s apply method. +and a corresponding `SignalNotify` object. Multiple `SignalHandler` actors can +be registered for the same signal — all registered handlers will be notified +when the signal is received, in no particular order. -## Example program +Signal handling requires a `SignalAuth` capability derived from `AmbientAuth`, +consistent with how other I/O primitives in the standard library handle +resource access. -The following program will listen for the TERM signal and output a message to -standard out if it is received. +Signal numbers must be validated through `MakeValidSignal` before they can +be used with `SignalHandler`. This prevents registration of handlers for +fatal signals (SIGILL, SIGTRAP, SIGABRT, SIGFPE, SIGBUS, SIGSEGV) and +uncatchable signals (SIGKILL, SIGSTOP) that cannot be meaningfully handled +via the ASIO mechanism. + +## Example program ```pony +use "constrained_types" use "signals" actor Main new create(env: Env) => - // Create a TERM handler - let signal = SignalHandler(TermHandler(env), Sig.term()) - // Raise TERM signal - signal.raise() + let auth = SignalAuth(env.root) + + match MakeValidSignal(Sig.term()) + | let sig: ValidSignal => + // Multiple handlers for the same signal + SignalHandler(auth, LogHandler(env.out), sig) + SignalHandler(auth, CleanupHandler(env.out), sig where wait = true) + | let err: ValidationFailure => + env.err.print("Cannot handle this signal") + end -class TermHandler is SignalNotify - let _env: Env +class LogHandler is SignalNotify + let _out: OutStream - new iso create(env: Env) => - _env = env + new iso create(out: OutStream) => + _out = out fun ref apply(count: U32): Bool => - _env.out.print("TERM signal received") + _out.print("Signal received, count: " + count.string()) true + +class CleanupHandler is SignalNotify + let _out: OutStream + + new iso create(out: OutStream) => + _out = out + + fun ref apply(count: U32): Bool => + _out.print("Cleaning up...") + false + + fun ref dispose() => + _out.print("Cleanup handler disposed") ``` ## Signal portability @@ -39,11 +70,11 @@ The `Sig` primitive provides support for portable signal handling across Linux, FreeBSD and OSX. Signals are not supported on Windows and attempting to use them will cause a compilation error. -## Shutting down handlers - -Unlike a `TCPConnection` and other forms of input receiving, creating a -`SignalHandler` will not keep your program running. As such, you are not -required to call `dispose` on your signal handlers in order to shutdown your -program. +## Disposing handlers +Disposing a `SignalHandler` unsubscribes it from the signal. This is important +because the signal dispatch mechanism holds a reference to each subscriber — +without explicit disposal, handlers will never be garbage collected. If a +`SignalHandler` is created with `wait = true`, disposing it (or returning +`false` from the notifier) is required to allow the program to terminate. """ diff --git a/packages/signals/valid_signal.pony b/packages/signals/valid_signal.pony new file mode 100644 index 0000000000..584f2619e9 --- /dev/null +++ b/packages/signals/valid_signal.pony @@ -0,0 +1,80 @@ +use "constrained_types" + +primitive SignalValidator is Validator[U32] + """ + Validates that a signal number is handleable via the ASIO mechanism. + + Only signals that can be safely caught and dispatched to Pony actors + are accepted. Fatal signals (SIGILL, SIGTRAP, SIGABRT, SIGFPE, SIGBUS, + SIGSEGV), uncatchable signals (SIGKILL, SIGSTOP), and unknown signal + numbers are rejected. + """ + fun apply(sig: U32): ValidationResult => + if _is_handleable(sig) then + ValidationSuccess + else + recover val ValidationFailure(sig.string() + " is not a handleable signal") end + end + + fun _is_handleable(sig: U32): Bool => + ifdef bsd or osx then + (sig == Sig.hup()) or (sig == Sig.int()) or (sig == Sig.quit()) + or (sig == Sig.emt()) or (sig == Sig.pipe()) or (sig == Sig.alrm()) + or (sig == Sig.term()) or (sig == Sig.urg()) or (sig == Sig.tstp()) + or (sig == Sig.cont()) or (sig == Sig.chld()) or (sig == Sig.ttin()) + or (sig == Sig.ttou()) or (sig == Sig.io()) or (sig == Sig.xcpu()) + or (sig == Sig.xfsz()) or (sig == Sig.vtalrm()) or (sig == Sig.prof()) + or (sig == Sig.winch()) or (sig == Sig.info()) or (sig == Sig.usr1()) + or (sig == Sig.sys()) + or _is_usr2(sig) + or _is_rt(sig) + elseif linux then + (sig == Sig.hup()) or (sig == Sig.int()) or (sig == Sig.quit()) + or (sig == Sig.pipe()) or (sig == Sig.alrm()) or (sig == Sig.term()) + or (sig == Sig.urg()) or (sig == Sig.stkflt()) or (sig == Sig.tstp()) + or (sig == Sig.cont()) or (sig == Sig.chld()) or (sig == Sig.ttin()) + or (sig == Sig.ttou()) or (sig == Sig.io()) or (sig == Sig.xcpu()) + or (sig == Sig.xfsz()) or (sig == Sig.vtalrm()) or (sig == Sig.prof()) + or (sig == Sig.winch()) or (sig == Sig.pwr()) or (sig == Sig.usr1()) + or (sig == Sig.sys()) + or _is_usr2(sig) + or _is_rt(sig) + elseif windows then + // Windows signal() only supports SIGINT and SIGTERM as catchable + // signals. SIGABRT, SIGFPE, SIGILL, SIGSEGV are fatal. + (sig == Sig.int()) or (sig == Sig.term()) + else + false + end + + fun _is_usr2(sig: U32): Bool => + // SIGUSR2 is only available when the runtime doesn't use it for + // scheduler scaling. Sig.usr2() has a compile_error when + // scheduler_scaling_pthreads is set, so guard the call here. + ifdef not "scheduler_scaling_pthreads" then + ifdef bsd or osx then + sig == Sig.usr2() + elseif linux then + sig == Sig.usr2() + else + false + end + else + false + end + + fun _is_rt(sig: U32): Bool => + ifdef bsd then + (sig >= 65) and (sig <= 126) + elseif linux then + (sig >= 32) and (sig <= 64) + else + false + end + +// A signal number that has been validated as handleable. +type ValidSignal is Constrained[U32, SignalValidator] + +// Factory for creating ValidSignal instances. Returns either a ValidSignal +// or a ValidationFailure with an error message. +type MakeValidSignal is MakeConstrained[U32, SignalValidator] diff --git a/packages/term/ansi_term.pony b/packages/term/ansi_term.pony index 2a2bb5b950..82274a501d 100644 --- a/packages/term/ansi_term.pony +++ b/packages/term/ansi_term.pony @@ -1,3 +1,4 @@ +use "constrained_types" use "time" use "signals" use @ioctl[I32](fx: I32, cmd: ULong, ...) if posix @@ -57,6 +58,7 @@ actor ANSITerm var _closed: Bool = false new create( + auth: SignalAuth, notify: ANSINotify iso, source: DisposableActor, timers: Timers = Timers) @@ -69,7 +71,10 @@ actor ANSITerm _source = source ifdef not windows then - SignalHandler(recover _TermResizeNotify(this) end, Sig.winch()) + match MakeValidSignal(Sig.winch()) + | let sig: ValidSignal => + SignalHandler(auth, recover _TermResizeNotify(this) end, sig) + end end _size() diff --git a/src/libponyrt/asio/epoll.c b/src/libponyrt/asio/epoll.c index 1222750dae..583336a31f 100644 --- a/src/libponyrt/asio/epoll.c +++ b/src/libponyrt/asio/epoll.c @@ -24,13 +24,21 @@ #endif #define MAX_SIGNAL 128 +#define MAX_SIGNAL_SUBSCRIBERS 16 + +#define ASIO_CANCEL_SIGNAL 10 + +typedef struct signal_subscribers_t { + PONY_ATOMIC(int) eventfd; // shared eventfd for this signal (-1 if none) + PONY_ATOMIC(asio_event_t*) subscribers[MAX_SIGNAL_SUBSCRIBERS]; +} signal_subscribers_t; struct asio_backend_t { int epfd; int wakeup; /* eventfd to break epoll loop */ struct epoll_event events[MAX_EVENTS]; - PONY_ATOMIC(asio_event_t*) sighandlers[MAX_SIGNAL]; + signal_subscribers_t sighandlers[MAX_SIGNAL]; PONY_ATOMIC(bool) terminate; messageq_t q; }; @@ -60,17 +68,15 @@ static void signal_handler(int sig) asio_backend_t* b = ponyint_asio_get_backend(); pony_assert(b != NULL); - asio_event_t* ev = atomic_load_explicit(&b->sighandlers[sig], - memory_order_acquire); -#ifdef USE_VALGRIND - ANNOTATE_HAPPENS_AFTER(&b->sighandlers[sig]); -#endif - - if(ev == NULL) + // The eventfd is set (with release) before sigaction installs this handler, + // so the happens-before ordering is established. + int fd = atomic_load_explicit(&b->sighandlers[sig].eventfd, + memory_order_acquire); + if(fd < 0) return; - eventfd_write(ev->fd, 1); + eventfd_write(fd, 1); } #if !defined(USE_SCHEDULER_SCALING_PTHREADS) @@ -98,6 +104,73 @@ static void handle_queue(asio_backend_t* b) pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); break; + case ASIO_CANCEL_SIGNAL: + { + int sig = (int)ev->nsec; + + if(sig >= MAX_SIGNAL) + { + ev->flags = ASIO_DISPOSABLE; + pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); + break; + } + + signal_subscribers_t* subs = &b->sighandlers[sig]; + + // Remove ev from subscriber array (set slot to NULL) + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + if(atomic_load_explicit(&subs->subscribers[i], + memory_order_acquire) == ev) + { + atomic_store_explicit(&subs->subscribers[i], NULL, + memory_order_release); + break; + } + } + + // Check if all subscribers are gone + bool has_subscribers = false; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + if(atomic_load_explicit(&subs->subscribers[i], + memory_order_acquire) != NULL) + { + has_subscribers = true; + break; + } + } + + if(!has_subscribers) + { + int fd = atomic_load_explicit(&subs->eventfd, memory_order_acquire); + if(fd >= 0) + { + // Last subscriber: restore default signal handling, clean up + struct sigaction new_action; + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL) + new_action.sa_handler = empty_signal_handler; + else +#endif + new_action.sa_handler = SIG_DFL; + + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_RESTART; + sigaction(sig, &new_action, NULL); + + epoll_ctl(b->epfd, EPOLL_CTL_DEL, fd, NULL); + close(fd); + atomic_store_explicit(&subs->eventfd, -1, memory_order_release); + } + } + + ev->flags = ASIO_DISPOSABLE; + pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); + break; + } + default: {} } } @@ -109,6 +182,11 @@ asio_backend_t* ponyint_asio_backend_init() memset(b, 0, sizeof(asio_backend_t)); ponyint_messageq_init(&b->q); + // Initialize all signal eventfds to -1 (0 is a valid fd) + for(int i = 0; i < MAX_SIGNAL; i++) + atomic_store_explicit(&b->sighandlers[i].eventfd, -1, + memory_order_relaxed); + b->epfd = epoll_create1(EPOLL_CLOEXEC); b->wakeup = eventfd(0, EFD_NONBLOCK); @@ -237,6 +315,8 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) SYSTEMATIC_TESTING_YIELD(); + handle_queue(b); + for(int i = 0; i < event_cnt; i++) { struct epoll_event* ep = &(b->events[i]); @@ -244,6 +324,31 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) if(ep->data.ptr == b) continue; + // Check if this is a signal event (ptr into sighandlers array) + if(ep->data.ptr >= (void*)b->sighandlers && + ep->data.ptr < (void*)&b->sighandlers[MAX_SIGNAL]) + { + signal_subscribers_t* subs = (signal_subscribers_t*)ep->data.ptr; + if(ep->events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR)) + { + int fd = atomic_load_explicit(&subs->eventfd, memory_order_acquire); + if(fd >= 0) + { + uint64_t count; + ssize_t rc = read(fd, &count, sizeof(uint64_t)); + (void)rc; + for(size_t j = 0; j < MAX_SIGNAL_SUBSCRIBERS; j++) + { + asio_event_t* sub = atomic_load_explicit( + &subs->subscribers[j], memory_order_acquire); + if(sub != NULL) + pony_asio_event_send(sub, ASIO_SIGNAL, (uint32_t)count); + } + } + } + continue; + } + asio_event_t* ev = ep->data.ptr; uint32_t flags = 0; uint32_t count = 0; @@ -252,12 +357,6 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) { if(ep->events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR)) { - // Send read notification to an actor if either - // * the event is not a one shot event - // * the event is a one shot event and we haven't already sent a notification - // if the event is a one shot event and we have already sent a notification - // don't send another one until we are asked for it again (i.e. the actor - // gets a 0 byte read and sets `readable` to false and resubscribes to reads if(((ev->flags & ASIO_ONESHOT) && !ev->readable) || !(ev->flags & ASIO_ONESHOT)) { ev->readable = true; @@ -270,12 +369,6 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) { if(ep->events & EPOLLOUT) { - // Send write notification to an actor if either - // * the event is not a one shot event - // * the event is a one shot event and we haven't already sent a notification - // if the event is a one shot event and we have already sent a notification - // don't send another one until we are asked for it again (i.e. the actor - // gets partial write and sets `writeable` to false and resubscribes to writes if(((ev->flags & ASIO_ONESHOT) && !ev->writeable) || !(ev->flags & ASIO_ONESHOT)) { flags |= ASIO_WRITE; @@ -295,28 +388,11 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) } } - if(ev->flags & ASIO_SIGNAL) - { - if(ep->events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR)) - { - uint64_t missed; - ssize_t rc = read(ev->fd, &missed, sizeof(uint64_t)); - (void)rc; - flags |= ASIO_SIGNAL; - count = (uint32_t)missed; - } - } - - // if we had a valid event of some type that needs to be sent - // to an actor if(flags != 0) { - // send the event to the actor pony_asio_event_send(ev, flags, count); } } - - handle_queue(b); } close(b->epfd); @@ -365,6 +441,73 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) ponyint_sched_noisy_asio(pony_scheduler_index()); } + if(ev->flags & ASIO_SIGNAL) + { + int sig = (int)ev->nsec; + + if(sig >= MAX_SIGNAL) + return; + + signal_subscribers_t* subs = &b->sighandlers[sig]; + + // Install the OS signal handler and add the subscriber synchronously + // so they are active before subscribe returns. This prevents a race + // where @raise fires before the handler or subscriber is installed. + int expected = -1; + if(atomic_compare_exchange_strong_explicit(&subs->eventfd, &expected, -2, + memory_order_acq_rel, memory_order_acquire)) + { + // We won the race — first subscriber. Set up the shared eventfd, + // install sigaction, register with epoll. + int fd = eventfd(0, EFD_NONBLOCK); + + struct sigaction new_action; + new_action.sa_handler = signal_handler; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_RESTART; + + struct epoll_event ep; + ep.data.ptr = subs; + ep.events = EPOLLIN | EPOLLET; + epoll_ctl(b->epfd, EPOLL_CTL_ADD, fd, &ep); + + // Store the fd with release ordering BEFORE installing sigaction. + // sigaction provides additional ordering, but the release store + // ensures signal_handler sees the fd. + atomic_store_explicit(&subs->eventfd, fd, memory_order_release); + + sigaction(sig, &new_action, NULL); + } + else + { + // Another thread is setting up this signal (-2 sentinel). Spin until + // setup completes so that signal_handler can see a valid eventfd. + // This loop is very brief — it only waits for eventfd creation, + // epoll registration, and one atomic store. + while(atomic_load_explicit(&subs->eventfd, memory_order_acquire) < 0) + ; + } + + // Add subscriber synchronously using CAS on array slots. This ensures + // the subscriber is visible before the calling actor's next behavior + // (e.g., raise) executes, preventing a race where the signal fires + // before the subscriber is in the list. + bool added = false; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + asio_event_t* expected = NULL; + if(atomic_compare_exchange_strong_explicit(&subs->subscribers[i], + &expected, ev, memory_order_release, memory_order_relaxed)) + { + added = true; + break; + } + } + pony_assert(added); + (void)added; + return; + } + struct epoll_event ep; ep.data.ptr = ev; ep.events = EPOLLRDHUP; @@ -382,47 +525,10 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) ep.events |= EPOLLIN; } - if(ev->flags & ASIO_SIGNAL) - { - int sig = (int)ev->nsec; - asio_event_t* prev = NULL; - -#ifdef USE_VALGRIND - ANNOTATE_HAPPENS_BEFORE(&b->sighandlers[sig]); -#endif - if((sig < MAX_SIGNAL) && - atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, ev, - memory_order_acq_rel, memory_order_acquire)) - { - struct sigaction new_action; - new_action.sa_handler = signal_handler; - sigemptyset (&new_action.sa_mask); - - // ask to restart interrupted syscalls to match `signal` behavior - new_action.sa_flags = SA_RESTART; - - sigaction(sig, &new_action, NULL); - - ev->fd = eventfd(0, EFD_NONBLOCK); - ep.events |= EPOLLIN; - } else { - return; - } - } - if(ev->flags & ASIO_ONESHOT) { ep.events |= EPOLLONESHOT; } else { - // Only use edge triggering if one shot isn't enabled. - // This is because of how the runtime gets notifications - // from epoll in this ASIO thread and then notifies the - // appropriate actor to read/write as necessary. - // specifically, it seems there's an edge case/race condition - // with edge triggering where if there is already data waiting - // on the socket, then epoll might not be triggering immediately - // when an edge triggered epoll request is made. - ep.events |= EPOLLET; } @@ -459,6 +565,14 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev) asio_backend_t* b = ponyint_asio_get_backend(); pony_assert(b != NULL); + if(ev->flags & ASIO_SIGNAL) + { + // Signal unregistration is serialized through the ASIO thread. + // The shared eventfd and epoll cleanup are handled in ASIO_CANCEL_SIGNAL. + send_request(ev, ASIO_CANCEL_SIGNAL); + return; + } + epoll_ctl(b->epfd, EPOLL_CTL_DEL, ev->fd, NULL); if(ev->flags & ASIO_TIMER) @@ -470,41 +584,6 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev) } } - if(ev->flags & ASIO_SIGNAL) - { - int sig = (int)ev->nsec; - asio_event_t* prev = ev; - -#ifdef USE_VALGRIND - ANNOTATE_HAPPENS_BEFORE(&b->sighandlers[sig]); -#endif - if((sig < MAX_SIGNAL) && - atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, NULL, - memory_order_acq_rel, memory_order_acquire)) - { - struct sigaction new_action; - -#if !defined(USE_SCHEDULER_SCALING_PTHREADS) - // Make sure we ignore signals related to scheduler sleeping/waking - // as the default for those signals is termination - if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL) - new_action.sa_handler = empty_signal_handler; - else -#endif - new_action.sa_handler = SIG_DFL; - - sigemptyset (&new_action.sa_mask); - - // ask to restart interrupted syscalls to match `signal` behavior - new_action.sa_flags = SA_RESTART; - - sigaction(sig, &new_action, NULL); - - close(ev->fd); - ev->fd = -1; - } - } - ev->flags = ASIO_DISPOSABLE; send_request(ev, ASIO_DISPOSABLE); } diff --git a/src/libponyrt/asio/iocp.c b/src/libponyrt/asio/iocp.c index 8460a3da55..89b4f9d59f 100644 --- a/src/libponyrt/asio/iocp.c +++ b/src/libponyrt/asio/iocp.c @@ -23,12 +23,19 @@ // (the asio background thread). #define MAX_SIGNAL 32 +#define MAX_SIGNAL_SUBSCRIBERS 16 + +typedef struct signal_subscribers_t { + PONY_ATOMIC(int) registered; // 0=no, -1=in-progress, 1=yes + PONY_ATOMIC(asio_event_t*) subscribers[MAX_SIGNAL_SUBSCRIBERS]; +} signal_subscribers_t; struct asio_backend_t { HANDLE wakeup; PONY_ATOMIC(bool) stop; - asio_event_t* sighandlers[MAX_SIGNAL]; + signal_subscribers_t sighandlers[MAX_SIGNAL]; + PONY_ATOMIC(uint32_t) pending_signals; messageq_t q; }; @@ -39,8 +46,7 @@ enum // Event requests ASIO_STDIN_RESUME = 6, ASIO_SET_TIMER = 7, ASIO_CANCEL_TIMER = 8, - ASIO_SET_SIGNAL = 9, - ASIO_CANCEL_SIGNAL = 10 + ASIO_CANCEL_SIGNAL = 9 }; @@ -69,12 +75,17 @@ static void signal_handler(int sig) if(sig >= MAX_SIGNAL) return; - // Reset the signal handler. + // Re-register handler (Windows signal is one-shot) signal(sig, signal_handler); asio_backend_t* b = ponyint_asio_get_backend(); pony_assert(b != NULL); - asio_event_t* ev = b->sighandlers[sig]; - pony_asio_event_send(ev, ASIO_SIGNAL, 1); + + // Set the pending bit and wake the ASIO thread for fan-out. + // This avoids iterating the subscriber list from signal context, + // which would race with the ASIO thread modifying the list. + atomic_fetch_or_explicit(&b->pending_signals, 1u << sig, + memory_order_release); + SetEvent(b->wakeup); } @@ -199,27 +210,48 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) break; } - case ASIO_SET_SIGNAL: + case ASIO_CANCEL_SIGNAL: { int sig = (int)ev->nsec; - if(b->sighandlers[sig] == NULL) + if(sig >= MAX_SIGNAL) { - b->sighandlers[sig] = ev; - signal(sig, signal_handler); + ev->flags = ASIO_DISPOSABLE; + pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); + break; } - break; - } - case ASIO_CANCEL_SIGNAL: - { - asio_event_t* ev = msg->event; - int sig = (int)ev->nsec; + signal_subscribers_t* subs = &b->sighandlers[sig]; - if(b->sighandlers[sig] == ev) + // Remove ev from subscriber array (set slot to NULL) + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + if(atomic_load_explicit(&subs->subscribers[i], + memory_order_acquire) == ev) + { + atomic_store_explicit(&subs->subscribers[i], NULL, + memory_order_release); + break; + } + } + + // Check if all subscribers are gone + bool has_subscribers = false; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + if(atomic_load_explicit(&subs->subscribers[i], + memory_order_acquire) != NULL) + { + has_subscribers = true; + break; + } + } + + if(!has_subscribers) { - b->sighandlers[sig] = NULL; signal(sig, SIG_DFL); + atomic_store_explicit(&subs->registered, 0, + memory_order_release); } ev->flags = ASIO_DISPOSABLE; @@ -232,6 +264,23 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) } } + // Process pending signals (set by signal_handler via atomic bitmask) + uint32_t pending = atomic_exchange_explicit(&b->pending_signals, 0, + memory_order_acquire); + while(pending != 0) + { + int sig = (int)__pony_ctz(pending); + pending &= ~(1u << sig); + signal_subscribers_t* subs = &b->sighandlers[sig]; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + asio_event_t* sub = atomic_load_explicit( + &subs->subscribers[i], memory_order_acquire); + if(sub != NULL) + pony_asio_event_send(sub, ASIO_SIGNAL, 1); + } + } + break; } @@ -302,8 +351,42 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) ev->timer = CreateWaitableTimer(NULL, FALSE, NULL); send_request(ev, ASIO_SET_TIMER); } else if((ev->flags & ASIO_SIGNAL) != 0) { - if(ev->nsec < MAX_SIGNAL) - send_request(ev, ASIO_SET_SIGNAL); + int sig = (int)ev->nsec; + if(sig < MAX_SIGNAL) + { + signal_subscribers_t* subs = &b->sighandlers[sig]; + + // Install signal handler synchronously on first subscriber + int expected = 0; + if(atomic_compare_exchange_strong_explicit(&subs->registered, &expected, + -1, memory_order_acq_rel, memory_order_acquire)) + { + signal(sig, signal_handler); + atomic_store_explicit(&subs->registered, 1, memory_order_release); + } + else + { + // Another thread is setting up this signal. Spin until setup completes. + while(atomic_load_explicit(&subs->registered, + memory_order_acquire) < 0) + ; + } + + // Add subscriber synchronously using CAS on array slots + bool added = false; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + asio_event_t* exp = NULL; + if(atomic_compare_exchange_strong_explicit(&subs->subscribers[i], + &exp, ev, memory_order_release, memory_order_relaxed)) + { + added = true; + break; + } + } + pony_assert(added); + (void)added; + } } else if(ev->fd == 0) { // Need to subscribe to stdin send_request(ev, ASIO_STDIN_NOTIFY); diff --git a/src/libponyrt/asio/kqueue.c b/src/libponyrt/asio/kqueue.c index af0d5315a4..0c5be266e5 100644 --- a/src/libponyrt/asio/kqueue.c +++ b/src/libponyrt/asio/kqueue.c @@ -1,3 +1,5 @@ +#define PONY_WANT_ATOMIC_DEFS + #include "asio.h" #include "event.h" #ifdef ASIO_USE_KQUEUE @@ -23,10 +25,21 @@ typedef u_short kevent_flag_t; typedef uint32_t kevent_flag_t; #endif +#define MAX_SIGNAL 128 +#define MAX_SIGNAL_SUBSCRIBERS 16 + +#define ASIO_CANCEL_SIGNAL 10 + +typedef struct signal_subscribers_t { + PONY_ATOMIC(int) registered; // 0=no, -1=in-progress, 1=yes + PONY_ATOMIC(asio_event_t*) subscribers[MAX_SIGNAL_SUBSCRIBERS]; +} signal_subscribers_t; + struct asio_backend_t { int kq; int wakeup[2]; + signal_subscribers_t sighandlers[MAX_SIGNAL]; messageq_t q; }; @@ -37,6 +50,30 @@ static void empty_signal_handler(int sig) } #endif +static void retry_loop(asio_backend_t* b) +{ + char c = 0; + write(b->wakeup[1], &c, 1); +} + +static void send_request(asio_event_t* ev, int req) +{ + asio_backend_t* b = ponyint_asio_get_backend(); + pony_assert(b != NULL); + + asio_msg_t* msg = (asio_msg_t*)pony_alloc_msg( + POOL_INDEX(sizeof(asio_msg_t)), 0); + msg->event = ev; + msg->flags = req; + ponyint_thread_messageq_push(&b->q, (pony_msg_t*)msg, (pony_msg_t*)msg +#ifdef USE_DYNAMIC_TRACE + , pony_scheduler_index(), pony_scheduler_index() +#endif + ); + + retry_loop(b); +} + asio_backend_t* ponyint_asio_backend_init() { asio_backend_t* b = POOL_ALLOC(asio_backend_t); @@ -92,14 +129,81 @@ static void handle_queue(asio_backend_t* b) #endif )) != NULL) { - pony_asio_event_send(msg->event, ASIO_DISPOSABLE, 0); - } -} + asio_event_t* ev = msg->event; -static void retry_loop(asio_backend_t* b) -{ - char c = 0; - write(b->wakeup[1], &c, 1); + switch(msg->flags) + { + case ASIO_DISPOSABLE: + pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); + break; + + case ASIO_CANCEL_SIGNAL: + { + int sig = (int)ev->nsec; + + if(sig >= MAX_SIGNAL) + { + ev->flags = ASIO_DISPOSABLE; + pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); + break; + } + + signal_subscribers_t* subs = &b->sighandlers[sig]; + + // Remove ev from subscriber array (set slot to NULL) + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + if(atomic_load_explicit(&subs->subscribers[i], + memory_order_acquire) == ev) + { + atomic_store_explicit(&subs->subscribers[i], NULL, + memory_order_release); + break; + } + } + + // Check if all subscribers are gone + bool has_subscribers = false; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + if(atomic_load_explicit(&subs->subscribers[i], + memory_order_acquire) != NULL) + { + has_subscribers = true; + break; + } + } + + if(!has_subscribers) + { + // Last subscriber: restore default signal handling, remove kevent + struct sigaction new_action; + + new_action.sa_handler = SIG_DFL; +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL) + new_action.sa_handler = empty_signal_handler; +#endif + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_RESTART; + sigaction(sig, &new_action, NULL); + + struct kevent event; + EV_SET(&event, sig, EVFILT_SIGNAL, EV_DELETE, 0, 0, subs); + struct timespec t = {0, 0}; + kevent(b->kq, &event, 1, NULL, 0, &t); + + atomic_store_explicit(&subs->registered, 0, memory_order_release); + } + + ev->flags = ASIO_DISPOSABLE; + pony_asio_event_send(ev, ASIO_DISPOSABLE, 0); + break; + } + + default: {} + } + } } PONY_API void pony_asio_event_resubscribe_read(asio_event_t* ev) @@ -218,11 +322,26 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) b->kq = -1; } } else { - asio_event_t* ev = ep->udata; - if(ep->flags & EV_ERROR) continue; + // Handle signal events before casting udata to asio_event_t*, + // because for signals udata points to a signal_subscribers_t*. + if(ep->filter == EVFILT_SIGNAL) + { + signal_subscribers_t* subs = (signal_subscribers_t*)ep->udata; + for(size_t j = 0; j < MAX_SIGNAL_SUBSCRIBERS; j++) + { + asio_event_t* sub = atomic_load_explicit( + &subs->subscribers[j], memory_order_acquire); + if(sub != NULL) + pony_asio_event_send(sub, ASIO_SIGNAL, (uint32_t)ep->data); + } + continue; + } + + asio_event_t* ev = ep->udata; + switch(ep->filter) { case EVFILT_READ: @@ -258,13 +377,6 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) } break; - case EVFILT_SIGNAL: - if(ev->flags & ASIO_SIGNAL) - { - pony_asio_event_send(ev, ASIO_SIGNAL, (uint32_t)ep->data); - } - break; - default: {} } } @@ -305,6 +417,63 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) ponyint_sched_noisy_asio(pony_scheduler_index()); } + if(ev->flags & ASIO_SIGNAL) + { + int sig = (int)ev->nsec; + + if(sig >= MAX_SIGNAL) + return; + + signal_subscribers_t* subs = &b->sighandlers[sig]; + + // Install kevent and sigaction synchronously on first subscriber. + // kqueue uses SIG_IGN — EVFILT_SIGNAL detects signals regardless + // of disposition, and SIG_IGN prevents default termination. + int expected = 0; + if(atomic_compare_exchange_strong_explicit(&subs->registered, &expected, + -1, memory_order_acq_rel, memory_order_acquire)) + { + struct sigaction new_action; + new_action.sa_handler = SIG_IGN; +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL) + new_action.sa_handler = empty_signal_handler; +#endif + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_RESTART; + sigaction(sig, &new_action, NULL); + + struct kevent event; + EV_SET(&event, sig, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, subs); + struct timespec t = {0, 0}; + kevent(b->kq, &event, 1, NULL, 0, &t); + + atomic_store_explicit(&subs->registered, 1, memory_order_release); + } + else + { + // Another thread is setting up this signal. Spin until setup completes. + while(atomic_load_explicit(&subs->registered, memory_order_acquire) < 0) + ; + } + + // Add subscriber synchronously using CAS on array slots + bool added = false; + for(size_t i = 0; i < MAX_SIGNAL_SUBSCRIBERS; i++) + { + asio_event_t* exp = NULL; + if(atomic_compare_exchange_strong_explicit(&subs->subscribers[i], + &exp, ev, memory_order_release, memory_order_relaxed)) + { + added = true; + break; + } + } + pony_assert(added); + (void)added; + return; + } + struct kevent event[4]; int i = 0; @@ -333,28 +502,6 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) i++; } - if(ev->flags & ASIO_SIGNAL) - { - // Make sure we ignore signals related to scheduler sleeping/waking - // as the default for those signals is termination - struct sigaction new_action; - - new_action.sa_handler = SIG_IGN; -#if !defined(USE_SCHEDULER_SCALING_PTHREADS) - if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL) - new_action.sa_handler = empty_signal_handler; -#endif - sigemptyset (&new_action.sa_mask); - - // ask to restart interrupted syscalls to match `signal` behavior - new_action.sa_flags = SA_RESTART; - - sigaction((int)ev->nsec, &new_action, NULL); - - EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, ev); - i++; - } - kevent(b->kq, event, i, NULL, 0, NULL); if(ev->fd == STDIN_FILENO) @@ -409,6 +556,15 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev) asio_backend_t* b = ponyint_asio_get_backend(); pony_assert(b != NULL); + if(ev->flags & ASIO_SIGNAL) + { + // Signal unregistration is serialized through the ASIO thread. + // The kevent cleanup and sigaction restore are handled in + // ASIO_CANCEL_SIGNAL. + send_request(ev, ASIO_CANCEL_SIGNAL); + return; + } + struct kevent event[4]; int i = 0; @@ -430,28 +586,6 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev) i++; } - if(ev->flags & ASIO_SIGNAL) - { - // Make sure we ignore signals related to scheduler sleeping/waking - // as the default for those signals is termination - struct sigaction new_action; - - new_action.sa_handler = SIG_DFL; -#if !defined(USE_SCHEDULER_SCALING_PTHREADS) - if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL) - new_action.sa_handler = empty_signal_handler; -#endif - sigemptyset (&new_action.sa_mask); - - // ask to restart interrupted syscalls to match `signal` behavior - new_action.sa_flags = SA_RESTART; - - sigaction((int)ev->nsec, &new_action, NULL); - - EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_DELETE, 0, 0, ev); - i++; - } - kevent(b->kq, event, i, NULL, 0, NULL); ev->flags = ASIO_DISPOSABLE;