Skip to content

Commit d779dc5

Browse files
authored
chore(kms-connector): better logs for message flow (#587)
1 parent 8e243c9 commit d779dc5

File tree

3 files changed

+25
-17
lines changed

3 files changed

+25
-17
lines changed

kms-connector/simple-connector/src/core/coordination/scheduler.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,21 @@ impl CoordinatedMessage {
6262
};
6363

6464
// Log concise scheduling info with message type
65-
info!(
66-
"⏳ Scheduled {}-{} in {}ms (delta: {}ms)",
67-
event_type,
68-
request_id,
69-
send_time.saturating_sub(now),
70-
send_delta_ms
71-
);
65+
let delay_ms = send_time as i64 - now as i64;
66+
67+
if delay_ms < 0 {
68+
// Only calculate when needed - delayed case
69+
info!(
70+
"⏳ Scheduled {}-{} in 0ms (delta: {}ms, delayed by {}ms)",
71+
event_type, request_id, send_delta_ms, -delay_ms
72+
);
73+
} else {
74+
// Normal case - use delay_ms directly (already positive)
75+
info!(
76+
"⏳ Scheduled {}-{} in {}ms (delta: {}ms)",
77+
event_type, request_id, delay_ms, send_delta_ms
78+
);
79+
}
7280

7381
Self {
7482
event,
@@ -166,7 +174,7 @@ impl<P: Provider + Clone + 'static> MessageScheduler<P> {
166174
if current_size >= self.config.pending_events_max {
167175
// Queue is completely full - signal critical backpressure and wait
168176
warn!(
169-
"💥 Message queue at capacity ({}), signaling critical backpressure",
177+
"🔥🔥🔥 Message queue at capacity ({}), signaling critical backpressure",
170178
current_size
171179
);
172180
let _ = self.backpressure_tx.send(BackpressureSignal::QueueCritical);
@@ -187,14 +195,14 @@ impl<P: Provider + Clone + 'static> MessageScheduler<P> {
187195
} else if current_size >= critical_threshold {
188196
// Queue is critically full - signal strong backpressure
189197
warn!(
190-
"🧨🧨 Message queue critically full ({}/{}), signaling backpressure",
198+
"⚠️🚨 Message queue critically full ({}/{}), signaling backpressure",
191199
current_size, self.config.pending_events_max
192200
);
193201
let _ = self.backpressure_tx.send(BackpressureSignal::QueueCritical);
194202
} else if current_size >= queue_threshold {
195203
// Queue is getting full - signal moderate backpressure
196204
warn!(
197-
"🧨 Message queue filling up ({}/{}), signaling backpressure",
205+
"⚠️ Message queue filling up ({}/{}), signaling backpressure",
198206
current_size, self.config.pending_events_max
199207
);
200208
let _ = self.backpressure_tx.send(BackpressureSignal::QueueFull);
@@ -513,7 +521,7 @@ impl<P: Provider + Clone + 'static> MessageScheduler<P> {
513521
match event {
514522
KmsCoreEvent::PublicDecryptionRequest(request) => {
515523
info!(
516-
"Preparing PublicDecryptionRequest-{} for sending",
524+
"📤 Dequeuing PublicDecryptionRequest-{}",
517525
request.decryptionId
518526
);
519527

@@ -571,7 +579,7 @@ impl<P: Provider + Clone + 'static> MessageScheduler<P> {
571579
}
572580
KmsCoreEvent::UserDecryptionRequest(request) => {
573581
info!(
574-
"Preparing UserDecryptionRequest-{} for sending",
582+
"📤 Dequeuing UserDecryptionRequest-{}",
575583
request.decryptionId
576584
);
577585

kms-connector/simple-connector/src/core/decryption/handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl<P: Provider + Clone> DecryptionHandler<P> {
7373
.collect();
7474

7575
info!(
76-
"Processing {}DecryptionRequest-{} with {} ciphertexts, key_id: {}, FHE types: [{}]",
76+
"⚙️ Processing {}DecryptionRequest-{} with {} ciphertexts, key_id: {}, FHE types: [{}]",
7777
request_type,
7878
request_id.to_string(),
7979
sns_ciphertext_materials.len(),

kms-connector/simple-connector/src/core/event_processor/processors.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl<P: Provider + Clone + 'static> EventProcessor<P> {
164164
let result = match event {
165165
KmsCoreEvent::PublicDecryptionRequest(req) => {
166166
info!(
167-
"Processing PublicDecryptionRequest-{}",
167+
"📭 Received PublicDecryptionRequest-{}",
168168
req.decryptionId
169169
);
170170

@@ -203,7 +203,7 @@ impl<P: Provider + Clone + 'static> EventProcessor<P> {
203203

204204
// Use MessageScheduler if coordinated sending is enabled
205205
if let Some(scheduler) = &self.message_scheduler {
206-
info!("Scheduling PublicDecryptionRequest-{} for coordinated sending", req_clone.decryptionId);
206+
info!("📋 Queuing PublicDecryptionRequest-{} for coordinated sending", req_clone.decryptionId);
207207

208208
// Get block timestamp for coordinated sending - this fixes the critical timing bug
209209
let event_id = req_clone.decryptionId.to_string();
@@ -235,7 +235,7 @@ impl<P: Provider + Clone + 'static> EventProcessor<P> {
235235

236236
KmsCoreEvent::UserDecryptionRequest(req) => {
237237
info!(
238-
"Processing UserDecryptionRequest-{}",
238+
"📭 Received UserDecryptionRequest-{}",
239239
req.decryptionId
240240
);
241241

@@ -282,7 +282,7 @@ impl<P: Provider + Clone + 'static> EventProcessor<P> {
282282

283283
// Use MessageScheduler if coordinated sending is enabled
284284
if let Some(scheduler) = &self.message_scheduler {
285-
info!("Scheduling UserDecryptionRequest-{} for coordinated sending", req_clone.decryptionId);
285+
info!("📋 Queuing UserDecryptionRequest-{} for coordinated sending", req_clone.decryptionId);
286286

287287
// Get block timestamp for coordinated sending - this fixes the critical timing bug
288288
let event_id = req_clone.decryptionId.to_string();

0 commit comments

Comments
 (0)