Skip to content

Commit b801d2f

Browse files
tests(kinesis): attempt to fix flaky tests (#670)
1 parent 7b43470 commit b801d2f

File tree

4 files changed

+24
-24
lines changed

4 files changed

+24
-24
lines changed

src/test/java/io/kestra/plugin/aws/kinesis/AbstractKinesisTest.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,36 +23,36 @@ public class AbstractKinesisTest extends AbstractLocalStackTest {
2323
@Inject
2424
protected RunContextFactory runContextFactory;
2525
protected static String streamArn;
26-
protected static final String STREAM_NAME = "stream";
26+
protected static String streamName;
2727

2828
@BeforeAll
29-
static void beforeAll() throws InterruptedException {
30-
try(KinesisClient kinesisClient = KinesisClient.builder()
31-
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
32-
localstack.getAccessKey(),
33-
localstack.getSecretKey()
34-
)))
29+
static void setupStream() throws InterruptedException {
30+
streamName = "stream-" + IdUtils.create();
31+
32+
try (KinesisClient kinesisClient = KinesisClient.builder()
33+
.credentialsProvider(StaticCredentialsProvider.create(
34+
AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())
35+
))
3536
.region(Region.of(localstack.getRegion()))
3637
.endpointOverride(localstack.getEndpoint())
3738
.build()) {
3839

39-
try {
40-
kinesisClient.createStream(CreateStreamRequest.builder()
41-
.streamName(STREAM_NAME)
42-
.shardCount(1)
43-
.build());
44-
} catch (ResourceInUseException ignored) {}
40+
kinesisClient.createStream(CreateStreamRequest.builder()
41+
.streamName(streamName)
42+
.shardCount(1)
43+
.build());
4544

46-
DescribeStreamResponse ds = kinesisClient.describeStream(r -> r.streamName(STREAM_NAME));
47-
while (ds.streamDescription().streamStatus() != StreamStatus.ACTIVE) {
45+
DescribeStreamResponse ds;
46+
do {
4847
Thread.sleep(200);
49-
ds = kinesisClient.describeStream(r -> r.streamName(STREAM_NAME));
50-
}
48+
ds = kinesisClient.describeStream(r -> r.streamName(streamName));
49+
} while (ds.streamDescription().streamStatus() != StreamStatus.ACTIVE);
5150

5251
streamArn = ds.streamDescription().streamARN();
5352
}
5453
}
5554

55+
5656
protected String registerConsumer() throws Exception {
5757
try (KinesisClient kinesis = KinesisClient.builder()
5858
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(

src/test/java/io/kestra/plugin/aws/kinesis/ConsumeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void testConsume() throws Exception {
3737
.region(Property.ofValue(localstack.getRegion()))
3838
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
3939
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
40-
.streamName(Property.ofValue("stream"))
40+
.streamName(Property.ofValue(streamName))
4141
.records(List.of(record))
4242
.build();
4343

@@ -48,7 +48,7 @@ void testConsume() throws Exception {
4848
.region(Property.ofValue(localstack.getRegion()))
4949
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
5050
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
51-
.streamName(Property.ofValue("stream"))
51+
.streamName(Property.ofValue(streamName))
5252
.iteratorType(Property.ofValue(AbstractKinesis.IteratorType.TRIM_HORIZON))
5353
.maxRecords(Property.ofValue(10))
5454
.pollDuration(Property.ofValue(java.time.Duration.ofSeconds(1)))

src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ void evaluate() throws Exception {
6767
triggers:
6868
- id: realtime
6969
type: io.kestra.plugin.aws.kinesis.RealtimeTrigger
70-
streamName: "stream"
70+
streamName: "%s"
7171
consumerArn: "%s"
7272
region: "us-east-1"
7373
accessKeyId: "test"
7474
secretKeyId: "test"
7575
endpointOverride: "http://localhost:4566"
7676
iteratorType: TRIM_HORIZON
7777
"""
78-
.formatted(consumerArn);
78+
.formatted(streamName, consumerArn);
7979

8080

8181
File tempFlow = File.createTempFile("kinesis-realtime", ".yaml");
@@ -93,7 +93,7 @@ void evaluate() throws Exception {
9393
.region(Property.ofValue(localstack.getRegion()))
9494
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
9595
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
96-
.streamName(Property.ofValue("stream"))
96+
.streamName(Property.ofValue(streamName))
9797
.records(List.of(record))
9898
.build();
9999

src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ void evaluate() throws Exception {
4141
.region(Property.ofValue(localstack.getRegion()))
4242
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
4343
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
44-
.streamName(Property.ofValue("stream"))
44+
.streamName(Property.ofValue(streamName))
4545
.records(List.of(record, record2, record3))
4646
.build();
4747

@@ -50,7 +50,7 @@ void evaluate() throws Exception {
5050
var trigger = Trigger.builder()
5151
.id(TriggerTest.class.getSimpleName())
5252
.type(TriggerTest.class.getName())
53-
.streamName(Property.ofValue("stream"))
53+
.streamName(Property.ofValue(streamName))
5454
.iteratorType(Property.ofValue(AbstractKinesis.IteratorType.TRIM_HORIZON))
5555
.maxRecords(Property.ofValue(3))
5656
.region(Property.ofValue(localstack.getRegion()))

0 commit comments

Comments
 (0)