Skip to content

Commit 70010fb

Browse files
craig[bot]fqazi
andcommitted
Merge #158087
158087: catalog/lease: add support for bulk leasing r=fqazi a=fqazi This patch does the following to help support bulk leasing in the crdb_internal / pg_catalog / information_schema views: 1. Refactor the storage layer for the lease manager to support bulk acquisitions by introducing a new `acquireBatch` API, which allows the lease manager to acquire multiple leases in a transaction. This also is used to implement `acquire` 2. Add a new `EnsureBatch` public method in the lease manager, which allows the lease manager to acquire multiple leases into memory, which acts as a prefetch mechanism. If a normal acquisition occurs concurrently it will correctly wait for the bulk acquisition to complete. 3. Modification to descriptor collection API to use EnsureBatch in the GetAll.* API. New benchmarks for these change and leased descriptors vs KV in GetAll.*: ``` Cold ====================== BenchmarkLargeDatabaseColdPgClass/pg_class/leased_descriptors=true/use_prefetch=true/tables=8192 1000000000 0.7835 ns/op 0 B/op 0 allocs/op BenchmarkLargeDatabaseColdPgClass/pg_class/leased_descriptors=true/use_prefetch=false/tables=8192 1 8642132499 ns/op 1731154008 B/op 12307123 allocs/op BenchmarkLargeDatabaseColdPgClass/pg_class/leased_descriptors=false/use_prefetch=false/tables=8192 1000000000 0.3063 ns/op 0 B/op 0 allocs/op Warm ====================== BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=true/use_prefetch=true/tables=8192 6 170890458 ns/op 90846753 B/op 428243 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=true/use_prefetch=false/tables=8192 7 192905720 ns/op 91194598 B/op 431132 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=false/use_prefetch=false/tables=8192 4 269645802 ns/op 243010064 B/op 1401622 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=true/use_prefetch=true/tables=16384 3 352242778 ns/op 182838792 B/op 847251 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=true/use_prefetch=false/tables=16384 3 399849806 ns/op 182671565 B/op 847271 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=false/use_prefetch=false/tables=16384 2 582772520 ns/op 488168660 B/op 2793079 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=true/use_prefetch=true/tables=32768 2 778396542 ns/op 365770676 B/op 1690881 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=true/use_prefetch=false/tables=32768 2 803655520 ns/op 364708036 B/op 1685706 allocs/op BenchmarkLargeDatabaseWarmPgClass/pg_class/leased_descriptors=false/use_prefetch=false/tables=32768 1 1300587666 ns/op 977227432 B/op 5585049 allocs/tp ``` Co-authored-by: Faizan Qazi <[email protected]>
2 parents 2c48935 + 0a9831e commit 70010fb

File tree

13 files changed

+502
-96
lines changed

13 files changed

+502
-96
lines changed

pkg/sql/catalog/descs/collection.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,13 @@ var allowLeasedDescriptorsInCatalogViews = settings.RegisterBoolSetting(
416416
settings.WithPublic,
417417
)
418418

