Skip to content

Commit 4a73a9b

Browse files
fix: prevent state table collision by including system/user prefix
1 parent dad0287 commit 4a73a9b

File tree

4 files changed

+67
-6
lines changed

4 files changed

+67
-6
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,22 @@ public interface StateTag<StateT extends State> extends Serializable {
5555
/** An identifier for the state cell that this tag references. */
5656
String getId();
5757

58+
/**
59+
* Returns the full state identifier including the system/user prefix.
60+
*
61+
* <p>This is used to distinguish between system-defined and user-defined state tags and prevent
62+
* collisions in state tables when tags have the same raw ID but different prefixes.
63+
*/
64+
default String getIdWithPrefix() {
65+
StringBuilder sb = new StringBuilder();
66+
try {
67+
appendTo(sb);
68+
} catch (IOException e) {
69+
throw new RuntimeException("Failed to get prefixed ID", e);
70+
}
71+
return sb.toString();
72+
}
73+
5874
/** The specification for the state stored in the referenced cell. */
5975
StateSpec<StateT> getSpec();
6076

runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ public class StateTags {
5050
new Equivalence<StateTag>() {
5151
@Override
5252
protected boolean doEquivalent(StateTag a, StateTag b) {
53-
return a.getId().equals(b.getId());
53+
return a.getIdWithPrefix().equals(b.getIdWithPrefix());
5454
}
5555

5656
@Override
5757
protected int doHash(StateTag stateTag) {
58-
return stateTag.getId().hashCode();
58+
return stateTag.getIdWithPrefix().hashCode();
5959
}
6060
};
6161

runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,52 @@ public void testCombiningValueWithContextEquality() {
192192
StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1),
193193
StateTags.convertToBagTagInternal((StateTag) barCoder1Max));
194194
}
195+
196+
@Test
197+
public void testSystemAndUserTagsWithSameIdDoNotCollide() {
198+
StateTag<?> userTag = StateTags.value("collision", StringUtf8Coder.of());
199+
StateTag<?> systemTag =
200+
StateTags.makeSystemTagInternal(StateTags.value("collision", StringUtf8Coder.of()));
201+
202+
// Same raw ID, different prefixed IDs.
203+
assertEquals(userTag.getId(), systemTag.getId());
204+
assertNotEquals(userTag.getIdWithPrefix(), systemTag.getIdWithPrefix());
205+
206+
// Tags are equal by ID, but state tables use prefixed IDs to prevent collision.
207+
assertEquals(userTag, systemTag);
208+
}
209+
210+
@Test
211+
public void testIdWithPrefixForUserTag() {
212+
StateTag<?> userTag = StateTags.value("test", StringUtf8Coder.of());
213+
String prefixedId = userTag.getIdWithPrefix();
214+
215+
assertEquals('u', prefixedId.charAt(0));
216+
assertEquals("utest", prefixedId);
217+
}
218+
219+
@Test
220+
public void testIdWithPrefixForSystemTag() {
221+
StateTag<?> systemTag =
222+
StateTags.makeSystemTagInternal(StateTags.value("test", StringUtf8Coder.of()));
223+
String prefixedId = systemTag.getIdWithPrefix();
224+
225+
assertEquals('s', prefixedId.charAt(0));
226+
assertEquals("stest", prefixedId);
227+
}
228+
229+
@Test
230+
public void testIdEquivalenceWithPrefix() {
231+
StateTag<?> userTag1 = StateTags.value("collision", StringUtf8Coder.of());
232+
StateTag<?> userTag2 = StateTags.value("collision", StringUtf8Coder.of());
233+
StateTag<?> systemTag =
234+
StateTags.makeSystemTagInternal(StateTags.value("collision", StringUtf8Coder.of()));
235+
236+
// Same ID and prefix.
237+
assertEquals(StateTags.ID_EQUIVALENCE.wrap(userTag1), StateTags.ID_EQUIVALENCE.wrap(userTag2));
238+
239+
// Same ID, different prefix.
240+
assertNotEquals(
241+
StateTags.ID_EQUIVALENCE.wrap(userTag1), StateTags.ID_EQUIVALENCE.wrap(systemTag));
242+
}
195243
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,7 @@ abstract static class StateTableKey {
274274
public abstract String getId();
275275

276276
public static StateTableKey create(StateNamespace namespace, StateTag<?> stateTag) {
277-
// TODO(https://github.com/apache/beam/issues/36753): stateTag.getId() returns only the
278-
// string tag without system/user prefix. This could cause a collision between system and
279-
// user tag with the same id. Consider adding the prefix to state table key.
280-
return new AutoValue_CachingStateTable_StateTableKey(namespace, stateTag.getId());
277+
return new AutoValue_CachingStateTable_StateTableKey(namespace, stateTag.getIdWithPrefix());
281278
}
282279
}
283280

0 commit comments

Comments
 (0)