Skip to content

Commit 6c361b9

Browse files
committed
sql: force PCR reader catalogs to use a fixed timestamp
Previously, the original logic for advancing the PCR reader catalog timestamp ran the risk of mixing different timestamps, since the lease manager relies on range feeds. So, it was possible for a single txn to observe a mix of newer and older descriptors. To address this, this patch does the following: 1) It introduces an assertion in the leased descriptors to ensure that all external row data tables have the same timestamp. 2) The connection executor is modified for the PCR reader catalog to always use AOST queries, where the AOST timestamp is determined based on the range feed's checkpoint. This ensures that all data up to that timestamp has been synced. We detect a PCR reader catalog by checking the ReplicatedPCRVersion on the system database, which will be set during bootstrap. Fixes: #130812 Release note: None
1 parent a50d588 commit 6c361b9

File tree

7 files changed

+351
-19
lines changed

7 files changed

+351
-19
lines changed

pkg/sql/catalog/descs/leased_descriptors.go

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
2121
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
2222
"github.com/cockroachdb/cockroach/pkg/util/hlc"
23+
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
2324
"github.com/cockroachdb/cockroach/pkg/util/log"
2425
"github.com/cockroachdb/errors"
2526
)
@@ -79,6 +80,93 @@ type leasedDescriptors struct {
7980
cache nstree.NameMap
8081
}
8182

83+
// mismatchedExternalDataRowTimestamp is generated when the external row data timestamps
84+
// within a descriptor do not match.
85+
type mismatchedExternalDataRowTimestamp struct {
86+
newDescName string
87+
newDescID descpb.ID
88+
newDescTS hlc.Timestamp
89+
existingDescName string
90+
existingDescID descpb.ID
91+
existingDescTS hlc.Timestamp
92+
}
93+
94+
func newMismatchedExternalDataRowTimestampError(
95+
newDesc catalog.TableDescriptor, existingDesc catalog.TableDescriptor,
96+
) *mismatchedExternalDataRowTimestamp {
97+
return &mismatchedExternalDataRowTimestamp{
98+
newDescName: newDesc.GetName(),
99+
newDescID: newDesc.GetID(),
100+
newDescTS: newDesc.ExternalRowData().AsOf,
101+
existingDescName: existingDesc.GetName(),
102+
existingDescID: existingDesc.GetID(),
103+
existingDescTS: existingDesc.ExternalRowData().AsOf,
104+
}
105+
}
106+
107+
func (e *mismatchedExternalDataRowTimestamp) SafeFormatError(p errors.Printer) (next error) {
108+
p.Printf("PCR reader timestamp has moved forward, "+
109+
"existing descriptor %s(%d) and timestamp: %s "+
110+
"new descritpor %s(%d) and timestamp: %s",
111+
e.newDescName,
112+
e.newDescID,
113+
e.newDescTS,
114+
e.existingDescName,
115+
e.existingDescID,
116+
e.existingDescTS)
117+
return nil
118+
}
119+
120+
func (e *mismatchedExternalDataRowTimestamp) Error() string {
121+
return fmt.Sprint(errors.Formattable(e))
122+
}
123+
124+
// Compile-time assertion that *mismatchedExternalDataRowTimestamp satisfies
// errors.SafeFormatter.
var _ errors.SafeFormatter = (*mismatchedExternalDataRowTimestamp)(nil)
125+
126+
// maybeAssertExternalRowDataTS asserts if the descriptor references external
127+
// row data, then the timestamp across the entire collection *must* match.
128+
func (ld *leasedDescriptors) maybeAssertExternalRowDataTS(desc catalog.Descriptor) error {
129+
tableDesc, ok := desc.(catalog.TableDescriptor)
130+
if !ok {
131+
return nil
132+
}
133+
if tableDesc.ExternalRowData() == nil {
134+
return nil
135+
}
136+
currentTS := tableDesc.ExternalRowData().AsOf
137+
return ld.cache.IterateByID(func(entry catalog.NameEntry) error {
138+
// Skip databases / schemas.
139+
if entry.GetParentID() == descpb.InvalidID ||
140+
entry.GetParentSchemaID() == descpb.InvalidID ||
141+
entry.GetID() == tableDesc.GetID() {
142+
return nil
143+
}
144+
// Next get the underlying descriptor.
145+
otherDesc := entry.(lease.LeasedDescriptor).Underlying()
146+
if otherTableDesc, ok := otherDesc.(catalog.TableDescriptor); ok {
147+
// Skip conventional descriptors.
148+
if otherTableDesc.ExternalRowData() == nil {
149+
return nil
150+
}
151+
// Confirm the timestamps match the most recent descriptor.
152+
if !otherTableDesc.ExternalRowData().AsOf.Equal(currentTS) {
153+
// Normally the PCR catalog reader will run with AOST timestamps,
154+
// if during the setup of the connection executor we were able to
155+
// lease the system database descriptor and confirm that it is for
156+
// a PCR reader catalog. If we were not able to lease the system database
157+
// descriptor, then its possible no AOST timestamp is set. Otherwise,
158+
// this error should *never* happen.
159+
return newMismatchedExternalDataRowTimestampError(
160+
tableDesc,
161+
otherTableDesc)
162+
}
163+
// Otherwise, we expect all other timestamps to match as well.
164+
return iterutil.StopIteration()
165+
}
166+
return nil
167+
})
168+
}
169+
82170
// getLeasedDescriptorByName return a leased descriptor valid for the
83171
// transaction, acquiring one if necessary. Due to a bug in lease acquisition
84172
// for dropped descriptors, the descriptor may have to be read from the store,
@@ -99,7 +187,8 @@ func (ld *leasedDescriptors) getByName(
99187
log.Eventf(ctx, "found descriptor in collection for (%d, %d, '%s'): %d",
100188
parentID, parentSchemaID, name, cached.GetID())
101189
}
102-
return cached.(lease.LeasedDescriptor).Underlying(), false, nil
190+
desc = cached.(lease.LeasedDescriptor).Underlying()
191+
return desc, false, nil
103192
}
104193

