@@ -6,29 +6,22 @@ use std::{
66
77use compio_buf:: BufResult ;
88use compio_driver:: { SharedFd , ToSharedFd , op:: Accept } ;
9- use compio_runtime:: { JoinHandle , Submit } ;
9+ use compio_runtime:: Submit ;
1010use futures_util:: { FutureExt , Stream , stream:: FusedStream } ;
1111use socket2:: Socket as Socket2 ;
1212
1313use crate :: Socket ;
1414
15- #[ allow( clippy:: large_enum_variant) ]
16- enum IncomingState {
17- Idle ,
18- CreatingSocket ( JoinHandle < io:: Result < Socket2 > > ) ,
19- Accepting ( Submit < Accept < SharedFd < Socket2 > > > ) ,
20- }
21-
2215pub struct Incoming < ' a > {
2316 listener : & ' a Socket ,
24- state : IncomingState ,
17+ state : Option < Submit < Accept < SharedFd < Socket2 > > > > ,
2518}
2619
2720impl < ' a > Incoming < ' a > {
2821 pub fn new ( listener : & ' a Socket ) -> Self {
2922 Self {
3023 listener,
31- state : IncomingState :: Idle ,
24+ state : None ,
3225 }
3326 }
3427}
@@ -40,44 +33,26 @@ impl Stream for Incoming<'_> {
4033 let this = self . get_mut ( ) ;
4134 loop {
4235 match & mut this. state {
43- IncomingState :: Idle => {
36+ None => {
4437 let domain = this. listener . local_addr ( ) . map ( |addr| addr. domain ( ) ) ?;
4538 let ty = this. listener . socket . r#type ( ) ?;
4639 let protocol = this. listener . socket . protocol ( ) ?;
47- let handle =
48- compio_runtime:: spawn_blocking ( move || Socket2 :: new ( domain, ty, protocol) ) ;
49- this. state = IncomingState :: CreatingSocket ( handle) ;
40+ let socket = Socket2 :: new ( domain, ty, protocol) ?;
41+ let op =
42+ compio_runtime:: submit ( Accept :: new ( this. listener . to_shared_fd ( ) , socket) ) ;
43+ this. state = Some ( op) ;
5044 }
51- IncomingState :: CreatingSocket ( handle) => match ready ! ( handle. poll_unpin( cx) ) {
52- Ok ( Ok ( socket) ) => {
53- let op = compio_runtime:: submit ( Accept :: new (
54- this. listener . to_shared_fd ( ) ,
55- socket,
56- ) ) ;
57- this. state = IncomingState :: Accepting ( op) ;
58- }
59- Ok ( Err ( e) ) => {
60- this. state = IncomingState :: Idle ;
61- return Poll :: Ready ( Some ( Err ( e) ) ) ;
62- }
63- Err ( e) => {
64- this. state = IncomingState :: Idle ;
65- e. resume_unwind ( ) ;
66- // The background task was cancelled; terminate the stream.
67- return Poll :: Ready ( None ) ;
68- }
69- } ,
70- IncomingState :: Accepting ( op) => {
45+ Some ( op) => {
7146 let BufResult ( res, op) = ready ! ( op. poll_unpin( cx) ) ;
7247 match res {
7348 Ok ( _) => {
74- this. state = IncomingState :: Idle ;
49+ this. state = None ;
7550 op. update_context ( ) ?;
7651 let ( accept_sock, _) = op. into_addr ( ) ?;
7752 return Poll :: Ready ( Some ( Ok ( Socket :: from_socket2 ( accept_sock) ?) ) ) ;
7853 }
7954 Err ( e) => {
80- this. state = IncomingState :: Idle ;
55+ this. state = None ;
8156 return Poll :: Ready ( Some ( Err ( e) ) ) ;
8257 }
8358 }
0 commit comments