Skip to content

Commit beeb6a8

Browse files
Merge pull request #49 from glassflow/add-stop-and-delete
ETL-35: Add stop and delete
2 parents bae5adb + 2794a89 commit beeb6a8

File tree

11 files changed

+325
-43
lines changed

11 files changed

+325
-43
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,19 @@ pipeline.resume()
184184
print(pipeline.status)
185185
```
186186

187+
### Stop pipeline
188+
189+
```python
190+
# Stop a pipeline gracefully
191+
client.stop_pipeline("my-pipeline-id")
192+
193+
# Stop a pipeline ungracefully (terminate)
194+
client.stop_pipeline("my-pipeline-id", terminate=True)
195+
196+
# Or stop via pipeline instance
197+
pipeline.stop()
198+
```
199+
187200
### Delete pipeline
188201

189202
```python

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.1.0
1+
3.2.0

src/glassflow/etl/api_client.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,51 @@ def _raise_api_error(response: httpx.Response) -> None:
6464
"""Raise an APIError based on the response."""
6565
status_code = response.status_code
6666
try:
67-
message = response.json().get("message", None)
67+
error_data = response.json()
68+
message = error_data.get("message", None)
69+
code = error_data.get("code", None)
6870
except json.JSONDecodeError:
6971
message = f"{status_code} {response.reason_phrase}"
72+
code = None
73+
error_data = {}
74+
7075
if status_code == 400:
71-
raise errors.ValidationError(status_code, message, response=response)
76+
# Handle specific status validation error codes
77+
if code == "TERMINAL_STATE_VIOLATION":
78+
raise errors.TerminalStateViolationError(
79+
status_code, message, response=response
80+
)
81+
elif code == "INVALID_STATUS_TRANSITION":
82+
raise errors.InvalidStatusTransitionError(
83+
status_code,
84+
message,
85+
response=response,
86+
)
87+
elif code == "UNKNOWN_STATUS":
88+
raise errors.UnknownStatusError(status_code, message, response=response)
89+
elif code == "PIPELINE_ALREADY_IN_STATE":
90+
raise errors.PipelineAlreadyInStateError(
91+
status_code, message, response=response
92+
)
93+
elif code == "PIPELINE_IN_TRANSITION":
94+
raise errors.PipelineInTransitionError(
95+
status_code, message, response=response
96+
)
97+
elif message and message.startswith("invalid json:"):
98+
raise errors.InvalidJsonError(status_code, message, response=response)
99+
elif message and message == "pipeline id cannot be empty":
100+
raise errors.EmptyPipelineIdError(
101+
status_code, message, response=response
102+
)
103+
elif message and message.startswith(
104+
"pipeline can only be deleted if it's stopped or terminated"
105+
):
106+
raise errors.PipelineDeletionStateViolationError(
107+
status_code, message, response=response
108+
)
109+
else:
110+
# Generic 400 error for unknown codes
111+
raise errors.ValidationError(status_code, message, response=response)
72112
elif status_code == 403:
73113
raise errors.ForbiddenError(status_code, message, response=response)
74114
elif status_code == 404:

src/glassflow/etl/client.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,34 @@ def create_pipeline(
107107

108108
return pipeline.create()
109109

110-
def delete_pipeline(self, pipeline_id: str, terminate: bool = True) -> None:
111-
"""Deletes the pipeline with the given ID.
110+
def stop_pipeline(self, pipeline_id: str, terminate: bool = False) -> None:
111+
"""Stops the pipeline with the given ID.
112112
113113
Args:
114-
pipeline_id: The ID of the pipeline to delete
114+
pipeline_id: The ID of the pipeline to stop
115115
terminate: Whether to terminate the pipeline (i.e. delete all the pipeline
116116
components and potentially all the events in the pipeline)
117+
118+
Raises:
119+
PipelineInTransitionError: If pipeline is in transition
120+
PipelineNotFoundError: If pipeline is not found
121+
APIError: If the API request fails
122+
"""
123+
Pipeline(host=self.host, pipeline_id=pipeline_id).stop(terminate=terminate)
124+
125+
def delete_pipeline(self, pipeline_id: str) -> None:
126+
"""Deletes the pipeline with the given ID.
127+
128+
Args:
129+
pipeline_id: The ID of the pipeline to delete
130+
117131
Raises:
132+
PipelineDeletionStateViolationError: If pipeline is not stopped or
133+
terminating
118134
PipelineNotFoundError: If pipeline is not found
119135
APIError: If the API request fails
120136
"""
121-
Pipeline(host=self.host, pipeline_id=pipeline_id).delete(terminate=terminate)
137+
Pipeline(host=self.host, pipeline_id=pipeline_id).delete()
122138

123139
def disable_tracking(self) -> None:
124140
"""Disable tracking of pipeline events."""

src/glassflow/etl/errors.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,39 @@ class InvalidDataTypeMappingError(GlassFlowError):
6060

6161
class InvalidBatchSizeError(GlassFlowError):
6262
"""Exception raised when a batch size is invalid."""
63+
64+
65+
# Status validation error classes for 400 Bad Request responses
66+
class TerminalStateViolationError(ValidationError):
67+
"""Raised when attempting to transition from a terminal state to another state."""
68+
69+
70+
class InvalidStatusTransitionError(ValidationError):
71+
"""Raised when attempting an invalid status transition."""
72+
73+
74+
class UnknownStatusError(ValidationError):
75+
"""Raised when an unknown pipeline status is encountered."""
76+
77+
78+
class PipelineAlreadyInStateError(ValidationError):
79+
"""Raised when pipeline is already in the requested state."""
80+
81+
82+
class PipelineInTransitionError(ValidationError):
83+
"""
84+
Raised when pipeline is currently transitioning and cannot perform the
85+
requested operation.
86+
"""
87+
88+
89+
class InvalidJsonError(ValidationError):
90+
"""Raised when malformed JSON is provided in request body."""
91+
92+
93+
class EmptyPipelineIdError(ValidationError):
94+
"""Raised when pipeline ID parameter is empty or whitespace."""
95+
96+
97+
class PipelineDeletionStateViolationError(ValidationError):
98+
"""Raised when attempting to delete a pipeline that's not in a deletable state."""

src/glassflow/etl/models/pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ class PipelineStatus(CaseInsensitiveStrEnum):
1717
PAUSING = "Pausing"
1818
PAUSED = "Paused"
1919
RESUMING = "Resuming"
20+
STOPPING = "Stopping"
21+
STOPPED = "Stopped"
2022
TERMINATING = "Terminating"
2123
TERMINATED = "Terminated"
2224
FAILED = "Failed"
25+
DELETED = "Deleted"
2326

2427

2528
class PipelineConfig(BaseModel):

src/glassflow/etl/models/source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class ConsumerGroupOffset(CaseInsensitiveStrEnum):
8181

8282

8383
class TopicConfig(BaseModel):
84-
consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.EARLIEST
84+
consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST
8585
name: str
8686
event_schema: Schema = Field(alias="schema")
8787
deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig())

