Skip to content

Commit 410ae3a

Browse files
authored
client: use singleflight to reduce request press (#10632)
close #10633 Signed-off-by: tongjian <1045931706@qq.com>
1 parent f941a7e commit 410ae3a

3 files changed

Lines changed: 51 additions & 18 deletions

File tree

client/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/stretchr/testify v1.9.0
1717
go.uber.org/goleak v1.1.11
1818
go.uber.org/zap v1.24.0
19+
golang.org/x/sync v0.19.0
1920
google.golang.org/grpc v1.75.1
2021
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9
2122
)

client/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
130130
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
131131
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
132132
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
133+
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
134+
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
133135
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
134136
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
135137
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

client/servicediscovery/service_discovery.go

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"go.uber.org/zap"
29+
"golang.org/x/sync/singleflight"
2930
"google.golang.org/grpc"
3031
"google.golang.org/grpc/codes"
3132
healthpb "google.golang.org/grpc/health/grpc_health_v1"
@@ -443,6 +444,8 @@ type serviceDiscovery struct {
443444
tlsCfg *tls.Config
444445
// Client option.
445446
option *opt.Option
447+
448+
flight singleflight.Group
446449
}
447450

448451
// NewDefaultServiceDiscovery returns a new default service discovery-based client.
@@ -474,6 +477,7 @@ func NewServiceDiscovery(
474477
keyspaceID: keyspaceID,
475478
tlsCfg: tlsCfg,
476479
option: option,
480+
flight: singleflight.Group{},
477481
}
478482
pdsd.callbacks.setServiceModeUpdateCallback(serviceModeUpdateCb)
479483
urls = tlsutil.AddrsToURLs(urls, tlsCfg)
@@ -919,18 +923,31 @@ func (c *serviceDiscovery) getClusterInfo(ctx context.Context, url string, timeo
919923
}
920924
start := time.Now()
921925
defer func() { metrics.InternalCmdDurationGetClusterInfo.Observe(time.Since(start).Seconds()) }()
922-
clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(ctx, &pdpb.GetClusterInfoRequest{})
923-
if err != nil {
924-
metrics.InternalCmdFailedDurationGetClusterInfo.Observe(time.Since(start).Seconds())
925-
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
926-
return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause()
927-
}
928-
if clusterInfo.GetHeader().GetError() != nil {
926+
key := "GetClusterInfo-" + url
927+
r := c.flight.DoChan(key, func() (any, error) {
928+
return pdpb.NewPDClient(cc).GetClusterInfo(ctx, &pdpb.GetClusterInfoRequest{})
929+
})
930+
select {
931+
case res := <-r:
932+
err = res.Err
933+
if err != nil {
934+
metrics.InternalCmdFailedDurationGetClusterInfo.Observe(time.Since(start).Seconds())
935+
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
936+
return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause()
937+
}
938+
val := res.Val
939+
clusterInfo := val.(*pdpb.GetClusterInfoResponse)
940+
if clusterInfo.GetHeader().GetError() != nil {
941+
metrics.InternalCmdFailedDurationGetClusterInfo.Observe(time.Since(start).Seconds())
942+
attachErr := errors.Errorf("error:%s target:%s status:%s", clusterInfo.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
943+
return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause()
944+
}
945+
return clusterInfo, nil
946+
case <-ctx.Done():
947+
attachErr := errors.Errorf("error:%s target:%s status:%s", ctx.Err(), cc.Target(), cc.GetState().String())
929948
metrics.InternalCmdFailedDurationGetClusterInfo.Observe(time.Since(start).Seconds())
930-
attachErr := errors.Errorf("error:%s target:%s status:%s", clusterInfo.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
931949
return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause()
932950
}
933-
return clusterInfo, nil
934951
}
935952

936953
func (c *serviceDiscovery) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) {
@@ -942,18 +959,31 @@ func (c *serviceDiscovery) getMembers(ctx context.Context, url string, timeout t
942959
}
943960
start := time.Now()
944961
defer func() { metrics.InternalCmdDurationGetMembers.Observe(time.Since(start).Seconds()) }()
945-
members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{})
946-
if err != nil {
947-
metrics.InternalCmdFailedDurationGetMembers.Observe(time.Since(start).Seconds())
948-
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
949-
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
950-
}
951-
if members.GetHeader().GetError() != nil {
962+
key := "GetMembers-" + url
963+
r := c.flight.DoChan(key, func() (any, error) {
964+
return pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{})
965+
})
966+
select {
967+
case res := <-r:
968+
err = res.Err
969+
if err != nil {
970+
metrics.InternalCmdFailedDurationGetMembers.Observe(time.Since(start).Seconds())
971+
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
972+
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
973+
}
974+
val := res.Val
975+
members := val.(*pdpb.GetMembersResponse)
976+
if members.GetHeader().GetError() != nil {
977+
metrics.InternalCmdFailedDurationGetMembers.Observe(time.Since(start).Seconds())
978+
attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
979+
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
980+
}
981+
return members, nil
982+
case <-ctx.Done():
983+
attachErr := errors.Errorf("error:%s target:%s status:%s", ctx.Err(), cc.Target(), cc.GetState().String())
952984
metrics.InternalCmdFailedDurationGetMembers.Observe(time.Since(start).Seconds())
953-
attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
954985
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
955986
}
956-
return members, nil
957987
}
958988

959989
func (c *serviceDiscovery) updateURLs(members []*pdpb.Member) {

0 commit comments

Comments
 (0)