@@ -16,7 +16,8 @@ import (
1616)
1717
1818type dgramStream struct {
19- ctx context.Context
19+ cancel context.CancelFunc
20+
2021 lines chan <- * logline.LogLine
2122
2223 scheme string // Datagram scheme, either "unixgram" or "udp".
@@ -25,17 +26,15 @@ type dgramStream struct {
2526 mu sync.RWMutex // protects following fields
2627 completed bool // This pipestream is completed and can no longer be used.
2728 lastReadTime time.Time // Last time a log line was read from this named pipe
28-
29- stopOnce sync.Once // Ensure stopChan only closed once.
30- stopChan chan struct {} // Close to start graceful shutdown.
3129}
3230
33- func newDgramStream (ctx context.Context , wg * sync.WaitGroup , waker waker.Waker , scheme , address string , lines chan <- * logline.LogLine ) (LogStream , error ) {
31+ func newDgramStream (ctx context.Context , wg * sync.WaitGroup , waker waker.Waker , scheme , address string , lines chan <- * logline.LogLine , oneShot OneShotMode ) (LogStream , error ) {
3432 if address == "" {
3533 return nil , ErrEmptySocketAddress
3634 }
37- ss := & dgramStream {ctx : ctx , scheme : scheme , address : address , lastReadTime : time .Now (), lines : lines , stopChan : make (chan struct {})}
38- if err := ss .stream (ctx , wg , waker ); err != nil {
35+ ctx , cancel := context .WithCancel (ctx )
36+ ss := & dgramStream {cancel : cancel , scheme : scheme , address : address , lastReadTime : time .Now (), lines : lines }
37+ if err := ss .stream (ctx , wg , waker , oneShot ); err != nil {
3938 return nil , err
4039 }
4140 return ss , nil
@@ -50,22 +49,22 @@ func (ss *dgramStream) LastReadTime() time.Time {
5049// The read buffer size for datagrams.
5150const datagramReadBufferSize = 131072
5251
53- func (ss * dgramStream ) stream (ctx context.Context , wg * sync.WaitGroup , waker waker.Waker ) error {
52+ func (ss * dgramStream ) stream (ctx context.Context , wg * sync.WaitGroup , waker waker.Waker , oneShot OneShotMode ) error {
5453 c , err := net .ListenPacket (ss .scheme , ss .address )
5554 if err != nil {
5655 logErrors .Add (ss .address , 1 )
5756 return err
5857 }
59- glog .V (2 ).Infof ("opened new datagram socket %v" , c )
58+ glog .V (2 ).Infof ("stream(%s:%s): opened new datagram socket %v" , ss . scheme , ss . address , c )
6059 b := make ([]byte , datagramReadBufferSize )
6160 partial := bytes .NewBufferString ("" )
6261 var total int
6362 wg .Add (1 )
6463 go func () {
6564 defer wg .Done ()
6665 defer func () {
67- glog .V (2 ).Infof ("%v: read total %d bytes from %s " , c , total , ss .address )
68- glog .V (2 ).Infof ("%v: closing connection" , c )
66+ glog .V (2 ).Infof ("stream(%s:%s): read total %d bytes" , ss . scheme , ss .address , total )
67+ glog .V (2 ).Infof ("stream(%s:%s): closing connection" , ss . scheme , ss . address )
6968 err := c .Close ()
7069 if err != nil {
7170 logErrors .Add (ss .address , 1 )
@@ -83,15 +82,25 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
8382
8483 for {
8584 n , _ , err := c .ReadFrom (b )
86- glog .V (2 ).Infof ("%v: read %d bytes, err is %v" , c , n , err )
85+ glog .V (2 ).Infof ("stream(%s:%s): read %d bytes, err is %v" , ss . scheme , ss . address , n , err )
8786
8887 // This is a test-only trick that says if we've already put this
8988 // logstream in graceful shutdown, then a zero-byte read is
9089 // equivalent to an "EOF" in connection and file oriented streams.
9190 if n == 0 {
91+ if oneShot {
92+ glog .V (2 ).Infof ("stream(%s:%s): exiting because zero byte read and one shot" , ss .scheme , ss .address )
93+ if partial .Len () > 0 {
94+ sendLine (ctx , ss .address , partial , ss .lines )
95+ }
96+ return
97+ }
9298 select {
93- case <- ss .stopChan :
94- glog .V (2 ).Infof ("%v: exiting because zero byte read after Stop" , c )
99+ case <- ctx .Done ():
100+ glog .V (2 ).Infof ("stream(%s:%s): exiting because zero byte read after cancellation" , ss .scheme , ss .address )
101+ if partial .Len () > 0 {
102+ sendLine (ctx , ss .address , partial , ss .lines )
103+ }
95104 return
96105 default :
97106 }
@@ -100,7 +109,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
100109 if n > 0 {
101110 total += n
102111 //nolint:contextcheck
103- decodeAndSend (ss . ctx , ss .lines , ss .address , n , b [:n ], partial )
112+ decodeAndSend (ctx , ss .lines , ss .address , n , b [:n ], partial )
104113 ss .mu .Lock ()
105114 ss .lastReadTime = time .Now ()
106115 ss .mu .Unlock ()
@@ -110,28 +119,23 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
110119 if partial .Len () > 0 {
111120 sendLine (ctx , ss .address , partial , ss .lines )
112121 }
113- glog .V (2 ).Infof ("%v: exiting, stream has error %s" , c , err )
122+ glog .V (2 ).Infof ("stream(%s:%s): exiting, stream has error %s" , ss . scheme , ss . address , err )
114123 return
115124 }
116125
117126 // Yield and wait
118- glog .V (2 ).Infof ("%v: waiting" , c )
127+ glog .V (2 ).Infof ("stream(%s:%s): waiting" , ss . scheme , ss . address )
119128 select {
120- case <- ss . stopChan :
129+ case <- ctx . Done () :
121130 // We may have started waiting here when the stop signal
122131 // arrives, but since that wait the file may have been
123132 // written to. The file is not technically yet at EOF so
124133 // we need to go back and try one more read. We'll exit
125134 // the stream in the zero byte handler above.
126- glog .V (2 ).Infof ("%v: Stopping after next zero byte read" , c )
127- case <- ctx .Done ():
128- // Exit immediately; a cancelled context will set an immediate
129- // deadline on the next read which will cause us to exit then,
130- // so don't bother going around the loop again.
131- return
135+ glog .V (2 ).Infof ("stream(%s:%s): Stopping after next zero byte read" , ss .scheme , ss .address )
132136 case <- waker .Wake ():
133137 // sleep until next Wake()
134- glog .V (2 ).Infof ("%v: Wake received" , c )
138+ glog .V (2 ).Infof ("stream(%s:%s): Wake received" , ss . scheme , ss . address )
135139 }
136140 }
137141 }()
@@ -145,8 +149,6 @@ func (ss *dgramStream) IsComplete() bool {
145149}
146150
147151func (ss * dgramStream ) Stop () {
148- glog .V (2 ).Infof ("Stop received on datagram stream." )
149- ss .stopOnce .Do (func () {
150- close (ss .stopChan )
151- })
152+ glog .V (2 ).Infof ("stream(%s:%s): Stop received on datagram stream." , ss .scheme , ss .address )
153+ ss .cancel ()
152154}
0 commit comments