Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 53 additions & 12 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,11 @@ impl JournaldSource {
info!("Starting journalctl.");
let cursor = checkpointer.lock().await.cursor.clone();
match self.starter.start(cursor.as_deref()) {
Ok((stream, running)) => {
if !self.run_stream(stream, &finalizer, shutdown.clone()).await {
Ok((stdout_stream, stderr_stream, running)) => {
if !self
.run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
.await
{
return;
}
// Explicit drop to ensure it isn't dropped earlier.
Expand All @@ -488,25 +491,33 @@ impl JournaldSource {
/// Return `true` if should restart `journalctl`.
async fn run_stream<'a>(
&'a mut self,
mut stream: JournalStream,
mut stdout_stream: JournalStream,
stderr_stream: JournalStream,
finalizer: &'a Finalizer,
mut shutdown: ShutdownSignal,
) -> bool {
let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
let events_received = register!(EventsReceived);

// Spawn stderr handler task
let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));

let batch_size = self.batch_size;
loop {
let result = loop {
let mut batch = Batch::new(self);

// Start the timeout counter only once we have received a
// valid and non-filtered event.
while batch.events.is_empty() {
let item = tokio::select! {
_ = &mut shutdown => return false,
item = stream.next() => item,
_ = &mut shutdown => {
stderr_handler.abort();
return false;
},
item = stdout_stream.next() => item,
};
if !batch.handle_next(item) {
stderr_handler.abort();
return true;
}
}
Expand All @@ -517,7 +528,7 @@ impl JournaldSource {
for _ in 1..batch_size {
tokio::select! {
_ = &mut timeout => break,
result = stream.next() => if !batch.handle_next(result) {
result = stdout_stream.next() => if !batch.handle_next(result) {
break;
}
}
Expand All @@ -528,6 +539,29 @@ impl JournaldSource {
{
break x;
}
};

stderr_handler.abort();
result
}

/// Handle stderr stream from journalctl process
async fn handle_stderr(mut stderr_stream: JournalStream) {
use futures::StreamExt;
while let Some(result) = stderr_stream.next().await {
match result {
Ok(line) => {
let line_str = String::from_utf8_lossy(&line);
let trimmed = line_str.trim();
if !trimmed.is_empty() {
warn!("journalctl stderr: {}", trimmed);
}
}
Err(err) => {
warn!("Error reading journalctl stderr: {}", err);
break;
}
}
}
}
}
Expand Down Expand Up @@ -675,6 +709,7 @@ impl StartJournalctl {
fn make_command(&self, checkpoint: Option<&str>) -> Command {
let mut command = Command::new(&self.path);
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.arg("--follow");
command.arg("--all");
command.arg("--show-cursor");
Expand Down Expand Up @@ -711,18 +746,24 @@ impl StartJournalctl {
fn start(
&mut self,
checkpoint: Option<&str>,
) -> crate::Result<(JournalStream, RunningJournalctl)> {
) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
let mut command = self.make_command(checkpoint);

let mut child = command.spawn().context(JournalctlSpawnSnafu)?;

let stream = FramedRead::new(
let stdout_stream = FramedRead::new(
child.stdout.take().unwrap(),
CharacterDelimitedDecoder::new(b'\n'),
)
.boxed();
);

let stderr = child.stderr.take().unwrap();
let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));

Ok((stream, RunningJournalctl(child)))
Ok((
stdout_stream.boxed(),
stderr_stream.boxed(),
RunningJournalctl(child),
))
}
}

Expand Down
Loading