Skip to content

Commit 995ee1d

Browse files
craig[bot]jbowens
craig[bot]
andcommitted
Merge #146374
146374: storage/disk: improve logging of device IDs r=RaduBerinde a=jbowens Log a warning periodically if a disk that should be monitored is not being monitored because it's not found in /proc/diskstats. Additionally, on start up, print a store's monitored device ID. Close #146321. Epic: none Release note: none Co-authored-by: Jackson Owens <[email protected]>
2 parents 90346ab + ea58b01 commit 995ee1d

10 files changed

+101
-41
lines changed

pkg/server/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,7 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
820820
if err != nil {
821821
return Engines{}, errors.Wrap(err, "creating disk monitor")
822822
}
823+
detail(redact.Sprintf("store %d: disk deviceID: %s", i, monitor.DeviceID()))
823824

824825
statsCollector, err := cfg.DiskWriteStats.GetOrCreateCollector(spec.Path)
825826
if err != nil {

pkg/storage/disk/BUILD.bazel

+11-1
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ go_library(
1616
visibility = ["//visibility:public"],
1717
deps = [
1818
"//pkg/util/envutil",
19+
"//pkg/util/log",
1920
"//pkg/util/syncutil",
2021
"//pkg/util/timeutil",
2122
"@com_github_cockroachdb_errors//:errors",
2223
"@com_github_cockroachdb_pebble//vfs",
24+
"@com_github_cockroachdb_redact//:redact",
2325
] + select({
2426
"@io_bazel_rules_go//go/platform:android": [
2527
"//pkg/util/sysutil",
@@ -57,5 +59,13 @@ go_test(
5759
"@com_github_cockroachdb_errors//:errors",
5860
"@com_github_cockroachdb_pebble//vfs",
5961
"@com_github_stretchr_testify//require",
60-
],
62+
] + select({
63+
"@io_bazel_rules_go//go/platform:android": [
64+
"//pkg/util/timeutil",
65+
],
66+
"@io_bazel_rules_go//go/platform:linux": [
67+
"//pkg/util/timeutil",
68+
],
69+
"//conditions:default": [],
70+
}),
6171
)

pkg/storage/disk/linux_parse.go

+24-21
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ import (
4444
// 12 I/Os currently in progress
4545
// 13 time spent doing I/Os (ms)
4646
// 14 weighted time spent doing I/Os (ms)
47-
func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Time) error {
47+
func parseDiskStats(
48+
contents []byte, disks []*monitoredDisk, measuredAt time.Time,
49+
) (countCollected int, err error) {
4850
for lineNum := 0; len(contents) > 0; lineNum++ {
4951
lineBytes, rest := splitLine(contents)
5052
line := unsafe.String(&lineBytes[0], len(lineBytes))
@@ -54,13 +56,13 @@ func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Tim
5456

5557
var deviceID DeviceID
5658
if devMajor, rest, err := mustParseUint(line, 32, "deviceID.major"); err != nil {
57-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
59+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
5860
} else {
5961
line = rest
6062
deviceID.major = uint32(devMajor)
6163
}
6264
if devMinor, rest, err := mustParseUint(line, 32, "deviceID.minor"); err != nil {
63-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
65+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
6466
} else {
6567
line = rest
6668
deviceID.minor = uint32(devMinor)
@@ -78,78 +80,79 @@ func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Tim
7880
var err error
7981
stats.DeviceName, line = splitFieldDelim(line)
8082
if stats.ReadsCount, line, err = mustParseUint(line, 64, "reads count"); err != nil {
81-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
83+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
8284
}
8385
if stats.ReadsMerged, line, err = mustParseUint(line, 64, "reads merged"); err != nil {
84-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
86+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
8587
}
8688
if stats.ReadsSectors, line, err = mustParseUint(line, 64, "reads sectors"); err != nil {
87-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
89+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
8890
}
8991
if millis, rest, err := mustParseUint(line, 64, "reads duration"); err != nil {
90-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
92+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
9193
} else {
9294
line = rest
9395
stats.ReadsDuration = time.Duration(millis) * time.Millisecond
9496
}
9597
if stats.WritesCount, line, err = mustParseUint(line, 64, "writes count"); err != nil {
96-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
98+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
9799
}
98100
if stats.WritesMerged, line, err = mustParseUint(line, 64, "writes merged"); err != nil {
99-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
101+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
100102
}
101103
if stats.WritesSectors, line, err = mustParseUint(line, 64, "writes sectors"); err != nil {
102-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
104+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
103105
}
104106
if millis, rest, err := mustParseUint(line, 64, "writes duration"); err != nil {
105-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
107+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
106108
} else {
107109
line = rest
108110
stats.WritesDuration = time.Duration(millis) * time.Millisecond
109111
}
110112
if stats.InProgressCount, line, err = mustParseUint(line, 64, "inprogress iops"); err != nil {
111-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
113+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
112114
}
113115
if millis, rest, err := mustParseUint(line, 64, "time doing IO"); err != nil {
114-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
116+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
115117
} else {
116118
line = rest
117119
stats.CumulativeDuration = time.Duration(millis) * time.Millisecond
118120
}
119121
if millis, rest, err := mustParseUint(line, 64, "weighted IO duration"); err != nil {
120-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
122+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
121123
} else {
122124
line = rest
123125
stats.WeightedIODuration = time.Duration(millis) * time.Millisecond
124126
}
125127

126128
// The below fields are optional.
127129
if stats.DiscardsCount, _, line, err = tryParseUint(line, 64); err != nil {
128-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
130+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
129131
}
130132
if stats.DiscardsMerged, _, line, err = tryParseUint(line, 64); err != nil {
131-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
133+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
132134
}
133135
if stats.DiscardsSectors, _, line, err = tryParseUint(line, 64); err != nil {
134-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
136+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
135137
}
136138
if millis, ok, rest, err := tryParseUint(line, 64); err != nil {
137-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
139+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
138140
} else if ok {
139141
line = rest
140142
stats.DiscardsDuration = time.Duration(millis) * time.Millisecond
141143
}
142144
if stats.FlushesCount, _, line, err = tryParseUint(line, 64); err != nil {
143-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
145+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
144146
}
145147
if millis, ok, _, err := tryParseUint(line, 64); err != nil {
146-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
148+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
147149
} else if ok {
148150
stats.FlushesDuration = time.Duration(millis) * time.Millisecond
149151
}
150152
disks[diskIdx].recordStats(measuredAt, stats)
153+
countCollected++
151154
}
152-
return nil
155+
return countCollected, nil
153156
}
154157

155158
func splitLine(b []byte) (line, rest []byte) {

pkg/storage/disk/linux_parse_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2020
"github.com/cockroachdb/cockroach/pkg/util/log"
21+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2122
"github.com/cockroachdb/datadriven"
2223
"github.com/cockroachdb/pebble/vfs"
2324
"github.com/stretchr/testify/require"
@@ -59,7 +60,7 @@ func TestLinux_CollectDiskStats(t *testing.T) {
5960
// resizing logic.
6061
buf: make([]byte, 16),
6162
}
62-
err := s.collect(disks)
63+
_, err := s.collect(disks, timeutil.Now())
6364
if err != nil {
6465
return err.Error()
6566
}

pkg/storage/disk/monitor.go

+34-4
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@
66
package disk
77

88
import (
9-
"fmt"
9+
"context"
1010
"slices"
1111
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/util/envutil"
14+
"github.com/cockroachdb/cockroach/pkg/util/log"
1415
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
1516
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1617
"github.com/cockroachdb/errors"
1718
"github.com/cockroachdb/pebble/vfs"
19+
"github.com/cockroachdb/redact"
1820
)
1921

2022
var DefaultDiskStatsPollingInterval = envutil.EnvOrDefaultDuration("COCKROACH_DISK_STATS_POLLING_INTERVAL", 100*time.Millisecond)
@@ -28,7 +30,12 @@ type DeviceID struct {
2830

2931
// String returns the string representation of the device ID.
3032
func (d DeviceID) String() string {
31-
return fmt.Sprintf("%d:%d", d.major, d.minor)
33+
return redact.StringWithoutMarkers(d)
34+
}
35+
36+
// SafeFormat implements redact.SafeFormatter.
37+
func (d DeviceID) SafeFormat(w redact.SafePrinter, _ rune) {
38+
w.Printf("%d:%d", d.major, d.minor)
3239
}
3340

3441
// MonitorManager provides observability into a pool of disks by sampling disk stats
@@ -128,7 +135,7 @@ func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
128135
}
129136

130137
type statsCollector interface {
131-
collect(disks []*monitoredDisk) error
138+
collect(disks []*monitoredDisk, now time.Time) (countCollected int, err error)
132139
}
133140

134141
// monitorDisks runs a loop collecting disk stats for all monitored disks.
@@ -137,9 +144,13 @@ type statsCollector interface {
137144
// race where the MonitorManager creates a new stop channel after unrefDisk sends a message
138145
// across the old stop channel.
139146
func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct{}) {
147+
// TODO(jackson): Should we propagate a context here to replace the stop
148+
// channel?
149+
ctx := context.TODO()
140150
ticker := time.NewTicker(DefaultDiskStatsPollingInterval)
141151
defer ticker.Stop()
142152

153+
every := log.Every(5 * time.Minute)
143154
for {
144155
select {
145156
case <-stop:
@@ -150,14 +161,28 @@ func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct
150161
disks := m.mu.disks
151162
m.mu.Unlock()
152163

153-
if err := collector.collect(disks); err != nil {
164+
now := timeutil.Now()
165+
countCollected, err := collector.collect(disks, now)
166+
if err != nil {
154167
for i := range disks {
155168
disks[i].tracer.RecordEvent(traceEvent{
156169
time: timeutil.Now(),
157170
stats: Stats{},
158171
err: err,
159172
})
160173
}
174+
} else if countCollected != len(disks) && every.ShouldLog() {
175+
// Log a warning if we collected fewer disk stats than expected.
176+
log.Warningf(ctx, "collected %d disk stats, expected %d", countCollected, len(disks))
177+
cutoff := now.Add(-10 * time.Second)
178+
for i := range disks {
179+
if lastEventTime := disks[i].tracer.LastEventTime(); lastEventTime.IsZero() {
180+
log.Warningf(ctx, "disk %s has not recorded any stats", disks[i].deviceID)
181+
} else if lastEventTime.Before(cutoff) {
182+
log.Warningf(ctx, "disk %s has not recorded any stats since %s",
183+
disks[i].deviceID, lastEventTime)
184+
}
185+
}
161186
}
162187
}
163188
}
@@ -227,6 +252,11 @@ type Monitor struct {
227252
}
228253
}
229254

255+
// DeviceID returns the device ID of the disk being monitored.
256+
func (m *Monitor) DeviceID() DeviceID {
257+
return m.deviceID
258+
}
259+
230260
// CumulativeStats returns the most-recent stats observed.
231261
func (m *Monitor) CumulativeStats() (Stats, error) {
232262
if event := m.tracer.Latest(); event.err != nil {

pkg/storage/disk/monitor_test.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ import (
1717
)
1818

1919
type spyCollector struct {
20-
collectCount int
20+
collectCallCount int
2121
}
2222

23-
func (s *spyCollector) collect(disks []*monitoredDisk) error {
24-
s.collectCount++
25-
return nil
23+
func (s *spyCollector) collect(
24+
disks []*monitoredDisk, now time.Time,
25+
) (countCollected int, err error) {
26+
s.collectCallCount++
27+
return len(disks), nil
2628
}
2729

2830
func TestMonitorManager_monitorDisks(t *testing.T) {
@@ -45,7 +47,7 @@ func TestMonitorManager_monitorDisks(t *testing.T) {
4547

4648
time.Sleep(2 * DefaultDiskStatsPollingInterval)
4749
stop <- struct{}{}
48-
require.Greater(t, testCollector.collectCount, 0)
50+
require.Greater(t, testCollector.collectCallCount, 0)
4951
}
5052

5153
func TestMonitor_StatsWindow(t *testing.T) {

pkg/storage/disk/monitor_tracer.go

+10
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ func (m *monitorTracer) RecordEvent(event traceEvent) {
8686
}
8787
}
8888

89+
// LastEventTime returns the time of the last traceEvent that was queued.
90+
func (m *monitorTracer) LastEventTime() time.Time {
91+
m.mu.Lock()
92+
defer m.mu.Unlock()
93+
if m.sizeLocked() == 0 {
94+
return time.Time{}
95+
}
96+
return m.mu.trace[(m.mu.end-1)%m.capacity].time
97+
}
98+
8999
// Latest retrieves the last traceEvent that was queued. Returns a zero-valued
90100
// traceEvent if none exists.
91101
func (m *monitorTracer) Latest() traceEvent {

pkg/storage/disk/platform_darwin.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package disk
99

1010
import (
1111
"io/fs"
12+
"time"
1213

1314
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
1415
"github.com/cockroachdb/pebble/vfs"
@@ -17,8 +18,8 @@ import (
1718

1819
type darwinCollector struct{}
1920

20-
func (darwinCollector) collect([]*monitoredDisk) error {
21-
return nil
21+
func (darwinCollector) collect(disks []*monitoredDisk, time time.Time) (int, error) {
22+
return len(disks), nil
2223
}
2324

2425
func newStatsCollector(fs vfs.FS) (*darwinCollector, error) {

pkg/storage/disk/platform_default.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ package disk
99

1010
import (
1111
"io/fs"
12+
"time"
1213

1314
"github.com/cockroachdb/pebble/vfs"
1415
)
1516

1617
type defaultCollector struct{}
1718

18-
func (defaultCollector) collect([]*monitoredDisk) error {
19-
return nil
19+
func (defaultCollector) collect(disks []*monitoredDisk, time time.Time) (int, error) {
20+
return len(disks), nil
2021
}
2122

2223
func newStatsCollector(fs vfs.FS) (*defaultCollector, error) {

pkg/storage/disk/platform_linux.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ package disk
1010
import (
1111
"io"
1212
"io/fs"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
15-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1616
"github.com/cockroachdb/errors"
1717
"github.com/cockroachdb/pebble/vfs"
1818
"golang.org/x/sys/unix"
@@ -26,15 +26,16 @@ type linuxStatsCollector struct {
2626
}
2727

2828
// collect collects disk stats for the identified devices.
29-
func (s *linuxStatsCollector) collect(disks []*monitoredDisk) error {
29+
func (s *linuxStatsCollector) collect(
30+
disks []*monitoredDisk, now time.Time,
31+
) (countCollected int, err error) {
3032
var n int
31-
var err error
3233
for {
3334
n, err = s.File.ReadAt(s.buf, 0)
3435
if errors.Is(err, io.EOF) {
3536
break
3637
} else if err != nil {
37-
return err
38+
return 0, err
3839
}
3940
// err == nil
4041
//
@@ -48,7 +49,7 @@ func (s *linuxStatsCollector) collect(disks []*monitoredDisk) error {
4849
// single read. Reallocate (doubling) the buffer and continue.
4950
s.buf = make([]byte, len(s.buf)*2)
5051
}
51-
return parseDiskStats(s.buf[:n], disks, timeutil.Now())
52+
return parseDiskStats(s.buf[:n], disks, now)
5253
}
5354

5455
func newStatsCollector(fs vfs.FS) (*linuxStatsCollector, error) {

0 commit comments

Comments
 (0)