Skip to content

Commit 41e2584

Browse files
austenLacy authored and shanth96 committed
Purge old schema versions from memory in historian (vitessio#13056)
* purge old schema versions from memory in historian
  Signed-off-by: austenLacy <[email protected]>
* fix vttablet arg e2e test
  Signed-off-by: Austen Lacy <[email protected]>
* only reassign historian schemas if necessary
  Signed-off-by: Austen Lacy <[email protected]>
* don't use from_unixtime in query because already in unix
  Signed-off-by: Austen Lacy <[email protected]>

---------

Signed-off-by: austenLacy <[email protected]>
Signed-off-by: Austen Lacy <[email protected]>
Co-authored-by: Austen Lacy <[email protected]>
1 parent 46f1def commit 41e2584

File tree

8 files changed

+164
-35
lines changed

8 files changed

+164
-35
lines changed

go/flags/endtoend/vttablet.txt

+1
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ Usage of vttablet:
256256
--s3_backup_storage_root string root prefix for all backup-related object names.
257257
--s3_backup_tls_skip_verify_cert skip the 'certificate is valid' check for SSL connections.
258258
--sanitize_log_messages Remove potentially sensitive information in tablet INFO, WARNING, and ERROR log messages such as query parameters.
259+
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
259260
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
260261
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
261262
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state

go/vt/vttablet/tabletserver/schema/engine.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func NewEngine(env tabletenv.Env) *Engine {
115115

116116
schemazHandler(se.GetSchema(), w, r)
117117
})
118-
se.historian = newHistorian(env.Config().TrackSchemaVersions, se.conns)
118+
se.historian = newHistorian(env.Config().TrackSchemaVersions, env.Config().SchemaVersionMaxAgeSeconds, se.conns)
119119
return se
120120
}
121121

@@ -627,8 +627,9 @@ func (se *Engine) handleHTTPSchema(response http.ResponseWriter) {
627627
// doesn't reload. Use SetTableForTests to set table schema.
628628
func NewEngineForTests() *Engine {
629629
se := &Engine{
630-
isOpen: true,
631-
tables: make(map[string]*Table),
630+
isOpen: true,
631+
tables: make(map[string]*Table),
632+
historian: newHistorian(false, 0, nil),
632633
}
633634
return se
634635
}

go/vt/vttablet/tabletserver/schema/engine_test.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestOpenAndReload(t *testing.T) {
7575
))
7676
firstReadRowsValue := 12
7777
AddFakeInnoDBReadRowsResult(db, firstReadRowsValue)
78-
se := newEngine(10, 10*time.Second, 10*time.Second, db)
78+
se := newEngine(10, 10*time.Second, 10*time.Second, 0, db)
7979
se.Open()
8080
defer se.Close()
8181

