Phil/controller backoffs #2038

Merged 5 commits on Apr 3, 2025
110 changes: 69 additions & 41 deletions crates/agent/src/controllers/capture/auto_discover.rs
@@ -8,7 +8,7 @@ use models::status::{
use crate::{
controllers::{
publication_status::{self, PendingPublication},
ControllerState, NextRun,
ControllerErrorExt, ControllerState, NextRun,
},
controlplane::ConnectorSpec,
discovers::DiscoverOutput,
@@ -45,44 +45,57 @@ pub async fn update<C: ControlPlane>(
control_plane: &C,
pub_status: &mut PublicationStatus,
) -> anyhow::Result<bool> {
// Ensure that `next_at` has been initialized.
update_next_run(status, state, model, control_plane).await?;
if status
.next_at
.map(|due| control_plane.current_time() <= due)
.unwrap_or(true)
{
return Ok(false);

// Do we need to auto-discover now?
let now = control_plane.current_time();
let AutoDiscoverStatus {
ref next_at,
ref failure,
..
} = status;
match (next_at, failure) {
// Either the spec or autoDiscover is disabled
(None, _) => return Ok(false),
// Not due yet
(Some(n), None) if *n > now => return Ok(false),
// Previous attempt failed, and we're not yet ready for a retry.
// Return an error so that it's clear that auto-discovers are not working for this capture.
(Some(n), Some(f)) if *n > now => {
return crate::controllers::backoff_err(
NextRun::after(*n).with_jitter_percent(5),
"auto-discover",
f.count,
);
}
// next_at <= now, so proceed with the auto-discover
(Some(n), _) => {
tracing::debug!(due_at = %n, "starting auto-discover");
}
}

tracing::debug!("starting auto-discover");
// We'll return the original discover error if it fails
let result = try_auto_discover(state, model, control_plane, pub_status).await;

// We'll return whether we've actually published anything. If all we did
// was run a discover that found no changes, then we may proceed with
// other controller actions.
let has_changes = match result {
Ok(outcome) => {
let has_changes = outcome.is_successful() && outcome.has_changes();
let result = outcome.get_result();
record_outcome(status, outcome);
result?; // return an error if the auto-discover failed

// Auto-discover was successful, so determine the time of the next attempt
update_next_run(status, state, model, control_plane).await?;
has_changes
}
let outcome = match result {
Review comment (Member): optional nit: could use map_err

Ok(outcome) => outcome,
Err(error) => {
tracing::debug!(?error, "auto-discover failed with error");
let outcome = AutoDiscoverOutcome::error(
control_plane.current_time(),
&state.catalog_name,
&error,
);
record_outcome(status, outcome);
return Err(error);
let failed_at = control_plane.current_time();
AutoDiscoverOutcome::error(failed_at, &state.catalog_name, &error)
}
};
let has_changes = outcome.has_changes();
let return_result = outcome.get_result();
record_outcome(status, outcome);
update_next_run(status, state, model, control_plane).await?;

// If either the discover or publication failed, return now with an error,
// with the retry set to the next attempt time.
return_result.with_maybe_retry(
status
.next_at
.map(|ts| NextRun::after(ts).with_jitter_percent(5)),
)?;
Ok(has_changes)
}
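
Aside: reduced to plain values, the gating at the top of `update` is a four-way decision on `(next_at, failure)`. A minimal sketch of that decision, using hypothetical names rather than the crate's actual status types:

use chrono::{DateTime, Utc};

// Illustrative only: the real controller returns Ok(false), a backoff error, or proceeds.
enum Gate {
    Disabled,       // autoDiscover is disabled: nothing to do
    NotDueYet,      // next_at is in the future and the last attempt succeeded
    BackoffPending, // next_at is in the future and the last attempt failed: surface an error
    Due,            // next_at <= now: run the auto-discover
}

fn gate(next_at: Option<DateTime<Utc>>, last_failed: bool, now: DateTime<Utc>) -> Gate {
    match (next_at, last_failed) {
        (None, _) => Gate::Disabled,
        (Some(n), false) if n > now => Gate::NotDueYet,
        (Some(n), true) if n > now => Gate::BackoffPending,
        (Some(_), _) => Gate::Due,
    }
}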

@@ -92,20 +105,21 @@ async fn update_next_run<C: ControlPlane>(
model: &models::CaptureDef,
control_plane: &C,
) -> anyhow::Result<()> {
if model.shards.disable {
// Do you even auto-discover, bro?
if model.shards.disable || model.auto_discover.is_none() {
status.next_at = None;
return Ok(());
}

if status.next_at.is_none()
|| status.next_at.is_some_and(|n| {
status
.last_success
.as_ref()
.map(|ls| ls.ts > n)
.unwrap_or(false)
})
{
// Was there a successful auto-discover since the `next_at` time?
let last_attempt_successful = status.next_at.is_some_and(|n| {
status
.last_success
.as_ref()
.map(|ls| ls.ts > n)
.unwrap_or(false)
});
if status.next_at.is_none() || last_attempt_successful {
// `next_at` is `None` or else we've successfully completed a
// discover since, so determine the next auto-discover time.
// If there's no `connector_tags` row for this capture connector
@@ -129,7 +143,21 @@
let next = prev + auto_discover_interval;
tracing::debug!(%next, %auto_discover_interval, "determined new next_at time");
status.next_at = Some(next);
return Ok(());
}

// Sad path: the previous attempt failed, so determine a time for the next attempt, with a backoff.
if let Some(fail) = status
.failure
.as_ref()
.filter(|f| status.next_at.is_some_and(|n| n < f.last_outcome.ts))
{
let backoff_minutes = (fail.count as i64 * 10).min(120);
let next_attempt = fail.last_outcome.ts + chrono::Duration::minutes(backoff_minutes);
status.next_at = Some(next_attempt);
}

// There's not been an attempted auto-discover since `next_at`, so keep the current value
Ok(())
}
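
For reference, the failure branch above grows the retry delay linearly with the consecutive-failure count and caps it at two hours. A standalone sketch of that schedule (the helper name is illustrative, not part of the crate):

fn auto_discover_backoff_minutes(failure_count: i64) -> i64 {
    // Same arithmetic as `(fail.count as i64 * 10).min(120)` above.
    (failure_count * 10).min(120)
}

fn main() {
    assert_eq!(auto_discover_backoff_minutes(1), 10);
    assert_eq!(auto_discover_backoff_minutes(6), 60);
    assert_eq!(auto_discover_backoff_minutes(12), 120);
    assert_eq!(auto_discover_backoff_minutes(50), 120); // capped at 2 hours
}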

61 changes: 47 additions & 14 deletions crates/agent/src/controllers/dependencies.rs
@@ -1,6 +1,7 @@
use std::collections::BTreeSet;

use anyhow::Context;
use chrono::{DateTime, Utc};
use models::{status::publications::PublicationStatus, AnySpec, ModelDef};

use crate::ControlPlane;
@@ -86,14 +87,21 @@ impl Dependencies {
DF: FnOnce(&BTreeSet<String>) -> anyhow::Result<(String, M)>,
M: Into<models::AnySpec>,
{
let mut pending = self.start_update(state, handle_deleted).await?;
let mut pending = self
.start_update(state, control_plane.current_time(), handle_deleted)
.await?;
if pending.has_pending() {
pending
let pub_result = pending
.finish(state, pub_status, control_plane)
.await
.context("failed to execute publish")?
.error_for_status()
.with_maybe_retry(backoff_publication_failure(state.failures))?;
.error_for_status();
// The most recent failure in the history is guaranteed to be the
// right one if this attempt has failed, so an empty prefix works.
let failures = super::last_pub_failed(pub_status, "")
.map(|(_, count)| count)
.unwrap_or(1);
pub_result.with_retry(backoff_publication_failure(failures))?;
Ok(true)
} else {
Ok(false)
@@ -107,6 +115,7 @@ impl Dependencies {
pub async fn start_update<DF, M>(
&mut self,
state: &ControllerState,
now: DateTime<Utc>,
handle_deleted: DF,
) -> anyhow::Result<PendingPublication>
where
@@ -118,13 +127,37 @@
return Ok(pending_pub);
}

if self.deleted.is_empty() {
let detail = if self.deleted.is_empty() {
// This is the common case
let new_hash = self.hash.as_deref().unwrap_or("None");
let old_hash = state.live_dependency_hash.as_deref().unwrap_or("None");
let detail = format!(
format!(
"in response to change in dependencies, prev hash: {old_hash}, new hash: {new_hash}"
);
)
} else {
"in response to deletion one or more depencencies".to_string()
};

// Do we need to backoff a previous failed attempt? First question is
// whether the last attempt failed. Note that our use of the full detail
// when matching the prefix is intentional, so that the backoff gets
// reset whenever the dependency hashes change.
if let Some((last, fail_count)) = state
.current_status
.publication_status()
.and_then(|s| super::last_pub_failed(s, &detail))
{
let backoff = backoff_publication_failure(fail_count);
// 0 the jitter when computing here so that we don't randomly use a greater jitter
// than was determined when the backoff error was first returned. This isn't critical,
// but avoids potentially "extra" controller runs.
let next_attempt = last + backoff.with_jitter_percent(0).compute_duration();
if next_attempt > now {
return super::backoff_err(backoff, "dependency update publication", fail_count);
}
}

if self.deleted.is_empty() {
pending_pub.start_touch(state, detail);
} else {
let (detail, updated_model) = handle_deleted(&self.deleted)
@@ -143,12 +176,12 @@
}
}
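
The jitter is zeroed in the backoff check above so that the recomputed next-attempt time can never land later than the one reported in the original backoff error. Stripped of the controller types, the gate reduces to roughly this (hypothetical helper, chrono only):

use chrono::{DateTime, Duration, Utc};

// True when a previously failed publication is still inside its backoff window.
fn still_backing_off(last_failure: DateTime<Utc>, backoff: Duration, now: DateTime<Utc>) -> bool {
    last_failure + backoff > now
}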

fn backoff_publication_failure(prev_failures: i32) -> Option<NextRun> {
if prev_failures < 3 {
Some(NextRun::after_minutes(prev_failures.max(1) as u32))
} else if prev_failures < 10 {
Some(NextRun::after_minutes(prev_failures as u32 * 60))
fn backoff_publication_failure(prev_failures: u32) -> NextRun {
let mins = if prev_failures < 3 {
prev_failures.max(1)
} else {
None
}
// max of 5 hours between attempts
(prev_failures * 30).min(300)
};
NextRun::after_minutes(mins as u32)
}
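
The rewritten helper always returns a NextRun rather than an Option, and the schedule works out to one or two minutes for the first couple of failures, then 30 minutes per failure up to the five-hour cap. A hypothetical test sketching those values:

#[test]
fn publication_backoff_schedule() {
    // Same arithmetic as `backoff_publication_failure` above.
    let minutes = |n: u32| if n < 3 { n.max(1) } else { (n * 30).min(300) };
    assert_eq!(minutes(0), 1); // always waits at least a minute
    assert_eq!(minutes(2), 2);
    assert_eq!(minutes(3), 90);
    assert_eq!(minutes(9), 270);
    assert_eq!(minutes(20), 300); // capped at 5 hours
}
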
59 changes: 43 additions & 16 deletions crates/agent/src/controllers/materialization.rs
@@ -60,7 +60,7 @@ async fn maybe_publish<C: ControlPlane>(
// because we need to handle the schema evolution whenever we publish. The collection schemas could have changed
// since the last publish, and we might need to apply `onIncompatibleSchemaChange` actions.
let dependency_pub = dependencies
.start_update(state, |deleted| {
.start_update(state, control_plane.current_time(), |deleted| {
Ok(handle_deleted_dependencies(deleted, model.clone()))
})
.await?;
@@ -109,7 +109,7 @@ async fn maybe_publish<C: ControlPlane>(
status.source_capture.take();
}

let periodic = periodic::start_periodic_publish_update(state, control_plane);
let periodic = periodic::start_periodic_publish_update(state, control_plane)?;
if periodic.has_pending() {
do_publication(&mut status.publications, state, periodic, control_plane).await?;
return Ok(true);
@@ -319,16 +319,48 @@ pub async fn update_source_capture<C: ControlPlane>(
) -> anyhow::Result<bool> {
let capture_spec = live_capture.model();

// Did a prior attempt to add bindings fail?
let prev_failed = !status.up_to_date;

// Record the bindings that we plan to add. This will remain if we
// return an error while trying to add them, so that we can see the new
// binginds in the status if something goes wrong. If all goes well,
// bindings in the status if something goes wrong. If all goes well,
// we'll clear this at the end.
status.add_bindings = get_bindings_to_add(capture_spec, model);
status.up_to_date = status.add_bindings.is_empty();
if status.up_to_date {
return Ok(false);
}

// Avoid generating a detail with hundreds of collection names
let detail = if status.add_bindings.len() > 10 {
format!(
"adding {} bindings to match the sourceCapture",
status.add_bindings.len()
)
} else {
format!(
"adding binding(s) to match the sourceCapture: [{}]",
status.add_bindings.iter().join(", ")
)
};

// Check whether the prior attempt failed, and whether we need to backoff
// before trying again. Note that if the detail message changes (i.e. the
// source capture bindings changed), then the backoff will effectively be
// reset, because we match on the detail message. This is intentional, so
// that changes which may allow the publication to succeed will get retried
// immediately.
if let Some((last_attempt, fail_count)) =
super::last_pub_failed(pub_status, &detail).filter(|_| prev_failed)
{
let backoff = backoff_failed_source_capture_pub(fail_count);
let next = last_attempt + backoff.with_jitter_percent(0).compute_duration();
if next > control_plane.current_time() {
return super::backoff_err(backoff, &detail, fail_count);
}
}

// We need to update the materialization model to add the bindings. This
// requires the `resource_spec_schema` of the connector so that we can
// generate valid `resource`s for the new bindings.
@@ -343,19 +375,6 @@ pub async fn update_source_capture<C: ControlPlane>(
.context("failed to fetch connector spec")?;
let resource_spec_pointers = pointer_for_schema(connector_spec.resource_config_schema.get())?;

// Avoid generating a detail with hundreds of collection names
let detail = if status.add_bindings.len() > 10 {
format!(
"adding {} bindings to match the sourceCapture",
status.add_bindings.len()
)
} else {
format!(
"adding binding(s) to match the sourceCapture: [{}]",
status.add_bindings.iter().join(", ")
)
};

let mut new_model = model.clone();
update_linked_materialization(
model.source_capture.as_ref().unwrap(),
@@ -374,6 +393,14 @@ pub async fn update_source_capture<C: ControlPlane>(
Ok(true)
}

fn backoff_failed_source_capture_pub(failures: u32) -> NextRun {
let mins = match failures {
0..3 => failures * 3,
_ => (failures * 10).min(300),
};
NextRun::after_minutes(mins)
}
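
For comparison with the dependency-publication backoff in dependencies.rs, this one ramps in 3-minute steps for the first couple of failures and then 10 minutes per failure up to the same 5-hour cap. A quick sketch of the resulting schedule (illustrative names):

fn source_capture_backoff_minutes(failures: u32) -> u32 {
    // Same arithmetic as `backoff_failed_source_capture_pub` above.
    if failures < 3 { failures * 3 } else { (failures * 10).min(300) }
}

fn main() {
    assert_eq!(source_capture_backoff_minutes(1), 3);
    assert_eq!(source_capture_backoff_minutes(2), 6);
    assert_eq!(source_capture_backoff_minutes(10), 100);
    assert_eq!(source_capture_backoff_minutes(40), 300); // 5-hour cap
}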

fn get_bindings_to_add(
capture_spec: &models::CaptureDef,
materialization_spec: &models::MaterializationDef,