Skip to content

Commit fb66d9c

Browse files
marijarinm.zharinovamfvanek
authored
Add example of batch reading from Kafka with context propagation (#311)
* add task functions to kotlin module * add span column, change unique constraint * правки по батчевой отправке и тесты * rearrange imports * Fix problem with propagator * review fixes * fix changed java version * fix after review * rewrite asserts without warnings * add test for KafkaReadingService * add new tracing test for kafka consumer * Fix properties * Refactor tests --------- Co-authored-by: m.zharinova <m.zharinova@tbank.ru> Co-authored-by: Ivan Vakhrushev <mfvanek@gmail.com>
1 parent fc50dce commit fb66d9c

39 files changed

Lines changed: 926 additions & 99 deletions

File tree

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ allprojects {
1212
version = "0.5.0"
1313

1414
repositories {
15-
mavenLocal()
1615
mavenCentral()
16+
mavenLocal()
1717
}
1818
}
1919

buildSrc/src/main/kotlin/sb-ot-demo.java-compile.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
if (osdetector.arch == "aarch_64") {
2222
testImplementation("io.netty:netty-all:4.1.104.Final")
2323
}
24+
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
2425
}
2526

2627
java {

db-migrations/build.gradle.kts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
11
plugins {
2-
id("java")
2+
id("java-library")
33
id("sb-ot-demo.java-conventions")
4+
id("io.freefair.lombok")
5+
}
6+
7+
dependencies {
8+
implementation(platform(project(":common-internal-bom")))
9+
implementation(platform(libs.spring.boot.v3.dependencies))
10+
11+
implementation("io.micrometer:micrometer-tracing")
12+
implementation("org.apache.kafka:kafka-clients")
13+
implementation("org.slf4j:slf4j-api")
14+
implementation("org.springframework:spring-jdbc")
415
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
3+
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
4+
*
5+
* Licensed under the Apache License 2.0
6+
*/
7+
8+
package io.github.mfvanek.db.migrations.common.saver;
9+
10+
import io.micrometer.tracing.Span;
11+
import io.micrometer.tracing.Tracer;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.kafka.clients.consumer.ConsumerRecord;
15+
import org.slf4j.MDC;
16+
import org.springframework.jdbc.core.simple.JdbcClient;
17+
18+
import java.time.Clock;
19+
import java.time.LocalDateTime;
20+
import java.util.UUID;
21+
22+
@Slf4j
23+
@RequiredArgsConstructor
24+
public class DbSaver {
25+
26+
private final String tenantName;
27+
private final Tracer tracer;
28+
private final Clock clock;
29+
private final JdbcClient jdbcClient;
30+
31+
public void processSingleRecord(ConsumerRecord<UUID, String> record) {
32+
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
33+
final Span currentSpan = tracer.currentSpan();
34+
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
35+
final String spanId = currentSpan != null ? currentSpan.context().spanId() : "";
36+
log.info("Received record: {} with traceId {} spanId {}", record.value(), traceId, spanId);
37+
jdbcClient.sql("""
38+
insert into otel_demo.storage(message, trace_id, span_id, created_at)
39+
values(:msg, :traceId, :currentSpan, :createdAt);""")
40+
.param("msg", record.value())
41+
.param("traceId", traceId)
42+
.param("currentSpan", spanId)
43+
.param("createdAt", LocalDateTime.now(clock))
44+
.update();
45+
}
46+
}
47+
}

db-migrations/src/main/resources/db/changelog/db.changelog-master.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,7 @@ databaseChangeLog:
33
file: db/changelog/sql/schema.sql
44
- include:
55
file: db/changelog/sql/storage.sql
6+
- include:
7+
file: db/changelog/sql/add_span_column.sql
8+
- include:
9+
file: db/changelog/sql/set_span_and_trace_unique.sql
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
--liquibase formatted sql
2+
3+
--changeset marina.zharinova:2025.08.31:add span column
4+
alter table otel_demo.storage add column span_id text;
5+
6+
--changeset marina.zharinova:2025.08.31:comment on span_id
7+
comment on column otel_demo.storage.span_id is 'SpanId of operation';
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
--liquibase formatted sql
2+
3+
--changeset ivan.vakhrushev:2025.08.31:remove unique from trace_id
4+
alter table otel_demo.storage drop constraint storage_trace_id_key;
5+
6+
--changeset marina.zharinova:2025.08.31:add constraint on trace_id with span_id
7+
alter table otel_demo.storage add constraint trace_span_unique unique(trace_id, span_id);

db-migrations/src/main/resources/db/changelog/sql/storage.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ create table if not exists otel_demo.storage
55
(
66
id bigint generated always as identity,
77
message text not null,
8-
trace_id varchar(64) not null unique,
8+
trace_id text not null unique,
99
created_at timestamptz not null
1010
);
1111

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
3+
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
4+
*
5+
* Licensed under the Apache License 2.0
6+
*/
7+
8+
package io.github.mfvanek.spring.boot3.kotlin.test.config
9+
10+
import io.github.mfvanek.db.migrations.common.saver.DbSaver
11+
import io.micrometer.tracing.Tracer
12+
import org.springframework.beans.factory.annotation.Value
13+
import org.springframework.context.annotation.Bean
14+
import org.springframework.context.annotation.Configuration
15+
import org.springframework.jdbc.core.simple.JdbcClient
16+
import java.time.Clock
17+
18+
@Configuration(proxyBeanMethods = false)
19+
class DbConfig {
20+
21+
@Bean
22+
fun dbSaver(
23+
@Value("\${app.tenant.name}") tenantName: String,
24+
tracer: Tracer,
25+
clock: Clock,
26+
jdbcClient: JdbcClient
27+
): DbSaver {
28+
return DbSaver(tenantName, tracer, clock, jdbcClient)
29+
}
30+
}

spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeController.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,22 @@ class TimeController(
2525
private val kafkaSendingService: KafkaSendingService,
2626
private val publicApiService: PublicApiService
2727
) {
28-
28+
// http://localhost:8090/current-time
2929
@GetMapping(path = ["/current-time"])
3030
fun getNow(): LocalDateTime {
3131
logger.trace { "tracer $tracer" }
3232
val traceId = tracer.currentSpan()?.context()?.traceId()
3333
logger.info { "Called method getNow. TraceId = $traceId" }
3434
val nowFromRemote = publicApiService.getZonedTime()
3535
val now = nowFromRemote ?: LocalDateTime.now(clock)
36-
kafkaSendingService.sendNotification("Current time = $now")
36+
val message = "Current time = $now"
37+
kafkaSendingService.sendNotification(message)
3738
.thenRun { logger.info { "Awaiting acknowledgement from Kafka" } }
3839
.get()
40+
41+
kafkaSendingService.sendNotificationToOtherTopic(message)
42+
.thenRun { logger.info { "Awaiting acknowledgement from Kafka with batch" } }
43+
.get()
3944
return now
4045
}
4146
}

0 commit comments

Comments
 (0)