@@ -267,7 +267,7 @@ func (tm *TunnelManager) listenAndServe(config TunnelPassiveConfig) {
267267 tm .poolLock .Unlock ()
268268
269269 tm .poolLock .RLock ()
270- t := NewTunnelTransport (tm .writePool [client ], tm .readPool [client ], conn , client )
270+ t := NewTunnelTransport (tm .writePool [client ], tm .readPool [client ], conn , PassiveMode , client )
271271 tm .poolLock .RUnlock ()
272272
273273 t .SetCCIDChannel (tm .newCCIDChannel )
@@ -367,7 +367,7 @@ func (tm *TunnelManager) connectAndServe(config TunnelActiveConfig, client Clien
367367 tm .poolLock .Unlock ()
368368
369369 tm .poolLock .RLock ()
370- t := NewTunnelTransport (tm .writePool [client ], tm .readPool [client ], conn , client )
370+ t := NewTunnelTransport (tm .writePool [client ], tm .readPool [client ], conn , ActiveMode , client )
371371 tm .poolLock .RUnlock ()
372372
373373 tm .ttPool = append (tm .ttPool , t )
@@ -427,6 +427,7 @@ type TunnelTransport struct {
427427 clientID ClientID
428428
429429 quit chan struct {}
430+ mode int
430431
431432 mutex sync.Locker
432433 writeStopped bool
@@ -445,7 +446,7 @@ type TunnelTransport struct {
445446 rw io.ReadWriteCloser
446447}
447448
448- func NewTunnelTransport (wp , rp chan <- chan * GotaFrame , rw io.ReadWriteCloser , clientID ... ClientID ) * TunnelTransport {
449+ func NewTunnelTransport (wp , rp chan <- chan * GotaFrame , rw io.ReadWriteCloser , mode int , clientID ... ClientID ) * TunnelTransport {
449450 var c ClientID
450451 if clientID != nil {
451452 c = clientID [0 ]
@@ -462,6 +463,7 @@ func NewTunnelTransport(wp, rp chan<- chan *GotaFrame, rw io.ReadWriteCloser, cl
462463 mutex : & sync.Mutex {},
463464
464465 clientID : c ,
466+ mode : mode ,
465467
466468 newCCIDChannel : nil ,
467469
@@ -497,7 +499,14 @@ func (t *TunnelTransport) readFromPeerTunnel() {
497499 for {
498500 gf , err := ReadGotaFrame (t .rw )
499501 if err != nil {
500- log .Errorf ("TT: Received gota frame error, stop this worker, error: %s" , err )
502+ log .Errorf ("TT: Error: %s" , err )
503+ select {
504+ case <- t .quit :
505+ return
506+ default :
507+ log .Errorf ("TT: Received gota frame error, stop this worker, error: %s" , err )
508+ t .Stop ()
509+ }
501510 return
502511 }
503512 gf .clientID = t .clientID
@@ -509,7 +518,7 @@ func (t *TunnelTransport) readFromPeerTunnel() {
509518 case TMHeartBeatPingSeq :
510519 go t .sendHeartBeatResponse ()
511520 case TMHeartBeatPongSeq :
512- log .Info ("TT: Received Hearbeat Pong" )
521+ log .Info ("TT: Received Heartbeat Pong" )
513522
514523 case TMCreateConnSeq :
515524 log .Debug ("TT: Received Create Connection Signal" )
@@ -602,14 +611,14 @@ Loop:
602611 log .Errorf ("TT: Send heartbeat failed, stop this worker, error: \" %s\" " , err )
603612 break Loop
604613 }
605- log .Info ("TT: Sent Hearbeat Ping" )
614+ log .Info ("TT: Sent Heartbeat Ping" )
606615 case <- tick .C :
607616 err := t .sendHeartBeatRequest ()
608617 if err != nil {
609618 log .Errorf ("TT: Send heartbeat failed, stop this worker, error: \" %s\" " , err )
610619 break Loop
611620 }
612- log .Info ("TT: Sent Hearbeat Ping" )
621+ log .Info ("TT: Sent Heartbeat Ping" )
613622 case <- t .quit :
614623 // TODO if the read channel already registered to the read pool
615624
@@ -632,15 +641,16 @@ func (t *TunnelTransport) sendHeartBeatRequest() error {
632641
633642func (t * TunnelTransport ) sendHeartBeatResponse () {
634643 t .readChannel <- TMHeartBeatPongGotaFrame
635- log .Info ("TT: Sent Hearbeat Pong" )
644+ log .Info ("TT: Sent Heartbeat Pong" )
636645}
637646
638647func (t * TunnelTransport ) sendCloseTunnelRequest () error {
648+ log .Info ("TT: Sent Close Tunnel request" )
639649 err := WriteNBytes (t .rw , HeaderLength , TMCloseTunnelBytes )
640650 if err != nil {
641- return err
651+ log . Errorf ( "TT: Sent Close Tunnel request error: %s" , err )
642652 }
643- return nil
653+ return err
644654}
645655
646656func (t * TunnelTransport ) sendCloseTunnelResponse () {
@@ -651,6 +661,7 @@ func (t *TunnelTransport) sendCloseTunnelResponse() {
651661 log .Info ("TT: Sent Close Tunnel response" )
652662 t .readStopped = true
653663 close (t .quit )
664+ t .Stop ()
654665}
655666
656667// Start method starts the run loop for the worker, listening for a quit channel in
@@ -665,23 +676,29 @@ func (t *TunnelTransport) Stop() {
665676 t .mutex .Lock ()
666677 defer t .mutex .Unlock ()
667678
679+ defer func () {
680+ if t .mode == ActiveMode {
681+ ShutdownGota ()
682+ }
683+ }()
684+
668685 if t .readStopped && t .writeStopped {
669686 return
670687 }
688+
671689 // close a channel will trigger all reading from the channel to return immediately
672690 close (t .quit )
673691 timeout := 0
674692 for {
693+ if t .readStopped && t .writeStopped {
694+ return
695+ }
675696 if timeout < TMHeartBeatSecond {
676697 time .Sleep (time .Second )
677698 timeout ++
678699 } else {
679- if t .readStopped && t .writeStopped {
680- return
681- } else {
682- t .rw .Close ()
683- return
684- }
700+ t .rw .Close ()
701+ return
685702 }
686703 }
687704}
0 commit comments