Skip to content

Commit f83b949

Browse files
author
Hanzhou Tang
committed
Distribute workloads by config during execution
This PR fixes the workload distribution logic to respect configuration settings during execution. Also adds unit test. Fixes #70
1 parent bfde3ee commit f83b949

File tree

2 files changed

+203
-10
lines changed

2 files changed

+203
-10
lines changed

src/main/java/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,13 @@ private List<TPCCWorker> createTerminalsOldWay() throws SQLException {
8686
// totalWarehouses is equal to numWarehouses in case of non-partitioned use case
8787
int totalWarehouses = (int) workConf.getScaleFactor();
8888

89-
if (totalWarehouses <= 0) {
90-
// At least one warehouse, @see
91-
// https://github.com/cmu-db/benchbase/blob/main/src/main/java/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java
92-
totalWarehouses = 1;
93-
}
94-
95-
// Default values used for warehouse indexes and stride
96-
final int startWarehouseIndex = 1;
97-
final int endWarehouseIndex = totalWarehouses;
98-
final int stride = 1;
89+
// [startWarehouseIndex, endWarehouseIndex] are both included.
90+
// Use defaults if not configured: start=1, end=totalWarehouses, stride=1
91+
final int startWarehouseIndex =
92+
workConf.getStartWarehouseIndex() > 0 ? workConf.getStartWarehouseIndex() : 1;
93+
final int endWarehouseIndex =
94+
workConf.getEndWarehouseIndex() > 0 ? workConf.getEndWarehouseIndex() : totalWarehouses;
95+
final int stride = workConf.getStride() > 0 ? workConf.getStride() : 1;
9996

10097
LOG.info(
10198
"Start warehouse idx: {} end warehouse idx: {} stride: {}",
@@ -110,6 +107,10 @@ private List<TPCCWorker> createTerminalsOldWay() throws SQLException {
110107
final int numWarehouses = w_ids.size();
111108
int numTerminals = workConf.getTerminals();
112109

110+
assert startWarehouseIndex >= 1 : "The start index must be >= 1";
111+
assert endWarehouseIndex >= 1 : "The end index must be >= 1";
112+
assert endWarehouseIndex <= workConf.getScaleFactor()
113+
: "The end index must be within the scale factor";
113114
assert numWarehouses >= 1 : "At least need 1 warehouse to do benchmark";
114115

115116
// We distribute terminals evenly across the warehouses

src/test/java/com/oltpbenchmark/benchmarks/tpcc/TestTPCCBenchmark.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,20 @@
1616

1717
package com.oltpbenchmark.benchmarks.tpcc;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
21+
1922
import com.oltpbenchmark.api.AbstractTestBenchmarkModule;
23+
import com.oltpbenchmark.api.BenchmarkModule;
2024
import com.oltpbenchmark.api.Procedure;
25+
import com.oltpbenchmark.api.Worker;
2126
import com.oltpbenchmark.benchmarks.tpcc.procedures.*;
27+
import java.util.HashMap;
28+
import java.util.HashSet;
2229
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Set;
32+
import org.junit.Test;
2333

2434
public class TestTPCCBenchmark extends AbstractTestBenchmarkModule<TPCCBenchmark> {
2535

@@ -35,4 +45,186 @@ public List<Class<? extends Procedure>> procedures() {
3545
public Class<TPCCBenchmark> benchmarkClass() {
3646
return TPCCBenchmark.class;
3747
}
48+
49+
/**
50+
* Test that workers are created with correct warehouse IDs when using startWarehouseIndex,
51+
* endWarehouseIndex, and stride parameters for distributed benchmarking.
52+
*/
53+
@Test
54+
public void testCreateTerminalsWithStrideDistribution() throws Exception {
55+
// Configure for distributed benchmarking: warehouses 3, 6, 9 (stride=3, start=3, end=9)
56+
int scaleFactor = 10;
57+
int startWarehouseIndex = 3;
58+
int endWarehouseIndex = 9;
59+
int stride = 3;
60+
int terminals = 6;
61+
62+
this.workConf.setScaleFactor(scaleFactor);
63+
this.workConf.setStartWarehouseIndex(startWarehouseIndex);
64+
this.workConf.setEndWarehouseIndex(endWarehouseIndex);
65+
this.workConf.setStride(stride);
66+
this.workConf.setTerminals(terminals);
67+
68+
// Create workers
69+
List<Worker<? extends BenchmarkModule>> workers = this.benchmark.makeWorkers();
70+
71+
assertEquals("Should create correct number of workers", terminals, workers.size());
72+
73+
// Collect all warehouse IDs assigned to workers
74+
Set<Integer> assignedWarehouseIds = new HashSet<>();
75+
for (Worker<? extends BenchmarkModule> worker : workers) {
76+
TPCCWorker tpccWorker = (TPCCWorker) worker;
77+
assignedWarehouseIds.add(tpccWorker.getTerminalWarehouseID());
78+
}
79+
80+
// Expected warehouses: 3, 6, 9 (start=3, end=9, stride=3)
81+
Set<Integer> expectedWarehouseIds = Set.of(3, 6, 9);
82+
83+
assertEquals(
84+
"Workers should only be assigned to warehouses matching stride pattern",
85+
expectedWarehouseIds,
86+
assignedWarehouseIds);
87+
}
88+
89+
/**
90+
* Test that terminals are evenly distributed across warehouses. With 10 terminals across 3
91+
* warehouses, distribution should be approximately 3, 3, 4 (or similar even split).
92+
*/
93+
@Test
94+
public void testTerminalsEvenlyDistributedAcrossWarehouses() throws Exception {
95+
int scaleFactor = 10;
96+
int startWarehouseIndex = 1;
97+
int endWarehouseIndex = 3;
98+
int stride = 1;
99+
int terminals = 10;
100+
101+
this.workConf.setScaleFactor(scaleFactor);
102+
this.workConf.setStartWarehouseIndex(startWarehouseIndex);
103+
this.workConf.setEndWarehouseIndex(endWarehouseIndex);
104+
this.workConf.setStride(stride);
105+
this.workConf.setTerminals(terminals);
106+
107+
List<Worker<? extends BenchmarkModule>> workers = this.benchmark.makeWorkers();
108+
109+
assertEquals("Should create correct number of workers", terminals, workers.size());
110+
111+
// Count terminals per warehouse
112+
Map<Integer, Integer> terminalsPerWarehouse = new HashMap<>();
113+
for (Worker<? extends BenchmarkModule> worker : workers) {
114+
TPCCWorker tpccWorker = (TPCCWorker) worker;
115+
int warehouseId = tpccWorker.getTerminalWarehouseID();
116+
terminalsPerWarehouse.merge(warehouseId, 1, Integer::sum);
117+
}
118+
119+
// With 10 terminals across 3 warehouses, expect distribution like 3, 3, 4
120+
// Each warehouse should have at least floor(10/3)=3 terminals
121+
// and at most ceil(10/3)=4 terminals
122+
int numWarehouses = 3;
123+
int minTerminalsPerWarehouse = 3;
124+
int maxTerminalsPerWarehouse = 4;
125+
126+
for (Map.Entry<Integer, Integer> entry : terminalsPerWarehouse.entrySet()) {
127+
int warehouseId = entry.getKey();
128+
int count = entry.getValue();
129+
assertTrue(
130+
"Warehouse "
131+
+ warehouseId
132+
+ " has "
133+
+ count
134+
+ " terminals, expected between "
135+
+ minTerminalsPerWarehouse
136+
+ " and "
137+
+ maxTerminalsPerWarehouse,
138+
count >= minTerminalsPerWarehouse && count <= maxTerminalsPerWarehouse);
139+
}
140+
141+
// Verify all expected warehouses have terminals
142+
assertEquals(
143+
"All warehouses should have at least one terminal",
144+
numWarehouses,
145+
terminalsPerWarehouse.size());
146+
147+
// Verify total terminals assigned equals expected
148+
int totalAssigned = terminalsPerWarehouse.values().stream().mapToInt(Integer::intValue).sum();
149+
assertEquals(
150+
"Total terminals assigned should equal requested terminals", terminals, totalAssigned);
151+
152+
// Verify each worker has sequential ID from 0 to terminals-1
153+
for (int i = 0; i < workers.size(); i++) {
154+
assertEquals("Worker ID should be sequential", i, workers.get(i).getId());
155+
}
156+
}
157+
158+
/**
159+
* Test distributed benchmarking scenario: 3 instances each handling different warehouse subsets.
160+
* Instance 1: warehouses 1, 4, 7, 10 (start=1, stride=3) Instance 2: warehouses 2, 5, 8 (start=2,
161+
* stride=3) Instance 3: warehouses 3, 6, 9 (start=3, stride=3)
162+
*/
163+
@Test
164+
public void testDistributedBenchmarkingScenario() throws Exception {
165+
int scaleFactor = 10;
166+
int stride = 3;
167+
int terminalsPerInstance = 8;
168+
169+
// Simulate instance 1: warehouses 1, 4, 7, 10
170+
this.workConf.setScaleFactor(scaleFactor);
171+
this.workConf.setStartWarehouseIndex(1);
172+
this.workConf.setEndWarehouseIndex(10);
173+
this.workConf.setStride(stride);
174+
this.workConf.setTerminals(terminalsPerInstance);
175+
176+
List<Worker<? extends BenchmarkModule>> workers1 = this.benchmark.makeWorkers();
177+
178+
Set<Integer> instance1Warehouses = new HashSet<>();
179+
for (Worker<? extends BenchmarkModule> worker : workers1) {
180+
instance1Warehouses.add(((TPCCWorker) worker).getTerminalWarehouseID());
181+
}
182+
183+
// Instance 1 should only use warehouses 1, 4, 7, 10
184+
Set<Integer> expectedInstance1 = Set.of(1, 4, 7, 10);
185+
assertEquals(
186+
"Instance 1 should use warehouses 1, 4, 7, 10", expectedInstance1, instance1Warehouses);
187+
188+
// Simulate instance 2: warehouses 2, 5, 8
189+
this.workConf.setStartWarehouseIndex(2);
190+
this.workConf.setEndWarehouseIndex(10);
191+
this.benchmark =
192+
this.benchmark
193+
.getClass()
194+
.getConstructor(this.workConf.getClass())
195+
.newInstance(this.workConf);
196+
197+
List<Worker<? extends BenchmarkModule>> workers2 = this.benchmark.makeWorkers();
198+
199+
Set<Integer> instance2Warehouses = new HashSet<>();
200+
for (Worker<? extends BenchmarkModule> worker : workers2) {
201+
instance2Warehouses.add(((TPCCWorker) worker).getTerminalWarehouseID());
202+
}
203+
204+
// Instance 2 should only use warehouses 2, 5, 8
205+
Set<Integer> expectedInstance2 = Set.of(2, 5, 8);
206+
assertEquals(
207+
"Instance 2 should use warehouses 2, 5, 8", expectedInstance2, instance2Warehouses);
208+
209+
// Simulate instance 3: warehouses 3, 6, 9
210+
this.workConf.setStartWarehouseIndex(3);
211+
this.workConf.setEndWarehouseIndex(10);
212+
this.benchmark =
213+
this.benchmark
214+
.getClass()
215+
.getConstructor(this.workConf.getClass())
216+
.newInstance(this.workConf);
217+
218+
List<Worker<? extends BenchmarkModule>> workers3 = this.benchmark.makeWorkers();
219+
220+
Set<Integer> instance3Warehouses = new HashSet<>();
221+
for (Worker<? extends BenchmarkModule> worker : workers3) {
222+
instance3Warehouses.add(((TPCCWorker) worker).getTerminalWarehouseID());
223+
}
224+
225+
// Instance 3 should only use warehouses 3, 6, 9
226+
Set<Integer> expectedInstance3 = Set.of(3, 6, 9);
227+
assertEquals(
228+
"Instance 3 should use warehouses 3, 6, 9", expectedInstance3, instance3Warehouses);
229+
}
38230
}

0 commit comments

Comments
 (0)