Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use cli::run;
fn main() -> anyhow::Result<()> {
println!("Relago daemon application is started without fuckery!!!");

let _ = daemon::core::run();
let _ = daemon::journal::run();
// run();

Ok(())
Expand Down
1 change: 1 addition & 0 deletions crates/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ systemd = "0.10.1"
dbus = { workspace = true }
utils = { path = "../utils" }
dbus-crossroads = { workspace = true }
nom = { workspace = true }

# nixlog ={ path = "../nixlog"}
212 changes: 135 additions & 77 deletions crates/daemon/src/journal.rs
Original file line number Diff line number Diff line change
@@ -1,129 +1,187 @@
#![warn(rust_2018_idioms)]

//! Follow future journal log messages and print up to 100 of them.
use anyhow::anyhow;
use nom::{self, Parser, branch::alt};
use systemd::journal::{self, Journal, JournalEntryField, JournalSeek};
use tracing::error;

const KEY_UNIT: &str = "_SYSTEMD_UNIT";
const KEY_MESSAGE: &str = "MESSAGE";
const KEY_PRIORITY: &str = "PRIORITY";
const KEY_CODE_FILE: &str = "CODE_FILE";
const KEY_ERRNO: &str = "ERRNO";
const KEY_SYSLOG_RAW: &str = "SYSLOG_RAW";
const KEY_STDOUT_ERR: &str = "_TRANSPORT=stdout";

const MAX_MESSAGES: usize = 5;

#[derive(Debug)]
struct JournalItem {
message: String,
// message_id: String,
priority: String,
unit: Option<String>,
errno: Option<String>,
std_err: Option<String>,
pub mod fields {
pub const MESSAGE: &str = "MESSAGE";
pub const MESSAGE_ID: &str = "MESSAGE_ID";
pub const PRIORITY: &str = "PRIORITY";
pub const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
pub const PID: &str = "_PID";
pub const UID: &str = "_UID";
pub const EXE: &str = "_EXE";
pub const COMM: &str = "_COMM";
pub const CMDLINE: &str = "_CMDLINE";
pub const TRANSPORT: &str = "_TRANSPORT";
pub const BOOT_ID: &str = "_BOOT_ID";
pub const ERRNO: &str = "ERRNO";
pub const CODE_FILE: &str = "CODE_FILE";
pub const SYSLOG_IDENTIFIER: &str = "SYSLOG_IDENTIFIER";

// for service
pub const UNIT: &str = "UNIT";
pub const USER_UNIT: &str = "__SEQNUM"; // Fixme: remove

pub const JOB_RESULT: &str = "JOB_RESULT";
pub const EXIT_CODE: &str = "EXIT_CODE";
pub const EXIT_STATUS: &str = "EXIT_STATUS";

// Coredump-specific fields
pub const COREDUMP_PID: &str = "COREDUMP_PID";
pub const COREDUMP_EXE: &str = "COREDUMP_EXE";
pub const COREDUMP_COMM: &str = "COREDUMP_COMM";
pub const COREDUMP_SIGNAL: &str = "COREDUMP_SIGNAL";
pub const COREDUMP_SIGNAL_NAME: &str = "COREDUMP_SIGNAL_NAME";
pub const COREDUMP_FILENAME: &str = "COREDUMP_FILENAME";
pub const COREDUMP_UID: &str = "COREDUMP_UID";
pub const COREDUMP_CMDLINE: &str = "COREDUMP_CMDLINE";
}

fn from_journal_fields(mut journal: Journal) -> JournalItem {
let keys = vec![KEY_UNIT, KEY_MESSAGE];
#[derive(Debug, Clone)]
pub enum Scope {
Service {
unit: String,
user_unit: Option<String>,
job_result: Option<String>,
exit_code: Option<u32>,
exit_signal: Option<String>,
},
Coredump {
exe: String
},

// let a = journal.get_data(field)
todo!()
}

// TODO: Need handle errror when illegal journal field
fn get_field_from_journal(journal: &mut Journal, j_field: &str) -> Option<String> {
let Some(Some(entry)) = journal.get_data(j_field).ok() else {
return None;
};
#[derive(Debug, Clone)]
pub struct JournalEntry {
pub message: String,
pub user_unit: Option<String>,
// pub priority: Option<u8>,
pub systemd_unit: Option<String>,
pub pid: Option<u32>,
pub exe: Option<String>,
pub comm: Option<String>,
pub transport: Option<String>,
pub errno: Option<i32>,
pub scope: Option<Scope>,
}

// TODO: need match option
return entry
pub fn get_field(journal: &mut Journal, field: &str) -> Option<String> {
let entry = journal.get_data(field).ok()??;
entry
.value()
.map(String::from_utf8_lossy)
.map(|v| v.into_owned());
.map(|v| v.into_owned())
}
struct J {

pub fn parse_scope(journal: &mut Journal) -> Option<Scope> {
let user_unit = get_field(journal, fields::USER_UNIT);
match get_field(journal, "UNIT") {
Some(unit) => Some(Scope::Service {
unit,
user_unit,
job_result: get_field(journal, fields::JOB_RESULT),
exit_code: get_field(journal, fields::PID).and_then(|s| s.parse().ok()),
exit_signal: get_field(journal, fields::EXIT_STATUS),
}),
None => None,
}
}

pub fn parse_dump(journal: &mut Journal) -> Option<Scope> {
let exe = get_field(journal, "COREDUMP_EXE")
.or_else(|| get_field(journal, "_EXE"));
match exe {
Some(exe) => Some(Scope::Coredump { exe }),
None => None
}
}

/// Extract a structured [`JournalEntry`] from the current journal position.
pub fn extract_entry(journal: &mut Journal) -> Option<JournalEntry> {
let message = get_field(journal, fields::MESSAGE)?;
let user_unit = get_field(journal, fields::USER_UNIT);
let scope = parse_scope(journal).or(parse_dump(journal));

Some(JournalEntry {
message,
// priority: get_field(journal, fields::PRIORITY).and_then(|s| s.parse().ok()),
user_unit,
systemd_unit: get_field(journal, fields::SYSTEMD_UNIT),
pid: get_field(journal, fields::PID).and_then(|s| s.parse().ok()),
exe: get_field(journal, fields::EXE),
comm: get_field(journal, fields::COMM),
transport: get_field(journal, fields::TRANSPORT),
errno: get_field(journal, fields::ERRNO).and_then(|s| s.parse().ok()),
// scope: parse_scope(journal),
scope,
})
}

pub struct JournalTail {
journal: Journal,
}

impl J {
pub fn new(mut journal: Journal) -> anyhow::Result<Self> {
// Seek to end of current log to prevent old messages from being printed
impl JournalTail {
/// Open the journal and seek to the tail (only new entries).
pub fn open() -> anyhow::Result<Self> {
let mut journal = journal::OpenOptions::default()
.open()
.map_err(|e| anyhow!("Could not open journal: {e}"))?;

journal
.seek(JournalSeek::Tail)
.map_err(|_| anyhow!("Could not seek to end of journal"))?;

// JournalSeek::Tail goes to the position after the most recent entry so step back to
// point to the most recent entry.
// Tail points past the last entry, step back to it
journal.previous()?;

Ok(Self { journal })
}

pub fn add_match(mut self, field: &str, value: &str) -> anyhow::Result<Self> {
self.journal
.match_add(field, value)
.map_err(|e| anyhow!("Failed to add match {field}={value}: {e}"))?;
Ok(self)
}
}

impl Iterator for J {
type Item = JournalItem;
impl Iterator for JournalTail {
type Item = JournalEntry;

fn next(&mut self) -> Option<Self::Item> {
loop {
match self.journal.next() {
Ok(0) => {
if let Err(err) = self.journal.wait(None) {
error!(error = %err, "failed to wait on journal");
error!(error = %err, "Failed to wait on journal");
return None;
}
}
Ok(_) => {
let message = get_field_from_journal(&mut self.journal, KEY_MESSAGE)?;

let unit = get_field_from_journal(&mut self.journal, KEY_UNIT);
// let unit = "someUNit".to_string();

let priority = get_field_from_journal(&mut self.journal, KEY_PRIORITY)?;
let errno = get_field_from_journal(&mut self.journal, KEY_ERRNO);
let std_err = get_field_from_journal(&mut self.journal, KEY_STDOUT_ERR);

println!("{}", priority);

let jr = JournalItem {
message,
priority,
unit,
errno,
std_err,
};

return Some(jr);
if let Some(entry) = extract_entry(&mut self.journal) {
return Some(entry);
}
}
Err(err) => {
error!(error = %err, "failed to get next on journal");
error!(error = %err, "Failed to read next journal entry");
return None;
}
}
}
}
}

pub fn main() -> Result<(), Box<dyn std::error::Error>> {
pub fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Starting journal-logger");

// Open the journal
let journal = journal::OpenOptions::default()
.open()
.expect("Could not open journal");

let j = J::new(journal)?;

let mut a = j.into_iter();
while let Some(log) = a.next() {
println!("{log:#?}");
let tail = JournalTail::open()?;
for entry in tail {
println!("{entry:#?}");
}

Ok(())
}

pub fn run() -> Result<(), Box<dyn std::error::Error>> {
main()
}
3 changes: 1 addition & 2 deletions crates/utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![allow(dead_code)]


pub mod notify;
pub mod notify;