Skip to content

Commit 4b0a1af

Browse files
author
ssekaran
committed
Added randomization, iterations and logging
1 parent 8303257 commit 4b0a1af

File tree

6 files changed

+186
-20
lines changed

6 files changed

+186
-20
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ public enum CassandraRelevantProperties
480480
SERIALIZATION_EMPTY_TYPE_NONEMPTY_BEHAVIOR("cassandra.serialization.emptytype.nonempty_behavior"),
481481
SET_SEP_THREAD_NAME("cassandra.set_sep_thread_name", "true"),
482482
SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"),
483+
SIMULATOR_ITERATIONS("simulator.iterations", "3"),
483484
SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"),
484485

485486
/**

test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Arrays;
2626
import java.util.EnumMap;
27+
import java.util.LinkedHashMap;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.TreeMap;
@@ -38,7 +39,11 @@
3839
import com.google.common.util.concurrent.AsyncFunction;
3940
import com.google.common.util.concurrent.FutureCallback;
4041

42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
44+
4145
import org.apache.cassandra.concurrent.ExecutorFactory;
46+
import org.apache.cassandra.config.CassandraRelevantProperties;
4247
import org.apache.cassandra.config.ParameterizedClass;
4348
import org.apache.cassandra.distributed.Cluster;
4449
import org.apache.cassandra.distributed.api.Feature;
@@ -112,6 +117,8 @@
112117
@SuppressWarnings("RedundantCast")
113118
public class ClusterSimulation<S extends Simulation> implements AutoCloseable
114119
{
120+
private static final Logger logger = LoggerFactory.getLogger(ClusterSimulation.class);
121+
115122
public static final Class<?>[] SHARE = new Class[]
116123
{
117124
AsyncFunction.class,
@@ -188,6 +195,9 @@ public static abstract class Builder<S extends Simulation>
188195
protected HeapPool.Logged.Listener memoryListener;
189196
protected SimulatedTime.Listener timeListener = (i1, i2) -> {};
190197
protected LongConsumer onThreadLocalRandomCheck;
198+
protected String memtableType = null;
199+
protected String memtableAllocationType = null;
200+
protected String sstableFormat = null;
191201

192202
public Debug debug()
193203
{
@@ -516,6 +526,24 @@ public Builder<S> onThreadLocalRandomCheck(LongConsumer runnable)
516526
return this;
517527
}
518528

529+
public Builder<S> memtableType(String type)
530+
{
531+
this.memtableType = type;
532+
return this;
533+
}
534+
535+
public Builder<S> memtableAllocationType(String type)
536+
{
537+
this.memtableAllocationType = type;
538+
return this;
539+
}
540+
541+
public Builder<S> sstableFormat(String format)
542+
{
543+
this.sstableFormat = format;
544+
return this;
545+
}
546+
519547
public abstract ClusterSimulation<S> create(long seed) throws IOException;
520548
}
521549

@@ -654,8 +682,68 @@ public ClusterSimulation(RandomSource random, long seed, int uniqueNum,
654682

655683
execution = new SimulatedExecution();
656684

685+
// Track randomized configuration for consolidated logging
686+
Map<String, String> randomizedConfig = new LinkedHashMap<>();
687+
randomizedConfig.put("nodes", String.valueOf(numOfNodes));
688+
randomizedConfig.put("dcs", String.valueOf(numOfDcs));
689+
690+
// Log replication factors
691+
StringBuilder rfString = new StringBuilder();
692+
for (int i = 0; i < numOfDcs; ++i)
693+
{
694+
if (i > 0)
695+
rfString.append(",");
696+
rfString.append("dc").append(i).append(":").append(initialRf[i]);
697+
}
698+
randomizedConfig.put("replication_factors", rfString.toString());
699+
700+
// Randomize memtable type
701+
String memtableType;
702+
if (builder.memtableType != null)
703+
{
704+
memtableType = builder.memtableType;
705+
}
706+
else
707+
{
708+
String[] memtableTypes = {"TrieMemtable", "SkipListMemtable"};
709+
memtableType = memtableTypes[random.uniform(0, memtableTypes.length)];
710+
}
711+
randomizedConfig.put("memtable", memtableType);
712+
713+
// Randomize memtable allocation type (heap-based only to avoid InterruptibleChannel issues with offheap)
714+
String memtableAllocationType;
715+
if (builder.memtableAllocationType != null)
716+
{
717+
memtableAllocationType = builder.memtableAllocationType;
718+
}
719+
else
720+
{
721+
String[] allocationTypes = {
722+
"heap_buffers", // Slab allocator (pooled memory)
723+
"unslabbed_heap_buffers" // Direct heap allocation (no pooling)
724+
};
725+
memtableAllocationType = allocationTypes[random.uniform(0, allocationTypes.length)];
726+
}
727+
randomizedConfig.put("memtable_allocation_type", memtableAllocationType);
728+
729+
// Randomize SSTable format
730+
String sstableFormat;
731+
if (builder.sstableFormat != null)
732+
{
733+
sstableFormat = builder.sstableFormat;
734+
}
735+
else
736+
{
737+
String[] formats = {"big", "bti"};
738+
sstableFormat = formats[random.uniform(0, formats.length)];
739+
}
740+
randomizedConfig.put("sstable_format", sstableFormat);
741+
CassandraRelevantProperties.SSTABLE_FORMAT_DEFAULT.setString(sstableFormat);
742+
657743
KindOfSequence kindOfDriftSequence = Choices.uniform(KindOfSequence.values()).choose(random);
658744
KindOfSequence kindOfDiscontinuitySequence = Choices.uniform(KindOfSequence.values()).choose(random);
745+
randomizedConfig.put("clock_drift_sequence", kindOfDriftSequence.toString());
746+
randomizedConfig.put("clock_discontinuity_sequence", kindOfDiscontinuitySequence.toString());
659747
time = new SimulatedTime(numOfNodes, random, 1577836800000L /*Jan 1st UTC*/, builder.clockDriftNanos, kindOfDriftSequence,
660748
kindOfDiscontinuitySequence.period(builder.clockDiscontinuitIntervalNanos, random),
661749
builder.timeListener);
@@ -672,6 +760,7 @@ public ClusterSimulation(RandomSource random, long seed, int uniqueNum,
672760
ThreadAllocator threadAllocator = new ThreadAllocator(random, builder.threadCount, numOfNodes);
673761
List<String> allowedDiskAccessModes = Arrays.asList("mmap", "mmap_index_only", "standard");
674762
String disk_access_mode = allowedDiskAccessModes.get(random.uniform(0, allowedDiskAccessModes.size() - 1));
763+
randomizedConfig.put("disk_access_mode", disk_access_mode);
675764
boolean commitlogCompressed = random.decide(.5f);
676765
cluster = snitch.setup(Cluster.build(numOfNodes)
677766
.withRoot(fs.getPath("/cassandra"))
@@ -683,13 +772,26 @@ public ClusterSimulation(RandomSource random, long seed, int uniqueNum,
683772
.set("cas_contention_timeout", String.format("%dms", NANOSECONDS.toMillis(builder.contentionTimeoutNanos)))
684773
.set("request_timeout", String.format("%dms", NANOSECONDS.toMillis(builder.requestTimeoutNanos)))
685774
.set("memtable_heap_space", "1MiB")
686-
.set("memtable_allocation_type", builder.memoryListener != null ? "unslabbed_heap_buffers_logged" : "heap_buffers")
775+
.set("memtable_allocation_type", builder.memoryListener != null ? "unslabbed_heap_buffers_logged" : memtableAllocationType)
687776
.set("file_cache_size", "16MiB")
688777
.set("use_deterministic_table_id", true)
689778
.set("disk_access_mode", disk_access_mode)
690779
.set("failure_detector", SimulatedFailureDetector.Instance.class.getName())
691780
.set("commitlog_sync", "batch");
692781

782+
if (memtableType.equals("TrieMemtable"))
783+
{
784+
config.set("memtable", Map.of(
785+
"configurations", Map.of(
786+
"default", Map.of("class_name", "TrieMemtable"))));
787+
}
788+
else
789+
{
790+
config.set("memtable", Map.of(
791+
"configurations", Map.of(
792+
"default", Map.of("class_name", "SkipListMemtable"))));
793+
}
794+
693795
// TODO: Add remove() to IInstanceConfig
694796
if (config instanceof InstanceConfig)
695797
{
@@ -779,10 +881,18 @@ public void afterStartup(IInstance i)
779881
simulated.register(futureActionScheduler);
780882

781883
RunnableActionScheduler scheduler = builder.schedulerFactory.create(random);
782-
ClusterActions.Options options = new ClusterActions.Options(builder.topologyChangeLimit, Choices.uniform(KindOfSequence.values()).choose(random).period(builder.topologyChangeIntervalNanos, random),
884+
KindOfSequence topologyChangeSequence = Choices.uniform(KindOfSequence.values()).choose(random);
885+
ClusterActions.Options options = new ClusterActions.Options(builder.topologyChangeLimit, topologyChangeSequence.period(builder.topologyChangeIntervalNanos, random),
783886
Choices.random(random, builder.topologyChanges),
784887
minRf, initialRf, maxRf, null);
785888
simulation = factory.create(simulated, scheduler, cluster, options);
889+
890+
// Add remaining randomization tracking
891+
randomizedConfig.put("network_scheduler", futureActionScheduler.getKind().toString());
892+
randomizedConfig.put("runnable_scheduler", scheduler.getClass().getSimpleName());
893+
randomizedConfig.put("topology_change_sequence", topologyChangeSequence.toString());
894+
895+
logger.warn("Seed 0x{} - Randomized config: {}", Long.toHexString(seed), randomizedConfig);
786896
}
787897

788898
public synchronized void close() throws IOException

test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedFutureActionScheduler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public Scheduler(SchedulerConfig config, RandomSource random, KindOfSequence kin
6767
final int nodeCount;
6868
final RandomSource random;
6969
final SimulatedTime time;
70+
final KindOfSequence kind;
7071

7172
// TODO (feature): should we produce more than two simultaneous partitions?
7273
final BitSet isInDropPartition = new BitSet();
@@ -86,6 +87,7 @@ public Scheduler(SchedulerConfig config, RandomSource random, KindOfSequence kin
8687

8788
public SimulatedFutureActionScheduler(KindOfSequence kind, int nodeCount, RandomSource random, SimulatedTime time, NetworkConfig network, SchedulerConfig scheduler)
8889
{
90+
this.kind = kind;
8991
this.nodeCount = nodeCount;
9092
this.random = random;
9193
this.time = time;
@@ -196,4 +198,9 @@ public void onChange(Topology newTopology)
196198
if (oldTopology == null || (newTopology.quorumRf < oldTopology.quorumRf && newTopology.quorumRf < isInDropPartition.cardinality()))
197199
recompute();
198200
}
201+
202+
public KindOfSequence getKind()
203+
{
204+
return kind;
205+
}
199206
}

test/simulator/test/org/apache/cassandra/simulator/test/ShortPaxosSimulationTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,21 @@
2525

2626
import org.apache.cassandra.simulator.paxos.PaxosSimulationRunner;
2727

28+
import static org.apache.cassandra.simulator.test.SimulationTestBase.DEFAULT_ITERATIONS;
29+
2830
public class ShortPaxosSimulationTest
2931
{
3032
@Test
3133
public void simulationTest() throws IOException
3234
{
33-
PaxosSimulationRunner.main(new String[] { "run", "-n", "3..6", "-t", "1000", "-c", "2", "--cluster-action-limit", "2", "-s", "30" });
35+
PaxosSimulationRunner.main(new String[] { "run", "-n", "3..6", "-t", "1000", "-c", "2", "--cluster-action-limit", "2", "-s", "30", "--simulations", String.valueOf(DEFAULT_ITERATIONS) });
3436
}
3537

3638
@Test
3739
@Ignore("fails due to OOM DirectMemory - unclear why")
3840
public void selfReconcileTest() throws IOException
3941
{
40-
PaxosSimulationRunner.main(new String[] { "reconcile", "-n", "3..6", "-t", "1000", "-c", "2", "--cluster-action-limit", "2", "-s", "30", "--with-self" });
42+
PaxosSimulationRunner.main(new String[] { "reconcile", "-n", "3..6", "-t", "1000", "-c", "2", "--cluster-action-limit", "2", "-s", "30", "--with-self", "--simulations", String.valueOf(DEFAULT_ITERATIONS) });
4143
}
4244
}
4345

test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828

2929
import com.google.common.collect.Iterators;
3030

31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
3134
import org.apache.cassandra.concurrent.ExecutorFactory;
3235
import org.apache.cassandra.distributed.Cluster;
3336
import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -64,6 +67,7 @@
6467
import static java.util.concurrent.TimeUnit.MILLISECONDS;
6568
import static java.util.concurrent.TimeUnit.NANOSECONDS;
6669
import static java.util.concurrent.TimeUnit.SECONDS;
70+
import static org.apache.cassandra.config.CassandraRelevantProperties.SIMULATOR_ITERATIONS;
6771
import static org.apache.cassandra.simulator.ActionSchedule.Mode.STREAM_LIMITED;
6872
import static org.apache.cassandra.simulator.ActionSchedule.Mode.UNLIMITED;
6973
import static org.apache.cassandra.simulator.ClusterSimulation.ISOLATE;
@@ -76,6 +80,9 @@
7680

7781
public class SimulationTestBase
7882
{
83+
private static final Logger logger = LoggerFactory.getLogger(SimulationTestBase.class);
84+
public static final int DEFAULT_ITERATIONS = SIMULATOR_ITERATIONS.getInt();
85+
7986
static abstract class DTestClusterSimulation implements Simulation
8087
{
8188
final SimulatedSystems simulated;
@@ -140,15 +147,23 @@ public void close() throws Exception
140147
public static void simulate(Function<DTestClusterSimulation, ActionList> init,
141148
Function<DTestClusterSimulation, ActionList> test,
142149
Consumer<ClusterSimulation.Builder<DTestClusterSimulation>> configure) throws IOException
150+
{
151+
simulate(init, test, configure, 1);
152+
}
153+
154+
public static void simulate(Function<DTestClusterSimulation, ActionList> init,
155+
Function<DTestClusterSimulation, ActionList> test,
156+
Consumer<ClusterSimulation.Builder<DTestClusterSimulation>> configure,
157+
int iterations) throws IOException
143158
{
144159
SimulationRunner.beforeAll();
145160
long seed = System.currentTimeMillis();
146-
RandomSource random = new RandomSource.Default();
147-
random.reset(seed);
148161
class Factory extends ClusterSimulation.Builder<DTestClusterSimulation>
149162
{
150163
public ClusterSimulation<DTestClusterSimulation> create(long seed) throws IOException
151164
{
165+
RandomSource random = new RandomSource.Default();
166+
random.reset(seed);
152167
return new ClusterSimulation<>(random, seed, 1, this,
153168
(c) -> {},
154169
(simulated, scheduler, cluster, options) -> new DTestClusterSimulation(simulated, scheduler, cluster) {
@@ -168,33 +183,63 @@ protected ActionList execute()
168183

169184
Factory factory = new Factory();
170185
configure.accept(factory);
171-
try (ClusterSimulation<?> cluster = factory.create(seed))
186+
for (int i = 0; i < iterations; i++)
172187
{
173-
try
174-
{
175-
cluster.simulation.run();
176-
}
177-
catch (Throwable t)
188+
long currentSeed = seed + i;
189+
logger.info("Running iteration {} of {} with seed {}L", i + 1, iterations, currentSeed);
190+
try (ClusterSimulation<?> cluster = factory.create(currentSeed))
178191
{
179-
throw new AssertionError(String.format("Failed on seed %s", Long.toHexString(seed)),
180-
t);
192+
try
193+
{
194+
cluster.simulation.run();
195+
}
196+
catch (Throwable t)
197+
{
198+
throw new AssertionError(String.format("Failed on seed 0x%s (base seed 0x%s + %d)",
199+
Long.toHexString(currentSeed), Long.toHexString(seed), i), t);
200+
}
181201
}
182202
}
183203
}
184204

185205
public static void simulate(IIsolatedExecutor.SerializableRunnable run,
186206
IIsolatedExecutor.SerializableRunnable check)
187207
{
188-
simulate(new IIsolatedExecutor.SerializableRunnable[]{run},
189-
check);
208+
simulate(new IIsolatedExecutor.SerializableRunnable[]{run}, check, 1);
209+
}
210+
211+
public static void simulate(IIsolatedExecutor.SerializableRunnable run,
212+
IIsolatedExecutor.SerializableRunnable check,
213+
int iterations)
214+
{
215+
simulate(new IIsolatedExecutor.SerializableRunnable[]{run}, check, iterations);
190216
}
191217

192218
public static void simulate(IIsolatedExecutor.SerializableRunnable[] runnables,
193219
IIsolatedExecutor.SerializableRunnable check)
220+
{
221+
simulate(runnables, check, 1);
222+
}
223+
224+
public static void simulate(IIsolatedExecutor.SerializableRunnable[] runnables,
225+
IIsolatedExecutor.SerializableRunnable check,
226+
int iterations)
227+
{
228+
long seed = System.currentTimeMillis();
229+
for (int i = 0; i < iterations; i++)
230+
{
231+
long currentSeed = seed + i;
232+
logger.info("Running iteration {} of {} with seed {}L", i + 1, iterations, currentSeed);
233+
simulate(runnables, check, currentSeed);
234+
}
235+
}
236+
237+
public static void simulate(IIsolatedExecutor.SerializableRunnable[] runnables,
238+
IIsolatedExecutor.SerializableRunnable check,
239+
long seed)
194240
{
195241
Failures failures = new Failures();
196242
RandomSource random = new RandomSource.Default();
197-
long seed = System.currentTimeMillis();
198243
System.out.println("Using seed: " + seed);
199244
random.reset(seed);
200245
SimulatedTime time = new SimulatedTime(1, random, 1577836800000L /*Jan 1st UTC*/, new LongRange(1, 100, MILLISECONDS, NANOSECONDS),

0 commit comments

Comments
 (0)