Skip to content

Commit b953c55

Browse files
authored
Add logs when task finish running (#80)
1 parent 0f970a4 commit b953c55

1 file changed

Lines changed: 17 additions & 3 deletions

File tree

crates/fluxqueue-worker/src/worker.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use rmp_serde::from_slice;
66
use std::ffi::CString;
77
use std::path::{Path, PathBuf};
88
use std::sync::Arc;
9-
use std::time::Duration;
9+
use std::time::{Duration, Instant};
1010
use tokio::sync::watch;
1111
use tokio::task::JoinSet;
1212

@@ -127,10 +127,11 @@ async fn executor_loop(
127127
match res {
128128
Ok(Some(raw_data)) => {
129129
let task = deserialize_raw_task_data(&raw_data)?;
130+
let task_name = format!("{}:{}", &task.name, &task.id);
130131

131132
logger.info(format_args!(
132133
"Received a task '{}' with a total of {} Bytes",
133-
&task.name,
134+
&task_name,
134135
raw_data.len()
135136
));
136137

@@ -144,6 +145,7 @@ async fn executor_loop(
144145
return Ok(());
145146
};
146147

148+
let duration_start = Instant::now();
147149
let task_result = run_task(&task, task_function).await;
148150

149151
match task_result {
@@ -153,9 +155,21 @@ async fn executor_loop(
153155
.await {
154156
logger.error(format_args!("Failed to remove the task after successful run: {}", e));
155157
}
158+
let duration_end = duration_start.elapsed();
159+
logger.info(format_args!(
160+
"Task '{}' successfully finished in {}ms",
161+
&task_name,
162+
duration_end.as_millis()
163+
));
156164
}
157165
Err(e) => {
158-
logger.error(format_args!("Task '{}' failed: {}", &task.name, e));
166+
let duration_end = duration_start.elapsed();
167+
logger.error(format_args!(
168+
"Task '{}' failed in {}ms: {}",
169+
&task_name,
170+
duration_end.as_millis(),
171+
e
172+
));
159173
if let Err(err) = redis_client
160174
.mark_as_failed(&queue_name, &executor_id, &raw_data)
161175
.await {

0 commit comments

Comments
 (0)