@@ -124,10 +124,148 @@ end PersistentConnection
124124
125125namespace Connection
126126
127+ private inductive Recv
128+ | bytes (x : Option ByteArray)
129+ | timeout
130+
131+ private def receiveWithTimeout [Transport α] (socket : α) (expect : UInt64)
132+ (timeoutMs : Millisecond.Offset) (req : Timestamp) (all : Timestamp) :
133+ Async Recv := do
134+ Selectable.one #[
135+ .case (Transport.recvSelector socket expect) (fun x => pure <| .bytes x),
136+ .case (← Selector.sleep timeoutMs) (fun _ => pure <| .timeout),
137+ .case (← Selector.sleep (req - (← Timestamp.now) |>.toMilliseconds)) (fun _ => pure <| .timeout),
138+ .case (← Selector.sleep (all - (← Timestamp.now) |>.toMilliseconds)) (fun _ => pure <| .timeout)]
139+
140+ private def processNeedMoreData
141+ [Transport α] (config : Config) (socket : α) (expect : Option Nat)
142+ (req : Timestamp) (all : Timestamp):
143+ Async (Except Protocol.H1.Error (Option ByteArray)) := do
144+ try
145+ let expect := expect
146+ |>.getD config.defaultRequestBufferSize
147+ |>.min config.maxRecvChunkSize
148+
149+ let data ← receiveWithTimeout socket expect.toUInt64 config.readTimeout req all
150+
151+ match data with
152+ | .bytes (some bytes) => pure (.ok <| some bytes)
153+ | .bytes none => pure (.ok <| none)
154+ | .timeout => pure (.error .timeout)
155+
156+ catch _ =>
157+ pure (.error .timeout)
158+
127159private def handle [Transport α] (connection : Connection α) (config : Client.Config) (requestChannel : CloseableChannel RequestPacket) : Async Unit := do
128160 let mut machine := connection.machine
129161 let socket := connection.socket
130162
163+ let mut responseStream ← Body.ByteStream.emptyWithCapacity
164+ let mut waitingForRequest := true
165+ let mut reqStream := none
166+ let mut requestTimer := (← Timestamp.now) + config.requestTimeout.val
167+ let mut connectionTimer := (← Timestamp.now) + config.keepAliveTimeout.val
168+
169+ let mut currentRequest := none
170+
171+ while ¬machine.halted do
172+ if waitingForRequest then
173+ match ← await (← requestChannel.recv) with
174+ | some packet =>
175+ currentRequest := some packet
176+ waitingForRequest := false
177+
178+ machine := machine.send packet.request.head
179+
180+ match packet.request.body with
181+ | .bytes data => machine := machine.sendData #[Chunk.mk data #[]] |>.userClosedBody
182+ | .zero => machine := machine.userClosedBody
183+ | .stream stream =>
184+ if let some size ← stream.getKnownSize then
185+ machine := machine.setKnownSize size
186+
187+ reqStream := some stream
188+
189+ | none =>
190+ break
191+
192+ let (newMachine, step) := machine.step
193+ machine := newMachine
194+
195+ if step.output.size > 0 then
196+ try
197+ Transport.sendAll socket step.output.data
198+ catch e =>
199+ if let some packet := currentRequest then
200+ packet.onError e
201+
202+ for event in step.events do
203+ match event with
204+ | .needMoreData expect => do
205+ try
206+ match ← processNeedMoreData config socket expect requestTimer connectionTimer with
207+ | .ok (some bs) => machine := machine.feed bs
208+ | .ok none => machine := machine.noMoreInput
209+ | .error _ => machine := machine.closeWriter.closeReader.userClosedBody
210+
211+ catch e =>
212+ if let some packet := currentRequest then
213+ packet.onError e
214+
215+ | .endHeaders head => do
216+ if let some (.fixed n) := Protocol.H1.Machine.getMessageSize head then
217+ responseStream.setKnownSize (some n)
218+
219+ if let some packet := currentRequest then
220+ let response := { head := machine.reader.messageHead, body := some (.stream responseStream) }
221+ packet.onResponse response
222+
223+ | .gotData final data =>
224+ discard <| responseStream.write data.toByteArray
225+
226+ if final then
227+ responseStream.close
228+
229+ | .chunkExt _ =>
230+ pure ()
231+
232+ | .failed e =>
233+ if let some packet := currentRequest then
234+ packet.onError (.userError (toString e))
235+
236+ | .close =>
237+ pure ()
238+
239+ | .next =>
240+ requestTimer := (← Timestamp.now) + config.requestTimeout.val
241+ responseStream ← Body.ByteStream.emptyWithCapacity
242+ reqStream := none
243+ currentRequest := none
244+ waitingForRequest := true
245+
246+ if let some stream := reqStream then
247+ if ¬machine.writer.isClosed then
248+ if machine.isReaderComplete then
249+ if let some data ← stream.recv then
250+ machine := machine.sendData data
251+ else
252+ machine := machine.userClosedBody
253+ else
254+ if ← stream.isClosed then
255+ pure ()
256+ else
257+ match ← stream.tryRecv with
258+ | some res => machine := machine.sendData res
259+ | none => machine := machine.userClosedBody
260+
261+ responseStream.close
262+ requestChannel.close
263+
264+ while true do
265+ if let some x ← requestChannel.tryRecv then
266+ x.onError (.userError "connection closed, cannot send more requests" )
267+ else
268+ break
131269
132270end Connection
133271
0 commit comments