@@ -58,16 +58,31 @@ const (
58
58
// TODO(https://github.com/apache/beam/issues/20267): Return a mapping of filename to dependency, rather than []*jobpb.ArtifactMetadata.
59
59
// TODO(https://github.com/apache/beam/issues/20267): Leverage richness of roles rather than magic names to understand artifacts.
60
60
func Materialize (ctx context.Context , endpoint string , dependencies []* pipepb.ArtifactInformation , rt string , dest string ) ([]* pipepb.ArtifactInformation , error ) {
61
+ log .Printf ("Materialize called: endpoint=%s, num_dependencies=%d, retrieval_token=%s, dest=%s" , endpoint , len (dependencies ), rt , dest )
62
+ var artifacts []* pipepb.ArtifactInformation
63
+ var err error
64
+
61
65
if len (dependencies ) > 0 {
62
- return newMaterialize (ctx , endpoint , dependencies , dest )
66
+ log .Printf ("Using new artifact materialization logic (dependencies provided)." )
67
+ artifacts , err = newMaterialize (ctx , endpoint , dependencies , dest )
63
68
} else if rt == "" || rt == NoArtifactsStaged {
64
- return []* pipepb.ArtifactInformation {}, nil
69
+ log .Printf ("No artifacts to materialize (empty retrieval token or special value)." )
70
+ artifacts , err = []* pipepb.ArtifactInformation {}, nil
65
71
} else {
66
- return legacyMaterialize (ctx , endpoint , rt , dest )
72
+ log .Printf ("Using legacy artifact materialization logic (retrieval token provided)." )
73
+ artifacts , err = legacyMaterialize (ctx , endpoint , rt , dest )
67
74
}
75
+
76
+ if err != nil {
77
+ log .Printf ("Materialize finished with error: %v" , err )
78
+ } else {
79
+ log .Printf ("Materialize finished successfully, returning %d artifacts." , len (artifacts ))
80
+ }
81
+ return artifacts , err
68
82
}
69
83
70
84
func newMaterialize (ctx context.Context , endpoint string , dependencies []* pipepb.ArtifactInformation , dest string ) ([]* pipepb.ArtifactInformation , error ) {
85
+ log .Printf ("newMaterialize: Dialing artifact endpoint %s" , endpoint )
71
86
cc , err := grpcx .Dial (ctx , endpoint , 2 * time .Minute )
72
87
if err != nil {
73
88
return nil , err
@@ -78,10 +93,13 @@ func newMaterialize(ctx context.Context, endpoint string, dependencies []*pipepb
78
93
}
79
94
80
95
func newMaterializeWithClient (ctx context.Context , client jobpb.ArtifactRetrievalServiceClient , dependencies []* pipepb.ArtifactInformation , dest string ) ([]* pipepb.ArtifactInformation , error ) {
96
+ log .Printf ("newMaterializeWithClient: Resolving %d artifacts" , len (dependencies ))
81
97
resolution , err := client .ResolveArtifacts (ctx , & jobpb.ResolveArtifactsRequest {Artifacts : dependencies })
82
98
if err != nil {
99
+ log .Printf ("newMaterializeWithClient: Error resolving artifacts: %v" , err )
83
100
return nil , err
84
101
}
102
+ log .Printf ("newMaterializeWithClient: Resolved %d replacements" , len (resolution .Replacements ))
85
103
86
104
var artifacts []* pipepb.ArtifactInformation
87
105
var list []retrievable
@@ -134,7 +152,14 @@ func newMaterializeWithClient(ctx context.Context, client jobpb.ArtifactRetrieva
134
152
})
135
153
}
136
154
137
- return artifacts , MultiRetrieve (ctx , 10 , list , dest )
155
+ log .Printf ("newMaterializeWithClient: Preparing to retrieve %d artifacts via MultiRetrieve" , len (list ))
156
+ err = MultiRetrieve (ctx , 10 , list , dest )
157
+ if err != nil {
158
+ log .Printf ("newMaterializeWithClient: MultiRetrieve failed: %v" , err )
159
+ } else {
160
+ log .Printf ("newMaterializeWithClient: MultiRetrieve succeeded." )
161
+ }
162
+ return artifacts , err
138
163
}
139
164
140
165
// Used for generating unique IDs. We assign uniquely generated names to staged files without staging names.
@@ -255,19 +280,24 @@ func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.W
255
280
}
256
281
257
282
func legacyMaterialize (ctx context.Context , endpoint string , rt string , dest string ) ([]* pipepb.ArtifactInformation , error ) {
283
+ log .Printf ("legacyMaterialize: Dialing artifact endpoint %s" , endpoint )
258
284
cc , err := grpcx .Dial (ctx , endpoint , 2 * time .Minute )
259
285
if err != nil {
286
+ log .Printf ("legacyMaterialize: Error dialing endpoint: %v" , err )
260
287
return nil , err
261
288
}
262
289
defer cc .Close ()
263
290
264
291
client := jobpb .NewLegacyArtifactRetrievalServiceClient (cc )
265
292
293
+ log .Printf ("legacyMaterialize: Getting manifest with retrieval token %s" , rt )
266
294
m , err := client .GetManifest (ctx , & jobpb.GetManifestRequest {RetrievalToken : rt })
267
295
if err != nil {
296
+ log .Printf ("legacyMaterialize: Error getting manifest: %v" , err )
268
297
return nil , errors .Wrap (err , "failed to get manifest" )
269
298
}
270
299
mds := m .GetManifest ().GetArtifact ()
300
+ log .Printf ("legacyMaterialize: Got manifest with %d artifacts" , len (mds ))
271
301
272
302
var artifacts []* pipepb.ArtifactInformation
273
303
var list []retrievable
@@ -298,13 +328,22 @@ func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest str
298
328
})
299
329
}
300
330
301
- return artifacts , MultiRetrieve (ctx , 10 , list , dest )
331
+ log .Printf ("legacyMaterialize: Preparing to retrieve %d artifacts via MultiRetrieve" , len (list ))
332
+ err = MultiRetrieve (ctx , 10 , list , dest )
333
+ if err != nil {
334
+ log .Printf ("legacyMaterialize: MultiRetrieve failed: %v" , err )
335
+ } else {
336
+ log .Printf ("legacyMaterialize: MultiRetrieve succeeded." )
337
+ }
338
+ return artifacts , err
302
339
}
303
340
304
341
// MultiRetrieve retrieves multiple artifacts concurrently, using at most 'cpus'
305
342
// goroutines. It retries each artifact a few times. Convenience wrapper.
306
343
func MultiRetrieve (ctx context.Context , cpus int , list []retrievable , dest string ) error {
344
+ log .Printf ("MultiRetrieve: Starting retrieval of %d artifacts with %d workers to %s" , len (list ), cpus , dest )
307
345
if len (list ) == 0 {
346
+ log .Printf ("MultiRetrieve: No artifacts to retrieve." )
308
347
return nil
309
348
}
310
349
if cpus < 1 {
@@ -337,17 +376,29 @@ func MultiRetrieve(ctx context.Context, cpus int, list []retrievable, dest strin
337
376
}
338
377
failures = append (failures , err .Error ())
339
378
if len (failures ) > attempts {
340
- permErr .TrySetError (errors .Errorf ("failed to retrieve %v in %v attempts: %v" , dest , attempts , strings .Join (failures , "; " )))
379
+ errMsg := errors .Errorf ("failed to retrieve artifact in %v attempts: %v" , attempts , strings .Join (failures , "; " ))
380
+ log .Printf ("MultiRetrieve worker: Giving up after %d attempts: %v" , attempts , errMsg )
381
+ permErr .TrySetError (errMsg )
341
382
break // give up
342
383
}
343
- time .Sleep (time .Duration (rand .Intn (5 )+ 1 ) * time .Second )
384
+ sleepDuration := time .Duration (rand .Intn (5 )+ 1 ) * time .Second
385
+ log .Printf ("MultiRetrieve worker: Retrying after error (%d/%d attempts), sleeping for %v: %v" , len (failures ), attempts , sleepDuration , err )
386
+ time .Sleep (sleepDuration )
387
+ } else {
388
+ log .Printf ("MultiRetrieve worker : Successfully retrieved artifact .")
344
389
}
345
390
}
346
391
}()
347
392
}
348
393
wg .Wait ()
349
394
350
- return permErr .Error ()
395
+ finalErr := permErr .Error ()
396
+ if finalErr != nil {
397
+ log .Printf ("MultiRetrieve: Finished with error: %v" , finalErr )
398
+ } else {
399
+ log .Printf ("MultiRetrieve: Finished successfully." )
400
+ }
401
+ return finalErr
351
402
}
352
403
353
404
type retrievable interface {
0 commit comments