Skip to content

Commit 3ea385d

Browse files
committed
fix: consolidate duplicate ShardedKey implementations in core module
Remove the deprecated org.apache.beam.sdk.values.ShardedKey and the internal (@VisibleForTesting) org.apache.beam.sdk.coders.ShardedKeyCoder, consolidating into the single org.apache.beam.sdk.util.ShardedKey. - Add of(K, int) factory and getShardNumber() accessor to util.ShardedKey so callers using integer shard numbers migrate without logic changes. The int is stored as a 4-byte big-endian byte array and round-trips through the Coder. - toString() displays integer shard number for 4-byte shard IDs instead of raw byte array, preserving log readability. - Migrate all consumers across core SDK, BigQuery IO, WriteFiles, ShardingFunction, Flink runner, and Spark runner. - Simplify auto-sharded write paths in WriteFiles and BatchLoads that previously required awkward conversion between the two ShardedKey types. - Add tests for int-based shard methods and coder round-trip behavior. - Update CHANGES.md with Breaking Changes entry. Wire format note: the removed ShardedKeyCoder (sdk.coders) had a different wire format from ShardedKey.Coder (sdk.util) — reversed field order and VarInt vs ByteArray shard encoding. ShardedKeyCoder was NOT a Beam standard/model coder and was only used transiently within pipeline constructions (GroupByKey, sharding), never for persistent state or checkpoints. The standard model coder (ShardedKey.Coder) is unchanged.
1 parent 5a65f6e commit 3ea385d

20 files changed

Lines changed: 108 additions & 186 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080

8181
## Breaking Changes
8282

