Skip to content

Commit 62369b4

Browse files
Do not return EOF directly, returning nil will send EOF to client (#142)
* Do not return EOF directly, returning nil will send EOF to client * Fix lint
1 parent 4ed63dd commit 62369b4

3 files changed

Lines changed: 73 additions & 18 deletions

File tree

proxy/admin_stream_transfer.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"io"
66
"sync"
7-
"sync/atomic"
87
"time"
98

109
"go.temporal.io/api/serviceerror"
@@ -53,7 +52,6 @@ func transferSourceToTarget(
5352
targetStreamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
5453
wg *sync.WaitGroup,
5554
shutdownChan channel.ShutdownOnce,
56-
sendEOFToServer *atomic.Bool,
5755
directionLabel string,
5856
logger log.Logger,
5957
) {
@@ -78,13 +76,11 @@ func transferSourceToTarget(
7876
if err == io.EOF {
7977
logger.Debug("sourceStreamClient.Recv encountered EOF", tag.Error(err))
8078
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "source").Inc()
81-
sendEOFToServer.Store(true)
8279
return
8380
}
8481

8582
if err != nil {
8683
logger.Error("sourceStreamClient.Recv encountered error", tag.Error(err))
87-
sendEOFToServer.Store(true)
8884
return
8985
}
9086
switch attr := resp.GetAttributes().(type) {
@@ -98,15 +94,13 @@ func transferSourceToTarget(
9894
logger.Debug("targetStreamServer.Send encountered EOF", tag.Error(err))
9995
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "target").Inc()
10096
}
101-
sendEOFToServer.Store(true)
10297
return
10398
}
10499
metrics.AdminServiceStreamReqCount.WithLabelValues(directionLabel).Inc()
105100
default:
106101
logger.Error("sourceStreamClient.Recv encountered error", tag.Error(serviceerror.NewInternal(fmt.Sprintf(
107102
"StreamWorkflowReplicationMessages encountered unknown type: %T %v", attr, attr,
108103
))))
109-
sendEOFToServer.Store(true)
110104
return
111105
}
112106
}
@@ -117,7 +111,6 @@ func transferTargetToSource(
117111
targetStreamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
118112
wg *sync.WaitGroup,
119113
shutdownChan channel.ShutdownOnce,
120-
sendEOFToServer *atomic.Bool,
121114
directionLabel string,
122115
logger log.Logger,
123116
) {
@@ -159,7 +152,6 @@ func transferTargetToSource(
159152
if err == io.EOF {
160153
logger.Debug("targetStreamServer.Recv encountered EOF", tag.Error(err))
161154
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "target").Inc()
162-
sendEOFToServer.Store(true)
163155
return
164156
}
165157

@@ -178,15 +170,13 @@ func transferTargetToSource(
178170
logger.Debug("sourceStreamClient.Send encountered EOF", tag.Error(err))
179171
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "source").Inc()
180172
}
181-
sendEOFToServer.Store(true)
182173
return
183174
}
184175
metrics.AdminServiceStreamRespCount.WithLabelValues(directionLabel).Inc()
185176
default:
186177
logger.Error("targetStreamServer.Recv encountered error", tag.Error(serviceerror.NewInternal(fmt.Sprintf(
187178
"StreamWorkflowReplicationMessages encountered unknown type: %T %v", attr, attr,
188179
))))
189-
sendEOFToServer.Store(true)
190180
return
191181
}
192182
}

proxy/adminservice.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package proxy
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"sync"
8-
"sync/atomic"
97
"time"
108

119
"go.temporal.io/api/serviceerror"
@@ -299,18 +297,16 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
299297
// Scenario 2: Local disconnects. targetStreamServer.Recv will return EOF. This unblocks transferTargetToSource and sets shutdownChan.
300298
// transferSourceToTarget needs to be unblocked from sourceStreamClient.Recv
301299
shutdownChan := channel.NewShutdownOnce()
302-
sendEOFToServer := atomic.Bool{}
303300
var wg sync.WaitGroup
304301
wg.Add(2)
305-
go transferTargetToSource(sourceStreamClient, targetStreamServer, &wg, shutdownChan, &sendEOFToServer, directionLabel, logger)
306-
go transferSourceToTarget(sourceStreamClient, targetStreamServer, &wg, shutdownChan, &sendEOFToServer, directionLabel, logger)
302+
go transferTargetToSource(sourceStreamClient, targetStreamServer, &wg, shutdownChan, directionLabel, logger)
303+
go transferSourceToTarget(sourceStreamClient, targetStreamServer, &wg, shutdownChan, directionLabel, logger)
307304
wg.Wait()
308305

