Skip to content

feat(ingest): reconcile upstream lineage URN casing against DataHub#18004

Draft
puneetagarwal-datahub wants to merge 7 commits into
masterfrom
feat/normalize-lineage-urn-casing
Draft

feat(ingest): reconcile upstream lineage URN casing against DataHub#18004
puneetagarwal-datahub wants to merge 7 commits into
masterfrom
feat/normalize-lineage-urn-casing

Conversation

@puneetagarwal-datahub

@puneetagarwal-datahub puneetagarwal-datahub commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Summary

DataHub compares URNs as exact, case-sensitive strings. When two sources refer to the same physical table with different casing — e.g. a warehouse emits urn:li:dataset:(urn:li:dataPlatform:snowflake,DB.SCHEMA.TABLE,PROD) (uppercase) while a BI tool references ...,db.schema.table,PROD) (lowercase) — DataHub treats them as two different entities, so the lineage edge between them is not drawn and downstream multi-hop lineage silently breaks.

The only existing mitigation is the per-source convert_urns_to_lowercase flag, which keeps lineage connected by lowercasing every URN. That requires coordinating the flag across all sources, isn't available on some BI connectors (Looker, Tableau), and on case-sensitive platforms risks merging genuinely distinct tables (MyTable vs mytable) while also losing the warehouse's display casing.

This PR adds an opt-in ingestion-framework work unit processor that takes a different approach: instead of flattening identities to lowercase, it resolves each upstream warehouse reference to the casing of the entity that already exists in DataHub — per ingestion, with no global coordination, preserving the warehouse's original casing, and only when the match is unambiguous.