83+
* Removed the previously-deprecated `org.apache.beam.sdk.values.ShardedKey` and the internal `org.apache.beam.sdk.coders.ShardedKeyCoder` (`@VisibleForTesting`). All usages should migrate to `org.apache.beam.sdk.util.ShardedKey`, which now provides `of(K, int)` and `getShardNumber()` convenience methods for integer shard numbers. The wire format of `ShardedKey.Coder` (from `sdk.util`) is unchanged; only the removed `ShardedKeyCoder` (from `sdk.coders`) had a different format, and it was not a Beam standard/model coder (Java) ([#37990](https://github.com/apache/beam/issues/37990)).
8384
* The Python SDK container's `boot.go` now passes pipeline options through a file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a new Python SDK container with an older SDK version (which does not support the file-based approach), the pipeline options will not be recognized and the pipeline will fail. Users must ensure their SDK and container versions are synchronized ([#37370](https://github.com/apache/beam/issues/37370)).
8485

8586
## Deprecations

playground/frontend/playground_components/assets/symbols/java.g.yaml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10741,13 +10741,6 @@ ShardedKey:
1074110741
- structuralValue
1074210742
- toString
1074310743
- verifyDeterministic
10744-
ShardedKeyCoder:
10745-
methods:
10746-
- decode
10747-
- encode
10748-
- getCoderArguments
10749-
- of
10750-
- verifyDeterministic
1075110744
ShardingWritableByteChannel:
1075210745
methods:
1075310746
- addChannel

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.beam.runners.flink.adapter.FlinkKey;
3131
import org.apache.beam.sdk.Pipeline;
3232
import org.apache.beam.sdk.coders.Coder;
33-
import org.apache.beam.sdk.coders.ShardedKeyCoder;
3433
import org.apache.beam.sdk.coders.VarIntCoder;
3534
import org.apache.beam.sdk.io.FileBasedSink;
3635
import org.apache.beam.sdk.io.ShardingFunction;
@@ -49,9 +48,9 @@
4948
import org.apache.beam.sdk.util.construction.UnconsumedReads;
5049
import org.apache.beam.sdk.util.construction.WriteFilesTranslation;
5150
import org.apache.beam.sdk.values.PCollection;
51+
import org.apache.beam.sdk.util.ShardedKey;
5252
import org.apache.beam.sdk.values.PCollectionView;
5353
import org.apache.beam.sdk.values.PValue;
54-
import org.apache.beam.sdk.values.ShardedKey;
5554
import org.apache.beam.sdk.values.TupleTag;
5655
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
5756
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
@@ -320,7 +319,7 @@ static class FlinkAutoBalancedShardKeyShardingFunction<UserT, DestinationT>
320319
private final int parallelism;
321320
private final int maxParallelism;
322321
private final Coder<DestinationT> destinationCoder;
323-
private final ShardedKeyCoder<Integer> shardedKeyCoder = ShardedKeyCoder.of(VarIntCoder.of());
322+
private final ShardedKey.Coder<Integer> shardedKeyCoder = ShardedKey.Coder.of(VarIntCoder.of());
324323
private transient Cache<Integer, Map<Integer, ShardedKey<Integer>>> cache;
325324

326325
private int shardNumber = -1;

runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@
4343
import org.apache.beam.sdk.transforms.GroupByKey;
4444
import org.apache.beam.sdk.transforms.ParDo;
4545
import org.apache.beam.sdk.util.SerializableUtils;
46+
import org.apache.beam.sdk.util.ShardedKey;
4647
import org.apache.beam.sdk.values.KV;
4748
import org.apache.beam.sdk.values.PCollection;
48-
import org.apache.beam.sdk.values.ShardedKey;
4949
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
5050
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
5151
import org.apache.flink.runtime.jobgraph.JobGraph;

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import org.apache.beam.sdk.coders.RowCoder;
5858
import org.apache.beam.sdk.coders.SerializableCoder;
5959
import org.apache.beam.sdk.coders.SetCoder;
60-
import org.apache.beam.sdk.coders.ShardedKeyCoder;
6160
import org.apache.beam.sdk.coders.SnappyCoder;
6261
import org.apache.beam.sdk.coders.SortedMapCoder;
6362
import org.apache.beam.sdk.coders.StringDelegateCoder;
@@ -74,6 +73,7 @@
7473
import org.apache.beam.sdk.transforms.join.UnionCoder;
7574
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
7675
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
76+
import org.apache.beam.sdk.util.ShardedKey;
7777
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
7878
import org.apache.beam.sdk.values.PCollectionViews;
7979
import org.apache.beam.sdk.values.TupleTag;
@@ -255,7 +255,7 @@ public void registerClasses(Kryo kryo) {
255255
kryo.register(RowCoder.class);
256256
kryo.register(SerializableCoder.class);
257257
kryo.register(SetCoder.class);
258-
kryo.register(ShardedKeyCoder.class);
258+
kryo.register(ShardedKey.Coder.class);
259259
kryo.register(SnappyCoder.class);
260260
kryo.register(SortedMapCoder.class);
261261
kryo.register(StringDelegateCoder.class);

sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java

Lines changed: 0 additions & 63 deletions
This file was deleted.

sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardingFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.beam.sdk.io;
1919

2020
import java.io.Serializable;
21-
import org.apache.beam.sdk.values.ShardedKey;
21+
import org.apache.beam.sdk.util.ShardedKey;
2222

2323
/** Function for assigning {@link ShardedKey}s to input elements for sharded {@link WriteFiles}. */
2424
public interface ShardingFunction<UserT, DestinationT> extends Serializable {

sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.beam.sdk.coders.IterableCoder;
3838
import org.apache.beam.sdk.coders.KvCoder;
3939
import org.apache.beam.sdk.coders.ListCoder;
40-
import org.apache.beam.sdk.coders.ShardedKeyCoder;
4140
import org.apache.beam.sdk.coders.StringUtf8Coder;
4241
import org.apache.beam.sdk.coders.VarIntCoder;
4342
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -76,6 +75,7 @@
7675
import org.apache.beam.sdk.transforms.windowing.Window;
7776
import org.apache.beam.sdk.util.CoderUtils;
7877
import org.apache.beam.sdk.util.MoreFutures;
78+
import org.apache.beam.sdk.util.ShardedKey;
7979
import org.apache.beam.sdk.values.KV;
8080
import org.apache.beam.sdk.values.PCollection;
8181
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -85,7 +85,6 @@
8585
import org.apache.beam.sdk.values.PCollectionViews;
8686
import org.apache.beam.sdk.values.PDone;
8787
import org.apache.beam.sdk.values.PValue;
88-
import org.apache.beam.sdk.values.ShardedKey;
8988
import org.apache.beam.sdk.values.TupleTag;
9089
import org.apache.beam.sdk.values.TupleTagList;
9190
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
@@ -627,7 +626,7 @@ public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
627626
PCollectionTuple spilledWriteTuple =
628627
writeTuple
629628
.get(unwrittenRecordsTag)
630-
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
629+
.setCoder(KvCoder.of(ShardedKey.Coder.of(VarIntCoder.of()), input.getCoder()))
631630
// Here we group by a synthetic shard number in the range [0, spill factor),
632631
// just for the sake of getting some parallelism within each destination when
633632
// writing the spilled records, whereas the non-spilled records don't have a shard
@@ -928,7 +927,7 @@ public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
928927
PCollectionTuple writtenFiles =
929928
shardedFiles
930929
.get(shardedRecords)
931-
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
930+
.setCoder(KvCoder.of(ShardedKey.Coder.of(VarIntCoder.of()), input.getCoder()))
932931
.apply("GroupIntoShards", GroupByKey.create())
933932
.apply(
934933
"WriteShardsIntoTempFiles",
@@ -984,7 +983,7 @@ public PCollection<List<FileResult<DestinationT>>> expand(PCollection<UserT> inp
984983
.withOutputTags(shardTag, TupleTagList.of(BAD_RECORD_TAG)));
985984
addErrorCollection(shardedElements);
986985

987-
PCollection<KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<UserT>>> shardedInput =
986+
PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> shardedInput =
988987
shardedElements
989988
.get(shardTag)
990989
.setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
@@ -996,8 +995,7 @@ public PCollection<List<FileResult<DestinationT>>> expand(PCollection<UserT> inp
996995
.withShardedKey())
997996
.setCoder(
998997
KvCoder.of(
999-
org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
1000-
IterableCoder.of(input.getCoder())));
998+
ShardedKey.Coder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder())));
1001999

10021000
TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
10031001
// Write grouped elements to temp files.
@@ -1007,12 +1005,11 @@ public PCollection<List<FileResult<DestinationT>>> expand(PCollection<UserT> inp
10071005
"AddDummyShard",
10081006
MapElements.via(
10091007
new SimpleFunction<
1010-
KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<UserT>>,
1008+
KV<ShardedKey<Integer>, Iterable<UserT>>,
10111009
KV<ShardedKey<Integer>, Iterable<UserT>>>() {
10121010
@Override
10131011
public KV<ShardedKey<Integer>, Iterable<UserT>> apply(
1014-
KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<UserT>>
1015-
input) {
1012+
KV<ShardedKey<Integer>, Iterable<UserT>> input) {
10161013
// Add dummy shard since it is required by WriteShardsIntoTempFilesFn. It
10171014
// will be dropped after we generate the temp files.
10181015
return KV.of(
@@ -1022,7 +1019,7 @@ public KV<ShardedKey<Integer>, Iterable<UserT>> apply(
10221019
}))
10231020
.setCoder(
10241021
KvCoder.of(
1025-
ShardedKeyCoder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder())))
1022+
ShardedKey.Coder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder())))
10261023
.apply(
10271024
"WriteShardsIntoTempFiles",
10281025
ParDo.of(new WriteShardsIntoTempFilesFn(input.getCoder()))

sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedKey.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,26 @@
2222
import java.io.IOException;
2323
import java.io.InputStream;
2424
import java.io.OutputStream;
25+
import java.io.Serializable;
26+
import java.nio.ByteBuffer;
2527
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.List;
2830
import org.apache.beam.sdk.coders.ByteArrayCoder;
2931
import org.apache.beam.sdk.coders.StructuredCoder;
3032
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
3133

32-
/** A sharded key consisting of a user key and an opaque shard id represented by bytes. */
34+
/**
35+
* A sharded key consisting of a user key and a shard identifier.
36+
*
37+
* <p>The shard identifier is stored as an opaque byte array. Convenience methods are provided for
38+
* creating sharded keys with integer shard numbers, which are encoded as 4-byte big-endian arrays.
39+
*/
3340
@SuppressWarnings({
3441
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
3542
})
36-
public class ShardedKey<K> {
43+
public class ShardedKey<K> implements Serializable {
44+
private static final long serialVersionUID = 1L;
3745

3846
private final K key;
3947
private final byte[] shardId;
@@ -53,12 +61,42 @@ public static <K> ShardedKey<K> of(K key, byte[] shardId) {
5361
return new ShardedKey<K>(key, shardId);
5462
}
5563

64+
/**
65+
* Creates a ShardedKey with given key and integer shard number. The shard number is stored as a
66+
* 4-byte big-endian byte array.
67+
*/
68+
public static <K> ShardedKey<K> of(K key, int shardNumber) {
69+
checkArgument(key != null, "Key should not be null!");
70+
byte[] shardId = ByteBuffer.allocate(Integer.BYTES).putInt(shardNumber).array();
71+
return new ShardedKey<K>(key, shardId);
72+
}
73+
5674
public K getKey() {
5775
return key;
5876
}
5977

78+
/**
79+
* Returns the integer shard number. This method should only be called on ShardedKeys that were
80+
* created with {@link #of(Object, int)}, or whose shard id is a 4-byte big-endian encoded
81+
* integer.
82+
*
83+
* @throws IllegalArgumentException if the shard id is not 4 bytes
84+
*/
85+
public int getShardNumber() {
86+
checkArgument(
87+
shardId.length == Integer.BYTES,
88+
"ShardedKey was not created with an integer shard number (shard id has %s bytes,"
89+
+ " expected %s)",
90+
shardId.length,
91+
Integer.BYTES);
92+
return ByteBuffer.wrap(shardId).getInt();
93+
}
94+
6095
@Override
6196
public String toString() {
97+
if (shardId.length == Integer.BYTES) {
98+
return "ShardedKey{key=" + key + ", shard=" + ByteBuffer.wrap(shardId).getInt() + "}";
99+
}
62100
return "ShardedKey{key=" + key + ", shardId=" + Arrays.toString(shardId) + "}";
63101
}
64102

sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)