Storage - Fix Flaky Stress Tests #48359
Conversation
- Read functions used Sinks.EmitFailureHandler.FAIL_FAST, which threw an error when the stream had reached the end and we wanted to read from the stream again, so we removed it from both reads.
- Refactored the code so that the exit criteria is at the beginning.
- Refactored emitContentInfo to avoid duplication (DRY).
- Changed emitValue to tryEmitValue and removed Sinks.EmitFailureHandler.FAIL_FAST so that multiple closes do not cause an error to be thrown.
- opentelemetry-runtime-telemetry-java8 from 2.24.0-alpha -> 2.15.0-alpha
- opentelemetry-logback-appender-1.0 from 2.24.0-alpha -> 2.15.0-alpha
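The emitValue-to-tryEmitValue change can be sketched with a standalone Reactor example (reactor-core assumed on the classpath; the value 42L is just a stand-in for a computed CRC):

```java
import reactor.core.publisher.Sinks;

public class TryEmitDemo {
    public static void main(String[] args) {
        Sinks.One<Long> crcSink = Sinks.one();

        // First terminal emission (e.g. the first close) succeeds.
        Sinks.EmitResult first = crcSink.tryEmitValue(42L);

        // A second close would make emitValue(..., FAIL_FAST) throw an exception;
        // tryEmitValue reports the failure as a result code instead, so a
        // double-close is harmless.
        Sinks.EmitResult second = crcSink.tryEmitValue(42L);

        System.out.println(first + " " + second);
    }
}
```

This is why the stress streams stop failing on repeated terminal events: the second emission is simply ignored rather than propagated as an error.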
Pull request overview
This PR aims to reduce flakiness in Storage stress tests by making cleanup more resilient, making CRC telemetry streams tolerate re-subscription/double-close behaviors, and aligning dependencies with the chosen OpenTelemetry runtime metrics version.
Changes:
- Replace unconditional deletes with deleteIfExists() across multiple stress scenarios to avoid cleanup failures when resources are already gone.
- Add retry/timeout-based global cleanup logic in scenario base classes and add retry logic to async runs.
- Adjust CRC stream emission behavior to avoid failures on repeated terminal events; downgrade OpenTelemetry instrumentation dependencies to 2.15.0-alpha.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 20 comments.
| File | Description |
|---|---|
| sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java | Adjust JVM runtime metrics registration and make timeout/cancellation detection null-safe. |
| sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java | Switch sink emission to tryEmitValue to tolerate double-close. |
| sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java | Refactor EOF emission, add resubscription state reset, and switch to tryEmitValue. |
| sdk/storage/azure-storage-stress/pom.xml | Downgrade OTel runtime telemetry + logback appender to 2.15.0-alpha. |
| sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java | Add retrying global cleanup + async retry behavior and new logging. |
| sdk/storage/azure-storage-file-share-stress/pom.xml | Downgrade OTel runtime telemetry + logback appender to 2.15.0-alpha. |
| sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java | Add retrying global cleanup + async retry behavior and new logging. |
| sdk/storage/azure-storage-file-datalake-stress/pom.xml | Downgrade OTel runtime telemetry + logback appender to 2.15.0-alpha. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java | Use deleteIfExists() and swallow delete errors during cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java | Add retrying global cleanup + async retry behavior and new logging. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java | Use deleteIfExists() and swallow delete errors during cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java | Add retrying global cleanup + async retry behavior and structured logging. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java | Use deleteIfExists() and swallow delete errors during cleanup. |
| sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java | Use deleteIfExists() during per-test cleanup. |
| sdk/storage/azure-storage-blob-stress/pom.xml | Downgrade OTel runtime telemetry + logback appender to 2.15.0-alpha. |
| sdk/parents/azure-client-sdk-parent/pom.xml | Downgrade io.clientcore:linting-extensions used by checkstyle plugin from beta.2 to beta.1. |
...talake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java
private Mono<Void> cleanupContainerWithRetry() {
    return asyncNoFaultContainerClient.deleteIfExists()
        .then() // Convert Mono<Boolean> to Mono<Void>
        .timeout(Duration.ofSeconds(30))
        .retry(3)
        .onErrorResume(error -> {
            // If container deletion fails, try to delete all blobs first then retry container deletion
            return deleteAllBlobsInContainer()
                .then(asyncNoFaultContainerClient.deleteIfExists())
                .then() // Convert Mono<Boolean> to Mono<Void>
                .timeout(Duration.ofSeconds(30))
                .onErrorResume(finalError -> {
                    // Log the error but don't fail the test
                    LOGGER.atWarning()
                        .addKeyValue("error", finalError.getMessage())
                        .log("Final container cleanup failed after retries");
                    return Mono.empty();
                });
        });
}

/**
 * Delete all blobs in the container to help with cleanup.
 */
private Mono<Void> deleteAllBlobsInContainer() {
    return asyncNoFaultContainerClient.listBlobs()
        .flatMap(blobItem ->
            asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete())
        .then()
        .timeout(Duration.ofSeconds(60))
        .onErrorResume(error -> {
            // Log but continue - some blobs might have been deleted
            LOGGER.atWarning()
                .addKeyValue("error", error.getMessage())
                .log("Blob cleanup partially failed");
            return Mono.empty();
        });
}
cleanupContainerWithRetry/deleteAllBlobsInContainer uses flatMap without bounding concurrency. If a run creates many blobs, cleanup may issue a burst of delete requests and hit throttling/timeouts, making cleanup less reliable. Consider limiting concurrency (flatMap(..., concurrency)) or using concatMap, potentially with a small backoff between retries.
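The bounded-concurrency suggestion is a one-argument change on flatMap. A minimal sketch (deleteBlob and the blob names are invented stand-ins for the real per-blob delete call):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BoundedCleanup {
    // Hypothetical stand-in for asyncNoFaultContainerClient.getBlobAsyncClient(name).delete().
    static Mono<Void> deleteBlob(String name) {
        return Mono.<Void>fromRunnable(() -> System.out.println("deleted " + name));
    }

    public static void main(String[] args) {
        Flux.just("blob-a", "blob-b", "blob-c", "blob-d")
            // The second argument caps the number of in-flight deletes
            // (the default is 256), so cleanup cannot burst requests at the service.
            .flatMap(BoundedCleanup::deleteBlob, 2)
            .then()
            .block(Duration.ofSeconds(5));
        System.out.println("cleanup done");
    }
}
```

Using concatMap instead would drop concurrency to 1 and preserve order, at the cost of slower cleanup.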
sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java
private Mono<Void> deleteAllFilesInShare() {
    return asyncNoFaultShareClient.getDirectoryClient("").listFilesAndDirectories()
        .flatMap(fileRef ->
            asyncNoFaultShareClient.getFileClient(fileRef.getName()).delete())
        .then()
cleanupShareWithRetry/deleteAllFilesInShare currently treats every ShareFileItem as a file (getFileClient(...).delete()) and doesn't handle directories. Since listFilesAndDirectories() returns both files and directories, this will fail for directories and likely prevent share deletion, defeating the retry cleanup intent. Please branch on fileRef.isDirectory() and delete directories (ideally recursively, deleting children before the directory) as well as files.
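The children-before-directory ordering this comment asks for can be modeled without the Azure clients at all. A toy sketch (the tree contents and Entry type are invented; concatMap keeps deletes sequential and ordered):

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RecursiveCleanup {
    record Entry(String name, boolean isDirectory) {}

    // Toy listing: directory path -> children (stand-in for listFilesAndDirectories()).
    static final Map<String, List<Entry>> TREE = Map.of(
        "", List.of(new Entry("dir1", true), new Entry("file1", false)),
        "dir1", List.of(new Entry("dir1/file2", false)));

    static Mono<Void> deleteEntry(Entry e) {
        // For a directory, delete every child first; only then delete the entry itself.
        Mono<Void> children = e.isDirectory()
            ? Flux.fromIterable(TREE.getOrDefault(e.name(), List.of()))
                .concatMap(RecursiveCleanup::deleteEntry)
                .then()
            : Mono.empty();
        return children.then(Mono.<Void>fromRunnable(() -> System.out.println("deleted " + e.name())));
    }

    public static void main(String[] args) {
        Flux.fromIterable(TREE.get(""))
            .concatMap(RecursiveCleanup::deleteEntry)
            .blockLast();
        System.out.println("share empty");
    }
}
```

In the real fix the branch would call the share's directory client for directories and the file client for files; File Share directories must be empty before they can be deleted, which is why the recursion deletes children first.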
return asyncNoFaultShareClient.getDirectoryClient("").listFilesAndDirectories()
    .flatMap(fileRef ->
        asyncNoFaultShareClient.getFileClient(fileRef.getName()).delete())
    .then()
    .timeout(java.time.Duration.ofSeconds(60))
    .onErrorResume(error -> {
deleteAllFilesInShare uses flatMap without any concurrency limit, which can trigger a large number of parallel delete requests during cleanup and increase throttling/failure risk. Consider bounding concurrency (flatMap with a concurrency parameter) or using concatMap for more predictable cleanup behavior.
maybe we could also explore doing this synchronously to prevent throttling?
I think you're right. There aren't that many resources that need to be cleaned up, but piling them all at once could cause issues.
private Mono<Void> cleanupContainerWithRetry() {
    return asyncNoFaultContainerClient.deleteIfExists()
        .then() // Convert Mono<Boolean> to Mono<Void>
        .timeout(Duration.ofSeconds(30))
        .retry(3)
        .onErrorResume(error -> {
            // If container deletion fails, try to delete all blobs first then retry container deletion
            return deleteAllBlobsInContainer()
                .then(asyncNoFaultContainerClient.deleteIfExists())
                .then() // Convert Mono<Boolean> to Mono<Void>
                .timeout(Duration.ofSeconds(30))
                .onErrorResume(finalError -> {
                    // Log the error but don't fail the test
                    LOGGER.atWarning()
                        .addKeyValue("error", finalError.getMessage())
                        .log("Final container cleanup failed after retries");
                    return Mono.empty();
                });
        });
}

/**
 * Delete all blobs in the container to help with cleanup.
 */
private Mono<Void> deleteAllBlobsInContainer() {
    return asyncNoFaultContainerClient.listBlobs()
        .flatMap(blobItem ->
            asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete())
        .then()
        .timeout(Duration.ofSeconds(60))
        .onErrorResume(error -> {
            // Log but continue - some blobs might have been deleted
            LOGGER.atWarning()
                .addKeyValue("error", error.getMessage())
                .log("Blob cleanup partially failed");
            return Mono.empty();
        });
}
cleanupContainerWithRetry/deleteAllBlobsInContainer uses flatMap without a concurrency limit, which can create a large burst of delete calls and increase throttling/timeouts during cleanup. Consider bounding concurrency or switching to concatMap for more reliable cleanup.
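The "small backoff between retries" idea from the earlier comment also applies here: Reactor's retryWhen with Retry.backoff spaces attempts out instead of retrying immediately. A minimal sketch with a simulated transient failure (the failure counter and messages are invented):

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryWithBackoff {
    public static void main(String[] args) {
        int[] attempts = {0};
        String result = Mono.fromCallable(() -> {
                if (++attempts[0] < 3) {
                    // Simulated transient failure, e.g. service throttling.
                    throw new IllegalStateException("throttled");
                }
                return "deleted";
            })
            // Unlike the bare .retry(3), backoff waits between attempts
            // (10 ms, then exponentially longer), giving a throttled service
            // time to recover before the next try.
            .retryWhen(Retry.backoff(3, Duration.ofMillis(10)))
            .block();
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```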
sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java
...re-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java
sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java
return asyncNoFaultShareClient.getDirectoryClient("").listFilesAndDirectories()
    .flatMap(fileRef ->
        asyncNoFaultShareClient.getFileClient(fileRef.getName()).delete())
    .then()
    .timeout(java.time.Duration.ofSeconds(60))
    .onErrorResume(error -> {
maybe we could also explore doing this synchronously to prevent throttling?
private Mono<Void> deleteAllFilesInFileSystem() {
    return asyncNoFaultFileSystemClient.listPaths()
        .flatMap(pathItem ->
            asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()).delete())
private Mono<Void> cleanupContainerWithRetry() {
    return asyncNoFaultContainerClient.deleteIfExists()
        .then() // Convert Mono<Boolean> to Mono<Void>
        .timeout(Duration.ofSeconds(30))
        .retry(3)
        .onErrorResume(error -> {
            // If container deletion fails, try to delete all blobs first then retry container deletion
            return deleteAllBlobsInContainer()
                .then(asyncNoFaultContainerClient.deleteIfExists())
                .then() // Convert Mono<Boolean> to Mono<Void>
                .timeout(Duration.ofSeconds(30))
                .onErrorResume(finalError -> {
                    // Log the error but don't fail the test
                    LOGGER.atWarning()
                        .addKeyValue("error", finalError.getMessage())
                        .log("Final container cleanup failed after retries");
                    return Mono.empty();
                });
        });
}

/**
 * Delete all blobs in the container to help with cleanup.
 */
private Mono<Void> deleteAllBlobsInContainer() {
    return asyncNoFaultContainerClient.listBlobs()
        .flatMap(blobItem ->
            asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete())
        .then()
        .timeout(Duration.ofSeconds(60))
        .onErrorResume(error -> {
            // Log but continue - some blobs might have been deleted
            LOGGER.atWarning()
                .addKeyValue("error", error.getMessage())
                .log("Blob cleanup partially failed");
            return Mono.empty();
        });
}
This is a fix for the issues we've been having with the stress tests.