@@ -41,7 +41,7 @@ public structure Connection (α : Type) where
4141 /--
4242 The processing machine for HTTP 1.1
4343 -/
44- machine : Protocol.H1.Machine .response
44+ machine : Protocol.H1.Machine .sending
4545
4646/--
4747A request packet to be sent through the persistent connection channel
@@ -124,187 +124,10 @@ end PersistentConnection
124124
125125namespace Connection
126126
127- private inductive Recv
128- | bytes (x : Option ByteArray)
129- | timeout
130-
131- private def receiveWithTimeout [Transport α] (socket : α) (expect : UInt64)
132- (timeoutMs : Millisecond.Offset) :
133- Async Recv := do
134- Selectable.one #[
135- .case (Transport.recvSelector socket expect) (fun x => pure <| .bytes x),
136- .case (← Selector.sleep timeoutMs) (fun _ => pure <| .timeout)]
137-
138- private def processNeedMoreData
139- [Transport α] (config : Client.Config) (socket : α) (expect : Option Nat) :
140- Async (Except Protocol.H1.Machine.Error (Option ByteArray)) := do
141- try
142- let expect := expect
143- |>.getD config.defaultRequestBufferSize
144- |>.min config.maxRecvChunkSize
145-
146- let data ← receiveWithTimeout socket expect.toUInt64 config.readTimeout
147-
148- match data with
149- | .bytes (some bytes) => pure (.ok <| some bytes)
150- | .bytes none => pure (.ok <| none)
151- | .timeout => pure (.error Protocol.H1.Machine.Error.timeout)
152-
153- catch _ =>
154- pure (.error Protocol.H1.Machine.Error.timeout)
155-
156- private def handle [Transport α] (connection : Connection α) (config : Client.Config)
157- (requestChannel : CloseableChannel RequestPacket) : Async Unit := do
158-
127+ private def handle [Transport α] (connection : Connection α) (config : Client.Config) (requestChannel : CloseableChannel RequestPacket) : Async Unit := do
159128 let mut machine := connection.machine
160- let mut running := true
161129 let socket := connection.socket
162130
163- let mut responseStream ← Body.ByteStream.emptyWithCapacity
164- let mut requestTimer ← Interval.mk config.requestTimeout.val config.requestTimeout.property
165- let mut connectionTimer ← Sleep.mk config.keepAliveTimeout
166-
167- let mut currentRequest : Option RequestPacket := none
168- let mut receivedResponse := false
169- let mut reqStream := none
170- let mut waitingForRequest := true
171-
172- -- Wait for the first tick that is immediate
173- requestTimer.tick
174-
175- let mut requestTimerTask ← async requestTimer.tick
176- let connectionTimerTask ← async connectionTimer.wait
177-
178- while running do
179-
180- if waitingForRequest then
181- match ← await (← requestChannel.recv) with
182- | some packet =>
183- currentRequest := some packet
184- waitingForRequest := false
185-
186- machine := machine.sendMessage packet.request.head
187-
188- match packet.request.body with
189- | .bytes data => machine := machine.writeUserData #[Chunk.mk data #[]] |>.closeWriter
190- | .zero => machine := machine.closeWriter
191- | .stream stream => do
192- if let some size ← stream.getKnownSize then
193- machine := machine.setKnownSize size
194- reqStream := some stream
195-
196- requestTimer.reset
197-
198- | none =>
199- running := false
200- continue
201-
202- machine := machine.processRead.processWrite
203-
204- let (newMachine, events) := machine.takeEvents
205- machine := newMachine
206-
207- -- Sends the output of the machine to the socket in a vectored write.
208- if let some (newMachine, data) := machine.takeOutput then
209- machine := newMachine
210-
211- if data.size > 0 then
212- try
213- Transport.sendAll socket data.data
214- catch e =>
215- if let some packet := currentRequest then
216- packet.onError e
217- running := false
218-
219- for event in events do
220- match event with
221- | .needMoreData expect => do
222- try
223- match ← processNeedMoreData config socket expect with
224- | .ok (some bs) =>
225- machine := machine.feed bs
226- | .ok none =>
227- machine := machine.setFailure .connectionClosed
228- | .error _ => do
229- machine := machine.setFailure .timeout
230- catch e =>
231- if let some packet := currentRequest then
232- packet.onError e
233- running := false
234-
235- | .endHeaders head => do
236- if let some (.fixed n) := Protocol.H1.Machine.getMessageSize head then
237- responseStream.setKnownSize (some n)
238-
239- receivedResponse := true
240-
241- if let some packet := currentRequest then
242- let response := { head := machine.reader.messageHead, body := some (.stream responseStream) }
243- packet.onResponse response
244-
245- | .gotData final data =>
246- discard <| responseStream.write data.toByteArray
247-
248- if final then
249- responseStream.close
250-
251- | .chunkExt _ =>
252- pure ()
253-
254- | .failed e =>
255- if let some packet := currentRequest then
256- packet.onError (.userError (toString e))
257- pure ()
258-
259- | .close =>
260- running := false
261-
262- | .next =>
263- -- Request/response cycle complete, ready for next request
264- requestTimer.reset
265- responseStream ← Body.ByteStream.emptyWithCapacity
266- reqStream := none
267- receivedResponse := false
268- currentRequest := none
269- waitingForRequest := true
270-
271- -- Sends data from the request body.
272- if let some stream := reqStream then
273- if machine.isWriterOpened then
274- if machine.isReaderComplete ∧ events.isEmpty then
275- if let some data ← stream.recv then
276- machine := machine.writeUserData data
277- else
278- machine := machine.closeWriter
279- else
280- if ← stream.isClosed then
281- pure ()
282- else
283- match ← stream.tryRecv with
284- | some res => machine := machine.writeUserData res
285- | none => machine := machine.closeWriter
286-
287- -- Checks for things that can close the connection.
288- if ¬ waitingForRequest then
289- if (← requestTimerTask.isFinished) ∨ (← connectionTimerTask.isFinished) then
290- machine := machine.setFailure .timeout
291- if let some packet := currentRequest then
292- packet.onError (.userError "timeout" )
293- running := false
294-
295- -- End of the connection
296- connectionTimer.stop
297- requestTimer.stop
298-
299- requestChannel.close
300-
301- while true do
302- if let some x ← requestChannel.tryRecv then
303- x.onError (.userError "connection closed, cannot send more requests" )
304- else
305- break
306-
307-
308131
309132end Connection
310133
0 commit comments