Skip to content

Commit

Permalink
Merge pull request #294 from agrare/map_state_item_batcher
Browse files Browse the repository at this point in the history
Add Map ItemBatcher/ItemSelector
  • Loading branch information
kbrock authored Jan 30, 2025
2 parents dbe4b7d + eb89144 commit 814c859
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require_relative "floe/workflow/choice_rule/and"
require_relative "floe/workflow/choice_rule/data"
require_relative "floe/workflow/context"
require_relative "floe/workflow/item_batcher"
require_relative "floe/workflow/item_processor"
require_relative "floe/workflow/intrinsic_function"
require_relative "floe/workflow/intrinsic_function/parser"
Expand Down
62 changes: 62 additions & 0 deletions lib/floe/workflow/item_batcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Floe
class Workflow
class ItemBatcher
include ValidationMixin

attr_reader :name, :batch_input, :max_items_per_batch, :max_items_per_batch_path, :max_input_bytes_per_batch, :max_input_bytes_per_batch_path

def initialize(payload, name)
@name = name

@batch_input = PayloadTemplate.new(payload["BatchInput"]) if payload["BatchInput"]
@max_items_per_batch = payload["MaxItemsPerBatch"]
@max_input_bytes_per_batch = payload["MaxInputBytesPerBatch"]

@max_items_per_batch_path = ReferencePath.new(payload["MaxItemsPerBatchPath"]) if payload["MaxItemsPerBatchPath"]
@max_input_bytes_per_batch_path = ReferencePath.new(payload["MaxInputBytesPerBatchPath"]) if payload["MaxInputBytesPerBatchPath"]

validate!
end

def value(context, input, state_input = nil)
state_input ||= input

output = batch_input ? batch_input.value(context, state_input) : {}

input.each_slice(max_items(context, state_input)).map do |batch|
output.merge("Items" => batch)
end
end

private

def max_items(context, state_input)
return max_items_per_batch if max_items_per_batch
return if max_items_per_batch_path.nil?

result = max_items_per_batch_path.value(context, state_input)
raise runtime_field_error!("MaxItemsPerBatchPath", result, "must be a positive integer") if result.nil? || !result.kind_of?(Integer) || result <= 0

result
end

def validate!
if [max_items_per_batch, max_items_per_batch_path].all?(&:nil?)
parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"")
end

parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path
parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path

if max_items_per_batch && (!max_items_per_batch.kind_of?(Integer) || max_items_per_batch <= 0)
invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer")
end
if max_input_bytes_per_batch && (!max_input_bytes_per_batch.kind_of?(Integer) || max_input_bytes_per_batch <= 0)
invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer")
end
end
end
end
end
23 changes: 19 additions & 4 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def initialize(workflow, name, payload)
@item_processor = ItemProcessor.new(payload["ItemProcessor"], name)
@items_path = ReferencePath.new(payload.fetch("ItemsPath", "$"))
@item_reader = payload["ItemReader"]
@item_selector = payload["ItemSelector"]
@item_batcher = payload["ItemBatcher"]
@item_selector = PayloadTemplate.new(payload["ItemSelector"]) if payload["ItemSelector"]
@item_batcher = ItemBatcher.new(payload["ItemBatcher"], name + ["ItemBatcher"]) if payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i
Expand All @@ -43,15 +43,30 @@ def initialize(workflow, name, payload)

def process_input(context)
input = super
items_path.value(context, input)
input = items_path.value(context, input)
input = item_batcher.value(context, input, context.state["Input"]) if item_batcher
input
end

def start(context)
super

input = process_input(context)

context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h }
context.state["ItemProcessorContext"] = input.map.with_index do |item, index|
item_processor_context = {
"Execution" => {
"Id" => context.execution["Id"]
},
"Map" => {
"Item" => {"Index" => index, "Value" => item}
}
}

item_processor_input = item_selector ? item_selector.value(item_processor_context, context.state["Input"]) : item

Context.new(item_processor_context, :input => item_processor_input.to_json).to_h
end
end

def end?
Expand Down
208 changes: 208 additions & 0 deletions spec/workflow/item_batcher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
RSpec.describe Floe::Workflow::ItemBatcher do
let(:subject) { described_class.new(payload, ["Map", "ItemBatcher"]) }

describe "#initialize" do
context "with no MaxItems or MaxInputBytes" do
let(:payload) { {} }

it "raises an exception" do
expect { subject }
.to raise_error(
Floe::InvalidWorkflowError,
"Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\""
)
end
end

