Skip to content

Commit 1639496

Browse files
committed
storage/disk: improve logging of device IDs
Log a warning periodically if a disk that should be monitored is not being monitored because it's not found in /proc/diskstats. Additionally, on start up, print a store's monitored device ID. Close #146321. Epic: none Release note: none
1 parent cc9bfd5 commit 1639496

10 files changed

+101
-41
lines changed

pkg/server/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
815815
if err != nil {
816816
return Engines{}, errors.Wrap(err, "creating disk monitor")
817817
}
818+
detail(redact.Sprintf("store %d: disk deviceID: %s", i, monitor.DeviceID()))
818819

819820
statsCollector, err := cfg.DiskWriteStats.GetOrCreateCollector(spec.Path)
820821
if err != nil {

pkg/storage/disk/BUILD.bazel

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ go_library(
1616
visibility = ["//visibility:public"],
1717
deps = [
1818
"//pkg/util/envutil",
19+
"//pkg/util/log",
1920
"//pkg/util/syncutil",
2021
"//pkg/util/timeutil",
2122
"@com_github_cockroachdb_errors//:errors",
2223
"@com_github_cockroachdb_pebble//vfs",
24+
"@com_github_cockroachdb_redact//:redact",
2325
] + select({
2426
"@io_bazel_rules_go//go/platform:android": [
2527
"//pkg/util/sysutil",
@@ -57,5 +59,13 @@ go_test(
5759
"@com_github_cockroachdb_errors//:errors",
5860
"@com_github_cockroachdb_pebble//vfs",
5961
"@com_github_stretchr_testify//require",
60-
],
62+
] + select({
63+
"@io_bazel_rules_go//go/platform:android": [
64+
"//pkg/util/timeutil",
65+
],
66+
"@io_bazel_rules_go//go/platform:linux": [
67+
"//pkg/util/timeutil",
68+
],
69+
"//conditions:default": [],
70+
}),
6171
)

pkg/storage/disk/linux_parse.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ import (
4545
// 12 I/Os currently in progress
4646
// 13 time spent doing I/Os (ms)
4747
// 14 weighted time spent doing I/Os (ms)
48-
func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Time) error {
48+
func parseDiskStats(
49+
contents []byte, disks []*monitoredDisk, measuredAt time.Time,
50+
) (countCollected int, err error) {
4951
for lineNum := 0; len(contents) > 0; lineNum++ {
5052
lineBytes, rest := splitLine(contents)
5153
line := unsafe.String(&lineBytes[0], len(lineBytes))
@@ -55,13 +57,13 @@ func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Tim
5557

5658
var deviceID DeviceID
5759
if devMajor, rest, err := mustParseUint(line, 32, "deviceID.major"); err != nil {
58-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
60+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
5961
} else {
6062
line = rest
6163
deviceID.major = uint32(devMajor)
6264
}
6365
if devMinor, rest, err := mustParseUint(line, 32, "deviceID.minor"); err != nil {
64-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
66+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
6567
} else {
6668
line = rest
6769
deviceID.minor = uint32(devMinor)
@@ -79,78 +81,79 @@ func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Tim
7981
var err error
8082
stats.DeviceName, line = splitFieldDelim(line)
8183
if stats.ReadsCount, line, err = mustParseUint(line, 64, "reads count"); err != nil {
82-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
84+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
8385
}
8486
if stats.ReadsMerged, line, err = mustParseUint(line, 64, "reads merged"); err != nil {
85-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
87+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
8688
}
8789
if stats.ReadsSectors, line, err = mustParseUint(line, 64, "reads sectors"); err != nil {
88-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
90+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
8991
}
9092
if millis, rest, err := mustParseUint(line, 64, "reads duration"); err != nil {
91-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
93+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
9294
} else {
9395
line = rest
9496
stats.ReadsDuration = time.Duration(millis) * time.Millisecond
9597
}
9698
if stats.WritesCount, line, err = mustParseUint(line, 64, "writes count"); err != nil {
97-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
99+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
98100
}
99101
if stats.WritesMerged, line, err = mustParseUint(line, 64, "writes merged"); err != nil {
100-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
102+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
101103
}
102104
if stats.WritesSectors, line, err = mustParseUint(line, 64, "writes sectors"); err != nil {
103-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
105+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
104106
}
105107
if millis, rest, err := mustParseUint(line, 64, "writes duration"); err != nil {
106-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
108+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
107109
} else {
108110
line = rest
109111
stats.WritesDuration = time.Duration(millis) * time.Millisecond
110112
}
111113
if stats.InProgressCount, line, err = mustParseUint(line, 64, "inprogress iops"); err != nil {
112-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
114+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
113115
}
114116
if millis, rest, err := mustParseUint(line, 64, "time doing IO"); err != nil {
115-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
117+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
116118
} else {
117119
line = rest
118120
stats.CumulativeDuration = time.Duration(millis) * time.Millisecond
119121
}
120122
if millis, rest, err := mustParseUint(line, 64, "weighted IO duration"); err != nil {
121-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
123+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
122124
} else {
123125
line = rest
124126
stats.WeightedIODuration = time.Duration(millis) * time.Millisecond
125127
}
126128

127129
// The below fields are optional.
128130
if stats.DiscardsCount, _, line, err = tryParseUint(line, 64); err != nil {
129-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
131+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
130132
}
131133
if stats.DiscardsMerged, _, line, err = tryParseUint(line, 64); err != nil {
132-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
134+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
133135
}
134136
if stats.DiscardsSectors, _, line, err = tryParseUint(line, 64); err != nil {
135-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
137+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
136138
}
137139
if millis, ok, rest, err := tryParseUint(line, 64); err != nil {
138-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
140+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
139141
} else if ok {
140142
line = rest
141143
stats.DiscardsDuration = time.Duration(millis) * time.Millisecond
142144
}
143145
if stats.FlushesCount, _, line, err = tryParseUint(line, 64); err != nil {
144-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
146+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
145147
}
146148
if millis, ok, _, err := tryParseUint(line, 64); err != nil {
147-
return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
149+
return 0, errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
148150
} else if ok {
149151
stats.FlushesDuration = time.Duration(millis) * time.Millisecond
150152
}
151153
disks[diskIdx].recordStats(measuredAt, stats)
154+
countCollected++
152155
}
153-
return nil
156+
return countCollected, nil
154157
}
155158

156159
func splitLine(b []byte) (line, rest []byte) {

pkg/storage/disk/linux_parse_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2121
"github.com/cockroachdb/cockroach/pkg/util/log"
22+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2223
"github.com/cockroachdb/datadriven"
2324
"github.com/cockroachdb/pebble/vfs"
2425
"github.com/stretchr/testify/require"
@@ -60,7 +61,7 @@ func TestLinux_CollectDiskStats(t *testing.T) {
6061
// resizing logic.
6162
buf: make([]byte, 16),
6263
}
63-
err := s.collect(disks)
64+
_, err := s.collect(disks, timeutil.Now())
6465
if err != nil {
6566
return err.Error()
6667
}

pkg/storage/disk/monitor.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@
66
package disk
77

88
import (
9-
"fmt"
9+
"context"
1010
"slices"
1111
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/util/envutil"
14+
"github.com/cockroachdb/cockroach/pkg/util/log"
1415
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
1516
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1617
"github.com/cockroachdb/errors"
1718
"github.com/cockroachdb/pebble/vfs"
19+
"github.com/cockroachdb/redact"
1820
)
1921

2022
var DefaultDiskStatsPollingInterval = envutil.EnvOrDefaultDuration("COCKROACH_DISK_STATS_POLLING_INTERVAL", 100*time.Millisecond)
@@ -28,7 +30,12 @@ type DeviceID struct {
2830

2931
// String returns the string representation of the device ID.
3032
func (d DeviceID) String() string {
31-
return fmt.Sprintf("%d:%d", d.major, d.minor)
33+
return redact.StringWithoutMarkers(d)
34+
}
35+
36+
// SafeFormat implements redact.SafeFormatter.
37+
func (d DeviceID) SafeFormat(w redact.SafePrinter, _ rune) {
38+
w.Printf("%d:%d", d.major, d.minor)
3239
}
3340

3441
// MonitorManager provides observability into a pool of disks by sampling disk stats
@@ -128,7 +135,7 @@ func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
128135
}
129136

130137
type statsCollector interface {
131-
collect(disks []*monitoredDisk) error
138+
collect(disks []*monitoredDisk, now time.Time) (countCollected int, err error)
132139
}
133140

134141
// monitorDisks runs a loop collecting disk stats for all monitored disks.
@@ -137,9 +144,13 @@ type statsCollector interface {
137144
// race where the MonitorManager creates a new stop channel after unrefDisk sends a message
138145
// across the old stop channel.
139146
func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct{}) {
147+
// TODO(jackson): Should we propagate a context here to replace the stop
148+
// channel?
149+
ctx := context.TODO()
140150
ticker := time.NewTicker(DefaultDiskStatsPollingInterval)
141151
defer ticker.Stop()
142152

153+
every := log.Every(5 * time.Minute)
143154
for {
144155
select {
145156
case <-stop:
@@ -150,14 +161,28 @@ func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct
150161
disks := m.mu.disks
151162
m.mu.Unlock()
152163

153-
if err := collector.collect(disks); err != nil {
164+
now := timeutil.Now()
165+
countCollected, err := collector.collect(disks, now)
166+
if err != nil {
154167
for i := range disks {
155168
disks[i].tracer.RecordEvent(traceEvent{
156169
time: timeutil.Now(),
157170
stats: Stats{},
158171
err: err,
159172
})
160173
}
174+
} else if countCollected != len(disks) && every.ShouldLog() {
175+
// Log a warning if we collected fewer disk stats than expected.
176+
log.Warningf(ctx, "collected %d disk stats, expected %d", countCollected, len(disks))
177+
cutoff := now.Add(-10 * time.Second)
178+
for i := range disks {
179+
if lastEventTime := disks[i].tracer.LastEventTime(); lastEventTime.IsZero() {
180+
log.Warningf(ctx, "disk %s has not recorded any stats", disks[i].deviceID)
181+
} else if lastEventTime.Before(cutoff) {
182+
log.Warningf(ctx, "disk %s has not recorded any stats since %s",
183+
disks[i].deviceID, lastEventTime)
184+
}
185+
}
161186
}
162187
}
163188
}
@@ -227,6 +252,11 @@ type Monitor struct {
227252
}
228253
}
229254

255+
// DeviceID returns the device ID of the disk being monitored.
256+
func (m *Monitor) DeviceID() DeviceID {
257+
return m.deviceID
258+
}
259+
230260
// CumulativeStats returns the most-recent stats observed.
231261
func (m *Monitor) CumulativeStats() (Stats, error) {
232262
if event := m.tracer.Latest(); event.err != nil {

pkg/storage/disk/monitor_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ import (
1717
)
1818

1919
type spyCollector struct {
20-
collectCount int
20+
collectCallCount int
2121
}
2222

23-
func (s *spyCollector) collect(disks []*monitoredDisk) error {
24-
s.collectCount++
25-
return nil
23+
func (s *spyCollector) collect(
24+
disks []*monitoredDisk, now time.Time,
25+
) (countCollected int, err error) {
26+
s.collectCallCount++
27+
return len(disks), nil
2628
}
2729

2830
func TestMonitorManager_monitorDisks(t *testing.T) {
@@ -45,7 +47,7 @@ func TestMonitorManager_monitorDisks(t *testing.T) {
4547

4648
time.Sleep(2 * DefaultDiskStatsPollingInterval)
4749
stop <- struct{}{}
48-
require.Greater(t, testCollector.collectCount, 0)
50+
require.Greater(t, testCollector.collectCallCount, 0)
4951
}
5052

5153
func TestMonitor_StatsWindow(t *testing.T) {

pkg/storage/disk/monitor_tracer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ func (m *monitorTracer) RecordEvent(event traceEvent) {
8686
}
8787
}
8888

89+
// LastEventTime returns the time of the last traceEvent that was queued.
90+
func (m *monitorTracer) LastEventTime() time.Time {
91+
m.mu.Lock()
92+
defer m.mu.Unlock()
93+
if m.sizeLocked() == 0 {
94+
return time.Time{}
95+
}
96+
return m.mu.trace[(m.mu.end-1)%m.capacity].time
97+
}
98+
8999
// Latest retrieves the last traceEvent that was queued. Returns a zero-valued
90100
// traceEvent if none exists.
91101
func (m *monitorTracer) Latest() traceEvent {

pkg/storage/disk/platform_darwin.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package disk
1010

1111
import (
1212
"io/fs"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
1516
"github.com/cockroachdb/pebble/vfs"
@@ -18,8 +19,8 @@ import (
1819

1920
type darwinCollector struct{}
2021

21-
func (darwinCollector) collect([]*monitoredDisk) error {
22-
return nil
22+
func (darwinCollector) collect(disks []*monitoredDisk, time time.Time) (int, error) {
23+
return len(disks), nil
2324
}
2425

2526
func newStatsCollector(fs vfs.FS) (*darwinCollector, error) {

pkg/storage/disk/platform_default.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ package disk
1010

1111
import (
1212
"io/fs"
13+
"time"
1314

1415
"github.com/cockroachdb/pebble/vfs"
1516
)
1617

1718
type defaultCollector struct{}
1819

19-
func (defaultCollector) collect([]*monitoredDisk) error {
20-
return nil
20+
func (defaultCollector) collect(disks []*monitoredDisk, time time.Time) (int, error) {
21+
return len(disks), nil
2122
}
2223

2324
func newStatsCollector(fs vfs.FS) (*defaultCollector, error) {

pkg/storage/disk/platform_linux.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ package disk
1111
import (
1212
"io"
1313
"io/fs"
14+
"time"
1415

1516
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
16-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1717
"github.com/cockroachdb/errors"
1818
"github.com/cockroachdb/pebble/vfs"
1919
"golang.org/x/sys/unix"
@@ -27,15 +27,16 @@ type linuxStatsCollector struct {
2727
}
2828

2929
// collect collects disk stats for the identified devices.
30-
func (s *linuxStatsCollector) collect(disks []*monitoredDisk) error {
30+
func (s *linuxStatsCollector) collect(
31+
disks []*monitoredDisk, now time.Time,
32+
) (countCollected int, err error) {
3133
var n int
32-
var err error
3334
for {
3435
n, err = s.File.ReadAt(s.buf, 0)
3536
if errors.Is(err, io.EOF) {
3637
break
3738
} else if err != nil {
38-
return err
39+
return 0, err
3940
}
4041
// err == nil
4142
//
@@ -49,7 +50,7 @@ func (s *linuxStatsCollector) collect(disks []*monitoredDisk) error {
4950
// single read. Reallocate (doubling) the buffer and continue.
5051
s.buf = make([]byte, len(s.buf)*2)
5152
}
52-
return parseDiskStats(s.buf[:n], disks, timeutil.Now())
53+
return parseDiskStats(s.buf[:n], disks, now)
5354
}
5455

5556
func newStatsCollector(fs vfs.FS) (*linuxStatsCollector, error) {

0 commit comments

Comments
 (0)