Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Map ItemBatcher/ItemSelector #294

Merged
merged 11 commits into from
Jan 30, 2025
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require_relative "floe/workflow/choice_rule/and"
require_relative "floe/workflow/choice_rule/data"
require_relative "floe/workflow/context"
require_relative "floe/workflow/item_batcher"
require_relative "floe/workflow/item_processor"
require_relative "floe/workflow/intrinsic_function"
require_relative "floe/workflow/intrinsic_function/parser"
Expand Down
62 changes: 62 additions & 0 deletions lib/floe/workflow/item_batcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Floe
class Workflow
class ItemBatcher
include ValidationMixin

attr_reader :name, :batch_input, :max_items_per_batch, :max_items_per_batch_path, :max_input_bytes_per_batch, :max_input_bytes_per_batch_path

# Builds an ItemBatcher from the "ItemBatcher" section of a Map state.
#
# @param payload [Hash] the raw ItemBatcher definition; reads "BatchInput",
#   "MaxItemsPerBatch", "MaxItemsPerBatchPath", "MaxInputBytesPerBatch" and
#   "MaxInputBytesPerBatchPath"
# @param name identifier stored as-is and exposed through #name
def initialize(payload, name)
  @name = name

  # Static limits are stored verbatim; validate! checks their type and range.
  @max_items_per_batch       = payload["MaxItemsPerBatch"]
  @max_input_bytes_per_batch = payload["MaxInputBytesPerBatch"]

  # Optional fields are only wrapped when the payload actually provides them.
  batch_input_template = payload["BatchInput"]
  @batch_input = PayloadTemplate.new(batch_input_template) if batch_input_template

  items_path = payload["MaxItemsPerBatchPath"]
  @max_items_per_batch_path = ReferencePath.new(items_path) if items_path

  bytes_path = payload["MaxInputBytesPerBatchPath"]
  @max_input_bytes_per_batch_path = ReferencePath.new(bytes_path) if bytes_path

  validate!
end

# Splits the Map state's items into batches.
#
# Each batch is a Hash whose "Items" key holds one slice of input; when a
# "BatchInput" payload template is configured, its resolved value is merged
# into every batch.
#
# @param context passed through to the BatchInput template and the
#   MaxItemsPerBatchPath lookup
# @param input the items to batch; must respond to each_slice
# @param state_input data used to resolve BatchInput and MaxItemsPerBatchPath;
#   falls back to input when omitted or nil
# @return [Array<Hash>] one Hash per batch
def value(context, input, state_input = nil)
state_input ||= input

# Resolved once up front and shared by every batch.
output = batch_input ? batch_input.value(context, state_input) : {}

# merge returns a new Hash per batch, so output itself is never mutated.
input.each_slice(max_items(context, state_input)).map do |batch|
output.merge("Items" => batch)
end
Comment on lines +28 to +30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future: would be nice to not resolve all input at the start.

end

private

# Resolves the batch size to use for the current execution.
#
# A static "MaxItemsPerBatch" takes precedence; otherwise the
# "MaxItemsPerBatchPath" reference is looked up in state_input and checked at
# runtime, since its value is only known once the workflow is running.
#
# @param context passed through to the reference path lookup
# @param state_input data the reference path is resolved against
# @return [Integer, nil] the batch size, or nil when neither limit is configured
# @raise whatever runtime_field_error! produces when the resolved value is not
#   a positive Integer
def max_items(context, state_input)
  static_limit = max_items_per_batch
  return static_limit if static_limit

  limit_path = max_items_per_batch_path
  return if limit_path.nil?

  resolved = limit_path.value(context, state_input)
  # nil, non-Integer (e.g. 2.5 or "1") and non-positive values are all rejected.
  positive_integer = resolved.kind_of?(Integer) && resolved > 0
  raise runtime_field_error!("MaxItemsPerBatchPath", resolved, "must be a positive integer") unless positive_integer

  resolved
end

# Checks the parsed ItemBatcher payload and reports problems through the
# parser_error! / invalid_field_error! helpers (presumably provided by
# ValidationMixin — confirm).
#
# Rules enforced:
# - one of "MaxItemsPerBatch" / "MaxItemsPerBatchPath" must be present, so a
#   payload that only carries byte limits is rejected
# - the static and the *Path form of the same limit must not both be given
# - static limits, when present, must be positive Integers
def validate!
if [max_items_per_batch, max_items_per_batch_path].all?(&:nil?)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optimization (read: feel free to ignore)
Count these and compare to 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean like [max_items_per_batch, max_items_per_batch_path].compact.count.zero?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea

