Skip to content

refactor: reduce log verbosity #21184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> Drop for CoordinatedS
warn!("unable to send stop due to channel full")
}
Some(Err(e)) => {
warn!(e = ?e.as_report(), "failed to stop the coordinator");
warn!(e = %e.as_report(), "failed to stop the coordinator");
}
Some(Ok(_)) => {}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl KinesisSplitReader {
Err(SdkError::ServiceError(e))
if e.err().is_provisioned_throughput_exceeded_exception() =>
{
tracing::warn!(
tracing::debug!(
"stream {:?} shard {:?} throughput exceeded, retry",
self.stream_name,
self.shard_id
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
match self.context.reload_database_runtime_info(database_id).await? { Some(runtime_info) => {
runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
warn!(database_id = database_id.database_id, err = %e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
})?;
let workers = runtime_info.database_fragment_info.workers();
for worker_id in workers {
Expand Down Expand Up @@ -711,7 +711,7 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
.reload_runtime_info()
.await?;
runtime_info_snapshot.validate().inspect_err(|e| {
warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
warn!(err = %e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
})?;
let BarrierWorkerRuntimeInfoSnapshot {
active_streaming_nodes,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl ActiveStreamingWorkerNodes {
}
}
Err(e) => {
warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
warn!(e = %e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl UploadingTask {
debug!(task_info = ?self.task_info, "upload task finish");
}
})
.inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
.inspect_err(|e| error!(task_info = ?self.task_info, err = %e.as_report(), "upload task failed"))
.map(|output| {
Arc::new(StagingSstableInfo::new(
output.new_value_ssts,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/exchange/permit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Sender {
Message::Chunk(c) => {
let card = c.cardinality().clamp(1, self.max_chunk_permits);
if card == self.max_chunk_permits {
tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange")
tracing::debug!(cardinality = c.cardinality(), "large chunk in exchange")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lowering this to debug means we will no longer be able to see it in production. Is that still the intended behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m okay with keeping it at WARN if it helps with troubleshooting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it at WARN — if this warning keeps showing up, it suggests an issue on the user side, and overly large chunks can affect our backpressure mechanism.

}
Some(permits::Value::Record(card as _))
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<S: StateStore> NowExecutor<S> {
{
let mut curr_timestamp = None;
if barriers.len() > 1 {
warn!(
debug!(
"handle multiple barriers at once in now executor: {}",
barriers.len()
);
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
let Ok(msg) = msg else {
let e = msg.unwrap_err();
tracing::warn!(
error = ?e.as_report(),
error = %e.as_report(),
source_id = %self.source_id,
"stream source reader error",
);
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl<S: StateStore> SourceExecutor<S> {
Err(e) => {
tracing::error!(
target: "auto_schema_change",
error = ?e.as_report(), "schema change error");
error = %e.as_report(), "schema change error");
finish_tx.send(()).unwrap();
}
}
Expand Down Expand Up @@ -352,7 +352,7 @@ impl<S: StateStore> SourceExecutor<S> {
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
tracing::warn!(
error = ?e.as_report(),
error = %e.as_report(),
actor_id = self.actor_ctx.id,
source_id = %core.source_id,
"stream source reader error",
Expand Down
Loading