309306
streamDuration := time.Since(streamStartTime)
310307
metrics.AdminServiceStreamDuration.WithLabelValues(directionLabel).Observe(streamDuration.Seconds())
311308
metrics.AdminServiceStreamsClosedCount.WithLabelValues(directionLabel).Inc()
312-
if sendEOFToServer.Load() {
313-
return io.EOF
314-
}
309+
// Do not try to transfer EOF from the source here. Just returning "nil" is sufficient to terminate the stream
310+
// to the client.
315311
return nil
316312
}

proxy/test/wiring_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
package proxy
22

33
import (
4+
"context"
45
"fmt"
56
"io"
7+
"net"
68
"net/http"
9+
"sync"
710
"testing"
811
"time"
912

13+
"github.com/gogo/status"
1014
"github.com/stretchr/testify/assert"
1115
"go.temporal.io/api/workflowservice/v1"
1216
"go.temporal.io/server/api/adminservice/v1"
1317
"go.temporal.io/server/common/log"
18+
"go.temporal.io/server/common/log/tag"
19+
"google.golang.org/grpc"
20+
"google.golang.org/grpc/codes"
21+
"google.golang.org/grpc/credentials/insecure"
1422

1523
"github.com/temporalio/s2s-proxy/config"
1624
)
@@ -52,6 +60,67 @@ var (
5260
logger = log.NewTestLogger()
5361
)
5462

63+
type hangupAdminServer struct {
64+
adminservice.UnimplementedAdminServiceServer
65+
}
66+
67+
func (s *hangupAdminServer) StreamWorkflowReplicationMessages(server adminservice.AdminService_StreamWorkflowReplicationMessagesServer) error {
68+
go func() {
69+
for {
70+
_, err := server.Recv()
71+
if status.Code(err) == codes.Canceled {
72+
logger.Info("Client closed")
73+
return
74+
} else if err != nil {
75+
logger.Info("Got a message with error", tag.Error(err))
76+
}
77+
}
78+
}()
79+
wg := sync.WaitGroup{}
80+
wg.Add(1)
81+
go func() {
82+
err := server.Send(&adminservice.StreamWorkflowReplicationMessagesResponse{})
83+
if err != nil {
84+
logger.Info("Got a message with error", tag.Error(err))
85+
}
86+
err = server.Send(&adminservice.StreamWorkflowReplicationMessagesResponse{})
87+
if err != nil {
88+
logger.Info("Got a message with error", tag.Error(err))
89+
}
90+
wg.Done()
91+
}()
92+
wg.Wait()
93+
return nil
94+
}
95+
96+
func TestEOFFromServer(t *testing.T) {
97+
adminHandler := &hangupAdminServer{}
98+
grpcHost := grpc.NewServer()
99+
adminservice.RegisterAdminServiceServer(grpcHost, adminHandler)
100+
listener, _ := net.Listen("tcp", "localhost:8566")
101+
go func() {
102+
_ = grpcHost.Serve(listener)
103+
}()
104+
time.Sleep(10 * time.Millisecond)
105+
client, err := grpc.NewClient("localhost:8566", grpc.WithTransportCredentials(insecure.NewCredentials()))
106+
assert.NoError(t, err)
107+
adminServiceClient := adminservice.NewAdminServiceClient(client)
108+
clientCtx, cancelCtx := context.WithCancel(t.Context())
109+
streamServer, err := adminServiceClient.StreamWorkflowReplicationMessages(clientCtx)
110+
assert.NoError(t, err)
111+
err = streamServer.Send(&adminservice.StreamWorkflowReplicationMessagesRequest{})
112+
assert.NoError(t, err)
113+
_, err = streamServer.Recv()
114+
assert.NoError(t, err)
115+
_, err = streamServer.Recv()
116+
assert.NoError(t, err)
117+
_, err = streamServer.Recv()
118+
assert.True(t, err == io.EOF, "Should have thrown io.EOF, but got %v instead! Error() returned: %v", err, err.Error())
119+
grpcHost.Stop()
120+
_ = listener.Close()
121+
cancelCtx()
122+
}
123+
55124
func TestWiringWithEchoService(t *testing.T) {
56125
echoServer := newEchoServer(echoServerInfo, echoClientInfo, "EchoServer", logger, nil)
57126
echoClient := newEchoServer(echoClientInfo, echoServerInfo, "EchoClient", logger, nil)

0 commit comments

Comments
 (0)