@@ -75,10 +75,36 @@ func (st *syncStream) readMsgLoop() {
7575 default :
7676 msg , err := st .readMsg ()
7777 if err != nil {
78- if err := st .Close ("read msg failed" , false ); err != nil {
79- st .logger .Err (err ).Msg ("failed to close sync stream" )
78+ // Classify error to determine appropriate handling
79+ errorType , errorDesc := sttypes .ClassifyStreamError (err )
80+ criticalErr := ! sttypes .IsRecoverableError (errorType )
81+
82+ // Log error with classification
83+ if criticalErr {
84+ st .logger .Warn ().
85+ Str ("streamID" , string (st .ID ())).
86+ Str ("errorType" , errorType .String ()).
87+ Str ("description" , errorDesc ).
88+ Bool ("critical" , true ).
89+ Msg ("critical error, closing stream" )
90+ } else {
91+ st .logger .Info ().
92+ Str ("streamID" , string (st .ID ())).
93+ Str ("errorType" , errorType .String ()).
94+ Str ("description" , errorDesc ).
95+ Bool ("recoverable" , true ).
96+ Msg ("recoverable error, continuing stream" )
8097 }
81- return
98+
99+ // Only close stream for non-recoverable errors
100+ if criticalErr {
101+ if err := st .Close ("read msg failed" , criticalErr ); err != nil {
102+ st .logger .Err (err ).Msg ("failed to close sync stream" )
103+ }
104+ return
105+ }
106+ // For recoverable errors, continue the loop
107+ continue
82108 }
83109 if msg == nil {
84110 if err := st .Close ("remote closed stream" , false ); err != nil {
@@ -134,12 +160,17 @@ func (st *syncStream) handleReqLoop() {
134160 err := st .handleReq (req )
135161
136162 if err != nil {
137- st .logger .Info ().Err (err ).Str ("request" , req .String ()).
138- Msg ("handle request error. Closing stream" )
139- if err := st .Close ("handle request error" , false ); err != nil {
140- st .logger .Err (err ).Msg ("failed to close sync stream" )
163+ st .logger .Error ().Err (err ).Str ("request" , req .String ()).
164+ Msg ("handle request by sync stream failed" )
165+ // Use the centralized error handling to determine if stream should be closed
166+ if sttypes .ShouldCloseStream (err ) {
167+ st .logger .Error ().Err (err ).Str ("request" , req .String ()).
168+ Msg ("sync stream critical error. Closing stream" )
169+ if err := st .Close ("stream error" , false ); err != nil {
170+ st .logger .Err (err ).Msg ("failed to close sync stream" )
171+ }
172+ return
141173 }
142- return
143174 }
144175
145176 case <- st .closeC :
@@ -443,26 +474,14 @@ func (st *syncStream) readMsg() (*syncpb.Message, error) {
443474 // Use progress-based reading with the tracker from BaseStream
444475 b , err := st .ReadBytesWithProgress (st .GetProgressTracker ())
445476 if err != nil {
446- // Log progress timeout specifically
447- if err .Error () == "progress timeout" {
448- st .logger .Warn ().
449- Str ("streamID" , string (st .ID ())).
450- Msg ("stream timeout due to lack of progress" )
451- }
452- // Log stream closure specifically
453- if err .Error () == "stream closed" {
454- st .logger .Debug ().
455- Str ("streamID" , string (st .ID ())).
456- Msg ("stream closed by remote peer" )
457- }
458477 return nil , err
459478 }
460479 if b == nil || len (b ) == 0 {
461480 // This should not happen
462481 st .logger .Warn ().
463482 Str ("streamID" , string (st .ID ())).
464483 Msg ("received empty message data" )
465- return nil , errors .New ("empty message data" )
484+ return nil , errors .Wrap ( errors . New ("empty message data" ), "unexpected empty message " )
466485 }
467486 var msg = & syncpb.Message {}
468487 if err := protobuf .Unmarshal (b , msg ); err != nil {
0 commit comments