Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenLineage check in gcs_upload_download system test #45681

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.utils.trigger_rule import TriggerRule

from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
from providers.tests.system.openlineage.operator import OpenLineageTestOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
Expand Down Expand Up @@ -82,14 +83,19 @@
# [END howto_operator_gcs_delete_bucket]
delete_bucket.trigger_rule = TriggerRule.ALL_DONE

check_events = OpenLineageTestOperator(
task_id="check_openlineage_events",
file_path=str(Path(__file__).parent / "resources" / "openlineage_gcs_upload_download.json"),
)

(
# TEST SETUP
create_bucket
>> upload_file
# TEST BODY
>> download_file
# TEST TEARDOWN
>> delete_bucket
>> [delete_bucket, check_events]
)

from tests_common.test_utils.watcher import watcher
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
[
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.upload_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "file",
"name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
}
],
"outputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
]
},
{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.upload_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "file",
"name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
}
],
"outputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
]
},
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.download_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
],
"outputs": [
{
"namespace": "file",
"name": "example_upload_download.txt"
}
]
},

{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.download_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
],
"outputs": [
{
"namespace": "file",
"name": "example_upload_download.txt"
}
]
}
]
2 changes: 2 additions & 0 deletions providers/tests/system/google/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import pytest

from providers.tests.system.openlineage.conftest import set_transport_variable # noqa: F401

REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",)


Expand Down
13 changes: 12 additions & 1 deletion providers/tests/system/openlineage/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import json
import logging
import os
import re
import uuid
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
Expand Down Expand Up @@ -63,6 +64,15 @@ def is_uuid(result: Any) -> str:
return "false"


def regex_match(result: Any, pattern: str) -> str:
    """
    Use this jinja method to check whether 'result' matches the regular expression 'pattern'.

    Matching uses ``re.match``, so the pattern is anchored at the beginning of
    'result' but not at its end.

    :param result: value to test; anything that is not a string never matches.
    :param pattern: regular expression to apply to 'result'.
    :return: the string "true" when the pattern matches, otherwise the string "false".
    """
    try:
        matched = re.match(pattern=pattern, string=result)
    except (TypeError, re.error):
        # TypeError: 'result' (or 'pattern') is not a string, e.g. None or an int.
        # re.error: 'pattern' is not a valid regular expression.
        # Both simply mean "no match"; anything else is unexpected and should surface.
        return "false"
    # A successful match always yields a truthy Match object, even for an empty match.
    return "true" if matched else "false"


def env_var(var: str, default: str | None = None) -> str:
"""
Use this jinja method to access the environment variable named 'var'.
Expand Down Expand Up @@ -97,6 +107,7 @@ def setup_jinja() -> Environment:
env.globals["any"] = any
env.globals["is_datetime"] = is_datetime
env.globals["is_uuid"] = is_uuid
env.globals["regex_match"] = regex_match
env.globals["env_var"] = env_var
env.globals["not_match"] = not_match
env.filters["url_scheme_authority"] = url_scheme_authority
Expand Down Expand Up @@ -151,7 +162,7 @@ def match(expected, result, env: Environment) -> bool:
except ValueError as e:
log.error("Error rendering jinja template %s: %s", expected, e)
return False
if rendered == "true" or rendered == result:
if str(rendered).lower() == "true" or rendered == result:
return True
log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
return False
Expand Down