Skip to content

Commit 35a04d3

Browse files
committed
Cleanups
1 parent c870016 commit 35a04d3

11 files changed

Lines changed: 86 additions & 49 deletions

File tree

liblogjet/include/liblogjet.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ void lj_ingest_set_callback(lj_ingest_plugin *ctx, lj_record_callback cb, void *
163163
void lj_ingest_set_generic_callback(lj_ingest_plugin *ctx, lj_generic_record_callback cb, void *user);
164164
int lj_ingest_feed(lj_ingest_plugin *ctx, const uint8_t *data, size_t len);
165165
int lj_ingest_fetch(lj_ingest_plugin *ctx);
166+
const char *lj_ingest_last_error(lj_ingest_plugin *ctx);
166167
void lj_ingest_free(lj_ingest_plugin *ctx);
167168

168169
#ifdef __cplusplus

logjetd/src/plugin.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ fn push_unique_path(roots: &mut Vec<PathBuf>, path: PathBuf) {
449449
}
450450
}
451451

452-
// ── C ABI types mirroring liblogjet.h ───────────────────────────────────────
452+
// C ABI types mirroring liblogjet.h
453453

454454
// Legacy log-only signal mask (used when reserved[0] == 0 for old plugins).
455455
#[allow(dead_code)]
@@ -516,11 +516,12 @@ type SetCallbackFn = unsafe extern "C" fn(*mut LjIngestPlugin, RecordCallback, *
516516
type FeedFn = unsafe extern "C" fn(*mut LjIngestPlugin, *const u8, usize) -> c_int;
517517
type FetchFn = unsafe extern "C" fn(*mut LjIngestPlugin) -> c_int;
518518
type FreeFn = unsafe extern "C" fn(*mut LjIngestPlugin);
519+
type LastErrorFn = unsafe extern "C" fn(*mut LjIngestPlugin) -> *const c_char;
519520
type RecordCallback = unsafe extern "C" fn(*mut c_void, *const LjLogRecord);
520521
type GenericRecordCallback = unsafe extern "C" fn(*mut c_void, *const LjIngestRecordV1);
521522
type SetGenericCallbackFn = unsafe extern "C" fn(*mut LjIngestPlugin, GenericRecordCallback, *mut c_void);
522523

523-
// ── Plugin handle ───────────────────────────────────────────────────────────
524+
// Plugin handle
524525

525526
/// Resolved symbols from a loaded ingest plugin.
526527
struct PluginHandle {
@@ -534,6 +535,8 @@ struct PluginHandle {
534535
/// Multi-signal plugins export `lj_ingest_set_generic_callback`. If
535536
/// present, ljd calls it instead of `lj_ingest_set_callback`.
536537
set_generic_callback: Option<SetGenericCallbackFn>,
538+
/// Optional error message retrieval (`lj_ingest_last_error`).
539+
last_error: Option<LastErrorFn>,
537540
free: FreeFn,
538541
}
539542

@@ -554,6 +557,8 @@ impl PluginHandle {
554557
let fetch: Option<FetchFn> = lib.get::<FetchFn>(b"lj_ingest_fetch\0").ok().map(|sym| *sym);
555558
let set_generic_callback: Option<SetGenericCallbackFn> =
556559
lib.get::<SetGenericCallbackFn>(b"lj_ingest_set_generic_callback\0").ok().map(|sym| *sym);
560+
let last_error: Option<LastErrorFn> =
561+
lib.get::<LastErrorFn>(b"lj_ingest_last_error\0").ok().map(|sym| *sym);
557562
let free: libloading::Symbol<FreeFn> =
558563
lib.get(b"lj_ingest_free\0").map_err(|err| io::Error::other(format!("symbol lj_ingest_free: {err}")))?;
559564

@@ -563,6 +568,7 @@ impl PluginHandle {
563568
feed: *feed,
564569
fetch,
565570
set_generic_callback,
571+
last_error,
566572
free: *free,
567573
_lib: lib,
568574
})
@@ -575,7 +581,7 @@ impl PluginHandle {
575581
}
576582
}
577583

578-
// ── Callback plumbing ───────────────────────────────────────────────────────
584+
// Callback plumbing
579585

580586
/// Passed through the `void *user` pointer in the C callback.
581587
struct CallbackCtx {
@@ -777,7 +783,7 @@ pub(crate) fn build_otlp_payload(rec: OtlpRecord<'_>) -> Vec<u8> {
777783
request.encode_to_vec()
778784
}
779785

780-
// ── Public entry point ──────────────────────────────────────────────────────
786+
// Public entry point
781787

782788
/// Runs the plugin ingest loop: loads the .so, then either calls
783789
/// `lj_ingest_fetch` (active plugin) or binds TCP and feeds bytes (passive).
@@ -870,6 +876,15 @@ fn run_active_plugin(handle: &PluginHandle, spool: Arc<super::daemon::SharedSpoo
870876

871877
let rc = unsafe { fetch(plugin_ctx) };
872878

879+
if rc != 0
880+
&& let Some(last_error_fn) = handle.last_error {
881+
let msg = unsafe { last_error_fn(plugin_ctx) };
882+
if !msg.is_null() {
883+
let msg_str = unsafe { CStr::from_ptr(msg) }.to_string_lossy();
884+
eprintln!("ljd plugin error: {msg_str}");
885+
}
886+
}
887+
873888
unsafe { (handle.free)(plugin_ctx) };
874889
let _ = unsafe { Box::from_raw(ctx_ptr as *mut CallbackCtx) };
875890

plugins/logcat-ingest/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::ffi::{CString, c_char, c_int, c_void};
1313
use std::io::{self, BufRead, BufReader};
1414
use std::time::{SystemTime, UNIX_EPOCH};
1515

16-
// ── C ABI types (must match liblogjet.h exactly) ────────────────────────────
16+
// C ABI types (must match liblogjet.h exactly)
1717

1818
#[repr(C)]
1919
pub struct LjAttribute {
@@ -71,7 +71,7 @@ pub extern "C" fn lj_ingest_descriptor_v1() -> *const LjIngestDescriptorV1 {
7171
&LOGCAT_INGEST_DESCRIPTOR.0
7272
}
7373

74-
// ── Severity constants ──────────────────────────────────────────────────────
74+
// Severity constants
7575

7676
const LJ_SEVERITY_TRACE: i32 = 1;
7777
const LJ_SEVERITY_DEBUG: i32 = 5;
@@ -81,14 +81,14 @@ const LJ_SEVERITY_ERROR: i32 = 17;
8181
const LJ_SEVERITY_FATAL: i32 = 21;
8282
const LJ_ATTR_STRING: i32 = 0;
8383

84-
// ── Plugin context ──────────────────────────────────────────────────────────
84+
// Plugin context
8585

8686
pub struct LogcatPlugin {
8787
callback: Option<RecordCallback>,
8888
user: *mut c_void,
8989
}
9090

91-
// ── Exported C ABI ──────────────────────────────────────────────────────────
91+
// Exported C ABI
9292

9393
/// Creates a new logcat parsing context.
9494
#[unsafe(no_mangle)]
@@ -158,7 +158,7 @@ pub unsafe extern "C" fn lj_ingest_free(ctx: *mut LogcatPlugin) {
158158
let _ = unsafe { Box::from_raw(ctx) };
159159
}
160160

161-
// ── Logcat parsing ──────────────────────────────────────────────────────────
161+
// Logcat parsing
162162

163163
struct Parsed<'a> {
164164
severity: i32,
@@ -263,7 +263,7 @@ fn map_logcat_level(ch: u8) -> (i32, &'static str) {
263263
}
264264
}
265265

266-
// ── Record emission ─────────────────────────────────────────────────────────
266+
// Record emission
267267

268268
fn emit_record(ctx: &LogcatPlugin, line: &str) {
269269
let Some(cb) = ctx.callback else { return };

plugins/perfetto-ingest/src/lib.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ mod trace_mapper;
1515

1616
use std::ffi::{c_char, c_int, c_void};
1717

18-
// ── C ABI types (must match liblogjet.h exactly) ────────────────────────────
18+
// C ABI types (must match liblogjet.h exactly)
1919

2020
#[repr(C)]
2121
pub struct LjAttribute {
@@ -66,7 +66,7 @@ pub struct LjIngestDescriptorV1 {
6666
reserved: [u64; 8],
6767
}
6868

69-
// ── Signal constants ────────────────────────────────────────────────────────
69+
// Signal constants
7070

7171
const LJ_INGEST_SIGNAL_LOGS: u32 = 1 << 0;
7272
const LJ_INGEST_SIGNAL_METRICS: u32 = 1 << 1;
@@ -82,7 +82,7 @@ const LJ_INGEST_RECORD_TYPE_TRACES: u32 = 3;
8282
#[allow(dead_code)]
8383
const LJ_INGEST_RECORD_TYPE_EVENTS: u32 = 4;
8484

85-
// ── Descriptor ──────────────────────────────────────────────────────────────
85+
// Descriptor
8686

8787
struct IngestDescriptor(LjIngestDescriptorV1);
8888

@@ -107,16 +107,17 @@ pub extern "C" fn lj_ingest_descriptor_v1() -> *const LjIngestDescriptorV1 {
107107
&PERFETTO_INGEST_DESCRIPTOR.0
108108
}
109109

110-
// ── Plugin context ──────────────────────────────────────────────────────────
110+
// Plugin context
111111

112112
pub struct PerfettoPlugin {
113113
pub(crate) legacy_callback: Option<RecordCallback>,
114114
pub(crate) legacy_user: *mut c_void,
115115
pub(crate) generic_callback: Option<GenericRecordCallback>,
116116
pub(crate) generic_user: *mut c_void,
117+
last_error: Option<String>,
117118
}
118119

119-
// ── Exported C ABI ──────────────────────────────────────────────────────────
120+
// Exported C ABI
120121

121122
#[unsafe(no_mangle)]
122123
pub extern "C" fn lj_ingest_create() -> *mut PerfettoPlugin {
@@ -125,6 +126,7 @@ pub extern "C" fn lj_ingest_create() -> *mut PerfettoPlugin {
125126
legacy_user: std::ptr::null_mut(),
126127
generic_callback: None,
127128
generic_user: std::ptr::null_mut(),
129+
last_error: None,
128130
}))
129131
}
130132

@@ -164,6 +166,23 @@ pub unsafe extern "C" fn lj_ingest_feed(_ctx: *mut PerfettoPlugin, _data: *const
164166
0
165167
}
166168

169+
/// Returns the last error message, or NULL if none.
170+
///
171+
/// # Safety
172+
///
173+
/// `ctx` must be a valid pointer from `lj_ingest_create`.
174+
#[unsafe(no_mangle)]
175+
pub unsafe extern "C" fn lj_ingest_last_error(ctx: *mut PerfettoPlugin) -> *const c_char {
176+
if ctx.is_null() {
177+
return std::ptr::null();
178+
}
179+
let ctx = unsafe { &*ctx };
180+
match &ctx.last_error {
181+
Some(msg) => msg.as_ptr().cast::<c_char>(),
182+
None => std::ptr::null(),
183+
}
184+
}
185+
167186
/// Active source: reads a `.pftrace` file, invokes trace_processor, maps
168187
/// results to OTel, and streams records through the generic callback.
169188
///
@@ -176,31 +195,32 @@ pub unsafe extern "C" fn lj_ingest_fetch(ctx: *mut PerfettoPlugin) -> c_int {
176195
eprintln!("perfetto-ingest: lj_ingest_fetch called with null context");
177196
return -1;
178197
}
179-
let ctx = unsafe { &*ctx };
198+
let ctx = unsafe { &mut *ctx };
180199

181200
let trace_file = match std::env::var("LJD_PERFETTO_TRACE_FILE") {
182201
Ok(path) => std::path::PathBuf::from(path),
183202
Err(_) => {
184-
eprintln!("perfetto-ingest: LJD_PERFETTO_TRACE_FILE is not set");
203+
ctx.last_error = Some("LJD_PERFETTO_TRACE_FILE is not set".to_string());
185204
return -2;
186205
}
187206
};
188207

189208
if !trace_file.is_file() {
190-
eprintln!("perfetto-ingest: trace file not found: {}", trace_file.display());
209+
ctx.last_error = Some(format!("trace file not found: {}", trace_file.display()));
191210
return -3;
192211
}
193212

194213
match run_pipeline(ctx, &trace_file) {
195214
Ok(()) => 0,
196215
Err(err) => {
216+
ctx.last_error = Some(err.to_string());
197217
eprintln!("perfetto-ingest: {err}");
198218
-4
199219
}
200220
}
201221
}
202222

203-
fn run_pipeline(plugin: &PerfettoPlugin, trace_file: &std::path::Path) -> Result<(), String> {
223+
fn run_pipeline(plugin: &mut PerfettoPlugin, trace_file: &std::path::Path) -> Result<(), String> {
204224
let tp_path = perfetto_invoke::find_trace_processor()
205225
.map_err(|err| format!("trace_processor not found: {err}"))?;
206226

@@ -270,7 +290,7 @@ pub unsafe extern "C" fn lj_ingest_free(ctx: *mut PerfettoPlugin) {
270290
let _ = unsafe { Box::from_raw(ctx) };
271291
}
272292

273-
// ── Record emission helpers ─────────────────────────────────────────────────
293+
// Record emission helpers
274294

275295
/// Calls the generic callback with a pre-encoded OTLP payload.
276296
///

plugins/perfetto-ingest/src/sqlite_reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
use std::path::Path;
1010

11-
// ── Typed models ────────────────────────────────────────────────────────────
11+
// Typed models
1212

1313
#[derive(Debug, Clone)]
1414
pub struct PerfettoSlice {
@@ -69,7 +69,7 @@ pub struct PerfettoClockSnapshot {
6969
pub clock_value: i64,
7070
}
7171

72-
// ── Database reader ──────────────────────────────────────────────────────────
72+
// Database reader
7373

7474
pub struct PerfettoDb {
7575
pub(crate) conn: rusqlite::Connection,

plugins/perfetto-ingest/src/trace_mapper.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ fn flush_batch(
192192
Ok(())
193193
}
194194

195-
// ── ID generation ──────────────────────────────────────────────────────────
195+
// ID generation
196196

197197
fn make_trace_id() -> [u8; 16] {
198198
let mut id = [0u8; 16];
@@ -211,7 +211,7 @@ fn make_span_id(slice_id: i64) -> [u8; 8] {
211211
id
212212
}
213213

214-
// ── Attribute helpers ──────────────────────────────────────────────────────
214+
// Attribute helpers
215215

216216
fn key_value(key: &str, value: AnyValue) -> KeyValue {
217217
KeyValue { key: key.to_string(), value: Some(value) }

plugins/perfetto-ingest/tests/unit/tests.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use prost::Message;
66
/// Captured emitted record: (record_type, timestamp_unix_ns, payload).
77
type EmittedRecord = (u32, u64, Vec<u8>);
88

9-
// ── sqlite_reader tests ─────────────────────────────────────────────────────
9+
// sqlite_reader tests
1010

1111
#[test]
1212
fn sqlite_reader_reads_slices_ordered_by_ts() {
@@ -110,7 +110,7 @@ fn sqlite_reader_reads_clock_snapshots() {
110110
assert_eq!(snaps[1].clock_value, 1_700_000_000_000_010_000);
111111
}
112112

113-
// ── metrics_reader tests ────────────────────────────────────────────────────
113+
// metrics_reader tests
114114

115115
#[test]
116116
fn metrics_reader_parses_scalar_metric() {
@@ -154,7 +154,7 @@ fn metrics_reader_parses_nested_metric() {
154154
assert_eq!(metrics[0].children[0].scalar_value, Some(5.0));
155155
}
156156

157-
// ── helpers ─────────────────────────────────────────────────────────────────
157+
// helpers
158158

159159
fn temp_json(name: &str, content: &str) -> std::path::PathBuf {
160160
let dir = std::env::temp_dir();
@@ -199,7 +199,7 @@ fn test_db() -> super::sqlite_reader::PerfettoDb {
199199
super::sqlite_reader::PerfettoDb { conn }
200200
}
201201

202-
// ── timestamp tests ─────────────────────────────────────────────────────────
202+
// timestamp tests
203203

204204
fn make_snapshots(pairs: &[(i64, i64)]) -> Vec<crate::sqlite_reader::PerfettoClockSnapshot> {
205205
pairs.iter().map(|(ts, cv)| crate::sqlite_reader::PerfettoClockSnapshot { ts: *ts, clock_value: *cv }).collect()
@@ -267,7 +267,7 @@ fn timestamp_has_realtime() {
267267
assert!(conv.has_realtime());
268268
}
269269

270-
// ── trace_mapper tests ──────────────────────────────────────────────────────
270+
// trace_mapper tests
271271

272272
#[test]
273273
fn trace_mapper_produces_spans_from_slices() {
@@ -301,7 +301,7 @@ fn trace_mapper_fails_without_realtime_require() {
301301
assert!(result.is_err());
302302
}
303303

304-
// ── metric_mapper tests ─────────────────────────────────────────────────────
304+
// metric_mapper tests
305305

306306
#[test]
307307
fn metric_mapper_encodes_scalar_metrics() {
@@ -355,7 +355,7 @@ fn metric_mapper_flattens_nested_metrics() {
355355
assert!(names.contains(&"parent.child"));
356356
}
357357

358-
// ── log_mapper tests ────────────────────────────────────────────────────────
358+
// log_mapper tests
359359

360360
#[test]
361361
fn log_mapper_produces_summary_log() {
@@ -376,7 +376,7 @@ fn log_mapper_produces_summary_log() {
376376
}
377377
}
378378

379-
// ── run_pipeline integration test ─────────────────────────────────────────────
379+
// run_pipeline integration test
380380

381381
#[test]
382382
fn run_pipeline_integration_with_sqlite() {
@@ -395,7 +395,7 @@ fn run_pipeline_integration_with_sqlite() {
395395
let _ = std::fs::remove_file(&tmp);
396396
}
397397

398-
// ── test helpers for mapper tests ───────────────────────────────────────────
398+
// test helpers for mapper tests
399399

400400
fn run_trace_mapper(
401401
db: &super::sqlite_reader::PerfettoDb,
@@ -445,6 +445,7 @@ fn dummy_plugin(cb: super::GenericRecordCallback) -> super::PerfettoPlugin {
445445
legacy_user: std::ptr::null_mut(),
446446
generic_callback: Some(cb),
447447
generic_user: user_ptr,
448+
last_error: None,
448449
}
449450
}
450451

0 commit comments

Comments
 (0)