File tree 2 files changed +23
-3
lines changed
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/python
2 files changed +23
-3
lines changed Original file line number Diff line number Diff line change 17
17
18
18
package org .apache .seatunnel .transform .python ;
19
19
20
+ import org .apache .seatunnel .api .table .type .SeaTunnelRowAccessor ;
20
21
import py4j .GatewayServer ;
21
22
23
+ import java .util .Map ;
24
+ import java .util .concurrent .ConcurrentHashMap ;
25
+
22
26
public class PythonOperationProxy implements RowOperation {
23
27
24
28
private GatewayServer javaServer ;
29
+ private final Map <Long ,SeaTunnelRowAccessor > threadDataMap = new ConcurrentHashMap <>();
30
+ private final Map <Long ,Long > threadDataOffsetMap = new ConcurrentHashMap <>();
25
31
private PythonOperationProxy (){
26
32
if (INSTANCE != null ) {
27
33
throw new RuntimeException ("Please use newInstance() method for getting the single instance of this class." );
@@ -43,4 +49,16 @@ public static PythonOperationProxy newInstance(Integer javaServerPort) {
43
49
public void shutdown (){
44
50
this .javaServer .shutdown ();
45
51
}
52
+
53
+ public void putThreadData (long threadId , SeaTunnelRowAccessor inputRow ) {
54
+ this .threadDataMap .put (threadId ,inputRow );
55
+ this .threadDataOffsetMap .compute (threadId ,(thread ,offset ) ->{
56
+ if (offset == null ){
57
+ offset = 0L ;
58
+ }else {
59
+ offset = offset + 1 ;
60
+ }
61
+ return offset ;
62
+ });
63
+ }
46
64
}
Original file line number Diff line number Diff line change @@ -11,14 +11,14 @@ public class PythonTransform extends MultipleFieldOutputTransform {
11
11
12
12
public static final String PLUGIN_NAME = "Python" ;
13
13
14
- private final PythonTransformConfig config ;
14
+ private final PythonTransformConfig transformConfig ;
15
15
16
16
private final PythonOperationProxy pythonOperationProxy ;
17
17
18
18
public PythonTransform (@ NonNull CatalogTable inputCatalogTable , PythonTransformConfig transformConfig ) {
19
19
super (inputCatalogTable , transformConfig .getErrorHandleWay ());
20
- this .config = transformConfig ;
21
- pythonOperationProxy = initLocalSingletonJavaServer (config .getJavaServerPort ());
20
+ this .transformConfig = transformConfig ;
21
+ this . pythonOperationProxy = initLocalSingletonJavaServer (transformConfig .getJavaServerPort ());
22
22
}
23
23
24
24
private PythonOperationProxy initLocalSingletonJavaServer (Integer javaServerPort ) {
@@ -32,6 +32,8 @@ public String getPluginName() {
32
32
33
33
@ Override
34
34
protected Object [] getOutputFieldValues (SeaTunnelRowAccessor inputRow ) {
35
+ long threadId = Thread .currentThread ().getId ();
36
+ pythonOperationProxy .putThreadData (threadId ,inputRow );
35
37
return new Object [0 ];
36
38
}
37
39
You can’t perform that action at this time.
0 commit comments