Skip to content

Commit dde8e57

Browse files
committed
perf: reduce streamx integration testing time
1 parent d76ba5a commit dde8e57

3 files changed

Lines changed: 70 additions & 77 deletions

File tree

streamx/cancel.go

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/cloudwego/kitex/client"
2929
"github.com/cloudwego/kitex/client/callopt/streamcall"
3030
"github.com/cloudwego/kitex/pkg/kerrors"
31-
"github.com/cloudwego/kitex/pkg/klog"
3231
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
3332
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
3433
"github.com/cloudwego/kitex/pkg/rpcinfo"
@@ -63,15 +62,17 @@ const (
6362

6463
scenarioKey = "SCENARIO"
6564
bizSpecialMessage = "biz_special_message"
65+
66+
cancelInterval = 20 * time.Millisecond
6667
)
6768

6869
func RunTestCancelServer(listenAddr string) server.Server {
6970
addr, _ := net.ResolveTCPAddr("tcp", listenAddr)
70-
evtHdlTracer, testChan := newServerEventHandlerTracer(nil)
71+
evtHdlTracer, testChan := newServerEventHandlerTracer(CancelFinishChan)
7172
hdl := newServerMockEventHandler(testChan)
7273
svr := testcancelservice.NewServer(&cancelThriftImpl{evtHdl: hdl},
7374
server.WithServiceAddr(addr),
74-
server.WithExitWaitTime(1*time.Second),
75+
server.WithExitWaitTime(100*time.Millisecond),
7576
server.WithTracer(NewTracer()),
7677
server.WithTracer(evtHdlTracer),
7778
server.WithMetaHandler(transmeta.ServerTTHeaderHandler),
@@ -172,9 +173,6 @@ func commonCancelBidiImpl[Req, Res any, Stream streaming.BidiStreamingServer[Req
172173
resGetter func(string) *Res,
173174
evtHdl *ServerMockEventHandler,
174175
) error {
175-
defer func() {
176-
CancelFinishChan <- struct{}{}
177-
}()
178176
t := <-CancelTChan
179177
scenario, ok := getScenario(ctx)
180178
test.Assert(t, ok)
@@ -204,7 +202,6 @@ func commonCancelBidiImpl[Req, Res any, Stream streaming.BidiStreamingServer[Req
204202
sendTimes++
205203
sErr := stream.Send(ctx, res)
206204
if sErr == nil {
207-
time.Sleep(50 * time.Millisecond)
208205
continue
209206
}
210207
verifyServerSideCancelCase(t, ctx, sErr, isGRPC)
@@ -227,9 +224,6 @@ func commonCancelClientImpl[Req, Res any, Stream streaming.ClientStreamingServer
227224
ctx context.Context, stream Stream,
228225
evtHdl *ServerMockEventHandler,
229226
) error {
230-
defer func() {
231-
CancelFinishChan <- struct{}{}
232-
}()
233227
t := <-CancelTChan
234228
isGRPC := isGRPCFunc(t, ctx)
235229

@@ -293,9 +287,6 @@ func commonCancelServerImpl[Res any, Stream streaming.ServerStreamingServer[Res]
293287
resGetter func(string) *Res,
294288
evtHdl *ServerMockEventHandler,
295289
) error {
296-
defer func() {
297-
CancelFinishChan <- struct{}{}
298-
}()
299290
t := <-CancelTChan
300291
isGRPC := isGRPCFunc(t, ctx)
301292

@@ -312,10 +303,9 @@ func commonCancelServerImpl[Res any, Stream streaming.ServerStreamingServer[Res]
312303
sendTimes++
313304
sErr := stream.Send(ctx, res)
314305
if sErr == nil {
315-
time.Sleep(50 * time.Millisecond)
316306
continue
317307
}
318-
t.Logf("scenario[%s] Recv err: %v", scenario, sErr)
308+
t.Logf("scenario[%s] Send err: %v", scenario, sErr)
319309
verifyServerSideCancelCase(t, ctx, sErr, isGRPC)
320310
break
321311
}
@@ -332,10 +322,9 @@ func commonCancelServerImpl[Res any, Stream streaming.ServerStreamingServer[Res]
332322
sendTimes++
333323
sErr := stream.Send(ctx, res)
334324
if sErr == nil {
335-
time.Sleep(50 * time.Millisecond)
336325
continue
337326
}
338-
klog.CtxErrorf(ctx, "scenario[%s] Recv err: %v", scenario, sErr)
327+
t.Logf("scenario[%s] Recv err: %v", scenario, sErr)
339328
verifyServerSideCancelCase(t, ctx, sErr, isGRPC)
340329
break
341330
}
@@ -363,7 +352,6 @@ func commonCancelServerImpl[Res any, Stream streaming.ServerStreamingServer[Res]
363352
sendTimes++
364353
sErr = stream.Send(ctx, res)
365354
if sErr == nil {
366-
time.Sleep(50 * time.Millisecond)
367355
continue
368356
}
369357
t.Logf("scenario[%s] Recv err: %v", scenario, sErr)
@@ -388,9 +376,9 @@ func getScenario(ctx context.Context) (string, bool) {
388376
return metainfo.GetValue(ctx, scenarioKey)
389377
}
390378

391-
func verifyClientSideCancelCase(t *testing.T, ctx context.Context, err error, isGRPC bool) {
379+
func verifyClientSideCancelCase(t *testing.T, ctx context.Context, err error, isGRPC, isSend bool) {
392380
verifyCancelContext(t, ctx)
393-
verifyClientSideCancelErr(t, err, isGRPC)
381+
verifyClientSideCancelErr(t, err, isGRPC, isSend)
394382
}
395383

396384
func verifyServerSideCancelCase(t *testing.T, ctx context.Context, err error, isGRPC bool) {
@@ -408,11 +396,16 @@ func verifyCancelContext(t *testing.T, ctx context.Context) {
408396
test.Assert(t, ctxDone)
409397
}
410398

411-
func verifyClientSideCancelErr(t *testing.T, err error, isGRPC bool) {
399+
func verifyClientSideCancelErr(t *testing.T, err error, isGRPC, isSend bool) {
412400
if isGRPC {
413401
st, ok := status.FromError(err)
414402
test.Assert(t, ok)
415-
test.Assert(t, st.Code() == codes.Canceled, st.Code())
403+
code := st.Code()
404+
if isSend {
405+
test.Assert(t, code == codes.Canceled || code == codes.Internal, code, st.Message())
406+
} else {
407+
test.Assert(t, code == codes.Canceled, code, st.Message())
408+
}
416409
} else {
417410
test.Assert(t, errors.Is(err, kerrors.ErrStreamingCanceled), err)
418411
}
@@ -587,15 +580,14 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
587580
cliSt, err := cli.CancelClient(ctx)
588581
test.Assert(t, err == nil, err)
589582
cancel()
590-
// todo: modify this timeout
591-
time.Sleep(6 * time.Second)
592-
err = cliSt.Send(cliSt.Context(), reqGetter(clientStreamingWithoutSendingAnyReq))
593-
verifyClientSideCancelCase(t, cliSt.Context(), err, isGRPC)
583+
// just for triggering cancel fast
584+
err = cliSt.RecvMsg(cliSt.Context(), reqGetter(clientStreamingWithoutSendingAnyReq))
585+
verifyClientSideCancelCase(t, cliSt.Context(), err, isGRPC, false)
594586
evtHdl.AssertCalledTimes(t, ClientCalledTimes{
595587
startTimes: 1,
596588
recvHeaderTimes: 0,
597-
recvTimes: 0,
598-
sendTimes: 1,
589+
recvTimes: 1,
590+
sendTimes: 0,
599591
finishTimes: 1,
600592
})
601593
})
@@ -617,18 +609,17 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
617609
cliSt, err := cli.CancelClient(ctx)
618610
test.Assert(t, err == nil, err)
619611
go func() {
620-
time.Sleep(200 * time.Millisecond)
612+
time.Sleep(cancelInterval)
621613
cancel()
622614
}()
623615
var sendTimes uint32
624616
for {
625617
sendTimes++
626618
err = cliSt.Send(cliSt.Context(), reqGetter(clientStreamingDuringNormalInteraction))
627619
if err == nil {
628-
time.Sleep(50 * time.Millisecond)
629620
continue
630621
}
631-
verifyClientSideCancelCase(t, cliSt.Context(), err, isGRPC)
622+
verifyClientSideCancelCase(t, cliSt.Context(), err, isGRPC, true)
632623
break
633624
}
634625
evtHdl.AssertCalledTimes(t, ClientCalledTimes{
@@ -680,11 +671,11 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
680671
ctx = setScenario(ctx, serverStreamingRemoteRespondingSlowly)
681672
srvSt, err := cli.CancelServer(ctx, reqGetter(serverStreamingRemoteRespondingSlowly))
682673
test.Assert(t, err == nil, err)
683-
time.Sleep(50 * time.Millisecond)
674+
time.Sleep(cancelInterval)
684675
cancel()
685676
CancelSendChan <- struct{}{}
686677
_, err = srvSt.Recv(srvSt.Context())
687-
verifyClientSideCancelCase(t, srvSt.Context(), err, isGRPC)
678+
verifyClientSideCancelCase(t, srvSt.Context(), err, isGRPC, false)
688679
evtHdl.AssertCalledTimes(t, ClientCalledTimes{
689680
startTimes: 1,
690681
recvHeaderTimes: 0,
@@ -708,18 +699,18 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
708699
srvSt, err := cli.CancelServer(ctx, reqGetter(serverStreamingDuringNormalInteraction))
709700
test.Assert(t, err == nil, err)
710701
go func() {
711-
time.Sleep(50 * time.Millisecond)
702+
time.Sleep(cancelInterval)
712703
cancel()
713704
}()
714705
var recvTimes uint32
715706
for {
716707
recvTimes++
717-
res, err := srvSt.Recv(srvSt.Context())
718-
if err == nil {
708+
res, rErr := srvSt.Recv(srvSt.Context())
709+
if rErr == nil {
719710
test.Assert(t, resMsgGetter(res) == serverStreamingDuringNormalInteraction, res)
720711
continue
721712
}
722-
verifyClientSideCancelCase(t, srvSt.Context(), err, isGRPC)
713+
verifyClientSideCancelCase(t, srvSt.Context(), rErr, isGRPC, false)
723714
break
724715
}
725716
evtHdl.AssertCalledTimes(t, ClientCalledTimes{
@@ -779,7 +770,7 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
779770
bidiSt, err := cli.CancelBidi(ctx)
780771
test.Assert(t, err == nil, err)
781772
go func() {
782-
time.Sleep(200 * time.Millisecond)
773+
time.Sleep(cancelInterval)
783774
cancel()
784775
}()
785776
var sendTimes, recvTimes uint32
@@ -791,10 +782,9 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
791782
sendTimes++
792783
sErr := bidiSt.Send(bidiSt.Context(), reqGetter(bidiStreamingIndependentSendRecv))
793784
if sErr == nil {
794-
time.Sleep(50 * time.Millisecond)
795785
continue
796786
}
797-
verifyClientSideCancelCase(t, bidiSt.Context(), sErr, isGRPC)
787+
verifyClientSideCancelCase(t, bidiSt.Context(), sErr, isGRPC, true)
798788
break
799789
}
800790
}()
@@ -807,7 +797,7 @@ func commonTestCancel[Req, Res any](t *testing.T, cli cancelClient[Req, Res], is
807797
test.Assert(t, resMsgGetter(res) == bidiStreamingIndependentSendRecv, res)
808798
continue
809799
}
810-
verifyClientSideCancelCase(t, bidiSt.Context(), rErr, isGRPC)
800+
verifyClientSideCancelCase(t, bidiSt.Context(), rErr, isGRPC, false)
811801
break
812802
}
813803
}()

streamx/normal.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func commonNormalUnaryImpl[Req, Res any](
159159
return nil, errors.New("metainfo is not set")
160160
}
161161
if reqMsgGetter(req) == "test_unary_timeout" {
162-
time.Sleep(500 * time.Millisecond)
162+
time.Sleep(100 * time.Millisecond)
163163
return resGetter("pong"), nil
164164
}
165165
if ctx.Value("test_unary_middleware_builder").(string) != "test_unary_middleware_builder" {
@@ -503,7 +503,7 @@ func RunNormalServer(listenAddr string) server.Server {
503503
var basicOpts = []client.Option{
504504
client.WithTracer(NewTracer()),
505505
client.WithMetaHandler(transmeta.ClientHTTP2Handler), client.WithMetaHandler(transmeta.ClientTTHeaderHandler),
506-
client.WithUnaryOptions(client.WithUnaryRPCTimeout(200*time.Millisecond),
506+
client.WithUnaryOptions(client.WithUnaryRPCTimeout(10*time.Millisecond),
507507
client.WithUnaryMiddleware(func(next endpoint.UnaryEndpoint) endpoint.UnaryEndpoint {
508508
return func(ctx context.Context, req, resp interface{}) (err error) {
509509
count := ctx.Value("test_unary_middleware").(*int)

0 commit comments

Comments
 (0)