Skip to content

Commit a2e3420

Browse files
committed
SEDONA-725 Add pyflink to Sedona.
SEDONA-725 rearrange the spark module
1 parent ca9ca30 commit a2e3420

File tree

7 files changed

+288
-1
lines changed

7 files changed

+288
-1
lines changed

.github/workflows/lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
uses: actions/checkout@v4
3636
- uses: actions/setup-python@v5 # https://www.python.org/
3737
with:
38-
python-version: '3.x' # Version range or exact version of a Python version to use, using SemVer's version range syntax
38+
python-version: '3.10' # Version range or exact version of a Python version to use, using SemVer's version range syntax
3939
architecture: 'x64' # optional x64 or x86. Defaults to x64 if not specified
4040
- name: Install dependencies # https://pip.pypa.io/en/stable/
4141
run: |

.github/workflows/pyflink.yml

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
name: Sedona Pyflink Test
19+
20+
on:
21+
push:
22+
branches:
23+
- master
24+
paths:
25+
- 'common/**'
26+
- 'flink/**'
27+
- 'flink-shaded/**'
28+
- 'pom.xml'
29+
- 'python/**'
30+
- '.github/workflows/pyflink.yml'
31+
pull_request:
32+
branches:
33+
- '*'
34+
paths:
35+
- 'common/**'
36+
- 'flink/**'
37+
- 'flink-shaded/**'
38+
- 'pom.xml'
39+
- 'python/**'
40+
- '.github/workflows/pyflink.yml'
41+
42+
concurrency:
43+
group: ${{ github.workflow }}-${{ github.ref }}
44+
cancel-in-progress: true
45+
46+
jobs:
47+
test:
48+
runs-on: ubuntu-22.04
49+
strategy:
50+
matrix:
51+
include:
52+
- python: '3.10'
53+
steps:
54+
- uses: actions/checkout@v4
55+
- uses: actions/setup-java@v4
56+
with:
57+
distribution: 'zulu'
58+
java-version: '8'
59+
- uses: actions/setup-python@v5
60+
with:
61+
python-version: ${{ matrix.python }}
62+
- run: sudo apt-get -y install python3-pip python-dev-is-python3
63+
- run: mvn package -pl "org.apache.sedona:sedona-flink-shaded_2.12" -am -DskipTests
64+
- run: sudo pip3 install -U setuptools
65+
- run: sudo pip3 install -U wheel
66+
- run: sudo pip3 install -U virtualenvwrapper
67+
- run: python3 -m pip install uv
68+
- run: cd python
69+
- run: rm pyproject.toml
70+
- run: uv init --no-workspace
71+
- run: uv add apache-flink==1.20.1 shapely attr setuptools
72+
- run: uv add pytest --dev
73+
- run: |
74+
wget https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.7.1-28.5/geotools-wrapper-1.7.1-28.5.jar
75+
export SEDONA_PYFLINK_EXTRA_JARS=${PWD}/$(find flink-shaded/target -name sedona-flink*.jar),${PWD}/geotools-wrapper-1.7.1-28.5.jar
76+
(cd python; PYTHONPATH=$(pwd) uv run pytest -v -s ./tests/flink)

python/sedona/flink/__init__.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import warnings
19+
20+
try:
21+
import pyflink
22+
from sedona.flink.context import SedonaContext
23+
except ImportError:
24+
warnings.warn(
25+
"Apache Sedona requires Pyflink. Please install PyFlink before using Sedona flink.",
26+
DeprecationWarning,
27+
stacklevel=2,
28+
)
29+
30+
__all__ = ["SedonaContext"]

