Skip to content

Add OrderedList and Set state #34836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 5, 2025
77 changes: 77 additions & 0 deletions website/www/site/content/en/documentation/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -6544,6 +6544,83 @@ _ = (p | 'Read per user' >> ReadPerUser()
{{< code_sample "sdks/go/examples/snippets/04transforms.go" bag_state >}}
{{< /highlight >}}

#### SetState

A common use case for state is to accumulate unique elements. `SetState` allows for accumulating an unordered set
of elements.

{{< highlight java >}}
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
@StateId("state") private final StateSpec<SetState<ValueT>> uniqueElements = StateSpecs.bag();

@ProcessElement public void process(
@Element KV<String, ValueT> element,
@StateId("state") SetState<ValueT> state) {
// Add the current element to the set state for this key.
state.add(element.getValue());
if (shouldFetch()) {
// Occasionally we fetch and process the values.
Iterable<ValueT> values = state.read();
processValues(values);
state.clear(); // Clear the state for this key.
}
}
}));
{{< /highlight >}}
{{< highlight py >}}
class SetStateDoFn(DoFn):
UNIQUE_ELEMENTS = SetStateSpec('buffer', coders.VarIntCoder())

def process(self, element_pair, state=DoFn.StateParam(UNIQUE_ELEMENTS)):
state.add(element_pair[1])
if should_fetch():
unique_elements = list(state.read())
process_values(unique_elements)
state.clear()

_ = (p | 'Read per user' >> ReadPerUser()
| 'Set state pardo' >> beam.ParDo(SetStateDoFn()))
{{< /highlight >}}

#### OrderListState

`OrderListState` state that accumulate elements in an ordered List.

{{< highlight java >}}
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
@StateId("state") private final StateSpec<OrderedListState<ValueT>> uniqueElements = StateSpecs.bag();

@ProcessElement public void process(
@Element KV<String, ValueT> element,
@StateId("state") SetState<ValueT> state) {
// Add the current element to the set state for this key.
state.add(element.getValue());
if (shouldFetch()) {
// Occasionally we fetch and process the values.
Iterable<ValueT> values = state.read();
processValues(values);
state.clear(); // Clear the state for this key.
}
}
}));
{{< /highlight >}}
{{< highlight py >}}
class OrderedListStateDoFn(DoFn):
STATE_ELEMENTS = OrderedListStateSpec('buffer', coders.ListCoder())

def process(self, element_pair, state=DoFn.StateParam(STATE_ELEMENTS)):
state.add(element_pair[1])
if should_fetch():
elements = list(state.read())
process_values(elements)
state.clear()

_ = (p | 'Read per user' >> ReadPerUser()
| 'Set state pardo' >> beam.ParDo(OrderedListStateDoFn()))
{{< /highlight >}}

### 11.2. Deferred state reads {#deferred-state-reads}

When a `DoFn` contains multiple state specifications, reading each one in order can be slow. Calling the `read()` function
Expand Down
Loading