@@ -102,24 +102,29 @@ extension Driver {
102102 let msg = queued. taskMessage
103103 let connectionId = queued. connectionId
104104 let wireMsg : Message
105+ let progressChannelId : UInt64 ?
105106 switch msg {
106107 case . data( let channelId, let payload) :
107108 wireMsg = messageData ( channelId: channelId, item: payload, connectionId: connectionId)
109+ progressChannelId = channelId
108110 case . close( let channelId) :
109111 wireMsg = messageChannelClose ( channelId: channelId, connectionId: connectionId)
112+ progressChannelId = channelId
110113 case . grantCredit( let channelId, let bytes) :
111114 wireMsg = messageCredit (
112115 channelId: channelId,
113116 additional: bytes,
114117 connectionId: connectionId
115118 )
119+ progressChannelId = channelId
116120 case . schema( let methodId, let direction, let schemas) :
117121 wireMsg = messageSchema (
118122 methodId: methodId,
119123 direction: direction,
120124 schemas: schemas,
121125 connectionId: connectionId
122126 )
127+ progressChannelId = nil
123128 case . response( let requestId, let payload, let methodId, let responseSchemaClosure) :
124129 // Advertise the response schema at THIS sequential send point (not in the
125130 // concurrent dispatch task): under pipelining many responses for a method
@@ -160,8 +165,15 @@ extension Driver {
160165 return
161166 }
162167 wireMsg = response
168+ progressChannelId = nil
163169 }
164170 try await sendOrEnqueue ( wireMsg)
171+ if let progressChannelId {
172+ await markChannelRequestProgress (
173+ connectionId: connectionId,
174+ channelId: progressChannelId
175+ )
176+ }
165177 }
166178
167179 /// Handle a command from a lane or connection handle.
@@ -245,37 +257,12 @@ extension Driver {
245257 return
246258 }
247259
248- guard let timeout else {
249- return
250- }
251-
252- let timeoutNs = Self . timeoutToNanoseconds ( timeout)
253- let capturedState = state
254- let capturedConduit = conduit
255- let capturedConnectionId = connectionId
256- let timeoutTask = Task {
257- do {
258- try await Task . sleep ( nanoseconds: timeoutNs)
259- } catch {
260- return
261- }
262- guard let pending = await capturedState. claimPendingResponse (
263- requestId,
264- reason: " timeout "
265- ) else {
266- return
267- }
268- pending. timeoutTask? . cancel ( )
269- warnLog ( " request timed out request_id= \( requestId) timeout_s= \( timeout) " )
270- pending. responseTx ( . failure( . timeout) )
271- try ? await capturedConduit. send (
272- messageCancel ( requestId: requestId, connectionId: capturedConnectionId)
260+ if let timeout {
261+ await installRequestIdleTimeout (
262+ requestId: requestId,
263+ timeout: timeout
273264 )
274265 }
275- let installed = await state. setPendingTimeoutTask ( requestId, timeoutTask: timeoutTask)
276- if !installed {
277- timeoutTask. cancel ( )
278- }
279266 case . openLane( let settings, let metadata, let dispatcher, let responseTx) :
280267 let isClosed = await state. isConnectionClosed ( )
281268 guard !isClosed else {
@@ -396,38 +383,12 @@ extension Driver {
396383
397384 pendingCalls. removeFirst ( )
398385
399- guard let timeout = call. timeout else {
400- continue
401- }
402-
403- let timeoutNs = Self . timeoutToNanoseconds ( timeout)
404- let capturedState = state
405- let capturedConduit = conduit
406- let capturedConnectionId = call. connectionId
407- let requestId = call. requestId
408- let timeoutTask = Task {
409- do {
410- try await Task . sleep ( nanoseconds: timeoutNs)
411- } catch {
412- return
413- }
414- guard let pending = await capturedState. claimPendingResponse (
415- requestId,
416- reason: " timeout "
417- ) else {
418- return
419- }
420- pending. timeoutTask? . cancel ( )
421- warnLog ( " request timed out request_id= \( requestId) timeout_s= \( timeout) " )
422- pending. responseTx ( . failure( . timeout) )
423- try ? await capturedConduit. send (
424- messageCancel ( requestId: requestId, connectionId: capturedConnectionId)
386+ if let timeout = call. timeout {
387+ await installRequestIdleTimeout (
388+ requestId: call. requestId,
389+ timeout: timeout
425390 )
426391 }
427- let installed = await state. setPendingTimeoutTask ( requestId, timeoutTask: timeoutTask)
428- if !installed {
429- timeoutTask. cancel ( )
430- }
431392 }
432393 }
433394
@@ -450,4 +411,63 @@ extension Driver {
450411 pendingTaskMessages. removeFirst ( )
451412 }
452413 }
414+
415+ // r[impl rpc.timeout.idle-progress]
416+ func installRequestIdleTimeout(
417+ requestId: UInt64 ,
418+ timeout: TimeInterval
419+ ) async {
420+ let timeoutNs = Self . timeoutToNanoseconds ( timeout)
421+ let timeoutTask = Task { [ weak self] in
422+ do {
423+ try await Task . sleep ( nanoseconds: timeoutNs)
424+ } catch {
425+ return
426+ }
427+ await self ? . expireRequestForIdleTimeout ( requestId: requestId, timeout: timeout)
428+ }
429+ let replacement = await state. replacePendingTimeoutTask (
430+ requestId,
431+ timeoutTask: timeoutTask
432+ )
433+ guard replacement. installed else {
434+ timeoutTask. cancel ( )
435+ return
436+ }
437+ replacement. previous? . cancel ( )
438+ }
439+
440+ // r[impl rpc.timeout.idle-progress]
441+ func markChannelRequestProgress( connectionId: UInt64 , channelId: UInt64 ) async {
442+ let contexts = await state. pendingTimeoutContexts (
443+ connectionId: connectionId,
444+ channelId: channelId
445+ )
446+ for context in contexts {
447+ await installRequestIdleTimeout (
448+ requestId: context. requestId,
449+ timeout: context. timeout
450+ )
451+ }
452+ }
453+
454+ // r[impl rpc.timeout.idle-progress]
455+ // r[impl rpc.request.scope.terminal]
456+ // r[impl rpc.request.scope.channels]
457+ func expireRequestForIdleTimeout( requestId: UInt64 , timeout: TimeInterval ) async {
458+ guard let pending = await state. claimPendingResponse ( requestId, reason: " timeout " ) else {
459+ return
460+ }
461+ pending. timeoutTask? . cancel ( )
462+ warnLog ( " request timed out request_id= \( requestId) timeout_s= \( timeout) " )
463+ await terminateRequestChannels (
464+ connectionId: pending. request. connectionId,
465+ channelIds: pending. request. channels,
466+ error: . timedOut
467+ )
468+ pending. responseTx ( . failure( . timeout) )
469+ try ? await conduit. send (
470+ messageCancel ( requestId: requestId, connectionId: pending. request. connectionId)
471+ )
472+ }
453473}
0 commit comments