Skip to content

Commit 90d76bb

Browse files
authored
Adding functionality to read from specific timestamps for KDS source (#6415)
Signed-off-by: Divyansh Bokadia <[email protected]>
1 parent 0044a0d commit 90d76bb

File tree

7 files changed

+190
-9
lines changed

7 files changed

+190
-9
lines changed

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
1515
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
1616
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
17+
import software.amazon.kinesis.common.InitialPositionInStream;
1718
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
1819
import software.amazon.kinesis.common.StreamConfig;
1920
import software.amazon.kinesis.common.StreamIdentifier;
2021
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
2122
import software.amazon.kinesis.processor.MultiStreamTracker;
2223

2324
import java.time.Duration;
25+
import java.time.Instant;
26+
import java.time.ZoneOffset;
27+
import java.util.Date;
2428
import java.util.List;
2529
import java.util.Objects;
2630
import java.util.stream.Collectors;
@@ -45,28 +49,25 @@ public List<StreamConfig> streamConfigList() {
4549

4650
private StreamConfig createStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
4751
StreamIdentifier streamIdentifier = getStreamIdentifier(kinesisStreamConfig);
52+
InitialPositionInStreamExtended initialPosition = getInitialPositionExtended(kinesisStreamConfig);
4853

4954
// if the consumer strategy is polling, skip look up for consumer
5055
if (sourceConfig.getConsumerStrategy() == ConsumerStrategy.POLLING) {
51-
return new StreamConfig(streamIdentifier,
52-
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
53-
);
56+
return new StreamConfig(streamIdentifier, initialPosition);
5457
}
5558

5659
// If stream arn and consumer arn is present, create a stream config based on the configured values
5760
if (Objects.nonNull(kinesisStreamConfig.getStreamArn()) && Objects.nonNull(kinesisStreamConfig.getConsumerArn())) {
58-
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), kinesisStreamConfig.getConsumerArn());
61+
return new StreamConfig(streamIdentifier, initialPosition, kinesisStreamConfig.getConsumerArn());
5962
}
6063

6164
// If stream arn is provided, lookup consumer arn based on the consumer name which is the data prepper application name
6265
if (Objects.nonNull(kinesisStreamConfig.getStreamArn())) {
6366
String consumerArn = kinesisClientAPIHandler.getConsumerArnForStream(kinesisStreamConfig.getStreamArn(), this.applicationName);
64-
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()), consumerArn);
67+
return new StreamConfig(streamIdentifier, initialPosition, consumerArn);
6568
}
6669
// Default case
67-
return new StreamConfig(streamIdentifier,
68-
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())
69-
);
70+
return new StreamConfig(streamIdentifier, initialPosition);
7071
}
7172

7273
private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisStreamConfig) {
@@ -79,6 +80,21 @@ private StreamIdentifier getStreamIdentifier(final KinesisStreamConfig kinesisSt
7980

8081
return kinesisClientAPIHandler.getStreamIdentifier(streamArn != null ? streamArn : streamName);
8182
}
83+
84+
private InitialPositionInStreamExtended getInitialPositionExtended(KinesisStreamConfig kinesisStreamConfig) {
85+
if (kinesisStreamConfig.getInitialPosition() == InitialPositionInStream.AT_TIMESTAMP) {
86+
Instant timestamp;
87+
if (Objects.nonNull(kinesisStreamConfig.getInitialTimestamp())) {
88+
timestamp = kinesisStreamConfig.getInitialTimestamp().atOffset(ZoneOffset.UTC).toInstant();
89+
} else if (Objects.nonNull(kinesisStreamConfig.getRange())) {
90+
timestamp = Instant.now().minus(kinesisStreamConfig.getRange());
91+
} else {
92+
throw new IllegalArgumentException("Either initial_timestamp or range must be specified when using AT_TIMESTAMP initial_position");
93+
}
94+
return InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
95+
}
96+
return InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition());
97+
}
8298
/**
8399
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
84100
*/

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
@Getter
2121
public enum InitialPositionInStreamConfig {
2222
LATEST("latest", InitialPositionInStream.LATEST),
23-
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON);
23+
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON),
24+
AT_TIMESTAMP("at_timestamp", InitialPositionInStream.AT_TIMESTAMP);
2425

2526
private final String position;
2627

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;
1212

1313
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
15+
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
1416
import jakarta.validation.Valid;
1517
import lombok.Getter;
1618
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
1719
import software.amazon.awssdk.arns.Arn;
1820
import software.amazon.kinesis.common.InitialPositionInStream;
1921

2022
import java.time.Duration;
23+
import java.time.LocalDateTime;
2124
import java.util.Objects;
2225