419+
var prefetchLeasedDescriptorsInCatalogViews = settings.RegisterBoolSetting(
420+
settings.ApplicationLevel,
421+
"sql.catalog.allow_leased_descriptors.prefetch.enabled",
422+
"if true, catalog views (crdb_internal, information_schema, pg_catalog) can prefetch leased descriptors for improved performance",
423+
true,
424+
)
425+
419426
// GetCatalogGetAllOptions returns the functional options for GetAll* methods
420427
// based on the cluster setting for allowing leased descriptors in catalog views.
421428
func GetCatalogGetAllOptions(sv *settings.Values) []GetAllOption {
@@ -1058,6 +1065,11 @@ func (tc *Collection) GetAllObjectsInSchema(
10581065
if err != nil {
10591066
return nstree.Catalog{}, err
10601067
}
1068+
if options.allowLeased && prefetchLeasedDescriptorsInCatalogViews.Get(&tc.settings.SV) {
1069+
if err := tc.leased.ensureLeasesExist(ctx, stored.OrderedDescriptorIDs()); err != nil {
1070+
return nstree.Catalog{}, err
1071+
}
1072+
}
10611073
ret, err = tc.aggregateAllLayers(ctx, txn, options, stored, sc)
10621074
if err != nil {
10631075
return nstree.Catalog{}, err
@@ -1084,6 +1096,11 @@ func (tc *Collection) GetAllInDatabase(
10841096
if err != nil {
10851097
return nstree.Catalog{}, err
10861098
}
1099+
if options.allowLeased && prefetchLeasedDescriptorsInCatalogViews.Get(&tc.settings.SV) {
1100+
if err := tc.leased.ensureLeasesExist(ctx, stored.OrderedDescriptorIDs()); err != nil {
1101+
return nstree.Catalog{}, err
1102+
}
1103+
}
10871104
schemas, err := tc.GetAllSchemasInDatabase(ctx, txn, db, opts...)
10881105
if err != nil {
10891106
return nstree.Catalog{}, err
@@ -1131,6 +1148,11 @@ func (tc *Collection) GetAllTablesInDatabase(
11311148
if err != nil {
11321149
return nstree.Catalog{}, err
11331150
}
1151+
if options.allowLeased && prefetchLeasedDescriptorsInCatalogViews.Get(&tc.settings.SV) {
1152+
if err := tc.leased.ensureLeasesExist(ctx, stored.OrderedDescriptorIDs()); err != nil {
1153+
return nstree.Catalog{}, err
1154+
}
1155+
}
11341156
var ret nstree.MutableCatalog
11351157
if db.HasPublicSchemaWithDescriptor() {
11361158
ret, err = tc.aggregateAllLayers(ctx, txn, options, stored)

pkg/sql/catalog/descs/leased_descriptors.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type LeaseManager interface {
3333
ctx context.Context, timestamp lease.ReadTimestamp, id descpb.ID,
3434
) (lease.LeasedDescriptor, error)
3535

36+
EnsureBatch(ctx context.Context, ids []descpb.ID) error
37+
3638
IncGaugeAfterLeaseDuration(
3739
gaugeType lease.AfterLeaseDurationGauge,
3840
) (decrAfterWait func())
@@ -203,6 +205,13 @@ func (ld *leasedDescriptors) maybeInitReadTimestamp(ctx context.Context, txn dea
203205
ld.leaseTimestamp = ld.lm.GetReadTimestamp(ctx, readTimestamp)
204206
}
205207

208+
// ensureLeasesExist requests that the lease manager acquire leases for the
209+
// given IDs, if one isn't already exists. This is done in a bulk fashion leading
210+
// to fewer round trips inside the lease manager.
211+
func (ld *leasedDescriptors) ensureLeasesExist(ctx context.Context, ids []descpb.ID) error {
212+
return ld.lm.EnsureBatch(ctx, ids)
213+
}
214+
206215
// getLeasedDescriptorByName return a leased descriptor valid for the
207216
// transaction, acquiring one if necessary. Due to a bug in lease acquisition
208217
// for dropped descriptors, the descriptor may have to be read from the store,

pkg/sql/catalog/lease/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ go_test(
101101
"//pkg/kv/kvserver",
102102
"//pkg/kv/kvserver/kvserverbase",
103103
"//pkg/roachpb",
104+
"//pkg/rpc",
105+
"//pkg/rpc/nodedialer",
104106
"//pkg/security/securityassets",
105107
"//pkg/security/securitytest",
106108
"//pkg/server",

pkg/sql/catalog/lease/descriptor_state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ type descriptorState struct {
5858
// acquisition finishes but indicate that that new lease is expired are not
5959
// ignored.
6060
acquisitionsInProgress int
61+
62+
// acquisitionChannel indicates that a bulk acquisition is in progress.
63+
acquisitionChannel chan struct{}
6164
}
6265
}
6366

pkg/sql/catalog/lease/ie_writer_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,13 @@ func (w *ieWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields)
6161
return nil
6262
}
6363

64+
func (w *ieWriter) insertLeases(ctx context.Context, txn *kv.Txn, leases []leaseFields) error {
65+
for _, l := range leases {
66+
if err := w.insertLease(ctx, txn, l); err != nil {
67+
return err
68+
}
69+
}
70+
return nil
71+
}
72+
6473
var _ writer = (*ieWriter)(nil)

pkg/sql/catalog/lease/kv_writer.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,29 @@ func leaseTableWithID(id descpb.ID, table systemschema.SystemTable) catalog.Tabl
5252
return mut.ImmutableCopy().(catalog.TableDescriptor)
5353
}
5454

55-
func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
56-
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
57-
if l.sessionID != nil {
58-
err := w.sessionBasedWriter.Insert(ctx, b, false /*kvTrace*/, leaseAsSessionBasedDatum(l)...)
59-
if err != nil {
60-
return err
55+
func (w *kvWriter) insertLeases(ctx context.Context, txn *kv.Txn, leases []leaseFields) error {
56+
if err := w.do(ctx, txn, leases, func(b *kv.Batch) error {
57+
for _, l := range leases {
58+
if l.sessionID != nil {
59+
err := w.sessionBasedWriter.Insert(ctx, b, false /*kvTrace*/, leaseAsSessionBasedDatum(l)...)
60+
if err != nil {
61+
return err
62+
}
6163
}
6264
}
6365
return nil
6466
}); err != nil {
65-
return errors.Wrapf(err, "failed to insert lease %v", l)
67+
return errors.Wrapf(err, "failed to insert lease %v", leases)
6668
}
6769
return nil
6870
}
6971

72+
func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
73+
return w.insertLeases(ctx, txn, []leaseFields{l})
74+
}
75+
7076
func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
71-
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
77+
if err := w.do(ctx, txn, []leaseFields{l}, func(b *kv.Batch) error {
7278
if l.sessionID != nil {
7379
err := w.sessionBasedWriter.Delete(ctx, b, false /*kvTrace*/, leaseAsSessionBasedDatum(l)...)
7480
if err != nil {
@@ -85,7 +91,7 @@ func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields)
8591
type addToBatchFunc = func(*kv.Batch) error
8692

8793
func (w *kvWriter) do(
88-
ctx context.Context, txn *kv.Txn, lease leaseFields, addToBatch addToBatchFunc,
94+
ctx context.Context, txn *kv.Txn, lease []leaseFields, addToBatch addToBatchFunc,
8995
) error {
9096
run := (*kv.Txn).Run
9197
do := func(ctx context.Context, txn *kv.Txn) error {

pkg/sql/catalog/lease/kv_writer_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,4 +282,11 @@ func (t teeWriter) insertLease(ctx context.Context, txn *kv.Txn, fields leaseFie
282282
)
283283
}
284284

285+
func (t teeWriter) insertLeases(ctx context.Context, txn *kv.Txn, leases []leaseFields) error {
286+
return errors.CombineErrors(
287+
t.a.insertLeases(ctx, txn, leases),
288+
t.b.insertLeases(ctx, txn, leases),
289+
)
290+
}
291+
285292
var _ writer = (*teeWriter)(nil)

0 commit comments

Comments
 (0)