Skip to content

Commit 710c318

Browse files
committed
Rename to max active bundles per worker
1 parent b9acd87 commit 710c318

File tree

6 files changed

+72
-72
lines changed

6 files changed

+72
-72
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2026,9 +2026,9 @@ message StandardResourceHints {
20262026
// SDKs should accept and validate a positive integer count.
20272027
// Payload: ASCII encoded string of the base 10 representation of an integer number of CPUs.
20282028
CPU_COUNT = 2 [(beam_urn) = "beam:resources:cpu_count:v1"];
2029-
// Describes max number of active dofns per worker in transform's execution environment.
2029+
// Describes max number of active bundles per worker in transform's execution environment.
20302030
// SDKs should accept and validate a positive integer count.
2031-
// Payload: ASCII encoded string of the base 10 representation of an integer number of active dofns.
2032-
MAX_ACTIVE_DOFN_PER_WORKER = 3 [(beam_urn) = "beam:resources:max_active_dofn_per_worker:v1"];
2031+
// Payload: ASCII encoded string of the base 10 representation of an integer number of active bundles.
2032+
MAX_ACTIVE_BUNDLES_PER_WORKER = 3 [(beam_urn) = "beam:resources:max_actives_bundle_per_worker:v1"];
20332033
}
20342034
}

sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go

Lines changed: 32 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdks/go/pkg/beam/options/resource/hint.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,40 +159,40 @@ func (h minRAMHint) String() string {
159159
return fmt.Sprintf("min_ram=%v", humanize.Bytes(uint64(h.value)))
160160
}
161161