What it does

  • New AutoNormalizeLineageUrnsProcessor (work unit processor). For each configured upstream platform it bulk-loads that platform's URNs + schemas once via the existing SchemaResolverProvider, then resolves every lineage reference locally (no per-URN round trips).
  • Resolution is bidirectional and collision-safe: an exact match always wins; a unique case-insensitive match is healed; ambiguous collisions (e.g. orders vs Orders on a case-sensitive platform) and no-match cases are left unchanged.
  • Fixes both table-level references (UpstreamLineage, DashboardInfo) and column-level references (FineGrainedLineage field paths, using the resolver's schema info to correct column casing).
  • Only references to warehouse assets are rewritten — the entity the aspect is attached to and downstream fields are never touched. Intended for BI-tool / cross-platform ingestions; the warehouse's own reported casing/identity is respected.
  • Adds an optional matchType (EXACT / NORMALIZED) discriminator to the Upstream and FineGrainedLineage aspects, populated by the processor, so the UI can later explain whether a reference was matched exactly or healed via normalization.

Configuration

Opt-in, disabled by default. Enable on a BI-tool ingestion recipe and point it at the upstream warehouse platform(s):

flags:
  normalize_lineage_urn_casing:
    enabled: true
    upstream_platforms:
      - platform: snowflake
        platform_instance: my_instance   # optional
        env: PROD

No-op without a DataHub backend connection. Only reconciles against entities that already exist at ingestion time.

Testing

  • 18 unit tests covering both directions, exact-match precedence, ambiguous-collision safety, column-casing correction, dashboardInfo refs, matchType population, and the enable/disable gate.
  • Existing SchemaResolver test suite passes unchanged.
  • Validated end-to-end against a running instance: bidirectional table healing, column-casing correction, and matchType persisting through GMS.

Notes

  • Additive and backward-compatible: the new aspect field is optional and the behavior is behind an opt-in flag (default off). Embedding aspects' schema versions are bumped and RestLI snapshots regenerated accordingly.
  • Existing broken edges already in the graph are not retroactively healed; re-ingesting the affected source after enabling the flag fixes them.

Checklist

  • PR conforms to the Contributing Guideline (PR Title Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated
  • Docs added/updated — usage guide at metadata-ingestion/docs/dev_guides/lineage_urn_casing.md
  • No breaking change (additive optional field + opt-in flag, default off)

🤖 Generated with Claude Code

puneetagarwal-datahub and others added 2 commits June 23, 2026 18:45
…casing

Add AutoNormalizeLineageUrnsProcessor, a framework work-unit processor that
reconciles the casing of upstream warehouse references in lineage against the
casing DataHub already stores, so casing mismatches between sources (e.g. an
uppercase Snowflake table referenced as lowercase by a BI tool, or vice versa)
no longer produce two disconnected lineage nodes.

- Config-driven via FlagsConfig.normalize_lineage_urn_casing (enabled +
  upstream_platforms list); opt-in, default off; no-op without a backend.
  Intended for BI-tool ingestions, not the warehouse ingestion itself.
- Reuses SchemaResolverProvider to bulk-load each upstream platform once, then
  resolves locally. New SchemaResolver.resolve_urn_casing() resolves
  bidirectionally via a normalized-URN index, preferring exact matches and
  leaving ambiguous collisions unchanged.
- Fixes table-level (UpstreamLineage, dashboardInfo) and column-level
  (FineGrainedLineage field paths, via match_columns_to_schema) references.
  Only upstream references are touched; the entity and downstream fields are not.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add a LineageMatchType enum (EXACT/NORMALIZED) and an optional matchType field to
the Upstream and FineGrainedLineage aspects, so the UI can explain whether an
upstream reference matched an existing entity exactly or was healed via casing
normalization. Schema versions of the embedding aspects (upstreamLineage,
dataJobInputOutput) are bumped to v2 accordingly.

The lineage URN casing processor now populates matchType: EXACT when a reference
already matched an existing entity, NORMALIZED when it was rewritten to heal a
casing mismatch, and leaves it unset when no reconciliation was performed.
Fine-grained entries aggregate to NORMALIZED if any field was rewritten, else EXACT.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@github-actions github-actions Bot added ingestion PR or Issue related to the ingestion of metadata devops PR or Issue related to DataHub backend & deployment labels Jun 23, 2026
@codecov

codecov Bot commented Jun 23, 2026

Copy link
Copy Markdown

❌ 35 Tests Failed:

Tests completed Failed Passed Skipped
10127 35 10092 167
View the top 3 failed test(s) by shortest run time
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_file_inference_without_extension]
Stack Traces | 0.201s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'file_inference_without_extension.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f7')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d786e510>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_no_partition_glob]
Stack Traces | 0.203s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_no_partition_glob.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d5ff2fd0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_with_partition_autodetect_traverse_min_max]
Stack Traces | 0.204s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_with_partition_autodetect_traverse_min_max.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f8')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d4f378d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_bucket_wildcard_with_nested_table]
Stack Traces | 0.207s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'bucket_wildcard_with_nested_table.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_b0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe55cd515d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListBuckets operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_multiple_specs_of_different_buckets]
Stack Traces | 0.207s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'multiple_specs_of_different_buckets.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar5')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d7210590>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_no_partition_exclude]
Stack Traces | 0.211s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_no_partition_exclude.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f2')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48b2f2050>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_multiple_specs_of_different_buckets]
Stack Traces | 0.214s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'multiple_specs_of_different_buckets.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_m1')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d63c7010>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_update_schema]
Stack Traces | 0.215s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_update_schema.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f3')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe499029d10>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_update_schema_with_partition_autodetect]
Stack Traces | 0.217s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_update_schema_with_partition_autodetect.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f9')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d269d5d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_update_schema_with_partition_autodetect_and_wildcard_dir]
Stack Traces | 0.218s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_update_schema_with_partition_autodetect_and_wildcard_dir.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f10')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe4a1c674d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_no_partition_exclude]
Stack Traces | 0.22s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_no_partition_exclude.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar3')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe487457a10>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_single_file]
Stack Traces | 0.22s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'single_file.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar8')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d47f5c90>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_bucket_wildcard_as_table]
Stack Traces | 0.221s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'bucket_wildcard_as_table.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_b1')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d527ac50>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListBuckets operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_partition_update_schema]
Stack Traces | 0.221s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_partition_update_schema.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar4')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48dd19ad0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_multiple_files]
Stack Traces | 0.223s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'multiple_files.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_m2')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48f5c7c90>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_multiple_files]
Stack Traces | 0.224s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'multiple_files.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar11')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48d290510>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_single_file]
Stack Traces | 0.225s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'single_file.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_s0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d5bf6510>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_partition_keyval]
Stack Traces | 0.226s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_partition_keyval.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar6')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d6037dd0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_no_partition]
Stack Traces | 0.227s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_no_partition.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar10')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe4951f6690>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_with_partition_autodetect_traverse_all]
Stack Traces | 0.228s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_with_partition_autodetect_traverse_all.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f4')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48d8cd990>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_allow_table]
Stack Traces | 0.23s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'allow_table.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_a0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe4a17f5350>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_keyval]
Stack Traces | 0.231s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_keyval.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f5')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d6abce50>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_deny_table]
Stack Traces | 0.236s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'deny_table.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_d0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d4feccd0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_file_without_extension]
Stack Traces | 0.239s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'file_without_extension.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar9')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48abac950>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_multiple_spec_for_files]
Stack Traces | 0.242s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'multiple_spec_for_files.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_m0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d4f6c110>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_no_partition]
Stack Traces | 0.252s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_no_partition.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f12')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48d8d5c50>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_partition_basic]
Stack Traces | 0.26s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_partition_basic.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f6')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d4ab91d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_bucket_wildcard_single_file]
Stack Traces | 0.262s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'bucket_wildcard_single_file.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_b2')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d4167850>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListBuckets operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_multiple_spec_for_files]
Stack Traces | 0.283s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'multiple_spec_for_files.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar2')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d596aa10>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_no_partition_glob]
Stack Traces | 0.404s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_no_partition_glob.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar0')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d7b2c350>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_file_without_extension]
Stack Traces | 1.92s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'file_without_extension.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f11')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d7b7ed90>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_folder_no_partition_filename]
Stack Traces | 1.94s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'folder_no_partition_filename.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_f1')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48ba8b0d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_partition_basic]
Stack Traces | 1.95s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_partition_basic.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar7')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe48a911190>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[shared_folder_no_partition_filename]
Stack Traces | 2.01s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/shared', 'folder_no_partition_filename.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_shar1')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d8a016d0>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError
tests.integration.s3.test_s3::test_data_lake_gcs_ingest[s3_bucket_wildcard_allow_table]
Stack Traces | 2.15s run time
pytestconfig = <_pytest.config.Config object at 0x7fe584d0db90>
s3_populate = None
source_file_tuple = ('..../s3/sources/s3', 'bucket_wildcard_allow_table.json')
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_data_lake_gcs_ingest_s3_b3')
mock_time = None

    @pytest.mark.integration
    @pytest.mark.parametrize(
        "source_file_tuple", shared_source_files + s3_source_files, ids=get_descriptive_id
    )
    def test_data_lake_gcs_ingest(
        pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time
    ):
        source_dir, source_file = source_file_tuple
        test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/"
    
        f = open(os.path.join(source_dir, source_file))
        source = json.load(f)
    
        config_dict = {}
    
        source["type"] = "gcs"
        source["config"]["credential"] = {
            "hmac_access_id": source["config"]["aws_config"]["aws_access_key_id"],
            "hmac_access_secret": source["config"]["aws_config"]["aws_secret_access_key"],
        }
        for path_spec in source["config"]["path_specs"]:
            path_spec["include"] = path_spec["include"].replace("s3://", "gs://")
        source["config"].pop("aws_config")
        source["config"].pop("profiling", None)
        source["config"].pop("sort_schema_fields", None)
        source["config"].pop("use_s3_bucket_tags", None)
        source["config"].pop("use_s3_content_type", None)
        source["config"].pop("use_s3_object_tags", None)
    
        config_dict["source"] = source
        config_dict["sink"] = {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{source_file}",
            },
        }
    
        config_dict["run_id"] = source_file
    
        with patch("datahub.ingestion.source.gcs.gcs_utils.GCS_ENDPOINT_URL", None):
            pipeline = Pipeline.create(config_dict)
        pipeline.run()
