Commit 11bf523

xerial and claude committed
feature: Add portable idle/read timeout to the Native HTTP server
Replace the dropped SO_RCVTIMEO (macOS timeval layout bug) with poll()-based timeouts (posixlib's poll uses a millisecond CInt — no timeval trap, portable). Fixes the long-standing limitation where an idle or silently-dropped client pinned a worker thread forever.

- NativeSocket: waitReadable(fd, timeoutMillis) via poll (1 readable / 0 timeout / -1 hangup, checking POLLHUP/POLLERR/POLLNVAL); enableKeepAlive (SO_KEEPALIVE).
- NativeServerConfig: idleTimeoutMillis + readTimeoutMillis (default 30s) + builders.
- HttpConnectionReader: readChunk thunk takes an `idle` flag (empty buffer = awaiting a new request) so the server applies the idle vs read timeout.
- handleConnection: enableKeepAlive + polling read thunk; an idle keep-alive connection closes after idleTimeoutMillis, a stalled mid-request after readTimeoutMillis.
- streamSse: probe the socket between latch waits so an idle client disconnect is detected and the subscription cancelled (frees the worker), instead of parking until the next failed write.

WebSocket clean disconnects already unblock its blocking recv; half-open WS heartbeat stays a follow-up (SO_KEEPALIVE helps at the OS level).

Tests: idle keep-alive closed after timeout; incomplete request -> 400 after read timeout. uniNative/test: 1399 tests, 0 failed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 68f600c commit 11bf523

6 files changed

Lines changed: 164 additions & 11 deletions

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+# Native HTTP server — portable idle/read timeout
+
+## Context
+The Native server (#574) used blocking `recv` with no timeout — `SO_RCVTIMEO` was dropped because
+macOS's `timeval`/`suseconds_t` layout differs from posixlib's, making `recv` return immediately. So
+an idle or silently-dropped client pinned a worker thread indefinitely (the recurring SSE/keep-alive
+limitation noted across #574/#575/#576).
+
+Fix: use `poll()` (posixlib 0.5.12), whose timeout is a plain millisecond `CInt` — no `timeval` trap,
+portable across macOS and Linux.
+
+## Changes (all Native)
+- **`NativeSocket`** — `waitReadable(fd, timeoutMillis): Int` (1 readable / 0 timeout / -1 hangup,
+  checking `POLLHUP|POLLERR|POLLNVAL`); `enableKeepAlive(fd)` (`SO_KEEPALIVE`, best-effort OS reaping
+  of dead peers).
+- **`NativeServerConfig`** — `idleTimeoutMillis` (default 30000) + `readTimeoutMillis` (30000) +
+  `withIdleTimeoutMillis`/`withReadTimeoutMillis`.
+- **`HttpConnectionReader`** — the `readChunk` thunk now takes an `idle` flag (true when the buffer is
+  empty = waiting for a new request), so the caller applies the idle vs read timeout.
+- **`NativeHttpServer.handleConnection`** — `enableKeepAlive`, then a polling read thunk: idle
+  keep-alive waits `idleTimeoutMillis`, mid-request reads wait `readTimeoutMillis`; a timeout/hangup
+  yields an empty chunk → the reader ends the connection (idle → Closed, mid-request → 400).
+- **`streamSse`** — replaces the bare `done.await()` with `done.await(idleTimeoutMillis)` intervals,
+  probing the socket (non-blocking `waitReadable`) between waits; a disconnected peer cancels the
+  subscription so the worker is freed instead of parked until the next failed write.
+
+WebSocket is unchanged: a clean disconnect already unblocks its blocking `recv` (returns 0). Half-open
+WS detection (ping/pong) remains a separate follow-up; `SO_KEEPALIVE` helps at the OS level.
+
+## Verification
+`uniNative/test` (1399 tests, 0 failed). New tests: an idle keep-alive connection is closed after
+`idleTimeoutMillis`; an incomplete request is closed (400) after `readTimeoutMillis`.
+
+## Follow-ups (remaining)
+Native libcurl client bug; cross-platform WebSocket client; WS ping/pong heartbeat; permessage-deflate.
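
The polling read thunk described in the plan can be sketched without Scala Native. This is a minimal illustration rather than the code from the commit: `Timeouts`, `pollingReadThunk`, and the `waitReadable`/`recvChunk` function parameters are hypothetical stand-ins for `NativeServerConfig` and `NativeSocket`, so the logic can run on any Scala 3 toolchain.

```scala
// Hypothetical stand-in for the two timeout fields of NativeServerConfig.
final case class Timeouts(idleTimeoutMillis: Int = 30000, readTimeoutMillis: Int = 30000)

/** Build the Boolean => Array[Byte] thunk: pick a timeout from the idle flag, wait, then read. */
def pollingReadThunk(
    timeouts: Timeouts,
    waitReadable: Int => Int, // timeoutMillis => 1 readable / 0 timeout / -1 hangup (stand-in)
    recvChunk: () => Array[Byte] // stand-in for the blocking socket read
): Boolean => Array[Byte] =
  idle =>
    val timeout = if idle then timeouts.idleTimeoutMillis else timeouts.readTimeoutMillis
    // Only a readable socket is read; timeout and hangup both collapse to an empty chunk,
    // which the reader treats as end-of-connection.
    if waitReadable(timeout) == 1 then recvChunk() else Array.emptyByteArray
```

Collapsing timeout and hangup into the same empty chunk keeps the reader's contract unchanged: it only needs to know whether the buffer was empty when the connection ended to choose between a silent close and a 400.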

uni/.native/src/main/scala/wvlet/uni/http/NativeHttpProtocol.scala

Lines changed: 6 additions & 4 deletions
@@ -30,17 +30,19 @@ private[http] enum ReadResult:
   * by `Content-Length`; chunked request bodies are not supported (MVP) and yield a `BadRequest`.
   *
   * @param readChunk
-  *   reads the next batch of bytes from the socket; an empty array means EOF.
+  *   reads the next batch of bytes from the socket; an empty array means EOF or timeout. The `idle`
+  *   argument is true when waiting for the first bytes of a new request (the buffer is empty) and
+  *   false once a request has started arriving, so the caller can apply an idle vs a read timeout.
   */
-private[http] class HttpConnectionReader(readChunk: () => Array[Byte], maxRequestSize: Int):
+private[http] class HttpConnectionReader(readChunk: Boolean => Array[Byte], maxRequestSize: Int):

   // Amortized-O(1) append buffer (indexed scan, single slice per request) — avoids the O(n^2) of
   // repeatedly concatenating Array[Byte].
   private val buf = mutable.ArrayBuffer.empty[Byte]

-  /** Append another chunk to the buffer. Returns false on EOF. */
+  /** Append another chunk to the buffer. Returns false on EOF/timeout. */
   private def fill(): Boolean =
-    val chunk = readChunk()
+    val chunk = readChunk(buf.isEmpty)
     if chunk.isEmpty then
       false
     else
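
To see how the `idle` flag behaves across successive reads, here is a stripped-down sketch. `MiniReader` is a hypothetical class that only buffers bytes; it does not parse requests or enforce `maxRequestSize` as the real `HttpConnectionReader` does.

```scala
import scala.collection.mutable

// Hypothetical, stripped-down reader: it only buffers bytes and reports EOF/timeout.
final class MiniReader(readChunk: Boolean => Array[Byte]):
  private val buf = mutable.ArrayBuffer.empty[Byte]

  /** Append one chunk; `idle` is true only while nothing is buffered. False on EOF/timeout. */
  def fill(): Boolean =
    val chunk = readChunk(buf.isEmpty)
    if chunk.isEmpty then false
    else
      buf ++= chunk
      true

  /** Number of bytes buffered so far. */
  def buffered: Int = buf.length
```

In this sketch the first `fill()` passes `idle = true` and every later call passes `false`, because nothing ever drains the buffer. The flag presumably returns to `true` in the real reader once a complete request has been consumed, which is what lets the server apply the idle timeout between keep-alive requests.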

uni/.native/src/main/scala/wvlet/uni/http/NativeHttpServer.scala

Lines changed: 35 additions & 7 deletions
@@ -95,8 +95,23 @@ class NativeHttpServer(config: NativeServerConfig) extends HttpServer with LogSu

   private def handleConnection(clientFd: Int): Unit =
     try
+      NativeSocket.enableKeepAlive(clientFd)
+      // Poll before recv so an idle/slow/disconnected client can't pin a worker forever: an idle
+      // keep-alive connection waits up to idleTimeoutMillis for the next request; once a request has
+      // started arriving, subsequent reads wait up to readTimeoutMillis. A timeout (or hangup) maps
+      // to an empty chunk, which the reader treats as end-of-connection.
       val reader = HttpConnectionReader(
-        () => NativeSocket.recvChunk(clientFd),
+        idle =>
+          val timeout =
+            if idle then
+              config.idleTimeoutMillis
+            else
+              config.readTimeoutMillis
+          if NativeSocket.waitReadable(clientFd, timeout) == 1 then
+            NativeSocket.recvChunk(clientFd)
+          else
+            Array.emptyByteArray
+        ,
         config.maxRequestSize
       )
       var continue = true
@@ -164,10 +179,9 @@ class NativeHttpServer(config: NativeServerConfig) extends HttpServer with LogSu
     * long-lived by design, so no handler-await timeout is applied here). A failed chunk write
     * cancels the subscription. Returns whether the connection is still usable for keep-alive.
     *
-    * Limitation (blocking model): a client that disconnects while the stream is *idle* (no event to
-    * write) is not detected until the next event fails to write, so the worker stays parked in
-    * `done.await()` — each live stream pins one worker thread. A portable idle/read timeout is a
-    * follow-up (see the SSE-streaming plan).
+    * The worker is parked until the stream terminates, but it periodically probes the socket (every
+    * idleTimeoutMillis) so a client that disconnects while the stream is *idle* is detected and the
+    * subscription cancelled, rather than leaking the worker until the next event fails to write.
     */
   private def streamSse(clientFd: Int, response: Response, keepAlive: Boolean): Boolean =
     if !NativeSocket.sendAll(clientFd, HttpResponseWriter.serializeSseHeaders(response, keepAlive))
@@ -205,9 +219,23 @@ class NativeHttpServer(config: NativeServerConfig) extends HttpServer with LogSu
           done.countDown()
         }
       )
-      done.await()
+      // Wait for the stream to terminate, but probe the socket between waits so an idle client
+      // disconnect is detected promptly (a disconnected peer reads as readable-then-EOF or a hangup).
+      var alive = true
+      while alive && !done.await(config.idleTimeoutMillis.toLong, TimeUnit.MILLISECONDS) do
+        NativeSocket.waitReadable(clientFd, 0) match
+          case -1 =>
+            alive = false // peer hung up
+          case 1 =>
+            // SSE clients shouldn't send; a readable socket that yields no bytes is EOF (disconnect).
+            if NativeSocket.recvChunk(clientFd).isEmpty then
+              alive = false
+          case _ =>
+            () // genuinely idle but still connected
+      if !alive then
+        subscription.cancel
       // Terminate the chunked body if the client is still there.
-      ok.get() && NativeSocket.sendAll(clientFd, HttpResponseWriter.finalChunk)
+      ok.get() && alive && NativeSocket.sendAll(clientFd, HttpResponseWriter.finalChunk)

   /**
     * Run the handler and block for its single response. The handler may be asynchronous (Rx), so
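
The probe loop in `streamSse` follows a general pattern: replace an unbounded latch wait with bounded waits and a liveness check in between. The sketch below isolates that pattern under stated assumptions. `awaitWithProbe` and its `probe`, `readIsEof`, and `cancel` parameters are hypothetical names standing in for `NativeSocket.waitReadable(fd, 0)`, an empty `recvChunk`, and `subscription.cancel`.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

/**
  * Wait for `done`, checking the connection every `intervalMillis`. `probe` is a hypothetical
  * stand-in for a zero-timeout readability check: 1 = readable, 0 = idle, -1 = hangup.
  * Returns true if the stream finished while the peer was still connected.
  */
def awaitWithProbe(
    done: CountDownLatch,
    intervalMillis: Long,
    probe: () => Int,
    readIsEof: () => Boolean,
    cancel: () => Unit
): Boolean =
  var alive = true
  // await returns false when the interval elapses before the latch reaches zero.
  while alive && !done.await(intervalMillis, TimeUnit.MILLISECONDS) do
    probe() match
      case -1 => alive = false // peer hung up
      case 1  => if readIsEof() then alive = false // readable but zero bytes: EOF
      case _  => () // idle but still connected
  if !alive then cancel()
  alive
```

With this shape, a silent disconnect is noticed at most one interval after it happens, so the interval bounds how long a dead stream can hold a worker. The commit reuses `idleTimeoutMillis` as that interval rather than adding a separate setting.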

uni/.native/src/main/scala/wvlet/uni/http/NativeServer.scala

Lines changed: 12 additions & 0 deletions
@@ -58,6 +58,10 @@ case class NativeServerConfig(
     workerThreads: Int = 16,
     // Max time to wait for a handler's Rx to produce a response before returning 503
     handlerTimeoutMillis: Long = 30000,
+    // Max time a kept-alive connection may sit idle (awaiting the next request) before it is closed
+    idleTimeoutMillis: Int = 30000,
+    // Max time to wait for more bytes once a request has started arriving before closing
+    readTimeoutMillis: Int = 30000,
     // WebSocket routes, matched by path during the HTTP upgrade handshake
     override val webSocketRoutes: Seq[WebSocketRoute] = Nil,
     // Maximum size (bytes) of an inbound WebSocket message
@@ -104,6 +108,14 @@ case class NativeServerConfig(
     require(millis > 0, "handlerTimeoutMillis must be positive")
     copy(handlerTimeoutMillis = millis)

+  def withIdleTimeoutMillis(millis: Int): NativeServerConfig =
+    require(millis > 0, "idleTimeoutMillis must be positive")
+    copy(idleTimeoutMillis = millis)
+
+  def withReadTimeoutMillis(millis: Int): NativeServerConfig =
+    require(millis > 0, "readTimeoutMillis must be positive")
+    copy(readTimeoutMillis = millis)
+
   /** Register a WebSocket route; the factory creates a fresh handler per accepted connection. */
   def withWebSocketRoute(path: String)(
       handlerFactory: Request => WebSocketHandler
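
As a usage illustration, the builder pattern above can be exercised in isolation. `TimeoutConfig` is a hypothetical two-field stand-in for `NativeServerConfig`, which has more settings than shown here.

```scala
// Hypothetical two-field stand-in for NativeServerConfig; the real class has more settings.
final case class TimeoutConfig(idleTimeoutMillis: Int = 30000, readTimeoutMillis: Int = 30000):

  /** Return a copy with a new idle timeout; non-positive values are rejected. */
  def withIdleTimeoutMillis(millis: Int): TimeoutConfig =
    require(millis > 0, "idleTimeoutMillis must be positive")
    copy(idleTimeoutMillis = millis)

  /** Return a copy with a new read timeout; non-positive values are rejected. */
  def withReadTimeoutMillis(millis: Int): TimeoutConfig =
    require(millis > 0, "readTimeoutMillis must be positive")
    copy(readTimeoutMillis = millis)

// Short timeouts like the ones the new tests use.
val testConfig = TimeoutConfig().withIdleTimeoutMillis(300).withReadTimeoutMillis(300)
```

The positivity check matters because `poll` treats a negative timeout as "wait indefinitely", which would reintroduce the pinned-worker problem. Note that the check lives in the builders only, so a value passed straight to the case-class constructor is not validated in this sketch.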

uni/.native/src/main/scala/wvlet/uni/http/NativeSocket.scala

Lines changed: 38 additions & 0 deletions
@@ -17,6 +17,8 @@ import scala.scalanative.libc.string as cstring
 import scala.scalanative.posix.arpa.inet
 import scala.scalanative.posix.netinet.in.{in_addr, sockaddr_in}
 import scala.scalanative.posix.netinet.inOps.*
+import scala.scalanative.posix.poll.*
+import scala.scalanative.posix.pollOps.*
 import scala.scalanative.posix.sys.socket as csocket
 import scala.scalanative.posix.sys.socket.{sockaddr, socklen_t}
 import scala.scalanative.posix.unistd
@@ -116,6 +118,42 @@ private[http] object NativeSocket:
       null.asInstanceOf[Ptr[socklen_t]]
     )

+  /** Enable TCP keep-alive so the OS eventually reaps a silently-dropped peer (best effort). */
+  def enableKeepAlive(fd: Int): Unit =
+    val optval = stackalloc[CInt]()
+    !optval = 1
+    csocket.setsockopt(
+      fd,
+      csocket.SOL_SOCKET,
+      csocket.SO_KEEPALIVE,
+      optval.asInstanceOf[Ptr[Byte]],
+      sizeof[CInt].toUInt
+    )
+
+  /**
+    * Wait until `fd` is readable or `timeoutMillis` elapses, using `poll` (a portable millisecond
+    * timeout — avoids the macOS `SO_RCVTIMEO`/`timeval` layout trap). Returns 1 if readable, 0 on
+    * timeout, -1 if the peer hung up or `poll` errored.
+    */
+  def waitReadable(fd: Int, timeoutMillis: Int): Int =
+    val fds = stackalloc[struct_pollfd]()
+    fds.fd = fd
+    fds.events = POLLIN.toShort
+    fds.revents = 0.toShort
+    val rc = poll(fds, 1.toUInt, timeoutMillis)
+    if rc < 0 then
+      -1
+    else if rc == 0 then
+      0
+    else
+      val re = fds.revents.toInt
+      if (re & (POLLHUP | POLLERR | POLLNVAL)) != 0 then
+        -1
+      else if (re & POLLIN) != 0 then
+        1
+      else
+        0
+
   /**
     * Receive up to ChunkSize bytes. Returns an empty array on EOF or error (caller treats both as
     * end-of-connection).
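
The decision table inside `waitReadable` can be checked without a socket by pulling it into a pure function. This is a sketch under assumptions: `classify` is a hypothetical helper, and the event-bit constants are hard-coded to the values used on Linux and macOS instead of being imported from posixlib.

```scala
// Event bits as defined in <poll.h> on Linux and macOS; other platforms may differ (assumption).
val POLLIN   = 0x0001
val POLLERR  = 0x0008
val POLLHUP  = 0x0010
val POLLNVAL = 0x0020

/** Map poll's return code and revents to 1 (readable), 0 (timeout), or -1 (hangup/error). */
def classify(rc: Int, revents: Int): Int =
  if rc < 0 then -1 // poll itself failed
  else if rc == 0 then 0 // no descriptor became ready before the timeout
  else if (revents & (POLLHUP | POLLERR | POLLNVAL)) != 0 then -1 // hangup or error wins
  else if (revents & POLLIN) != 0 then 1
  else 0
```

One consequence of checking the hangup bits first is that a socket reporting `POLLIN` together with `POLLHUP` is classified as -1, so any bytes still pending on a closed connection are not read. That seems acceptable here because both callers treat -1 as end-of-connection anyway.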

uni/.native/src/test/scala/wvlet/uni/http/NativeServerTest.scala

Lines changed: 38 additions & 0 deletions
@@ -292,6 +292,44 @@ class NativeServerTest extends UniTest:
       }
   }

+  test("should close an idle keep-alive connection after the idle timeout") {
+    NativeServer
+      .withHandler(_ => Response.ok("ok"))
+      .withIdleTimeoutMillis(300)
+      .withPort(0)
+      .start { server =>
+        val fd = connectLoopback(server.localPort)
+        try
+          // Keep-alive request (no Connection: close), so the server waits for a follow-up request.
+          NativeSocket.sendAll(
+            fd,
+            "GET /x HTTP/1.1\r\nHost: localhost\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)
+          )
+          NativeSocket.recvChunk(fd).nonEmpty shouldBe true // the 200 response
+          // Now stay idle; the server should close the connection after the idle timeout.
+          val start = System.currentTimeMillis()
+          var chunk = NativeSocket.recvChunk(fd)
+          while chunk.nonEmpty do
+            chunk = NativeSocket.recvChunk(fd)
+          val elapsed = System.currentTimeMillis() - start
+          (elapsed < 5000) shouldBe true
+        finally
+          NativeSocket.close(fd)
+      }
+  }
+
+  test("should close a connection with an incomplete request after the read timeout") {
+    NativeServer
+      .withHandler(_ => Response.ok("ok"))
+      .withReadTimeoutMillis(300)
+      .withPort(0)
+      .start { server =>
+        // Headers are never terminated, so the read times out mid-request.
+        val response = request(server.localPort, "GET /x HTTP/1.1\r\nHost: localhost\r\n")
+        response.status shouldBe 400
+      }
+  }
+
   // ---- WebSocket ----

   /**
