|
2 | 2 |
|
3 | 3 | import pytest
|
4 | 4 |
|
5 |
| -try: |
6 |
| - from sedona.flink import SedonaContext |
7 |
| -except ImportError: |
8 |
| - pass |
9 |
| - |
10 |
| -from pyflink.datastream import StreamExecutionEnvironment |
11 |
| -from pyflink.table import EnvironmentSettings, StreamTableEnvironment |
12 |
| - |
13 | 5 |
|
14 | 6 | EXTRA_JARS = os.getenv("SEDONA_PYFLINK_EXTRA_JARS")
|
15 | 7 |
|
16 | 8 |
|
17 |
| -@pytest.fixture(scope="module") |
18 |
| -def flink_settings(): |
19 |
| - return EnvironmentSettings.in_streaming_mode() |
20 |
| - |
| 9 | +def has_pyflink(): |
| 10 | + try: |
| 11 | + import pyflink |
| 12 | + except ImportError: |
| 13 | + return False |
| 14 | + return True |
21 | 15 |
|
22 |
| -@pytest.fixture(scope="module") |
23 |
| -def stream_env() -> StreamExecutionEnvironment: |
24 |
| - env = StreamExecutionEnvironment.get_execution_environment() |
25 |
| - jars = EXTRA_JARS.split(",") if EXTRA_JARS else [] |
26 |
| - for jar in jars: |
27 |
| - env.add_jars(f"file://{jar}") |
28 | 16 |
|
29 |
| - return env |
30 |
| - |
31 |
| - |
32 |
| -@pytest.fixture(scope="module") |
33 |
| -def table_env( |
34 |
| - stream_env: StreamExecutionEnvironment, flink_settings: EnvironmentSettings |
35 |
| -) -> StreamTableEnvironment: |
36 |
| - return SedonaContext.create(stream_env, flink_settings) |
| 17 | +if has_pyflink(): |
| 18 | + from sedona.flink import SedonaContext |
| 19 | + from pyflink.datastream import StreamExecutionEnvironment |
| 20 | + from pyflink.table import EnvironmentSettings, StreamTableEnvironment |
| 21 | + |
| 22 | + @pytest.fixture(scope="module") |
| 23 | + def flink_settings(): |
| 24 | + return EnvironmentSettings.in_streaming_mode() |
| 25 | + |
| 26 | + @pytest.fixture(scope="module") |
| 27 | + def stream_env() -> StreamExecutionEnvironment: |
| 28 | + env = StreamExecutionEnvironment.get_execution_environment() |
| 29 | + jars = EXTRA_JARS.split(",") if EXTRA_JARS else [] |
| 30 | + for jar in jars: |
| 31 | + env.add_jars(f"file://{jar}") |
| 32 | + |
| 33 | + return env |
| 34 | + |
| 35 | + @pytest.fixture(scope="module") |
| 36 | + def table_env( |
| 37 | + stream_env: StreamExecutionEnvironment, flink_settings: EnvironmentSettings |
| 38 | + ) -> StreamTableEnvironment: |
| 39 | + return SedonaContext.create(stream_env, flink_settings) |
0 commit comments