Skip to content

Commit 2f48073

Browse files
authored
Merge branch 'VictoriaMetrics:master' into master
2 parents f4cd5e8 + 2e1a9e3 commit 2f48073

File tree

121 files changed

+15966
-2017
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

121 files changed

+15966
-2017
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
/gocache-for-docker
1212
/victoria-logs-data
1313
/vlagent-remotewrite-data
14+
/vlagent-kubernetes-checkpoints.json
1415
.DS_store
1516
Gemfile.lock
1617
/_site

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ release-vlutils-windows-goarch: \
271271
vlogscli-windows-$(GOARCH)-prod.exe
272272

273273
pprof-cpu:
274-
go tool pprof -trim_path=github.com/VictoriaMetrics/VictoriaLogs@ $(PPROF_FILE)
274+
go tool pprof -trim_path=github.com/VictoriaMetrics/VictoriaLogs $(PPROF_FILE)
275275

276276
fmt:
277277
gofmt -l -w -s ./lib
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Running vlagent outside Kubernetes for local development
2+
3+
This guide explains how to run vlagent outside a Kubernetes cluster
4+
for local development and testing purposes.
5+
6+
## Prerequisites
7+
8+
Install [k3d](https://github.com/k3d-io/k3d) - a lightweight tool for running Kubernetes locally,
9+
with support for mounting `/var/log/*` folders from the guest to the host system.
10+
11+
## Setup
12+
13+
1. Create `/var/log/containers` and `/var/log/pods` directories on your host system:
14+
15+
```bash
16+
mkdir -p /var/log/containers /var/log/pods
17+
18+
# Ensure both vlagent and k3s have read/write permissions
19+
chmod a+rw /var/log/pods /var/log/containers
20+
```
21+
22+
2. Create a k3d cluster with proper volume mounts:
23+
24+
```bash
25+
k3d cluster create -v /var/log/containers:/var/log/containers@all -v /var/log/pods:/var/log/pods@all
26+
```
27+
28+
This command will also update the `~/.kube/config` file to use the new k3d cluster.
29+
vlagent will use this kubeconfig file to connect to the currently selected cluster.
30+
You can change the kubeconfig path via the `KUBECONFIG` environment variable.
31+
32+
3. Run vlagent with Kubernetes discovery enabled:
33+
34+
```bash
35+
./vlagent -remoteWrite.url=http://localhost:9428/internal/insert -kubernetes
36+
```
37+
38+
vlagent connects to the Kubernetes API to discover pods and containers running in the cluster.
39+
It reads logs from the `/var/log/containers` and `/var/log/pods` directories mounted on the host system.
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package kubernetescollector
2+
3+
import (
4+
"time"
5+
6+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
7+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
8+
)
9+
10+
// backoffTimer is an exponential backoff helper: every wait() sleeps for a
// jittered delay and then doubles the delay used by the following wait(),
// never exceeding max.
type backoffTimer struct {
	// min is the initial delay; reset() restores current to this value.
	min time.Duration
	// max is the upper bound applied to current after each doubling.
	max time.Duration
	// current is the (un-jittered) delay the next wait() call will use.
	current time.Duration

	// timer is created lazily by the first wait() call, reused by later calls
	// and released by stop(). It is nil while no timer is held.
	timer *time.Timer
}
18+
19+
// newBackoffTimer returns a new backoffTimer initialized with the given minDelay and maxDelay.
20+
// The caller must call stop() when the backoffTimer is no longer needed.
21+
func newBackoffTimer(minDelay, maxDelay time.Duration) backoffTimer {
22+
return backoffTimer{
23+
min: minDelay,
24+
max: maxDelay,
25+
current: minDelay,
26+
}
27+
}
28+
29+
// wait sleeps for the current delay with jitter, doubling the delay for the next wait.
30+
// Use currentDelay to get the current backoff duration.
31+
func (bt *backoffTimer) wait(stopCh <-chan struct{}) {
32+
v := timeutil.AddJitterToDuration(bt.current)
33+
bt.current *= 2
34+
if bt.current > bt.max {
35+
bt.current = bt.max
36+
}
37+
38+
if bt.timer == nil {
39+
bt.timer = timerpool.Get(v)
40+
} else {
41+
bt.timer.Reset(v)
42+
}
43+
44+
select {
45+
case <-stopCh:
46+
bt.timer.Stop()
47+
case <-bt.timer.C:
48+
}
49+
}
50+
51+
// currentDelay returns the current backoff duration.
52+
func (bt *backoffTimer) currentDelay() time.Duration {
53+
return bt.current
54+
}
55+
56+
// reset sets the backoff delay to its minimum.
57+
func (bt *backoffTimer) reset() {
58+
bt.current = bt.min
59+
}
60+
61+
// stop releases internal resources.
62+
func (bt *backoffTimer) stop() {
63+
if bt.timer != nil {
64+
timerpool.Put(bt.timer)
65+
bt.timer = nil
66+
}
67+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package kubernetescollector
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"fmt"
7+
"os"
8+
"slices"
9+
"strings"
10+
"sync"
11+
"time"
12+
13+
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
14+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
15+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
16+
)
17+
18+
// checkpointsDB manages persistent log file reading state checkpoints.
19+
// It saves reading positions to disk to enable resuming log collection
20+
// after vlagent restarts without data loss or duplication.
21+
//
22+
// The caller is responsible for closing checkpointsDB via stop() method
23+
// when it's no longer needed.
24+
type checkpointsDB struct {
25+
checkpointsPath string
26+
27+
checkpoints map[string]checkpoint
28+
checkpointsLock sync.Mutex
29+
30+
wg sync.WaitGroup
31+
stopCh chan struct{}
32+
}
33+
34+
// startCheckpointsDB starts a checkpointsDB instance.
35+
// The caller must call stop() when the checkpointsDB is no longer needed.
36+
func startCheckpointsDB(path string) (*checkpointsDB, error) {
37+
checkpoints, err := readCheckpoints(path)
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
checkpointsMap := make(map[string]checkpoint)
43+
for _, cp := range checkpoints {
44+
checkpointsMap[cp.Path] = cp
45+
}
46+
47+
db := &checkpointsDB{
48+
checkpointsPath: path,
49+
checkpoints: checkpointsMap,
50+
stopCh: make(chan struct{}),
51+
}
52+
53+
db.startPeriodicSyncCheckpoints()
54+
55+
return db, nil
56+
}
57+
58+
// checkpoint represents a persistent snapshot of a log file reading state.
59+
//
60+
// The checkpoint is saved to disk to enable resuming log collection from the exact
61+
// position after vlagent restarts, preventing:
62+
// 1. Log duplication when logs are re-read from the beginning.
63+
// 2. Log loss when a log file was rotated while vlagent was down.
64+
// In this case we should find the rotated file.
65+
//
66+
// checkpoint includes pod metadata (common and stream fields) to allow immediate log processing
67+
// without waiting for the Kubernetes API server to provide pod information.
68+
type checkpoint struct {
69+
Path string `json:"path"`
70+
Inode uint64 `json:"inode"`
71+
Offset int64 `json:"offset"`
72+
CommonFields []logstorage.Field `json:"commonFields"`
73+
}
74+
75+
func (db *checkpointsDB) set(cp checkpoint) {
76+
db.checkpointsLock.Lock()
77+
defer db.checkpointsLock.Unlock()
78+
79+
db.checkpoints[cp.Path] = cp
80+
}
81+
82+
func (db *checkpointsDB) get(path string) (checkpoint, bool) {
83+
db.checkpointsLock.Lock()
84+
defer db.checkpointsLock.Unlock()
85+
86+
cp, ok := db.checkpoints[path]
87+
return cp, ok
88+
}
89+
90+
func (db *checkpointsDB) getAll() []checkpoint {
91+
db.checkpointsLock.Lock()
92+
defer db.checkpointsLock.Unlock()
93+
94+
cps := make([]checkpoint, 0, len(db.checkpoints))
95+
for _, cp := range db.checkpoints {
96+
cps = append(cps, cp)
97+
}
98+
99+
return cps
100+
}
101+
102+
func (db *checkpointsDB) delete(path string) {
103+
db.checkpointsLock.Lock()
104+
defer db.checkpointsLock.Unlock()
105+
106+
delete(db.checkpoints, path)
107+
}
108+
109+
func (db *checkpointsDB) mustSync() {
110+
cps := db.getAll()
111+
112+
slices.SortFunc(cps, func(a, b checkpoint) int {
113+
return strings.Compare(a.Path, b.Path)
114+
})
115+
116+
data, err := json.MarshalIndent(cps, "", "\t")
117+
if err != nil {
118+
logger.Panicf("BUG: cannot marshal checkpoints: %s", err)
119+
}
120+
121+
fs.MustWriteAtomic(db.checkpointsPath, data, true)
122+
}
123+
124+
func readCheckpoints(path string) ([]checkpoint, error) {
125+
data, err := os.ReadFile(path)
126+
if err != nil {
127+
if errors.Is(err, os.ErrNotExist) {
128+
logger.Infof("no checkpoints file found at %q; vlagent will read log files from the beginning", path)
129+
return nil, nil
130+
}
131+
return nil, fmt.Errorf("cannot read file checkpoints: %w", err)
132+
}
133+
134+
if len(data) == 0 {
135+
return nil, nil
136+
}
137+
138+
var checkpoints []checkpoint
139+
if err := json.Unmarshal(data, &checkpoints); err != nil {
140+
return nil, fmt.Errorf("cannot unmarshal file checkpoints from %q: %w", path, err)
141+
}
142+
143+
return checkpoints, nil
144+
}
145+
146+
// startPeriodicFlushCheckpoints periodically persists in-memory checkpoints to disk.
147+
//
148+
// It complements the explicit sync performed on graceful stop,
149+
// ensuring regular persistence even when the process is killed.
150+
func (db *checkpointsDB) startPeriodicSyncCheckpoints() {
151+
db.wg.Add(1)
152+
go func() {
153+
defer db.wg.Done()
154+
155+
ticker := time.NewTicker(1 * time.Minute)
156+
defer ticker.Stop()
157+
158+
for {
159+
select {
160+
case <-ticker.C:
161+
db.mustSync()
162+
case <-db.stopCh:
163+
db.mustSync()
164+
return
165+
}
166+
}
167+
}()
168+
}
169+
170+
func (db *checkpointsDB) stop() {
171+
close(db.stopCh)
172+
db.wg.Wait()
173+
}

0 commit comments

Comments
 (0)