Skip to content

Commit dd8c603

Browse files
committed
把MatchWindow放在FlowSessionCache统一管理
1 parent 6fe7ee9 commit dd8c603

3 files changed

Lines changed: 36 additions & 30 deletions

File tree

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66

77
package modelengine.fit.waterflow.domain.context;
88

9+
import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo;
10+
911
import java.util.HashSet;
1012
import java.util.List;
1113
import java.util.Map;
1214
import java.util.Set;
1315
import java.util.UUID;
14-
import java.util.concurrent.ConcurrentHashMap;
1516
import java.util.stream.Collectors;
1617

1718
/**
@@ -21,8 +22,6 @@
2122
* @since 1.0
2223
*/
2324
public class MatchWindow extends Window {
24-
private static final Map<String, MatchWindow> all = new ConcurrentHashMap<>();
25-
2625
private final Set<MatchWindow> arms = new HashSet<>();
2726

2827
/**
@@ -41,24 +40,26 @@ public MatchWindow(Window source, UUID id, Object data) {
4140
/**
4241
* 创建一个MatchWindow
4342
*
43+
* @param flowId 流程ID
4444
* @param source 源窗口
4545
* @param id 窗口ID
4646
* @param data 窗口数据
4747
* @return 返回创建的MatchWindow对象
4848
*/
49-
public static synchronized MatchWindow from(Window source, UUID id, Object data) {
50-
// Use composite key: sessionId + UUID to prevent cross-session pollution
51-
String cacheKey = source.getSession().getId() + ":" + id.toString();
52-
MatchWindow window = all.get(cacheKey);
49+
public static synchronized MatchWindow from(String flowId, Window source, UUID id, Object data) {
50+
// 从 FlowSessionRepo 获取缓存
51+
Map<UUID, MatchWindow> cache = FlowSessionRepo.getMatchWindowCache(flowId, source.getSession());
52+
53+
MatchWindow window = cache.get(id);
5354
if (window == null) {
5455
window = new MatchWindow(source, id, data);
5556
FlowSession session = new FlowSession(source.getSession());
5657
session.setWindow(window);
57-
all.put(cacheKey, window);
58+
cache.put(id, window);
5859
}
5960
WindowToken token = window.createToken();
6061
token.beginConsume();
61-
List<MatchWindow> arms = all.values().stream().filter(t -> t.from == source).collect(Collectors.toList());
62+
List<MatchWindow> arms = cache.values().stream().filter(t -> t.from == source).collect(Collectors.toList());
6263
for (MatchWindow a : arms) {
6364
a.setArms(arms);
6465
}
@@ -82,23 +83,4 @@ public void complete() {
8283
public boolean fulfilled() {
8384
return this.from.isComplete() && this.from.isOngoing();
8485
}
85-
86-
@Override
87-
public void tryFinish() {
88-
super.tryFinish();
89-
// 当 MatchWindow 完全完成时,从缓存中移除以防止内存泄漏
90-
if (this.isDone()) {
91-
cleanup();
92-
}
93-
}
94-
95-
/**
96-
* 从缓存中移除当前 MatchWindow
97-
*/
98-
private void cleanup() {
99-
if (this.getSession() != null) {
100-
String cacheKey = this.getSession().getId() + ":" + this.key().toString();
101-
all.remove(cacheKey);
102-
}
103-
}
10486
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow;
1010
import modelengine.fit.waterflow.domain.context.FlowSession;
11+
import modelengine.fit.waterflow.domain.context.MatchWindow;
1112
import modelengine.fit.waterflow.domain.context.Window;
1213
import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo;
1314
import modelengine.fitframework.inspection.Validation;
@@ -90,6 +91,19 @@ public static FlatMapSourceWindow getFlatMapSource(String flowId, Window window,
9091
.getFlatMapSourceWindow(window, repo);
9192
}
9293

94+
/**
95+
* 获取 MatchWindow 缓存 Map,用于存储和检索 MatchWindow 实例
96+
*
97+
* @param flowId The unique identifier of the flow.
98+
* @param session The current session context.
99+
* @return MatchWindow 缓存 Map
100+
*/
101+
public static Map<UUID, MatchWindow> getMatchWindowCache(String flowId, FlowSession session) {
102+
Validation.notNull(flowId, "Flow id cannot be null.");
103+
Validation.notNull(session, "Session cannot be null.");
104+
return getFlowSessionCache(flowId, session).getMatchWindowCache();
105+
}
106+
93107
/**
94108
* Releases all resources associated with a specific flow session.
95109
*
@@ -137,6 +151,12 @@ private static class FlowSessionCache {
137151
*/
138152
private final Map<UUID, FlatMapSourceWindow> flatMapSourceWindows = new ConcurrentHashMap<>();
139153

154+
/**
155+
* 记录流程中条件匹配节点产生的窗口信息,用于将同一批数据汇聚。
156+
* 其中索引为 match window 的唯一标识。
157+
*/
158+
private final Map<UUID, MatchWindow> matchWindows = new ConcurrentHashMap<>();
159+
140160
private final Map<String, Integer> accOrders = new ConcurrentHashMap<>();
141161

142162
private FlowSession getNextToSession(FlowSession session) {
@@ -165,6 +185,10 @@ private FlatMapSourceWindow getFlatMapSourceWindow(Window window, FlowContextRep
165185
});
166186
}
167187

188+
private Map<UUID, MatchWindow> getMatchWindowCache() {
189+
return this.matchWindows;
190+
}
191+
168192
private int getNextAccOrder(String nodeId) {
169193
return this.accOrders.compute(nodeId, (key, value) -> {
170194
if (value == null) {

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public MatchHappen<O, D, I, F> match(Operators.Whether<I> whether,
5050
Operators.BranchProcessor<O, D, I, F> processor) {
5151
UUID id = UUID.randomUUID();
5252
State<I, D, I, F> branchStart = new State<>(this.node.publisher()
53-
.just(input -> input.setSession(
54-
MatchWindow.from(input.getWindow(), id, input.getData()).getSession()), whether)
53+
.just(input -> input.setSession(MatchWindow.from(this.node.processor.getStreamId(),
54+
input.getWindow(), id, input.getData()).getSession()), whether)
5555
.displayAs(SpecialDisplayNode.BRANCH.name()), this.node.getFlow());
5656
State<O, D, ?, F> branch = processor.process(branchStart);
5757
this.branches.add(branch);

0 commit comments

Comments
 (0)