@@ -218,6 +218,48 @@ pub fn op_http_upgrade_raw(
218218 Ok ( state. resource_table . add ( UpgradeStream :: new ( read, write) ) )
219219}
220220
221+ /// Upgrade a CONNECT request by sending a 200 response, awaiting the
222+ /// upgrade, and returning the stream resource with head bytes captured.
223+ #[ op2]
224+ #[ smi]
225+ pub async fn op_http_upgrade_raw_connect (
226+ state : Rc < RefCell < OpState > > ,
227+ external : * const c_void ,
228+ ) -> Result < ResourceId , HttpNextError > {
229+ let ( http, upgrade) = {
230+ // SAFETY: external is deleted before calling this op.
231+ let http =
232+ unsafe { take_external ! ( external, "op_http_upgrade_raw_connect" ) } ;
233+ let upgrade = http. upgrade ( ) ?;
234+ ( http, upgrade)
235+ } ;
236+
237+ // Send a 200 response to complete the CONNECT handshake.
238+ http. response_parts ( ) . status = StatusCode :: OK ;
239+ http. complete ( ) ;
240+
241+ let upgraded = upgrade. await ?;
242+ let ( stream, head_bytes) = extract_network_stream ( upgraded) ;
243+ let ( read_half, write_half) = stream. into_split ( ) ;
244+
245+ let resource =
246+ UpgradeStream :: new_connected ( read_half, write_half, head_bytes) ;
247+ Ok ( state. borrow_mut ( ) . resource_table . add ( resource) )
248+ }
249+
250+ /// Return the head bytes captured during a CONNECT upgrade.
251+ /// The bytes are returned exactly once; subsequent calls return empty.
252+ #[ op2]
253+ #[ buffer]
254+ pub fn op_http_upgrade_raw_get_head (
255+ state : & mut OpState ,
256+ #[ smi] rid : ResourceId ,
257+ ) -> Result < Vec < u8 > , HttpNextError > {
258+ let resource = state. resource_table . get :: < UpgradeStream > ( rid) ?;
259+ let bytes = resource. head_bytes . borrow_mut ( ) . take ( ) ;
260+ Ok ( bytes. map ( |b| b. to_vec ( ) ) . unwrap_or_default ( ) )
261+ }
262+
221263#[ op2]
222264#[ smi]
223265pub async fn op_http_upgrade_websocket_next (
@@ -1367,6 +1409,10 @@ enum UpgradeStreamWriteState {
13671409 OnUpgrade ,
13681410 AsyncMut < Option < ( NetworkStreamReadHalf , Bytes ) > > ,
13691411 ) ,
1412+ /// Used after a CONNECT upgrade where the 200 response was already sent
1413+ /// by hyper. Consumes and discards the HTTP response the application
1414+ /// writes (since it's redundant), then switches to Network mode.
1415+ ConsumeResponse ( BytesMut , NetworkStreamWriteHalf ) ,
13701416 Network ( NetworkStreamWriteHalf ) ,
13711417 /// The upgrade was rejected with a non-101 status code.
13721418 /// The response has been sent and the stream is now closed for writing.
@@ -1378,6 +1424,9 @@ struct UpgradeStream {
13781424 read : Rc < AsyncRefCell < Option < ( NetworkStreamReadHalf , Bytes ) > > > ,
13791425 write : AsyncRefCell < UpgradeStreamWriteState > ,
13801426 cancel_handle : CancelHandle ,
1427+ /// Head bytes extracted during a CONNECT upgrade, available via
1428+ /// `op_http_upgrade_raw_get_head`.
1429+ head_bytes : RefCell < Option < Bytes > > ,
13811430 /// Set to true when the upgrade was rejected with a non-101 status.
13821431 /// When rejected, reads return EOF and writes are silently ignored.
13831432 rejected : std:: cell:: Cell < bool > ,
@@ -1392,6 +1441,25 @@ impl UpgradeStream {
13921441 read,
13931442 write : AsyncRefCell :: new ( write) ,
13941443 cancel_handle : CancelHandle :: new ( ) ,
1444+ head_bytes : RefCell :: new ( None ) ,
1445+ rejected : std:: cell:: Cell :: new ( false ) ,
1446+ }
1447+ }
1448+
1449+ pub fn new_connected (
1450+ read_half : NetworkStreamReadHalf ,
1451+ write_half : NetworkStreamWriteHalf ,
1452+ head_bytes : Bytes ,
1453+ ) -> Self {
1454+ let read = Rc :: new ( AsyncRefCell :: new ( Some ( ( read_half, Bytes :: new ( ) ) ) ) ) ;
1455+ Self {
1456+ read,
1457+ write : AsyncRefCell :: new ( UpgradeStreamWriteState :: ConsumeResponse (
1458+ BytesMut :: with_capacity ( 128 ) ,
1459+ write_half,
1460+ ) ) ,
1461+ cancel_handle : CancelHandle :: new ( ) ,
1462+ head_bytes : RefCell :: new ( Some ( head_bytes) ) ,
13951463 rejected : std:: cell:: Cell :: new ( false ) ,
13961464 }
13971465 }
@@ -1459,14 +1527,16 @@ impl UpgradeStream {
14591527 Ok ( buf. len ( ) )
14601528 }
14611529 Ok ( httparse:: Status :: Complete ( n) ) => {
1462- let status_code = response. code . unwrap_or ( 200 ) ;
1530+ let status_code = response. code . unwrap_or ( 0 ) ;
14631531
1464- if status_code == StatusCode :: SWITCHING_PROTOCOLS . as_u16 ( ) {
1465- // Upgrade accepted - proceed with upgrade
1466- http. otel_info_set_status (
1467- StatusCode :: SWITCHING_PROTOCOLS . as_u16 ( ) ,
1468- ) ;
1469- http. response_parts ( ) . status = StatusCode :: SWITCHING_PROTOCOLS ;
1532+ // Accept 101 (WebSocket upgrade) and 200 (CONNECT tunnel)
1533+ if status_code == StatusCode :: SWITCHING_PROTOCOLS . as_u16 ( )
1534+ || status_code == StatusCode :: OK . as_u16 ( )
1535+ {
1536+ let status = StatusCode :: from_u16 ( status_code)
1537+ . unwrap_or ( StatusCode :: SWITCHING_PROTOCOLS ) ;
1538+ http. otel_info_set_status ( status. as_u16 ( ) ) ;
1539+ http. response_parts ( ) . status = status;
14701540
14711541 for header in response. headers {
14721542 http. response_parts ( ) . headers . append (
@@ -1526,6 +1596,44 @@ impl UpgradeStream {
15261596 Err ( e) => Err ( std:: io:: Error :: other ( e) ) ,
15271597 }
15281598 }
1599+ UpgradeStreamWriteState :: ConsumeResponse ( mut bytes, mut stream) => {
1600+ bytes. extend_from_slice ( buf) ;
1601+
1602+ let mut headers = [ httparse:: EMPTY_HEADER ; 16 ] ;
1603+ let mut response = httparse:: Response :: new ( & mut headers) ;
1604+ match response. parse ( & bytes) {
1605+ Ok ( httparse:: Status :: Partial ) => {
1606+ * wr = UpgradeStreamWriteState :: ConsumeResponse ( bytes, stream) ;
1607+ Ok ( buf. len ( ) )
1608+ }
1609+ Ok ( httparse:: Status :: Complete ( n) ) => {
1610+ // Response consumed. Forward any trailing bytes after
1611+ // the response headers to the network.
1612+ let trailing = & bytes[ n..] ;
1613+ if !trailing. is_empty ( ) {
1614+ let mut written = 0 ;
1615+ while written < trailing. len ( ) {
1616+ written +=
1617+ Pin :: new ( & mut stream) . write ( & trailing[ written..] ) . await ?;
1618+ }
1619+ }
1620+ let consumed_from_buf = n - ( bytes. len ( ) - buf. len ( ) ) ;
1621+ * wr = UpgradeStreamWriteState :: Network ( stream) ;
1622+ Ok ( consumed_from_buf)
1623+ }
1624+ Err ( _) => {
1625+ // Not an HTTP response — treat as raw data.
1626+ // Write everything accumulated so far to the network.
1627+ let all = bytes. freeze ( ) ;
1628+ let mut written = 0 ;
1629+ while written < all. len ( ) {
1630+ written += Pin :: new ( & mut stream) . write ( & all[ written..] ) . await ?;
1631+ }
1632+ * wr = UpgradeStreamWriteState :: Network ( stream) ;
1633+ Ok ( buf. len ( ) )
1634+ }
1635+ }
1636+ }
15291637 UpgradeStreamWriteState :: Network ( mut stream) => {
15301638 let r = Pin :: new ( & mut stream) . write ( buf) . await ;
15311639 * wr = UpgradeStreamWriteState :: Network ( stream) ;
@@ -1556,6 +1664,9 @@ impl UpgradeStream {
15561664 drop ( wr) ;
15571665 self . write ( if buf1. is_empty ( ) { buf2 } else { buf1 } ) . await
15581666 }
1667+ UpgradeStreamWriteState :: ConsumeResponse ( ..) => {
1668+ self . write ( if buf1. is_empty ( ) { buf2 } else { buf1 } ) . await
1669+ }
15591670 UpgradeStreamWriteState :: Network ( stream) => {
15601671 let bufs = [ std:: io:: IoSlice :: new ( buf1) , std:: io:: IoSlice :: new ( buf2) ] ;
15611672 stream. write_vectored ( & bufs) . await
0 commit comments