context "with a BatchInput field" do
let(:payload) { {"BatchInput" => "foo", "MaxItemsPerBatch" => 10} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets the BatchInput to a PayloadTemplate" do
expect(subject.batch_input).to be_kind_of(Floe::Workflow::PayloadTemplate)
end
end

context "with MaxItemsPerBatch" do
let(:payload) { {"MaxItemsPerBatch" => 10} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_items_per_batch" do
expect(subject.max_items_per_batch).to eq(payload["MaxItemsPerBatch"])
end

context "that is an invalid value" do
it "raises an exception" do
expect { described_class.new({"MaxItemsPerBatch" => 0}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"0\" must be a positive integer")
expect { described_class.new({"MaxItemsPerBatch" => -1}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"-1\" must be a positive integer")
expect { described_class.new({"MaxItemsPerBatch" => 2.5}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"2.5\" must be a positive integer")
expect { described_class.new({"MaxItemsPerBatch" => "1"}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"1\" must be a positive integer")
end
end
end

context "with MaxInputBytesPerBatch" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns an ItemBatcher" do
pending "implement MaxInputBytesPerBatch"
expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch" do
pending "implement MaxInputBytesPerBatch"

expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"])
end

context "that is an invalid value" do
it "raises an exception" do
pending "implement MaxInputBytesPerBatch"

expect { described_class.new({"MaxInputBytesPerBatch" => 0}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer")
expect { described_class.new({"MaxInputBytesPerBatch" => -1}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"-1\" must be a positive integer")
expect { described_class.new({"MaxInputBytesPerBatch" => 2.5}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"2.5\" must be a positive integer")
expect { described_class.new({"MaxInputBytesPerBatch" => "1"}, ["Map", "ItemBatcher"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"1\" must be a positive integer")
end
end
end

context "with MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatchPath" => "$.maxBatchItems"} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_items_per_batch_path" do
expect(subject.max_items_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_items_per_batch_path).to have_attributes(:path => ["maxBatchItems"])
end
end

context "with MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "returns an ItemBatcher" do
pending "implement MaxInputBytesPerBatchPath"

expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch_path" do
pending "implement MaxInputBytesPerBatchPath"

expect(subject.max_input_bytes_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_input_bytes_per_batch_path).to have_attributes(:path => ["batchSize"])
end
end

context "with MaxItemsPerBatch and MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatch" => 10, "MaxItemsPerBatchPath" => "$.maxBatchItems"} }

it "raises an exception" do
expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"")
end
end

context "with MaxInputBytesPerBatch and MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024, "MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "raises an exception" do
pending "implement MaxInputBytesPerBatchPath"

expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"")
end
end
end

describe "#value" do
let(:context) { {} }
let(:input) { %w[a b c d e] }

context "with MaxItemsPerBatch" do
let(:payload) { {"MaxItemsPerBatch" => 2} }

it "returns in batches of 2" do
expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end
end

context "with MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatchPath" => "$.batchSize"} }
let(:state_input) { {"batchSize" => 2, "items" => input} }

it "returns in batches of 2" do
expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end

context "with an invalid value in input" do
it "raises an exception" do
expect { subject.value(context, input, {"batchSize" => 0, "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"0\" must be a positive integer")
expect { subject.value(context, input, {"batchSize" => -1, "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"-1\" must be a positive integer")
expect { subject.value(context, input, {"batchSize" => 2.5, "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"2.5\" must be a positive integer")
expect { subject.value(context, input, {"batchSize" => "1", "items" => input}) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"1\" must be a positive integer")
end
end
end

context "with MaxInputBytesPerBatch" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns in batches of 2" do
pending "support max bytes per batch"

expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end
end

context "with MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.bytesPerBatch"} }
let(:state_input) { {"bytesPerBatch" => 1_024, "items" => input} }

it "returns in batches of 2" do
pending "support max bytes per batch"

expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end

context "with an invalid value in input" do
let(:state_input) { {"bytesPerBatch" => 0, "items" => input} }

it "raises an exception" do
pending "support max bytes per batch"

expect { subject.value(context, input, state_input) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxInputBytesPerBatchPath\" value \"0\" must be a positive integer")
end
end
end

context "with BatchInput" do
let(:payload) { {"BatchInput" => {"foo.$" => "$.bar"}, "MaxItemsPerBatch" => 2} }
let(:state_input) { {"bar" => "bar", "items" => input} }

it "merges BatchInput with payloads" do
expect(subject.value(context, input, state_input)).to eq([{"foo" => "bar", "Items" => %w[a b]}, {"foo" => "bar", "Items" => %w[c d]}, {"foo" => "bar", "Items" => %w[e]}])
end
end
end
end
Loading

0 comments on commit 814c859

Please sign in to comment.