@@ -266,7 +266,7 @@ func TestReloadWithSwappedTables(t *testing.T) {
266266
firstReadRowsValue := 12
267267
AddFakeInnoDBReadRowsResult(db, firstReadRowsValue)
268268

269-
se := newEngine(10, 10*time.Second, 10*time.Second, db)
269+
se := newEngine(10, 10*time.Second, 10*time.Second, 0, db)
270270
se.Open()
271271
defer se.Close()
272272
want := initialSchema()
@@ -416,7 +416,7 @@ func TestOpenFailedDueToExecErr(t *testing.T) {
416416
schematest.AddDefaultQueries(db)
417417
want := "injected error"
418418
db.RejectQueryPattern(baseShowTablesPattern, want)
419-
se := newEngine(10, 1*time.Second, 1*time.Second, db)
419+
se := newEngine(10, 1*time.Second, 1*time.Second, 0, db)
420420
err := se.Open()
421421
if err == nil || !strings.Contains(err.Error(), want) {
422422
t.Errorf("se.Open: %v, want %s", err, want)
@@ -446,7 +446,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) {
446446
})
447447

448448
AddFakeInnoDBReadRowsResult(db, 0)
449-
se := newEngine(10, 1*time.Second, 1*time.Second, db)
449+
se := newEngine(10, 1*time.Second, 1*time.Second, 0, db)
450450
err := se.Open()
451451
want := "Row count exceeded"
452452
if err == nil || !strings.Contains(err.Error(), want) {
@@ -458,7 +458,7 @@ func TestExportVars(t *testing.T) {
458458
db := fakesqldb.New(t)
459459
defer db.Close()
460460
schematest.AddDefaultQueries(db)
461-
se := newEngine(10, 1*time.Second, 1*time.Second, db)
461+
se := newEngine(10, 1*time.Second, 1*time.Second, 0, db)
462462
se.Open()
463463
defer se.Close()
464464
expvar.Do(func(kv expvar.KeyValue) {
@@ -470,7 +470,7 @@ func TestStatsURL(t *testing.T) {
470470
db := fakesqldb.New(t)
471471
defer db.Close()
472472
schematest.AddDefaultQueries(db)
473-
se := newEngine(10, 1*time.Second, 1*time.Second, db)
473+
se := newEngine(10, 1*time.Second, 1*time.Second, 0, db)
474474
se.Open()
475475
defer se.Close()
476476

@@ -500,7 +500,7 @@ func TestSchemaEngineCloseTickRace(t *testing.T) {
500500
})
501501
AddFakeInnoDBReadRowsResult(db, 12)
502502
// Start the engine with a small reload tick
503-
se := newEngine(10, 100*time.Millisecond, 1*time.Second, db)
503+
se := newEngine(10, 100*time.Millisecond, 1*time.Second, 0, db)
504504
err := se.Open()
505505
require.NoError(t, err)
506506

@@ -527,13 +527,14 @@ func TestSchemaEngineCloseTickRace(t *testing.T) {
527527
}
528528
}
529529

530-
func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, db *fakesqldb.DB) *Engine {
530+
func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, schemaMaxAgeSeconds int64, db *fakesqldb.DB) *Engine {
531531
config := tabletenv.NewDefaultConfig()
532532
config.QueryCacheSize = queryCacheSize
533533
config.SchemaReloadIntervalSeconds.Set(reloadTime)
534534
config.OltpReadPool.IdleTimeoutSeconds.Set(idleTimeout)
535535
config.OlapReadPool.IdleTimeoutSeconds.Set(idleTimeout)
536536
config.TxPool.IdleTimeoutSeconds.Set(idleTimeout)
537+
config.SchemaVersionMaxAgeSeconds = schemaMaxAgeSeconds
537538
se := NewEngine(tabletenv.NewEnv(config, "SchemaTest"))
538539
se.InitDBConfig(newDBConfigs(db).DbaWithDB())
539540
return se

go/vt/vttablet/tabletserver/schema/historian.go

+68-19
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ package schema
1818

1919
import (
2020
"context"
21-
"fmt"
2221
"sort"
2322
"sync"
23+
"time"
2424

2525
"google.golang.org/protobuf/proto"
2626

@@ -35,36 +35,40 @@ import (
3535
"vitess.io/vitess/go/vt/sqlparser"
3636
)
3737

38-
const getSchemaVersions = "select id, pos, ddl, time_updated, schemax from _vt.schema_version where id > %d order by id asc"
38+
const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from _vt.schema_version where time_updated > %d order by id asc"
39+
const getNextSchemaVersions = "select id, pos, ddl, time_updated, schemax from _vt.schema_version where id > %d order by id asc"
3940

4041
// vl defines the glog verbosity level for the package
4142
const vl = 10
4243

4344
// trackedSchema has the snapshot of the table at a given pos (reached by ddl)
4445
type trackedSchema struct {
45-
schema map[string]*binlogdatapb.MinimalTable
46-
pos mysql.Position
47-
ddl string
46+
schema map[string]*binlogdatapb.MinimalTable
47+
pos mysql.Position
48+
ddl string
49+
timeUpdated int64
4850
}
4951

5052
// historian implements the Historian interface by calling schema.Engine for the underlying schema
5153
// and supplying a schema for a specific version by loading the cached values from the schema_version table
5254
// The schema version table is populated by the Tracker
5355
type historian struct {
54-
conns *connpool.Pool
55-
lastID int64
56-
schemas []*trackedSchema
57-
mu sync.Mutex
58-
enabled bool
59-
isOpen bool
56+
conns *connpool.Pool
57+
lastID int64
58+
schemas []*trackedSchema
59+
mu sync.Mutex
60+
enabled bool
61+
isOpen bool
62+
schemaMaxAgeSeconds int64
6063
}
6164

6265
// newHistorian creates a new historian. It expects a schema.Engine instance
63-
func newHistorian(enabled bool, conns *connpool.Pool) *historian {
66+
func newHistorian(enabled bool, schemaMaxAgeSeconds int64, conns *connpool.Pool) *historian {
6467
sh := historian{
65-
conns: conns,
66-
lastID: 0,
67-
enabled: enabled,
68+
conns: conns,
69+
lastID: 0,
70+
enabled: enabled,
71+
schemaMaxAgeSeconds: schemaMaxAgeSeconds,
6872
}
6973
return &sh
7074
}
@@ -164,7 +168,17 @@ func (h *historian) loadFromDB(ctx context.Context) error {
164168
return err
165169
}
166170
defer conn.Recycle()
167-
tableData, err := conn.Exec(ctx, fmt.Sprintf(getSchemaVersions, h.lastID), 10000, true)
171+
172+
var tableData *sqltypes.Result
173+
if h.lastID == 0 && h.schemaMaxAgeSeconds > 0 { // only at vttablet start
174+
schemaMaxAge := time.Now().UTC().Add(time.Duration(-h.schemaMaxAgeSeconds) * time.Second)
175+
tableData, err = conn.Exec(ctx, sqlparser.BuildParsedQuery(getInitialSchemaVersions,
176+
schemaMaxAge.Unix()).Query, 10000, true)
177+
} else {
178+
tableData, err = conn.Exec(ctx, sqlparser.BuildParsedQuery(getNextSchemaVersions,
179+
h.lastID).Query, 10000, true)
180+
}
181+
168182
if err != nil {
169183
log.Infof("Error reading schema_tracking table %v, will operate with the latest available schema", err)
170184
return nil
@@ -177,6 +191,14 @@ func (h *historian) loadFromDB(ctx context.Context) error {
177191
h.schemas = append(h.schemas, trackedSchema)
178192
h.lastID = id
179193
}
194+
195+
if h.lastID != 0 && h.schemaMaxAgeSeconds > 0 {
196+
// To avoid keeping old schemas in memory which can lead to an eventual memory leak
197+
// we purge any older than h.schemaMaxAgeSeconds. Only needs to be done when adding
198+
// new schema rows.
199+
h.purgeOldSchemas()
200+
}
201+
180202
h.sortSchemas()
181203
return nil
182204
}
@@ -217,13 +239,40 @@ func (h *historian) readRow(row []sqltypes.Value) (*trackedSchema, int64, error)
217239
tables[t.Name] = t
218240
}
219241
tSchema := &trackedSchema{
220-
schema: tables,
221-
pos: pos,
222-
ddl: ddl,
242+
schema: tables,
243+
pos: pos,
244+
ddl: ddl,
245+
timeUpdated: timeUpdated,
223246
}
224247
return tSchema, id, nil
225248
}
226249

250+
func (h *historian) purgeOldSchemas() {
251+
maxAgeDuration := time.Duration(h.schemaMaxAgeSeconds) * time.Second
252+
shouldPurge := false
253+
254+
// check if we have any schemas we need to purge and only create the filtered
255+
// slice if necessary
256+
for _, s := range h.schemas {
257+
if time.Since(time.Unix(s.timeUpdated, 0)) > maxAgeDuration {
258+
shouldPurge = true
259+
break
260+
}
261+
}
262+
263+
if !shouldPurge {
264+
return
265+
}
266+
267+
filtered := make([]*trackedSchema, 0)
268+
for _, s := range h.schemas {
269+
if time.Since(time.Unix(s.timeUpdated, 0)) < maxAgeDuration {
270+
filtered = append(filtered, s)
271+
}
272+
}
273+
h.schemas = filtered
274+
}
275+
227276
// sortSchemas sorts entries in ascending order of gtid, ex: 40,44,48
228277
func (h *historian) sortSchemas() {
229278
sort.Slice(h.schemas, func(i int, j int) bool {

go/vt/vttablet/tabletserver/schema/historian_test.go

+76-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package schema
1919
import (
2020
"fmt"
2121
"testing"
22+
"time"
2223

2324
"github.com/stretchr/testify/require"
2425
"google.golang.org/protobuf/proto"
@@ -71,7 +72,7 @@ func getDbSchemaBlob(t *testing.T, tables map[string]*binlogdatapb.MinimalTable)
7172
}
7273

7374
func TestHistorian(t *testing.T) {
74-
se, db, cancel := getTestSchemaEngine(t)
75+
se, db, cancel := getTestSchemaEngine(t, 0)
7576
defer cancel()
7677

7778
se.EnableHistorian(false)
@@ -174,3 +175,77 @@ func TestHistorian(t *testing.T) {
174175
require.NoError(t, err)
175176
require.Equal(t, exp3, fmt.Sprintf("%v", tab))
176177
}
178+
179+
// TestHistorianPurgeOldSchemas checks that the historian drops schema versions
// older than the configured max age (1 hour here) from memory while keeping
// younger ones. It proceeds in two phases:
//  1. the initial (time_updated based) query returns a row stamped 1 day ago;
//     the test expects t1 to remain unknown and se.historian.schemas to be empty.
//  2. the follow-up (id > 1) query returns a row stamped 60 seconds ago;
//     the test expects t1 to resolve at gtid2 and exactly one schema in memory.
func TestHistorianPurgeOldSchemas(t *testing.T) {
180+
schemaVersionMaxAgeSeconds := 3600 // 1 hour
181+
se, db, cancel := getTestSchemaEngine(t, int64(schemaVersionMaxAgeSeconds))
182+
defer cancel()
183+
184+
gtidPrefix := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:"
185+
gtid1 := gtidPrefix + "1-10"
186+
ddl1 := "create table tracker_test (id int)"
187+
// create the first record 1 day ago so it gets purged from memory
188+
ts1 := time.Now().Add(time.Duration(-24) * time.Hour)
189+
// NOTE(review): ddl1, ts1 and db are all used further down, so this blank
// assignment looks redundant; confirm before removing.
_, _, _ = ddl1, ts1, db
190+
se.EnableHistorian(true)
191+
// Before any schema version row is available, t1 is expected to be unknown.
_, err := se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1)
192+
require.Equal(t, "table t1 not found in vttablet schema", err.Error())
193+
var blob1 string
194+
195+
// Column layout of the fake _vt.schema_version result rows used below.
// NOTE(review): time_updated is carried as Int32 here and the unix timestamps
// are narrowed with int32(...) below; presumably fine for current dates, but
// confirm whether Int64 would be more appropriate.
fields := []*querypb.Field{{
196+
Name: "id",
197+
Type: sqltypes.Int32,
198+
}, {
199+
Name: "pos",
200+
Type: sqltypes.VarBinary,
201+
}, {
202+
Name: "ddl",
203+
Type: sqltypes.VarBinary,
204+
}, {
205+
Name: "time_updated",
206+
Type: sqltypes.Int32,
207+
}, {
208+
Name: "schemax",
209+
Type: sqltypes.Blob,
210+
}}
211+
212+
table := getTable("t1", []string{"id1", "id2"}, []querypb.Type{querypb.Type_INT32, querypb.Type_INT32}, []int64{0})
213+
tables := make(map[string]*binlogdatapb.MinimalTable)
214+
tables["t1"] = table
215+
blob1 = getDbSchemaBlob(t, tables)
216+
// Phase 1: answer the initial "time_updated >" query with a single row (id 1)
// whose time_updated is 1 day old, i.e. older than the 1 hour max age.
db.AddQueryPattern("select id, pos, ddl, time_updated, schemax from _vt\\.schema_version where time_updated \\>.*", &sqltypes.Result{
217+
Fields: fields,
218+
Rows: [][]sqltypes.Value{
219+
{sqltypes.NewInt32(1), sqltypes.NewVarBinary(gtid1), sqltypes.NewVarBinary(ddl1), sqltypes.NewInt32(int32(ts1.Unix())), sqltypes.NewVarBinary(blob1)},
220+
},
221+
})
222+
require.Nil(t, se.RegisterVersionEvent())
223+
_, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1)
224+
// validate the old schema has been purged
225+
require.Equal(t, "table t1 not found in vttablet schema", err.Error())
226+
require.Equal(t, 0, len(se.historian.schemas))
227+
228+
// add a second schema record row with a time_updated that won't be purged
229+
gtid2 := gtidPrefix + "1-20"
230+
_, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2)
231+
require.Equal(t, "table t1 not found in vttablet schema", err.Error())
232+
233+
table = getTable("t1", []string{"id1", "id2"}, []querypb.Type{querypb.Type_INT32, querypb.Type_VARBINARY}, []int64{0})
234+
tables["t1"] = table
235+
blob2 := getDbSchemaBlob(t, tables)
236+
ddl2 := "alter table t1 modify column id2 varbinary"
237+
// set time_updated younger than the cutoff from historian.schemaMaxAgeSeconds
238+
ts2 := time.Now().Add(time.Duration(-60) * time.Second)
239+
// Phase 2: the follow-up query is keyed on "id > 1", i.e. the id of the row
// served in phase 1, and returns one row (id 2) that is only 60 seconds old.
db.AddQuery("select id, pos, ddl, time_updated, schemax from _vt.schema_version where id > 1 order by id asc", &sqltypes.Result{
240+
Fields: fields,
241+
Rows: [][]sqltypes.Value{
242+
{sqltypes.NewInt32(2), sqltypes.NewVarBinary(gtid2), sqltypes.NewVarBinary(ddl2), sqltypes.NewInt32(int32(ts2.Unix())), sqltypes.NewVarBinary(blob2)},
243+
},
244+
})
245+
require.Nil(t, se.RegisterVersionEvent())
246+
exp2 := `name:"t1" fields:{name:"id1" type:INT32 table:"t1"} fields:{name:"id2" type:VARBINARY table:"t1"} p_k_columns:0`
247+
tab, err := se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2)
248+
require.NoError(t, err)
249+
require.Equal(t, exp2, fmt.Sprintf("%v", tab))
250+
// Only the recent (id 2) schema version is expected to be held in memory.
require.Equal(t, 1, len(se.historian.schemas))
251+
}

go/vt/vttablet/tabletserver/schema/main_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"vitess.io/vitess/go/sqltypes"
2828
)
2929

30-
func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) {
30+
func getTestSchemaEngine(t *testing.T, schemaMaxAgeSeconds int64) (*Engine, *fakesqldb.DB, func()) {
3131
db := fakesqldb.New(t)
3232
db.AddQuery("select unix_timestamp()", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
3333
"t",
@@ -37,7 +37,7 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) {
3737
db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{})
3838
db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{})
3939
AddFakeInnoDBReadRowsResult(db, 1)
40-
se := newEngine(10, 10*time.Second, 10*time.Second, db)
40+
se := newEngine(10, 10*time.Second, 10*time.Second, schemaMaxAgeSeconds, db)
4141
require.NoError(t, se.Open())
4242
cancel := func() {
4343
defer db.Close()

go/vt/vttablet/tabletserver/schema/tracker_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
func TestTracker(t *testing.T) {
3232
initialSchemaInserted := false
33-
se, db, cancel := getTestSchemaEngine(t)
33+
se, db, cancel := getTestSchemaEngine(t, 0)
3434
defer cancel()
3535
gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10"
3636
ddl1 := "create table tracker_test (id int)"
@@ -91,7 +91,7 @@ func TestTracker(t *testing.T) {
9191

9292
func TestTrackerShouldNotInsertInitialSchema(t *testing.T) {
9393
initialSchemaInserted := false
94-
se, db, cancel := getTestSchemaEngine(t)
94+
se, db, cancel := getTestSchemaEngine(t, 0)
9595
gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10"
9696

9797
defer cancel()

go/vt/vttablet/tabletserver/tabletenv/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
133133
fs.BoolVar(&currentConfig.AnnotateQueries, "queryserver-config-annotate-queries", defaultConfig.AnnotateQueries, "prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type")
134134
fs.BoolVar(&currentConfig.WatchReplication, "watch_replication_stream", false, "When enabled, vttablet will stream the MySQL replication stream from the local server, and use it to update schema when it sees a DDL.")
135135
fs.BoolVar(&currentConfig.TrackSchemaVersions, "track_schema_versions", false, "When enabled, vttablet will store versions of schemas at each position that a DDL is applied and allow retrieval of the schema corresponding to a position")
136+
fs.Int64Var(&currentConfig.SchemaVersionMaxAgeSeconds, "schema-version-max-age-seconds", 0, "max age of schema version records to kept in memory by the vreplication historian")
136137
fs.BoolVar(&currentConfig.TwoPCEnable, "twopc_enable", defaultConfig.TwoPCEnable, "if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.")
137138
fs.StringVar(&currentConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.")
138139
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
@@ -289,6 +290,7 @@ type TabletConfig struct {
289290
SignalSchemaChangeReloadIntervalSeconds Seconds `json:"signalSchemaChangeReloadIntervalSeconds,omitempty"`
290291
WatchReplication bool `json:"watchReplication,omitempty"`
291292
TrackSchemaVersions bool `json:"trackSchemaVersions,omitempty"`
293+
SchemaVersionMaxAgeSeconds int64 `json:"schemaVersionMaxAgeSeconds,omitempty"`
292294
TerseErrors bool `json:"terseErrors,omitempty"`
293295
AnnotateQueries bool `json:"annotateQueries,omitempty"`
294296
MessagePostponeParallelism int `json:"messagePostponeParallelism,omitempty"`

0 commit comments

Comments
 (0)