Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions src/api/shared/handlers/reaction_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,18 @@ pub async fn create_reaction_handler(
log::info!("Reaction '{reaction_id}' created successfully");

if auto_start {
if let Err(e) = core.start_reaction(&reaction_id).await {
log::warn!("Failed to auto-start reaction '{reaction_id}': {e}");
// `add_reaction_with_metadata` already auto-starts the reaction
// when the server is running, so it is usually already starting
// or running by now. Only start it here if it is still Stopped
// (i.e. `add` did not auto-start it), and report genuine start
// failures as errors rather than warnings.
if matches!(
core.get_reaction_status(&reaction_id).await,
Ok(ComponentStatus::Stopped)
) {
if let Err(e) = core.start_reaction(&reaction_id).await {
log::error!("Failed to auto-start reaction '{reaction_id}': {e}");
}
}
}

Expand Down Expand Up @@ -244,8 +254,18 @@ pub async fn upsert_reaction_handler(
log::info!("Reaction '{reaction_id}' created successfully");

if auto_start {
if let Err(e) = core.start_reaction(&reaction_id).await {
log::warn!("Failed to auto-start reaction '{reaction_id}': {e}");
// `add_reaction_with_metadata` already auto-starts the reaction
// when the server is running, so it is usually already starting
// or running by now. Only start it here if it is still Stopped
// (i.e. `add` did not auto-start it), and report genuine start
// failures as errors rather than warnings.
if matches!(
core.get_reaction_status(&reaction_id).await,
Ok(ComponentStatus::Stopped)
) {
if let Err(e) = core.start_reaction(&reaction_id).await {
log::error!("Failed to auto-start reaction '{reaction_id}': {e}");
}
}
}

Expand Down
28 changes: 24 additions & 4 deletions src/api/shared/handlers/source_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,18 @@ pub async fn create_source_handler(
log::info!("Source '{source_id}' created successfully");

if auto_start {
if let Err(e) = core.start_source(&source_id).await {
log::warn!("Failed to auto-start source '{source_id}': {e}");
// `add_source_with_metadata` already auto-starts the source
// when the server is running, so it is usually already starting
// or running by now. Only start it here if it is still Stopped
// (i.e. `add` did not auto-start it), and report genuine start
// failures as errors rather than warnings.
if matches!(
core.get_source_status(&source_id).await,
Ok(ComponentStatus::Stopped)
) {
if let Err(e) = core.start_source(&source_id).await {
log::error!("Failed to auto-start source '{source_id}': {e}");
}
}
}

Expand Down Expand Up @@ -247,8 +257,18 @@ pub async fn upsert_source_handler(
log::info!("Source '{source_id}' created successfully");

if auto_start {
if let Err(e) = core.start_source(&source_id).await {
log::warn!("Failed to auto-start source '{source_id}': {e}");
// `add_source_with_metadata` already auto-starts the source
// when the server is running, so it is usually already starting
// or running by now. Only start it here if it is still Stopped
// (i.e. `add` did not auto-start it), and report genuine start
// failures as errors rather than warnings.
if matches!(
core.get_source_status(&source_id).await,
Ok(ComponentStatus::Stopped)
) {
if let Err(e) = core.start_source(&source_id).await {
log::error!("Failed to auto-start source '{source_id}': {e}");
}
}
}

Expand Down
66 changes: 66 additions & 0 deletions tests/api_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,72 @@ async fn test_reaction_lifecycle_via_api() {
assert!(json["data"]["links"]["full"].is_string());
}

/// Creating a reaction with `autoStart: true` via the API must leave it in the
/// `Running` state. `DrasiLib::add_reaction_with_metadata` already auto-starts
/// the reaction when the server is running, so the handler must not report a
/// spurious failure for the (redundant) start that follows. See the issue
/// "Incorrect log messages starting up Drasi Server".
#[tokio::test]
async fn test_create_reaction_auto_start_via_api() {
let (router, core, _registry) = create_test_router().await;
let base = "/instances/test-server";
let graph = core.component_graph();

// POST a new reaction with autoStart enabled, subscribing to an existing query.
let body = serde_json::json!({
"kind": "application",
"id": "post-auto-reaction",
"queries": ["reaction-query"],
"autoStart": true,
});

let response = router
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri(format!("{base}/reactions"))
.header("content-type", "application/json")
.body(Body::from(body.to_string()))
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["success"], true);

// The reaction should be auto-started and reach Running without any failure.
drasi_lib::wait_for_status(
&graph,
"post-auto-reaction",
&[drasi_lib::channels::ComponentStatus::Running],
std::time::Duration::from_secs(5),
)
.await
.expect("post-auto-reaction should reach Running after creation");

// Confirm via the API that the reaction is reported as Running.
let response = router
.clone()
.oneshot(
Request::builder()
.uri(format!("{base}/reactions/post-auto-reaction"))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["success"], true);
assert_eq!(json["data"]["status"], "Running");
}

#[tokio::test]
async fn test_source_logs_snapshot_via_api() {
// Use unique instance ID to avoid interference with parallel tests
Expand Down