Skip to content

feat: kafka sink#97

Open
sumitaich1998 wants to merge 8 commits into
mainfrom
feat/kafka-sink-cursor
Open

feat: kafka sink#97
sumitaich1998 wants to merge 8 commits into
mainfrom
feat/kafka-sink-cursor

Conversation

@sumitaich1998
Copy link
Copy Markdown

No description provided.

sumitaich1998 and others added 8 commits May 13, 2026 17:56
…nitions

Introduces the Owner-based config interface for the Kafka Sink, loading
all SINK_KAFKA_* environment variables: brokers, topic, proto message/key,
proto mapping, large message toggle, stream, and all Sink Stencil Client
configs (URLs, cache, refresh, auth, timeout, retries). Also exposes
source Stencil Client and INPUT_SCHEMA_PROTO_CLASS for the mapping engine.

Co-authored-by: Cursor <cursoragent@cursor.com>
Introduces KafkaSinkStencilUtils with separate StencilConfig builders
for source and sink Stencil Clients. The sink client uses
SINK_KAFKA_SCHEMA_REGISTRY_STENCIL_* configs, enabling independent
schema registry settings for the output proto. Also adds kafka-clients
and cel-java dependencies to build.gradle.

Co-authored-by: Cursor <cursoragent@cursor.com>
…valuation

Implements ProtoMapper using cel-java v0.5.2 to compile and evaluate
CEL expressions for proto-to-proto field mapping. Supports direct field
mapping, string concatenation, type casting, nested field access, ternary
operator, static constants, and field presence checks via has() macro.
Fails fast at startup on invalid expressions and reports per-message
evaluation errors at runtime.

Co-authored-by: Cursor <cursoragent@cursor.com>
Implements KafkaMessageParser that parses raw byte[] payloads into
DynamicMessage using the source proto schema fetched via the Stencil
Client. Validates non-null and non-empty payloads.

Co-authored-by: Cursor <cursoragent@cursor.com>
Implements KafkaMessageSerializer that serializes mapped output
DynamicMessage instances back to byte[] for Kafka production.

Co-authored-by: Cursor <cursoragent@cursor.com>
…gh config support

Implements KafkaProducerWrapper around the Kafka client producer. Supports
large message mode (max.request.size=20MB, compression=snappy) via
SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE. Any SINK_KAFKA_* env vars not
reserved by the sink are passed through as native Kafka producer properties.

Co-authored-by: Cursor <cursoragent@cursor.com>
… flow

Implements KafkaSink with pushToSink(List<Message>) that orchestrates:
parse source proto -> map via CEL -> serialize -> produce to Kafka.
Collects per-message errors from each stage into SinkResponse following
Depot's ErrorInfo pattern. Continues processing remaining messages when
individual messages fail.

Co-authored-by: Cursor <cursoragent@cursor.com>
Implements KafkaSinkFactory following Depot's factory pattern with
init() and create() methods. Initializes source and sink Stencil Clients,
compiles CEL mapping expressions for both value and key protos, and
creates KafkaSink instances with all components wired together.

Co-authored-by: Cursor <cursoragent@cursor.com>
Comment on lines +18 to +25
@Slf4j
public class KafkaSink implements Sink {

private final KafkaMessageParser messageParser;
private final ProtoMapper valueMapper;
private final ProtoMapper keyMapper;
private final KafkaMessageSerializer serializer;
private final KafkaProducerWrapper producer;
Copy link
Copy Markdown
Author

@sumitaich1998 sumitaich1998 May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every other depot sink like BigQuerySink, HttpSink, RedisSink, MaxComputeSink… all are recieving Instrumentation object and also a sink-specifc metrics object as constructor dependancy, but KafkaSink is not recieving neither of them. its using Lombok @slf4j directly for loging instead of going through Instrumentation which is the depot-standrd way to combine loging with StatsD metric emision. this means zero metrics are emited from kafka sink… no produce latancy, no sucess counters, no failure counters, no size distrubutions. also there is no KafkaSinkMetrics class anywere in the codebase, comparable to HttpSinkMetrics, MaxComputeMetrics, RedisSinkMetrics,etc. a KafkaSinkMetrics class shoud be created with metric name genrators for produce latancy, sucess/failure counters, mesage size etc, and an Instrumentation instance shoud be injected into KafkaSink and used for all loging and metric emision

Comment on lines +64 to +70
} catch (CelEvaluationException e) {
log.error("CEL mapping error for message at index {}: {}", i, e.getMessage());
response.addErrors(i, new ErrorInfo(e, ErrorType.INVALID_MESSAGE_ERROR));
} catch (Exception e) {
log.error("Error processing message at index {}: {}", i, e.getMessage());
response.addErrors(i, new ErrorInfo(e, ErrorType.DESERIALIZATION_ERROR));
}
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are four failure sorces in per-mesage pipeline… source proto parsing, mapping/CEL runtime erors, output serialisation, and kafka producer failures, each should have aprpropriate eror categorisation. but curent code is only distingushing two categories where CelEvaluationException goes to INVALID_MESSAGE_ERROR and evrything else goes to DESERIALIZATION_ERROR. so this means a serialisation eror like DynamicMessage.toByteArray() failing… gets incorectly labeled as DESERIALIZATION_ERROR, similarly a ClassCastException from unsafe cast at line 49 or any other unexpcted eror would also be labeled DESERIALIZATION_ERROR. eror types shoud be fine-grained… use DESERIALIZATION_ERROR for parsing failures only, INVALID_MESSAGE_ERROR for CEL/mapping erors, SINK_NON_RETRYABLE_ERROR for serialisation erors etc. shoud catch each excptn type separatly like InvalidProtocolBufferException from parsing, CelEvaluationException from mapping… to asign corect ErrorType

Comment on lines +77 to +83
try {
futures.get(j).get();
} catch (Exception e) {
log.error("Kafka produce error for message at index {}: {}", originalIndex, e.getMessage());
response.addErrors(originalIndex, new ErrorInfo(
new Exception("Kafka produce failed: " + e.getMessage(), e),
ErrorType.SINK_RETRYABLE_ERROR));
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

producer acknowledgements shoud distingush between sucess, retryable and non-retryable failure… for mapping same to per-mesage eror info. curently every produce failure is categorised as SINK_RETRYABLE_ERROR, but several kafka excptions are non-retryable like RecordTooLargeException which means mesage exceeds max.request.size, also InvalidTopicException, UnknownServerException, AuthorizationException… these shoud be mapped to SINK_NON_RETRYABLE_ERROR. the ExecutionException wraping shoud be unwrapped to inspct the cause, and cause type shoud determin whether eror is retryable or non-retryable, for examle can chek cause instanceof RecordTooLargeException || cause instanceof InvalidTopicException and then mark as SINK_NON_RETRYABLE_ERROR… otherwise mark as SINK_RETRYABLE_ERROR

}
}

producer.flush();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if flush() throws excption like InterruptException, or if previus send() trigered a fatal producer eror… the excption propogates uncaught, causing entire pushToSink call to fail without returning a SinkResponse. this means all the per-mesage erors already colected in response are lost, and firehose canot do proper per-mesage eror handling. the flush() call shoud be wraped in try-catch, if it fails… all mesages that were succesfully sent (those in validIndexes) but whose futures havent been resolved shoud be marked as SINK_RETRYABLE_ERROR in response. also flush shoud be loged for batch procesing visibility

