[Pull-based Ingestion] Add Kinesis plugin support #17615

Merged 8 commits on Mar 24, 2025
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))

### Dependencies
- Bump `ch.qos.logback:logback-core` from 1.5.16 to 1.5.17 ([#17609](https://github.com/opensearch-project/OpenSearch/pull/17609))
plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java
@@ -119,22 +119,34 @@
}

@Override
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(KafkaOffset offset, long maxMessages, int timeoutMillis)
throws TimeoutException {
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
KafkaOffset offset,
boolean includeStart,
long maxMessages,
int timeoutMillis
) throws TimeoutException {
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(offset.getOffset(), maxMessages, timeoutMillis)
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
offset.getOffset(),
includeStart,
maxMessages,
timeoutMillis
)
);
return records;
}

@Override
public KafkaOffset nextPointer() {
return new KafkaOffset(lastFetchedOffset + 1);
}

@Override
public KafkaOffset nextPointer(KafkaOffset pointer) {
return new KafkaOffset(pointer.getOffset() + 1);
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
lastFetchedOffset,
false,
maxMessages,
timeoutMillis
)
);
return records;
}

@Override
@@ -191,18 +203,28 @@
return new KafkaOffset(offsetValue);
}

private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, long maxMessages, int timeoutMillis) {
if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) {
logger.info("Seeking to offset {}", startOffset);
consumer.seek(topicPartition, startOffset);
private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
long startOffset,
boolean includeStart,
long maxMessages,
int timeoutMillis
) {
long kafkaStartOffset = startOffset;
if (!includeStart) {
kafkaStartOffset += 1;
}

if (lastFetchedOffset < 0 || lastFetchedOffset != kafkaStartOffset - 1) {
logger.info("Seeking to offset {}", kafkaStartOffset);
consumer.seek(topicPartition, kafkaStartOffset);
// update the last fetched offset so that we don't need to seek again if no more messages to fetch
lastFetchedOffset = startOffset - 1;
lastFetchedOffset = kafkaStartOffset - 1;
}

ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<byte[], byte[]>> messageAndOffsets = consumerRecords.records(topicPartition);

long endOffset = startOffset + maxMessages;
long endOffset = kafkaStartOffset + maxMessages;
List<ReadResult<KafkaOffset, KafkaMessage>> results = new ArrayList<>();

for (ConsumerRecord<byte[], byte[]> messageAndOffset : messageAndOffsets) {
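
As context for this hunk: the new `includeStart` flag controls whether the supplied offset is itself consumed or treated as already processed. A minimal standalone sketch of that arithmetic, mirroring the first lines of `fetch()` above (the helper name is illustrative, not part of the diff):

```java
// When includeStart is false, reading begins one past the supplied offset,
// so a resumed consumer does not re-deliver the record it already handed out.
static long effectiveKafkaStartOffset(long startOffset, boolean includeStart) {
    return includeStart ? startOffset : startOffset + 1;
}

// effectiveKafkaStartOffset(5, true)  == 5  -> first read of a shard
// effectiveKafkaStartOffset(5, false) == 6  -> resume after offset 5 was processed
```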
@@ -61,11 +61,10 @@ public void testReadNext() throws Exception {

when(mockConsumer.poll(any(Duration.class))).thenReturn(records);

List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> result = consumer.readNext(new KafkaOffset(0), 10, 1000);
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> result = consumer.readNext(new KafkaOffset(0), true, 10, 1000);

assertEquals(1, result.size());
assertEquals("message", new String(result.get(0).getMessage().getPayload(), StandardCharsets.UTF_8));
assertEquals(1, consumer.nextPointer().getOffset());
assertEquals(0, consumer.getShardId());
assertEquals("client1", consumer.getClientId());
}
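
From the caller's side, the revised contract reads roughly as follows. A hedged usage sketch (it assumes a constructed `KafkaPartitionConsumer consumer` as in the test above; signatures are as introduced in this diff, and the surrounding method must handle `TimeoutException`):

```java
// First read of a shard: consume the start offset itself.
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> first =
    consumer.readNext(new KafkaOffset(0), /* includeStart */ true, 10, 1000);

// Resume from a persisted pointer: the stored offset was already processed,
// so pass includeStart = false to start one past it.
KafkaOffset lastProcessed = new KafkaOffset(41);
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> resumed =
    consumer.readNext(lastProcessed, /* includeStart */ false, 10, 1000);

// Steady-state polling: the new no-offset overload continues from the
// consumer's internal lastFetchedOffset.
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> next =
    consumer.readNext(10, 1000);
```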
251 changes: 251 additions & 0 deletions plugins/ingestion-kinesis/build.gradle
@@ -0,0 +1,251 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description = 'Pull-based ingestion plugin to consume from Kinesis'
classname = 'org.opensearch.plugin.kinesis.KinesisPlugin'
}

versions << [
'docker': '3.3.6',
'testcontainers': '1.19.7',
'ducttape': '1.0.8',
'snappy': '1.1.10.7',
]

dependencies {
// aws sdk
api "software.amazon.awssdk:sdk-core:${versions.aws}"
api "software.amazon.awssdk:annotations:${versions.aws}"
api "software.amazon.awssdk:aws-core:${versions.aws}"
api "software.amazon.awssdk:auth:${versions.aws}"
api "software.amazon.awssdk:identity-spi:${versions.aws}"
api "software.amazon.awssdk:checksums:${versions.aws}"
api "software.amazon.awssdk:checksums-spi:${versions.aws}"
api "software.amazon.awssdk.crt:aws-crt:${versions.awscrt}"
api "software.amazon.awssdk:http-auth:${versions.aws}"
api "software.amazon.awssdk:http-auth-aws:${versions.aws}"
api "software.amazon.awssdk:http-auth-spi:${versions.aws}"
api "software.amazon.awssdk:retries:${versions.aws}"
api "software.amazon.awssdk:retries-spi:${versions.aws}"
api "software.amazon.awssdk:endpoints-spi:${versions.aws}"
api "software.amazon.awssdk:http-client-spi:${versions.aws}"
api "software.amazon.awssdk:apache-client:${versions.aws}"
api "software.amazon.awssdk:metrics-spi:${versions.aws}"
api "software.amazon.awssdk:profiles:${versions.aws}"
api "software.amazon.awssdk:regions:${versions.aws}"
api "software.amazon.awssdk:utils:${versions.aws}"
api "software.amazon.awssdk:aws-json-protocol:${versions.aws}"
api "software.amazon.awssdk:protocol-core:${versions.aws}"
api "software.amazon.awssdk:json-utils:${versions.aws}"
api "software.amazon.awssdk:third-party-jackson-core:${versions.aws}"
api "software.amazon.awssdk:aws-xml-protocol:${versions.aws}"
api "software.amazon.awssdk:aws-json-protocol:${versions.aws}"
api "software.amazon.awssdk:aws-query-protocol:${versions.aws}"
api "software.amazon.awssdk:sts:${versions.aws}"
api "software.amazon.awssdk:netty-nio-client:${versions.aws}"
api "software.amazon.awssdk:kinesis:${versions.aws}"
api "software.amazon.awssdk:aws-cbor-protocol:${versions.aws}"
api "software.amazon.awssdk:third-party-jackson-dataformat-cbor:${versions.aws}"

api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
api "commons-logging:commons-logging:${versions.commonslogging}"
api "commons-codec:commons-codec:${versions.commonscodec}"
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
api "joda-time:joda-time:${versions.joda}"
api "org.slf4j:slf4j-api:${versions.slf4j}"

// network stack
api "io.netty:netty-buffer:${versions.netty}"
api "io.netty:netty-codec:${versions.netty}"
api "io.netty:netty-codec-http:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-common:${versions.netty}"
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
api "io.netty:netty-transport-classes-epoll:${versions.netty}"


// test
testImplementation "com.github.docker-java:docker-java-api:${versions.docker}"
testImplementation "com.github.docker-java:docker-java-transport:${versions.docker}"
testImplementation "com.github.docker-java:docker-java-transport-zerodep:${versions.docker}"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
testImplementation "org.testcontainers:testcontainers:${versions.testcontainers}"
testImplementation "org.testcontainers:localstack:${versions.testcontainers}"
testImplementation "org.rnorth.duct-tape:duct-tape:${versions.ducttape}"
testImplementation "org.apache.commons:commons-compress:${versions.commonscompress}"
testImplementation "commons-io:commons-io:${versions.commonsio}"
testImplementation 'org.awaitility:awaitility:4.2.0'
}

internalClusterTest {
environment 'TESTCONTAINERS_RYUK_DISABLED', 'true'
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}

tasks.named("dependencyLicenses").configure {
mapping from: /jackson-.*/, to: 'jackson'
mapping from: /netty-.*/, to: 'netty'
mapping from: /log4j-.*/, to: 'log4j'
}

thirdPartyAudit {
ignoreMissingClasses(
'com.aayushatharva.brotli4j.Brotli4jLoader',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Status',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Wrapper',
'com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel',
'com.aayushatharva.brotli4j.encoder.Encoder$Mode',
'com.aayushatharva.brotli4j.encoder.Encoder$Parameters',

'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',

'org.apache.avalon.framework.logger.Logger',
'org.apache.log.Hierarchy',
'org.apache.log.Logger',
'org.apache.log4j.Level',
'org.apache.log4j.Logger',
'org.apache.log4j.Priority',

'org.slf4j.impl.StaticLoggerBinder',
'org.slf4j.impl.StaticMDCBinder',
'org.slf4j.impl.StaticMarkerBinder',

'org.graalvm.nativeimage.hosted.Feature',
'org.graalvm.nativeimage.hosted.Feature$AfterImageWriteAccess',

'com.ning.compress.BufferRecycler',
'com.ning.compress.lzf.ChunkDecoder',
'com.ning.compress.lzf.ChunkEncoder',
'com.ning.compress.lzf.LZFChunk',
'com.ning.compress.lzf.LZFEncoder',
'com.ning.compress.lzf.util.ChunkDecoderFactory',
'com.ning.compress.lzf.util.ChunkEncoderFactory',

'javax.servlet.ServletContextEvent',
'javax.servlet.ServletContextListener',

'io.netty.internal.tcnative.Buffer',
'io.netty.internal.tcnative.CertificateCompressionAlgo',
'io.netty.internal.tcnative.Library',
'io.netty.internal.tcnative.SSLContext',
'io.netty.internal.tcnative.SSLPrivateKeyMethod',

'io.netty.internal.tcnative.AsyncSSLPrivateKeyMethod',
'io.netty.internal.tcnative.AsyncTask',
'io.netty.internal.tcnative.CertificateCallback',
'io.netty.internal.tcnative.CertificateVerifier',
'io.netty.internal.tcnative.ResultCallback',
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',
'io.netty.internal.tcnative.SSL',
'io.netty.internal.tcnative.SSLSession',
'io.netty.internal.tcnative.SSLSessionCache',

'lzma.sdk.lzma.Encoder',
'net.jpountz.lz4.LZ4Compressor',
'net.jpountz.lz4.LZ4Factory',
'net.jpountz.lz4.LZ4FastDecompressor',
'net.jpountz.xxhash.XXHash32',
'net.jpountz.xxhash.XXHashFactory',

// from io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
'org.bouncycastle.cert.X509v3CertificateBuilder',
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
'org.bouncycastle.openssl.PEMEncryptedKeyPair',
'org.bouncycastle.openssl.PEMParser',
'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter',
'org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder',
'org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder',
'org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo',

'org.conscrypt.AllocatedBuffer',
'org.conscrypt.BufferAllocator',
'org.conscrypt.Conscrypt',
'org.conscrypt.HandshakeListener',

'org.eclipse.jetty.alpn.ALPN$ClientProvider',
'org.eclipse.jetty.alpn.ALPN$ServerProvider',
'org.eclipse.jetty.alpn.ALPN',

// from io.netty.handler.ssl.JettyNpnSslEngine (netty)
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
'org.eclipse.jetty.npn.NextProtoNego',

// from io.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
'org.jboss.marshalling.ByteInput',

// from io.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
'org.jboss.marshalling.ByteOutput',

// from io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
'org.jboss.marshalling.Marshaller',

// from io.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
'org.jboss.marshalling.MarshallerFactory',
'org.jboss.marshalling.MarshallingConfiguration',
'org.jboss.marshalling.Unmarshaller',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration',

'software.amazon.eventstream.HeaderValue',
'software.amazon.eventstream.Message',
'software.amazon.eventstream.MessageDecoder'
)

ignoreViolations (
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5',

'io.netty.util.internal.PlatformDependent0',
'io.netty.util.internal.PlatformDependent0$1',
'io.netty.util.internal.PlatformDependent0$2',
'io.netty.util.internal.PlatformDependent0$3',
'io.netty.util.internal.PlatformDependent0$4',
'io.netty.util.internal.PlatformDependent0$6',

'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
)
}
@@ -0,0 +1 @@
c5acc1da9567290302d80ffa1633785afa4ce630