src/glassflow/etl/pipeline.py

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,25 +145,52 @@ def update(
145145
"""
146146
raise NotImplementedError("Updating is not implemented")
147147

148-
def delete(self, terminate: bool = True) -> None:
149-
"""Deletes the pipeline with the given ID.
148+
def delete(self) -> None:
149+
"""
150+
Deletes the pipeline from the database. Only pipelines that are stopped or
151+
terminating can be deleted.
152+
153+
Raises:
154+
PipelineDeletionStateViolationError: If pipeline is not stopped or
155+
terminating
156+
PipelineNotFoundError: If pipeline is not found
157+
APIError: If the API request fails
158+
"""
159+
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}"
160+
self._request("DELETE", endpoint, event_name="PipelineDeleted")
161+
self.status = models.PipelineStatus.DELETED
162+
163+
def stop(self, terminate: bool = False) -> Pipeline:
164+
"""
165+
Stops the pipeline. Gracefully by default, ungracefully if terminate is True.
166+
Ungracefully means deleting all the pipeline components without waiting for the
167+
events in the pipeline to be processed.
150168
151169
Args:
152170
terminate: Whether to terminate the pipeline (i.e. delete all the pipeline
153171
components and potentially all the events in the pipeline)
154172
173+
Returns:
174+
Pipeline: A Pipeline instance for the stopped pipeline
175+
155176
Raises:
177+
PipelineInTransitionError: If pipeline is in transition
156178
PipelineNotFoundError: If pipeline is not found
179+
InvalidStatusTransitionError: If pipeline is not in a state that can be
180+
stopped
157181
APIError: If the API request fails
158182
"""
159-
if not terminate:
160-
raise NotImplementedError("Graceful deletion is not implemented")
161-
162-
if self.config is None:
163-
self.get()
164-
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate"
165-
self._request("DELETE", endpoint, event_name="PipelineDeleted")
166-
self.status = models.PipelineStatus.TERMINATING
183+
if terminate:
184+
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate"
185+
next_status = models.PipelineStatus.TERMINATING
186+
event_name = "PipelineTerminated"
187+
else:
188+
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/stop"
189+
next_status = models.PipelineStatus.STOPPING
190+
event_name = "PipelineStopped"
191+
self._request("POST", endpoint, event_name=event_name)
192+
self.status = next_status
193+
return self
167194

168195
def pause(self) -> Pipeline:
169196
"""Pauses the pipeline with the given ID.
@@ -172,7 +199,10 @@ def pause(self) -> Pipeline:
172199
Pipeline: A Pipeline instance for the paused pipeline
173200
174201
Raises:
202+
PipelineInTransitionError: If pipeline is in transition
175203
PipelineNotFoundError: If pipeline is not found
204+
InvalidStatusTransitionError: If pipeline is not in a state that can be
205+
paused
176206
APIError: If the API request fails
177207
"""
178208
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/pause"
@@ -187,7 +217,10 @@ def resume(self) -> Pipeline:
187217
Pipeline: A Pipeline instance for the resumed pipeline
188218
189219
Raises:
220+
PipelineInTransitionError: If pipeline is in transition
190221
PipelineNotFoundError: If pipeline is not found
222+
InvalidStatusTransitionError: If pipeline is not in a state that can be
223+
resumed
191224
APIError: If the API request fails
192225
"""
193226
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/resume"

tests/data/error_scenarios.py

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,31 +51,132 @@ def get_http_error_scenarios():
5151
{
5252
"name": "not_found",
5353
"status_code": 404,
54-
"text": "Pipeline not found",
54+
"json_data": {"message": "Pipeline not found"},
5555
"expected_error": errors.PipelineNotFoundError,
5656
"error_message": "not found",
5757
},
5858
{
5959
"name": "forbidden",
6060
"status_code": 403,
61-
"text": "Pipeline already active",
61+
"json_data": {"message": "Pipeline already active"},
6262
"expected_error": errors.PipelineAlreadyExistsError,
6363
"error_message": "already exists",
6464
},
6565
{
6666
"name": "bad_request",
6767
"status_code": 400,
68-
"text": "Bad request",
68+
"json_data": {"message": "Bad request"},
6969
"expected_error": errors.ValidationError,
7070
"error_message": "Bad request",
7171
},
7272
{
7373
"name": "server_error",
7474
"status_code": 500,
75-
"text": "Internal server error",
75+
"json_data": {"message": "Internal server error"},
7676
"expected_error": errors.ServerError,
7777
"error_message": "Internal server error",
7878
},
79+
# Status validation error scenarios for 400 Bad Request responses
80+
{
81+
"name": "terminal_state_violation",
82+
"status_code": 400,
83+
"json_data": {
84+
"message": (
85+
"Cannot transition from terminal state Terminated to Running"
86+
),
87+
"code": "TERMINAL_STATE_VIOLATION",
88+
"current_status": "Terminated",
89+
"requested_status": "Running",
90+
},
91+
"expected_error": errors.TerminalStateViolationError,
92+
"error_message": (
93+
"Cannot transition from terminal state Terminated to Running"
94+
),
95+
},
96+
{
97+
"name": "invalid_status_transition",
98+
"status_code": 400,
99+
"json_data": {
100+
"message": "Invalid status transition from Running to Paused",
101+
"code": "INVALID_STATUS_TRANSITION",
102+
"current_status": "Running",
103+
"requested_status": "Paused",
104+
"valid_transitions": ["Stopping", "Terminating"],
105+
},
106+
"expected_error": errors.InvalidStatusTransitionError,
107+
"error_message": "Invalid status transition from Running to Paused",
108+
},
109+
{
110+
"name": "unknown_status",
111+
"status_code": 400,
112+
"json_data": {
113+
"message": "Unknown pipeline status: InvalidStatus",
114+
"code": "UNKNOWN_STATUS",
115+
"current_status": "InvalidStatus",
116+
},
117+
"expected_error": errors.UnknownStatusError,
118+
"error_message": "Unknown pipeline status: InvalidStatus",
119+
},
120+
{
121+
"name": "pipeline_already_in_state",
122+
"status_code": 400,
123+
"json_data": {
124+
"message": "Pipeline is already in Running state",
125+
"code": "PIPELINE_ALREADY_IN_STATE",
126+
"current_status": "Running",
127+
"requested_status": "Running",
128+
},
129+
"expected_error": errors.PipelineAlreadyInStateError,
130+
"error_message": "Pipeline is already in Running state",
131+
},
132+
{
133+
"name": "pipeline_in_transition",
134+
"status_code": 400,
135+
"json_data": {
136+
"message": (
137+
"Pipeline is currently transitioning from Pausing state, "
138+
"cannot perform Stopping operation"
139+
),
140+
"code": "PIPELINE_IN_TRANSITION",
141+
"current_status": "Pausing",
142+
"requested_status": "Stopping",
143+
},
144+
"expected_error": errors.PipelineInTransitionError,
145+
"error_message": (
146+
"Pipeline is currently transitioning from Pausing state, "
147+
"cannot perform Stopping operation"
148+
),
149+
},
150+
{
151+
"name": "invalid_json",
152+
"status_code": 400,
153+
"json_data": {"message": "invalid json: unexpected end of JSON input"},
154+
"expected_error": errors.InvalidJsonError,
155+
"error_message": "invalid json: unexpected end of JSON input",
156+
},
157+
{
158+
"name": "empty_pipeline_id",
159+
"status_code": 400,
160+
"json_data": {"message": "pipeline id cannot be empty"},
161+
"expected_error": errors.EmptyPipelineIdError,
162+
"error_message": "pipeline id cannot be empty",
163+
},
164+
{
165+
"name": "pipeline_deletion_state_violation",
166+
"status_code": 400,
167+
"json_data": {
168+
"message": (
169+
"pipeline can only be deleted if it's stopped or terminated, "
170+
"current status: Running"
171+
),
172+
"field": {"current_status": "Running"},
173+
},
174+
"expected_error": errors.PipelineDeletionStateViolationError,
175+
"error_message": (
176+
"pipeline can only be deleted if it's stopped or terminated, "
177+
"current status: Running"
178+
),
179+
},
79180
]
80181

81182

0 commit comments

Comments
 (0)