-
Notifications
You must be signed in to change notification settings - Fork 72
Fix ffm-plugin: 5th+ digital inputs receive no state change events #623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
505f361
2236f35
210c8cb
145dee4
dd6ac4a
a89f803
b373bf0
2980f63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| package com.pi4j.test.smoketest; | ||
|
|
||
| import com.pi4j.io.gpio.digital.*; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
|
|
||
| /** | ||
| * https://github.com/Pi4J/pi4j/issues/622 | ||
| * | ||
| * In V4.0.0, the FFM plugin uses Virtual Threads to listen for input events. | ||
| * But because native calls get "pinned", only the first four inputs work as they get linked to a CPU core. | ||
| * This test creates 5 inputs, and checks if the 5the indeed doesn't recieve input events. | ||
| */ | ||
| public class FiveDigitalInputsTestCase extends TestCase { | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(FiveDigitalInputsTestCase.class); | ||
|
|
||
| private static final String TEST_NAME = "Event on Fifth Digital Input"; | ||
|
|
||
| public static TestResult run(ProviderContext providerContext) { | ||
| logger.info("Starting Fifth Digital Input test"); | ||
|
|
||
| DigitalOutput gpioOutControl = null; | ||
| DataInGpioListener input1 = null; | ||
| DataInGpioListener input2 = null; | ||
| DataInGpioListener input3 = null; | ||
| DataInGpioListener input4 = null; | ||
| DataInGpioListener gpioInTest = null; | ||
|
|
||
| try { | ||
| // Initialize output | ||
| gpioOutControl = createDigitalOutput(providerContext.getContext(), 26, DigitalState.LOW, DigitalState.LOW); | ||
| Thread.sleep(100); | ||
| if (gpioOutControl.state() != DigitalState.LOW) { | ||
| return new TestResult(TEST_NAME, false, "Output has not the correct initial state"); | ||
| } | ||
|
|
||
| // Initialize 4 inputs to fill up the available cores (4 on a Raspberry Pi 5) | ||
| input1 = createInputListener( providerContext, 5); | ||
| input2 = createInputListener( providerContext, 6); | ||
| input3 = createInputListener( providerContext, 13); | ||
| //input4 = createInputListener( providerContext, 19); | ||
| Thread.sleep(100); | ||
|
|
||
| // Initialize 5th input, to validate a future fix | ||
| gpioInTest = createInputListener( providerContext, 16); | ||
| Thread.sleep(100); | ||
| if (gpioInTest.getEvent() != null) { | ||
| return new TestResult(TEST_NAME, false, "Input listener event should be null"); | ||
| } | ||
|
|
||
| // Change the output | ||
| gpioOutControl.high(); | ||
|
|
||
| // Check the expected input state | ||
| if (gpioInTest.getEvent() != null) { | ||
| return new TestResult(TEST_NAME, true, "The listener received an event as expected"); | ||
| } else { | ||
| return new TestResult(TEST_NAME, false, "The listener didn't receive an event"); | ||
| } | ||
| } catch (Exception e) { | ||
| logger.error("Test failure", e); | ||
| return new TestResult(TEST_NAME, false, "Test failure: " + e.getMessage()); | ||
| } finally { | ||
| if (gpioOutControl != null) { | ||
| gpioOutControl.close(); | ||
| } | ||
| if (gpioInTest != null) { | ||
| gpioInTest.close(); | ||
| } | ||
| if (input1 != null) { | ||
| input1.close(); | ||
| } | ||
| if (input2 != null) { | ||
| input2.close(); | ||
| } | ||
| if (input3 != null) { | ||
| input3.close(); | ||
| } | ||
| if (input4 != null) { | ||
| input4.close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static DataInGpioListener createInputListener(ProviderContext providerContext, int bcm) { | ||
| var input = createDigitalInput(providerContext.getContext(), bcm, PullResistance.PULL_DOWN, 0); | ||
| var listener = new DataInGpioListener(input); | ||
| input.addListener(listener); | ||
| return listener; | ||
| } | ||
|
|
||
| private static class DataInGpioListener implements DigitalStateChangeListener { | ||
| DigitalInput input; | ||
| DigitalStateChangeEvent event = null; | ||
|
|
||
| public DataInGpioListener(DigitalInput input) { | ||
| this.input = input; | ||
| } | ||
|
|
||
| @Override | ||
| public void onDigitalStateChange(DigitalStateChangeEvent event) { | ||
| this.event = event; | ||
| } | ||
|
|
||
| public DigitalStateChangeEvent getEvent() { | ||
| return event; | ||
| } | ||
|
|
||
| public void close() { | ||
| if (input != null) { | ||
| input.close(); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,7 +99,7 @@ public DigitalInput initialize(Context context) throws InitializeException { | |
| throw new InitializeException("Debounce value of " + debounce + " is too large"); | ||
| } | ||
| var debounceAttribute = new LineAttribute(LineAttributeId.GPIO_V2_LINE_ATTR_ID_DEBOUNCE.getValue(), 0, 0, (int) debounce * 1000); | ||
| attributes.add(new LineConfigAttribute(debounceAttribute, 1L << bcm)); | ||
| attributes.add(new LineConfigAttribute(debounceAttribute, 1L)); | ||
| } | ||
|
Comment on lines
97
to
103
|
||
| flags |= switch (pull) { | ||
| case OFF -> 0; | ||
|
|
@@ -124,10 +124,11 @@ public DigitalInput initialize(Context context) throws InitializeException { | |
| public DigitalInput addListener(DigitalStateChangeListener... listener) { | ||
| logger.trace("{}-{} - Adding new listener", deviceName, bcm); | ||
| if (threadFactory == null) { | ||
| this.threadFactory = Thread.ofVirtual().name(deviceName + "-event-detection-pin-", bcm) | ||
| this.threadFactory = Thread.ofPlatform().name(deviceName + "-event-detection-pin-", bcm) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot can you explain this in more detail please? I thought the goal of virtual threads was exactly to step away from the platform thread limit? So how is this a problem that four virtual threads block the four threads on a four-core processor like the raspberry pi?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Virtual threads are designed to avoid the OS-thread-per-task limit by unmounting from their carrier thread whenever they hit a standard Java blocking call ( There is one critical exception: a virtual thread that calls a native method (JNI or Foreign Function) while blocking cannot be unmounted. It stays pinned to its carrier thread for the entire duration of the native call. This is documented in the JEP-444 notes as "pinning".
The ForkJoinPool that backs virtual threads defaults its carrier-thread count to Platform daemon threads each own a real OS thread. Blocking in a native call is normal for an OS thread — it only blocks that one thread, and the OS can schedule all others independently. That removes the 4-input ceiling entirely.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh wow, yeah, didn't realize we were using virtual threads. Makes absolute sense as i read about the pinning before.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great to know, I did not know the pinning was related to native calls. I guess that would fix the issue. |
||
| .daemon(true) | ||
| .uncaughtExceptionHandler(((_, e) -> logger.error(e.getMessage(), e))) | ||
| .factory(); | ||
| this.eventTaskProcessor = Executors.newThreadPerTaskExecutor(threadFactory); | ||
| this.eventTaskProcessor = Executors.newCachedThreadPool(threadFactory); | ||
| } | ||
| var watcher = new EventWatcher(chipFileDescriptor, PinEvent.BOTH, events -> { | ||
| for (DetectedEvent detectedEvent : events) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,9 +29,12 @@ | |
|
|
||
| import java.lang.foreign.Arena; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.Function; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.*; | ||
|
|
@@ -352,4 +355,116 @@ public void testApi() { | |
| assertEquals(5, pin.bcm()); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testMoreThanFourInputsReceiveEvents() throws InterruptedException { | ||
| // Regression test for the carrier-thread-pinning bug: | ||
| // | ||
| // Before the fix, EventWatcher used virtual threads. Virtual threads that call a | ||
| // blocking native method (poll()) are *pinned* to a ForkJoinPool carrier thread | ||
| // for the duration of the call. The carrier pool size defaults to the number of | ||
| // available CPU cores (4 on a Raspberry Pi 4). Once 4 pins are listening, all | ||
| // carrier threads are pinned and the 5th+ EventWatcher can never be scheduled. | ||
| // | ||
| // This test makes each watcher's first poll() call block until ALL numPins | ||
| // watchers are simultaneously blocked, then releases them. That directly mirrors | ||
| // the real scenario: concurrent blocking native poll() calls. | ||
| // | ||
| // With the old virtual-thread code on a <=4-core machine the test would time out | ||
| // at allWatchersBlockingLatch because the 5th watcher can never enter poll(). | ||
| // With the fixed platform-daemon-thread code every watcher runs on its own OS | ||
| // thread, all 5 block simultaneously, and the test completes. | ||
|
Comment on lines
+361
to
+376
|
||
| int numPins = 5; | ||
|
|
||
| // Latch that counts down once per watcher that reaches (and blocks inside) poll() | ||
| var allWatchersBlockingLatch = new CountDownLatch(numPins); | ||
| // Released by the test thread once all watchers are simultaneously blocking | ||
| var releaseWatchersLatch = new CountDownLatch(1); | ||
| // Counts down as each pin's listener fires the first HIGH event | ||
| var eventsDeliveredLatch = new CountDownLatch(numPins); | ||
|
|
||
| var pinReceivedEvent = new AtomicBoolean[numPins]; | ||
| for (int i = 0; i < numPins; i++) { | ||
| pinReceivedEvent[i] = new AtomicBoolean(false); | ||
| } | ||
| var lineInfoTestData = new IoctlNativeMock.IoctlTestData(LineInfo.class, (answer) -> { | ||
| LineInfo lineInfo = answer.getArgument(2); | ||
| return new LineInfo(("Test").getBytes(), ("FFM-Test").getBytes(), | ||
| lineInfo.offset(), 0, | ||
| PinFlag.INPUT.getValue(), | ||
| new LineAttribute[0]); | ||
| }); | ||
|
|
||
| // Each of the first numPins poll() calls (one per watcher thread) blocks until | ||
| // the test thread sees all watchers are blocked and releases them. Subsequent | ||
| // calls return immediately so the loop can continue delivering events. | ||
| var blockedWatcherCount = new AtomicInteger(0); | ||
| var pollingCallback = new Function<InvocationOnMock, PollingData>() { | ||
| @Override | ||
| public PollingData apply(InvocationOnMock answer) { | ||
| PollingData pollingData = answer.getArgument(0); | ||
| if (blockedWatcherCount.incrementAndGet() <= numPins) { | ||
| // Signal: this watcher is now blocked in poll(), simulating the real | ||
| // blocking native syscall that pins a virtual-thread carrier thread. | ||
| allWatchersBlockingLatch.countDown(); | ||
| try { | ||
| releaseWatchersLatch.await(5, TimeUnit.SECONDS); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
| return new PollingData(pollingData.fd(), pollingData.events(), (short) PollFlag.POLLIN); | ||
| } | ||
| }; | ||
| var pollingFile = new FileDescriptorNativeMock.FileDescriptorTestData("/dev/null", 42, ("Test").getBytes(), (answer) -> { | ||
| byte[] buffer = answer.getArgument(1); | ||
| var lineEvent = new LineEvent(1, PinEvent.RISING.getValue(), 3, 4, 5); | ||
| var memoryBuffer = Arena.ofAuto().allocate(LineEvent.LAYOUT); | ||
| try { | ||
| lineEvent.to(memoryBuffer); | ||
| } catch (Throwable e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| var lineBuffer = new byte[(int) LineEvent.LAYOUT.byteSize()]; | ||
| ByteBuffer.wrap(lineBuffer).put(memoryBuffer.asByteBuffer()); | ||
| System.arraycopy(lineBuffer, 0, buffer, 0, lineBuffer.length); | ||
| return buffer; | ||
| }); | ||
| try (var _ = FileDescriptorNativeMock.echo(GPIOCHIP_FILE, pollingFile); | ||
| var _ = IoctlNativeMock.echo(lineInfoTestData); | ||
| var _ = PollNativeMock.echo(pollingCallback)) { | ||
| List<Object> pins = new ArrayList<>(); | ||
| for (int i = 0; i < numPins; i++) { | ||
| var builder = DigitalInputConfigBuilder.newInstance(pi4j0) | ||
| .bus(-1) | ||
| .bcm(20 + i) | ||
| .debounce(0L) | ||
| .build(); | ||
| var pin = pi4j0.digitalInput().create(builder); | ||
| final int pinIndex = i; | ||
| pin.addListener(event -> { | ||
| if (event.state() == DigitalState.HIGH && pinReceivedEvent[pinIndex].compareAndSet(false, true)) { | ||
| eventsDeliveredLatch.countDown(); | ||
| } | ||
| }); | ||
| pins.add(pin); | ||
| } | ||
|
|
||
| // All numPins watcher threads must reach poll() simultaneously. | ||
| // With virtual threads on <=4 cores this assertion would time out because the | ||
| // 5th+ watcher is never scheduled while the first 4 pin the carrier pool. | ||
| assertTrue(allWatchersBlockingLatch.await(5, TimeUnit.SECONDS), | ||
| (numPins - allWatchersBlockingLatch.getCount()) + " of " + numPins + | ||
| " watcher threads reached poll(). " + | ||
| "Carrier thread pinning may have prevented remaining watchers from being scheduled."); | ||
|
|
||
| // Release all watchers so they return from poll() and process the event. | ||
| releaseWatchersLatch.countDown(); | ||
|
|
||
| // All numPins pins must deliver a state change event. | ||
| assertTrue(eventsDeliveredLatch.await(5, TimeUnit.SECONDS), | ||
| "Only " + (numPins - eventsDeliveredLatch.getCount()) + " of " + numPins + | ||
| " digital inputs delivered a state change event after poll() was released."); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IAmNickNack do you agree with copilot here? It says the following: