Skip to content

Commit 18251b0

Browse files
committed
[Feature][Transform][python] add data on thread
1 parent 5b7da6a commit 18251b0

File tree

3 files changed

+87
-12
lines changed

3 files changed

+87
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.transform.python;
18+
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
public class EndTagList {
24+
private final List<Object> list;
25+
private volatile boolean end;
26+
27+
public EndTagList(){
28+
this.list = new ArrayList<>();
29+
this.end = false;
30+
}
31+
32+
public EndTagList(List<Object> dataList){
33+
this.list = dataList;
34+
this.end = false;
35+
}
36+
37+
public List<Object> getList() {
38+
return list;
39+
}
40+
41+
public boolean isEnd() {
42+
return end;
43+
}
44+
45+
public void end(){
46+
this.end = true;
47+
}
48+
49+
public void add(Object obj) {
50+
this.list.add(obj);
51+
}
52+
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/python/PythonOperationProxy.java

+34-11
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@
1717

1818
package org.apache.seatunnel.transform.python;
1919

20+
import lombok.extern.slf4j.Slf4j;
2021
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
2122
import py4j.GatewayServer;
2223

24+
import java.util.List;
2325
import java.util.Map;
2426
import java.util.concurrent.ConcurrentHashMap;
2527

28+
@Slf4j
2629
public class PythonOperationProxy implements RowOperation {
2730

2831
private GatewayServer javaServer;
29-
private final Map<Long,SeaTunnelRowAccessor> threadDataMap = new ConcurrentHashMap<>();
30-
private final Map<Long,Long> threadDataOffsetMap = new ConcurrentHashMap<>();
32+
private final Map<Long, SeaTunnelRowAccessor> inputDataMap = new ConcurrentHashMap<>();
33+
34+
private final Map<Long, EndTagList> outputDataMap = new ConcurrentHashMap<>();
3135
private PythonOperationProxy(){
3236
if (INSTANCE != null) {
3337
throw new RuntimeException("Please use newInstance() method for getting the single instance of this class.");
@@ -51,14 +55,33 @@ public void shutdown(){
5155
}
5256

5357
public void putThreadData(long threadId, SeaTunnelRowAccessor inputRow) {
54-
this.threadDataMap.put(threadId,inputRow);
55-
this.threadDataOffsetMap.compute(threadId,(thread,offset) ->{
56-
if (offset == null){
57-
offset = 0L;
58-
}else {
59-
offset = offset + 1;
60-
}
61-
return offset;
62-
});
58+
this.inputDataMap.put(threadId,inputRow);
59+
}
60+
61+
public Object[] getOutputData(long threadId) {
62+
EndTagList endTagList = outputDataMap.get(threadId);
63+
while (endTagList == null || !endTagList.isEnd()){
64+
log.info("wait python process data");
65+
}
66+
return outputDataMap.get(threadId).getList().toArray(new Object[0]);
67+
}
68+
69+
public void addData(Long threadId,Object obj){
70+
EndTagList array = this.outputDataMap.getOrDefault(threadId, new EndTagList());
71+
array.add(obj);
72+
this.outputDataMap.put(threadId,array);
73+
}
74+
75+
public void addDataList(Long threadId,List<Object> dataList){
76+
EndTagList array = new EndTagList();
77+
this.outputDataMap.put(threadId,array);
78+
}
79+
80+
81+
public void end(Long threadId){
82+
EndTagList endTagList = this.outputDataMap.get(threadId);
83+
if (endTagList != null){
84+
endTagList.end();
85+
}
6386
}
6487
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/python/PythonTransform.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public String getPluginName() {
3434
protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
3535
long threadId = Thread.currentThread().getId();
3636
pythonOperationProxy.putThreadData(threadId,inputRow);
37-
return new Object[0];
37+
return pythonOperationProxy.getOutputData(threadId);
3838
}
3939

4040
@Override

0 commit comments

Comments
 (0)