|
1 | 1 | import asyncio |
2 | 2 | import unittest |
| 3 | +import select |
3 | 4 | from streampair import streampair |
4 | 5 |
|
5 | 6 | def async_test(f): |
@@ -49,6 +50,56 @@ async def test_async_streampair(self): |
49 | 50 | assert not b.any() |
50 | 51 | assert not a.any() |
51 | 52 |
|
| 53 | + def test_select_poll_compatibility(self): |
| 54 | + """Test that streampair works with select.poll()""" |
| 55 | + a, b = streampair() |
| 56 | + |
| 57 | + # Register stream with poll |
| 58 | + poller = select.poll() |
| 59 | + poller.register(a, select.POLLIN) |
| 60 | + |
| 61 | + # No data available initially |
| 62 | + events = poller.poll(0) |
| 63 | + assert len(events) == 0, f"Expected no events, got {events}" |
| 64 | + |
| 65 | + # Write data to b, should be readable from a |
| 66 | + b.write(b"test data") |
| 67 | + |
| 68 | + # Should now poll as readable |
| 69 | + events = poller.poll(0) |
| 70 | + assert len(events) == 1, f"Expected 1 event, got {events}" |
| 71 | + assert events[0][0] == a, "Event should be for stream a" |
| 72 | + assert events[0][1] & select.POLLIN, "Should be readable" |
| 73 | + |
| 74 | + # Read the data |
| 75 | + data = a.read() |
| 76 | + assert data == b"test data", f"Expected b'test data', got {data}" |
| 77 | + |
| 78 | + # Should no longer poll as readable |
| 79 | + events = poller.poll(0) |
| 80 | + assert len(events) == 0, f"Expected no events after read, got {events}" |
| 81 | + |
| 82 | + poller.unregister(a) |
| 83 | + |
| 84 | + @async_test |
| 85 | + async def test_streamreader_direct_usage(self): |
| 86 | + """Test that streampair can be used directly with asyncio.StreamReader""" |
| 87 | + a, b = streampair() |
| 88 | + |
| 89 | + # Create StreamReader directly on the streampair object |
| 90 | + reader = asyncio.StreamReader(a) |
| 91 | + |
| 92 | + # Write data in background task |
| 93 | + async def write_delayed(): |
| 94 | + await asyncio.sleep_ms(10) |
| 95 | + b.write(b"async test\n") |
| 96 | + |
| 97 | + asyncio.create_task(write_delayed()) |
| 98 | + |
| 99 | + # Should be able to read via StreamReader |
| 100 | + data = await asyncio.wait_for(reader.readline(), 1.0) |
| 101 | + assert data == b"async test\n", f"Expected b'async test\\n', got {data}" |
| 102 | + |
52 | 103 |
|
53 | 104 | if __name__ == "__main__": |
54 | 105 | unittest.main() |
0 commit comments