diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs
index 650471f3a1b28..3222f6b487f21 100644
--- a/src/connector/src/sink/coordinate.rs
+++ b/src/connector/src/sink/coordinate.rs
@@ -105,7 +105,7 @@ impl>> Drop for CoordinatedS
                 warn!("unable to send stop due to channel full")
             }
             Some(Err(e)) => {
-                warn!(e = ?e.as_report(), "failed to stop the coordinator");
+                warn!(e = %e.as_report(), "failed to stop the coordinator");
             }
             Some(Ok(_)) => {}
         }
diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs
index 65df4d1c79366..715fadc1d3282 100644
--- a/src/connector/src/source/kinesis/source/reader.rs
+++ b/src/connector/src/source/kinesis/source/reader.rs
@@ -201,7 +201,7 @@ impl KinesisSplitReader {
                 Err(SdkError::ServiceError(e))
                     if e.err().is_provisioned_throughput_exceeded_exception() =>
                 {
-                    tracing::warn!(
+                    tracing::debug!(
                         "stream {:?} shard {:?} throughput exceeded, retry",
                         self.stream_name,
                         self.shard_id
diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs
index 7cc8010020f62..7c1893035f694 100644
--- a/src/meta/src/barrier/worker.rs
+++ b/src/meta/src/barrier/worker.rs
@@ -341,7 +341,7 @@ impl GlobalBarrierWorker {
                 self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                 match self.context.reload_database_runtime_info(database_id).await? {
                     Some(runtime_info) => {
                         runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
-                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
+                            warn!(database_id = database_id.database_id, err = %e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                         })?;
                         let workers = runtime_info.database_fragment_info.workers();
                         for worker_id in workers {
@@ -711,7 +711,7 @@ impl GlobalBarrierWorker {
             .reload_runtime_info()
             .await?;
         runtime_info_snapshot.validate().inspect_err(|e| {
-            warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
+            warn!(err = %e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
         })?;
         let BarrierWorkerRuntimeInfoSnapshot {
             active_streaming_nodes,
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 2c7fa423abb08..ab60d4b69053c 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -243,7 +243,7 @@ impl ActiveStreamingWorkerNodes {
                     }
                 }
                 Err(e) => {
-                    warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
+                    warn!(e = %e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
                 }
             }
         }
diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs
index 06354696de886..6727b6f83c6af 100644
--- a/src/storage/src/hummock/event_handler/uploader/mod.rs
+++ b/src/storage/src/hummock/event_handler/uploader/mod.rs
@@ -283,7 +283,7 @@ impl UploadingTask {
                     debug!(task_info = ?self.task_info, "upload task finish");
                 }
             })
-            .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
+            .inspect_err(|e| error!(task_info = ?self.task_info, err = %e.as_report(), "upload task failed"))
             .map(|output| {
                 Arc::new(StagingSstableInfo::new(
                     output.new_value_ssts,
diff --git a/src/stream/src/executor/exchange/permit.rs b/src/stream/src/executor/exchange/permit.rs
index df8b4085eb21b..867d3bb7439e5 100644
--- a/src/stream/src/executor/exchange/permit.rs
+++ b/src/stream/src/executor/exchange/permit.rs
@@ -128,7 +128,7 @@ impl Sender {
             Message::Chunk(c) => {
                 let card = c.cardinality().clamp(1, self.max_chunk_permits);
                 if card == self.max_chunk_permits {
-                    tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange")
+                    tracing::debug!(cardinality = c.cardinality(), "large chunk in exchange")
                 }
                 Some(permits::Value::Record(card as _))
             }
diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs
index 9288a0163e5e8..243a202510e60 100644
--- a/src/stream/src/executor/now.rs
+++ b/src/stream/src/executor/now.rs
@@ -118,7 +118,7 @@ impl NowExecutor {
         {
             let mut curr_timestamp = None;
             if barriers.len() > 1 {
-                warn!(
+                debug!(
                     "handle multiple barriers at once in now executor: {}",
                     barriers.len()
                 );
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 9acb19932969e..4d2bb8a62946b 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -469,7 +469,7 @@ impl SourceBackfillExecutorInner {
                         let Ok(msg) = msg else {
                             let e = msg.unwrap_err();
                             tracing::warn!(
-                                error = ?e.as_report(),
+                                error = %e.as_report(),
                                 source_id = %self.source_id,
                                 "stream source reader error",
                             );
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 9097756401c8f..40c7009aa1f12 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -167,7 +167,7 @@ impl SourceExecutor {
                             Err(e) => {
                                 tracing::error!(
                                     target: "auto_schema_change",
-                                    error = ?e.as_report(), "schema change error");
+                                    error = %e.as_report(), "schema change error");
                                 finish_tx.send(()).unwrap();
                             }
                         }
@@ -352,7 +352,7 @@ impl SourceExecutor {
     ) -> StreamExecutorResult<()> {
         let core = self.stream_source_core.as_mut().unwrap();
         tracing::warn!(
-            error = ?e.as_report(),
+            error = %e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
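
Note on the recurring change above (not part of the diff): in `tracing` macros, the sigil before a field value selects the formatter. `?value` records the field with its `Debug` implementation, while `%value` records it with its `Display` implementation. The errors here are wrapped via `as_report()` (from the `thiserror-ext` crate used in this codebase), whose `Display` rendering is the compact one-line error chain meant for logs, so `%e.as_report()` is the preferred form. Below is a minimal, self-contained sketch of the sigil difference; the `ReaderError` type and its messages are hypothetical and only illustrate the mechanism, assuming the `tracing` and `tracing-subscriber` crates.

// A hypothetical error type demonstrating `?` (Debug) vs. `%` (Display)
// field recording in `tracing` macros.
use std::fmt;

#[derive(Debug)]
struct ReaderError {
    message: String,
}

impl fmt::Display for ReaderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Human-readable one-line rendering, analogous in spirit to what
        // an error-report wrapper produces for a full source chain.
        write!(f, "stream source reader error: {}", self.message)
    }
}

fn main() {
    // Install a subscriber so the events are printed to stdout.
    tracing_subscriber::fmt::init();
    let e = ReaderError { message: "connection reset".into() };

    // `?e` records the Debug form:
    //   error=ReaderError { message: "connection reset" }
    tracing::warn!(error = ?e, "debug-formatted field");

    // `%e` records the Display form:
    //   error=stream source reader error: connection reset
    tracing::warn!(error = %e, "display-formatted field");
}

The log-level downgrades in the diff (`warn!` to `debug!` for Kinesis throttling retries, large exchange chunks, and batched barriers in the now executor) are orthogonal to the sigil change: those events are expected under normal operation, so they are demoted to avoid noisy warning logs.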