python/sedona/flink/context.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import warnings
19+
20+
try:
21+
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
22+
from pyflink.datastream import StreamExecutionEnvironment
23+
from pyflink.java_gateway import get_gateway
24+
except ImportError:
25+
StreamTableEnvironment = None
26+
StreamExecutionEnvironment = None
27+
EnvironmentSettings = None
28+
warnings.warn(
29+
"Apache Sedona requires Pyflink. Please install PyFlink before using Sedona flink.",
30+
DeprecationWarning,
31+
stacklevel=2,
32+
)
33+
34+
35+
class SedonaContext:
36+
37+
@classmethod
38+
def create(
39+
cls, env: StreamExecutionEnvironment, settings: EnvironmentSettings
40+
) -> StreamTableEnvironment:
41+
table_env = StreamTableEnvironment.create(env, settings)
42+
gateway = get_gateway()
43+
44+
flink_sedona_context = gateway.jvm.org.apache.sedona.flink.SedonaContext
45+
46+
table_env_j = flink_sedona_context.create(
47+
env._j_stream_execution_environment, table_env._j_tenv
48+
)
49+
50+
table_env._j_tenv = table_env_j
51+
52+
return table_env

python/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
"spark": ["pyspark>=2.3.0"],
6464
"pydeck-map": ["geopandas", "pydeck==0.8.0"],
6565
"kepler-map": ["geopandas", "keplergl==0.3.2"],
66+
"flink": ["apache-flink>=1.19.0"],
6667
"all": [
6768
"pyspark>=2.3.0",
6869
"geopandas",

python/tests/flink/conftest.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import os
19+
20+
import pytest
21+
22+
23+
EXTRA_JARS = os.getenv("SEDONA_PYFLINK_EXTRA_JARS")
24+
25+
26+
def has_pyflink():
27+
try:
28+
import pyflink
29+
except ImportError:
30+
return False
31+
return True
32+
33+
34+
if has_pyflink():
35+
from sedona.flink import SedonaContext
36+
37+
try:
38+
from pyflink.datastream import StreamExecutionEnvironment
39+
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
40+
except ImportError:
41+
pytest.skip("PyFlink is not installed. Skipping tests that require PyFlink.")
42+
43+
@pytest.fixture(scope="module")
44+
def flink_settings():
45+
return EnvironmentSettings.in_streaming_mode()
46+
47+
@pytest.fixture(scope="module")
48+
def stream_env() -> StreamExecutionEnvironment:
49+
env = StreamExecutionEnvironment.get_execution_environment()
50+
jars = EXTRA_JARS.split(",") if EXTRA_JARS else []
51+
for jar in jars:
52+
env.add_jars(f"file://{jar}")
53+
54+
return env
55+
56+
@pytest.fixture(scope="module")
57+
def table_env(
58+
stream_env: StreamExecutionEnvironment, flink_settings: EnvironmentSettings
59+
) -> StreamTableEnvironment:
60+
return SedonaContext.create(stream_env, flink_settings)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from shapely.wkb import loads
19+
import pytest
20+
21+
from tests.flink.conftest import has_pyflink
22+
23+
if not has_pyflink():
24+
pytest.skip(
25+
"PyFlink is not installed. Skipping tests that require PyFlink.",
26+
allow_module_level=True,
27+
)
28+
29+
30+
def test_register(table_env):
31+
result = (
32+
table_env.sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))")
33+
.execute()
34+
.collect()
35+
)
36+
37+
assert 1 == len([el for el in result])
38+
39+
40+
def test_register_udf(table_env):
41+
from pyflink.table.udf import ScalarFunction, udf
42+
43+
class Buffer(ScalarFunction):
44+
def eval(self, s):
45+
geom = loads(s)
46+
return geom.buffer(1).wkb
47+
48+
table_env.create_temporary_function(
49+
"ST_BufferPython", udf(Buffer(), result_type="Binary")
50+
)
51+
52+
buffer_table = table_env.sql_query(
53+
"SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
54+
)
55+
56+
table_env.create_temporary_view("buffer_table", buffer_table)
57+
58+
result = (
59+
table_env.sql_query("SELECT ST_Area(ST_GeomFromWKB(buffer)) FROM buffer_table")
60+
.execute()
61+
.collect()
62+
)
63+
64+
items = [el for el in result]
65+
area = items[0][0]
66+
67+
assert 3.12 < area < 3.14
68+
assert 1 == len(items)

0 commit comments

Comments
 (0)