Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions components/log-ingestor/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn health() -> &'static str {

#[utoipa::path(
post,
path = "/s3_scanner",
path = "/s3/scanner",
tags = ["IngestionJob"],
description = "Creates an ingestion job and starts a job instance that periodically scans the \
specified S3 bucket and key prefix for new objects to ingest.\n\n\
Expand Down Expand Up @@ -194,7 +194,7 @@ async fn create_s3_scanner_job(

#[utoipa::path(
post,
path = "/sqs_listener",
path = "/s3/listener",
tags = ["IngestionJob"],
description = "Creates an ingestion job and starts a job instance that monitors an SQS queue. \
The queue receives notifications whenever new objects are added to the specified S3 bucket \
Expand Down Expand Up @@ -245,7 +245,7 @@ async fn create_sqs_listener_job(

#[utoipa::path(
post,
path = "/job/{job_id}/terminate",
path = "/s3/{job_id}/terminate",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

/s3/{job_id}/terminate conflates job-type namespace with a type-agnostic operation.

job_id carries no type information (it is just an opaque ID). Placing the terminate operation under the /s3/ prefix implies it is specific to S3 jobs. If a non-S3 ingestion job type is added later, a separate termination path would be required, or the /s3/ prefix would have to be extended in an ad-hoc way.

A resource-oriented path like /jobs/{job_id}/terminate or simply the original /job/{job_id}/terminate keeps the termination operation decoupled from job type, matching the semantics of the handler (shutdown_and_remove_job_instance accepts any IngestionJobId). Worth confirming whether issue #2166 explicitly mandates this path shape or whether a generic /jobs/{job_id}/terminate also satisfies the scheme.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/log-ingestor/src/routes.rs` at line 248, The route path currently
uses a type-specific prefix "/s3/{job_id}/terminate" which wrongly ties a
type-agnostic operation to S3; change the route to a resource-oriented path such
as "/jobs/{job_id}/terminate" (or "/job/{job_id}/terminate" if that matches
existing conventions) and update the route registration so the handler
shutdown_and_remove_job_instance remains unchanged and continues to accept an
IngestionJobId; ensure any doc/comments and tests referencing the old "/s3/…"
path are updated accordingly.

tags = ["IngestionJob"],
description = "Terminates the ingestion job instance identified by the given job ID.\n\n\
This operation only terminates the job instance; the job record, including its status and \
Expand Down
88 changes: 44 additions & 44 deletions docs/src/_static/generated/log-ingestor-openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,46 @@
}
}
},
"/job/{job_id}/terminate": {
"/s3/listener": {
"post": {
"tags": [
"IngestionJob"
],
"description": "Terminates the ingestion job instance identified by the given job ID.\n\nThis operation only terminates the job instance; the job record, including its status and statistics, is preserved and can still be accessible using the same job ID.\n\nThis operation is idempotent.",
"operationId": "terminate_ingestion_job",
"parameters": [
{
"name": "job_id",
"in": "path",
"description": "The ID of the job to terminate.",
"required": true,
"schema": {
"$ref": "#/components/schemas/u64"
"description": "Creates an ingestion job and starts a job instance that monitors an SQS queue. The queue receives notifications whenever new objects are added to the specified S3 bucket and key prefix.\n\nThe specified SQS queue must be dedicated to this ingestion job. Upon successful ingestion, the job deletes the corresponding message from the queue to ensure objects are not ingested multiple times.\n\nTo maintain correctness and avoid backpressure, the job may also delete messages that are irrelevant to this ingestion job (for example, messages referring to objects outside the configured bucket or key prefix).",
"operationId": "create_sqs_listener_job",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/SqsListenerConfig"
}
}
}
],
},
"required": true
},
"responses": {
"200": {
"description": "",
"description": "The ID of the created job.",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/TerminateResponse"
"$ref": "#/components/schemas/CreationResponse"
}
}
}
},
"404": {
"description": "The specified job ID does not correspond to an ingestion job.",
"400": {
"description": "Custom endpoint URLs are not supported for SQS listener jobs, or the specified number of concurrent listener tasks is invalid.",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"409": {
"description": "A prefix conflict was detected. For the same bucket and dataset, only one running ingestion job instance may monitor a given S3 prefix or any of its ancestor or descendant prefixes.",
"content": {
"application/json": {
"schema": {
Expand All @@ -84,7 +93,7 @@
}
}
},
"/s3_scanner": {
"/s3/scanner": {
"post": {
"tags": [
"IngestionJob"
Expand Down Expand Up @@ -145,46 +154,37 @@
}
}
},
"/sqs_listener": {
"/s3/{job_id}/terminate": {
"post": {
"tags": [
"IngestionJob"
],
"description": "Creates an ingestion job and starts a job instance that monitors an SQS queue. The queue receives notifications whenever new objects are added to the specified S3 bucket and key prefix.\n\nThe specified SQS queue must be dedicated to this ingestion job. Upon successful ingestion, the job deletes the corresponding message from the queue to ensure objects are not ingested multiple times.\n\nTo maintain correctness and avoid backpressure, the job may also delete messages that are irrelevant to this ingestion job (for example, messages referring to objects outside the configured bucket or key prefix).",
"operationId": "create_sqs_listener_job",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/SqsListenerConfig"
}
"description": "Terminates the ingestion job instance identified by the given job ID.\n\nThis operation only terminates the job instance; the job record, including its status and statistics, is preserved and can still be accessible using the same job ID.\n\nThis operation is idempotent.",
"operationId": "terminate_ingestion_job",
"parameters": [
{
"name": "job_id",
"in": "path",
"description": "The ID of the job to terminate.",
"required": true,
"schema": {
"$ref": "#/components/schemas/u64"
}
},
"required": true
},
}
],
"responses": {
"200": {
"description": "The ID of the created job.",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/CreationResponse"
}
}
}
},
"400": {
"description": "Custom endpoint URLs are not supported for SQS listener jobs, or the specified number of concurrent listener tasks is invalid.",
"description": "",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
"$ref": "#/components/schemas/TerminateResponse"
}
}
}
},
"409": {
"description": "A prefix conflict was detected. For the same bucket and dataset, only one running ingestion job instance may monitor a given S3 prefix or any of its ancestor or descendant prefixes.",
"404": {
"description": "The specified job ID does not correspond to an ingestion job.",
"content": {
"application/json": {
"schema": {
Expand Down
Loading