Skip to content

Commit c1082b6

Browse files
committed
Resolving deadlock.
1 parent 5a210ff commit c1082b6

15 files changed

Lines changed: 433 additions & 206 deletions

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsOneInputStreamOperator.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,14 @@
1818

1919
package org.apache.flink.ml.common.sharedobjects;
2020

21-
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
2221
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
23-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
2422

25-
import java.lang.reflect.Method;
26-
import java.util.Collections;
2723
import java.util.List;
28-
import java.util.Map;
2924

30-
/**
31-
* A default implementation of {@link AbstractStreamOperator} which implements {@link
32-
* SharedObjectsStreamOperator}. Use this class to reduce boilerplate codes.
33-
*/
25+
/** The base class for all {@link OneInputStreamOperator} if shared objects are required. */
3426
public abstract class AbstractSharedObjectsOneInputStreamOperator<IN, OUT>
3527
extends AbstractSharedObjectsStreamOperator<OUT>
3628
implements OneInputStreamOperator<IN, OUT> {
3729

3830
public abstract List<ItemReadRequest<?>> registerReadsInProcessElement();
39-
40-
@Override
41-
public Map<Method, List<ItemReadRequest<?>>> registerReads() {
42-
try {
43-
return Collections.singletonMap(
44-
OneInputStreamOperator.class.getMethod("processElement", StreamRecord.class),
45-
registerReadsInProcessElement());
46-
} catch (NoSuchMethodException e) {
47-
throw new RuntimeException(e);
48-
}
49-
}
5031
}

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsStreamOperator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323
import java.util.UUID;
2424

2525
/**
26-
* A default implementation of {@link AbstractStreamOperator} which implements {@link
27-
* SharedObjectsStreamOperator}.
26+
* A base class of stream operators which support shared objects.
27+
*
28+
* <p>Official subclasses, i.e., {@link AbstractSharedObjectsOneInputStreamOperator} and {@link
29+
* AbstractSharedObjectsTwoInputStreamOperator}, are strongly recommended.
30+
*
31+
* <p>If you are going to implement a subclass by yourself, you have to handle potential deadlocks.
2832
*/
29-
abstract class AbstractSharedObjectsStreamOperator<OUT> extends AbstractStreamOperator<OUT>
30-
implements SharedObjectsStreamOperator {
33+
public abstract class AbstractSharedObjectsStreamOperator<OUT> extends AbstractStreamOperator<OUT> {
3134

3235
private final String sharedObjectsAccessorID;
3336
private transient SharedObjectsContext sharedObjectsContext;
@@ -37,7 +40,7 @@ protected AbstractSharedObjectsStreamOperator() {
3740
sharedObjectsAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();
3841
}
3942

40-
public void onSharedObjectsContextSet(SharedObjectsContext context) {
43+
protected void onSharedObjectsContextSet(SharedObjectsContext context) {
4144
sharedObjectsContext = context;
4245
}
4346

flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/AbstractSharedObjectsTwoInputStreamOperator.java

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,16 @@
1818

1919
package org.apache.flink.ml.common.sharedobjects;
2020

21-
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
2221
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
23-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
2422

25-
import java.lang.reflect.Method;
26-
import java.util.HashMap;
2723
import java.util.List;
28-
import java.util.Map;
2924

30-
/**
31-
* A default implementation of {@link AbstractStreamOperator} which implements {@link
32-
* SharedObjectsStreamOperator}. Use this class to reduce boilerplate codes.
33-
*/
25+
/** The base class for all {@link TwoInputStreamOperator} if shared objects are required. */
3426
public abstract class AbstractSharedObjectsTwoInputStreamOperator<IN1, IN2, OUT>
3527
extends AbstractSharedObjectsStreamOperator<OUT>
3628
implements TwoInputStreamOperator<IN1, IN2, OUT> {
3729

3830
public abstract List<ItemReadRequest<?>> registerReadsInProcessElement1();
3931

4032
public abstract List<ItemReadRequest<?>> registerReadsInProcessElement2();
41-
42-
@Override
43-
public Map<Method, List<ItemReadRequest<?>>> registerReads() {
44-
Map<Method, List<ItemReadRequest<?>>> m = new HashMap<>();
45-
try {
46-
m.put(
47-
TwoInputStreamOperator.class.getMethod("processElement1", StreamRecord.class),
48-
registerReadsInProcessElement1());
49-
m.put(
50-
TwoInputStreamOperator.class.getMethod("processElement2", StreamRecord.class),
51-
registerReadsInProcessElement2());
52-
} catch (NoSuchMethodException e) {
53-
throw new RuntimeException(e);
54-
}
55-
return m;
56-
}
5733
}

0 commit comments

Comments
 (0)