refactor: Remove redundant import aliases in api_util.py #2524
3 fail, 16 skipped, 353 pass in 19m 54s
Annotations
Check warning on line 0 in tests.integration_tests.cloud.test_custom_definitions
github-actions / PyTest Results (Full)
test_publish_custom_yaml_source (tests.integration_tests.cloud.test_custom_definitions) failed
build/test-results/test-results.xml [took 0s]
Raw output
airbyte_api.errors.sdkerror.SDKError: API error occurred: Status 500
{"status":500,"type":"https://reference.airbyte.com/reference/errors","title":"unexpected-problem","detail":"An unexpected problem has occurred. If this is an error that needs to be addressed, please submit a pull request or github issue.","documentationUrl":null,"data":{"message":"get(...) must not be null"}}
cloud_workspace = CloudWorkspace(workspace_id='19d7a891-8e0e-40ac-8a8c-5faf8d11e47c', client_id=<SecretString: ****>, client_secret=<SecretString: ****>, api_root='https://api.airbyte.com/v1')
@pytest.mark.requires_creds
def test_publish_custom_yaml_source(
cloud_workspace: CloudWorkspace,
) -> None:
"""Test publishing a custom YAML source definition."""
from airbyte._util import text_util
name = f"test-yaml-source-{text_util.generate_random_suffix()}"
> result = cloud_workspace.publish_custom_source_definition(
name=name,
manifest_yaml=TEST_YAML_MANIFEST,
unique=True,
pre_validate=True,
)
tests/integration_tests/cloud/test_custom_definitions.py:44:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airbyte/cloud/workspaces.py:543: in publish_custom_source_definition
result = api_util.create_custom_yaml_source_definition(
airbyte/_util/api_util.py:1001: in create_custom_yaml_source_definition
response = airbyte_instance.declarative_source_definitions.create_declarative_source_definition(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <airbyte_api.declarativesourcedefinitions.DeclarativeSourceDefinitions object at 0x7f49bf76caf0>
request = CreateDeclarativeSourceDefinitionRequest(create_declarative_source_definition_request=CreateDeclarativeSourceDefinitio...ctor', 'field_path': []}}}}]}, name='test-yaml-source-01k6kkd19'), workspace_id='19d7a891-8e0e-40ac-8a8c-5faf8d11e47c')
def create_declarative_source_definition(self, request: api.CreateDeclarativeSourceDefinitionRequest) -> api.CreateDeclarativeSourceDefinitionResponse:
r"""Create a declarative source definition."""
hook_ctx = HookContext(operation_id='createDeclarativeSourceDefinition', oauth2_scopes=[], security_source=self.sdk_configuration.security)
base_url = utils.template_url(*self.sdk_configuration.get_server_details())
url = utils.generate_url(base_url, '/workspaces/{workspaceId}/definitions/declarative_sources', request)
if callable(self.sdk_configuration.security):
headers, query_params = utils.get_security(self.sdk_configuration.security())
else:
headers, query_params = utils.get_security(self.sdk_configuration.security)
req_content_type, data, form = utils.serialize_request_body(request, api.CreateDeclarativeSourceDefinitionRequest, "create_declarative_source_definition_request", False, False, 'json')
if req_content_type is not None and req_content_type not in ('multipart/form-data', 'multipart/mixed'):
headers['content-type'] = req_content_type
if data is None and form is None:
raise Exception('request body is required')
headers['Accept'] = 'application/json'
headers['user-agent'] = self.sdk_configuration.user_agent
client = self.sdk_configuration.client
try:
req = client.prepare_request(requests_http.Request('POST', url, params=query_params, data=data, files=form, headers=headers))
req = self.sdk_configuration.get_hooks().before_request(BeforeRequestContext(hook_ctx), req)
http_res = client.send(req)
except Exception as e:
_, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), None, e)
if e is not None:
raise e
if utils.match_status_codes(['4XX','5XX'], http_res.status_code):
result, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), http_res, None)
if e is not None:
raise e
if result is not None:
http_res = result
else:
http_res = self.sdk_configuration.get_hooks().after_success(AfterSuccessContext(hook_ctx), http_res)
res = api.CreateDeclarativeSourceDefinitionResponse(status_code=http_res.status_code, content_type=http_res.headers.get('Content-Type') or '', raw_response=http_res)
if http_res.status_code == 200:
# pylint: disable=no-else-return
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
out = utils.unmarshal_json(http_res.text, Optional[models.DeclarativeSourceDefinitionResponse])
res.declarative_source_definition_response = out
else:
content_type = http_res.headers.get('Content-Type')
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
elif http_res.status_code >= 400 and http_res.status_code < 500:
raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
elif http_res.status_code >= 500 and http_res.status_code < 600:
> raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
E airbyte_api.errors.sdkerror.SDKError: API error occurred: Status 500
E {"status":500,"type":"https://reference.airbyte.com/reference/errors","title":"unexpected-problem","detail":"An unexpected problem has occurred. If this is an error that needs to be addressed, please submit a pull request or github issue.","documentationUrl":null,"data":{"message":"get(...) must not be null"}}
.venv/lib/python3.10/site-packages/airbyte_api/declarativesourcedefinitions.py:71: SDKError
Check warning on line 0 in tests.integration_tests.cloud.test_custom_definitions
github-actions / PyTest Results (Full)
test_publish_custom_docker_source (tests.integration_tests.cloud.test_custom_definitions) failed
build/test-results/test-results.xml [took 0s]
Raw output
airbyte_api.errors.sdkerror.SDKError: API error occurred: Status 400
{"status":400,"type":"https://reference.airbyte.com/reference/errors#bad-request","title":"bad-request","detail":"The request could not be understood by the server due to malformed syntax.","documentationUrl":null,"data":{"message":"Non-declarative definitions cannot be created or updated in Airbyte Cloud."}}
cloud_workspace = CloudWorkspace(workspace_id='19d7a891-8e0e-40ac-8a8c-5faf8d11e47c', client_id=<SecretString: ****>, client_secret=<SecretString: ****>, api_root='https://api.airbyte.com/v1')
@pytest.mark.requires_creds
def test_publish_custom_docker_source(
cloud_workspace: CloudWorkspace,
) -> None:
"""Test publishing a custom Docker source definition."""
from airbyte._util import text_util
name = f"test-docker-source-{text_util.generate_random_suffix()}"
> result = cloud_workspace.publish_custom_source_definition(
name=name,
docker_image="airbyte/test-source",
docker_tag="1.0.0",
documentation_url="https://example.com/docs",
unique=True,
)
tests/integration_tests/cloud/test_custom_definitions.py:93:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airbyte/cloud/workspaces.py:552: in publish_custom_source_definition
result = api_util.create_custom_docker_source_definition(
airbyte/_util/api_util.py:1155: in create_custom_docker_source_definition
response = airbyte_instance.source_definitions.create_source_definition(request)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <airbyte_api.sourcedefinitions.SourceDefinitions object at 0x7f49c41c7700>
request = CreateSourceDefinitionRequest(create_definition_request=CreateDefinitionRequest(docker_image_tag='1.0.0', docker_repos...-source-01k6kkd1a', documentation_url='https://example.com/docs'), workspace_id='19d7a891-8e0e-40ac-8a8c-5faf8d11e47c')
def create_source_definition(self, request: api.CreateSourceDefinitionRequest) -> api.CreateSourceDefinitionResponse:
r"""Create a source definition."""
hook_ctx = HookContext(operation_id='createSourceDefinition', oauth2_scopes=[], security_source=self.sdk_configuration.security)
base_url = utils.template_url(*self.sdk_configuration.get_server_details())
url = utils.generate_url(base_url, '/workspaces/{workspaceId}/definitions/sources', request)
if callable(self.sdk_configuration.security):
headers, query_params = utils.get_security(self.sdk_configuration.security())
else:
headers, query_params = utils.get_security(self.sdk_configuration.security)
req_content_type, data, form = utils.serialize_request_body(request, api.CreateSourceDefinitionRequest, "create_definition_request", False, False, 'json')
if req_content_type is not None and req_content_type not in ('multipart/form-data', 'multipart/mixed'):
headers['content-type'] = req_content_type
if data is None and form is None:
raise Exception('request body is required')
headers['Accept'] = 'application/json'
headers['user-agent'] = self.sdk_configuration.user_agent
client = self.sdk_configuration.client
try:
req = client.prepare_request(requests_http.Request('POST', url, params=query_params, data=data, files=form, headers=headers))
req = self.sdk_configuration.get_hooks().before_request(BeforeRequestContext(hook_ctx), req)
http_res = client.send(req)
except Exception as e:
_, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), None, e)
if e is not None:
raise e
if utils.match_status_codes(['4XX','5XX'], http_res.status_code):
result, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), http_res, None)
if e is not None:
raise e
if result is not None:
http_res = result
else:
http_res = self.sdk_configuration.get_hooks().after_success(AfterSuccessContext(hook_ctx), http_res)
res = api.CreateSourceDefinitionResponse(status_code=http_res.status_code, content_type=http_res.headers.get('Content-Type') or '', raw_response=http_res)
if http_res.status_code == 200:
# pylint: disable=no-else-return
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
out = utils.unmarshal_json(http_res.text, Optional[models.DefinitionResponse])
res.definition_response = out
else:
content_type = http_res.headers.get('Content-Type')
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
elif http_res.status_code >= 400 and http_res.status_code < 500:
> raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
E airbyte_api.errors.sdkerror.SDKError: API error occurred: Status 400
E {"status":400,"type":"https://reference.airbyte.com/reference/errors#bad-request","title":"bad-request","detail":"The request could not be understood by the server due to malformed syntax.","documentationUrl":null,"data":{"message":"Non-declarative definitions cannot be created or updated in Airbyte Cloud."}}
.venv/lib/python3.10/site-packages/airbyte_api/sourcedefinitions.py:69: SDKError
Check warning on line 0 in tests.integration_tests.cloud.test_custom_definitions
github-actions / PyTest Results (Full)
test_publish_custom_docker_destination (tests.integration_tests.cloud.test_custom_definitions) failed
build/test-results/test-results.xml [took 0s]
Raw output
airbyte_api.errors.sdkerror.SDKError: API error occurred: Status 400
{"status":400,"type":"https://reference.airbyte.com/reference/errors#bad-request","title":"bad-request","detail":"The request could not be understood by the server due to malformed syntax.","documentationUrl":null,"data":{"message":"Non-declarative definitions cannot be created or updated in Airbyte Cloud."}}
cloud_workspace = CloudWorkspace(workspace_id='19d7a891-8e0e-40ac-8a8c-5faf8d11e47c', client_id=<SecretString: ****>, client_secret=<SecretString: ****>, api_root='https://api.airbyte.com/v1')
@pytest.mark.requires_creds
def test_publish_custom_docker_destination(
cloud_workspace: CloudWorkspace,
) -> None:
"""Test publishing a custom Docker destination definition."""
from airbyte._util import text_util
name = f"test-docker-dest-{text_util.generate_random_suffix()}"
> result = cloud_workspace.publish_custom_destination_definition(
name=name,
docker_image="airbyte/test-destination",
docker_tag="1.0.0",
unique=True,
)
tests/integration_tests/cloud/test_custom_definitions.py:142:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airbyte/cloud/workspaces.py:772: in publish_custom_destination_definition
result = api_util.create_custom_docker_destination_definition(
airbyte/_util/api_util.py:1303: in create_custom_docker_destination_definition
response = airbyte_instance.destination_definitions.create_destination_definition(request)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <airbyte_api.destinationdefinitions.DestinationDefinitions object at 0x7f49c63ba6e0>
request = CreateDestinationDefinitionRequest(create_definition_request=CreateDefinitionRequest(docker_image_tag='1.0.0', docker_...tion', name='test-docker-dest-01k6kktrv', documentation_url=None), workspace_id='19d7a891-8e0e-40ac-8a8c-5faf8d11e47c')
def create_destination_definition(self, request: api.CreateDestinationDefinitionRequest) -> api.CreateDestinationDefinitionResponse:
r"""Create a destination definition."""
hook_ctx = HookContext(operation_id='createDestinationDefinition', oauth2_scopes=[], security_source=self.sdk_configuration.security)
base_url = utils.template_url(*self.sdk_configuration.get_server_details())
url = utils.generate_url(base_url, '/workspaces/{workspaceId}/definitions/destinations', request)
if callable(self.sdk_configuration.security):
headers, query_params = utils.get_security(self.sdk_configuration.security())
else:
headers, query_params = utils.get_security(self.sdk_configuration.security)
req_content_type, data, form = utils.serialize_request_body(request, api.CreateDestinationDefinitionRequest, "create_definition_request", False, False, 'json')
if req_content_type is not None and req_content_type not in ('multipart/form-data', 'multipart/mixed'):
headers['content-type'] = req_content_type
if data is None and form is None:
raise Exception('request body is required')
headers['Accept'] = 'application/json'
headers['user-agent'] = self.sdk_configuration.user_agent
client = self.sdk_configuration.client
try:
req = client.prepare_request(requests_http.Request('POST', url, params=query_params, data=data, files=form, headers=headers))
req = self.sdk_configuration.get_hooks().before_request(BeforeRequestContext(hook_ctx), req)
http_res = client.send(req)
except Exception as e:
_, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), None, e)
if e is not None:
raise e
if utils.match_status_codes(['4XX','5XX'], http_res.status_code):
result, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), http_res, None)
if e is not None:
raise e
if result is not None:
http_res = result
else:
http_res = self.sdk_configuration.get_hooks().after_success(AfterSuccessContext(hook_ctx), http_res)
res = api.CreateDestinationDefinitionResponse(status_code=http_res.status_code, content_type=http_res.headers.get('Content-Type') or '', raw_response=http_res)
if http_res.status_code == 200:
# pylint: disable=no-else-return
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
out = utils.unmarshal_json(http_res.text, Optional[models.DefinitionResponse])
res.definition_response = out
else:
content_type = http_res.headers.get('Content-Type')
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
elif http_res.status_code >= 400 and http_res.status_code < 500:
> raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
E airbyte_api.errors.sdkerror.SDKError: API error occurred: Status 400
E {"status":400,"type":"https://reference.airbyte.com/reference/errors#bad-request","title":"bad-request","detail":"The request could not be understood by the server due to malformed syntax.","documentationUrl":null,"data":{"message":"Non-declarative definitions cannot be created or updated in Airbyte Cloud."}}
.venv/lib/python3.10/site-packages/airbyte_api/destinationdefinitions.py:69: SDKError