Skip to content

Commit 910223a

Browse files
my-vegetable-has-explodedJiangJiaWei1103Future-Outlier
authored
[historyserver][collector] Add file-level idempotency check for prev-logs processing on container restart (#4321)
* feat(historyserver):re-push prev-logs on pod restart Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * chroe(historyserver): replace hard code path Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * fmt. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * test(historyserver): add test for logcollector restart Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * add e2e test for repush Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * fix e2e test. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * add Troubleshooting. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * rm redundant cleanup. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * reuse WatchPrevLogsLoops to scan existing logs. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * simulate partial upload in e2e test. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * fix unit test. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * fix lint Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * fix mv race condition in e2e test. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * Apply suggestion from @JiangJiaWei1103 Co-authored-by: 江家瑋 <36886416+JiangJiaWei1103@users.noreply.github.com> Signed-off-by: yi wang <48236141+my-vegetable-has-exploded@users.noreply.github.com> * address comments. Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * e2e test: add assertions and update description Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> * Better test Signed-off-by: Future-Outlier <eric901201@gmail.com> --------- Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com> Signed-off-by: yi wang <48236141+my-vegetable-has-exploded@users.noreply.github.com> Signed-off-by: Future-Outlier <eric901201@gmail.com> Co-authored-by: 江家瑋 <36886416+JiangJiaWei1103@users.noreply.github.com> Co-authored-by: Future-Outlier <eric901201@gmail.com>
1 parent d46505b commit 910223a

File tree

4 files changed

+424
-30
lines changed

4 files changed

+424
-30
lines changed

historyserver/docs/set_up_collector.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,26 @@ kubectl delete -f historyserver/config/raycluster.yaml
141141
You're supposed to see the uploaded logs and events in the minio UI as below:
142142

143143
![write_logs_and_events](https://github.com/ray-project/kuberay/blob/db7cb864061518ed4cfa7bf48cf05cfbfeb49f95/historyserver/docs/assets/write_logs_and_events.png)
144+
145+
## Troubleshooting
146+
147+
### "too many open files" error
148+
149+
If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs,
150+
it is likely due to the inotify limits on the Kubernetes nodes.
151+
152+
To fix this, increase the limits on the **host nodes** (not inside the container):
153+
154+
```bash
155+
# Apply changes immediately
156+
sudo sysctl -w fs.inotify.max_user_instances=8192
157+
sudo sysctl -w fs.inotify.max_user_watches=524288
158+
```
159+
160+
To make these changes persistent across reboots, use the following lines:
161+
162+
```text
163+
echo "fs.inotify.max_user_instances=8192" | sudo tee -a /etc/sysctl.conf
164+
echo "fs.inotify.max_user_watches=524288" | sudo tee -a /etc/sysctl.conf
165+
sudo sysctl -p
166+
```

historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,24 @@ import (
2222
)
2323

2424
type RayLogHandler struct {
25-
Writer storage.StorageWriter
26-
LogFiles chan string
27-
HttpClient *http.Client
28-
ShutdownChan chan struct{}
29-
logFilePaths map[string]bool
30-
MetaDir string
31-
RayClusterName string
32-
LogDir string
33-
RayNodeName string
34-
RayClusterID string
35-
RootDir string
36-
SessionDir string
37-
prevLogsDir string
38-
PushInterval time.Duration
39-
LogBatching int
40-
filePathMu sync.Mutex
41-
EnableMeta bool
25+
Writer storage.StorageWriter
26+
LogFiles chan string
27+
HttpClient *http.Client
28+
ShutdownChan chan struct{}
29+
logFilePaths map[string]bool
30+
MetaDir string
31+
RayClusterName string
32+
LogDir string
33+
RayNodeName string
34+
RayClusterID string
35+
RootDir string
36+
SessionDir string
37+
prevLogsDir string
38+
persistCompleteLogsDir string
39+
PushInterval time.Duration
40+
LogBatching int
41+
filePathMu sync.Mutex
42+
EnableMeta bool
4243
}
4344

4445
func (r *RayLogHandler) Start(stop <-chan struct{}) error {
@@ -49,6 +50,7 @@ func (r *RayLogHandler) Start(stop <-chan struct{}) error {
4950
func (r *RayLogHandler) Run(stop <-chan struct{}) error {
5051
// watchPath := r.LogDir
5152
r.prevLogsDir = "/tmp/ray/prev-logs"
53+
r.persistCompleteLogsDir = "/tmp/ray/persist-complete-logs"
5254

5355
// Initialize log file paths storage
5456
r.logFilePaths = make(map[string]bool)
@@ -62,6 +64,11 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
6264
// Setup signal handling for SIGTERM
6365
sigChan := make(chan os.Signal, 1)
6466
signal.Notify(sigChan, syscall.SIGTERM)
67+
68+
// WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup
69+
// to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories.
70+
// After scanning, it watches for new directories and files. This ensures incomplete
71+
// uploads from previous runs are resumed.
6572
go r.WatchPrevLogsLoops()
6673
if r.EnableMeta {
6774
go r.WatchSessionLatestLoops() // Watch session_latest symlink changes
@@ -231,7 +238,7 @@ func (r *RayLogHandler) WatchPrevLogsLoops() {
231238
}
232239

233240
// Also check and create persist-complete-logs directory
234-
completeLogsDir := "/tmp/ray/persist-complete-logs"
241+
completeLogsDir := r.persistCompleteLogsDir
235242
if _, err := os.Stat(completeLogsDir); os.IsNotExist(err) {
236243
logrus.Infof("persist-complete-logs directory does not exist, creating it: %s", completeLogsDir)
237244
if err := os.MkdirAll(completeLogsDir, 0o777); err != nil {
@@ -492,6 +499,38 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) {
492499
}
493500
}
494501

502+
// isFileAlreadyPersisted checks if a log file has already been uploaded to storage and moved to
503+
// the persist-complete-logs directory. This prevents duplicate uploads during collector restarts.
504+
//
505+
// When a log file is successfully uploaded, it is moved from prev-logs to persist-complete-logs
506+
// to mark it as processed. This function checks if the equivalent file path exists in the
507+
// persist-complete-logs directory.
508+
//
509+
// Example:
510+
//
511+
// Given absoluteLogPath = "/tmp/ray/prev-logs/session_123/node_456/logs/raylet.out"
512+
// This function checks if "/tmp/ray/persist-complete-logs/session_123/node_456/logs/raylet.out" exists
513+
// - If exists: returns true (file was already uploaded, skip it)
514+
// - If not exists: returns false (file needs to be uploaded)
515+
func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool {
516+
// Calculate the relative path within the logs directory
517+
logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs")
518+
relativeLogPath, err := filepath.Rel(logsDir, absoluteLogPath)
519+
if err != nil {
520+
logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err)
521+
return false
522+
}
523+
524+
// Construct the path in persist-complete-logs
525+
persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativeLogPath)
526+
527+
// Check if the file exists
528+
if _, err := os.Stat(persistedPath); err == nil {
529+
return true
530+
}
531+
return false
532+
}
533+
495534
// processPrevLogsDir processes logs in a /tmp/ray/prev-logs/{sessionid}/{nodeid} directory
496535
func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
497536
// Extract session ID and node ID from the path
@@ -513,13 +552,6 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
513552
return
514553
}
515554

516-
// Check if this directory has already been processed by checking in persist-complete-logs
517-
completeDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs")
518-
if _, err := os.Stat(completeDir); err == nil {
519-
logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
520-
return
521-
}
522-
523555
logrus.Infof("Processing prev-logs for session: %s, node: %s", sessionID, nodeID)
524556

525557
logsDir := filepath.Join(sessionNodeDir, "logs")
@@ -550,6 +582,12 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
550582
return nil
551583
}
552584

585+
// Check if this file has already been persisted
586+
if r.isFileAlreadyPersisted(path, sessionID, nodeID) {
587+
logrus.Debugf("File %s already persisted, skipping", path)
588+
return nil
589+
}
590+
553591
// Process log file
554592
if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil {
555593
logrus.Errorf("Failed to process prev-log file %s: %v", path, err)
@@ -616,7 +654,7 @@ func (r *RayLogHandler) processPrevLogFile(absoluteLogPathName, localLogDir, ses
616654
logrus.Infof("Successfully wrote object %s, size: %d bytes", objectName, len(content))
617655

618656
// Move the processed file to persist-complete-logs directory to avoid re-uploading
619-
completeBaseDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID)
657+
completeBaseDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID)
620658
completeDir := filepath.Join(completeBaseDir, "logs")
621659

622660
if _, err := os.Stat(completeDir); os.IsNotExist(err) {
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package logcollector
2+
3+
import (
4+
"io"
5+
"os"
6+
"path/filepath"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
. "github.com/onsi/gomega"
12+
)
13+
14+
// MockStorageWriter is a mock implementation of storage.StorageWriter for testing
15+
type MockStorageWriter struct {
16+
mu sync.Mutex
17+
createdDirs []string
18+
writtenFiles map[string]string // path -> content
19+
}
20+
21+
func NewMockStorageWriter() *MockStorageWriter {
22+
return &MockStorageWriter{
23+
createdDirs: make([]string, 0),
24+
writtenFiles: make(map[string]string),
25+
}
26+
}
27+
28+
func (m *MockStorageWriter) CreateDirectory(path string) error {
29+
m.mu.Lock()
30+
defer m.mu.Unlock()
31+
m.createdDirs = append(m.createdDirs, path)
32+
return nil
33+
}
34+
35+
func (m *MockStorageWriter) WriteFile(file string, reader io.ReadSeeker) error {
36+
content, err := io.ReadAll(reader)
37+
if err != nil {
38+
return err
39+
}
40+
m.mu.Lock()
41+
defer m.mu.Unlock()
42+
m.writtenFiles[file] = string(content)
43+
return nil
44+
}
45+
46+
// setupRayTestEnvironment creates test directories under /tmp/ray for realistic testing
47+
// This matches the actual paths used by the logcollector
48+
func setupRayTestEnvironment(t *testing.T) (string, func()) {
49+
baseDir := filepath.Join("/tmp", "ray-test-"+t.Name())
50+
51+
// Create base directory
52+
if err := os.MkdirAll(baseDir, 0755); err != nil {
53+
t.Fatalf("Failed to create base dir: %v", err)
54+
}
55+
56+
// Create prev-logs and persist-complete-logs directories
57+
prevLogsDir := filepath.Join(baseDir, "prev-logs")
58+
persistLogsDir := filepath.Join(baseDir, "persist-complete-logs")
59+
60+
if err := os.MkdirAll(prevLogsDir, 0755); err != nil {
61+
t.Fatalf("Failed to create prev-logs dir: %v", err)
62+
}
63+
if err := os.MkdirAll(persistLogsDir, 0755); err != nil {
64+
t.Fatalf("Failed to create persist-complete-logs dir: %v", err)
65+
}
66+
67+
cleanup := func() {
68+
os.RemoveAll(baseDir)
69+
}
70+
71+
return baseDir, cleanup
72+
}
73+
74+
// createTestLogFile creates a test log file with given content
75+
func createTestLogFile(t *testing.T, path string, content string) {
76+
dir := filepath.Dir(path)
77+
if err := os.MkdirAll(dir, 0755); err != nil {
78+
t.Fatalf("Failed to create directory %s: %v", dir, err)
79+
}
80+
81+
if err := os.WriteFile(path, []byte(content), 0644); err != nil {
82+
t.Fatalf("Failed to write file %s: %v", path, err)
83+
}
84+
}
85+
86+
// TestIsFileAlreadyPersisted tests the file-level persistence check
87+
func TestIsFileAlreadyPersisted(t *testing.T) {
88+
baseDir, cleanup := setupRayTestEnvironment(t)
89+
defer cleanup()
90+
91+
// Use the actual prev-logs directory structure that matches production
92+
handler := &RayLogHandler{
93+
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
94+
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
95+
}
96+
97+
sessionID := "session-123"
98+
nodeID := "node-456"
99+
100+
// Create prev-logs structure
101+
prevLogsPath := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs", "worker.log")
102+
createTestLogFile(t, prevLogsPath, "test log content")
103+
104+
// Test case 1: File not yet persisted
105+
if handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
106+
t.Error("Expected file to not be persisted yet")
107+
}
108+
109+
// Create the persisted file in persist-complete-logs
110+
persistedPath := filepath.Join(baseDir, "persist-complete-logs", sessionID, nodeID, "logs", "worker.log")
111+
createTestLogFile(t, persistedPath, "test log content")
112+
113+
// Test case 2: File already persisted
114+
if !handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
115+
t.Error("Expected file to be detected as persisted")
116+
}
117+
}
118+
119+
// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan.
120+
//
121+
// This test simulates a crash recovery scenario:
122+
// 1. Two log files exist in prev-logs
123+
// 2. Only file1 is processed (simulating partial success before crash)
124+
// 3. File1 is restored to prev-logs (simulating incomplete rename during crash)
125+
// 4. WatchPrevLogsLoops is started (simulating collector restart)
126+
// 5. Verify that file1 is NOT re-uploaded (idempotency) and file2 is uploaded
127+
// 6. Verify that the node directory is cleaned up after all files are processed
128+
func TestScanAndProcess(t *testing.T) {
129+
g := NewWithT(t)
130+
131+
baseDir, cleanup := setupRayTestEnvironment(t)
132+
defer cleanup()
133+
134+
mockWriter := NewMockStorageWriter()
135+
handler := &RayLogHandler{
136+
Writer: mockWriter,
137+
RootDir: "/test-root",
138+
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
139+
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
140+
ShutdownChan: make(chan struct{}),
141+
RayClusterName: "test-cluster",
142+
RayClusterID: "cluster-123",
143+
}
144+
145+
sessionID := "session-lifecycle"
146+
nodeID := "node-1"
147+
logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs")
148+
149+
// Prepare two log files in prev-logs directory
150+
f1 := filepath.Join(logsDir, "file1.log")
151+
f2 := filepath.Join(logsDir, "file2.log")
152+
createTestLogFile(t, f1, "content1")
153+
createTestLogFile(t, f2, "content2")
154+
155+
// --- Step 1: Process file1 only (simulating partial success before crash) ---
156+
err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID)
157+
if err != nil {
158+
t.Fatalf("Failed to process file1: %v", err)
159+
}
160+
161+
// Verify file1 is uploaded to storage
162+
if len(mockWriter.writtenFiles) != 1 {
163+
t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles))
164+
}
165+
166+
// Manually restore file1 to prev-logs to simulate a crash right after upload
167+
// but before the rename operation completed
168+
createTestLogFile(t, f1, "content1")
169+
170+
// --- Step 2: Start the startup scan in background (simulating collector restart) ---
171+
go handler.WatchPrevLogsLoops()
172+
173+
// --- Step 3: Use Eventually to wait for async processing ---
174+
sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID)
175+
176+
// Wait until storage has exactly 2 files.
177+
// file1 should NOT be re-uploaded because it already exists in persist-complete-logs.
178+
// Only file2 should be newly uploaded.
179+
g.Eventually(func() int {
180+
mockWriter.mu.Lock()
181+
defer mockWriter.mu.Unlock()
182+
return len(mockWriter.writtenFiles)
183+
}, 5*time.Second, 100*time.Millisecond).Should(Equal(2),
184+
"Storage should have 2 unique files (file1 should NOT be re-uploaded due to idempotency check)")
185+
186+
// Wait until the node directory in prev-logs is removed.
187+
// After all files are processed and moved to persist-complete-logs,
188+
// the node directory should be cleaned up.
189+
g.Eventually(func() bool {
190+
_, err := os.Stat(sessionNodeDir)
191+
return os.IsNotExist(err)
192+
}, 5*time.Second, 100*time.Millisecond).Should(BeTrue(),
193+
"Node directory should be removed after all files are processed and moved to persist-complete-logs")
194+
195+
// Signal the background goroutine to exit gracefully
196+
close(handler.ShutdownChan)
197+
}

0 commit comments

Comments
 (0)