Add Map ItemBatcher/ItemSelector #294

Merged
merged 11 commits into from
Jan 30, 2025

@agrare agrare commented Nov 5, 2024

The ItemBatcher provides a way to group the items of a large input array into batches for better performance.

You can provide a MaxItemsPerBatch or MaxItemsPerBatchPath, which limits the number of items in a batch, as well as a MaxInputBytesPerBatch or MaxInputBytesPerBatchPath, which limits the size of the input in bytes.

The output will be a set of {"Items": []} payloads, one for each batch.
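
As a rough illustration of the output shape described above, here is a minimal sketch. The `batch_items` helper is hypothetical and is not part of Floe's API; it only covers the item-count limit and ignores the byte-size limit.

```ruby
# Hypothetical helper (not Floe's API): split items into batches of at most
# max_items and wrap each batch in an {"Items" => [...]} payload.
def batch_items(items, max_items)
  unless max_items.kind_of?(Integer) && max_items.positive?
    raise ArgumentError, "max_items must be a positive integer"
  end

  items.each_slice(max_items).map { |batch| {"Items" => batch} }
end

batches = batch_items([1, 2, 3, 4, 5], 2)
# batches is [{"Items" => [1, 2]}, {"Items" => [3, 4]}, {"Items" => [5]}]
```

The last batch may be smaller than the limit when the input length is not a multiple of it.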

Batching Items

A Map State MAY have an "ItemBatcher" field, whose value MUST be a JSON object and is called the ItemBatcher Configuration. The ItemBatcher Configuration causes the interpreter to batch selected items into sub-arrays before passing them to each invocation. The interpreter will limit each sub-array to the maximum number of items specified by the "MaxItemsPerBatch" field, and to the maximum size in bytes specified by the "MaxInputBytesPerBatch" field.

The ItemBatcher Configuration MAY have a "BatchInput" field, whose value MUST be a Payload Template. An ItemBatcher Configuration MAY have a "MaxItemsPerBatch" field, whose value MUST be a positive integer. An ItemBatcher Configuration MAY have a "MaxInputBytesPerBatch" field, whose value MUST be a positive integer.

The default of "ItemBatcher" is the selected item. Put another way, the interpreter will not batch items if no "ItemBatcher" field is provided.

Both the "MaxItemsPerBatch" and "MaxInputBytesPerBatch" can be provided indirectly. A Map State may have "MaxItemsPerBatchPath" and "MaxInputBytesPerBatchPath" fields which MUST be Reference Paths which, when resolved, MUST select fields whose values are positive integers. A Map State MUST NOT include both "MaxItemsPerBatch" and "MaxItemsPerBatchPath" or both "MaxInputBytesPerBatch" and "MaxInputBytesPerBatchPath".

An ItemBatcher Configuration MUST contain at least one of "MaxItemsPerBatch", "MaxItemsPerBatchPath", "MaxInputBytesPerBatch", or "MaxInputBytesPerBatchPath".
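
The rules quoted above could be checked roughly as follows. This is a sketch over a plain Hash, assuming the configuration has already been parsed from JSON; the `item_batcher_errors` method and its messages are illustrative and are not the code in this PR.

```ruby
# Hypothetical validator: field names come from the spec text above, the
# method name and error messages are assumptions made for illustration.
def item_batcher_errors(config)
  errors = []
  pairs = [
    %w[MaxItemsPerBatch MaxItemsPerBatchPath],
    %w[MaxInputBytesPerBatch MaxInputBytesPerBatchPath]
  ]

  pairs.each do |field, path_field|
    # A literal limit and its Path variant are mutually exclusive.
    if config.key?(field) && config.key?(path_field)
      errors << "must not include both \"#{field}\" and \"#{path_field}\""
    end

    # A literal limit must be a positive integer.
    if config.key?(field) && !(config[field].kind_of?(Integer) && config[field].positive?)
      errors << "\"#{field}\" must be a positive integer"
    end
  end

  # At least one of the four limit fields must be present.
  limit_fields = pairs.flatten
  errors << "must have at least one of #{limit_fields.join(', ')}" if (config.keys & limit_fields).empty?

  errors
end

item_batcher_errors({"MaxItemsPerBatch" => 10}) # no errors
item_batcher_errors({"BatchInput" => {}})       # reports the missing limit
```

Values supplied through the Path variants can only be checked at runtime, once the Reference Path has been resolved against the input.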

https://docs.aws.amazon.com/step-functions/latest/dg/input-output-itembatcher.html
#241

@agrare agrare requested a review from Fryguy as a code owner November 5, 2024 20:06
@agrare agrare force-pushed the map_state_item_batcher branch 4 times, most recently from 43adee0 to a521ed3 Compare November 6, 2024 15:51
@agrare agrare changed the title [WIP] Add Map ItemBatcher Add Map ItemBatcher Nov 6, 2024
@agrare agrare force-pushed the map_state_item_batcher branch from 4ab7b13 to 494553e Compare November 6, 2024 18:19
@agrare agrare changed the title Add Map ItemBatcher Add Map ItemBatcher/ItemSelector Nov 8, 2024
@agrare agrare mentioned this pull request Nov 8, 2024
@agrare agrare added the enhancement New feature or request label Nov 8, 2024
@agrare agrare force-pushed the map_state_item_batcher branch from 209a1da to 2cd4ab2 Compare November 12, 2024 14:35
Comment on lines +28 to +30
input.each_slice(max_items(context, state_input)).map do |batch|
  output.merge("Items" => batch)
end
Member

future: would be nice to not resolve all input at the start.
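
One possible reading of this suggestion is to build batches lazily, so that each one is produced only when it is consumed. The sketch below assumes that reading; `lazy_batches` and its `base` argument (standing in for the `output` hash in the snippet under review) are hypothetical names.

```ruby
# Hypothetical lazy variant: each batch is built only when the caller asks
# for it, so the full list of batches is never held in memory at once.
def lazy_batches(items, max_items, base = {})
  items.each_slice(max_items).lazy.map { |batch| base.merge("Items" => batch) }
end

# Works even for a source that could never be materialized up front.
first_two = lazy_batches(1..Float::INFINITY, 2).first(2)
# first_two is [{"Items" => [1, 2]}, {"Items" => [3, 4]}]
```

The trade-off is that callers receive an `Enumerator::Lazy` rather than an Array, so anything that needs the batch count in advance would have to change.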

end

def validate!
if [max_items_per_batch, max_items_per_batch_path].all?(&:nil?)
Member

optimization (read: feel free to ignore)
Count these and compare to 1?

Member Author

You mean like [max_items_per_batch, max_items_per_batch_path].compact.count.zero?

Member

yea

unless [max_items_per_batch, max_items_per_batch_path].compact.count == 1
  parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"")
end
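
The checks discussed in this thread are not all equivalent: `all?(&:nil?)` and `compact.count.zero?` only catch the case where neither field is set, while comparing the count to 1 also rejects the case where both are set. A small sketch with placeholder values (not taken from the PR) shows the difference:

```ruby
# Each case is [literal field, Path field]; the values are placeholders.
cases = {
  "neither"      => [nil, nil],
  "literal only" => [10, nil],
  "path only"    => [nil, "$.batchSize"],
  "both"         => [10, "$.batchSize"]
}

results = cases.transform_values do |fields|
  {
    :all_nil     => fields.all?(&:nil?),       # true only when nothing is set
    :exactly_one => fields.compact.count == 1  # false when none or both are set
  }
end

# results["neither"] is {:all_nil => true,  :exactly_one => false}
# results["both"]    is {:all_nil => false, :exactly_one => false}
```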

Comment on lines 52 to 53
invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") if max_items_per_batch && max_items_per_batch <= 0
invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") if max_input_bytes_per_batch && max_input_bytes_per_batch <= 0
Member

Not sure if we care, but these will raise a Ruby runtime error if the values are not integers. I really want to avoid that. Maybe do a to_i?

Member Author

I can add a kind_of?(Integer) check. I don't want to "blindly" to_i it because that would hide someone passing in a float, which is kind of unexpected.
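
To make the trade-off concrete, the sketch below shows what `to_i` does with the kinds of values under discussion, and what an explicit type check could look like. The `positive_integer?` predicate is a hypothetical name, not the code that was eventually merged.

```ruby
# to_i never raises for these values, but it silently changes them.
coerced = {
  "float"          => 1.5.to_i,   # 1: the fractional part is dropped
  "string"         => "a".to_i,   # 0: a non-numeric string becomes zero
  "numeric string" => "10".to_i   # 10
}

# Without any guard, comparing a String to 0 raises instead.
error_class =
  begin
    "a" <= 0
    nil
  rescue StandardError => e
    e.class
  end
# error_class is ArgumentError

# Hypothetical predicate: reject anything that is not already an Integer.
def positive_integer?(value)
  value.kind_of?(Integer) && value.positive?
end

positive_integer?(10)   # true
positive_integer?(1.5)  # false, the float is rejected rather than truncated
positive_integer?("a")  # false, no runtime error
```

Note that a float such as 1.5 passes a bare `<= 0` check without any error, which is the case the type check is meant to surface.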

@kbrock kbrock left a comment

This looks good. I did not run it.

I had put in a bunch of thoughts, but I do want some form of protection around invalid references to avoid Ruby runtime errors. to_i is just one suggestion for a fix.

return if max_items_per_batch_path.nil?

result = max_items_per_batch_path.value(context, state_input)
raise runtime_field_error!("MaxItemsPerBatchPath", result, "must be a positive integer") if result <= 0
Member

please use a to_i

Comment on lines 52 to 53
invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") if max_items_per_batch && max_items_per_batch <= 0
invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") if max_input_bytes_per_batch && max_input_bytes_per_batch <= 0
Member

to_i for both of these (to avoid the runtime error)

Comment on lines 146 to 163
it "raises an exception" do
  expect { subject.value(context, input, state_input) }
    .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"0\" must be a positive integer")
end
Member

Not sure if we want a test with invalid values. (are we doing tests for these?)

e.g.:

let(:state_input) { {"batchSize" => "a", "items" => input} }

agrare commented Jan 29, 2025

Added Integer type checks and tests for floats and strings

@agrare agrare force-pushed the map_state_item_batcher branch from 2cd4ab2 to eb89144 Compare January 29, 2025 17:20
@kbrock kbrock merged commit 814c859 into ManageIQ:master Jan 30, 2025
4 of 5 checks passed
@agrare agrare deleted the map_state_item_batcher branch January 30, 2025 00:29