Skip to content

Commit

Permalink
Only implement MaxItemsPerBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Nov 6, 2024
1 parent 4af7a1d commit 4ab7b13
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
6 changes: 3 additions & 3 deletions lib/floe/workflow/item_batcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def max_items(context, state_input)
end

def max_input_bytes(context, state_input)
return max_input_bytes_per_batch if max_input_bytes_per_batch.present?
return max_input_bytes_per_batch if max_input_bytes_per_batch
return if max_input_bytes_per_batch_path.nil?
result = max_input_bytes_per_batch_path.value(context, state_input)
raise runtime_field_error!("MaxInputBytesPerBatchPath", result, "must be a positive integer") if result <= 0
Expand All @@ -51,8 +51,8 @@ def max_input_bytes(context, state_input)
end

def validate!
if [max_items_per_batch, max_input_bytes_per_batch, max_items_per_batch_path, max_input_bytes_per_batch_path].all?(&:nil?)
parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"")
if [max_items_per_batch, max_items_per_batch_path].all?(&:nil?)
parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\"")
end

parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path
Expand Down
45 changes: 44 additions & 1 deletion spec/workflow/item_batcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
expect { subject }
.to raise_error(
Floe::InvalidWorkflowError,
"Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\""
"Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\""
)
end
end
Expand Down Expand Up @@ -50,17 +50,22 @@
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns an ItemBatcher" do
pending "implement MaxInputBytesPerBatch"
expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch" do
pending "implement MaxInputBytesPerBatch"

expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"])
end

context "that is an invalid value" do
let(:payload) { {"MaxInputBytesPerBatch" => 0} }

it "raises an exception" do
pending "implement MaxInputBytesPerBatch"

expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher field \"MaxInputBytesPerBatch\" value \"0\" must be a positive integer")
end
end
Expand All @@ -83,10 +88,14 @@
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "returns an ItemBatcher" do
pending "implement MaxInputBytesPerBatchPath"

expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch_path" do
pending "implement MaxInputBytesPerBatchPath"

expect(subject.max_input_bytes_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_input_bytes_per_batch_path).to have_attributes(:path => ["batchSize"])
end
Expand All @@ -104,6 +113,8 @@
let(:payload) { {"MaxInputBytesPerBatch" => 1_024, "MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "raises an exception" do
pending "implement MaxInputBytesPerBatchPath"

expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"")
end
end
Expand Down Expand Up @@ -139,6 +150,38 @@
end
end

context "with MaxInputBytesPerBatch" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns in batches of 2" do
pending "support max bytes per batch"

expect(subject.value(context, input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end
end

context "with MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.bytesPerBatch"} }
let(:state_input) { {"bytesPerBatch" => 1_024, "items" => input} }

it "returns in batches of 2" do
pending "support max bytes per batch"

expect(subject.value(context, input, state_input)).to eq([{"Items" => %w[a b]}, {"Items" => %w[c d]}, {"Items" => %w[e]}])
end

context "with an invalid value in input" do
let(:state_input) { {"bytesPerBatch" => 0, "items" => input} }

it "raises an exception" do
pending "support max bytes per batch"

expect { subject.value(context, input, state_input) }
.to raise_error(Floe::ExecutionError, "Map.ItemBatcher field \"MaxInputBytesPerBatchPath\" value \"0\" must be a positive integer")
end
end
end

context "with BatchInput" do
let(:payload) { {"BatchInput" => {"foo.$" => "$.bar"}, "MaxItemsPerBatch" => 2} }
let(:state_input) { {"bar" => "bar", "items" => input} }
Expand Down

0 comments on commit 4ab7b13

Please sign in to comment.