@@ -35,6 +35,8 @@ func init() {
3535 grpc .EnableTracing = false
3636}
3737
38+ var binlogWriteTimeout = 15 * time .Second
39+
3840// pumpClient is the gRPC client to write binlog, it is opened on server start and never close,
3941// shared by all sessions.
4042var pumpClient binlog.PumpClient
@@ -54,6 +56,15 @@ func GetPumpClient() binlog.PumpClient {
5456 return client
5557}
5658
59+ // SetGRPCTimeout sets grpc timeout for writing binlog.
60+ func SetGRPCTimeout (timeout time.Duration ) {
61+ if timeout < 300 * time .Millisecond {
62+ log .Warnf ("set binlog grpc timeout %s ignored, use default value %s" , timeout , binlogWriteTimeout )
63+ return // Avoid invalid value
64+ }
65+ binlogWriteTimeout = timeout
66+ }
67+
5768// SetPumpClient sets the pump client instance.
5869func SetPumpClient (client binlog.PumpClient ) {
5970 pumpClientLock .Lock ()
@@ -109,7 +120,9 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
109120 // Retry many times because we may raise CRITICAL error here.
110121 for i := 0 ; i < 20 ; i ++ {
111122 var resp * binlog.WriteBinlogResp
112- resp , err = info .Client .WriteBinlog (context .Background (), req )
123+ ctx , cancel := context .WithTimeout (context .Background (), binlogWriteTimeout )
124+ resp , err = info .Client .WriteBinlog (ctx , req )
125+ cancel ()
113126 if err == nil && resp .Errmsg != "" {
114127 err = errors .New (resp .Errmsg )
115128 }
0 commit comments