Skip to content

Commit 9ff1611

Browse files
committed
fix: call raft read barrier before describe schema
This adds call to raft read barrier to ensure schema consistency before getting schema from the host.
1 parent c2dc173 commit 9ff1611

File tree

6 files changed

+69
-14
lines changed

6 files changed

+69
-14
lines changed

pkg/service/one2onerestore/model.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ type node struct {
4040

4141
// Host contains basic information about Scylla node.
4242
type Host struct {
43-
ID string
44-
DC string
45-
Addr string
46-
ShardCount int
43+
ID string
44+
DC string
45+
Addr string
46+
ShardCount int
47+
SafeDescribeMethod scyllaclient.SafeDescribeMethod
4748
}
4849

4950
// ViewType either Materialized View or Secondary Index.

pkg/service/one2onerestore/progress.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import (
2020
// initProgress adds entries to RunTableProgress and RunViewProgress, so that API call
2121
// to show progress can return some information at this point.
2222
func (w *worker) initProgress(ctx context.Context, workload []hostWorkload) error {
23-
tablesToRestore := getTablesToRestore(workload)
24-
views, err := w.getViews(ctx, tablesToRestore)
23+
views, err := w.getViews(ctx, workload)
2524
if err != nil {
2625
return errors.Wrap(err, "get views")
2726
}

pkg/service/one2onerestore/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func (s *Service) newWorker(ctx context.Context, clusterID, taskID, runID uuid.U
122122

123123
client: client,
124124
clusterSession: clusterSession,
125+
sessionFunc: s.clusterSession,
125126

126127
logger: s.logger,
127128

pkg/service/one2onerestore/worker.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/scylladb/gocqlx/v2"
1616
"github.com/scylladb/scylla-manager/backupspec"
1717
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
18+
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
1819
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
1920
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
2021
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
@@ -30,6 +31,8 @@ type worker struct {
3031
client *scyllaclient.Client
3132
clusterSession gocqlx.Session
3233

34+
sessionFunc cluster.SessionFunc // is needed to create cql session to single host
35+
3336
logger log.Logger
3437

3538
runInfo struct {
@@ -177,12 +180,40 @@ func (w *worker) prepareHostWorkload(ctx context.Context, manifests []*backupspe
177180
return errors.Wrap(err, "read manifest content")
178181
}
179182

183+
nodeInfo, err := w.client.NodeInfo(ctx, h.Addr)
184+
if err != nil {
185+
return errors.Wrapf(err, "get node %s info", h.Addr)
186+
}
187+
method, err := nodeInfo.SupportsSafeDescribeSchemaWithInternals()
188+
if err != nil {
189+
return errors.Wrapf(err, "node %s safe describe method", h.Addr)
190+
}
191+
hw.host.SafeDescribeMethod = method
192+
180193
result[i] = hw
181194

182195
return nil
183196
}, parallel.NopNotify)
184197
}
185198

199+
func (w *worker) singleHostCQLSession(ctx context.Context, clusterID uuid.UUID, host string) (gocqlx.Session, error) {
200+
session, err := w.sessionFunc(ctx, clusterID, cluster.SingleHostSessionConfigOption(host))
201+
if err != nil {
202+
return gocqlx.Session{}, errors.Wrap(err, "create cql session")
203+
}
204+
return session, nil
205+
}
206+
207+
func (w *worker) raftReadBarrier(ctx context.Context, session gocqlx.Session, host Host) error {
208+
switch host.SafeDescribeMethod {
209+
case scyllaclient.SafeDescribeMethodReadBarrierAPI:
210+
return w.client.RaftReadBarrier(ctx, host.Addr, "")
211+
case scyllaclient.SafeDescribeMethodReadBarrierCQL:
212+
return query.RaftReadBarrier(session)
213+
}
214+
return errors.Errorf("unsupported method: %s", host.SafeDescribeMethod)
215+
}
216+
186217
// alterSchemaRetryWrapper is useful when executing many statements altering schema,
187218
// as it might take more time for Scylla to process them one after another.
188219
// This wrapper exits on: success, context cancel, op returned non-timeout error or after maxTotalTime has passed.

pkg/service/one2onerestore/worker_tgc.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,15 @@ func (w *worker) setTombstoneGCModeRepair(ctx context.Context, workload []hostWo
3030
name string
3131
isView bool
3232
}
33-
tablesToRestore := getTablesToRestore(workload)
3433
var targets []alterTGCTarget
35-
for table := range tablesToRestore {
34+
for table := range getTablesToRestore(workload) {
3635
targets = append(targets, alterTGCTarget{
3736
keyspace: table.keyspace,
3837
name: table.table,
3938
isView: false,
4039
})
4140
}
42-
views, err := w.getViews(ctx, tablesToRestore)
41+
views, err := w.getViews(ctx, workload)
4342
if err != nil {
4443
return errors.Wrap(err, "get views")
4544
}

pkg/service/one2onerestore/worker_views.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (w *worker) dropViews(ctx context.Context, workload []hostWorkload) ([]View
2323
w.logger.Info(ctx, "Drop views", "took", timeutc.Since(start))
2424
}()
2525

26-
views, err := w.getViews(ctx, getTablesToRestore(workload))
26+
views, err := w.getViews(ctx, workload)
2727
if err != nil {
2828
return nil, errors.Wrap(err, "get views")
2929
}
@@ -69,17 +69,19 @@ func (w *worker) dropView(ctx context.Context, view View) error {
6969
return alterSchemaRetryWrapper(ctx, op, notify)
7070
}
7171

72-
func (w *worker) getViews(ctx context.Context, tablesToRestore map[scyllaTable]struct{}) ([]View, error) {
72+
func (w *worker) getViews(ctx context.Context, workload []hostWorkload) ([]View, error) {
7373
keyspaces, err := w.client.Keyspaces(ctx)
7474
if err != nil {
7575
return nil, errors.Wrap(err, "get keyspaces")
7676
}
7777

78-
describedViews, err := w.viewsSchemaByName()
78+
describedViews, err := w.viewsSchemaByName(ctx, workload)
7979
if err != nil {
8080
return nil, err
8181
}
8282

83+
tablesToRestore := getTablesToRestore(workload)
84+
8385
var views []View
8486
for _, ks := range keyspaces {
8587
meta, err := w.clusterSession.KeyspaceMetadata(ks)
@@ -233,8 +235,30 @@ func (w *worker) waitForViewBuilding(ctx context.Context, view View, pr *RunView
233235
return nil
234236
}
235237

236-
func (w *worker) viewsSchemaByName() (map[scyllaTable]string, error) {
237-
describedSchema, err := query.DescribeSchemaWithInternals(w.clusterSession)
238+
func (w *worker) viewsSchemaByName(ctx context.Context, workload []hostWorkload) (map[scyllaTable]string, error) {
239+
// In order to ensure schema consistency, we need to call raft read barrier on a host
240+
// from which we are going to read the schema.
241+
host := workload[0].host
242+
if host.SafeDescribeMethod != scyllaclient.SafeDescribeMethodReadBarrierAPI {
243+
// Let's try to find the host that supports raft read barrier api.
244+
for _, w := range workload {
245+
if w.host.SafeDescribeMethod == scyllaclient.SafeDescribeMethodReadBarrierAPI {
246+
host = w.host
247+
break
248+
}
249+
}
250+
}
251+
252+
hostSession, err := w.singleHostCQLSession(ctx, w.runInfo.ClusterID, host.Addr)
253+
if err != nil {
254+
return nil, errors.Wrap(err, "single host cql session")
255+
}
256+
257+
if err := w.raftReadBarrier(ctx, hostSession, host); err != nil {
258+
return nil, errors.Wrap(err, "raft read barrier")
259+
}
260+
261+
describedSchema, err := query.DescribeSchemaWithInternals(hostSession)
238262
if err != nil {
239263
return nil, errors.Wrap(err, "describe schema")
240264
}

0 commit comments

Comments
 (0)