for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
try {
byte[] rawValue = (byte[]) message.getLogMessage();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message.getLogMessage() returns Object, if mesage contains something other than byte[] like a String or already-parsed object… this will throw a ClassCastException, which would then be caught by genric catch (Exception e) and incorectly reported as DESERIALIZATION_ERROR. shoud be explicit instanceof chek with descriptve eror mesage before doing the cast… so that if type is wrong, proper eror with class name is thrown rather than genric ClassCastException

Comment on lines +125 to +127
@Key("SINK_KAFKA_LINGER_MS")
@DefaultValue("")
String getSinkKafkaLingerMs();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this confg method exists in KafkaSinkConfig but is never used anywere in code. meanwhile the pasthrough mechanisim in KafkaProducerWrapper would convrt same env var SINK_KAFKA_LINGER_MS to kafka property linger.ms and apply it. having explict confg method that is never consumed is confusing and misleading… developrs may think its being used somwhere. either remove this method from confg interfce, or actualy use it in KafkaProducerWrapper and add to isReservedKey() list

private final StencilClient sourceStencilClient;
private final String sourceProtoClass;

public DynamicMessage parse(byte[] payload) throws Exception {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declaring throws Exception on method forces all callers to handle broadest posible excption type, loses type informaton about what can actualy go wrong, and makes imposible for caller to distingush between expctd erors like malformd protobuf data and unexpctd bugs. StencilClient.parse() method likely throws InvalidProtocolBufferException or similar specifc excption… signatuer shoud declare specifc excption types that can ocur instd of genric Exception

Comment on lines +7 to +11
public byte[] serialize(DynamicMessage message) {
if (message == null) {
return new byte[0];
}
return message.toByteArray();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if message is null… serializer returns empty byte array rather than signaling eror. this means empty 0-byte mesage would be produced to output kafka topic with no indicaton that somthing went wrong. a null DynamicMessage reaching serializer indicats a bug in upstream pipeline becoz mapper shoud never return null. this shoud throw IllegalArgumentException or IllegalStateException rather than silently producing corupt output… in data pipeline, silent data coruption is worse than loud failure

Comment on lines +9 to +19
public static StencilConfig getSinkStencilConfig(KafkaSinkConfig config, StatsDClient statsDClient) {
return StencilConfig.builder()
.cacheAutoRefresh(config.getSinkKafkaSchemaRegistryStencilCacheAutoRefresh())
.cacheTtlMs(config.getSinkKafkaSchemaRegistryStencilCacheTtlMs())
.statsDClient(statsDClient)
.fetchHeaders(config.getSinkKafkaSchemaRegistryStencilFetchHeaders())
.fetchBackoffMinMs(config.getSinkKafkaSchemaRegistryStencilFetchBackoffMinMs())
.fetchRetries(config.getSinkKafkaSchemaRegistryStencilFetchRetries())
.fetchTimeoutMs(config.getSinkKafkaSchemaRegistryStencilFetchTimeoutMs())
.refreshStrategy(config.getSinkKafkaSchemaRegistryStencilRefreshStrategy())
.build();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaSinkConfig defines SINK_KAFKA_SCHEMA_REGISTRY_STENCIL_FETCH_AUTH_BEARER_TOKEN via getSinkKafkaSchemaRegistryStencilFetchAuthBearerToken()… but its never used when building either sink or source StencilConfig. if stencil server reqires authentication… this confg value is silently ignord and stencil client will fail with auth eror, with no indicaton that token was configred but not applied. chek whether StencilConfig.builder() has authToken or similar seter… if so, shoud be set here, if not then token shoud be convrtd to appropriate auth header and added to fetchHeaders

import java.util.List;

@Config.DisableFeature(Config.DisableableFeature.PARAMETER_FORMATTING)
public interface KafkaSinkConfig extends Config {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every other depot sink confg interfce extends SinkConfig which provids getMetricsApplicationPrefix() used by all SinkMetrics subclases. KafkaSinkConfig extends only org.aeonbits.owner.Config… which means the KafkaSinkMetrics class which needs to be created wont be able to follow patern new KafkaSinkMetrics(sinkConfig) like new BigQueryMetrics(sinkConfig) does, becoz KafkaSinkMetrics extends SinkMetrics and SinkMetrics construtor takes SinkConfig. also applicaton-level metric prefix wont be availble… breaking metric naming convntion {applicationPrefix}sink_kafka_*. either KafkaSinkConfig shoud extend SinkConfig… or seprate SinkConfig shoud be loaded alongside it in factory

KafkaMessageSerializer serializer = new KafkaMessageSerializer();
KafkaProducerWrapper producerWrapper = new KafkaProducerWrapper(mockProducer, "output-topic");

kafkaSink = new KafkaSink(messageParser, valueMapper, null, serializer, producerWrapper);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in setUp() method… keyMapper is always passed as null, so there is no test at all for the key mapping path. when keyMapper is non-null the sink does extra mapping and serialisation for the key part of ProducerRecord, but none of this is being testd. shoud have atleast one test where keyMapper is provided and verify that produced record has both mapped key and mapped value… otherwise the entire key mapping feture is untested

Comment on lines +103 to +112
@Test
public void shouldReportDeserializationErrorForInvalidMessage() {
List<Message> messages = Collections.singletonList(
new Message(null, "invalid-bytes".getBytes()));

SinkResponse response = kafkaSink.pushToSink(messages);
Assert.assertTrue(response.hasErrors());
Assert.assertNotNull(response.getErrorsFor(0));
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test only cheks hasErrors() and assertNotNull on eror… but doesnt assert the actual ErrorType. other depot sink tests like BigQuerySinkTest always assert ErrorType.DESERIALIZATION_ERROR or whatever specifc type is expectd. without asserting eror type… if someone changes the eror categorisation from DESERIALIZATION_ERROR to SINK_RETRYABLE_ERROR by mistake, test would still pass. same isue in shouldReportErrorForNullMessage test also… shoud do Assert.assertEquals(ErrorType.DESERIALIZATION_ERROR, response.getErrorsFor(0).getErrorType())

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

public class KafkaSinkTest {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no test for kafkaSink.pushToSink(Collections.emptyList()). this is important edge case… empty batch shoud return SinkResponse with no erors and no interacton with producer. without this test there is no guarntee that empty list doesnt cause IndexOutOfBoundsException or other unexpcted behavour

Comment on lines +123 to +163
@Test
@SuppressWarnings("unchecked")
public void shouldReportProducerErrorPerMessage() {
CompletableFuture<RecordMetadata> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new RuntimeException("Broker unavailable"));
when(mockProducer.send(any(ProducerRecord.class))).thenReturn(failedFuture);

TestBookingLogMessage source = TestBookingLogMessage.newBuilder()
.setOrderNumber("ORD-1")
.build();
List<Message> messages = Collections.singletonList(
new Message(null, source.toByteArray()));

SinkResponse response = kafkaSink.pushToSink(messages);
Assert.assertTrue(response.hasErrors());
Assert.assertEquals(ErrorType.SINK_RETRYABLE_ERROR,
response.getErrorsFor(0).getErrorType());
}

@Test
@SuppressWarnings("unchecked")
public void shouldContinueProcessingAfterErrorInMiddle() {
RecordMetadata metadata = new RecordMetadata(
new TopicPartition("output-topic", 0), 0, 0, 0, 0L, 0, 0);
when(mockProducer.send(any(ProducerRecord.class)))
.thenReturn(CompletableFuture.completedFuture(metadata));

TestBookingLogMessage validMsg = TestBookingLogMessage.newBuilder()
.setOrderNumber("ORD-1")
.build();

List<Message> messages = new ArrayList<>();
messages.add(new Message(null, validMsg.toByteArray()));
messages.add(new Message(null, "invalid".getBytes()));
messages.add(new Message(null, validMsg.toByteArray()));

SinkResponse response = kafkaSink.pushToSink(messages);
Assert.assertTrue(response.hasErrors());
Assert.assertNull(response.getErrorsFor(0));
Assert.assertNotNull(response.getErrorsFor(1));
Assert.assertNull(response.getErrorsFor(2));
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test shouldReportProducerErrorPerMessage only tests single mesage with producer failure, and shouldContinueProcessingAfterErrorInMiddle tests deserialization eror in middle of batch but not producer failure in middle. there is no test where batch of say 3 mesages is sent and producer suceeds for mesage 0 and 2 but fails for mesage 1… this is importnt scenario to verify that eror is corectly attributed to exct mesage index and other mesages remain sucesfull in SinkResponse

Comment on lines +38 to +56
@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
stencilClient = StencilClientFactory.getClient();

Descriptors.Descriptor sourceDesc = stencilClient.get(
"com.gotocompany.depot.TestBookingLogMessage");
Descriptors.Descriptor sinkDesc = stencilClient.get(
"com.gotocompany.depot.TestMessage");

String mapping = "{"
+ "\"order_number\": \"com.gotocompany.depot.TestBookingLogMessage.order_number\","
+ "\"order_url\": \"com.gotocompany.depot.TestBookingLogMessage.order_url\""
+ "}";

ProtoMapper valueMapper = new ProtoMapper(sourceDesc, sinkDesc, mapping);
KafkaMessageParser messageParser = new KafkaMessageParser(stencilClient,
"com.gotocompany.depot.TestBookingLogMessage");
KafkaMessageSerializer serializer = new KafkaMessageSerializer();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only KafkaProducer is mocked but ProtoMapper, KafkaMessageParser, KafkaMessageSerializer and StencilClient are all real objects. this makes it more of an integraton test than unit test. if ProtoMapper has a bug… this test will also fail, making it hard to isolate failures. for proper unit test shoud mock all dependancys and only test KafkaSink logic in isolaton

Comment on lines +29 to +33
@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
wrapper = new KafkaProducerWrapper(mockProducer, "test-topic");
}
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test only uses package-privte construtor that takes pre-built KafkaProducer. the public construtor KafkaProducerWrapper(KafkaSinkConfig config, Map<String, String> envVars) is completly untesd. this means applySinkKafkaOverrides() method, isReservedKey() method, large mesage confg logic, and properties building are all without any test coverage. these are critcal parts… especialy pasthrough overide logic where we already identifed a bug that it can silently overide core properites

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class KafkaProducerWrapperTest {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applySinkKafkaOverrides converts env var names to kafka property names and applies them, and isReservedKey filters out known config keys from being passd thru. both methods have importnt logic with potntial bugs… but zero test coverage. shoud add tests like shouldApplyPassthroughOverrides, shouldNotOverrideReservedKeys, shouldHandleNullEnvVars, shouldConvertEnvVarNameToKafkaProperty etc

Comment on lines +12 to +33
public class KafkaMessageParserTest {

private KafkaMessageParser parser;
private StencilClient stencilClient;

@Before
public void setUp() {
stencilClient = StencilClientFactory.getClient();
parser = new KafkaMessageParser(stencilClient,
"com.gotocompany.depot.TestBookingLogMessage");
}


@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnNullPayload() throws Exception {
parser.parse(null);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnEmptyPayload() throws Exception {
parser.parse(new byte[0]);
}
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are only 2 tests and both are for eror cases (null and empty payload). the happy path where valid protobuf bytes are parsed into DynamicMessage is completly mising. also no test for invalid/corrupt protobuf bytes that are non-empty but not valid proto format. this is very low coverage for a class that is the first step in the per-mesage pipeline… shoud add atleast shouldParseValidProtobufPayload and shouldThrowOnCorruptProtobufPayload tests

Comment on lines +20 to +25
@Test
public void shouldReturnEmptyBytesForNullMessage() {
byte[] result = serializer.serialize(null);
Assert.assertNotNull(result);
Assert.assertEquals(0, result.length);
}
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is asserting that null input returns empty byte array as corect behavour. but returning empty bytes for null input is actualy a bug that silently produces corupt empty mesages to kafka. the test is basicly encoding the bug as expectd behavour. if the fix is to throw IllegalArgumentException for null input… this test would need to change to @test(expected = IllegalArgumentException.class)

@sumitaich1998 sumitaich1998 self-assigned this May 15, 2026
@sumitaich1998
Copy link
Copy Markdown
Author

overall test coverage is very less for all classes of kafka sink , there are very few unit tests which dont cover all the corner cases

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant