Skip to content

Commit e02d047

Browse files
feat: Add new stream for pipeline jobs
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
1 parent 8548960 commit e02d047

12 files changed

Lines changed: 404 additions & 54 deletions

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ uv tool install git+https://github.com/Matatika/tap-meltano-cloud.git@main
1818
|:-------|:---------|:------------|
1919
| `workspaces` | `GET /workspaces` | `id` |
2020
| `pipelines` | `GET /workspaces/{workspaceId}/pipelines` | `id` |
21+
| `pipeline_jobs` | `GET /pipelines/{pipelineId}/jobs` | `id` |
2122
| `datasets` | `GET /workspaces/{workspaceId}/datasets` | `id` |
2223
| `jobs` | `GET /workspaces/{workspaceId}/jobs` | `id` |
2324
| `channels` | `GET /workspaces/{workspaceId}/channels` | `id` |

tap_meltano_cloud/streams/base.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,20 @@ def get_new_paginator(self) -> BaseAPIPaginator | None:
6565
return None
6666

6767

68-
class _WorkspaceChildSchema(StreamSchema[str]):
68+
class WorkspaceChildSchema(StreamSchema[str]):
6969
"""Schema for all workspace-scoped streams."""
7070

7171
@override
7272
def get_stream_schema(self, *args: Any, **kwargs: Any) -> dict:
7373
schema = super().get_stream_schema(*args, **kwargs)
7474
schema["properties"]["workspaceId"] = {
7575
"format": "uuid",
76-
"type": ["string", "null"],
76+
"type": "string",
7777
}
7878
return schema
7979

8080

81-
class _WorkspaceSchema(StreamSchema[str]):
81+
class WorkspaceSchema(StreamSchema[str]):
8282
"""Schema for workspace streams — excludes sensitive fields.
8383
8484
``deploymentSecret`` and ``sshPrivateKey`` can contain sensitive credentials,
@@ -94,7 +94,7 @@ def get_stream_schema(self, *args: Any, **kwargs: Any) -> dict:
9494
return schema
9595

9696

97-
class _PipelineSchema(_WorkspaceChildSchema):
97+
class PipelineSchema(WorkspaceChildSchema):
9898
"""Schema for pipeline streams — excludes the ``properties`` field.
9999
100100
Pipeline properties can contain sensitive data and there is currently no
@@ -109,7 +109,20 @@ def get_stream_schema(self, *args: Any, **kwargs: Any) -> dict:
109109
return schema
110110

111111

112-
class _DataComponentSchema(_WorkspaceChildSchema):
112+
class PipelineJobSchema(WorkspaceChildSchema):
    """Schema for streams of jobs fetched per pipeline.

    Builds on the schema produced by ``WorkspaceChildSchema`` and declares an
    additional ``pipelineId`` property typed as a UUID-formatted string.
    """

    @override
    def get_stream_schema(self, *args: Any, **kwargs: Any) -> dict:
        """Return the parent schema with a ``pipelineId`` property set.

        All positional and keyword arguments are forwarded unchanged to the
        parent implementation.
        """
        stream_schema = super().get_stream_schema(*args, **kwargs)
        pipeline_id_property = {"format": "uuid", "type": "string"}
        stream_schema["properties"]["pipelineId"] = pipeline_id_property
        return stream_schema
