Skip to content

Commit 473e776

Browse files
authored
Merge pull request #71 from amazon-contributing/fix-issue-70
Distribute workloads by config during execution
2 parents bfde3ee + f83b949 commit 473e776

File tree

2 files changed

+203
-10
lines changed

2 files changed

+203
-10
lines changed

src/main/java/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,13 @@ private List<TPCCWorker> createTerminalsOldWay() throws SQLException {
8686
// totalWarehouses is equal to numWarehouses in case of non-partitioned use case
8787
int totalWarehouses = (int) workConf.getScaleFactor();
8888

89-
if (totalWarehouses <= 0) {
90-
// At least one warehouse, @see
91-
// https://github.com/cmu-db/benchbase/blob/main/src/main/java/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java
92-
totalWarehouses = 1;
93-
}
94-
95-
// Default values used for warehouse indexes and stride
96-
final int startWarehouseIndex = 1;
97-
final int endWarehouseIndex = totalWarehouses;
98-
final int stride = 1;
89+
// [startWarehouseIndex, endWarehouseIndex] are both included.
90+
// Use defaults if not configured: start=1, end=totalWarehouses, stride=1
91+
final int startWarehouseIndex =
92+
workConf.getStartWarehouseIndex() > 0 ? workConf.getStartWarehouseIndex() : 1;
93+
final int endWarehouseIndex =
94+
workConf.getEndWarehouseIndex() > 0 ? workConf.getEndWarehouseIndex() : totalWarehouses;
95+
final int stride = workConf.getStride() > 0 ? workConf.getStride() : 1;
9996

10097
LOG.info(
10198
"Start warehouse idx: {} end warehouse idx: {} stride: {}",
@@ -110,6 +107,10 @@ private List<TPCCWorker> createTerminalsOldWay() throws SQLException {
110107
final int numWarehouses = w_ids.size();
111108
int numTerminals = workConf.getTerminals();
112109

110+
assert startWarehouseIndex >= 1 : "The start index must be >= 1";
111+
assert endWarehouseIndex >= 1 : "The end index must be >= 1";
112+
assert endWarehouseIndex <= workConf.getScaleFactor()
113+
: "The end index must be within the scale factor";
113114
assert numWarehouses >= 1 : "At least need 1 warehouse to do benchmark";
114115

115116
// We distribute terminals evenly across the warehouses

src/test/java/com/oltpbenchmark/benchmarks/tpcc/TestTPCCBenchmark.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,20 @@
1616

1717
package com.oltpbenchmark.benchmarks.tpcc;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
21+
1922
import com.oltpbenchmark.api.AbstractTestBenchmarkModule;
23+
import com.oltpbenchmark.api.BenchmarkModule;
2024
import com.oltpbenchmark.api.Procedure;
25+
import com.oltpbenchmark.api.Worker;
2126
import com.oltpbenchmark.benchmarks.tpcc.procedures.*;
27+
import java.util.HashMap;
28+
import java.util.HashSet;
2229
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Set;
32+
import org.junit.Test;
2333

2434
public class TestTPCCBenchmark extends AbstractTestBenchmarkModule<TPCCBenchmark> {
2535

@@ -35,4 +45,186 @@ public List<Class<? extends Procedure>> procedures() {
3545
public Class<TPCCBenchmark> benchmarkClass() {
3646
return TPCCBenchmark.class;
3747
}
48+
49+
/**
50+
* Test that workers are created with correct warehouse IDs when using startWarehouseIndex,
51+
* endWarehouseIndex, and stride parameters for distributed benchmarking.
52+
*/
53+
@Test
54+
public void testCreateTerminalsWithStrideDistribution() throws Exception {
55+
// Configure for distributed benchmarking: warehouses 3, 6, 9 (stride=3, start=3, end=9)
56+
int scaleFactor = 10;
57+
int startWarehouseIndex = 3;
58+
int endWarehouseIndex = 9;
59+
int stride = 3;
60+
int terminals = 6;
61+
62+
this.workConf.setScaleFactor(scaleFactor);
63+
this.workConf.setStartWarehouseIndex(startWarehouseIndex);
64+
this.workConf.setEndWarehouseIndex(endWarehouseIndex);
65+
this.workConf.setStride(stride);
66+
this.workConf.setTerminals(terminals);
67+
68+
// Create workers
69+
List<Worker<? extends BenchmarkModule>> workers = this.benchmark.makeWorkers();
70+
71+
assertEquals("Should create correct number of workers", terminals, workers.size());
72+
73+
// Collect all warehouse IDs assigned to workers
74+
Set<Integer> assignedWarehouseIds = new HashSet<>();
75+
for (Worker<? extends BenchmarkModule> worker : workers) {
76+
TPCCWorker tpccWorker = (TPCCWorker) worker;
77+
assignedWarehouseIds.add(tpccWorker.getTerminalWarehouseID());
78+
}
79+
80+
// Expected warehouses: 3, 6, 9 (start=3, end=9, stride=3)
81+
Set<Integer> expectedWarehouseIds = Set.of(3, 6, 9);
82+
83+
assertEquals(
84+
"Workers should only be assigned to warehouses matching stride pattern",
85+
expectedWarehouseIds,
86+
assignedWarehouseIds);
87+
}
88+
89+
/**
90+
* Test that terminals are evenly distributed across warehouses. With 10 terminals across 3
91+
* warehouses, distribution should be approximately 3, 3, 4 (or similar even split).
92+
*/
93+
@Test
94+
public void testTerminalsEvenlyDistributedAcrossWarehouses() throws Exception {
95+
int scaleFactor = 10;
96+
int startWarehouseIndex = 1;
97+
int endWarehouseIndex = 3;
98+
int stride = 1;
99+
int terminals = 10;
100+
101+
this.workConf.setScaleFactor(scaleFactor);
102+
this.workConf.setStartWarehouseIndex(startWarehouseIndex);
103+
this.workConf.setEndWarehouseIndex(endWarehouseIndex);
104+
this.workConf.setStride(stride);
105+
this.workConf.setTerminals(terminals);
106+
107+
List<Worker<? extends BenchmarkModule>> workers = this.benchmark.makeWorkers();
108+
109+
assertEquals("Should create correct number of workers", terminals, workers.size());
110+
111+
// Count terminals per warehouse
112+
Map<Integer, Integer> terminalsPerWarehouse = new HashMap<>();
113+
for (Worker<? extends BenchmarkModule> worker : workers) {
114+
TPCCWorker tpccWorker = (TPCCWorker) worker;
115+
int warehouseId = tpccWorker.getTerminalWarehouseID();
116+
terminalsPerWarehouse.merge(warehouseId, 1, Integer::sum);
117+
}
118+
119+
// With 10 terminals across 3 warehouses, expect distribution like 3, 3, 4
120+
// Each warehouse should have at least floor(10/3)=3 terminals
121+
// and at most ceil(10/3)=4 terminals
122+
int numWarehouses = 3;
123+
int minTerminalsPerWarehouse = 3;
124+
int maxTerminalsPerWarehouse = 4;
125+
126+
for (Map.Entry<Integer, Integer> entry : terminalsPerWarehouse.entrySet()) {
127+
int warehouseId = entry.getKey();
128+
int count = entry.getValue();
129+
assertTrue(
130+
"Warehouse "
131+
+ warehouseId
132+
+ " has "
133+
+ count
134+
+ " terminals, expected between "
135+
+ minTerminalsPerWarehouse
136+
+ " and "
137+
+ maxTerminalsPerWarehouse,
138+
count >= minTerminalsPerWarehouse && count <= maxTerminalsPerWarehouse);
139+
}
140+
141+
// Verify all expected warehouses have terminals
142+
assertEquals(
143+
"All warehouses should have at least one terminal",
144+
numWarehouses,
145+
terminalsPerWarehouse.size());
146+
147+
// Verify total terminals assigned equals expected
148+
int totalAssigned = terminalsPerWarehouse.values().stream().mapToInt(Integer::intValue).sum();
149+
assertEquals(
150+
"Total terminals assigned should equal requested terminals", terminals, totalAssigned);
151+
152+
// Verify each worker has sequential ID from 0 to terminals-1
153+
for (int i = 0; i < workers.size(); i++) {
154+
assertEquals("Worker ID should be sequential", i, workers.get(i).getId());
155+
}
156+
}
157+
158+
/**
159+
* Test distributed benchmarking scenario: 3 instances each handling different warehouse subsets.
160+
* Instance 1: warehouses 1, 4, 7, 10 (start=1, stride=3) Instance 2: warehouses 2, 5, 8 (start=2,
161+
* stride=3) Instance 3: warehouses 3, 6, 9 (start=3, stride=3)
162+
*/
163+
@Test
164+
public void testDistributedBenchmarkingScenario() throws Exception {
165+
int scaleFactor = 10;
166+
int stride = 3;
167+
int terminalsPerInstance = 8;
168+
169+
// Simulate instance 1: warehouses 1, 4, 7, 10
170+
this.workConf.setScaleFactor(scaleFactor);
171+
this.workConf.setStartWarehouseIndex(1);
172+
this.workConf.setEndWarehouseIndex(10);
173+
this.workConf.setStride(stride);
174+
this.workConf.setTerminals(terminalsPerInstance);
175+
176+
List<Worker<? extends BenchmarkModule>> workers1 = this.benchmark.makeWorkers();
177+
178+
Set<Integer> instance1Warehouses = new HashSet<>();
179+
for (Worker<? extends BenchmarkModule> worker : workers1) {
180+
instance1Warehouses.add(((TPCCWorker) worker).getTerminalWarehouseID());
181+
}
182+
183+
// Instance 1 should only use warehouses 1, 4, 7, 10
184+
Set<Integer> expectedInstance1 = Set.of(1, 4, 7, 10);
185+
assertEquals(
186+
"Instance 1 should use warehouses 1, 4, 7, 10", expectedInstance1, instance1Warehouses);
187+
188+
// Simulate instance 2: warehouses 2, 5, 8
189+
this.workConf.setStartWarehouseIndex(2);
190+
this.workConf.setEndWarehouseIndex(10);
191+
this.benchmark =
192+
this.benchmark
193+
.getClass()
194+
.getConstructor(this.workConf.getClass())
195+
.newInstance(this.workConf);
196+
197+
List<Worker<? extends BenchmarkModule>> workers2 = this.benchmark.makeWorkers();
198+
199+
Set<Integer> instance2Warehouses = new HashSet<>();
200+
for (Worker<? extends BenchmarkModule> worker : workers2) {
201+
instance2Warehouses.add(((TPCCWorker) worker).getTerminalWarehouseID());
202+
}
203+
204+
// Instance 2 should only use warehouses 2, 5, 8
205+
Set<Integer> expectedInstance2 = Set.of(2, 5, 8);
206+
assertEquals(
207+
"Instance 2 should use warehouses 2, 5, 8", expectedInstance2, instance2Warehouses);
208+
209+
// Simulate instance 3: warehouses 3, 6, 9
210+
this.workConf.setStartWarehouseIndex(3);
211+
this.workConf.setEndWarehouseIndex(10);
212+
this.benchmark =
213+
this.benchmark
214+
.getClass()
215+
.getConstructor(this.workConf.getClass())
216+
.newInstance(this.workConf);
217+
218+
List<Worker<? extends BenchmarkModule>> workers3 = this.benchmark.makeWorkers();
219+
220+
Set<Integer> instance3Warehouses = new HashSet<>();
221+
for (Worker<? extends BenchmarkModule> worker : workers3) {
222+
instance3Warehouses.add(((TPCCWorker) worker).getTerminalWarehouseID());
223+
}
224+
225+
// Instance 3 should only use warehouses 3, 6, 9
226+
Set<Integer> expectedInstance3 = Set.of(3, 6, 9);
227+
assertEquals(
228+
"Instance 3 should use warehouses 3, 6, 9", expectedInstance3, instance3Warehouses);
229+
}
38230
}

0 commit comments

Comments
 (0)