>       pipeline.raise_from_status()

.../integration/s3/test_s3.py:259: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <datahub.ingestion.run.pipeline.Pipeline object at 0x7fe3d9e54910>
raise_warnings = False

    def raise_from_status(self, raise_warnings: bool = False) -> None:
        if self.source.get_report().failures:
>           raise PipelineExecutionError(
                "Source reported errors", self.source.get_report().failures
            )
E           datahub.configuration.common.PipelineExecutionError: ('Source reported errors', [{'title': 'Pipeline Error', 'message': 'Ingestion pipeline raised an unexpected exception!', 'context': ["<class 'botocore.exceptions.ClientError'>: An error occurred (SignatureDoesNotMatch) when calling the ListBuckets operation: Access denied."]}])

.../ingestion/run/pipeline.py:652: PipelineExecutionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Document the lineage URN casing normalization feature: the problem it solves,
how resolution works (bidirectional, exact-wins, collision-safe), what it fixes
(table + column level across upstreamLineage/fineGrainedLineage/dashboardInfo),
configuration reference, where to enable it (BI ingestions, not warehouse), the
matchType explainability field, and its requirements/limitations. Registered in
the ingestion guides sidebar.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…_lowercase

The guide implied no mitigation exists. Acknowledge the current
convert_urns_to_lowercase workaround, explain its limits (cross-source
coordination, unavailable on Looker/Tableau, lowercasing flattens display
casing and can merge distinct case-sensitive tables), and frame this feature as
resolving to the existing entity's casing rather than flattening identities.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…tion

Keep the normalize_lineage_urn_casing flag description coherent with the PR/
guide framing: clarify it resolves references to the existing entity's casing
rather than lowercasing every URN like convert_urns_to_lowercase.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add tests for real mixed-case identifiers (e.g. `DataHub` vs `datahub`):
heal lower->mixed and mixed->lower (both directions), upper->mixed, exact match
wins without mis-routing when both casings exist, and ambiguous third-casing
left unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The guide claimed a Snowflake-uppercase + BI-lowercase setup 'stays broken
regardless' because Looker/Tableau lack the flag. That's overstated — you can
lowercase the warehouse side to connect them. Clarify the real limitation: the
only lever is forcing lowercase, so you cannot connect them while preserving the
warehouse's original casing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

devops PR or Issue related to DataHub backend & deployment ingestion PR or Issue related to the ingestion of metadata

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant