diff --git a/crates/core/src/kernel/transaction/mod.rs b/crates/core/src/kernel/transaction/mod.rs index 84fe981adb..2120e4b9ae 100644 --- a/crates/core/src/kernel/transaction/mod.rs +++ b/crates/core/src/kernel/transaction/mod.rs @@ -339,6 +339,45 @@ impl CommitData { } Ok(bytes::Bytes::from(jsons.join("\n"))) } + + /// Update num_retries in operationMetrics within serialized bytes. + /// This allows updating the retry count without re-serializing all actions. + pub fn update_retry_count_in_bytes( + bytes: &Bytes, + num_retries: u64, + ) -> Result { + + let bytes_str = std::str::from_utf8(bytes.as_ref()) + .expect("Delta log bytes should always be valid UTF-8"); + + let lines: Vec<&str> = bytes_str.split('\n').collect(); + let mut updated_lines = Vec::with_capacity(lines.len()); + + for line in lines { + if line.contains("\"commitInfo\"") { + let mut action: Value = serde_json::from_str(line) + .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?; + + if let Some(commit_info) = action.get_mut("commitInfo") { + if let Some(Value::Object(metrics)) = commit_info.get_mut("operationMetrics") { + metrics.insert( + "num_retries".to_string(), + Value::Number(num_retries.into()), + ); + } + } + // Serialize just this updated line + let updated_line = serde_json::to_string(&action) + .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?; + updated_lines.push(updated_line); + } else { + // Keep other action lines unchanged (Add, Remove, etc.) + updated_lines.push(line.to_string()); + } + } + + Ok(Bytes::from(updated_lines.join("\n"))) + } } #[derive(Clone, Debug, Copy)] @@ -728,9 +767,25 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { let version: i64 = latest_version + 1; Span::current().record("target_version", version); + // Calculate current retry count (attempt 1 = 0 retries, attempt 2 = 1 retry, etc.) + let current_retries = (attempt_number - 1) as u64; + + // Update operationMetrics.numRetries in the serialized bytes + let updated_commit_or_bytes = match &commit_or_bytes { + CommitOrBytes::LogBytes(bytes) => { + let updated_bytes = CommitData::update_retry_count_in_bytes(bytes, current_retries)?; + CommitOrBytes::LogBytes(updated_bytes) + } + CommitOrBytes::TmpCommit(_path) => { + // For TmpCommit stores, keep original behavior for now + // (these stores write to tmp file first, then rename) + commit_or_bytes.clone() + } + }; + match this .log_store - .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id) + .write_commit_entry(version, updated_commit_or_bytes, this.operation_id) .await { Ok(()) => { diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index e4a9880f06..b3abc58b89 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -178,6 +178,9 @@ pub struct WriteMetrics { pub num_added_rows: usize, /// Time taken to execute the entire operation pub execution_time_ms: u64, + /// Number of retries before successful commit + #[serde(default)] + pub num_retries: u64, } impl super::Operation<()> for WriteBuilder {