Skip to content

Commit 00e2378

Browse files
committed
IGNITE-28681 Operation Context integrated in Ignite internal async structures.
1 parent 2121cea commit 00e2378

48 files changed

Lines changed: 950 additions & 623 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

modules/commons/src/main/java/org/apache/ignite/internal/IgniteInternalWrapper.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,15 @@
2121
public interface IgniteInternalWrapper<T> {
2222
/** @return Wrapped object. */
2323
public T delegate();
24+
25+
/**
26+
* @param target Object to unwrap.
27+
* @return Original object.
28+
*/
29+
public static Object unwrap(Object target) {
30+
while (target instanceof IgniteInternalWrapper)
31+
target = ((IgniteInternalWrapper<?>)target).delegate();
32+
33+
return target;
34+
}
2435
}

modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,4 @@ public OperationContextAwareInClosure(IgniteInClosure<E> delegate, OperationCont
4343
public static <E> IgniteInClosure<E> wrap(IgniteInClosure<E> delefate) {
4444
return wrap(delefate, OperationContextAwareInClosure::new);
4545
}
46-
4746
}

modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,37 @@
2222
import org.apache.ignite.internal.thread.context.OperationContext;
2323
import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
2424

25-
/** */
26-
public abstract class OperationContextAwareWrapper<T> implements IgniteInternalWrapper<T> {
25+
/** Represents wrapper containing an arbitrary object along with {@link OperationContextSnapshot}. */
26+
public class OperationContextAwareWrapper<T> implements IgniteInternalWrapper<T> {
2727
/** */
2828
protected final T delegate;
2929

3030
/** */
3131
protected final OperationContextSnapshot snapshot;
3232

3333
/** */
34+
public OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) {
35+
this.delegate = delegate;
36+
this.snapshot = snapshot;
37+
}
38+
39+
/** {@inheritDoc} */
3440
@Override public T delegate() {
3541
return delegate;
3642
}
3743

3844
/** */
39-
protected OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) {
40-
this.delegate = delegate;
41-
this.snapshot = snapshot;
45+
public OperationContextSnapshot contextSnapshot() {
46+
return snapshot;
4247
}
4348

4449
/** */
45-
protected static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper) {
50+
public static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper) {
4651
return wrap(delegate, wrapper, false);
4752
}
4853

4954
/** */
50-
protected static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
55+
public static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
5156
if (delegate == null || delegate instanceof OperationContextAwareWrapper)
5257
return delegate;
5358

modules/commons/src/main/java/org/apache/ignite/thread/IgniteThread.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
import java.util.concurrent.atomic.AtomicLong;
2121
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
22+
import org.apache.ignite.internal.thread.context.OperationContext;
23+
import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper;
2224
import org.apache.ignite.internal.util.typedef.internal.A;
2325
import org.apache.ignite.internal.util.typedef.internal.S;
2426

27+
import static org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable.wrapIfContextNotEmpty;
28+
2529
/**
2630
* This class adds some necessary plumbing on top of the {@link Thread} class.
2731
* Specifically, it adds:
@@ -30,6 +34,7 @@
3034
* <li>Dedicated parent thread group</li>
3135
* <li>Backing interrupted flag</li>
3236
* <li>Name of the grid this thread belongs to</li>
37+
* <li>Automatic capturing of {@link OperationContext} of parent thread</li>
3338
* </ul>
3439
* <b>Note</b>: this class is intended for internal use only.
3540
*/
@@ -76,13 +81,18 @@ public IgniteThread(String igniteInstanceName, String threadName) {
7681
* @param r Runnable to execute.
7782
*/
7883
public IgniteThread(String igniteInstanceName, String threadName, Runnable r) {
79-
this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
84+
this(igniteInstanceName, threadName, wrapIfContextNotEmpty(r), GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
8085
}
8186

8287
/**
8388
* Creates grid thread with given name for a given Ignite instance with specified
8489
* thread group.
8590
*
91+
* <b>Note</b>: This constructor creates a thread that does NOT automatically acquire the parent thread's Operation
92+
* Context, ensuring that no Operation Context is attached to it at the start of execution. It is used in Ignite
93+
* thread pools and worker threads, which rely on this behavior to avoid unnecessary wrapping
94+
* (see {@link OperationContextAwareWrapper})
95+
*
8696
* @param igniteInstanceName Name of the Ignite instance this thread is created for.
8797
* @param threadName Name of thread.
8898
* @param r Runnable to execute.
@@ -101,20 +111,6 @@ public IgniteThread(String igniteInstanceName, String threadName, Runnable r, in
101111
this.plc = plc;
102112
}
103113

104-
/**
105-
* @param igniteInstanceName Name of the Ignite instance this thread is created for.
106-
* @param threadGrp Thread group.
107-
* @param threadName Name of thread.
108-
*/
109-
protected IgniteThread(String igniteInstanceName, ThreadGroup threadGrp, String threadName) {
110-
super(threadGrp, threadName);
111-
112-
this.igniteInstanceName = igniteInstanceName;
113-
this.compositeRwLockIdx = GRP_IDX_UNASSIGNED;
114-
this.stripe = -1;
115-
this.plc = GridIoPolicy.UNDEFINED;
116-
}
117-
118114
/**
119115
* @return Related {@link GridIoPolicy} for internal Ignite pools.
120116
*/

modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
320320
*
321321
* @return Data streamer processor.
322322
*/
323-
public <K, V> DataStreamProcessor<K, V> dataStream();
323+
public DataStreamProcessor dataStream();
324324

325325
/**
326326
* Gets event continuous processor.

modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -838,8 +838,8 @@ else if (!(comp instanceof DiscoveryNodeValidationProcessor
838838
}
839839

840840
/** {@inheritDoc} */
841-
@Override public <K, V> DataStreamProcessor<K, V> dataStream() {
842-
return (DataStreamProcessor<K, V>)dataLdrProc;
841+
@Override public DataStreamProcessor dataStream() {
842+
return dataLdrProc;
843843
}
844844

845845
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ public void start(
10981098
startProcessor(new GridTaskProcessor(ctx));
10991099
startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
11001100
startProcessor(createComponent(IgniteRestProcessor.class, ctx));
1101-
startProcessor(new DataStreamProcessor<>(ctx));
1101+
startProcessor(new DataStreamProcessor(ctx));
11021102
startProcessor(new GridContinuousProcessor(ctx));
11031103
startProcessor(new DataStructuresProcessor(ctx));
11041104
startProcessor(createComponent(PlatformProcessor.class, ctx));

0 commit comments

Comments
 (0)