Skip to content

Commit fabdae5

Browse files
pawandubeyshivnagarajan
authored andcommitted
Merge pull request #155 from Shopify/candidate-v15.0.3-shopify-11
Backport: set vreplication net read and net write timeout session vars to high values (cherry picked from commit 84ea974) (cherry picked from commit 5cc2dfc) (cherry picked from commit dde1210)
1 parent 33b5962 commit fabdae5

File tree

4 files changed

+25
-0
lines changed

4 files changed

+25
-0
lines changed

go/flags/endtoend/vttablet.txt

+2
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ Flags:
410410
--vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s)
411411
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
412412
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
413+
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
414+
--vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600)
413415
--vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s)
414416
--vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s)
415417
--vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table

go/vt/vttablet/flags.go

+7
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,18 @@ const (
2929

3030
var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
3131

32+
var (
33+
VReplicationNetReadTimeout = 300
34+
VReplicationNetWriteTimeout = 600
35+
)
36+
3237
func init() {
3338
servenv.OnParseFor("vttablet", registerFlags)
3439
}
3540

3641
func registerFlags(fs *pflag.FlagSet) {
3742
fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags,
3843
"(Bitmask) of experimental features in vreplication to enable")
44+
fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds")
45+
fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds")
3946
}

go/vt/vttablet/tabletmanager/vreplication/controller.go

+8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"sync/atomic"
2525
"time"
2626

27+
"vitess.io/vitess/go/vt/vttablet"
28+
2729
"google.golang.org/protobuf/encoding/prototext"
2830

2931
"vitess.io/vitess/go/vt/discovery"
@@ -227,6 +229,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
227229
if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil {
228230
return err
229231
}
232+
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil {
233+
return err
234+
}
235+
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil {
236+
return err
237+
}
230238
// We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid.
231239
if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil {
232240
return err

go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go

+8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
"vitess.io/vitess/go/vt/vttablet"
26+
2527
"vitess.io/vitess/go/mysql/collations"
2628
"vitess.io/vitess/go/mysql/replication"
2729
"vitess.io/vitess/go/sqltypes"
@@ -136,6 +138,12 @@ func (rs *rowStreamer) Stream() error {
136138
if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil {
137139
return err
138140
}
141+
if _, err := rs.conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil {
142+
return err
143+
}
144+
if _, err := rs.conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil {
145+
return err
146+
}
139147
}
140148
return rs.streamQuery(rs.send)
141149
}

0 commit comments

Comments
 (0)