Skip to content

Commit 3ac9ad9

Browse files
committed
lib/logstorage: close files in parallel inside parts opened for reading
This should reduce the time needed for closing the parts opened for reading on high-latency storage systems such as NFS of Ceph. Updates #517
1 parent 1414f37 commit 3ac9ad9

File tree

11 files changed

+103
-72
lines changed

11 files changed

+103
-72
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/VictoriaMetrics/VictoriaLogs
33
go 1.24.5
44

55
require (
6-
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250727161922-a05e4cf67b09
6+
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250727172355-8e155da0ac00
77
github.com/VictoriaMetrics/easyproto v0.1.4
88
github.com/VictoriaMetrics/metrics v1.38.0
99
github.com/cespare/xxhash/v2 v2.3.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250727161922-a05e4cf67b09 h1:+V7+2wKZMngpKjnW/n9eK7V3R0J97D/Y5rttiQYJ53g=
2-
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250727161922-a05e4cf67b09/go.mod h1:NQGojsgYDAf4cR0k1us+EzDnsZSsHTun1Hp/f3+S6p4=
1+
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250727172355-8e155da0ac00 h1:eMRy88id0ZRmg6GxTuLrF8sBYNzdAoJa1MIx5+HjnxE=
2+
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250727172355-8e155da0ac00/go.mod h1:NQGojsgYDAf4cR0k1us+EzDnsZSsHTun1Hp/f3+S6p4=
33
github.com/VictoriaMetrics/easyproto v0.1.4 h1:r8cNvo8o6sR4QShBXQd1bKw/VVLSQma/V2KhTBPf+Sc=
44
github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
55
github.com/VictoriaMetrics/metrics v1.38.0 h1:1d0dRgVH8Nnu8dKMfisKefPC3q7gqf3/odyO0quAvyA=

lib/logstorage/block_stream_reader.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,10 @@ func (r *bloomValuesReader) totalBytesRead() uint64 {
9595
return r.bloom.bytesRead + r.values.bytesRead
9696
}
9797

98-
func (r *bloomValuesReader) MustClose() {
99-
r.bloom.MustClose()
100-
r.values.MustClose()
98+
func (r *bloomValuesReader) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
99+
dst = append(dst, &r.bloom)
100+
dst = append(dst, &r.values)
101+
return dst
101102
}
102103

103104
type bloomValuesStreamReader struct {
@@ -178,19 +179,25 @@ func (sr *streamReaders) totalBytesRead() uint64 {
178179
}
179180

180181
func (sr *streamReaders) MustClose() {
181-
sr.columnNamesReader.MustClose()
182-
sr.columnIdxsReader.MustClose()
183-
sr.metaindexReader.MustClose()
184-
sr.indexReader.MustClose()
185-
sr.columnsHeaderIndexReader.MustClose()
186-
sr.columnsHeaderReader.MustClose()
187-
sr.timestampsReader.MustClose()
188-
189-
sr.messageBloomValuesReader.MustClose()
190-
sr.oldBloomValuesReader.MustClose()
182+
// Close files in parallel in order to reduce the time needed for this operation
183+
// on high-latency storage systems such as NFS or Ceph.
184+
cs := []fs.MustCloser{
185+
&sr.columnNamesReader,
186+
&sr.columnIdxsReader,
187+
&sr.metaindexReader,
188+
&sr.indexReader,
189+
&sr.columnsHeaderIndexReader,
190+
&sr.columnsHeaderReader,
191+
&sr.timestampsReader,
192+
}
193+
194+
cs = sr.messageBloomValuesReader.appendClosers(cs)
195+
cs = sr.oldBloomValuesReader.appendClosers(cs)
191196
for i := range sr.bloomValuesShards {
192-
sr.bloomValuesShards[i].MustClose()
197+
cs = sr.bloomValuesShards[i].appendClosers(cs)
193198
}
199+
200+
fs.MustCloseParallel(cs)
194201
}
195202

196203
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string) *bloomValuesReader {

lib/logstorage/block_stream_writer.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ func (w *bloomValuesWriter) totalBytesWritten() uint64 {
8484
return w.bloom.bytesWritten + w.values.bytesWritten
8585
}
8686

87-
func (w *bloomValuesWriter) appendWriteClosers(dst []filestream.WriteCloser) []filestream.WriteCloser {
88-
dst = append(dst, w.bloom.w)
89-
dst = append(dst, w.values.w)
87+
func (w *bloomValuesWriter) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
88+
dst = append(dst, &w.bloom)
89+
dst = append(dst, &w.values)
9090
return dst
9191
}
9292

@@ -156,22 +156,24 @@ func (sw *streamWriters) totalBytesWritten() uint64 {
156156
}
157157

158158
func (sw *streamWriters) MustClose() {
159-
wcs := []filestream.WriteCloser{
160-
sw.columnNamesWriter.w,
161-
sw.columnIdxsWriter.w,
162-
sw.metaindexWriter.w,
163-
sw.indexWriter.w,
164-
sw.columnsHeaderIndexWriter.w,
165-
sw.columnsHeaderWriter.w,
166-
sw.timestampsWriter.w,
159+
// Flush and close files in parallel in order to reduce the time needed for this operation
160+
// on high-latency storage systems such as NFS or Ceph.
161+
cs := []fs.MustCloser{
162+
&sw.columnNamesWriter,
163+
&sw.columnIdxsWriter,
164+
&sw.metaindexWriter,
165+
&sw.indexWriter,
166+
&sw.columnsHeaderIndexWriter,
167+
&sw.columnsHeaderWriter,
168+
&sw.timestampsWriter,
167169
}
168170

169-
wcs = sw.messageBloomValuesWriter.appendWriteClosers(wcs)
171+
cs = sw.messageBloomValuesWriter.appendClosers(cs)
170172
for i := range sw.bloomValuesShards {
171-
wcs = sw.bloomValuesShards[i].appendWriteClosers(wcs)
173+
cs = sw.bloomValuesShards[i].appendClosers(cs)
172174
}
173175

174-
filestream.MustCloseWritersParallel(wcs)
176+
fs.MustCloseParallel(cs)
175177
}
176178

177179
func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {

lib/logstorage/part.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ type bloomValuesReaderAt struct {
5454
values fs.MustReadAtCloser
5555
}
5656

57-
func (r *bloomValuesReaderAt) MustClose() {
58-
r.bloom.MustClose()
59-
r.values.MustClose()
57+
func (r *bloomValuesReaderAt) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
58+
dst = append(dst, r.bloom)
59+
dst = append(dst, r.values)
60+
return dst
6061
}
6162

6263
func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@@ -173,22 +174,28 @@ func mustOpenFilePart(pt *partition, path string) *part {
173174
}
174175

175176
func mustClosePart(p *part) {
176-
p.indexFile.MustClose()
177+
// Close files in parallel in order to speed up this operation
178+
// on high-latency storage systems such as NFS and Ceph.
179+
var cs []fs.MustCloser
180+
181+
cs = append(cs, p.indexFile)
177182
if p.ph.FormatVersion >= 1 {
178-
p.columnsHeaderIndexFile.MustClose()
183+
cs = append(cs, p.columnsHeaderIndexFile)
179184
}
180-
p.columnsHeaderFile.MustClose()
181-
p.timestampsFile.MustClose()
182-
p.messageBloomValues.MustClose()
185+
cs = append(cs, p.columnsHeaderFile)
186+
cs = append(cs, p.timestampsFile)
187+
cs = p.messageBloomValues.appendClosers(cs)
183188

184189
if p.ph.FormatVersion < 1 {
185-
p.oldBloomValues.MustClose()
190+
cs = p.oldBloomValues.appendClosers(cs)
186191
} else {
187192
for i := range p.bloomValuesShards {
188-
p.bloomValuesShards[i].MustClose()
193+
cs = p.bloomValuesShards[i].appendClosers(cs)
189194
}
190195
}
191196

197+
fs.MustCloseParallel(cs)
198+
192199
p.pt = nil
193200
}
194201

vendor/github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream/filestream.go

Lines changed: 0 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fs.go

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/block_stream_reader.go

Lines changed: 8 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/block_stream_writer.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/part.go

Lines changed: 8 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)