Skip to content

Commit 7fdd31d

Browse files
committed
[Feature][Transform][python] init PythonOperationProxy
1 parent 2f918d2 commit 7fdd31d

File tree

4 files changed

+56
-3
lines changed

4 files changed

+56
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.transform.jsonpath;
18+
19+
20+
public class SingletonServer {
21+
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/python/PythonOperationProxy.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,30 @@
1717

1818
package org.apache.seatunnel.transform.python;
1919

20-
public class PythonOperationProxy implements RowOperation {}
20+
import py4j.GatewayServer;
21+
22+
public class PythonOperationProxy implements RowOperation {
23+
24+
private GatewayServer javaServer;
25+
private PythonOperationProxy(){
26+
if (INSTANCE != null) {
27+
throw new RuntimeException("Please use newInstance() method for getting the single instance of this class.");
28+
}
29+
}
30+
private static volatile PythonOperationProxy INSTANCE;
31+
public static PythonOperationProxy newInstance(Integer javaServerPort) {
32+
if (INSTANCE == null){
33+
synchronized (PythonOperationProxy.class){
34+
if (INSTANCE == null){
35+
PythonOperationProxy operationProxy = new PythonOperationProxy();
36+
operationProxy.javaServer = new GatewayServer(operationProxy,javaServerPort);
37+
}
38+
}
39+
}
40+
return INSTANCE;
41+
}
42+
43+
public void shutdown(){
44+
this.javaServer.shutdown();
45+
}
46+
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/python/PythonTransform.java

+7
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,16 @@ public class PythonTransform extends MultipleFieldOutputTransform {
1313

1414
private final PythonTransformConfig config;
1515

16+
private final PythonOperationProxy pythonOperationProxy;
17+
1618
public PythonTransform(@NonNull CatalogTable inputCatalogTable, PythonTransformConfig transformConfig) {
1719
super(inputCatalogTable, transformConfig.getErrorHandleWay());
1820
this.config = transformConfig;
21+
pythonOperationProxy = initLocalSingletonJavaServer(config.getJavaServerPort());
22+
}
23+
24+
private PythonOperationProxy initLocalSingletonJavaServer(Integer javaServerPort) {
25+
return PythonOperationProxy.newInstance(javaServerPort);
1926
}
2027

2128
@Override

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/python/PythonTransformConfig.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ public static PythonTransformConfig of(ReadonlyConfig config) {
105105
private static String loadCodeFromPath(String filePath) {
106106
try {
107107
// 读取整个文件内容到字符串
108-
String code = new String(Files.readAllBytes(Paths.get(filePath)));
109-
return code;
108+
return new String(Files.readAllBytes(Paths.get(filePath)));
110109
} catch (IOException e) {
111110
// 处理可能发生的IO异常
112111
throw new TransformException(LOAD_SOURCE_CODE_FROM_PATH_ERROR,

0 commit comments

Comments
 (0)