Skip to content

Commit ea40e7e

Browse files
authored
Merge pull request #144823 from mgartner/backport24.3-144309
release-24.3: jobs: add jobs.avoid_full_scans_in_find_running_jobs.enabled
2 parents a29f43f + 0fd9f50 commit ea40e7e

File tree

6 files changed

+85
-43
lines changed

6 files changed

+85
-43
lines changed

pkg/jobs/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ go_library(
6868
"//pkg/util/log",
6969
"//pkg/util/log/eventpb",
7070
"//pkg/util/log/severity",
71+
"//pkg/util/metamorphic",
7172
"//pkg/util/metric",
7273
"//pkg/util/pprofutil",
7374
"//pkg/util/protoutil",

pkg/jobs/utils.go

+41-21
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,56 @@ import (
1212

1313
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1414
"github.com/cockroachdb/cockroach/pkg/kv"
15+
"github.com/cockroachdb/cockroach/pkg/settings"
16+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1517
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1618
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
19+
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
1720
"github.com/cockroachdb/errors"
1821
)
1922

23+
var testingAvoidFullScans = metamorphic.ConstantWithTestBool(
24+
"jobs.avoid_full_scans_in_find_running_jobs",
25+
false, /* defaultValue */
26+
)
27+
28+
var avoidFullScans = settings.RegisterBoolSetting(
29+
settings.ApplicationLevel,
30+
"jobs.avoid_full_scans_in_find_running_jobs.enabled",
31+
"when true, enables hints to avoid full scans for internal, jobs-related queries",
32+
testingAvoidFullScans)
33+
2034
// RunningJobExists checks that whether there are any job of the given types
2135
// in the pending, running, or paused status, optionally ignoring the job with
2236
// the ID specified by ignoreJobID as well as any jobs created after it, if
2337
// the passed ID is not InvalidJobID.
2438
func RunningJobExists(
25-
ctx context.Context, ignoreJobID jobspb.JobID, txn isql.Txn, jobTypes ...jobspb.Type,
39+
ctx context.Context,
40+
cs *cluster.Settings,
41+
ignoreJobID jobspb.JobID,
42+
txn isql.Txn,
43+
jobTypes ...jobspb.Type,
2644
) (exists bool, retErr error) {
2745
typeStrs, err := getJobTypeStrs(jobTypes)
2846
if err != nil {
2947
return false, err
3048
}
3149

32-
orderBy := " ORDER BY created"
50+
orderBy := "ORDER BY created"
3351
if ignoreJobID == jobspb.InvalidJobID {
3452
// There is no need to order by the created column if there is no job to
3553
// ignore.
3654
orderBy = ""
3755
}
3856

39-
stmt := `
40-
SELECT
41-
id
42-
FROM
43-
system.jobs@jobs_status_created_idx
44-
WHERE
45-
job_type IN ` + typeStrs + ` AND
46-
status IN ` + NonTerminalStatusTupleString + orderBy + `
47-
LIMIT 1`
57+
hint := "jobs_status_created_idx"
58+
if avoidFullScans.Get(&cs.SV) {
59+
hint = "{FORCE_INDEX=jobs_status_created_idx,AVOID_FULL_SCAN}"
60+
}
61+
62+
q := `SELECT id FROM system.jobs@%s WHERE job_type IN %s AND status IN %s %s LIMIT 1`
63+
stmt := fmt.Sprintf(q, hint, typeStrs, NonTerminalStatusTupleString, orderBy)
64+
4865
it, err := txn.QueryIterator(
4966
ctx,
5067
"find-running-jobs-of-type",
@@ -74,28 +91,31 @@ LIMIT 1`
7491
// by ignoreJobID as well as any jobs created after it, if the passed ID is not
7592
// InvalidJobID.
7693
func RunningJobs(
77-
ctx context.Context, ignoreJobID jobspb.JobID, txn isql.Txn, jobTypes ...jobspb.Type,
94+
ctx context.Context,
95+
cs *cluster.Settings,
96+
ignoreJobID jobspb.JobID,
97+
txn isql.Txn,
98+
jobTypes ...jobspb.Type,
7899
) (jobIDs []jobspb.JobID, retErr error) {
79100
typeStrs, err := getJobTypeStrs(jobTypes)
80101
if err != nil {
81102
return jobIDs, err
82103
}
83104

84-
orderBy := " ORDER BY created"
105+
orderBy := "ORDER BY created"
85106
if ignoreJobID == jobspb.InvalidJobID {
86107
// There is no need to order by the created column if there is no job to
87108
// ignore.
88109
orderBy = ""
89110
}
90111

91-
stmt := `
92-
SELECT
93-
id
94-
FROM
95-
system.jobs@jobs_status_created_idx
96-
WHERE
97-
job_type IN ` + typeStrs + ` AND
98-
status IN ` + NonTerminalStatusTupleString + orderBy
112+
hint := "jobs_status_created_idx"
113+
if avoidFullScans.Get(&cs.SV) {
114+
hint = "{FORCE_INDEX=jobs_status_created_idx,AVOID_FULL_SCAN}"
115+
}
116+
117+
q := `SELECT id FROM system.jobs@%s WHERE job_type IN %s AND status IN %s %s`
118+
stmt := fmt.Sprintf(q, hint, typeStrs, NonTerminalStatusTupleString, orderBy)
99119
it, err := txn.QueryIterator(
100120
ctx,
101121
"find-all-running-jobs-of-type",

pkg/spanconfig/spanconfigmanager/manager.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (m *Manager) run(ctx context.Context) {
128128
return
129129
}
130130

131-
started, err := m.createAndStartJobIfNoneExists(ctx)
131+
started, err := m.createAndStartJobIfNoneExists(ctx, m.settings)
132132
if err != nil {
133133
log.Errorf(ctx, "error starting auto span config reconciliation job: %v", err)
134134
}
@@ -162,7 +162,9 @@ func (m *Manager) run(ctx context.Context) {
162162
// createAndStartJobIfNoneExists creates span config reconciliation job iff it
163163
// hasn't been created already and notifies the jobs registry to adopt it.
164164
// Returns a boolean indicating if the job was created.
165-
func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, error) {
165+
func (m *Manager) createAndStartJobIfNoneExists(
166+
ctx context.Context, cs *cluster.Settings,
167+
) (bool, error) {
166168
if m.knobs.ManagerDisableJobCreation {
167169
return false, nil
168170
}
@@ -177,8 +179,9 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro
177179

178180
var job *jobs.Job
179181
if err := m.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
180-
exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, txn,
181-
jobspb.TypeAutoSpanConfigReconciliation)
182+
exists, err := jobs.RunningJobExists(
183+
ctx, cs, jobspb.InvalidJobID, txn, jobspb.TypeAutoSpanConfigReconciliation,
184+
)
182185
if err != nil {
183186
return err
184187
}

pkg/spanconfig/spanconfigmanager/manager_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
104104

105105
var g errgroup.Group
106106
g.Go(func() error {
107-
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx)
107+
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx, ts.ClusterSettings())
108108
if err != nil {
109109
return err
110110
}
@@ -117,7 +117,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
117117
// Only try to start the job if the first goroutine has reached the testing
118118
// knob and is blocked.
119119
<-isBlocked
120-
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx)
120+
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx, ts.ClusterSettings())
121121
if err != nil {
122122
return err
123123
}
@@ -183,7 +183,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) {
183183
)
184184
require.NoError(t, err)
185185

186-
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx)
186+
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx, ts.ClusterSettings())
187187
require.NoError(t, err)
188188
require.True(t, started)
189189
}
@@ -331,7 +331,7 @@ func TestReconciliationJobErrorAndRecovery(t *testing.T) {
331331
},
332332
)
333333

334-
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx)
334+
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx, ts.ClusterSettings())
335335
require.NoError(t, err)
336336
require.True(t, started)
337337

@@ -354,7 +354,7 @@ func TestReconciliationJobErrorAndRecovery(t *testing.T) {
354354
mu.err = nil
355355
mu.Unlock()
356356

357-
started, err = manager.TestingCreateAndStartJobIfNoneExists(ctx)
357+
started, err = manager.TestingCreateAndStartJobIfNoneExists(ctx, ts.ClusterSettings())
358358
require.NoError(t, err)
359359
require.True(t, started)
360360

@@ -421,7 +421,7 @@ func TestReconciliationUsesRightCheckpoint(t *testing.T) {
421421
nil,
422422
)
423423

424-
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx)
424+
started, err := manager.TestingCreateAndStartJobIfNoneExists(ctx, ts.ClusterSettings())
425425
require.NoError(t, err)
426426
require.True(t, started)
427427

pkg/spanconfig/spanconfigmanager/test_helpers.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@
55

66
package spanconfigmanager
77

8-
import "context"
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
12+
)
913

1014
// TestingCreateAndStartJobIfNoneExists is a wrapper around
1115
// createAndStartJobIfNoneExists for testing it.
12-
func (m *Manager) TestingCreateAndStartJobIfNoneExists(ctx context.Context) (bool, error) {
13-
return m.createAndStartJobIfNoneExists(ctx)
16+
func (m *Manager) TestingCreateAndStartJobIfNoneExists(
17+
ctx context.Context, cs *cluster.Settings,
18+
) (bool, error) {
19+
return m.createAndStartJobIfNoneExists(ctx, cs)
1420
}

pkg/sql/create_stats.go

+21-9
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,18 @@ func (n *createStatsNode) runJob(ctx context.Context) error {
158158
// (To handle race conditions we check this again after the job starts,
159159
// but this check is used to prevent creating a large number of jobs that
160160
// immediately fail).
161-
if err := checkRunningJobsInTxn(ctx, jobspb.InvalidJobID, txn); err != nil {
161+
if err := checkRunningJobsInTxn(
162+
ctx, n.p.EvalContext().Settings, jobspb.InvalidJobID, txn,
163+
); err != nil {
162164
return err
163165
}
164166
// Don't start auto partial stats jobs if there is another auto partial
165167
// stats job running on the same table.
166168
if n.Name == jobspb.AutoPartialStatsName {
167-
if err := checkRunningAutoPartialJobsInTxn(ctx, jobspb.InvalidJobID, txn, n.p.ExecCfg().JobRegistry, details.Table.ID); err != nil {
169+
if err := checkRunningAutoPartialJobsInTxn(
170+
ctx, n.p.EvalContext().Settings, jobspb.InvalidJobID, txn,
171+
n.p.ExecCfg().JobRegistry, details.Table.ID,
172+
); err != nil {
168173
return err
169174
}
170175
}
@@ -872,11 +877,15 @@ func checkRunningJobs(
872877
jobID = job.ID()
873878
}
874879
return p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
875-
if err = checkRunningJobsInTxn(ctx, jobID, txn); err != nil {
880+
if err = checkRunningJobsInTxn(
881+
ctx, p.ExtendedEvalContext().Settings, jobID, txn,
882+
); err != nil {
876883
return err
877884
}
878885
if autoPartial {
879-
return checkRunningAutoPartialJobsInTxn(ctx, jobID, txn, jobRegistry, tableID)
886+
return checkRunningAutoPartialJobsInTxn(
887+
ctx, p.ExtendedEvalContext().Settings, jobID, txn, jobRegistry, tableID,
888+
)
880889
}
881890
return nil
882891
})
@@ -887,9 +896,11 @@ func checkRunningJobs(
887896
// that started earlier than this one. If there are, checkRunningJobsInTxn
888897
// returns an error. If jobID is jobspb.InvalidJobID, checkRunningJobsInTxn just
889898
// checks if there are any pending, running, or paused CreateStats jobs.
890-
func checkRunningJobsInTxn(ctx context.Context, jobID jobspb.JobID, txn isql.Txn) error {
891-
exists, err := jobs.RunningJobExists(ctx, jobID, txn,
892-
jobspb.TypeCreateStats, jobspb.TypeAutoCreateStats,
899+
func checkRunningJobsInTxn(
900+
ctx context.Context, cs *cluster.Settings, jobID jobspb.JobID, txn isql.Txn,
901+
) error {
902+
exists, err := jobs.RunningJobExists(
903+
ctx, cs, jobID, txn, jobspb.TypeCreateStats, jobspb.TypeAutoCreateStats,
893904
)
894905
if err != nil {
895906
return err
@@ -910,13 +921,14 @@ func checkRunningJobsInTxn(ctx context.Context, jobID jobspb.JobID, txn isql.Txn
910921
// AutoCreatePartialStats jobs for the same table.
911922
func checkRunningAutoPartialJobsInTxn(
912923
ctx context.Context,
924+
cs *cluster.Settings,
913925
jobID jobspb.JobID,
914926
txn isql.Txn,
915927
jobRegistry *jobs.Registry,
916928
tableID descpb.ID,
917929
) error {
918-
autoPartialStatJobIDs, err := jobs.RunningJobs(ctx, jobID, txn,
919-
jobspb.TypeAutoCreatePartialStats,
930+
autoPartialStatJobIDs, err := jobs.RunningJobs(
931+
ctx, cs, jobID, txn, jobspb.TypeAutoCreatePartialStats,
920932
)
921933
if err != nil {
922934
return err

0 commit comments

Comments
 (0)