Skip to content

Commit 3a6c574

Browse files
committed
dekaf: Remove log forwarding from the registry
We never ended up setting a task name so no logs got sent, and it's best if we don't allow external data like from HTTP requests to get into the ops logs
1 parent 2a2e801 commit 3a6c574

File tree

4 files changed

+24
-51
lines changed

4 files changed

+24
-51
lines changed

crates/dekaf/src/lib.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ impl App {
243243
// We only set this after successfully validating that this task exists and is a Dekaf
244244
// materialization. Otherwise we will either log auth errors attempting to append to
245245
// a journal that doesn't exist, or possibly log confusing errors to a different task's logs entirely.
246-
logging::get_log_forwarder().set_task_name(username.clone(), labels.build.clone());
246+
logging::get_log_forwarder()
247+
.map(|f| f.set_task_name(username.clone(), labels.build.clone()));
247248

248249
// 3. Validate that the provided password matches the task's bearer token
249250
if password != config.token {
@@ -261,8 +262,8 @@ impl App {
261262
)?))
262263
} else if username.contains("{") {
263264
// Since we don't have a task, we also don't have a logs journal to write to,
264-
// so we should isable log forwarding for this session.
265-
logging::get_log_forwarder().shutdown();
265+
// so we should disable log forwarding for this session.
266+
logging::get_log_forwarder().map(|f| f.shutdown());
266267

267268
let raw_token = String::from_utf8(base64::decode(password)?.to_vec())?;
268269
let refresh: RefreshToken = serde_json::from_str(raw_token.as_str())?;

crates/dekaf/src/logging.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ where
131131
)
132132
}
133133

134-
pub fn get_log_forwarder() -> TaskForwarder<GazetteWriter> {
135-
TASK_FORWARDER.get()
134+
pub fn get_log_forwarder() -> Option<TaskForwarder<GazetteWriter>> {
135+
TASK_FORWARDER.try_with(|v| v.clone()).ok()
136136
}
137137

138138
pub fn set_log_level(level: ops::LogLevel) {

crates/dekaf/src/read.rs

+18-16
Original file line numberDiff line numberDiff line change
@@ -354,22 +354,24 @@ impl Read {
354354
// Right: Input documents from journal. Left: Input docs from destination. Out: Right Keys ⋃ Left Keys
355355
// Dekaf reads docs from journals, so it emits "right". It doesn't do reduction with a destination system,
356356
// so it does not emit "left". And right now, it does not reduce at all, so "out" is the same as "right".
357-
logging::get_log_forwarder().send_stats(
358-
self.collection_name.to_owned(),
359-
ops::stats::Binding {
360-
right: Some(ops::stats::DocsAndBytes {
361-
docs_total: stats_records,
362-
bytes_total: stats_bytes,
363-
}),
364-
out: Some(ops::stats::DocsAndBytes {
365-
docs_total: stats_records,
366-
bytes_total: stats_bytes,
367-
}),
368-
left: None,
369-
last_source_published_at: last_source_published_at
370-
.and_then(|c| c.to_pb_json_timestamp()),
371-
},
372-
);
357+
logging::get_log_forwarder().map(|f| {
358+
f.send_stats(
359+
self.collection_name.to_owned(),
360+
ops::stats::Binding {
361+
right: Some(ops::stats::DocsAndBytes {
362+
docs_total: stats_records,
363+
bytes_total: stats_bytes,
364+
}),
365+
out: Some(ops::stats::DocsAndBytes {
366+
docs_total: stats_records,
367+
bytes_total: stats_bytes,
368+
}),
369+
left: None,
370+
last_source_published_at: last_source_published_at
371+
.and_then(|c| c.to_pb_json_timestamp()),
372+
},
373+
);
374+
});
373375

374376
let frozen = buf.freeze();
375377

crates/dekaf/src/registry.rs

-30
Original file line numberDiff line numberDiff line change
@@ -27,42 +27,12 @@ pub fn build_router(app: Arc<App>) -> axum::Router<()> {
2727
get(get_subject_latest),
2828
)
2929
.route("/schemas/ids/:id", get(get_schema_by_id))
30-
.route_layer(axum::middleware::from_fn_with_state(
31-
app.clone(),
32-
attach_logger,
33-
))
3430
.layer(tower_http::trace::TraceLayer::new_for_http())
3531
.with_state(app);
3632

3733
schema_router
3834
}
3935

40-
async fn attach_logger(
41-
axum::extract::State(app): axum::extract::State<Arc<App>>,
42-
request: axum::extract::Request,
43-
next: axum::middleware::Next,
44-
) -> Response {
45-
let writer = GazetteWriter::new(app);
46-
47-
logging::forward_logs(
48-
writer,
49-
tokio_util::sync::CancellationToken::new(),
50-
async move {
51-
if let Some(user_agent) = request.headers().get(USER_AGENT) {
52-
match user_agent.to_str() {
53-
Ok(user_agent) => {
54-
tracing::Span::current()
55-
.record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, user_agent);
56-
}
57-
Err(e) => tracing::warn!(?e, "Got bad user agent header"),
58-
};
59-
}
60-
next.run(request).await
61-
},
62-
)
63-
.await
64-
}
65-
6636
// List all collections as "subjects", which are generally Kafka topics in the ecosystem.
6737
#[tracing::instrument(skip_all)]
6838
async fn all_subjects(

0 commit comments

Comments
 (0)