From 3fe90697627a436952757d232c5f0e024b5d9119 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 5 Nov 2024 15:05:39 -0500 Subject: [PATCH 01/11] Add Map ItemBatcher --- lib/floe.rb | 1 + lib/floe/workflow/item_batcher.rb | 36 ++++++++++ lib/floe/workflow/states/map.rb | 2 +- spec/workflow/item_batcher_spec.rb | 108 +++++++++++++++++++++++++++++ 4 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 lib/floe/workflow/item_batcher.rb create mode 100644 spec/workflow/item_batcher_spec.rb diff --git a/lib/floe.rb b/lib/floe.rb index 6cfe559e..d3f3d1a9 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -20,6 +20,7 @@ require_relative "floe/workflow/choice_rule/and" require_relative "floe/workflow/choice_rule/data" require_relative "floe/workflow/context" +require_relative "floe/workflow/item_batcher" require_relative "floe/workflow/item_processor" require_relative "floe/workflow/intrinsic_function" require_relative "floe/workflow/intrinsic_function/parser" diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb new file mode 100644 index 00000000..00d69292 --- /dev/null +++ b/lib/floe/workflow/item_batcher.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module Floe + class Workflow + class ItemBatcher + include ValidationMixin + + attr_reader :name, :batch_input, :max_items_per_batch, :max_items_per_batch_path, :max_input_bytes_per_batch, :max_input_bytes_per_batch_path + + def initialize(payload, name) + @name = name + + @batch_input = PayloadTemplate.new(payload["BatchInput"]) if payload["BatchInput"] + @max_items_per_batch = payload["MaxItemsPerBatch"] + @max_input_bytes_per_batch = payload["MaxInputBytesPerBatch"] + + @max_items_per_batch_path = ReferencePath.new(payload["MaxItemsPerBatchPath"]) if payload["MaxItemsPerBatchPath"] + @max_input_bytes_per_batch_path = ReferencePath.new(payload["MaxInputBytesPerBatchPath"]) if payload["MaxInputBytesPerBatchPath"] + + if [max_items_per_batch, max_input_bytes_per_batch, max_items_per_batch_path, max_input_bytes_per_batch_path].all?(&:nil?) + parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"") + end + + parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path + parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path + end + + def value(context, input) + max_items = max_items_per_batch || max_items_per_batch_path&.value(context, input) + max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, input) + + input.each_slice(max_items).to_a + end + end + end +end diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index e1af12cd..54e676a4 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -32,7 +32,7 @@ def initialize(workflow, name, payload) @items_path = ReferencePath.new(payload.fetch("ItemsPath", "$")) @item_reader = payload["ItemReader"] @item_selector = payload["ItemSelector"] - @item_batcher = payload["ItemBatcher"] + @item_batcher = ItemBatcher.new(payload["ItemBatcher"], name + ["ItemBatcher"]) if payload["ItemBatcher"] @result_writer = payload["ResultWriter"] @max_concurrency = payload["MaxConcurrency"]&.to_i @tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb new file mode 100644 index 00000000..49a526ea --- /dev/null +++ b/spec/workflow/item_batcher_spec.rb @@ -0,0 +1,108 @@ +RSpec.describe Floe::Workflow::ItemBatcher do + let(:subject) { described_class.new(payload, ["Map", "ItemBatcher"]) } + + describe "#initialize" do + context "with no MaxItems or MaxInputBytes" do + let(:payload) { {} } + + it "raises an exception" do + expect { subject } + .to raise_error( + Floe::InvalidWorkflowError, + "Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"" + ) + end + end + + context "with a BatchInput field" do + let(:payload) { {"BatchInput" => "foo", "MaxItemsPerBatch" => 10} } + + it "returns an ItemBatcher" do + expect(subject).to be_kind_of(described_class) + end + + it "sets the BatchInput to a PayloadTemplate" do + expect(subject.batch_input).to be_kind_of(Floe::Workflow::PayloadTemplate) + end + end + + context "with MaxItemsPerBatch" do + let(:payload) { {"MaxItemsPerBatch" => 10} } + + it "returns an ItemBatcher" do + expect(subject).to be_kind_of(described_class) + end + + it "sets max_items_per_batch" do + expect(subject.max_items_per_batch).to eq(payload["MaxItemsPerBatch"]) + end + end + + context "with MaxInputBytesPerBatch" do + let(:payload) { {"MaxInputBytesPerBatch" => 1_024} } + + it "returns an ItemBatcher" do + expect(subject).to be_kind_of(described_class) + end + + it "sets max_input_bytes_per_batch" do + expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"]) + end + end + + context "with MaxItemsPerBatchPath" do + let(:payload) { {"MaxItemsPerBatchPath" => "$.maxBatchItems"} } + + it "returns an ItemBatcher" do + expect(subject).to be_kind_of(described_class) + end + + it "sets max_items_per_batch_path" do + expect(subject.max_items_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath) + expect(subject.max_items_per_batch_path).to have_attributes(:path => ["maxBatchItems"]) + end + end + + context "with MaxInputBytesPerBatchPath" do + let(:payload) { {"MaxInputBytesPerBatchPath" => "$.batchSize"} } + + it "returns an ItemBatcher" do + expect(subject).to be_kind_of(described_class) + end + + it "sets max_input_bytes_per_batch_path" do + expect(subject.max_input_bytes_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath) + expect(subject.max_input_bytes_per_batch_path).to have_attributes(:path => ["batchSize"]) + end + end + + context "with MaxItemsPerBatch and MaxItemsPerBatchPath" do + let(:payload) { {"MaxItemsPerBatch" => 10, "MaxItemsPerBatchPath" => "$.maxBatchItems"} } + + it "raises an exception" do + expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") + end + end + + context "with MaxInputBytesPerBatch and MaxInputBytesPerBatchPath" do + let(:payload) { {"MaxInputBytesPerBatch" => 1_024, "MaxInputBytesPerBatchPath" => "$.batchSize"} } + + it "raises an exception" do + expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") + end + end + end + + describe "#value" do + let(:context) { {} } + let(:input) { %w[a b c d e] } + + context "with MaxItemsPerBatch" do + let(:payload) { {"MaxItemsPerBatch" => 2} } + + it "returns in batches of 2" do + expect(subject.value(context, input)).to eq([%w[a b], %w[c d], %w[e]]) + end + end + end +end From fb6e75a5e923f3ad4bfb8702d0c969026eabce0b Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 10:31:00 -0500 Subject: [PATCH 02/11] ItemBatcher should return {"Items": []} --- lib/floe/workflow/item_batcher.rb | 4 +++- lib/floe/workflow/states/map.rb | 4 +++- spec/workflow/item_batcher_spec.rb | 2 +- spec/workflow/states/map_spec.rb | 31 ++++++++++++++++++++++++++++++ 4 files changed, 38 insertions(+), 3 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index 00d69292..de4cdcd1 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -29,7 +29,9 @@ def value(context, input) max_items = max_items_per_batch || max_items_per_batch_path&.value(context, input) max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, input) - input.each_slice(max_items).to_a + input.each_slice(max_items).map do |batch| + {"Items" => batch} + end end end end diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 54e676a4..c716883c 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -43,7 +43,9 @@ def initialize(workflow, name, payload) def process_input(context) input = super - items_path.value(context, input) + input = items_path.value(context, input) + input = item_batcher.value(context, input) if item_batcher + input end def start(context) diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index 49a526ea..fef755cf 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -101,7 +101,7 @@ let(:payload) { {"MaxItemsPerBatch" => 2} } it "returns in batches of 2" do - expect(subject.value(context, input)).to eq([%w[a b], %w[c d], %w[e]]) + expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) end end end diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index 6ae36373..01870a10 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -168,6 +168,37 @@ expect(ctx.output).to eq(["red", "green", "blue"]) end end + + context "with an ItemBatcher" do + let(:input) { {"foo" => "bar", "colors" => ["red", "green", "blue"]} } + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemsPath" => "$.colors", + "ItemBatcher" => {"MaxItemsPerBatch" => 2}, + "MaxConcurrency" => 1, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "InputPath" => "$.Items", + "End" => true + } + } + }, + "End" => true, + } + } + make_workflow(ctx, payload) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output).to eq([["red", "green"], ["blue"]]) + end + end end describe "#finish" do From f903a25a96d5b0ba09a3d77eec34ce664c6854c1 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 10:39:02 -0500 Subject: [PATCH 03/11] Support ItemBatcher Paths --- lib/floe/workflow/item_batcher.rb | 8 +++++--- lib/floe/workflow/states/map.rb | 2 +- spec/workflow/item_batcher_spec.rb | 9 +++++++++ spec/workflow/states/map_spec.rb | 31 ++++++++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index de4cdcd1..a2f45a42 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -25,9 +25,11 @@ def initialize(payload, name) parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path end - def value(context, input) - max_items = max_items_per_batch || max_items_per_batch_path&.value(context, input) - max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, input) + def value(context, input, state_input = nil) + state_input ||= input + + max_items = max_items_per_batch || max_items_per_batch_path&.value(context, state_input) + max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, state_input) input.each_slice(max_items).map do |batch| {"Items" => batch} diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index c716883c..bb56992a 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -44,7 +44,7 @@ def initialize(workflow, name, payload) def process_input(context) input = super input = items_path.value(context, input) - input = item_batcher.value(context, input) if item_batcher + input = item_batcher.value(context, input, context.state["Input"]) if item_batcher input end diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index fef755cf..82098d59 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -104,5 +104,14 @@ expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) end end + + context "with MaxItemsPerBatchPath" do + let(:payload) { {"MaxItemsPerBatchPath" => "$.batchSize"} } + let(:state_input) { {"batchSize" => 2, "items" => input} } + + it "returns in batches of 2" do + expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) + end + end end end diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index 01870a10..0cd74b95 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -198,6 +198,37 @@ loop while state.run_nonblock!(ctx) != 0 expect(ctx.output).to eq([["red", "green"], ["blue"]]) end + + context "with MaxItemsPerBatchPath" do + let(:input) { {"maxItems" => 2, "colors" => ["red", "green", "blue"]} } + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemsPath" => "$.colors", + "ItemBatcher" => {"MaxItemsPerBatchPath" => "$.maxItems"}, + "MaxConcurrency" => 1, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "InputPath" => "$.Items", + "End" => true + } + } + }, + "End" => true, + } + } + make_workflow(ctx, payload) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output).to eq([["red", "green"], ["blue"]]) + end + end end end From a8c662088c2f58eea38862b9cbf5b6089caf43e5 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 10:45:49 -0500 Subject: [PATCH 04/11] Add BatchInput support --- lib/floe/workflow/item_batcher.rb | 4 +++- spec/workflow/item_batcher_spec.rb | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index a2f45a42..56171a50 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -31,8 +31,10 @@ def value(context, input, state_input = nil) max_items = max_items_per_batch || max_items_per_batch_path&.value(context, state_input) max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, state_input) + output = batch_input ? batch_input.value(context, state_input) : {} + input.each_slice(max_items).map do |batch| - {"Items" => batch} + output.merge("Items" => batch) end end end diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index 82098d59..917c8b45 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -113,5 +113,14 @@ expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) end end + + context "with BatchInput" do + let(:payload) { {"BatchInput" => {"foo.$" => "$.bar"}, "MaxItemsPerBatch" => 2} } + let(:state_input) { {"bar" => "bar", "items" => input} } + + it "merges BatchInput with payloads" do + expect(subject.value(context, input, state_input)).to eq([{"foo" => "bar", "Items" => %w[a b]}, {"foo" => "bar", "Items" => %w[c d]}, {"foo" => "bar", "Items" => %w[e]}]) + end + end end end From 9ac64ee4d0f93462d2746ab2ed71d1fdc2f8a571 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 10:59:17 -0500 Subject: [PATCH 05/11] Check that MaxItemsPerBatch/MaxInputBytesPerBatch are positive --- lib/floe/workflow/item_batcher.rb | 6 ++++-- spec/workflow/item_batcher_spec.rb | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index 56171a50..04bf8fec 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -21,8 +21,10 @@ def initialize(payload, name) parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"") end - parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path - parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path + parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path + parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path + invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") if max_items_per_batch && max_items_per_batch <= 0 + invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") if max_input_bytes_per_batch && max_input_bytes_per_batch <= 0 end def value(context, input, state_input = nil) diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index 917c8b45..0c792b36 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -36,6 +36,14 @@ it "sets max_items_per_batch" do expect(subject.max_items_per_batch).to eq(payload["MaxItemsPerBatch"]) end + + context "that is an invalid value" do + let(:payload) { {"MaxItemsPerBatch" => 0} } + + it "raises an exception" do + expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"0\" must be a positive integer") + end + end end context "with MaxInputBytesPerBatch" do @@ -48,6 +56,14 @@ it "sets max_input_bytes_per_batch" do expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"]) end + + context "that is an invalid value" do + let(:payload) { {"MaxInputBytesPerBatch" => 0} } + + it "raises an exception" do + expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer") + end + end end context "with MaxItemsPerBatchPath" do From 3cbf010d6783094f2b60eb5226e11f751d5299df Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 11:53:14 -0500 Subject: [PATCH 06/11] Raise execution error if path batch is invalid --- lib/floe/workflow/item_batcher.rb | 3 +++ spec/workflow/item_batcher_spec.rb | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index 04bf8fec..889ba496 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -33,6 +33,9 @@ def value(context, input, state_input = nil) max_items = max_items_per_batch || max_items_per_batch_path&.value(context, state_input) max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, state_input) + raise runtime_field_error!("MaxItemsPerBatchPath", max_items, "must be a positive integer") if max_items && max_items <= 0 + raise runtime_field_error!("MaxInputBytesPerBatchPath", max_input_bytes, "must be a positive integer") if max_input_bytes && max_input_bytes <= 0 + output = batch_input ? batch_input.value(context, state_input) : {} input.each_slice(max_items).map do |batch| diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index 0c792b36..0bde58a9 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -128,6 +128,15 @@ it "returns in batches of 2" do expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) end + + context "with an invalid value in input" do + let(:state_input) { {"batchSize" => 0, "items" => input} } + + it "raises an exception" do + expect { subject.value(context, input, state_input) } + .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"0\" must be a positive integer") + end + end end context "with BatchInput" do From 2168a7a152a9eec6661b3110f197f703593e69da Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 12:00:50 -0500 Subject: [PATCH 07/11] Extract max_items logic to a method --- lib/floe/workflow/item_batcher.rb | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index 889ba496..c535eb7e 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -30,18 +30,30 @@ def initialize(payload, name) def value(context, input, state_input = nil) state_input ||= input - max_items = max_items_per_batch || max_items_per_batch_path&.value(context, state_input) - max_input_bytes = max_input_bytes_per_batch || max_input_bytes_per_batch_path&.value(context, state_input) - - raise runtime_field_error!("MaxItemsPerBatchPath", max_items, "must be a positive integer") if max_items && max_items <= 0 - raise runtime_field_error!("MaxInputBytesPerBatchPath", max_input_bytes, "must be a positive integer") if max_input_bytes && max_input_bytes <= 0 - output = batch_input ? batch_input.value(context, state_input) : {} - input.each_slice(max_items).map do |batch| + input.each_slice(max_items(context, state_input)).map do |batch| output.merge("Items" => batch) end end + + def max_items(context, state_input) + return max_items_per_batch if max_items_per_batch + return if max_items_per_batch_path.nil? + result = max_items_per_batch_path.value(context, state_input) + raise runtime_field_error!("MaxItemsPerBatchPath", result, "must be a positive integer") if result <= 0 + + result + end + + def max_input_bytes(context, state_input) + return max_input_bytes_per_batch if max_input_bytes_per_batch.present? + return if max_input_bytes_per_batch_path.nil? + result = max_input_bytes_per_batch_path.value(context, state_input) + raise runtime_field_error!("MaxInputBytesPerBatchPath", result, "must be a positive integer") if result <= 0 + + result + end end end end From 289ac33a90a734d5f24ee1ce26561403f0fe7e21 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 12:02:00 -0500 Subject: [PATCH 08/11] Move ItemBatcher validations to method --- lib/floe/workflow/item_batcher.rb | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index c535eb7e..812ddb4c 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -17,14 +17,7 @@ def initialize(payload, name) @max_items_per_batch_path = ReferencePath.new(payload["MaxItemsPerBatchPath"]) if payload["MaxItemsPerBatchPath"] @max_input_bytes_per_batch_path = ReferencePath.new(payload["MaxInputBytesPerBatchPath"]) if payload["MaxInputBytesPerBatchPath"] - if [max_items_per_batch, max_input_bytes_per_batch, max_items_per_batch_path, max_input_bytes_per_batch_path].all?(&:nil?) - parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"") - end - - parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path - parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path - invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") if max_items_per_batch && max_items_per_batch <= 0 - invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") if max_input_bytes_per_batch && max_input_bytes_per_batch <= 0 + validate! end def value(context, input, state_input = nil) @@ -37,6 +30,8 @@ def value(context, input, state_input = nil) end end + private + def max_items(context, state_input) return max_items_per_batch if max_items_per_batch return if max_items_per_batch_path.nil? @@ -54,6 +49,17 @@ def max_input_bytes(context, state_input) result end + + def validate! + if [max_items_per_batch, max_input_bytes_per_batch, max_items_per_batch_path, max_input_bytes_per_batch_path].all?(&:nil?) + parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"") + end + + parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path + parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path + invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") if max_items_per_batch && max_items_per_batch <= 0 + invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") if max_input_bytes_per_batch && max_input_bytes_per_batch <= 0 + end end end end From 4fea6cc5637302d4438a35a61fba6403db0fb9cd Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 6 Nov 2024 13:11:09 -0500 Subject: [PATCH 09/11] Only implement MaxItemsPerBatch --- lib/floe/workflow/item_batcher.rb | 14 ++-------- spec/workflow/item_batcher_spec.rb | 45 +++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index 812ddb4c..0b6b36bf 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -35,24 +35,16 @@ def value(context, input, state_input = nil) def max_items(context, state_input) return max_items_per_batch if max_items_per_batch return if max_items_per_batch_path.nil? + result = max_items_per_batch_path.value(context, state_input) raise runtime_field_error!("MaxItemsPerBatchPath", result, "must be a positive integer") if result <= 0 result end - def max_input_bytes(context, state_input) - return max_input_bytes_per_batch if max_input_bytes_per_batch.present? - return if max_input_bytes_per_batch_path.nil? - result = max_input_bytes_per_batch_path.value(context, state_input) - raise runtime_field_error!("MaxInputBytesPerBatchPath", result, "must be a positive integer") if result <= 0 - - result - end - def validate! - if [max_items_per_batch, max_input_bytes_per_batch, max_items_per_batch_path, max_input_bytes_per_batch_path].all?(&:nil?) - parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"") + if [max_items_per_batch, max_items_per_batch_path].all?(&:nil?) + parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"") end parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index 0bde58a9..0b58dbd6 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -9,7 +9,7 @@ expect { subject } .to raise_error( Floe::InvalidWorkflowError, - "Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"" + "Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"" ) end end @@ -50,10 +50,13 @@ let(:payload) { {"MaxInputBytesPerBatch" => 1_024} } it "returns an ItemBatcher" do + pending "implement MaxInputBytesPerBatch" expect(subject).to be_kind_of(described_class) end it "sets max_input_bytes_per_batch" do + pending "implement MaxInputBytesPerBatch" + expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"]) end @@ -61,6 +64,8 @@ let(:payload) { {"MaxInputBytesPerBatch" => 0} } it "raises an exception" do + pending "implement MaxInputBytesPerBatch" + expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer") end end @@ -83,10 +88,14 @@ let(:payload) { {"MaxInputBytesPerBatchPath" => "$.batchSize"} } it "returns an ItemBatcher" do + pending "implement MaxInputBytesPerBatchPath" + expect(subject).to be_kind_of(described_class) end it "sets max_input_bytes_per_batch_path" do + pending "implement MaxInputBytesPerBatchPath" + expect(subject.max_input_bytes_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath) expect(subject.max_input_bytes_per_batch_path).to have_attributes(:path => ["batchSize"]) end @@ -104,6 +113,8 @@ let(:payload) { {"MaxInputBytesPerBatch" => 1_024, "MaxInputBytesPerBatchPath" => "$.batchSize"} } it "raises an exception" do + pending "implement MaxInputBytesPerBatchPath" + expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") end end @@ -139,6 +150,38 @@ end end + context "with MaxInputBytesPerBatch" do + let(:payload) { {"MaxInputBytesPerBatch" => 1_024} } + + it "returns in batches of 2" do + pending "support max bytes per batch" + + expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) + end + end + + context "with MaxInputBytesPerBatchPath" do + let(:payload) { {"MaxInputBytesPerBatchPath" => "$.bytesPerBatch"} } + let(:state_input) { {"bytesPerBatch" => 1_024, "items" => input} } + + it "returns in batches of 2" do + pending "support max bytes per batch" + + expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}]) + end + + context "with an invalid value in input" do + let(:state_input) { {"bytesPerBatch" => 0, "items" => input} } + + it "raises an exception" do + pending "support max bytes per batch" + + expect { subject.value(context, input, state_input) } + .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxInputBytesPerBatchPath\" value \"0\" must be a positive integer") + end + end + end + context "with BatchInput" do let(:payload) { {"BatchInput" => {"foo.$" => "$.bar"}, "MaxItemsPerBatch" => 2} } let(:state_input) { {"bar" => "bar", "items" => input} } From 596436d6a6029ecca6de404ec7b586db266c3c84 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 8 Nov 2024 13:39:33 -0500 Subject: [PATCH 10/11] Add ItemSelector support --- lib/floe/workflow/states/map.rb | 17 ++++++++++++++-- spec/workflow/states/map_spec.rb | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index bb56992a..23a8a7e9 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -31,7 +31,7 @@ def initialize(workflow, name, payload) @item_processor = ItemProcessor.new(payload["ItemProcessor"], name) @items_path = ReferencePath.new(payload.fetch("ItemsPath", "$")) @item_reader = payload["ItemReader"] - @item_selector = payload["ItemSelector"] + @item_selector = PayloadTemplate.new(payload["ItemSelector"]) if payload["ItemSelector"] @item_batcher = ItemBatcher.new(payload["ItemBatcher"], name + ["ItemBatcher"]) if payload["ItemBatcher"] @result_writer = payload["ResultWriter"] @max_concurrency = payload["MaxConcurrency"]&.to_i @@ -53,7 +53,20 @@ def start(context) input = process_input(context) - context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h } + context.state["ItemProcessorContext"] = input.map.with_index do |item, index| + item_processor_context = { + "Execution" => { + "Id" => context.execution["Id"] + }, + "Map" => { + "Item" => {"Index" => index, "Value" => item} + } + } + + item_processor_input = item_selector ? item_selector.value(item_processor_context, context.state["Input"]) : item + + Context.new(item_processor_context, :input => item_processor_input.to_json).to_h + end end def end? diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index 0cd74b95..76f44ead 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -169,6 +169,40 @@ end end + context "with an ItemSelector" do + let(:input) { {"delivery-partner" => "ACME", "colors" => ["red", "green", "blue"]} } + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemsPath" => "$.colors", + "ItemSelector" => { + "index.$" => "$$.Map.Item.Index", + "value.$" => "$$.Map.Item.Value", + "courier.$" => "$.delivery-partner" + }, + "MaxConcurrency" => 1, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "End" => true + } + } + }, + "End" => true, + } + } + make_workflow(ctx, payload) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output).to eq([{"index" => 0, "value" => "red", "courier" => "ACME"}, {"index" => 1, "value" => "green", "courier" => "ACME"}, {"index" => 2, "value" => "blue", "courier" => "ACME"}]) + end + end + context "with an ItemBatcher" do let(:input) { {"foo" => "bar", "colors" => ["red", "green", "blue"]} } let(:workflow) do From eb89144b50f136fd16b9022d3e202e254fa0c7e6 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 29 Jan 2025 11:48:07 -0500 Subject: [PATCH 11/11] Add Integer type checking to ItemBatcher --- lib/floe/workflow/item_batcher.rb | 15 +++++++++----- spec/workflow/item_batcher_spec.rb | 32 +++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/lib/floe/workflow/item_batcher.rb b/lib/floe/workflow/item_batcher.rb index 0b6b36bf..b6b5b6f9 100644 --- a/lib/floe/workflow/item_batcher.rb +++ b/lib/floe/workflow/item_batcher.rb @@ -37,7 +37,7 @@ def max_items(context, state_input) return if max_items_per_batch_path.nil? result = max_items_per_batch_path.value(context, state_input) - raise runtime_field_error!("MaxItemsPerBatchPath", result, "must be a positive integer") if result <= 0 + raise runtime_field_error!("MaxItemsPerBatchPath", result, "must be a positive integer") if result.nil? || !result.kind_of?(Integer) || result <= 0 result end @@ -47,10 +47,15 @@ def validate! parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"") end - parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path - parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path - invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") if max_items_per_batch && max_items_per_batch <= 0 - invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") if max_input_bytes_per_batch && max_input_bytes_per_batch <= 0 + parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path + parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path + + if max_items_per_batch && (!max_items_per_batch.kind_of?(Integer) || max_items_per_batch <= 0) + invalid_field_error!("MaxItemsPerBatch", max_items_per_batch, "must be a positive integer") + end + if max_input_bytes_per_batch && (!max_input_bytes_per_batch.kind_of?(Integer) || max_input_bytes_per_batch <= 0) + invalid_field_error!("MaxInputBytesPerBatch", max_input_bytes_per_batch, "must be a positive integer") + end end end end diff --git a/spec/workflow/item_batcher_spec.rb b/spec/workflow/item_batcher_spec.rb index 0b58dbd6..2dcc856d 100644 --- a/spec/workflow/item_batcher_spec.rb +++ b/spec/workflow/item_batcher_spec.rb @@ -38,10 +38,15 @@ end context "that is an invalid value" do - let(:payload) { {"MaxItemsPerBatch" => 0} } - it "raises an exception" do - expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"0\" must be a positive integer") + expect { described_class.new({"MaxItemsPerBatch" => 0}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"0\" must be a positive integer") + expect { described_class.new({"MaxItemsPerBatch" => -1}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"-1\" must be a positive integer") + expect { described_class.new({"MaxItemsPerBatch" => 2.5}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"2.5\" must be a positive integer") + expect { described_class.new({"MaxItemsPerBatch" => "1"}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxItemsPerBatch\" value \"1\" must be a positive integer") end end end @@ -61,12 +66,17 @@ end context "that is an invalid value" do - let(:payload) { {"MaxInputBytesPerBatch" => 0} } - it "raises an exception" do pending "implement MaxInputBytesPerBatch" - expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer") + expect { described_class.new({"MaxInputBytesPerBatch" => 0}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer") + expect { described_class.new({"MaxInputBytesPerBatch" => -1}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"-1\" must be a positive integer") + expect { described_class.new({"MaxInputBytesPerBatch" => 2.5}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"2.5\" must be a positive integer") + expect { described_class.new({"MaxInputBytesPerBatch" => "1"}, ["Map", "ItemBatcher"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"1\" must be a positive integer") end end end @@ -141,11 +151,15 @@ end context "with an invalid value in input" do - let(:state_input) { {"batchSize" => 0, "items" => input} } - it "raises an exception" do - expect { subject.value(context, input, state_input) } + expect { subject.value(context, input, {"batchSize" => 0, "items" => input}) } .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"0\" must be a positive integer") + expect { subject.value(context, input, {"batchSize" => -1, "items" => input}) } + .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"-1\" must be a positive integer") + expect { subject.value(context, input, {"batchSize" => 2.5, "items" => input}) } + .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"2.5\" must be a positive integer") + expect { subject.value(context, input, {"batchSize" => "1", "items" => input}) } + .to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxItemsPerBatchPath\" value \"1\" must be a positive integer") end end end