Skip to content

Commit af9f47f

Browse files
committed
Fix all tour examples from part 1
1 parent 935ca9c commit af9f47f

File tree

13 files changed

+254
-221
lines changed

13 files changed

+254
-221
lines changed

Diff for: learning/tour-of-beam/learning-content/common-transforms/aggregation/count/go-example/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ package main
2828

2929
import (
3030
"context"
31+
3132
"github.com/apache/beam/sdks/v2/go/pkg/beam"
3233
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
3334
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
@@ -37,6 +38,7 @@ import (
3738

3839
func main() {
3940
ctx := context.Background()
41+
beam.Init()
4042

4143
p, s := beam.NewPipelineWithRoot()
4244

@@ -51,7 +53,7 @@ func main() {
5153
err := beamx.Run(ctx, p)
5254

5355
if err != nil {
54-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
56+
log.Exitf(ctx, "Failed to execute job: %v", err)
5557
}
5658
}
5759

Diff for: learning/tour-of-beam/learning-content/common-transforms/aggregation/max/go-example/main.go

+22-19
Original file line numberDiff line numberDiff line change
@@ -27,34 +27,37 @@
2727
package main
2828

2929
import (
30-
"context"
31-
"github.com/apache/beam/sdks/v2/go/pkg/beam"
32-
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
33-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
34-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
35-
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
30+
"context"
31+
32+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
33+
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
34+
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
35+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
36+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
3637
)
3738

3839
func main() {
39-
ctx := context.Background()
40+
ctx := context.Background()
41+
beam.Init()
4042

41-
p, s := beam.NewPipelineWithRoot()
43+
p, s := beam.NewPipelineWithRoot()
4244

43-
// List of elements
44-
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
45+
// List of elements
46+
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
4547

46-
// The applyTransform() converts [input] to [output]
47-
output := applyTransform(s,input)
48+
// The applyTransform() converts [input] to [output]
49+
output := applyTransform(s, input)
4850

49-
debug.Printf(s, "PCollection maximum value: %v", output)
51+
debug.Printf(s, "PCollection maximum value: %v", output)
5052

51-
err := beamx.Run(ctx, p)
53+
err := beamx.Run(ctx, p)
5254

53-
if err != nil {
54-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
55-
}
55+
if err != nil {
56+
log.Exitf(ctx, "Failed to execute job: %v", err)
57+
}
5658
}
59+
5760
// Return the maximum number from `PCollection`.
5861
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
59-
return stats.Max(s, input)
60-
}
62+
return stats.Max(s, input)
63+
}

Diff for: learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/go-example/main.go

+21-19
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,37 @@
2727
package main
2828

2929
import (
30-
"context"
31-
"github.com/apache/beam/sdks/v2/go/pkg/beam"
32-
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
33-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
34-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
35-
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
30+
"context"
31+
32+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
33+
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
34+
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
35+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
36+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
3637
)
3738

3839
func main() {
39-
ctx := context.Background()
40+
ctx := context.Background()
41+
beam.Init()
4042

41-
p, s := beam.NewPipelineWithRoot()
43+
p, s := beam.NewPipelineWithRoot()
4244

43-
// List of elements
44-
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
45+
// List of elements
46+
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
4547

46-
// The applyTransform() converts [input] to [output]
47-
output := applyTransform(s, input)
48+
// The applyTransform() converts [input] to [output]
49+
output := applyTransform(s, input)
4850

49-
debug.Printf(s, "PCollection mean value: %v", output)
51+
debug.Printf(s, "PCollection mean value: %v", output)
5052

51-
err := beamx.Run(ctx, p)
53+
err := beamx.Run(ctx, p)
5254

53-
if err != nil {
54-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
55-
}
55+
if err != nil {
56+
log.Exitf(ctx, "Failed to execute job: %v", err)
57+
}
5658
}
5759

5860
// Return the mean of numbers from `PCollection`.
5961
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
60-
return stats.Mean(s, input)
61-
}
62+
return stats.Mean(s, input)
63+
}

Diff for: learning/tour-of-beam/learning-content/common-transforms/aggregation/min/go-example/main.go

+21-19
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,37 @@
2727
package main
2828

2929
import (
30-
"context"
31-
"github.com/apache/beam/sdks/v2/go/pkg/beam"
32-
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
33-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
34-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
35-
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
30+
"context"
31+
32+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
33+
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
34+
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
35+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
36+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
3637
)
3738

