Skip to content

Commit 22e31f5

Browse files
committed
tests: Add OpenLineage check in gcs_upload_download system test
Signed-off-by: Kacper Muda <[email protected]>
1 parent d1b2a44 commit 22e31f5

File tree

4 files changed

+144
-2
lines changed

4 files changed

+144
-2
lines changed

providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from airflow.utils.trigger_rule import TriggerRule
3333

3434
from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
35+
from providers.tests.system.openlineage.operator import OpenLineageTestOperator
3536

3637
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
3738
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
@@ -82,14 +83,19 @@
8283
# [END howto_operator_gcs_delete_bucket]
8384
delete_bucket.trigger_rule = TriggerRule.ALL_DONE
8485

86+
check_events = OpenLineageTestOperator(
87+
task_id="check_openlineage_events",
88+
file_path=str(Path(__file__).parent / "resources" / "openlineage_gcs_upload_download.json"),
89+
)
90+
8591
(
8692
# TEST SETUP
8793
create_bucket
8894
>> upload_file
8995
# TEST BODY
9096
>> download_file
9197
# TEST TEARDOWN
92-
>> delete_bucket
98+
>> [delete_bucket, check_events]
9399
)
94100

95101
from tests_common.test_utils.watcher import watcher
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
[
2+
{
3+
"eventType": "START",
4+
"eventTime": "{{ is_datetime(result) }}",
5+
"run": {
6+
"runId": "{{ is_uuid(result) }}"
7+
},
8+
"job": {
9+
"namespace": "default",
10+
"name": "gcs_upload_download.upload_file",
11+
"facets": {
12+
"jobType": {
13+
"integration": "AIRFLOW",
14+
"jobType": "TASK",
15+
"processingType": "BATCH"
16+
}
17+
}
18+
},
19+
"inputs": [
20+
{
21+
"namespace": "file",
22+
"name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
23+
}
24+
],
25+
"outputs": [
26+
{
27+
"namespace": "gs://bucket_gcs_upload_download_default",
28+
"name": "example_upload.txt"
29+
}
30+
]
31+
},
32+
{
33+
"eventType": "COMPLETE",
34+
"eventTime": "{{ is_datetime(result) }}",
35+
"run": {
36+
"runId": "{{ is_uuid(result) }}"
37+
},
38+
"job": {
39+
"namespace": "default",
40+
"name": "gcs_upload_download.upload_file",
41+
"facets": {
42+
"jobType": {
43+
"integration": "AIRFLOW",
44+
"jobType": "TASK",
45+
"processingType": "BATCH"
46+
}
47+
}
48+
},
49+
"inputs": [
50+
{
51+
"namespace": "file",
52+
"name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
53+
}
54+
],
55+
"outputs": [
56+
{
57+
"namespace": "gs://bucket_gcs_upload_download_default",
58+
"name": "example_upload.txt"
59+
}
60+
]
61+
},
62+
{
63+
"eventType": "START",
64+
"eventTime": "{{ is_datetime(result) }}",
65+
"run": {
66+
"runId": "{{ is_uuid(result) }}"
67+
},
68+
"job": {
69+
"namespace": "default",
70+
"name": "gcs_upload_download.download_file",
71+
"facets": {
72+
"jobType": {
73+
"integration": "AIRFLOW",
74+
"jobType": "TASK",
75+
"processingType": "BATCH"
76+
}
77+
}
78+
},
79+
"inputs": [
80+
{
81+
"namespace": "gs://bucket_gcs_upload_download_default",
82+
"name": "example_upload.txt"
83+
}
84+
],
85+
"outputs": [
86+
{
87+
"namespace": "file",
88+
"name": "example_upload_download.txt"
89+
}
90+
]
91+
},
92+
93+
{
94+
"eventType": "COMPLETE",
95+
"eventTime": "{{ is_datetime(result) }}",
96+
"run": {
97+
"runId": "{{ is_uuid(result) }}"
98+
},
99+
"job": {
100+
"namespace": "default",
101+
"name": "gcs_upload_download.download_file",
102+
"facets": {
103+
"jobType": {
104+
"integration": "AIRFLOW",
105+
"jobType": "TASK",
106+
"processingType": "BATCH"
107+
}
108+
}
109+
},
110+
"inputs": [
111+
{
112+
"namespace": "gs://bucket_gcs_upload_download_default",
113+
"name": "example_upload.txt"
114+
}
115+
],
116+
"outputs": [
117+
{
118+
"namespace": "file",
119+
"name": "example_upload_download.txt"
120+
}
121+
]
122+
}
123+
]

providers/tests/system/google/conftest.py

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import pytest
2020

21+
from providers.tests.system.openlineage.conftest import set_transport_variable # noqa: F401
22+
2123
REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",)
2224

2325

providers/tests/system/openlineage/operator.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
import logging
2222
import os
23+
import re
2324
import uuid
2425
from typing import TYPE_CHECKING, Any
2526
from urllib.parse import urlparse
@@ -63,6 +64,15 @@ def is_uuid(result: Any) -> str:
6364
return "false"
6465

6566

67+
def regex_match(result: Any, pattern: str) -> str:
68+
try:
69+
if re.match(pattern=pattern, string=result):
70+
return "true"
71+
except Exception:
72+
pass
73+
return "false"
74+
75+
6676
def env_var(var: str, default: str | None = None) -> str:
6777
"""
6878
Use this jinja method to access the environment variable named 'var'.
@@ -97,6 +107,7 @@ def setup_jinja() -> Environment:
97107
env.globals["any"] = any
98108
env.globals["is_datetime"] = is_datetime
99109
env.globals["is_uuid"] = is_uuid
110+
env.globals["regex_match"] = regex_match
100111
env.globals["env_var"] = env_var
101112
env.globals["not_match"] = not_match
102113
env.filters["url_scheme_authority"] = url_scheme_authority
@@ -151,7 +162,7 @@ def match(expected, result, env: Environment) -> bool:
151162
except ValueError as e:
152163
log.error("Error rendering jinja template %s: %s", expected, e)
153164
return False
154-
if rendered == "true" or rendered == result:
165+
if str(rendered).lower() == "true" or rendered == result:
155166
return True
156167
log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
157168
return False

0 commit comments

Comments
 (0)