@@ -30,6 +30,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"golang.org/x/exp/maps"
 )
 
 // DataSource is a Root execution unit.
@@ -40,9 +41,12 @@ type DataSource struct {
 	Coder *coder.Coder
 	Out   Node
 	PCol  PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.
+	// OnTimerTransforms maps PtransformIDs to their execution nodes that handle OnTimer callbacks.
+	OnTimerTransforms map[string]*ParDo
 
-	source DataManager
-	state  StateReader
+	source  DataManager
+	state   StateReader
+	curInst string
 
 	index    int64
 	splitIdx int64
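
The new `golang.org/x/exp/maps` import exists to hand the keys of `OnTimerTransforms` to `OpenElementChan` (see the `process` hunk below). A minimal sketch of that call shape, with the map values stubbed out since `*ParDo` isn't needed to demonstrate the key extraction:

```go
package main

import (
	"fmt"

	"golang.org/x/exp/maps"
)

func main() {
	// Stand-in for OnTimerTransforms: PTransform IDs mapped to their
	// execution nodes (stubbed here; the SDK stores *ParDo values).
	onTimerTransforms := map[string]struct{}{
		"ptransform-1": {},
		"ptransform-2": {},
	}

	// maps.Keys collects the keys into a slice in unspecified order.
	// process uses this to tell the data manager which PTransforms
	// expect timer elements on the channel.
	ids := maps.Keys(onTimerTransforms)
	fmt.Println(ids)
}
```
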
@@ -94,20 +98,79 @@ func (n *DataSource) Up(ctx context.Context) error {
 // StartBundle initializes this datasource for the bundle.
 func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
 	n.mu.Lock()
+	n.curInst = id
 	n.source = data.Data
 	n.state = data.State
 	n.start = time.Now()
-	n.index = -1
+	n.index = 0
 	n.splitIdx = math.MaxInt64
 	n.mu.Unlock()
 	return n.Out.StartBundle(ctx, id, data)
 }
 
+// splitSuccess is a marker error to indicate we've reached the split index.
+// Akin to io.EOF.
+var splitSuccess = errors.New("split index reached")
+
+// process reads buffers from the data source and dispatches their elements to the data and timer callbacks.
+//
+// The data and timer callbacks must return io.EOF when they have drained the reader, to signal
+// that an additional buffer is desired. On successful splits, [splitSuccess] must be returned to
+// indicate that the PTransform is done processing data for this instruction.
+func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error {
+	// The SID contains this instruction's expected data processing transform (this one).
+	elms, err := n.source.OpenElementChan(ctx, n.SID, maps.Keys(n.OnTimerTransforms))
+	if err != nil {
+		return err
+	}
+
+	n.PCol.resetSize() // initialize the size distribution for this bundle.
+	var r bytes.Reader
+
+	var byteCount int
+	bcr := byteCountReader{reader: &r, count: &byteCount}
+
+	splitPrimaryComplete := map[string]bool{}
+	for {
+		var err error
+		select {
+		case e, ok := <-elms:
+			// Channel closed, so time to exit
+			if !ok {
+				return nil
+			}
+			if splitPrimaryComplete[e.PtransformID] {
+				continue
+			}
+			if len(e.Data) > 0 {
+				r.Reset(e.Data)
+				err = data(&bcr, e.PtransformID)
+			}
+			if len(e.Timers) > 0 {
+				r.Reset(e.Timers)
+				err = timer(&bcr, e.PtransformID, e.TimerFamilyID)
+			}
+
+			if err == splitSuccess {
+				// Returning splitSuccess means we've split, and aren't consuming the remaining buffer.
+				// We mark the PTransform done to ignore further data.
+				splitPrimaryComplete[e.PtransformID] = true
+			} else if err != nil && err != io.EOF {
+				return errors.Wrap(err, "source failed")
+			}
+			// io.EOF means the reader successfully drained.
+			// We're ready for a new buffer.
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
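
A note on the `var r bytes.Reader` plus `r.Reset(...)` pattern above: a single `bytes.Reader` value is re-pointed at each incoming buffer, so the receive loop allocates no new reader per buffer. A standalone sketch of the same reuse:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// One reader value, reused across buffers, as process does with
	// r.Reset(e.Data) and r.Reset(e.Timers).
	var r bytes.Reader

	for _, buf := range [][]byte{[]byte("first"), []byte("second")} {
		r.Reset(buf) // point the same reader at the next buffer
		b, _ := io.ReadAll(&r)
		fmt.Printf("%s\n", b)
	}
}
```
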
 // ByteCountReader is a passthrough reader that counts all the bytes read through it.
 // It trusts the nested reader to return accurate byte information.
 type byteCountReader struct {
 	count  *int
-	reader io.ReadCloser
+	reader io.Reader
 }
 
 func (r *byteCountReader) Read(p []byte) (int, error) {
@@ -117,7 +180,10 @@ func (r *byteCountReader) Read(p []byte) (int, error) {
 }
 
 func (r *byteCountReader) Close() error {
-	return r.reader.Close()
+	if c, ok := r.reader.(io.Closer); ok {
+		c.Close()
+	}
+	return nil
 }
 
 func (r *byteCountReader) reset() int {
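
Since the `reader` field is now a plain `io.Reader` (so `process` can hand it a `*bytes.Reader`), `Close` degrades to a best-effort no-op unless the wrapped reader happens to be closeable; note the underlying `Close` error is discarded. The assertion pattern in isolation:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// closeIfCloser mirrors byteCountReader.Close: only close when the
// wrapped io.Reader also implements io.Closer.
func closeIfCloser(r io.Reader) {
	if c, ok := r.(io.Closer); ok {
		fmt.Println("closing")
		c.Close()
	} else {
		fmt.Println("nothing to close")
	}
}

func main() {
	closeIfCloser(bytes.NewReader([]byte("abc"))) // *bytes.Reader has no Close
	closeIfCloser(os.Stdin)                       // *os.File implements io.Closer
}
```
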
@@ -128,15 +194,6 @@ func (r *byteCountReader) reset() int {
 
 // Process opens the data source, reads and decodes data, kicking off element processing.
 func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
-	r, err := n.source.OpenRead(ctx, n.SID)
-	if err != nil {
-		return nil, err
-	}
-	defer r.Close()
-	n.PCol.resetSize() // initialize the size distribution for this bundle.
-	var byteCount int
-	bcr := byteCountReader{reader: r, count: &byteCount}
-
 	c := coder.SkipW(n.Coder)
 	wc := MakeWindowDecoder(n.Coder.Window)
 
@@ -155,58 +212,63 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
 	}
 
 	var checkpoints []*Checkpoint
-	for {
-		if n.incrementIndexAndCheckSplit() {
-			break
-		}
-		// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
-		ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
-		if err != nil {
-			if err == io.EOF {
-				break
+	err := n.process(ctx, func(bcr *byteCountReader, ptransformID string) error {
+		for {
+			// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
+			ws, t, pn, err := DecodeWindowedValueHeader(wc, bcr.reader)
+			if err != nil {
+				return err
 			}
-			return nil, errors.Wrap(err, "source failed")
-		}
-
-		// Decode key or parallel element.
-		pe, err := cp.Decode(&bcr)
-		if err != nil {
-			return nil, errors.Wrap(err, "source decode failed")
-		}
-		pe.Timestamp = t
-		pe.Windows = ws
-		pe.Pane = pn
 
-		var valReStreams []ReStream
-		for _, cv := range cvs {
-			values, err := n.makeReStream(ctx, cv, &bcr, len(cvs) == 1 && n.singleIterate)
+			// Decode key or parallel element.
+			pe, err := cp.Decode(bcr)
 			if err != nil {
-				return nil, err
+				return errors.Wrap(err, "source decode failed")
 			}
-			valReStreams = append(valReStreams, values)
-		}
+			pe.Timestamp = t
+			pe.Windows = ws
+			pe.Pane = pn
 
-		if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
-			return nil, err
-		}
-		// Collect the actual size of the element, and reset the bytecounter reader.
-		n.PCol.addSize(int64(bcr.reset()))
-		bcr.reader = r
-
-		// Check if there's a continuation and return residuals
-		// Needs to be done immeadiately after processing to not lose the element.
-		if c := n.getProcessContinuation(); c != nil {
-			cp, err := n.checkpointThis(ctx, c)
-			if err != nil {
-				// Errors during checkpointing should fail a bundle.
-				return nil, err
+			var valReStreams []ReStream
+			for _, cv := range cvs {
+				values, err := n.makeReStream(ctx, cv, bcr, len(cvs) == 1 && n.singleIterate)
+				if err != nil {
+					return err
+				}
+				valReStreams = append(valReStreams, values)
 			}
-			if cp != nil {
-				checkpoints = append(checkpoints, cp)
+
+			if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
+				return err
+			}
+			// Collect the actual size of the element, and reset the bytecounter reader.
+			n.PCol.addSize(int64(bcr.reset()))
+
+			// Check if there's a continuation and return residuals
+			// Needs to be done immediately after processing to not lose the element.
+			if c := n.getProcessContinuation(); c != nil {
+				cp, err := n.checkpointThis(ctx, c)
+				if err != nil {
+					// Errors during checkpointing should fail a bundle.
+					return err
+				}
+				if cp != nil {
+					checkpoints = append(checkpoints, cp)
+				}
+			}
+			// We've finished processing an element, check if we have finished a split.
+			if n.incrementIndexAndCheckSplit() {
+				return splitSuccess
 			}
 		}
-	}
-	return checkpoints, nil
+	},
+		func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {
+			tmap, err := decodeTimer(cp, wc, bcr)
+			log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v - %+v err: %v", ptransformID, timerFamilyID, tmap, err)
+			return nil
+		})
+
+	return checkpoints, err
 }
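
The control flow above leans on Go's sentinel-error convention: `splitSuccess`, like `io.EOF`, is a named error value that signals a normal outcome (the split index was reached), and `process` tests for it by identity. A self-contained sketch of the pattern, with hypothetical names (`errStop`, `consume`):

```go
package main

import (
	"errors"
	"fmt"
)

// errStop is a hypothetical sentinel, analogous to splitSuccess:
// it marks an expected stopping condition, not a failure.
var errStop = errors.New("stop index reached")

// consume stops early by returning the sentinel once n items are taken,
// the way the data callback returns splitSuccess after a split.
func consume(items []int, n int) error {
	for i, v := range items {
		if i == n {
			return errStop
		}
		fmt.Println("consumed", v)
	}
	return nil
}

func main() {
	// Callers compare by identity, as process does with err == splitSuccess.
	if err := consume([]int{1, 2, 3}, 2); err != nil && err != errStop {
		fmt.Println("real failure:", err)
	}
}
```
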
211
273
212
274
func (n * DataSource ) makeReStream (ctx context.Context , cv ElementDecoder , bcr * byteCountReader , onlyStream bool ) (ReStream , error ) {
@@ -313,7 +375,7 @@ func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *b
 	}
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
+func readStreamToBuffer(cv ElementDecoder, r io.Reader, size int64, buf []FullValue) ([]FullValue, error) {
 	for i := int64(0); i < size; i++ {
 		value, err := cv.Decode(r)
 		if err != nil {
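
Relaxing `readStreamToBuffer` from `io.ReadCloser` to `io.Reader` follows the usual Go guidance of accepting the smallest interface the function actually needs; callers can now pass a `*bytes.Reader` (as `process` does via `byteCountReader`) even though it has no `Close` method. A generic sketch of the same idea (names hypothetical):

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// countBytes asks only for io.Reader, so any reader works: *strings.Reader,
// *bytes.Reader, a network connection, or a file.
func countBytes(r io.Reader) (int64, error) {
	return io.Copy(io.Discard, r)
}

func main() {
	n, err := countBytes(strings.NewReader("windowed value bytes"))
	fmt.Println(n, err)
}
```
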
@@ -472,7 +534,7 @@ func (n *DataSource) checkpointThis(ctx context.Context, pc sdf.ProcessContinuat
 // The bufSize param specifies the estimated number of elements that will be
 // sent to this DataSource, and is used to be able to perform accurate splits
 // even if the DataSource has not yet received all its elements. A bufSize of
-// 0 or less indicates that its unknown, and so uses the current known size.
+// 0 or less indicates that it's unknown, and so uses the current known size.
 func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
 	if n == nil {
 		return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)