2326
@Getter
@@ -44,6 +47,13 @@ public class KinesisStreamConfig {
4447
@JsonProperty("checkpoint_interval")
4548
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;
4649

50+
@JsonProperty("range")
51+
private Duration range;
52+
53+
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
54+
@JsonProperty("initial_timestamp")
55+
private LocalDateTime initialTimestamp;
56+
4757
public InitialPositionInStream getInitialPosition() {
4858
return initialPosition.getPositionInStream();
4959
}

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTrackerTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828

2929
import java.time.Duration;
3030
import java.time.Instant;
31+
import java.time.LocalDateTime;
32+
import java.time.ZoneOffset;
3133
import java.util.ArrayList;
34+
import java.util.Date;
3235
import java.util.HashMap;
3336
import java.util.List;
3437
import java.util.Map;
@@ -37,6 +40,7 @@
3740
import static org.junit.jupiter.api.Assertions.assertEquals;
3841
import static org.junit.jupiter.api.Assertions.assertNull;
3942
import static org.junit.jupiter.api.Assertions.assertThrows;
43+
import static org.junit.jupiter.api.Assertions.assertTrue;
4044
import static org.mockito.Mockito.mock;
4145
import static org.mockito.Mockito.when;
4246

@@ -190,6 +194,85 @@ void testStreamConfigWithStreamArnOnly() {
190194
assertEquals("streamName", expectedIdentifier.streamName());
191195
}
192196

197+
@Test
198+
void testStreamConfigWithAtTimeStampInitialPositionWithInitialTimestamp() {
199+
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
200+
final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
201+
when(streamConfig.getStreamArn()).thenReturn(streamArnString);
202+
when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
203+
when(streamConfig.getInitialTimestamp()).thenReturn(LocalDateTime.of(2024, 1, 15, 10, 30));
204+
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
205+
206+
StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
207+
when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
208+
.thenReturn(expectedIdentifier);
209+
final String expectedConsumerArn = UUID.randomUUID().toString();
210+
when(kinesisClientAPIHandler.getConsumerArnForStream(streamConfig.getStreamArn(), APPLICATION_NAME))
211+
.thenReturn(expectedConsumerArn);
212+
213+
List<StreamConfig> configs = createObjectUnderTest().streamConfigList();
214+
215+
assertEquals(1, configs.size());
216+
StreamConfig resultConfig = configs.get(0);
217+
assertEquals(expectedIdentifier, resultConfig.streamIdentifier());
218+
assertEquals(expectedConsumerArn, resultConfig.consumerArn());
219+
assertEquals("streamName", expectedIdentifier.streamName());
220+
assertEquals(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(
221+
streamConfig.getInitialTimestamp().atOffset(ZoneOffset.UTC).toInstant())), resultConfig.initialPositionInStreamExtended());
222+
}
223+
224+
@Test
225+
void testStreamConfigWithAtTimeStampInitialPositionWithRange() {
226+
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
227+
final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
228+
when(streamConfig.getStreamArn()).thenReturn(streamArnString);
229+
when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
230+
when(streamConfig.getRange()).thenReturn(Duration.ofMinutes(30));
231+
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
232+
233+
StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
234+
when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
235+
.thenReturn(expectedIdentifier);
236+
final String expectedConsumerArn = UUID.randomUUID().toString();
237+
when(kinesisClientAPIHandler.getConsumerArnForStream(streamConfig.getStreamArn(), APPLICATION_NAME))
238+
.thenReturn(expectedConsumerArn);
239+
240+
List<StreamConfig> configs = createObjectUnderTest().streamConfigList();
241+
242+
assertEquals(1, configs.size());
243+
StreamConfig resultConfig = configs.get(0);
244+
assertEquals(expectedIdentifier, resultConfig.streamIdentifier());
245+
assertEquals(expectedConsumerArn, resultConfig.consumerArn());
246+
assertEquals("streamName", expectedIdentifier.streamName());
247+
assertEquals(InitialPositionInStream.AT_TIMESTAMP, resultConfig.initialPositionInStreamExtended().getInitialPositionInStream());
248+
249+
long actualTime = resultConfig.initialPositionInStreamExtended().getTimestamp().getTime();
250+
assertTrue(actualTime <= System.currentTimeMillis() - Duration.ofMinutes(30).toMillis());
251+
}
252+
253+
@Test
254+
void testStreamConfigWithAtTimeStampInitialPositionWithNoRangeAndNoInitialTimestamp() {
255+
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);
256+
final String streamArnString = "arn:aws:kinesis:us-east-1:123456789012:stream/streamName";
257+
when(streamConfig.getStreamArn()).thenReturn(streamArnString);
258+
when(streamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.AT_TIMESTAMP);
259+
when(streamConfig.getRange()).thenReturn(null);
260+
when(streamConfig.getInitialTimestamp()).thenReturn(null);
261+
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(streamConfig));
262+
263+
StreamIdentifier expectedIdentifier = StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnString), 100L);
264+
when(kinesisClientAPIHandler.getStreamIdentifier(streamConfig.getStreamArn()))
265+
.thenReturn(expectedIdentifier);
266+
267+
KinesisMultiStreamTracker tracker = createObjectUnderTest();
268+
269+
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
270+
tracker::streamConfigList);
271+
assertEquals("Either initial_timestamp or range must be " +
272+
"specified when using AT_TIMESTAMP initial_position",
273+
exception.getMessage());
274+
}
275+
193276
@Test
194277
void testStreamConfigWithNoArnOrName() {
195278
KinesisStreamConfig streamConfig = mock(KinesisStreamConfig.class);

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfigTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,13 @@ void testInitialPositionGetByNameEarliest() {
3535
assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
3636
}
3737

38+
@Test
39+
void testInitialPositionGetByNameAtTimestamp() {
40+
final InitialPositionInStreamConfig initialPositionInStreamConfig = InitialPositionInStreamConfig.fromPositionValue("at_timestamp");
41+
assertEquals(initialPositionInStreamConfig, InitialPositionInStreamConfig.AT_TIMESTAMP);
42+
assertEquals(initialPositionInStreamConfig.toString(), "at_timestamp");
43+
assertEquals(initialPositionInStreamConfig.getPosition(), "at_timestamp");
44+
assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
45+
}
46+
3847
}

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import java.io.Reader;
2929
import java.io.StringReader;
3030
import java.time.Duration;
31+
import java.time.LocalDateTime;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
35+
import java.util.stream.Collectors;
3436