3839
func main() {
39-
ctx := context.Background()
40+
ctx := context.Background()
41+
beam.Init()
4042

41-
p, s := beam.NewPipelineWithRoot()
43+
p, s := beam.NewPipelineWithRoot()
4244

43-
// List of elements
44-
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
45+
// List of elements
46+
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
4547

46-
// The applyTransform() converts [input] to [output]
47-
output := applyTransform(s, input)
48+
// The applyTransform() converts [input] to [output]
49+
output := applyTransform(s, input)
4850

49-
debug.Printf(s, "PCollection minimum value: %v", output)
51+
debug.Printf(s, "PCollection minimum value: %v", output)
5052

51-
err := beamx.Run(ctx, p)
53+
err := beamx.Run(ctx, p)
5254

53-
if err != nil {
54-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
55-
}
55+
if err != nil {
56+
log.Exitf(ctx, "Failed to execute job: %v", err)
57+
}
5658
}
5759

5860
// Return the minimum of numbers from `PCollection`.
5961
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
60-
return stats.Min(s, input)
61-
}
62+
return stats.Min(s, input)
63+
}

Diff for: learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/go-example/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ package main
2828

2929
import (
3030
"context"
31+
3132
"github.com/apache/beam/sdks/v2/go/pkg/beam"
3233
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
3334
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
@@ -37,6 +38,7 @@ import (
3738

3839
func main() {
3940
ctx := context.Background()
41+
beam.Init()
4042

4143
p, s := beam.NewPipelineWithRoot()
4244

@@ -51,7 +53,7 @@ func main() {
5153
err := beamx.Run(ctx, p)
5254

5355
if err != nil {
54-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
56+
log.Exitf(ctx, "Failed to execute job: %v", err)
5557
}
5658
}
5759

Diff for: learning/tour-of-beam/learning-content/common-transforms/filter/go-example/main.go

+22-22
Original file line numberDiff line numberDiff line change
@@ -27,39 +27,39 @@
2727
package main
2828

2929
import (
30-
"context"
31-
"github.com/apache/beam/sdks/v2/go/pkg/beam"
32-
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
33-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
34-
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
35-
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
30+
"context"
31+
32+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
33+
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
34+
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
35+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
36+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
3637
)
3738

3839
func main() {
39-
ctx := context.Background()
40+
ctx := context.Background()
41+
beam.Init()
4042

41-
p, s := beam.NewPipelineWithRoot()
43+
p, s := beam.NewPipelineWithRoot()
4244

43-
// List of elements
44-
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
45+
// List of elements
46+
input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
4547

46-
// The [input] filtered with the applyTransform()
47-
output := applyTransform(s, input)
48+
// The [input] filtered with the applyTransform()
49+
output := applyTransform(s, input)
4850

49-
debug.Printf(s, "PCollection filtered value: %v", output)
51+
debug.Printf(s, "PCollection filtered value: %v", output)
5052

51-
err := beamx.Run(ctx, p)
53+
err := beamx.Run(ctx, p)
5254

53-
if err != nil {
54-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
55-
}
55+
if err != nil {
56+
log.Exitf(ctx, "Failed to execute job: %v", err)
57+
}
5658
}
5759

5860
// The method filters the collection so that the numbers are even
5961
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
60-
return filter.Exclude(s, input, func(element int) bool {
61-
return element % 2 == 1
62-
})
62+
return filter.Exclude(s, input, func(element int) bool {
63+
return element%2 == 1
64+
})
6365
}
64-
65-

Diff for: learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-challenge/main.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,21 @@ package main
2828

2929
import (
3030
"context"
31+
"strconv"
32+
"strings"
33+
3134
"github.com/apache/beam/sdks/v2/go/pkg/beam"
3235
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
3336
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
3437
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
3538
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
3639
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
3740
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
38-
"strconv"
39-
"strings"
4041
)
4142

4243
func main() {
4344
ctx := context.Background()
45+
beam.Init()
4446

4547
p, s := beam.NewPipelineWithRoot()
4648

@@ -71,7 +73,7 @@ func main() {
7173
err := beamx.Run(ctx, p)
7274

7375
if err != nil {
74-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
76+
log.Exitf(ctx, "Failed to execute job: %v", err)
7577
}
7678
}
7779

Diff for: learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-solution/main.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,21 @@ package main
2828

2929
import (
3030
"context"
31+
"strconv"
32+
"strings"
33+
3134
"github.com/apache/beam/sdks/v2/go/pkg/beam"
3235
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
3336
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
3437
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
3538
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
3639
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
3740
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
38-
"strconv"
39-
"strings"
4041
)
4142

4243
func main() {
4344
ctx := context.Background()
45+
beam.Init()
4446

4547
p, s := beam.NewPipelineWithRoot()
4648

@@ -73,7 +75,7 @@ func main() {
7375
err := beamx.Run(ctx, p)
7476

7577
if err != nil {
76-
log.Exitf(context.Background(), "Failed to execute job: %v", err)
78+
log.Exitf(ctx, "Failed to execute job: %v", err)
7779
}
7880
}
7981

0 commit comments

Comments
 (0)