Skip to content

Commit 233b885

Browse files
committed
Refactor share objects infra to resolve challenges.
1 parent e098994 commit 233b885

24 files changed

Lines changed: 1472 additions & 825 deletions

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/PoolID.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsOneInputStreamOperator.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818

1919
package org.apache.flink.ml.common.sharedobjects;
2020

21-
import org.apache.flink.util.AbstractID;
21+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
2222

23-
/** ID of a pool for shared objects. */
24-
class PoolID extends AbstractID {
25-
private static final long serialVersionUID = 1L;
23+
import java.util.List;
2624

27-
public PoolID(byte[] bytes) {
28-
super(bytes);
29-
}
25+
/** The base class for {@link OneInputStreamOperator}s where shared objects are accessed. */
26+
public abstract class AbstractSharedObjectsOneInputStreamOperator<IN, OUT>
27+
extends AbstractSharedObjectsStreamOperator<OUT>
28+
implements OneInputStreamOperator<IN, OUT> {
3029

31-
public PoolID() {}
30+
public abstract List<ReadRequest<?>> readRequestsInProcessElement();
3231
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.ml.common.sharedobjects;
20+
21+
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
22+
23+
import java.util.UUID;
24+
25+
/**
26+
* A base class of stream operators where shared objects are required.
27+
*
28+
* <p>Official subclasses, i.e., {@link AbstractSharedObjectsOneInputStreamOperator} and {@link
29+
* AbstractSharedObjectsTwoInputStreamOperator}, are strongly recommended.
30+
*
31+
* <p>If you are going to implement a subclass by yourself, you have to handle potential deadlocks.
32+
*/
33+
public abstract class AbstractSharedObjectsStreamOperator<OUT> extends AbstractStreamOperator<OUT> {
34+
35+
/**
36+
* A unique identifier for the instance, which is kept unchanged between client side and
37+
* runtime.
38+
*/
39+
private final String accessorID;
40+
41+
/** The context for shared objects reads/writes. */
42+
protected transient SharedObjectsContext context;
43+
44+
AbstractSharedObjectsStreamOperator() {
45+
super();
46+
accessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();
47+
}
48+
49+
void onSharedObjectsContextSet(SharedObjectsContext context) {
50+
this.context = context;
51+
}
52+
53+
String getAccessorID() {
54+
return accessorID;
55+
}
56+
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/SharedObjectsStreamOperator.java renamed to flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsTwoInputStreamOperator.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,16 @@
1818

1919
package org.apache.flink.ml.common.sharedobjects;
2020

21-
/** Interface for all operators that need to access the shared objects. */
22-
public interface SharedObjectsStreamOperator {
21+
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
2322

24-
/**
25-
* Set the shared objects context in runtime.
26-
*
27-
* @param context The context for shared objects.
28-
*/
29-
void onSharedObjectsContextSet(SharedObjectsContext context);
23+
import java.util.List;
3024

31-
/**
32-
* Get a unique ID to represent the operator instance. The ID must be kept unchanged through its
33-
* lifetime.
34-
*
35-
* @return A unique ID.
36-
*/
37-
String getSharedObjectsAccessorID();
25+
/** The base class for {@link TwoInputStreamOperator}s where shared objects are accessed. */
26+
public abstract class AbstractSharedObjectsTwoInputStreamOperator<IN1, IN2, OUT>
27+
extends AbstractSharedObjectsStreamOperator<OUT>
28+
implements TwoInputStreamOperator<IN1, IN2, OUT> {
29+
30+
public abstract List<ReadRequest<?>> readRequestsInProcessElement1();
31+
32+
public abstract List<ReadRequest<?>> readRequestsInProcessElement2();
3833
}

0 commit comments

Comments
 (0)