Skip to content

Commit b0c9044

Browse files
allow secrets in the connector config
1 parent aabd282 commit b0c9044

File tree

6 files changed

+128
-48
lines changed

6 files changed

+128
-48
lines changed

src/glassflow/client.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from .api_client import APIClient
44
from .models import errors, responses
5-
from .pipeline import Pipeline
5+
from .pipeline import ConnectorConfiguration, Pipeline
66
from .secret import Secret
77
from .space import Space
88

@@ -65,9 +65,9 @@ def create_pipeline(
6565
transformation_file: str = None,
6666
requirements: str = None,
6767
source_kind: str = None,
68-
source_config: dict = None,
68+
source_config: ConnectorConfiguration = None,
6969
sink_kind: str = None,
70-
sink_config: dict = None,
70+
sink_config: ConnectorConfiguration = None,
7171
env_vars: list[dict[str, str]] = None,
7272
state: str = "running",
7373
metadata: dict = None,

src/glassflow/models/api/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from .api import (
2+
ConnectorValueSecretRef,
3+
ConnectorValueValue,
24
ConsumeOutputEvent,
35
CreatePipeline,
46
CreateSecret,
@@ -32,4 +34,6 @@
3234
"ListSpaceScopes",
3335
"ListPipelines",
3436
"FunctionEnvironments",
37+
"ConnectorValueSecretRef",
38+
"ConnectorValueValue",
3539
]

src/glassflow/pipeline.py

+33-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
from __future__ import annotations
22

3+
from typing import Union
4+
5+
from typing_extensions import TypeAlias
6+
37
from .client import APIClient
48
from .models import api, errors, operations, responses
59
from .models.responses.pipeline import AccessToken
610
from .pipeline_data import PipelineDataSink, PipelineDataSource
11+
from .secret import Secret
12+
13+
ConnectorConfiguration: TypeAlias = dict[str, Union[str, list, Secret]]
714

815

916
class Pipeline(APIClient):
@@ -14,9 +21,9 @@ def __init__(
1421
space_id: str | None = None,
1522
id: str | None = None,
1623
source_kind: str | None = None,
17-
source_config: dict | None = None,
24+
source_config: ConnectorConfiguration | None = None,
1825
sink_kind: str | None = None,
19-
sink_config: dict | None = None,
26+
sink_config: ConnectorConfiguration | None = None,
2027
requirements: str | None = None,
2128
transformation_file: str | None = None,
2229
env_vars: list[dict[str, str]] | None = None,
@@ -182,9 +189,9 @@ def update(
182189
requirements: str | None = None,
183190
metadata: dict | None = None,
184191
source_kind: str | None = None,
185-
source_config: dict | None = None,
192+
source_config: ConnectorConfiguration | None = None,
186193
sink_kind: str | None = None,
187-
sink_config: dict | None = None,
194+
sink_config: ConnectorConfiguration | None = None,
188195
env_vars: list[dict[str, str]] | None = None,
189196
) -> Pipeline:
190197
"""
@@ -405,15 +412,19 @@ def _request(
405412
) from e
406413
raise e
407414

408-
@staticmethod
409415
def _fill_connector(
410-
connector_type: str, kind: str, config: dict
416+
self, connector_type: str, kind: str, config: dict
411417
) -> api.SourceConnector | api.SinkConnector:
412418
"""Format connector input"""
413419
if not kind and not config:
414420
connector = None
415421
elif kind and config:
416-
connector = dict(kind=kind, config=config)
422+
connector = dict(
423+
kind=kind,
424+
configuration={
425+
k: self._format_connector_config_value(v) for k, v in config.items()
426+
},
427+
)
417428
else:
418429
raise errors.ConnectorConfigValueError(connector_type)
419430

@@ -424,6 +435,21 @@ def _fill_connector(
424435
else:
425436
raise ValueError("connector_type must be 'source' or 'sink'")
426437

438+
@staticmethod
439+
def _format_connector_config_value(value: str | list | Secret):
440+
"""Formats configuration values to match API expectations"""
441+
if isinstance(value, Secret):
442+
config_value = api.ConnectorValueSecretRef(
443+
**{"secret_ref": {"type": "organization", "key": value.key}}
444+
)
445+
elif isinstance(value, list):
446+
config_value = value
447+
else:
448+
config_value = api.ConnectorValueValue(
449+
value=value,
450+
)
451+
return config_value
452+
427453
def _list_access_tokens(self) -> Pipeline:
428454
endpoint = f"/pipelines/{self.id}/access_tokens"
429455
http_res = self._request(method="GET", endpoint=endpoint)

tests/glassflow/integration_tests/conftest.py

+33-33
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,38 @@ def space_with_random_id_and_invalid_token():
4949

5050

5151
@pytest.fixture
52-
def pipeline(client, creating_space):
52+
def secret(client):
53+
return Secret(
54+
key="SecretKey",
55+
value="SecretValue",
56+
personal_access_token=client.personal_access_token,
57+
)
58+
59+
60+
@pytest.fixture
61+
def creating_secret(secret):
62+
secret.create()
63+
yield secret
64+
secret.delete()
65+
66+
67+
@pytest.fixture
68+
def secret_with_invalid_key_and_token():
69+
return Secret(
70+
key="InvalidSecretKey",
71+
personal_access_token="invalid-token",
72+
)
73+
74+
75+
@pytest.fixture
76+
def secret_with_invalid_key(client):
77+
return Secret(
78+
key="InvalidSecretKey", personal_access_token=client.personal_access_token
79+
)
80+
81+
82+
@pytest.fixture
83+
def pipeline(client, creating_space, creating_secret):
5384
return Pipeline(
5485
name="test_pipeline",
5586
space_id=creating_space.id,
@@ -60,7 +91,7 @@ def pipeline(client, creating_space):
6091
source_config={
6192
"project_id": "my-project-id",
6293
"subscription_id": "my-subscription-id",
63-
"credentials_json": "my-credentials.json",
94+
"credentials_json": creating_secret,
6495
},
6596
)
6697

@@ -123,34 +154,3 @@ def sink(source_with_published_events):
123154
pipeline_id=source_with_published_events.pipeline_id,
124155
pipeline_access_token=source_with_published_events.pipeline_access_token,
125156
)
126-
127-
128-
@pytest.fixture
129-
def secret(client):
130-
return Secret(
131-
key="SecretKey",
132-
value="SecretValue",
133-
personal_access_token=client.personal_access_token,
134-
)
135-
136-
137-
@pytest.fixture
138-
def creating_secret(secret):
139-
secret.create()
140-
yield secret
141-
secret.delete()
142-
143-
144-
@pytest.fixture
145-
def secret_with_invalid_key_and_token():
146-
return Secret(
147-
key="InvalidSecretKey",
148-
personal_access_token="invalid-token",
149-
)
150-
151-
152-
@pytest.fixture
153-
def secret_with_invalid_key(client):
154-
return Secret(
155-
key="InvalidSecretKey", personal_access_token=client.personal_access_token
156-
)

tests/glassflow/unit_tests/conftest.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@ def fetch_pipeline_response():
7070
"space_name": "test-space-name",
7171
"source_connector": {
7272
"kind": "google_pubsub",
73-
"config": {
74-
"project_id": "test-project",
75-
"subscription_id": "test-subscription",
76-
"credentials_json": "credentials.json",
73+
"configuration": {
74+
"project_id": {"value": "test-project"},
75+
"subscription_id": {"value": "test-subscription"},
76+
"credentials_json": {
77+
"secret_ref": {"type": "organization", "key": "credentialsJson"}
78+
},
7779
},
7880
},
7981
"sink_connector": {

tests/glassflow/unit_tests/pipeline_test.py

+49-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pytest
22

3+
from glassflow import Pipeline, Secret
34
from glassflow.models import errors
4-
from glassflow.pipeline import Pipeline
55

66

77
def test_pipeline_with_transformation_file():
@@ -47,6 +47,54 @@ def test_pipeline_fail_with_connection_config_value_error():
4747
)
4848

4949

50+
def test_pipeline_with_connector_secret_ok():
51+
p = Pipeline(
52+
transformation_file="tests/data/transformation.py",
53+
personal_access_token="test-token",
54+
sink_kind="webhook",
55+
sink_config={
56+
"url": Secret(key="testKey", personal_access_token="test-token"),
57+
"method": "POST",
58+
"headers": [{"name": "Content-Type", "value": "application/json"}],
59+
},
60+
source_kind="google_pubsub",
61+
source_config={
62+
"project_id": "test-project-id",
63+
"subscription_id": "test-subscription-id",
64+
"credentials_json": Secret(
65+
key="testCredentials", personal_access_token="test-token"
66+
),
67+
},
68+
)
69+
70+
expected_source_connector = {
71+
"kind": "google_pubsub",
72+
"configuration": {
73+
"project_id": {"value": "test-project-id"},
74+
"subscription_id": {"value": "test-subscription-id"},
75+
"credentials_json": {
76+
"secret_ref": {"type": "organization", "key": "testCredentials"}
77+
},
78+
},
79+
}
80+
expected_sink_connector = {
81+
"kind": "webhook",
82+
"configuration": {
83+
"url": {"secret_ref": {"type": "organization", "key": "testKey"}},
84+
"method": {"value": "POST"},
85+
"headers": [{"name": "Content-Type", "value": "application/json"}],
86+
},
87+
}
88+
assert (
89+
p.source_connector.model_dump(mode="json", exclude_none=True)
90+
== expected_source_connector
91+
)
92+
assert (
93+
p.sink_connector.model_dump(mode="json", exclude_none=True)
94+
== expected_sink_connector
95+
)
96+
97+
5098
def test_fetch_pipeline_ok(
5199
get_pipeline_request_mock,
52100
get_access_token_request_mock,

0 commit comments

Comments
 (0)