-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathoutput_sync.rs
More file actions
196 lines (170 loc) · 6.23 KB
/
output_sync.rs
File metadata and controls
196 lines (170 loc) · 6.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright 2025 Lablup Inc. and Jeongkyu Shin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Thread-safe output synchronization for preventing race conditions
//! when multiple nodes write to stdout/stderr simultaneously.
use std::io::{self, Write};
use std::sync::{LazyLock, Mutex};
/// Global stdout mutex to prevent interleaved output.
///
/// Initialized lazily on first access. The stdout helpers in this file
/// (`synchronized_println`, `synchronized_print_lines` and
/// `NodeOutputWriter::write_stdout_lines`) lock it before writing and keep it
/// locked until they have flushed.
static STDOUT_MUTEX: LazyLock<Mutex<io::Stdout>> = LazyLock::new(|| Mutex::new(io::stdout()));
/// Global stderr mutex to prevent interleaved output.
///
/// Initialized lazily on first access. The stderr helpers in this file
/// (`synchronized_eprintln`, `synchronized_eprint_lines` and
/// `NodeOutputWriter::write_stderr_lines`) lock it before writing and keep it
/// locked until they have flushed.
static STDERR_MUTEX: LazyLock<Mutex<io::Stderr>> = LazyLock::new(|| Mutex::new(io::stderr()));
/// Thread-safe `println!` that prevents output interleaving.
///
/// Locks the global stdout mutex, writes `text` followed by a newline and
/// flushes before the lock is released, so the whole line is emitted without
/// interruption from other callers that go through the same mutex.
///
/// A poisoned mutex (some thread panicked while holding it) is recovered
/// rather than turned into another panic: the guarded value is only a stdout
/// handle, so there is no half-updated state to protect against.
///
/// # Errors
///
/// Returns the I/O error reported while writing to or flushing stdout.
pub fn synchronized_println(text: &str) -> io::Result<()> {
    let mut stdout = STDOUT_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    writeln!(stdout, "{text}")?;
    stdout.flush()
}
/// Thread-safe `eprintln!` that prevents output interleaving.
///
/// Locks the global stderr mutex, writes `text` followed by a newline and
/// flushes before the lock is released, so the whole line is emitted without
/// interruption from other callers that go through the same mutex.
///
/// A poisoned mutex (some thread panicked while holding it) is recovered
/// rather than turned into another panic: the guarded value is only a stderr
/// handle, so there is no half-updated state to protect against.
///
/// # Errors
///
/// Returns the I/O error reported while writing to or flushing stderr.
#[allow(dead_code)]
pub fn synchronized_eprintln(text: &str) -> io::Result<()> {
    let mut stderr = STDERR_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    writeln!(stderr, "{text}")?;
    stderr.flush()
}
/// Batch write multiple lines to stdout atomically.
///
/// Holds the global stdout mutex while every item of `lines` is written
/// (each followed by a newline) and flushed, so all lines from one call
/// appear together with respect to other callers of the same mutex.
///
/// `lines` may be any `IntoIterator` yielding `&str`; plain iterators are
/// still accepted. The iterator is consumed *while the lock is held*, so a
/// panic inside it poisons the mutex. Poison is therefore recovered here
/// instead of unwrapped; otherwise one failing producer would make every
/// later stdout call in the process panic.
///
/// # Errors
///
/// Returns the first I/O error reported while writing to or flushing stdout.
/// Lines written before the error are not rolled back.
#[allow(dead_code)]
pub fn synchronized_print_lines<'a, I>(lines: I) -> io::Result<()>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut stdout = STDOUT_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    for line in lines {
        writeln!(stdout, "{line}")?;
    }
    stdout.flush()
}
/// Batch write multiple lines to stderr atomically.
///
/// Holds the global stderr mutex while every item of `lines` is written
/// (each followed by a newline) and flushed, so all lines from one call
/// appear together with respect to other callers of the same mutex.
///
/// `lines` may be any `IntoIterator` yielding `&str`; plain iterators are
/// still accepted. The iterator is consumed *while the lock is held*, so a
/// panic inside it poisons the mutex. Poison is therefore recovered here
/// instead of unwrapped; otherwise one failing producer would make every
/// later stderr call in the process panic.
///
/// # Errors
///
/// Returns the first I/O error reported while writing to or flushing stderr.
/// Lines written before the error are not rolled back.
#[allow(dead_code)]
pub fn synchronized_eprint_lines<'a, I>(lines: I) -> io::Result<()>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut stderr = STDERR_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    for line in lines {
        writeln!(stderr, "{line}")?;
    }
    stderr.flush()
}
/// Synchronized output writer for node-prefixed output.
///
/// Formats each line as `[host] line` (or leaves it untouched when the prefix
/// is disabled) and writes through this module's global stdout/stderr mutexes.
#[derive(Debug, Clone)]
pub struct NodeOutputWriter {
    /// Host name already wrapped in brackets, e.g. `[node1]`.
    node_prefix: String,
    /// When `true`, lines are emitted without the prefix.
    no_prefix: bool,
}