162-
// MaxParallelismPerWorker hints that this scope should be worked by max this number of messages in parallel on the
162+
// MaxActiveBundlesPerWorker hints that this scope should be worked by max this number of messages in parallel on the
163163
// same machine.
164164
//
165165
// Hints are advisory only and runners may not respect them.
166166
//
167167
// See https://beam.apache.org/documentation/runtime/resource-hints/ for more information about
168168
// resource hints.
169-
func MaxParallelismPerWorker(v int64) Hint {
170-
return maxActiveDoFnPerWorkerHint{value: v}
169+
func MaxActiveBundlesPerWorker(v int64) Hint {
170+
return maxActiveBundlesPerWorkerHint{value: v}
171171
}
172172

173-
type maxActiveDoFnPerWorkerHint struct {
173+
type maxActiveBundlesPerWorkerHint struct {
174174
value int64
175175
}
176176

177-
func (h maxActiveDoFnPerWorkerHint) URN() string {
178-
return "beam:resources:max_active_dofn_per_worker:v1"
177+
func (h maxActiveBundlesPerWorkerHint) URN() string {
178+
return "beam:resources:max_active_bundles_per_worker:v1"
179179
}
180180

181-
func (h maxActiveDoFnPerWorkerHint) Payload() []byte {
181+
func (h maxActiveBundlesPerWorkerHint) Payload() []byte {
182182
// Go strings are utf8, and if the string is ascii,
183183
// byte conversion handles that directly.
184184
return []byte(strconv.FormatInt(h.value, 10))
185185
}
186186

187-
func (h maxActiveDoFnPerWorkerHint) MergeWithOuter(outer Hint) Hint {
187+
func (h maxActiveBundlesPerWorkerHint) MergeWithOuter(outer Hint) Hint {
188188
// Intentional runtime panic from type assertion to catch hint merge errors.
189-
if outer.(maxActiveDoFnPerWorkerHint).value < h.value {
189+
if outer.(maxActiveBundlesPerWorkerHint).value < h.value {
190190
return outer
191191
}
192192
return h
193193
}
194194

195-
func (h maxActiveDoFnPerWorkerHint) String() string {
195+
func (h maxActiveBundlesPerWorkerHint) String() string {
196196
return fmt.Sprintf("max_parallelism_per_worker=%v", uint64(h.value))
197197
}
198198

@@ -208,7 +208,7 @@ func ParseMaxParallelismPerWorker(v string) Hint {
208208
if err != nil {
209209
panic(fmt.Sprintf("resource.ParseMaxParallelismPerWorker: unable to parse %q: %v", v, err))
210210
}
211-
return MaxParallelismPerWorker(int64(b))
211+
return MaxActiveBundlesPerWorker(int64(b))
212212
}
213213

214214
// Accelerator hints that this scope should be put in a machine with a given accelerator.

sdks/go/pkg/beam/options/resource/hint_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ func TestCPUCountHint_Payload(t *testing.T) {
144144
}
145145

146146
func TestMaxActiveDoFnPerWorkerHint_MergeWith(t *testing.T) {
147-
low := maxActiveDoFnPerWorkerHint{value: 2}
148-
high := maxActiveDoFnPerWorkerHint{value: 4}
147+
low := maxActiveBundlesPerWorkerHint{value: 2}
148+
high := maxActiveBundlesPerWorkerHint{value: 4}
149149

150150
if got, want := low.MergeWithOuter(high), low; got != want {
151151
t.Errorf("%v.MergeWith(%v) = %v, want %v", low, high, got, want)
@@ -166,7 +166,7 @@ func TestMaxParallelismPerWorkerHint_Payload(t *testing.T) {
166166
}
167167

168168
for _, test := range tests {
169-
h := maxActiveDoFnPerWorkerHint{value: int64(test.value)}
169+
h := maxActiveBundlesPerWorkerHint{value: int64(test.value)}
170170
if got, want := h.Payload(), []byte(test.payload); !bytes.Equal(got, want) {
171171
t.Errorf("%v.Payload() = %v, want %v", h, got, want)
172172
}
@@ -196,8 +196,8 @@ func TestStandardHintUrns(t *testing.T) {
196196
h: CPUCount(4),
197197
urn: getStandardURN(pipepb.StandardResourceHints_CPU_COUNT),
198198
}, {
199-
h: MaxParallelismPerWorker(2),
200-
urn: getStandardURN(pipepb.StandardResourceHints_MAX_ACTIVE_DOFN_PER_WORKER),
199+
h: MaxActiveBundlesPerWorker(2),
200+
urn: getStandardURN(pipepb.StandardResourceHints_MAX_ACTIVE_BUNDLES_PER_WORKER),
201201
}}
202202

203203
for _, test := range tests {
@@ -292,14 +292,14 @@ func TestHints_MergeWithOuter(t *testing.T) {
292292

293293
func TestHints_Payloads(t *testing.T) {
294294
{
295-
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:jeans;count1;"), CPUCount(4), MaxParallelismPerWorker(2))
295+
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:jeans;count1;"), CPUCount(4), MaxActiveBundlesPerWorker(2))
296296

297297
got := hs.Payloads()
298298
want := map[string][]byte{
299-
"beam:resources:min_ram_bytes:v1": []byte("2000000000"),
300-
"beam:resources:accelerator:v1": []byte("type:jeans;count1;"),
301-
"beam:resources:cpu_count:v1": []byte("4"),
302-
"beam:resources:max_active_dofn_per_worker:v1": []byte("2"),
299+
"beam:resources:min_ram_bytes:v1": []byte("2000000000"),
300+
"beam:resources:accelerator:v1": []byte("type:jeans;count1;"),
301+
"beam:resources:cpu_count:v1": []byte("4"),
302+
"beam:resources:max_active_bundles_per_worker:v1": []byte("2"),
303303
}
304304
if !reflect.DeepEqual(got, want) {
305305
t.Errorf("hs.Payloads() = %v, want %v", got, want)

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public class ResourceHints {
4949
private static final String MIN_RAM_URN = "beam:resources:min_ram_bytes:v1";
5050
private static final String ACCELERATOR_URN = "beam:resources:accelerator:v1";
5151
private static final String CPU_COUNT_URN = "beam:resources:cpu_count:v1";
52-
private static final String MAX_ACTIVE_DOFN_PER_WORKER =
53-
"beam:resources:max_active_dofn_per_worker:v1";
52+
private static final String MAX_ACTIVE_BUNDLES_PER_WORKER =
53+
"beam:resources:max_active_bundles_per_worker:v1";
5454

5555
// TODO: reference this from a common location in all packages that use this.
5656
private static String getUrn(ProtocolMessageEnum value) {
@@ -62,8 +62,8 @@ private static String getUrn(ProtocolMessageEnum value) {
6262
checkState(ACCELERATOR_URN.equals(getUrn(StandardResourceHints.Enum.ACCELERATOR)));
6363
checkState(CPU_COUNT_URN.equals(getUrn(StandardResourceHints.Enum.CPU_COUNT)));
6464
checkState(
65-
MAX_ACTIVE_DOFN_PER_WORKER.equals(
66-
(getUrn(StandardResourceHints.Enum.MAX_ACTIVE_DOFN_PER_WORKER))));
65+
MAX_ACTIVE_BUNDLES_PER_WORKER.equals(
66+
(getUrn(StandardResourceHints.Enum.MAX_ACTIVE_BUNDLES_PER_WORKER))));
6767
}
6868

6969
private static ImmutableMap<String, String> hintNameToUrn =
@@ -73,17 +73,17 @@ private static String getUrn(ProtocolMessageEnum value) {
7373
.put("accelerator", ACCELERATOR_URN)
7474
.put("cpuCount", CPU_COUNT_URN)
7575
.put("cpu_count", CPU_COUNT_URN) // Courtesy alias.
76-
.put("max_active_dofn_per_worker", MAX_ACTIVE_DOFN_PER_WORKER)
77-
.put("maxActiveDoFnPerWorker", MAX_ACTIVE_DOFN_PER_WORKER) // Courtesy alias.
78-
.put("max_active_dofns_per_worker", MAX_ACTIVE_DOFN_PER_WORKER) // Courtesy alias.
76+
.put("max_active_bundles_per_worker", MAX_ACTIVE_BUNDLES_PER_WORKER)
77+
.put("maxActiveBundlesPerWorker", MAX_ACTIVE_BUNDLES_PER_WORKER) // Courtesy alias.
78+
.put("max_active_bundle_per_worker", MAX_ACTIVE_BUNDLES_PER_WORKER) // Courtesy alias.
7979
.build();
8080

8181
private static ImmutableMap<String, Function<String, ResourceHint>> parsers =
8282
ImmutableMap.<String, Function<String, ResourceHint>>builder()
8383
.put(MIN_RAM_URN, s -> new BytesHint(BytesHint.parse(s)))
8484
.put(ACCELERATOR_URN, s -> new StringHint(s))
8585
.put(CPU_COUNT_URN, s -> new IntHint(IntHint.parse(s)))
86-
.put(MAX_ACTIVE_DOFN_PER_WORKER, s -> new IntHint(IntHint.parse(s)))
86+
.put(MAX_ACTIVE_BUNDLES_PER_WORKER, s -> new IntHint(IntHint.parse(s)))
8787
.build();
8888

8989
private static final ResourceHints EMPTY = new ResourceHints(ImmutableMap.of());
@@ -339,15 +339,15 @@ public ResourceHints withCPUCount(int cpuCount) {
339339
return withHint(CPU_COUNT_URN, new IntHint(cpuCount));
340340
}
341341

342-
public ResourceHints withMaxActiveDoFnPerWorker(int maxActiveDoFnPerWorker) {
343-
if (maxActiveDoFnPerWorker <= 0) {
342+
public ResourceHints withMaxActiveBundlesPerWorker(int maxActiveBundlesPerWorker) {
343+
if (maxActiveBundlesPerWorker <= 0) {
344344
LOG.error(
345-
"Encountered invalid non-positive max active dofn per worker hint value {}.\n"
345+
"Encountered invalid non-positive max active bundles per worker hint value {}.\n"
346346
+ "The value is ignored.",
347-
maxActiveDoFnPerWorker);
347+
maxActiveBundlesPerWorker);
348348
return this;
349349
}
350-
return withHint(MAX_ACTIVE_DOFN_PER_WORKER, new IntHint(maxActiveDoFnPerWorker));
350+
return withHint(MAX_ACTIVE_BUNDLES_PER_WORKER, new IntHint(maxActiveBundlesPerWorker));
351351
}
352352

353353
public Map<String, ResourceHint> hints() {
@@ -370,7 +370,7 @@ public ResourceHints mergeWithOuter(ResourceHints outer) {
370370
.get(key)
371371
.mergeWithOuter(
372372
outerHint.getValue(),
373-
/*is_inverse*/ outerHint.getKey().equals(MAX_ACTIVE_DOFN_PER_WORKER)));
373+
/*is_inverse*/ outerHint.getKey().equals(MAX_ACTIVE_BUNDLES_PER_WORKER)));
374374
} else {
375375
newHints.put(outerHint);
376376
}

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ public void testFromOptions() {
9595
"--resourceHints=min_ram=1KB",
9696
"--resourceHints=accelerator=foo",
9797
"--resourceHints=cpu_count=4",
98-
"--resourceHints=max_active_dofn_per_worker=2")
98+
"--resourceHints=max_active_bundles_per_worker=2")
9999
.as(ResourceHintsOptions.class);
100100
ResourceHints fromOptions = ResourceHints.fromOptions(options);
101101
ResourceHints expect =
102102
ResourceHints.create()
103103
.withMinRam(1000)
104104
.withAccelerator("foo")
105105
.withCPUCount(4)
106-
.withMaxActiveDoFnPerWorker(2);
106+
.withMaxActiveBundlesPerWorker(2);
107107
assertEquals(fromOptions, expect);
108108
}
109109
}

0 commit comments

Comments
 (0)