Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions rust/garbage_collector/src/construct_version_graph_orchestrator.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file provide a little more debugging info and add a new defensive check.

Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::operators::{
fetch_lineage_file::{
FetchLineageFileError, FetchLineageFileInput, FetchLineageFileOperator,
FetchLineageFileOutput,
},
fetch_version_file::{
FetchVersionFileError, FetchVersionFileInput, FetchVersionFileOperator,
FetchVersionFileOutput,
},
get_version_file_paths::{
GetVersionFilePathsError, GetVersionFilePathsInput, GetVersionFilePathsOperator,
GetVersionFilePathsOutput,
use crate::{
operators::{
fetch_lineage_file::{
FetchLineageFileError, FetchLineageFileInput, FetchLineageFileOperator,
FetchLineageFileOutput,
},
fetch_version_file::{
FetchVersionFileError, FetchVersionFileInput, FetchVersionFileOperator,
FetchVersionFileOutput,
},
get_version_file_paths::{
GetVersionFilePathsError, GetVersionFilePathsInput, GetVersionFilePathsOperator,
GetVersionFilePathsOutput,
},
},
types::{VersionGraph, VersionGraphNode, VersionStatus},
};
use async_trait::async_trait;
use base64::{prelude::BASE64_STANDARD, Engine};
Expand All @@ -32,15 +35,6 @@ use thiserror::Error;
use tokio::sync::oneshot::{error::RecvError, Sender};
use tracing::Span;

/// Status of a single collection version, stored on each `VersionGraphNode`.
#[derive(Debug, Clone, Copy)]
pub enum VersionStatus {
/// The version is alive; `created_at` is its creation time in UTC.
// NOTE(review): presumably `created_at` is not read anywhere yet, hence the
// `dead_code` allowance — confirm before removing it.
#[allow(dead_code)]
Alive {
created_at: DateTime<chrono::Utc>,
},
/// The version has been deleted. The graph-construction code in this file
/// treats a version that is referenced by a lineage dependency but absent
/// from the version files as deleted.
Deleted,
}

#[derive(Debug, Clone)]
struct VersionDependency {
source_collection_id: CollectionUuid,
Expand Down Expand Up @@ -91,16 +85,6 @@ impl ConstructVersionGraphOrchestrator {
}
}

/// Node weight of a `VersionGraph`: one specific version of one collection.
#[derive(Debug, Clone)]
pub struct VersionGraphNode {
/// Collection this version belongs to.
pub collection_id: CollectionUuid,
/// Version number within the collection.
pub version: i64,
/// Status of this version (see `VersionStatus`).
// NOTE(review): presumably not read by any consumer yet, hence the
// `dead_code` allowance — confirm before removing it.
#[allow(dead_code)]
pub status: VersionStatus,
}

/// Directed graph of collection versions. Nodes are `VersionGraphNode`s and
/// edges carry no data (`()`). The construction code in this file adds an edge
/// between each successive pair of versions of a collection, and an edge from
/// a fork's source version to the forked (target) collection.
pub type VersionGraph = DiGraph<VersionGraphNode, ()>;

#[derive(Debug)]
#[allow(dead_code)]
pub struct ConstructVersionGraphResponse {
Expand Down Expand Up @@ -130,8 +114,10 @@ pub enum ConstructVersionGraphError {
InvalidUuid(#[from] uuid::Error),
#[error("Invalid timestamp: {0}")]
InvalidTimestamp(i64),
#[error("Expected node not found while constructing graph")]
ExpectedNodeNotFound,
#[error("Expected node not found while constructing graph (collection {0}@v{1:?})")]
ExpectedNodeNotFound(CollectionUuid, Option<i64>),
#[error("Invariant violation: {0}")]
InvariantViolation(String),
}

impl<E> From<TaskError<E>> for ConstructVersionGraphError
Expand Down Expand Up @@ -159,7 +145,8 @@ impl ChromaError for ConstructVersionGraphError {
ConstructVersionGraphError::FetchVersionFilePaths(err) => err.code(),
ConstructVersionGraphError::InvalidUuid(_) => ErrorCodes::Internal,
ConstructVersionGraphError::InvalidTimestamp(_) => ErrorCodes::InvalidArgument,
ConstructVersionGraphError::ExpectedNodeNotFound => ErrorCodes::Internal,
ConstructVersionGraphError::ExpectedNodeNotFound(_, _) => ErrorCodes::Internal,
ConstructVersionGraphError::InvariantViolation(_) => ErrorCodes::Internal,
}
}
}
Expand Down Expand Up @@ -224,9 +211,11 @@ impl ConstructVersionGraphOrchestrator {
ctx: &ComponentContext<ConstructVersionGraphOrchestrator>,
) -> Result<(), ConstructVersionGraphError> {
if self.num_pending_tasks == 0 {
// This map will be used as a basis for building the graph
let mut versions_by_collection_id: HashMap<CollectionUuid, Vec<(i64, VersionStatus)>> =
HashMap::new();

// Add all known versions from version files to map
for (collection_id, version_file) in self.version_files.iter() {
if let Some(versions) = &version_file.version_history {
for version in versions.versions.iter() {
Expand All @@ -251,6 +240,7 @@ impl ConstructVersionGraphOrchestrator {
}
}

// If any version appears as a version dependency (from the lineage file) but does not already exist in the map from the version files, the version must have been deleted.
for dependency in self.version_dependencies.iter() {
let source_collection_id = dependency.source_collection_id;
let source_collection_version = dependency.source_collection_version;
Expand All @@ -272,6 +262,11 @@ impl ConstructVersionGraphOrchestrator {
versions.sort_unstable_by_key(|v| v.0);
}

tracing::trace!(
"Versions by collection ID: {:#?}",
versions_by_collection_id
);

let mut graph = DiGraph::new();
for (collection_id, versions) in versions_by_collection_id.iter() {
let mut prev_node = None;
Expand All @@ -282,12 +277,14 @@ impl ConstructVersionGraphOrchestrator {
status: *status,
});
if let Some(prev) = prev_node {
// Add edge between each successive pair of collection versions
graph.add_edge(prev, node, ());
}
prev_node = Some(node);
}
}

// Add edges for forked collections
for dependency in self.version_dependencies.iter() {
let source_node = graph
.node_indices()
Expand All @@ -296,15 +293,25 @@ impl ConstructVersionGraphOrchestrator {
node.collection_id == dependency.source_collection_id
&& node.version == dependency.source_collection_version
})
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
.ok_or_else(|| {
ConstructVersionGraphError::ExpectedNodeNotFound(
dependency.source_collection_id,
Some(dependency.source_collection_version),
)
})?;

let target_node = graph
.node_indices()
.find(|n| {
let node = graph.node_weight(*n).expect("node index should exist");
node.collection_id == dependency.target_collection_id
})
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
.ok_or_else(|| {
ConstructVersionGraphError::ExpectedNodeNotFound(
dependency.target_collection_id,
None,
)
})?;

graph.add_edge(source_node, target_node, ());
}
Expand All @@ -317,6 +324,15 @@ impl ConstructVersionGraphOrchestrator {

tracing::trace!("Version files: {:#?}", self.version_files);

let components = petgraph::algo::connected_components(&graph);
if components != 1 {
// This is a defensive check; it should never happen.
return Err(ConstructVersionGraphError::InvariantViolation(format!(
"Graph is not fully connected, found {} components",
components
)));
}

self.terminate_with_result(
Ok(ConstructVersionGraphResponse {
graph,
Expand Down
Loading
Loading