impl NodeOutputWriter {
    /// Create a new writer with a node prefix (prefix enabled by default).
    #[allow(dead_code)]
    pub fn new(node_host: &str) -> Self {
        Self::new_with_no_prefix(node_host, false)
    }

    /// Create a new writer, optionally with the prefix disabled.
    ///
    /// The bracketed prefix is built and stored even when `no_prefix` is
    /// `true`; it is simply not used when formatting.
    pub fn new_with_no_prefix(node_host: &str, no_prefix: bool) -> Self {
        Self {
            node_prefix: format!("[{node_host}]"),
            no_prefix,
        }
    }

    /// Append `line` to `out`, preceded by the prefix and one space unless
    /// the prefix is disabled. No trailing newline is added.
    fn push_line(&self, out: &mut String, line: &str) {
        if !self.no_prefix {
            out.push_str(&self.node_prefix);
            out.push(' ');
        }
        out.push_str(line);
    }

    /// Format a line with or without prefix based on configuration.
    fn format_line(&self, line: &str) -> String {
        let mut formatted = String::with_capacity(self.node_prefix.len() + 1 + line.len());
        self.push_line(&mut formatted, line);
        formatted
    }

    /// Render every line of `text` (as split by `str::lines`) into a single
    /// buffer, each line formatted and terminated with `\n`.
    ///
    /// Returns an empty string exactly when `text` contains no lines.
    fn format_block(&self, text: &str) -> String {
        let mut block = String::with_capacity(text.len() + 1);
        for line in text.lines() {
            self.push_line(&mut block, line);
            block.push('\n');
        }
        block
    }

    /// Write stdout lines with optional node prefix atomically.
    ///
    /// The text is formatted before the lock is taken, then emitted with a
    /// single `write_all` and a flush while the global stdout mutex is held.
    /// Nothing is written (and the lock is not taken) when `text` has no
    /// lines. A poisoned mutex is recovered instead of causing a panic.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while writing to or flushing stdout.
    pub fn write_stdout_lines(&self, text: &str) -> io::Result<()> {
        let block = self.format_block(text);
        if block.is_empty() {
            return Ok(());
        }
        let mut stdout = STDOUT_MUTEX
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        stdout.write_all(block.as_bytes())?;
        stdout.flush()
    }

    /// Write stderr lines with optional node prefix atomically.
    ///
    /// The text is formatted before the lock is taken, then emitted with a
    /// single `write_all` and a flush while the global stderr mutex is held.
    /// Nothing is written (and the lock is not taken) when `text` has no
    /// lines. A poisoned mutex is recovered instead of causing a panic.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while writing to or flushing stderr.
    pub fn write_stderr_lines(&self, text: &str) -> io::Result<()> {
        let block = self.format_block(text);
        if block.is_empty() {
            return Ok(());
        }
        let mut stderr = STDERR_MUTEX
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        stderr.write_all(block.as_bytes())?;
        stderr.flush()
    }

    /// Write a single stdout line with optional node prefix.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `synchronized_println`.
    pub fn write_stdout(&self, line: &str) -> io::Result<()> {
        synchronized_println(&self.format_line(line))
    }

    /// Write a single stderr line with optional node prefix.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `synchronized_eprintln`.
    #[allow(dead_code)]
    pub fn write_stderr(&self, line: &str) -> io::Result<()> {
        synchronized_eprintln(&self.format_line(line))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores the bracketed host and leaves prefixing switched on.
    #[test]
    fn test_node_output_writer() {
        let NodeOutputWriter {
            node_prefix,
            no_prefix,
        } = NodeOutputWriter::new("test-host");

        assert_eq!(node_prefix, "[test-host]");
        assert!(!no_prefix);
    }

    /// `new_with_no_prefix` honours the flag when formatting lines.
    #[test]
    fn test_node_output_writer_with_no_prefix() {
        const SAMPLE: &str = "test output";

        // Prefix disabled: the prefix is still stored, but lines pass through unchanged.
        let bare = NodeOutputWriter::new_with_no_prefix("test-host", true);
        assert_eq!(bare.node_prefix, "[test-host]");
        assert!(bare.no_prefix);
        assert_eq!(bare.format_line(SAMPLE), "test output");

        // Prefix enabled: the bracketed host and a space precede the line.
        let prefixed = NodeOutputWriter::new_with_no_prefix("test-host", false);
        assert_eq!(prefixed.format_line(SAMPLE), "[test-host] test output");
    }

    /// Smoke test: the free functions must compile and must not panic.
    /// Their results are discarded on purpose; no output is inspected here.
    #[test]
    fn test_synchronized_output() {
        let _ = synchronized_println("test");
        let _ = synchronized_eprintln("test error");

        let batch = ["line1", "line2"];
        let _ = synchronized_print_lines(batch.iter().copied());
        let _ = synchronized_eprint_lines(batch.iter().copied());
    }
}