Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 56 additions & 26 deletions daprovider/anytrust/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ type randomBagOfFailures struct {
mutex sync.Mutex
}

func newRandomBagOfFailures(t *testing.T, nSuccess, nFailures int, highestFailureType failureType) *randomBagOfFailures {
func newRandomBagOfFailures(t *testing.T, rng *rand.Rand, nSuccess, nFailures int, highestFailureType failureType) *randomBagOfFailures {
var failures []failureType
for i := 0; i < nSuccess; i++ {
failures = append(failures, success)
}

for i := 0; i < nFailures; i++ {
failures = append(failures, failureType(rand.Int()%int(highestFailureType)+1))
failures = append(failures, failureType(rng.Intn(int(highestFailureType))+1))
}

rand.Shuffle(len(failures), func(i, j int) { failures[i], failures[j] = failures[j], failures[i] })
rng.Shuffle(len(failures), func(i, j int) { failures[i], failures[j] = failures[j], failures[i] })

log.Trace("Injected failures", "failures", failures)

Expand Down Expand Up @@ -167,22 +167,33 @@ func enableLogging() {
log.SetDefault(log.NewLogger(glogger))
}

func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool) {
func countRetrievalFailures(ctx context.Context, storageServices []StorageService, cert *anytrustutil.DataAvailabilityCertificate, rawMsg []byte) int {
retrievalFailures := 0
for _, storageService := range storageServices {
messageRetrieved, err := storageService.GetByHash(ctx, cert.DataHash)
if err != nil || !bytes.Equal(rawMsg, messageRetrieved) {
retrievalFailures++
}
}
return retrievalFailures
}

func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool, rng *rand.Rand) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

numBackendAnyTrust := (rand.Int() % 20) + 1
assumedHonest := (rand.Int() % numBackendAnyTrust) + 1
numBackendAnyTrust := rng.Intn(20) + 1
assumedHonest := rng.Intn(numBackendAnyTrust) + 1
var nFailures int
if shouldFailAggregation {
nFailures = max(assumedHonest, rand.Int()%(numBackendAnyTrust+1))
nFailures = max(assumedHonest, rng.Intn(numBackendAnyTrust+1))
} else {
nFailures = min(assumedHonest-1, rand.Int()%(numBackendAnyTrust+1))
nFailures = min(assumedHonest-1, rng.Intn(numBackendAnyTrust+1))
}
nSuccesses := numBackendAnyTrust - nFailures
log.Trace(fmt.Sprintf("Testing aggregator with K:%d with K=N+1-H, N:%d, H:%d, and %d successes", numBackendAnyTrust+1-assumedHonest, numBackendAnyTrust, assumedHonest, nSuccesses))

injectedFailures := newRandomBagOfFailures(t, nSuccesses, nFailures, dataCorruption)
injectedFailures := newRandomBagOfFailures(t, rng, nSuccesses, nFailures, dataCorruption)
var backends []ServiceDetails
var storageServices []StorageService
for i := 0; i < numBackendAnyTrust; i++ {
Expand Down Expand Up @@ -221,23 +232,34 @@ func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool) {
return
}

// Wait for all stores that would succeed to succeed.
time.Sleep(time.Millisecond * 2000)
retrievalFailures := 0
for _, storageService := range storageServices {
messageRetrieved, err := storageService.GetByHash(ctx, cert.DataHash)
if err != nil {
retrievalFailures++
} else if !bytes.Equal(rawMsg, messageRetrieved) {
retrievalFailures++
// Wait until either enough writes become visible or timeout elapses.
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
deadline := time.NewTimer(daConfig.RequestTimeout)
defer deadline.Stop()
retrievalFailures := countRetrievalFailures(ctx, storageServices, cert, rawMsg)
for retrievalFailures > nFailures {
select {
case <-ctx.Done():
Fail(t, "Context canceled while waiting for backend stores to become visible")
case <-deadline.C:
// Recheck once at timeout boundary to avoid failing in the last ticker interval.
retrievalFailures = countRetrievalFailures(ctx, storageServices, cert, rawMsg)
if retrievalFailures <= nFailures {
break
}
Fail(t, fmt.Sprintf("Timed out waiting for backend stores: retrievalFailures(%d) > nFailures(%d)", retrievalFailures, nFailures))
case <-ticker.C:
retrievalFailures = countRetrievalFailures(ctx, storageServices, cert, rawMsg)
}
}

if retrievalFailures > nFailures {
Fail(t, fmt.Sprintf("retrievalFailures(%d) > nFailures(%d)", retrievalFailures, nFailures))
}
}

func initTest(t *testing.T) int {
func initTest(t *testing.T) (int, int64) {
t.Parallel()
seed := time.Now().UnixNano()
if len(*testflag.SeedFlag) > 0 {
Expand All @@ -246,7 +268,6 @@ func initTest(t *testing.T) int {
Require(t, err, "Failed to parse string")
seed = int64(intSeed)
}
rand.Seed(seed)

runs := 2 ^ 32
if len(*testflag.RunsFlag) > 0 {
Expand All @@ -261,26 +282,34 @@ func initTest(t *testing.T) int {

log.Trace(fmt.Sprintf("Running test with seed %d", seed))

return runs
return runs, seed
}

func TestAnyTrust_LessThanHStorageFailures(t *testing.T) {
runs := initTest(t)
runs, seed := initTest(t)

for i := 0; i < min(runs, 20); i++ {
i := i
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Parallel()
testConfigurableStorageFailures(t, false)
subtestSeed := seed + int64(i)
log.Trace(fmt.Sprintf("Running subtest with seed %d", subtestSeed))
rng := rand.New(rand.NewSource(subtestSeed))
testConfigurableStorageFailures(t, false, rng)
})
}
}

func TestAnyTrust_AtLeastHStorageFailures(t *testing.T) {
runs := initTest(t)
runs, seed := initTest(t)
for i := 0; i < min(runs, 10); i++ {
i := i
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Parallel()
testConfigurableStorageFailures(t, true)
subtestSeed := seed + int64(i)
log.Trace(fmt.Sprintf("Running subtest with seed %d", subtestSeed))
rng := rand.New(rand.NewSource(subtestSeed))
testConfigurableStorageFailures(t, true, rng)
})
}
}
Expand All @@ -299,7 +328,8 @@ func TestAnyTrust_InsufficientBackendsTriggersFallback(t *testing.T) {

log.Trace(fmt.Sprintf("Testing aggregator fallback with K:%d (K=N+1-H), N:%d, H:%d, and %d successes", numBackendAnyTrust+1-assumedHonest, numBackendAnyTrust, assumedHonest, nSuccesses))

injectedFailures := newRandomBagOfFailures(t, nSuccesses, nFailures, immediateError)
rng := rand.New(rand.NewSource(0))
injectedFailures := newRandomBagOfFailures(t, rng, nSuccesses, nFailures, immediateError)
var backends []ServiceDetails
for i := 0; i < numBackendAnyTrust; i++ {
privKey, err := blsSignatures.GeneratePrivKeyString()
Expand Down