105194
readTimestamp := txn.ReadTimestamp()
@@ -183,7 +272,12 @@ func (ld *leasedDescriptors) getResult(
183272
return nil, false, err
184273
}
185274
}
186-
return ldesc.Underlying(), false, nil
275+
276+
desc := ldesc.Underlying()
277+
if err = ld.maybeAssertExternalRowDataTS(desc); err != nil {
278+
return nil, false, err
279+
}
280+
return desc, false, nil
187281
}
188282

189283
func (ld *leasedDescriptors) maybeUpdateDeadline(

pkg/sql/catalog/lease/lease.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,10 @@ type Manager struct {
889889
rangeFeed *rangefeed.RangeFeed
890890
}
891891

892+
// closeTimestamp for the range feed, which is the timestamp
893+
// that we have all the updates for.
894+
closeTimestamp atomic.Value
895+
892896
draining atomic.Value
893897

894898
// names is a cache for name -> id mappings. A mapping for the cache
@@ -996,6 +1000,12 @@ func NewLeaseManager(
9961000
lm.storage.writer = newKVWriter(codec, db.KV(), keys.LeaseTableID, settingsWatcher, lm)
9971001
lm.stopper.AddCloser(lm.sem.Closer("stopper"))
9981002
lm.mu.descriptors = make(map[descpb.ID]*descriptorState)
1003+
// We are going to start the range feed later when RefreshLeases
1004+
// is invoked inside pre-start. So, that guarantees all range feed events
1005+
// that will be generated will be after the current time. So, historical
1006+
// queries within this tenant (i.e. PCR catalog reader) before this point are
1007+
// guaranteed to be up to date.
1008+
lm.closeTimestamp.Store(db.KV().Clock().Now())
9991009
lm.draining.Store(false)
10001010
lm.descUpdateCh = make(chan catalog.Descriptor)
10011011
lm.descDelCh = make(chan descpb.ID)
@@ -1438,6 +1448,12 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)
14381448
})
14391449
}
14401450

1451+
// GetSafeReplicationTS gets the timestamp till which the leased descriptors
1452+
// have been synced.
1453+
func (m *Manager) GetSafeReplicationTS() hlc.Timestamp {
1454+
return m.closeTimestamp.Load().(hlc.Timestamp)
1455+
}
1456+
14411457
// watchForUpdates will watch a rangefeed on the system.descriptor table for
14421458
// updates.
14431459
func (m *Manager) watchForUpdates(ctx context.Context) {
@@ -1496,6 +1512,7 @@ func (m *Manager) watchForUpdates(ctx context.Context) {
14961512
return
14971513
}
14981514
m.mu.rangeFeedCheckpoints += 1
1515+
m.closeTimestamp.Store(checkpoint.ResolvedTS)
14991516
}
15001517

15011518
// If we already started a range feed terminate it first

pkg/sql/catalog/replication/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ go_test(
3535
"//pkg/security/securitytest",
3636
"//pkg/server",
3737
"//pkg/sql",
38+
"//pkg/sql/catalog/lease",
39+
"//pkg/testutils",
3840
"//pkg/testutils/serverutils",
41+
"//pkg/testutils/skip",
3942
"//pkg/testutils/sqlutils",
4043
"//pkg/testutils/testcluster",
44+
"//pkg/util/ctxgroup",
45+
"//pkg/util/hlc",
46+
"//pkg/util/leaktest",
4147
"//pkg/util/randutil",
48+
"@com_github_cockroachdb_errors//:errors",
4249
"@com_github_stretchr_testify//require",
4350
],
4451
)

0 commit comments

Comments
 (0)