3537
import static org.hamcrest.MatcherAssert.assertThat;
3638
import static org.hamcrest.Matchers.notNullValue;
@@ -46,6 +48,7 @@ public class KinesisSourceConfigTest {
4648
private static final String PIPELINE_CONFIG_CHECKPOINT_ENABLED = "pipeline_with_checkpoint_enabled.yaml";
4749
private static final String PIPELINE_CONFIG_STREAM_ARN_ENABLED = "pipeline_with_stream_arn_config.yaml";
4850
private static final String PIPELINE_CONFIG_STREAM_ARN_CONSUMER_ARN_ENABLED = "pipeline_with_stream_arn_consumer_arn_config.yaml";
51+
private static final String PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP = "pipeline_with_initial_position_at_timestamp_config.yaml";
4952
private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute
5053

5154
KinesisSourceConfig kinesisSourceConfig;
@@ -234,4 +237,49 @@ void testSourceConfigWithStreamArnConsumerArn() {
234237
assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL);
235238
}
236239
}
240+
241+
@Test
242+
@Tag(PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP)
243+
void testSourceConfigWithInitialPositionAtTimestamp() {
244+
245+
assertThat(kinesisSourceConfig, notNullValue());
246+
assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate());
247+
assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout());
248+
assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts());
249+
assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime());
250+
assertFalse(kinesisSourceConfig.isAcknowledgments());
251+
assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout());
252+
assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue());
253+
assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1);
254+
assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn(), "arn:aws:iam::123456789012:role/OSI-PipelineRole");
255+
assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsExternalId());
256+
assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsHeaderOverrides());
257+
assertNotNull(kinesisSourceConfig.getCodec());
258+
List<KinesisStreamConfig> streamConfigs = kinesisSourceConfig.getStreams();
259+
assertEquals(kinesisSourceConfig.getConsumerStrategy(), ConsumerStrategy.ENHANCED_FAN_OUT);
260+
261+
assertEquals(streamConfigs.size(), 2);
262+
263+
Map<String, KinesisStreamConfig> streamConfigMap = streamConfigs.stream()
264+
.collect(Collectors.toMap(KinesisStreamConfig::getName, config -> config));
265+
266+
assertEquals(2, streamConfigMap.size());
267+
268+
KinesisStreamConfig stream1 = streamConfigMap.get("stream-1");
269+
assertNotNull(stream1);
270+
assertNull(stream1.getStreamArn());
271+
assertNull(stream1.getConsumerArn());
272+
assertEquals(InitialPositionInStream.AT_TIMESTAMP, stream1.getInitialPosition());
273+
assertEquals(Duration.parse("P3DT12H"), stream1.getRange());
274+
assertNull(stream1.getInitialTimestamp());
275+
276+
KinesisStreamConfig stream2 = streamConfigMap.get("stream-2");
277+
assertNotNull(stream2);
278+
assertNull(stream2.getStreamArn());
279+
assertNull(stream2.getConsumerArn());
280+
assertEquals(InitialPositionInStream.AT_TIMESTAMP, stream2.getInitialPosition());
281+
assertNull(stream2.getRange());
282+
assertEquals(LocalDateTime.parse("2024-01-15T10:30:00"), stream2.getInitialTimestamp());
283+
}
284+
237285
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
source:
2+
kinesis:
3+
streams:
4+
- stream_name: "stream-1"
5+
initial_position: "AT_TIMESTAMP"
6+
range: "P3DT12H"
7+
- stream_name: "stream-2"
8+
initial_position: "AT_TIMESTAMP"
9+
initial_timestamp: "2024-01-15T10:30:00"
10+
codec:
11+
ndjson:
12+
aws:
13+
sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole"
14+
region: "us-east-1"

0 commit comments

Comments
 (0)