# NOTE(review): the unless-block below looks like the snippet suggested in the
# review thread above, not merged code — confirm before relying on it.
unless [max_items_per_batch, max_items_per_batch_path].compact.count == 1
  parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"")
end

parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"")
end

# A limit may be given statically or via a path, never both.
parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path
parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path

# Only static values can be checked here; *Path values are not known until the
# path is resolved against runtime input.
if max_items_per_batch && (!max_items_per_batch.kind_of?(Integer) || max_items_per_batch <= 0)
invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer")
end
if max_input_bytes_per_batch && (!max_input_bytes_per_batch.kind_of?(Integer) || max_input_bytes_per_batch <= 0)
invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer")
end
end
end
end
end
23 changes: 19 additions & 4 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def initialize(workflow, name, payload)
@item_processor = ItemProcessor.new(payload["ItemProcessor"], name)
@items_path = ReferencePath.new(payload.fetch("ItemsPath", "$"))
@item_reader = payload["ItemReader"]
@item_selector = payload["ItemSelector"]
@item_batcher = payload["ItemBatcher"]
@item_selector = PayloadTemplate.new(payload["ItemSelector"]) if payload["ItemSelector"]
@item_batcher = ItemBatcher.new(payload["ItemBatcher"], name + ["ItemBatcher"]) if payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i
Expand All @@ -43,15 +43,30 @@ def initialize(workflow, name, payload)

# Builds the collection this Map state iterates over.
#
# Starts from the parent class's processed input, narrows it with ItemsPath
# and, when an ItemBatcher is configured, groups the selected items into
# batches. The batcher is handed context.state["Input"] as its state_input
# argument, separately from the items being batched.
#
# @param context the running state's context
# @return the selected items, or the batcher's output when one is configured
def process_input(context)
  # ItemsPath is resolved exactly once; its result feeds the optional batcher.
  items = items_path.value(context, super)
  return items unless item_batcher

  item_batcher.value(context, items, context.state["Input"])
end

# Starts the Map state by preparing one child context per element returned by
# process_input (an item, or a batch when an ItemBatcher is configured).
#
# After the parent class's start logic runs, each element gets a context
# carrying the parent execution id and a "Map" => "Item" entry with the
# element's index and value. When an ItemSelector is configured it is resolved
# against that per-item context and the state's input to produce the child's
# input; otherwise the element itself is used. The child input is serialized
# with to_json and the resulting contexts are stored, as hashes, in
# context.state["ItemProcessorContext"].
#
# @param context the running state's context (reads execution["Id"] and
#   state["Input"], writes state["ItemProcessorContext"])
def start(context)
  super

  input = process_input(context)

  # Each child context is built exactly once, from the fully populated per-item context.
  context.state["ItemProcessorContext"] = input.map.with_index do |item, index|
    item_processor_context = {
      "Execution" => {"Id" => context.execution["Id"]},
      "Map"       => {"Item" => {"Index" => index, "Value" => item}}
    }

    item_processor_input = item_selector ? item_selector.value(item_processor_context, context.state["Input"]) : item

    Context.new(item_processor_context, :input => item_processor_input.to_json).to_h
  end
end

def end?
Expand Down
208 changes: 208 additions & 0 deletions spec/workflow/item_batcher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# Specs for Floe::Workflow::ItemBatcher: payload validation at construction
# (#initialize) and batching behaviour (#value).
RSpec.describe Floe::Workflow::ItemBatcher do
# Every example builds the batcher from its context's own payload; the name
# ["Map", "ItemBatcher"] shows up as "Map.ItemBatcher" in the expected messages.
let(:subject) { described_class.new(payload, ["Map", "ItemBatcher"]) }

describe "#initialize" do
context "with no MaxItems or MaxInputBytes" do
let(:payload) { {} }

it "raises an exception" do
expect { subject }
.to raise_error(
Floe::InvalidWorkflowError,
"Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\""
)
end
end

