Skip to content

Commit 6cd6410

Browse files
authored
[#26902] Add switch for supporting cross compile arm64 for dataflow (#27649)
* [#26902] Add switch for supporting cross compile arm64 for dataflow * fix gradles. --------- Co-authored-by: lostluck <[email protected]>
1 parent f8e103f commit 6cd6410

File tree

4 files changed

+52
-6
lines changed

4 files changed

+52
-6
lines changed

sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"encoding/json"
2323
"os"
24+
"strings"
2425

2526
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
2627
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
@@ -47,7 +48,12 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
4748
} else {
4849
// Cross-compile as last resort.
4950

50-
worker, err := runnerlib.BuildTempWorkerBinary(ctx)
51+
var copts runnerlib.CompileOpts
52+
if strings.HasPrefix(opts.MachineType, "t2a") {
53+
copts.Arch = "arm64"
54+
}
55+
56+
worker, err := runnerlib.BuildTempWorkerBinary(ctx, copts)
5157
if err != nil {
5258
return presult, err
5359
}

sdks/go/pkg/beam/runners/universal/runnerlib/compile.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,20 @@ func IsWorkerCompatibleBinary() (string, bool) {
4141

4242
var unique int32
4343

44+
// CompileOpts are additional options for dynamic compiles of the local code
45+
// for development purposes. Production runs should build the worker binary
46+
// separately for the target environment.
47+
// See https://beam.apache.org/documentation/sdks/go-cross-compilation/ for details.
48+
type CompileOpts struct {
49+
OS, Arch string
50+
}
51+
4452
// BuildTempWorkerBinary creates a local worker binary in the tmp directory
4553
// for linux/amd64. Caller responsible for deleting the binary.
46-
func BuildTempWorkerBinary(ctx context.Context) (string, error) {
54+
func BuildTempWorkerBinary(ctx context.Context, opts CompileOpts) (string, error) {
4755
id := atomic.AddInt32(&unique, 1)
4856
filename := filepath.Join(os.TempDir(), fmt.Sprintf("worker-%v-%v", id, time.Now().UnixNano()))
49-
if err := buildWorkerBinary(ctx, filename); err != nil {
57+
if err := buildWorkerBinary(ctx, filename, opts); err != nil {
5058
return "", err
5159
}
5260
return filename, nil
@@ -59,7 +67,7 @@ func BuildTempWorkerBinary(ctx context.Context) (string, error) {
5967
// * /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go (skip: 3)
6068
// /usr/local/go/src/runtime/proc.go (skip: 4) // not always present
6169
// /usr/local/go/src/runtime/asm_amd64.s (skip: 4 or 5)
62-
func buildWorkerBinary(ctx context.Context, filename string) error {
70+
func buildWorkerBinary(ctx context.Context, filename string, opts CompileOpts) error {
6371
program := ""
6472
var isTest bool
6573
for i := 3; ; i++ {
@@ -77,9 +85,17 @@ func buildWorkerBinary(ctx context.Context, filename string) error {
7785
}
7886
goos := "linux"
7987
goarch := "amd64"
88+
89+
if opts.OS != "" {
90+
goos = opts.OS
91+
}
92+
if opts.Arch != "" {
93+
goarch = opts.Arch
94+
}
95+
8096
cgo := "0"
8197

82-
log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", goos, goarch, cgo, program, filename)
98+
log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", program, goos, goarch, cgo, filename)
8399

84100
// Cross-compile given go program. Not awesome.
85101
program = program[:strings.LastIndex(program, "/")+1]

sdks/go/pkg/beam/runners/universal/runnerlib/execute.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
4848
} else {
4949
// Cross-compile as last resort.
5050

51-
worker, err := BuildTempWorkerBinary(ctx)
51+
worker, err := BuildTempWorkerBinary(ctx, CompileOpts{})
5252
if err != nil {
5353
return presult, err
5454
}

sdks/go/test/build.gradle

+24
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,30 @@ task dataflowValidatesRunner() {
4545
}
4646
}
4747

48+
// ValidatesRunner tests for Dataflow. Runs tests in the integration directory
49+
// with Dataflow to validate that the runner behaves as expected, on arm64 machines.
50+
task dataflowValidatesRunnerARM64() {
51+
group = "Verification"
52+
53+
dependsOn ":sdks:go:test:goBuild"
54+
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
55+
56+
doLast {
57+
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
58+
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
59+
"--machine_type=t2a-standard-1",
60+
]
61+
def options = [
62+
"--runner dataflow",
63+
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
64+
]
65+
exec {
66+
executable "sh"
67+
args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
68+
}
69+
}
70+
}
71+
4872
// ValidatesRunner tests for Flink. Runs tests in the integration directory
4973
// with Flink to validate that the runner behaves as expected.
5074
task flinkValidatesRunner {

0 commit comments

Comments
 (0)