@@ -64,6 +64,8 @@ object ExtendedWindowOperator {
64
64
65
65
}
66
66
67
+ private [aggregate] case class NuWindowContextHolder (var nuWindowContext : NuWindowContext )
68
+
67
69
private [aggregate] sealed trait NuWindowContext
68
70
69
71
private [aggregate] final case class OnElementWindowContext (
@@ -82,7 +84,7 @@ private[aggregate] class ExtendedWindowOperator[A](
82
84
aggregateFunction : AggregateFunction [Input [A ], AnyRef , AnyRef ],
83
85
trigger : Trigger [_ >: Input [A ], TimeWindow ],
84
86
preserveContext : Boolean ,
85
- private val contextHolderRef : AtomicReference [ NuWindowContext ] = new AtomicReference (OnTimerWindowContext )
87
+ private val contextHolderRef : NuWindowContextHolder = NuWindowContextHolder (OnTimerWindowContext )
86
88
) extends WindowOperator [String , Input [A ], AnyRef , ValueWithContext [AnyRef ], TimeWindow ](
87
89
assigner,
88
90
assigner.getWindowSerializer(stream.getExecutionConfig),
@@ -102,13 +104,12 @@ private[aggregate] class ExtendedWindowOperator[A](
102
104
) {
103
105
104
106
override def processElement (element : StreamRecord [ValueWithContext [StringKeyedValue [A ]]]): Unit = {
105
- contextHolderRef.set(
107
+ contextHolderRef.nuWindowContext =
106
108
OnElementWindowContext (if (preserveContext) Some (element.getValue.context) else None , element.getTimestamp)
107
- )
108
109
try {
109
110
super .processElement(element)
110
111
} finally {
111
- contextHolderRef.set( OnTimerWindowContext )
112
+ contextHolderRef.nuWindowContext = OnTimerWindowContext
112
113
}
113
114
}
114
115
@@ -117,7 +118,7 @@ private[aggregate] class ExtendedWindowOperator[A](
117
118
private class ValueEmittingWindowFunction (
118
119
convertToEngineRuntimeContext : RuntimeContext => EngineRuntimeContext ,
119
120
nodeId : String ,
120
- private val contextHolderRef : AtomicReference [ NuWindowContext ]
121
+ private val contextHolderRef : NuWindowContextHolder
121
122
) extends ProcessWindowFunction [AnyRef , ValueWithContext [AnyRef ], String , TimeWindow ] {
122
123
123
124
@ transient
@@ -134,7 +135,7 @@ private class ValueEmittingWindowFunction(
134
135
out : Collector [ValueWithContext [AnyRef ]]
135
136
): Unit = {
136
137
elements.forEach { element =>
137
- val contextOpt = contextHolderRef.get match {
138
+ val contextOpt = contextHolderRef.nuWindowContext match {
138
139
case OnElementWindowContext (contextToPreserve, timestampToOverride) =>
139
140
// in current flink implementation out is always of type TimestampedCollector
140
141
out.asInstanceOf [TimestampedCollector [_]].setAbsoluteTimestamp(timestampToOverride)
0 commit comments