Skip to content

Commit 8840489

Browse files
committed
Make the connector placement table ParentTokens column nullable. (#34812)
Use `./gradlew :sdks:java:io:google-cloud-platform:spotlessApply` to format the affected files. The ParentToken column is used for single-split based change stream partitions. If multi-split based change stream is supported, then the column will be null.
1 parent 63193fe commit 8840489

File tree

6 files changed

+11
-32
lines changed

6 files changed

+11
-32
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void createPartitionMetadataTable() {
128128
+ COLUMN_PARTITION_TOKEN
129129
+ "\" text NOT NULL,\""
130130
+ COLUMN_PARENT_TOKENS
131-
+ "\" text[] NOT NULL,\""
131+
+ "\" text[],\""
132132
+ COLUMN_START_TIMESTAMP
133133
+ "\" timestamptz NOT NULL,\""
134134
+ COLUMN_END_TIMESTAMP
@@ -184,7 +184,7 @@ public void createPartitionMetadataTable() {
184184
+ COLUMN_PARTITION_TOKEN
185185
+ " STRING(MAX) NOT NULL,"
186186
+ COLUMN_PARENT_TOKENS
187-
+ " ARRAY<STRING(MAX)> NOT NULL,"
187+
+ " ARRAY<STRING(MAX)>,"
188188
+ COLUMN_START_TIMESTAMP
189189
+ " TIMESTAMP NOT NULL,"
190190
+ COLUMN_END_TIMESTAMP

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ private void createFakeParentPartition() {
8686
PartitionMetadata parentPartition =
8787
PartitionMetadata.newBuilder()
8888
.setPartitionToken(InitialPartition.PARTITION_TOKEN)
89-
.setParentTokens(InitialPartition.PARENT_TOKENS)
9089
.setStartTimestamp(startTimestamp)
9190
.setEndTimestamp(endTimestamp)
9291
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ public class PartitionMetadataMapper {
8080
public PartitionMetadata from(Struct row) {
8181
return PartitionMetadata.newBuilder()
8282
.setPartitionToken(row.getString(COLUMN_PARTITION_TOKEN))
83-
.setParentTokens(Sets.newHashSet(row.getStringList(COLUMN_PARENT_TOKENS)))
83+
.setParentTokens(
84+
!row.isNull(COLUMN_PARENT_TOKENS)
85+
? Sets.newHashSet(row.getStringList(COLUMN_PARENT_TOKENS))
86+
: null)
8487
.setStartTimestamp(row.getTimestamp(COLUMN_START_TIMESTAMP))
8588
.setEndTimestamp(row.getTimestamp(COLUMN_END_TIMESTAMP))
8689
.setHeartbeatMillis(row.getLong(COLUMN_HEARTBEAT_MILLIS))

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/InitialPartition.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
1919

20-
import java.util.HashSet;
21-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
22-
2320
/**
2421
* Utility class to determine initial partition constants and methods.
2522
*
@@ -33,8 +30,6 @@ public class InitialPartition {
3330
* recognised by Cloud Spanner.
3431
*/
3532
public static final String PARTITION_TOKEN = "Parent0";
36-
/** The empty set representing the initial partition parent tokens. */
37-
public static final HashSet<String> PARENT_TOKENS = Sets.newHashSet();
3833

3934
/**
4035
* Verifies if the given partition token is the initial partition.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadata.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public enum State {
6161
}
6262

6363
private String partitionToken;
64-
private HashSet<String> parentTokens;
64+
@Nullable @org.apache.avro.reflect.Nullable private HashSet<String> parentTokens;
6565

6666
@AvroEncode(using = TimestampEncoding.class)
6767
private Timestamp startTimestamp;
@@ -98,7 +98,7 @@ private PartitionMetadata() {}
9898

9999
public PartitionMetadata(
100100
String partitionToken,
101-
HashSet<String> parentTokens,
101+
@Nullable HashSet<String> parentTokens,
102102
Timestamp startTimestamp,
103103
Timestamp endTimestamp,
104104
long heartbeatMillis,
@@ -130,7 +130,7 @@ public String getPartitionToken() {
130130
* The unique partition identifiers of the parent partitions where this child partition originated
131131
* from.
132132
*/
133-
public HashSet<String> getParentTokens() {
133+
public @Nullable HashSet<String> getParentTokens() {
134134
return parentTokens;
135135
}
136136

@@ -269,7 +269,7 @@ public static Builder newBuilder() {
269269
public static class Builder {
270270

271271
private String partitionToken;
272-
private HashSet<String> parentTokens;
272+
@Nullable private HashSet<String> parentTokens;
273273
private Timestamp startTimestamp;
274274
private Timestamp endTimestamp;
275275
private Long heartbeatMillis;
@@ -303,7 +303,7 @@ public Builder setPartitionToken(String partitionToken) {
303303
}
304304

305305
/** Sets the collection of parent partition identifiers. */
306-
public Builder setParentTokens(HashSet<String> parentTokens) {
306+
public Builder setParentTokens(@Nullable HashSet<String> parentTokens) {
307307
this.parentTokens = parentTokens;
308308
return this;
309309
}
@@ -376,7 +376,6 @@ public Builder setFinishedAt(@Nullable Timestamp finishedAt) {
376376
*/
377377
public PartitionMetadata build() {
378378
Preconditions.checkState(partitionToken != null, "partitionToken");
379-
Preconditions.checkState(parentTokens != null, "parentTokens");
380379
Preconditions.checkState(startTimestamp != null, "startTimestamp");
381380
Preconditions.checkState(heartbeatMillis != null, "heartbeatMillis");
382381
Preconditions.checkState(state != null, "state");

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadataTest.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,23 +124,6 @@ public void testBuilderThrowsExceptionWhenPartitionTokenMissing() {
124124
.build());
125125
}
126126

127-
@Test
128-
public void testBuilderThrowsExceptionWhenParentTokenMissing() {
129-
assertThrows(
130-
"parentToken",
131-
IllegalStateException.class,
132-
() ->
133-
PartitionMetadata.newBuilder()
134-
.setPartitionToken(PARTITION_TOKEN)
135-
.setStartTimestamp(START_TIMESTAMP)
136-
.setEndTimestamp(END_TIMESTAMP)
137-
.setHeartbeatMillis(10)
138-
.setState(State.CREATED)
139-
.setWatermark(WATERMARK)
140-
.setCreatedAt(CREATED_AT)
141-
.build());
142-
}
143-
144127
@Test
145128
public void testBuilderThrowsExceptionWhenStartTimestampMissing() {
146129
assertThrows(

0 commit comments

Comments
 (0)