123+
124+
125+
class DataComponentSchema(WorkspaceChildSchema):
113126
"""Schema for data component streams — excludes the ``properties`` field.
114127
115128
Data component properties can contain sensitive data and there is currently no
@@ -124,7 +137,7 @@ def get_stream_schema(self, *args: Any, **kwargs: Any) -> dict:
124137
return schema
125138

126139

127-
class _DataStoreSchema(_WorkspaceChildSchema):
140+
class DataStoreSchema(WorkspaceChildSchema):
128141
"""Schema for data store streams — excludes sensitive fields.
129142
130143
``properties`` and ``jdbcUrl`` can contain sensitive data (credentials,

tap_meltano_cloud/streams/by_workspace.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@
55
import sys
66
from typing import TYPE_CHECKING, Any
77

8+
from singer_sdk import StreamSchema
9+
810
from .base import (
911
OPENAPI_SCHEMA,
1012
MeltanoCloudStream,
11-
_DataComponentSchema,
12-
_DataStoreSchema,
13-
_PipelineSchema,
14-
_WorkspaceChildSchema,
15-
_WorkspaceSchema,
13+
DataComponentSchema,
14+
DataStoreSchema,
15+
PipelineSchema,
16+
PipelineJobSchema,
17+
WorkspaceChildSchema,
18+
WorkspaceSchema,
1619
)
1720

1821
if sys.version_info >= (3, 12):
@@ -41,13 +44,14 @@ def __init__(self, *args: Any, workspace_ids: list[str], **kwargs: Any) -> None:
4144
def partitions(self) -> list[dict]:
4245
return self._partitions
4346

47+
4448
class WorkspacesStream(_ByWorkspaceStream):
4549
"""Fetches individual workspaces by ID."""
4650

4751
name = "workspaces"
4852
path = "/workspaces/{workspaceId}"
4953
records_jsonpath = "$"
50-
schema = _WorkspaceSchema(OPENAPI_SCHEMA, key="WorkspaceResource")
54+
schema = WorkspaceSchema(OPENAPI_SCHEMA, key="WorkspaceResource")
5155

5256
@override
5357
def post_process(self, row: dict, context: Context | None = None) -> dict | None:
@@ -62,21 +66,39 @@ class PipelinesStream(_ByWorkspaceStream):
6266
name = "pipelines"
6367
path = "/workspaces/{workspaceId}/pipelines"
6468
records_jsonpath = "$._embedded.pipelines[*]"
65-
schema = _PipelineSchema(OPENAPI_SCHEMA, key="PipelineResource")
69+
schema = PipelineSchema(OPENAPI_SCHEMA, key="PipelineResource")
6670

6771
@override
6872
def post_process(self, row: dict, context: Context | None = None) -> dict | None:
6973
row.pop("properties", None)
7074
return super().post_process(row, context)
7175

76+
@override
def get_child_context(self, record: dict, context: Context | None = None) -> Context:
    """Build the context handed to child streams for one pipeline record.

    Args:
        record: A pipeline record; its ``id`` value becomes ``pipelineId``.
        context: The context this pipeline record was produced under, if any.

    Returns:
        A mapping with ``pipelineId`` and ``workspaceId`` keys.
        ``workspaceId`` is ``None`` when no (or an empty) context is given.
    """
    return {
        "pipelineId": record["id"],
        # Carry the workspace through so child records stay workspace-scoped.
        "workspaceId": context["workspaceId"] if context else None,
    }
82+
83+
84+
class PipelineJobsStream(_ByWorkspaceStream):
    """Jobs of a single pipeline, synced as a child of ``PipelinesStream``."""

    # Child-stream wiring: ``{pipelineId}`` in ``path`` matches the key that
    # ``PipelinesStream.get_child_context`` places in the child context.
    parent_stream_type = PipelinesStream

    name = "pipeline_jobs"
    schema = PipelineJobSchema(OPENAPI_SCHEMA, key="JobResource")
    path = "/pipelines/{pipelineId}/jobs"
    records_jsonpath = "$._embedded.jobs[*]"
93+
7294

7395
class DatasetsStream(_ByWorkspaceStream):
7496
"""Datasets stream."""
7597

7698
name = "datasets"
7799
path = "/workspaces/{workspaceId}/datasets"
78100
records_jsonpath = "$._embedded.datasets[*]"
79-
schema = _WorkspaceChildSchema(OPENAPI_SCHEMA, key="DatasetResource")
101+
schema = WorkspaceChildSchema(OPENAPI_SCHEMA, key="DatasetResource")
80102

81103

82104
class JobsStream(_ByWorkspaceStream):
@@ -85,7 +107,7 @@ class JobsStream(_ByWorkspaceStream):
85107
name = "jobs"
86108
path = "/workspaces/{workspaceId}/jobs"
87109
records_jsonpath = "$._embedded.jobs[*]"
88-
schema = _WorkspaceChildSchema(OPENAPI_SCHEMA, key="JobResource")
110+
schema = WorkspaceChildSchema(OPENAPI_SCHEMA, key="JobResource")
89111

90112

91113
class ChannelsStream(_ByWorkspaceStream):
@@ -94,7 +116,7 @@ class ChannelsStream(_ByWorkspaceStream):
94116
name = "channels"
95117
path = "/workspaces/{workspaceId}/channels"
96118
records_jsonpath = "$._embedded.channels[*]"
97-
schema = _WorkspaceChildSchema(OPENAPI_SCHEMA, key="ChannelResource")
119+
schema = WorkspaceChildSchema(OPENAPI_SCHEMA, key="ChannelResource")
98120

99121

100122
class DataStoresStream(_ByWorkspaceStream):
@@ -103,7 +125,7 @@ class DataStoresStream(_ByWorkspaceStream):
103125
name = "datastores"
104126
path = "/workspaces/{workspaceId}/datastores"
105127
records_jsonpath = "$._embedded.datastores[*]"
106-
schema = _DataStoreSchema(OPENAPI_SCHEMA, key="DataStoreResource")
128+
schema = DataStoreSchema(OPENAPI_SCHEMA, key="DataStoreResource")
107129

108130
@override
109131
def post_process(self, row: dict, context: Context | None = None) -> dict | None:
@@ -118,7 +140,7 @@ class DataComponentsStream(_ByWorkspaceStream):
118140
name = "datacomponents"
119141
path = "/workspaces/{workspaceId}/datacomponents"
120142
records_jsonpath = "$._embedded.datacomponents[*]"
121-
schema = _DataComponentSchema(OPENAPI_SCHEMA, key="DataComponentResource")
143+
schema = DataComponentSchema(OPENAPI_SCHEMA, key="DataComponentResource")
122144

123145
@override
124146
def post_process(self, row: dict, context: Context | None = None) -> dict | None:

tap_meltano_cloud/streams/me.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77

88
from .base import (
    OPENAPI_SCHEMA,
    DataComponentSchema,
    DataStoreSchema,
    MeltanoCloudStream,
    PipelineJobSchema,
    PipelineSchema,
    WorkspaceChildSchema,
    WorkspaceSchema,
)
1717

1818
if sys.version_info >= (3, 12):
@@ -32,7 +32,7 @@ class WorkspacesStream(MeltanoCloudStream):
3232
name = "workspaces"
3333
path = "/workspaces"
3434
records_jsonpath = "$._embedded.workspaces[*]"
35-
schema = _WorkspaceSchema(OPENAPI_SCHEMA, key="WorkspaceResource")
35+
schema = WorkspaceSchema(OPENAPI_SCHEMA, key="WorkspaceResource")
3636

3737
@override
3838
def generate_child_contexts(
@@ -62,21 +62,39 @@ class PipelinesStream(_WorkspaceChildStream):
6262
name = "pipelines"
6363
path = "/workspaces/{workspaceId}/pipelines"
6464
records_jsonpath = "$._embedded.pipelines[*]"
65-
schema = _PipelineSchema(OPENAPI_SCHEMA, key="PipelineResource")
65+
schema = PipelineSchema(OPENAPI_SCHEMA, key="PipelineResource")
6666

6767
@override
6868
def post_process(self, row: dict, context: Context | None = None) -> dict | None:
6969
row.pop("properties", None)
7070
return super().post_process(row, context)
7171

72+
@override
def get_child_context(self, record: dict, context: Context | None = None) -> Context:
    """Build the context handed to child streams for one pipeline record.

    Args:
        record: A pipeline record; its ``id`` value becomes ``pipelineId``.
        context: The context this pipeline record was produced under, if any.

    Returns:
        A mapping with ``pipelineId`` and ``workspaceId`` keys.
        ``workspaceId`` is ``None`` when no (or an empty) context is given.
    """
    return {
        "pipelineId": record["id"],
        # Carry the workspace through so child records stay workspace-scoped.
        "workspaceId": context["workspaceId"] if context else None,
    }
78+
79+
80+
class PipelineJobsStream(_WorkspaceChildStream):
    """Jobs of a single pipeline, synced as a child of ``PipelinesStream``."""

    name = "pipeline_jobs"
    path = "/pipelines/{pipelineId}/jobs"
    records_jsonpath = "$._embedded.jobs[*]"
    # Use the pipeline-job schema (not the plain workspace-child one) so the
    # ``pipelineId`` key supplied by ``PipelinesStream.get_child_context`` is
    # declared, matching the ``pipeline_jobs`` stream in ``by_workspace``.
    schema = PipelineJobSchema(OPENAPI_SCHEMA, key="JobResource")

    parent_stream_type = PipelinesStream  # type: ignore[assignment]
89+
7290

7391
class DatasetsStream(_WorkspaceChildStream):
7492
"""Datasets stream."""
7593

7694
name = "datasets"
7795
path = "/workspaces/{workspaceId}/datasets"
7896
records_jsonpath = "$._embedded.datasets[*]"
79-
schema = _WorkspaceChildSchema(OPENAPI_SCHEMA, key="DatasetResource")
97+
schema = WorkspaceChildSchema(OPENAPI_SCHEMA, key="DatasetResource")
8098

8199

82100
class JobsStream(_WorkspaceChildStream):
@@ -85,7 +103,7 @@ class JobsStream(_WorkspaceChildStream):
85103
name = "jobs"
86104
path = "/workspaces/{workspaceId}/jobs"
87105
records_jsonpath = "$._embedded.jobs[*]"
88-
schema = _WorkspaceChildSchema(OPENAPI_SCHEMA, key="JobResource")
106+
schema = WorkspaceChildSchema(OPENAPI_SCHEMA, key="JobResource")
89107

90108

91109
class ChannelsStream(_WorkspaceChildStream):
@@ -94,7 +112,7 @@ class ChannelsStream(_WorkspaceChildStream):
94112
name = "channels"
95113
path = "/workspaces/{workspaceId}/channels"
96114
records_jsonpath = "$._embedded.channels[*]"
97-
schema = _WorkspaceChildSchema(OPENAPI_SCHEMA, key="ChannelResource")
115+
schema = WorkspaceChildSchema(OPENAPI_SCHEMA, key="ChannelResource")
98116

99117

100118
class DataStoresStream(_WorkspaceChildStream):
@@ -103,7 +121,7 @@ class DataStoresStream(_WorkspaceChildStream):
103121
name = "datastores"
104122
path = "/workspaces/{workspaceId}/datastores"
105123
records_jsonpath = "$._embedded.datastores[*]"
106-
schema = _DataStoreSchema(OPENAPI_SCHEMA, key="DataStoreResource")
124+
schema = DataStoreSchema(OPENAPI_SCHEMA, key="DataStoreResource")
107125

108126
@override
109127
def post_process(self, row: dict, context: Context | None = None) -> dict | None:
@@ -118,7 +136,7 @@ class DataComponentsStream(_WorkspaceChildStream):
118136
name = "datacomponents"
119137
path = "/workspaces/{workspaceId}/datacomponents"
120138
records_jsonpath = "$._embedded.datacomponents[*]"
121-
schema = _DataComponentSchema(OPENAPI_SCHEMA, key="DataComponentResource")
139+
schema = DataComponentSchema(OPENAPI_SCHEMA, key="DataComponentResource")
122140

123141
@override
124142
def post_process(self, row: dict, context: Context | None = None) -> dict | None:

tap_meltano_cloud/tap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def discover_streams(self) -> list[base.MeltanoCloudStream]:
6868
return [
6969
by_workspace.WorkspacesStream(self, workspace_ids=workspace_ids),
7070
by_workspace.PipelinesStream(self, workspace_ids=workspace_ids),
71+
by_workspace.PipelineJobsStream(self, workspace_ids=workspace_ids),
7172
by_workspace.DatasetsStream(self, workspace_ids=workspace_ids),
7273
by_workspace.JobsStream(self, workspace_ids=workspace_ids),
7374
by_workspace.ChannelsStream(self, workspace_ids=workspace_ids),
@@ -78,6 +79,7 @@ def discover_streams(self) -> list[base.MeltanoCloudStream]:
7879
return [
7980
me.WorkspacesStream(self),
8081
me.PipelinesStream(self),
82+
me.PipelineJobsStream(self),
8183
me.DatasetsStream(self),
8284
me.JobsStream(self),
8385
me.ChannelsStream(self),

tests/__snapshots__/test_schema_evolution/test_catalog_changes[channels].json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,7 @@
356356
},
357357
"workspaceId": {
358358
"format": "uuid",
359-
"type": [
360-
"string",
361-
"null"
362-
]
359+
"type": "string"
363360
}
364361
},
365362
"required": [

tests/__snapshots__/test_schema_evolution/test_catalog_changes[datacomponents].json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31014,10 +31014,7 @@
3101431014
},
3101531015
"workspaceId": {
3101631016
"format": "uuid",
31017-
"type": [
31018-
"string",
31019-
"null"
31020-
]
31017+
"type": "string"
3102131018
}
3102231019
},
3102331020
"required": [

tests/__snapshots__/test_schema_evolution/test_catalog_changes[datasets].json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,7 @@
426426
},
427427
"workspaceId": {
428428
"format": "uuid",
429-
"type": [
430-
"string",
431-
"null"
432-
]
429+
"type": "string"
433430
}
434431
},
435432
"type": "object"

tests/__snapshots__/test_schema_evolution/test_catalog_changes[datastores].json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31045,10 +31045,7 @@
3104531045
},
3104631046
"workspaceId": {
3104731047
"format": "uuid",
31048-
"type": [
31049-
"string",
31050-
"null"
31051-
]
31048+
"type": "string"
3105231049
}
3105331050
},
3105431051
"required": [

tests/__snapshots__/test_schema_evolution/test_catalog_changes[jobs].json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,7 @@
302302
},
303303
"workspaceId": {
304304
"format": "uuid",
305-
"type": [
306-
"string",
307-
"null"
308-
]
305+
"type": "string"
309306
}
310307
},
311308
"type": "object"

0 commit comments

Comments
 (0)