context "with a BatchInput field" do
let(:payload) { {"BatchInput" => "foo", "MaxItemsPerBatch" => 10} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets the BatchInput to a PayloadTemplate" do
expect(subject.batch_input).to be_kind_of(Floe::Workflow::PayloadTemplate)
end
end

context "with MaxItemsPerBatch" do
let(:payload) { {"MaxItemsPerBatch" => 10} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_items_per_batch" do
expect(subject.max_items_per_batch).to eq(payload["MaxItemsPerBatch"])
end

context "that is an invalid value" do
# Covers zero, negative, non-Integer numeric and String values.
it "raises an exception" do
expect { described_class.new({"MaxItemsPerBatch" => 0}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"0\" must be a positive integer")
expect { described_class.new({"MaxItemsPerBatch" => -1}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"-1\" must be a positive integer")
expect { described_class.new({"MaxItemsPerBatch" => 2.5}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"2.5\" must be a positive integer")
expect { described_class.new({"MaxItemsPerBatch" => "1"}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"1\" must be a positive integer")
end
end
end

# The byte-limit examples below are marked pending: per their pending reasons,
# MaxInputBytesPerBatch support is not implemented yet.
context "with MaxInputBytesPerBatch" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns an ItemBatcher" do
pending "implement MaxInputBytesPerBatch"
expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch" do
pending "implement MaxInputBytesPerBatch"

expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"])
end

context "that is an invalid value" do
it "raises an exception" do
pending "implement MaxInputBytesPerBatch"

expect { described_class.new({"MaxInputBytesPerBatch" => 0}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer")
expect { described_class.new({"MaxInputBytesPerBatch" => -1}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"-1\" must be a positive integer")
expect { described_class.new({"MaxInputBytesPerBatch" => 2.5}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"2.5\" must be a positive integer")
expect { described_class.new({"MaxInputBytesPerBatch" => "1"}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"1\" must be a positive integer")
end
end
end

context "with MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatchPath" => "$.maxBatchItems"} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_items_per_batch_path" do
expect(subject.max_items_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_items_per_batch_path).to have_attributes(:path => ["maxBatchItems"])
end
end

# Pending for the same reason as the static byte limit above.
context "with MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "returns an ItemBatcher" do
pending "implement MaxInputBytesPerBatchPath"

expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch_path" do
pending "implement MaxInputBytesPerBatchPath"

expect(subject.max_input_bytes_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_input_bytes_per_batch_path).to have_attributes(:path => ["batchSize"])
end
end

context "with MaxItemsPerBatch and MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatch" => 10, "MaxItemsPerBatchPath" => "$.maxBatchItems"} }

it "raises an exception" do
expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"")
end
end

context "with MaxInputBytesPerBatch and MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024, "MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "raises an exception" do
pending "implement MaxInputBytesPerBatchPath"

expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"")
end
end
end

describe "#value" do
# Five items, so a batch size of 2 yields two full batches and one partial batch.
let(:context) { {} }
let(:input) { %w[a b c d e] }

context "with MaxItemsPerBatch" do
let(:payload) { {"MaxItemsPerBatch" => 2} }

it "returns in batches of 2" do
expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end
end

context "with MaxItemsPerBatchPath" do
# The batch size is read from state_input, which is passed separately from the items.
let(:payload) { {"MaxItemsPerBatchPath" => "$.batchSize"} }
let(:state_input) { {"batchSize" => 2, "items" => input} }

it "returns in batches of 2" do
expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end

context "with an invalid value in input" do
# Path values are only known at runtime, so these surface as Floe::ExecutionError
# rather than Floe::InvalidWorkflowError.
it "raises an exception" do
expect { subject.value(context, input, {"batchSize" => 0, "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"0\" must be a positive integer")
expect { subject.value(context, input, {"batchSize" => -1, "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"-1\" must be a positive integer")
expect { subject.value(context, input, {"batchSize" => 2.5, "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"2.5\" must be a positive integer")
expect { subject.value(context, input, {"batchSize" => "1", "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"1\" must be a positive integer")
end
end
end

# NOTE(review): the pending byte-limit examples below reuse the "batches of 2"
# description and expectation from the item-count examples — confirm the
# intended expectation once byte-based batching is supported.
context "with MaxInputBytesPerBatch" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns in batches of 2" do
pending "support max bytes per batch"

expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end
end

context "with MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.bytesPerBatch"} }
let(:state_input) { {"bytesPerBatch" => 1_024, "items" => input} }

it "returns in batches of 2" do
pending "support max bytes per batch"

expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end

context "with an invalid value in input" do
let(:state_input) { {"bytesPerBatch" => 0, "items" => input} }

it "raises an exception" do
pending "support max bytes per batch"

expect { subject.value(context, input, state_input) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxInputBytesPerBatchPath\" value \"0\" must be a positive integer")
end
end
end

context "with BatchInput" do
# The resolved BatchInput ("foo" => "bar") is expected in every batch alongside "Items".
let(:payload) { {"BatchInput" => {"foo.$" => "$.bar"}, "MaxItemsPerBatch" => 2} }
let(:state_input) { {"bar" => "bar", "items" => input} }

it "merges BatchInput with payloads" do
expect(subject.value(context, input, state_input)).to eq([{"foo" => "bar", "Items" => %w[a b]}, {"foo" => "bar", "Items" => %w[c d]}, {"foo" => "bar", "Items" => %w[e]}])
end
end
end
end
Loading