diff --git a/docs/benchmark-results/aws-s3.md b/docs/benchmark-results/aws-s3.md new file mode 100644 index 0000000000..5e7bc9ff14 --- /dev/null +++ b/docs/benchmark-results/aws-s3.md @@ -0,0 +1,264 @@ + +# AWS S3 Benchmark Results + +**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86\_64 + +See `../../internal/impl/aws/s3/bench/` for configs and run instructions. +Read benchmarks are under `bench/read/`, write benchmarks under `bench/write/`. + +*** + + +# πŸ“Š PERFORMANCE REPORT + +## Executive Summary + +This benchmark evaluates S3 read and write throughput across three approaches: + +* Sequential S3 access (`aws_s3` bucket walk) +* Kafka Connect S3 Sink/Source +* Redpanda Connect S3 pipelines (single and multi-instance) + +### Key Findings + +* **S3 access is latency-bound unless parallelized.** + Sequential reads and writes are dominated by per-request overhead. + +* **Kafka Connect achieves the highest throughput (\~250k msg/s)** + due to parallel task execution and large-batch S3 writes. + +* **Redpanda Connect is throughput-capped (\~60k–73k msg/s)** + due to shared output constraints limiting S3 write concurrency. + +* **Batching is the dominant factor for write performance.** + Larger batches significantly reduce S3 request overhead. + +* **LocalStack introduces artificial ceilings.** + Results reflect LocalStack’s single-node S3 implementation rather than real AWS scalability. 
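The batching effect above is simple arithmetic: the number of S3 `PUT` requests is the message count divided by the flush size, so at the 1M-message scale used in these runs, raising `flush.size` from 5,000 to 50,000 cuts the request count tenfold. A back-of-envelope sketch (illustrative only, not part of the benchmark harness):

```go
package main

import "fmt"

// putCount returns how many S3 PUT requests are needed to write totalMsgs
// messages when the sink flushes a batch every flushSize messages.
func putCount(totalMsgs, flushSize int) int {
	return (totalMsgs + flushSize - 1) / flushSize // ceiling division
}

func main() {
	total := 1_000_000 // message count used throughout these benchmarks
	for _, flush := range []int{5000, 10000, 50000} {
		fmt.Printf("flush.size=%-6d -> %4d PUTs\n", flush, putCount(total, flush))
	}
}
```

The three swept flush sizes yield 200, 100, and 20 PUTs respectively, consistent with the write results, where the largest batch size wins by amortising per-request overhead.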
+ +*** + +## Performance Summary + +| Workload | Best Throughput | Limiting Factor | +| ------------------------- | ------------------------ | --------------------- | +| Bucket walk (1KB) | \~563 msg/s | Request latency | +| Bucket walk (1MB) | \~190 msg/s (\~195 MB/s) | Transfer bandwidth | +| Kafka Connect S3 Source | \~73k msg/s | S3 read throughput | +| Kafka Connect S3 Sink | \~250k msg/s | S3 write throughput | +| Redpanda Connect (single) | \~61k msg/s | Shared S3 writer | +| Redpanda Connect (multi) | \~73k msg/s | S3 backend saturation | + +*** + +## Key Conclusions + +* **Parallelism is the primary driver of performance.** + Systems that issue multiple concurrent S3 requests achieve significantly higher throughput. + +* **Batch size is critical for write-heavy workloads.** + Larger batches reduce request overhead and improve efficiency. + +* **Architectural differences dominate tuning effects.** + Kafka Connect scales via independent tasks; Redpanda Connect is constrained by shared output. + +* **Measured ceilings are environment-dependent.** + LocalStack limits concurrency; real AWS S3 would likely increase absolute throughput and widen scaling differences. + + +*** + +# READ BENCHMARKS + +## Bucket Walk β€” Small Objects (1 KB) + +200,000 objects Γ— 1 KB. Default `aws_s3` input in bucket walk mode (no SQS), LocalStack. + +### msg/sec + +| GOMAXPROCS | size=1024 | +| ---------- | --------- | +| 1 | 563 | +| 2 | 556 | +| 4 | 548 | +| 8 | 544 | + +### kB/sec + +| GOMAXPROCS | size=1024 | +| ---------- | --------- | +| 1 | 577 | +| 2 | 569 | +| 4 | 561 | +| 8 | 557 | + +*** + +## Bucket Walk β€” Large Objects (1 MB) + +20,000 objects Γ— 1 MB. Same setup. 
+ +### msg/sec + +| GOMAXPROCS | size=1048576 | +| ---------- | ------------ | +| 1 | 190 | +| 2 | 186 | +| 4 | 179 | +| 8 | 180 | + +### MB/sec + +| GOMAXPROCS | size=1048576 | +| ---------- | ------------ | +| 1 | 199 | +| 2 | 195 | +| 4 | 188 | +| 8 | 188 | + +*** + +## Kafka Connect S3 Source β€” Read Throughput + +### Results + +| TASKS | FLUSH | ELAPSED(s) | MSG/S | +| ----- | ----- | ---------- | ----- | +| 1 | 5000 | 49 | 61224 | +| 1 | 10000 | 50 | 60000 | +| 1 | 50000 | 66 | 45454 | +| 2 | 5000 | 41 | 73170 | +| 2 | 10000 | 42 | 71428 | +| 2 | 50000 | 51 | 58823 | +| 4 | 5000 | 42 | 71428 | +| 4 | 10000 | 42 | 71428 | +| 4 | 50000 | 50 | 60000 | +| 8 | 5000 | 42 | 71428 | +| 8 | 10000 | 41 | 73170 | +| 8 | 50000 | 57 | 52631 | + +*** + +## Read Observations + +* **Throughput is latency-bound for bucket walk.** + Sequential `GetObject` calls make HTTP round-trip time the dominant factor. + +* **CPU parallelism has no impact.** + Increasing `GOMAXPROCS` does not improve performance, confirming serialized I/O. + +* **Object size determines efficiency.** + Small objects (\~1 KB) are dominated by request overhead; large objects (\~1 MB) achieve high throughput due to efficient data transfer. + +* **Small-object workloads are inefficient.** + A 1000Γ— size increase yields \~340Γ— better throughput (MB/sec), showing request overhead dominates. + +* **Kafka Connect source follows the same S3 limits.** + Single-task throughput (\~60k msg/s) matches Redpanda write ceilings, indicating S3 request cost dominates. + +* **Parallelism improves read throughput up to saturation (\~73k msg/s).** + Beyond that, S3 becomes the bottleneck. + +* **LocalStack underestimates real latency impact.** + Real S3 deployments will show lower msg/sec due to network RTT. 
+ +*** + +# WRITE BENCHMARKS + +## Kafka Connect S3 Sink β€” Write Throughput + +### Best Configurations + +| TASKS | FLUSH | POLL | FETCH MIN | MSG/S | +| ----- | ----- | ---- | ---------- | ------ | +| 16 | 50000 | 1000 | 1MB | 250000 | +| 2 | 50000 | 5000 | 4MB | 230769 | +| 4 | 50000 | 1000 | 1MB | 230769 | +| 8 | 50000 | 5000 | 1MB | 230769 | + +*** + +## Redpanda Connect S3 Sink β€” Single Process + +### Best Configurations + +| THREADS | FLUSH | FETCH MIN | MSG/S | +| ------- | ----- | ---------- | ----- | +| 2 | 5000 | 1MB | 61224 | +| 2 | 10000 | 1MB | 61224 | +| 4 | 10000 | 4MB | 61224 | +| 8 | 10000 | 4MB | 61224 | + +*** + +## Redpanda Connect S3 Sink β€” Multi-Instance + +### Best Configurations + +| INSTANCES | FLUSH | FETCH MIN | MSG/S | +| --------- | ----- | ---------- | ----- | +| 2 | 5000 | 1MB | 73170 | +| 8 | 10000 | 1MB | 73170 | + +*** + +## Write Observations + +### Kafka Connect + +* **`flush.size` is the dominant factor.** + Larger batches significantly improve throughput by reducing the number of S3 `PUT` operations. + +* **Parallelism helps but saturates quickly.** + Increasing tasks improves throughput until S3 becomes the limiting factor. + +* **Timing effects create discrete result bands.** + Flush interval and commit timing introduce measurable latency variance. + +* **Practical ceiling: \~230k–250k msg/s.** + This reflects LocalStack S3 limits rather than Kafka itself. + +*** + +### Redpanda Connect + +* **Single-process throughput is capped (\~60k msg/s).** + Performance is invariant across thread count and configuration. + +* **Processing parallelism does not translate to S3 parallelism.** + A shared output path limits scalability. + +* **Multiple instances improve throughput (\~73k msg/s).** + Parallel S3 writers across processes unlock limited scaling. + +* **Scaling saturates quickly.** + Beyond 2 instances, gains disappear due to S3 bottlenecks. 
+ +* **Smaller flush sizes perform better.** + They avoid delays caused by timer-based flushes. + +*** + +# FINAL COMPARISON + +| Metric | Redpanda Connect | Kafka Connect | +| --------------------- | ---------------------------------- | -------------- | +| Peak throughput | **61k (single)** / **73k (multi)** | **250k msg/s** | +| Typical throughput | 51k–61k | 111k–230k | +| Parameter sensitivity | Low | High | +| Scaling model | Process-level | Task-level | +| Output concurrency | Limited | High | +| Resource footprint | \~200 MB RSS | \~2 GB JVM | + +*** + +## Summary + +* **Kafka Connect achieves \~4Γ— higher peak throughput**, driven by multiple independent S3 writers and strong batching efficiency. + +* **Redpanda Connect is limited by shared output constraints.** + Internal concurrency improves processing but not S3 write parallelism. + +* **Batching is critical for Kafka Connect but largely ineffective for Redpanda Connect.** + diff --git a/internal/impl/aws/s3/bench/read/kafka-connect/Dockerfile b/internal/impl/aws/s3/bench/read/kafka-connect/Dockerfile new file mode 100644 index 0000000000..971d3924cc --- /dev/null +++ b/internal/impl/aws/s3/bench/read/kafka-connect/Dockerfile @@ -0,0 +1,3 @@ +FROM confluentinc/cp-server-connect-base:7.7.8 +RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.23 +RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3-source:latest diff --git a/internal/impl/aws/s3/bench/read/kafka-connect/Taskfile.yaml b/internal/impl/aws/s3/bench/read/kafka-connect/Taskfile.yaml new file mode 100644 index 0000000000..ff86a7bdba --- /dev/null +++ b/internal/impl/aws/s3/bench/read/kafka-connect/Taskfile.yaml @@ -0,0 +1,382 @@ +version: "3" + +silent: true + +vars: + KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP | default "localhost:9092"}}' + CONNECT_URL: '{{.CONNECT_URL | default "http://localhost:8083"}}' + S3_ENDPOINT: '{{.S3_ENDPOINT | default "http://localhost:4566"}}' + CONNECTOR_NAME: '{{.CONNECTOR_NAME | 
default "s3-source-bench"}}' + SEEDER_NAME: s3-source-seeder + + # Source connector parameters + TASKS_MAX: '{{.TASKS_MAX | default "4"}}' + + # Seeder sink parameters (fixed β€” fast population, not benchmarked) + SEEDER_TASKS: 16 + SEEDER_FLUSH: 10000 + SEEDER_ROTATE_MS: 5000 + + # Matrix sweep + TASKS_LIST: '{{.TASKS_LIST | default "1 2 4 8 16"}}' + MATRIX_INTERVAL: '{{.MATRIX_INTERVAL | default "5"}}' + +tasks: + # ── Infrastructure ──────────────────────────────────────────────────────────── + + up: + desc: Build and start all infrastructure (Kafka, LocalStack, Kafka Connect), wait until ready + cmds: + - docker compose up -d --build + - until curl -sf {{.CONNECT_URL}}/ > /dev/null 2>&1; do sleep 3; done + - echo "Kafka Connect ready at {{.CONNECT_URL}}" + + down: + desc: Stop and remove all containers and volumes + cmds: + - docker compose down -v --remove-orphans + + # ── Seeding ─────────────────────────────────────────────────────────────────── + + topic:reset: + desc: Delete and recreate the bench-events Kafka topic + cmds: + - docker exec s3kcr-kafka kafka-topics --bootstrap-server localhost:29092 --delete --topic bench-events 2>/dev/null || true + - sleep 2 + - docker exec s3kcr-kafka kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic bench-events --partitions 16 --replication-factor 1 + - echo "topic reset" + + data:seed: + desc: | + Populate S3 with COUNT messages via the S3 Sink connector. + Resets bench-events topic, produces data, sinks to S3, waits for lag=0, removes seeder. + e.g. 
task data:seed COUNT=1000000 + vars: + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - curl -s -X DELETE {{.CONNECT_URL}}/connectors/{{.SEEDER_NAME}} > /dev/null || true + - sleep 1 + - docker exec s3kcr-localstack awslocal s3 rm s3://bench-events --recursive || true + - task: topic:reset + - | + COUNT={{.COUNT}} + KAFKA_BOOTSTRAP={{.KAFKA_BOOTSTRAP}} + go run ../../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml + - | + # Register S3 Sink connector to write all messages to S3 + jq -n \ + --arg name "{{.SEEDER_NAME}}" \ + --arg tasks "{{.SEEDER_TASKS}}" \ + --arg flush "{{.SEEDER_FLUSH}}" \ + --arg rotate "{{.SEEDER_ROTATE_MS}}" \ + '{ + name: $name, + config: { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "tasks.max": $tasks, + "topics": "bench-events", + "s3.region": "us-east-1", + "s3.bucket.name": "bench-events", + "store.url": "http://localstack:4566", + "s3.part.size": "5242880", + "flush.size": $flush, + "rotate.interval.ms": "-1", + "rotate.schedule.interval.ms": $rotate, + "timezone": "UTC", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "schema.compatibility": "NONE", + "s3.path.style.access.enabled": "true" + } + }' > /tmp/seeder-cfg.json + RESP=$(curl -s -X POST {{.CONNECT_URL}}/connectors \ + -H "Content-Type: application/json" --data-binary @/tmp/seeder-cfg.json) + rm -f /tmp/seeder-cfg.json + if echo "$RESP" | jq -e '.error_code' > /dev/null 2>&1; then + echo "ERROR: $(echo "$RESP" | jq -r '.message')"; exit 1 + fi + echo "seeder registered: $(echo "$RESP" | jq -r '.name')" + + COUNT={{.COUNT}} + GROUP="connect-{{.SEEDER_NAME}}" + INTERVAL={{.MATRIX_INTERVAL}} + + get_lag() { + docker exec s3kcr-kafka kafka-consumer-groups \ + 
--bootstrap-server localhost:29092 \ + --describe --group "$1" 2>/dev/null \ + | awk '/bench-events/ { + cur = ($4 == "-") ? 0 : $4 + sum += $5 - cur + } END {print (NR == 0 ? -1 : sum)}' + } + + echo "Waiting for seeder to drain $COUNT messages to S3..." + START=$(date +%s) + while true; do + sleep "$INTERVAL" + LAG=$(get_lag "$GROUP") + [ "${LAG:--1}" -lt 0 ] 2>/dev/null && echo " (warming up...)" && continue + [ "$LAG" -lt 0 ] && LAG=0 + echo " lag: $LAG" + if [ "$LAG" -eq 0 ]; then + ELAPSED=$(( $(date +%s) - START )) + echo "Seeded $COUNT messages to S3 in ${ELAPSED}s" + break + fi + done + + # Wait for rotate.schedule.interval.ms to fire so all buffered records are flushed to S3 + # before we delete the connector (lag=0 only means Kafka consumer is drained, not S3 written) + sleep $(({{.SEEDER_ROTATE_MS}} / 1000 + 3)) + curl -s -X DELETE {{.CONNECT_URL}}/connectors/{{.SEEDER_NAME}} > /dev/null + echo "seeder removed β€” S3 populated" + + # ── Connector management ────────────────────────────────────────────────────── + + connector:status: + desc: Show connector and task status + cmds: + - curl -s {{.CONNECT_URL}}/connectors/{{.CONNECTOR_NAME}}/status | jq . 
+ + connector:tasks: + desc: Show status of all connectors and their tasks + cmds: + - | + for name in $(curl -s {{.CONNECT_URL}}/connectors | jq -r '.[]'); do + echo "── $name ──" + curl -s {{.CONNECT_URL}}/connectors/$name/status | jq -r ' + " connector: \(.connector.state) \(if .connector.trace then "– \(.connector.trace | split("\n")[0])" else "" end)", + (if .tasks | length > 0 then + .tasks[] | " task \(.id): \(.state) \(if .trace then "– \(.trace | split("\n")[0])" else "" end)" + else + " (no tasks started)" + end) + ' + done + + # ── Observation ─────────────────────────────────────────────────────────────── + + bench:s3-count: + desc: Count objects in the S3 bench-events bucket + cmds: + - docker exec s3kcr-localstack awslocal s3 ls s3://bench-events --recursive | wc -l + + bench:offsets: + desc: Show current end offsets for bench-events + cmds: + - docker exec s3kcr-kafka kafka-get-offsets --bootstrap-server localhost:29092 --topic bench-events + + # ── Single parameterised run ────────────────────────────────────────────────── + + bench:run: + desc: | + Full benchmark run: seed S3, start source connector, measure until COUNT messages produced to Kafka. + e.g. 
task bench:run COUNT=1000000 TASKS_MAX=4 + vars: + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - task: data:seed + vars: { COUNT: '{{.COUNT}}' } + - curl -s -X DELETE {{.CONNECT_URL}}/connectors/{{.CONNECTOR_NAME}} > /dev/null || true + - sleep 1 + - | + get_end_offset() { + docker exec s3kcr-kafka kafka-get-offsets \ + --bootstrap-server localhost:29092 \ + --topic bench-events 2>/dev/null \ + | awk -F: '{sum += $NF} END {print sum+0}' + } + + BASE=$(get_end_offset bench-events) + TARGET=$((BASE + {{.COUNT}})) + echo "Base offset: $BASE Target: $TARGET" + + jq -n \ + --arg name "{{.CONNECTOR_NAME}}" \ + --arg tasks "{{.TASKS_MAX}}" \ + '{ + name: $name, + config: { + "connector.class": "io.confluent.connect.s3.source.S3SourceConnector", + "confluent.topic.bootstrap.servers": "kafka:29092", + "confluent.topic.replication.factor": "1", + "tasks.max": $tasks, + "s3.region": "us-east-1", + "s3.bucket.name": "bench-events", + "store.url": "http://localstack:4566", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "s3.path.style.access.enabled": "true", + "topics.dir": "topics", + "topics": "bench-events" + } + }' > /tmp/source-cfg.json + RESP=$(curl -s -X POST {{.CONNECT_URL}}/connectors \ + -H "Content-Type: application/json" --data-binary @/tmp/source-cfg.json) + rm -f /tmp/source-cfg.json + if echo "$RESP" | jq -e '.error_code' > /dev/null 2>&1; then + echo "ERROR: $(echo "$RESP" | jq -r '.message')"; exit 1 + fi + echo "registered: $(echo "$RESP" | jq -r '.name')" + + START=$(date +%s) + INTERVAL={{.MATRIX_INTERVAL}} + PREV_OFF=$BASE + PREV_TIME=$START + + printf "%-10s %-14s %s\n" "TIME" "OFFSET" "MSG/S" + printf "%-10s %-14s (registered)\n" "$(date +%H:%M:%S)" "$BASE" + + while true; do + sleep 
"$INTERVAL" + NOW=$(date +%s) + OFF=$(get_end_offset bench-events) + DELTA_T=$((NOW - PREV_TIME)) + DELTA_OFF=$((OFF - PREV_OFF)) + RATE=0 + [ "$DELTA_T" -gt 0 ] && RATE=$((DELTA_OFF / DELTA_T)) + printf "%-10s %-14s %s msg/s\n" "$(date +%H:%M:%S)" "$OFF" "$RATE" + + if [ "$OFF" -ge "$TARGET" ]; then + ELAPSED=$((NOW - START)) + AVG=$(( {{.COUNT}} / (ELAPSED > 0 ? ELAPSED : 1) )) + echo "---" + echo "Total messages : {{.COUNT}}" + echo "Elapsed : ${ELAPSED}s" + echo "Avg throughput : ${AVG} msg/s" + curl -s -X DELETE {{.CONNECT_URL}}/connectors/{{.CONNECTOR_NAME}} > /dev/null + break + fi + + PREV_OFF=$OFF; PREV_TIME=$NOW + done + + # ── Matrix sweep ────────────────────────────────────────────────────────────── + + bench:matrix: + desc: | + Seed S3 once, then sweep tasks.max values and print a comparison table. + Each combination uses a unique connector name so offsets always start from S3 beginning. + e.g. task bench:matrix COUNT=1000000 TASKS_LIST="1 2 4 8 16" + vars: + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - task: data:seed + vars: { COUNT: '{{.COUNT}}' } + - | + TASKS_LIST="{{.TASKS_LIST}}" + CONNECT_URL="{{.CONNECT_URL}}" + TOTAL={{.COUNT}} + INTERVAL={{.MATRIX_INTERVAL}} + + get_end_offset() { + docker exec s3kcr-kafka kafka-get-offsets \ + --bootstrap-server localhost:29092 \ + --topic bench-events 2>/dev/null \ + | awk -F: '{sum += $NF} END {print sum+0}' + } + + RESULTS_FILE=$(mktemp) + printf "%-8s %-12s %s\n" "TASKS" "ELAPSED(s)" "MSG/S" > "$RESULTS_FILE" + + RUN=0 + for tasks in $TASKS_LIST; do + RUN=$((RUN + 1)) + NAME="s3-src-$(printf '%03d' $RUN)" + + printf "\n=== run %d: tasks=%s ===\n" "$RUN" "$tasks" + + # Each new connector name has no stored offsets β†’ reads all S3 files from scratch. + # bench-events end offset grows by TOTAL with each run; BASE tracks the current floor. 
+ BASE=$(get_end_offset bench-events) + TARGET=$((BASE + TOTAL)) + echo " base offset: $BASE target: $TARGET" + + jq -n \ + --arg name "$NAME" \ + --arg tasks "$tasks" \ + '{ + name: $name, + config: { + "connector.class": "io.confluent.connect.s3.source.S3SourceConnector", + "confluent.topic.bootstrap.servers": "kafka:29092", + "confluent.topic.replication.factor": "1", + "tasks.max": $tasks, + "s3.region": "us-east-1", + "s3.bucket.name": "bench-events", + "store.url": "http://localstack:4566", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "s3.path.style.access.enabled": "true", + "topics.dir": "topics", + "topics": "bench-events" + } + }' > /tmp/src-cfg.json + RESP=$(curl -s -X POST "$CONNECT_URL/connectors" \ + -H "Content-Type: application/json" --data-binary @/tmp/src-cfg.json) + rm -f /tmp/src-cfg.json + if echo "$RESP" | jq -e '.error_code' > /dev/null 2>&1; then + echo " ERROR: $(echo "$RESP" | jq -r '.message')" + continue + fi + echo " connector registered: $(echo "$RESP" | jq -r '.name')" + + COMBO_START=$(date +%s) + PREV_OFF=$BASE + PREV_TIME=$COMBO_START + printf " %-10s %-14s %s\n" "TIME" "OFFSET" "MSG/S" + printf " %-10s %-14s (registered)\n" "$(date +%H:%M:%S)" "$BASE" + + while true; do + sleep "$INTERVAL" + NOW=$(date +%s) + OFF=$(get_end_offset bench-events) + DELTA_T=$((NOW - PREV_TIME)) + DELTA_OFF=$((OFF - PREV_OFF)) + RATE=0 + [ "$DELTA_T" -gt 0 ] && RATE=$((DELTA_OFF / DELTA_T)) + printf " %-10s %-14s %s msg/s\n" "$(date +%H:%M:%S)" "$OFF" "$RATE" + + if [ "$OFF" -ge "$TARGET" ]; then + ELAPSED=$((NOW - COMBO_START)) + AVG=$(( TOTAL / (ELAPSED > 0 ? 
ELAPSED : 1) )) + printf " β†’ %ds, %s msg/s\n" "$ELAPSED" "$AVG" + printf "%-8s %-12s %s\n" "$tasks" "$ELAPSED" "$AVG" >> "$RESULTS_FILE" + break + fi + + PREV_OFF=$OFF; PREV_TIME=$NOW + done + + curl -s -X DELETE "$CONNECT_URL/connectors/$NAME" > /dev/null + sleep 2 + done + + echo "" + echo "════════════════════════════════════════════════════════════════" + echo " Matrix Results ($RUN combinations, COUNT=$TOTAL)" + echo "════════════════════════════════════════════════════════════════" + cat "$RESULTS_FILE" + rm -f "$RESULTS_FILE" + + # ── Logs ────────────────────────────────────────────────────────────────────── + + logs:connect: + desc: Follow Kafka Connect worker logs + cmds: + - docker compose logs -f kafka-connect + + logs:localstack: + desc: Follow LocalStack logs + cmds: + - docker compose logs -f localstack diff --git a/internal/impl/aws/s3/bench/read/kafka-connect/docker-compose.yaml b/internal/impl/aws/s3/bench/read/kafka-connect/docker-compose.yaml new file mode 100644 index 0000000000..0f08ac031e --- /dev/null +++ b/internal/impl/aws/s3/bench/read/kafka-connect/docker-compose.yaml @@ -0,0 +1,118 @@ +services: + # ── Kafka (KRaft, no ZooKeeper) ────────────────────────────────────────────── + kafka: + image: confluentinc/cp-kafka:7.7.8 + container_name: s3kcr-kafka + cpus: 3 + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + 
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk" + healthcheck: + test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"] + interval: 10s + timeout: 10s + retries: 12 + + # ── LocalStack (S3 source) ──────────────────────────────────────────────────── + localstack: + image: localstack/localstack:3 + container_name: s3kcr-localstack + ports: + - "4566:4566" + environment: + SERVICES: s3 + DEFAULT_REGION: us-east-1 + AWS_DEFAULT_REGION: us-east-1 + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:4566/_localstack/health"] + interval: 5s + timeout: 5s + retries: 12 + + # ── Kafka Connect + S3 plugin ───────────────────────────────────────────────── + kafka-connect: + build: . + container_name: s3kcr-connect + ports: + - "8083:8083" + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka:29092 + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: s3-source-bench-group + CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs + CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: _connect-status + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false" + CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect + CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components + CONNECT_OFFSET_FLUSH_INTERVAL_MS: "5000" + KAFKA_HEAP_OPTS: "-Xms512m -Xmx2g" + AWS_ACCESS_KEY_ID: test + AWS_SECRET_ACCESS_KEY: test + AWS_DEFAULT_REGION: us-east-1 + AWS_ENDPOINT_URL: http://localstack:4566 + depends_on: + kafka: + condition: service_healthy + localstack: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8083/"] + interval: 10s + timeout: 10s + 
retries: 15 + + # ── Create bench-events Kafka topic ────────────────────────────────────────── + kafka-setup: + image: confluentinc/cp-kafka:7.7.8 + container_name: s3kcr-kafka-setup + depends_on: + kafka: + condition: service_healthy + entrypoint: /bin/bash + command: + - -c + - | + kafka-topics --bootstrap-server kafka:29092 \ + --create --if-not-exists \ + --topic bench-events \ + --partitions 16 \ + --replication-factor 1 + echo "topic ready" + + # ── Create bench-events S3 bucket in LocalStack ─────────────────────────────── + localstack-setup: + image: amazon/aws-cli:latest + container_name: s3kcr-localstack-setup + depends_on: + localstack: + condition: service_healthy + environment: + AWS_ACCESS_KEY_ID: test + AWS_SECRET_ACCESS_KEY: test + AWS_DEFAULT_REGION: us-east-1 + entrypoint: /bin/bash + command: + - -c + - | + aws --endpoint-url http://localstack:4566 s3 mb s3://bench-events + echo "bucket ready" diff --git a/internal/impl/aws/s3/bench/read/kafka-connect/producer.yaml b/internal/impl/aws/s3/bench/read/kafka-connect/producer.yaml new file mode 100644 index 0000000000..cf42abe122 --- /dev/null +++ b/internal/impl/aws/s3/bench/read/kafka-connect/producer.yaml @@ -0,0 +1,28 @@ +# Produces COUNT synthetic events to the bench-events Kafka topic (used for seeding S3). 
+# +# Usage: +# COUNT=1000000 KAFKA_BOOTSTRAP=localhost:9092 \ +# go run ../../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml + +input: + generate: + count: ${COUNT:1000000} + interval: "" + mapping: | + root = { + "id": count("events"), + "category": "cat_" + random_int(min: 1, max: 10).string(), + "value": random_int(min: 1, max: 1000000).float64() / 100.0, + "ts": now().ts_unix_micro() + } + +output: + kafka_franz: + seed_brokers: + - ${KAFKA_BOOTSTRAP:localhost:9092} + topic: bench-events + compression: snappy + batching: + count: 3000 + byte_size: 4194304 # 4 MiB + period: 5s diff --git a/internal/impl/aws/s3/bench/read/redpanda-connect/README.md b/internal/impl/aws/s3/bench/read/redpanda-connect/README.md new file mode 100644 index 0000000000..9d415844ea --- /dev/null +++ b/internal/impl/aws/s3/bench/read/redpanda-connect/README.md @@ -0,0 +1,67 @@ +# Benchmarking AWS S3 Input Component + +Benchmark demonstrating read throughput of Redpanda's `aws_s3` input connector. + +## Prerequisites + +Docker (for LocalStack) and Go (already required to build the project). + +## How to Run + +```bash +task run +``` + +This starts LocalStack, creates the `bench-objects` bucket, seeds 50,000 objects (~1 KB each), and runs the benchmark in one shot. 
+ +### Re-running + +Objects persist in LocalStack for the lifetime of the container, so you can re-run the benchmark without re-seeding: + +```bash +go run ../../../../../cmd/redpanda-connect/main.go run ./benchmark_config.yaml +``` + +To re-seed with fresh data: + +```bash +task drop-bucket +task seed +go run ../../../../../cmd/redpanda-connect/main.go run ./benchmark_config.yaml +``` + +### Individual tasks + +```bash +task localstack:up # start LocalStack container +task create # create the bench-objects bucket +task seed # upload 50K objects (~1 KB each) +task drop-bucket # empty the bucket (delete all objects) +task localstack:down # stop and remove the container +``` + +### Seed options + +```bash +go run . seed --total 100000 --size 4096 --workers 128 +``` + +## Notes + +- LocalStack runs all AWS services locally; S3 is available on port 4566. +- The `aws_s3` input walks objects sequentially via `ListObjectsV2` (100 keys per page) and downloads each one. Throughput is dominated by per-object HTTP round trips, not raw data size. +- The benchmark terminates automatically once all objects have been read (`ErrEndOfInput`). +- To increase throughput, try larger objects (`--size`) or more objects. Parallelism in the connector itself is limited to one object at a time. + +### Expected Output + +``` +INFO rolling stats: 3000 msg/sec, 3 MB/sec @service=redpanda-connect bytes/sec=3.1e+06 label="" msg/sec=3000 path=root.output.processors.0 +INFO rolling stats: 3200 msg/sec, 3 MB/sec @service=redpanda-connect bytes/sec=3.3e+06 label="" msg/sec=3200 path=root.output.processors.0 +``` + +> **Note:** LocalStack runs in a local container, so requests traverse only the loopback interface and incur no real network overhead. Production S3 throughput will differ depending on network latency, object size, and instance type. + +## Recording Results + +After running the benchmark, record your results in [`docs/benchmark-results/aws-s3.md`](../../../../../docs/benchmark-results/aws-s3.md).
Append a new dated section with environment details, dataset, throughput numbers, and observations. See [`docs/benchmarking.md`](../../../../../docs/benchmarking.md) for the full guide. diff --git a/internal/impl/aws/s3/bench/read/redpanda-connect/Taskfile.yaml b/internal/impl/aws/s3/bench/read/redpanda-connect/Taskfile.yaml new file mode 100644 index 0000000000..2561d75303 --- /dev/null +++ b/internal/impl/aws/s3/bench/read/redpanda-connect/Taskfile.yaml @@ -0,0 +1,75 @@ +version: '3' + +vars: + ENDPOINT: http://localhost:4566 + REGION: us-east-1 + COUNT: '100000' + SIZE: '1024' + +env: + AWS_ACCESS_KEY_ID: xxxxx + AWS_SECRET_ACCESS_KEY: xxxxx + AWS_DEFAULT_REGION: "{{.REGION}}" + +tasks: + run: + desc: "Seed bucket and run the benchmark (e.g. task run CORES=4 COUNT=100000 SIZE=1024)" + silent: true + vars: + CORES: '{{.CORES | default ""}}' + cmds: + - task: up + - until curl -sf http://localhost:4566/_localstack/health > /dev/null; do sleep 1; done + - task: drop-bucket + - task: create + - task: seed + - echo "GOMAXPROCS={{.CORES | default "max"}} COUNT={{.COUNT}} SIZE={{.SIZE}}" + - | + {{if .CORES}}GOMAXPROCS={{.CORES}} {{end}}go run ../../../../../../../cmd/redpanda-connect/main.go run ./benchmark_config.yaml 2>&1 | grep "total stats" + + bench:all: + desc: "Run benchmark across 1,2,4,8 cores (e.g. task bench:all COUNT=100000 SIZE=102400)" + silent: true + cmds: + - task: run + vars: { CORES: '1', COUNT: '{{.COUNT}}', SIZE: '{{.SIZE}}' } + - task: run + vars: { CORES: '2', COUNT: '{{.COUNT}}', SIZE: '{{.SIZE}}' } + - task: run + vars: { CORES: '4', COUNT: '{{.COUNT}}', SIZE: '{{.SIZE}}' } + - task: run + vars: { CORES: '8', COUNT: '{{.COUNT}}', SIZE: '{{.SIZE}}' } + + seed: + silent: true + desc: "Upload objects to the bucket (e.g. task seed COUNT=100000 SIZE=102400)" + cmd: go run . 
seed --endpoint {{.ENDPOINT}} --region {{.REGION}} --total {{.COUNT}} --size {{.SIZE}} 2>&1 | grep "Completed" + + up: + silent: true + cmd: | + if docker ps -q --filter name=s3-bench | grep -q .; then + true + else + docker rm -fv s3-bench 2>/dev/null || true + docker run -d \ + --name s3-bench \ + -p 4566:4566 \ + localstack/localstack:3 > /dev/null + echo "LocalStack started." + fi + + down: + silent: true + cmd: docker rm -fv s3-bench > /dev/null + + localstack:logs: + cmd: docker logs -f s3-bench + + create: + silent: true + cmd: go run . setup --endpoint {{.ENDPOINT}} --region {{.REGION}} > /dev/null + + drop-bucket: + silent: true + cmd: go run . drop-bucket --endpoint {{.ENDPOINT}} --region {{.REGION}} > /dev/null diff --git a/internal/impl/aws/s3/bench/read/redpanda-connect/benchmark_config.yaml b/internal/impl/aws/s3/bench/read/redpanda-connect/benchmark_config.yaml new file mode 100644 index 0000000000..8303fa157f --- /dev/null +++ b/internal/impl/aws/s3/bench/read/redpanda-connect/benchmark_config.yaml @@ -0,0 +1,28 @@ +http: + debug_endpoints: true + +input: + aws_s3: + bucket: bench-objects + force_path_style_urls: true + endpoint: http://localhost:4566 + region: us-east-1 + credentials: + id: xxxxx + secret: xxxxx + token: xxxxx + +output: + processors: + - benchmark: + interval: 1s + count_bytes: true + drop: {} + +logger: + level: INFO + +metrics: + prometheus: + add_process_metrics: true + add_go_metrics: true diff --git a/internal/impl/aws/s3/bench/read/redpanda-connect/main.go b/internal/impl/aws/s3/bench/read/redpanda-connect/main.go new file mode 100644 index 0000000000..cb38503838 --- /dev/null +++ b/internal/impl/aws/s3/bench/read/redpanda-connect/main.go @@ -0,0 +1,255 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at
+//
+// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

+// Package main provides a benchmark data generation tool for AWS S3.
+package main
+
+import (
+	"bytes"
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+)
+
+const (
+	bucketName       = "bench-objects"
+	progressInterval = 5000
+)
+
+func newS3Client(endpoint, region string) *s3.Client {
+	cfg := aws.Config{
+		Region:      region,
+		Credentials: credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx"),
+	}
+	return s3.NewFromConfig(cfg, func(o *s3.Options) {
+		o.BaseEndpoint = aws.String(endpoint)
+		o.UsePathStyle = true
+	})
+}
+
+func main() {
+	if len(os.Args) < 2 {
+		fmt.Fprintf(os.Stderr, "usage: %s <setup|seed|drop-bucket> [flags]\n", os.Args[0])
+		os.Exit(1)
+	}
+	switch os.Args[1] {
+	case "setup":
+		runSetup(os.Args[2:])
+	case "seed":
+		runSeed(os.Args[2:])
+	case "drop-bucket":
+		runDropBucket(os.Args[2:])
+	default:
+		fmt.Fprintf(os.Stderr, "unknown subcommand %q\n", os.Args[1])
+		os.Exit(1)
+	}
+}
+
+// setup -----------------------------------------------------------------------
+
+func runSetup(args []string) {
+	fs := flag.NewFlagSet("setup", flag.ExitOnError)
+	endpoint := fs.String("endpoint", "http://localhost:4566", "S3 endpoint URL")
+	region := fs.String("region", "us-east-1", "AWS region")
+	_ = 
fs.Parse(args) + + client := newS3Client(*endpoint, *region) + ctx := context.Background() + if err := createBucketIfNotExists(ctx, client); err != nil { + fmt.Fprintf(os.Stderr, "setup: %v\n", err) + os.Exit(1) + } + fmt.Println("Bucket ready.") +} + +func createBucketIfNotExists(ctx context.Context, client *s3.Client) error { + _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucketName)}) + if err == nil { + fmt.Printf("Bucket %s already exists.\n", bucketName) + return nil + } + + fmt.Printf("Creating bucket %s...\n", bucketName) + if _, err = client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }); err != nil { + return fmt.Errorf("create %s: %w", bucketName, err) + } + + waiter := s3.NewBucketExistsWaiter(client) + if err := waiter.Wait(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucketName)}, time.Minute); err != nil { + return fmt.Errorf("wait %s: %w", bucketName, err) + } + fmt.Printf("Bucket %s created.\n", bucketName) + return nil +} + +// seed ------------------------------------------------------------------------ + +func runSeed(args []string) { + fs := flag.NewFlagSet("seed", flag.ExitOnError) + endpoint := fs.String("endpoint", "http://localhost:4566", "S3 endpoint URL") + region := fs.String("region", "us-east-1", "AWS region") + total := fs.Int("total", 50000, "Number of objects to upload") + size := fs.Int("size", 1024, "Object size in bytes") + workers := fs.Int("workers", 64, "Number of concurrent workers") + _ = fs.Parse(args) + + client := newS3Client(*endpoint, *region) + ctx := context.Background() + if err := seedBucket(ctx, client, *total, *size, *workers); err != nil { + fmt.Fprintf(os.Stderr, "seed: %v\n", err) + os.Exit(1) + } +} + +func seedBucket(ctx context.Context, client *s3.Client, total, objSize, numWorkers int) error { + fmt.Printf("Uploading %d objects (~%d bytes each) to %s...\n", total, objSize, bucketName) + start := time.Now() + + jobs := make(chan int, numWorkers*2) + 
+ var uploaded atomic.Int64 + var wg sync.WaitGroup + errCh := make(chan error, 1) + + for range numWorkers { + wg.Add(1) + go func() { + defer wg.Done() + for n := range jobs { + if err := uploadObject(ctx, client, n, objSize); err != nil { + select { + case errCh <- err: + default: + } + return + } + done := uploaded.Add(1) + if done%progressInterval == 0 { + elapsed := time.Since(start).Seconds() + fmt.Printf("Progress: %d/%d objects (%.0f obj/sec)\n", done, total, float64(done)/elapsed) + } + } + }() + } + + for i := range total { + select { + case jobs <- i: + case err := <-errCh: + close(jobs) + wg.Wait() + return err + } + } + close(jobs) + wg.Wait() + + select { + case err := <-errCh: + return err + default: + } + + elapsed := time.Since(start).Seconds() + done := uploaded.Load() + totalMB := float64(int64(objSize)*done) / 1e6 + fmt.Printf("Completed: %d objects uploaded to %s in %.1fs (%.0f obj/sec, %.1f MB/s)\n", + done, bucketName, elapsed, float64(done)/elapsed, totalMB/elapsed) + return nil +} + +func uploadObject(ctx context.Context, client *s3.Client, n, size int) error { + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(fmt.Sprintf("bench/obj-%08d.json", n)), + Body: bytes.NewReader(makeObjectBody(n, size)), + ContentType: aws.String("application/json"), + }) + return err +} + +func makeObjectBody(n, size int) []byte { + prefix := fmt.Sprintf(`{"id":"obj-%d","created_at":"%s","data":"`, n, time.Now().UTC().Format(time.RFC3339)) + suffix := `"}` + pad := size - len(prefix) - len(suffix) + if pad < 0 { + pad = 0 + } + return []byte(prefix + strings.Repeat("x", pad) + suffix) +} + +// drop-bucket ----------------------------------------------------------------- + +func runDropBucket(args []string) { + fs := flag.NewFlagSet("drop-bucket", flag.ExitOnError) + endpoint := fs.String("endpoint", "http://localhost:4566", "S3 endpoint URL") + region := fs.String("region", "us-east-1", "AWS region") + _ = 
fs.Parse(args) + + client := newS3Client(*endpoint, *region) + ctx := context.Background() + deleted, err := emptyBucket(ctx, client) + if err != nil { + fmt.Fprintf(os.Stderr, "drop-bucket: %v\n", err) + os.Exit(1) + } + fmt.Printf("Deleted %d objects from %s.\n", deleted, bucketName) +} + +func emptyBucket(ctx context.Context, client *s3.Client) (int, error) { + var total int + var token *string + for { + out, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(bucketName), + ContinuationToken: token, + }) + if err != nil { + return total, fmt.Errorf("list: %w", err) + } + if len(out.Contents) == 0 { + break + } + ids := make([]types.ObjectIdentifier, len(out.Contents)) + for i, obj := range out.Contents { + ids[i] = types.ObjectIdentifier{Key: obj.Key} + } + if _, err := client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(bucketName), + Delete: &types.Delete{Objects: ids}, + }); err != nil { + return total, fmt.Errorf("delete: %w", err) + } + total += len(ids) + if !aws.ToBool(out.IsTruncated) { + break + } + token = out.NextContinuationToken + } + return total, nil +} diff --git a/internal/impl/aws/s3/bench/write/kafka-connect/Dockerfile b/internal/impl/aws/s3/bench/write/kafka-connect/Dockerfile new file mode 100644 index 0000000000..71a6cbe350 --- /dev/null +++ b/internal/impl/aws/s3/bench/write/kafka-connect/Dockerfile @@ -0,0 +1,2 @@ +FROM confluentinc/cp-kafka-connect-base:7.7.8 +RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.23 diff --git a/internal/impl/aws/s3/bench/write/kafka-connect/README.md b/internal/impl/aws/s3/bench/write/kafka-connect/README.md new file mode 100644 index 0000000000..5a935ed625 --- /dev/null +++ b/internal/impl/aws/s3/bench/write/kafka-connect/README.md @@ -0,0 +1,109 @@ +# Kafka Connect S3 Sink β€” Benchmark + +Benchmarks Kafka Connect (Confluent S3 Sink) reading from Kafka and writing to S3 (LocalStack). 
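Throughput in these runs is derived from consumer-group lag sampled at a fixed interval: the `bench:measure` and `bench:run` tasks sum per-partition lag, report `Ξ”lag / Ξ”t` between polls, and report `total / elapsed` once lag reaches zero. A minimal Go sketch of that arithmetic, using hypothetical sample values:

```go
package main

import "fmt"

// lagSample is one poll of the sink's consumer group: the total remaining
// records summed over all partitions, at a given second since registration.
type lagSample struct {
	atSec int64
	lag   int64
}

// throughput mirrors the bench:measure arithmetic: the per-interval rate is
// delta(lag)/delta(t), and the average is total/elapsed once lag hits zero.
func throughput(total int64, samples []lagSample) (rates []int64, avg int64) {
	prevLag, prevAt := total, int64(0)
	for _, s := range samples {
		if dt := s.atSec - prevAt; dt > 0 {
			rates = append(rates, (prevLag-s.lag)/dt)
		}
		prevLag, prevAt = s.lag, s.atSec
		if s.lag == 0 {
			elapsed := s.atSec
			if elapsed <= 0 {
				elapsed = 1 // guard against division by zero
			}
			avg = total / elapsed
			break
		}
	}
	return rates, avg
}

func main() {
	// Hypothetical 1M-record run polled every 5 s.
	rates, avg := throughput(1_000_000, []lagSample{
		{5, 750_000}, {10, 450_000}, {15, 100_000}, {20, 0},
	})
	fmt.Println(rates, avg) // β†’ [50000 60000 70000 20000] 50000
}
```

The shell implementation in the Taskfiles does the same integer arithmetic, so rates are floored to whole msg/s.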
+
+See [`docs/benchmark-results/aws-s3.md`](../../../../../docs/benchmark-results/aws-s3.md) for results and comparison with Redpanda Connect.
+
+## Architecture
+
+```
+Redpanda Connect producer β†’ bench-events (16 partitions) β†’ Kafka Connect S3 Sink β†’ LocalStack S3
+```
+
+- **Kafka** β€” KRaft broker, no ZooKeeper (`confluentinc/cp-kafka:7.7.8`)
+- **LocalStack** β€” local S3 endpoint (`bench-events` bucket)
+- **Kafka Connect** β€” S3 Sink connector (`confluentinc/kafka-connect-s3:10.5.23`)
+- **Message format** β€” newline-delimited JSON, `flush.size` records per S3 object
+
+## Prerequisites
+
+- Docker
+- `task` (Taskfile runner)
+- `jq`
+- `go` (to run the producer)
+
+## Quickstart
+
+```bash
+task up
+task bench:run COUNT=10000000
+task down
+```
+
+## Connector Parameters
+
+All parameters are configurable as Taskfile vars:
+
+| Var | Default | Connector field | Description |
+|---|---|---|---|
+| `TASKS_MAX` | `16` | `tasks.max` | Parallel sink tasks (capped at partition count) |
+| `FLUSH_SIZE` | `10000` | `flush.size` | Records per S3 object |
+| `POLL_RECORDS` | `5000` | `consumer.override.max.poll.records` | Records fetched per Kafka poll |
+| `FETCH_MIN_BYTES` | `1048576` | `consumer.override.fetch.min.bytes` | Min bytes before broker responds (1 MiB) |
+| `PART_SIZE` | `5242880` | `s3.part.size` | S3 multipart upload chunk (5 MiB) |
+
+Pass any of them to `connector:create` or `bench:run`:
+
+```bash
+task bench:run COUNT=1000000 TASKS_MAX=8 FLUSH_SIZE=50000 POLL_RECORDS=10000
+```
+
+## Matrix Benchmark
+
+Run every combination of the parameter lists and get a comparison table:
+
+```bash
+task bench:matrix COUNT=1000000
+```
+
+Default lists (54 combinations):
+
+| Var | Default values |
+|---|---|
+| `TASKS_LIST` | `4 8 16` |
+| `FLUSH_LIST` | `1000 10000 50000` |
+| `POLL_LIST` | `1000 5000 10000` |
+| `FETCH_MIN_LIST` | `1048576 4194304` |
+
+Override any list to narrow the sweep:
+
+```bash
+task bench:matrix COUNT=1000000 TASKS_LIST="8 16" FLUSH_LIST="10000 50000" POLL_LIST="5000" +``` + +Each combination gets a unique connector name (and therefore its own consumer group starting from offset 0), so data is loaded only once and all combinations run against the same dataset. + +Sample output: + +``` +════════════════════════════════════════════════════════════════ + Matrix Results (18 combinations, COUNT=1000000) +════════════════════════════════════════════════════════════════ +TASKS FLUSH POLL FETCH_MIN ELAPSED(s) MSG/S +4 1000 1000 1MB 48 20833 +4 1000 5000 1MB 39 25641 +4 10000 5000 1MB 31 32258 +8 10000 5000 1MB 18 55555 +16 50000 10000 4MB 11 90909 +... +``` + +## Tasks Reference + +| Task | Description | +|---|---| +| `task up` | Build and start all containers, wait for Connect readiness | +| `task down` | Stop and remove all containers and volumes | +| `task bench:run COUNT=N [params]` | Single parameterised run | +| `task bench:matrix COUNT=N [lists]` | Sweep all combinations, print table | +| `task bench:measure TOTAL=N INTERVAL=S` | Poll lag and print msg/s until drained | +| `task bench:lag` | Show current consumer group lag | +| `task bench:offsets` | Show end offsets for bench-events | +| `task bench:s3-count` | Count objects written to the S3 bucket | +| `task connector:create [params]` | Register connector with current var values | +| `task connector:status` | Show connector and task status | +| `task connector:delete` | Delete the connector (keeps infrastructure running) | +| `task data:load COUNT=N` | Produce N events to bench-events | +| `task logs:connect` | Follow Kafka Connect worker logs | +| `task logs:localstack` | Follow LocalStack logs | diff --git a/internal/impl/aws/s3/bench/write/kafka-connect/Taskfile.yaml b/internal/impl/aws/s3/bench/write/kafka-connect/Taskfile.yaml new file mode 100644 index 0000000000..d908868d2c --- /dev/null +++ b/internal/impl/aws/s3/bench/write/kafka-connect/Taskfile.yaml @@ -0,0 +1,525 @@ 
+version: "3" + +silent: true + +vars: + KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP | default "localhost:9092"}}' + CONNECT_URL: '{{.CONNECT_URL | default "http://localhost:8083"}}' + S3_ENDPOINT: '{{.S3_ENDPOINT | default "http://localhost:4566"}}' + CONNECTOR_NAME: '{{.CONNECTOR_NAME | default "s3-sink-bench"}}' + CONSUMER_GROUP: connect-s3-sink-bench + + # ── Connector parameters (used by connector:create and bench:run) ───────────── + TASKS_MAX: '{{.TASKS_MAX | default "16"}}' + FLUSH_SIZE: '{{.FLUSH_SIZE | default "10000"}}' + POLL_RECORDS: '{{.POLL_RECORDS | default "5000"}}' + FETCH_MIN_BYTES: '{{.FETCH_MIN_BYTES | default "1048576"}}' + PART_SIZE: '{{.PART_SIZE | default "5242880"}}' + # rotate.schedule.interval.ms: periodically flush whatever is buffered, regardless of + # flush.size. Works with DefaultPartitioner (unlike rotate.interval.ms which requires + # TimeBasedPartitioner). Ensures the final partial batch is committed so lag reaches 0. + ROTATE_SCHEDULE_MS: '{{.ROTATE_SCHEDULE_MS | default "10000"}}' + + # ── Matrix sweep lists (space-separated values to try for each parameter) ───── + TASKS_LIST: '{{.TASKS_LIST | default "4 8 16"}}' + FLUSH_LIST: '{{.FLUSH_LIST | default "1000 10000 50000"}}' + POLL_LIST: '{{.POLL_LIST | default "1000 5000 10000"}}' + FETCH_MIN_LIST: '{{.FETCH_MIN_LIST | default "1048576 4194304"}}' + MATRIX_INTERVAL: '{{.MATRIX_INTERVAL | default "5"}}' + +tasks: + # ── Infrastructure ──────────────────────────────────────────────────────────── + + up: + desc: Build and start all infrastructure (Kafka, LocalStack, Kafka Connect), wait until ready + cmds: + - docker compose up -d --build + - until curl -sf {{.CONNECT_URL}}/ > /dev/null 2>&1; do sleep 3; done + - echo "Kafka Connect ready at {{.CONNECT_URL}}" + + down: + desc: Stop and remove all containers and volumes + cmds: + - docker compose down -v --remove-orphans + + # ── Connector management ────────────────────────────────────────────────────── + + connector:create: + desc: | + 
Register the S3 sink connector with configurable parameters. + e.g. task connector:create TASKS_MAX=8 FLUSH_SIZE=5000 POLL_RECORDS=10000 FETCH_MIN_BYTES=4194304 + cmds: + - | + jq -n \ + --arg name "{{.CONNECTOR_NAME}}" \ + --arg tasks "{{.TASKS_MAX}}" \ + --arg flush "{{.FLUSH_SIZE}}" \ + --arg poll "{{.POLL_RECORDS}}" \ + --arg fetch "{{.FETCH_MIN_BYTES}}" \ + --arg part "{{.PART_SIZE}}" \ + --arg rotate "{{.ROTATE_SCHEDULE_MS}}" \ + '{ + name: $name, + config: { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "tasks.max": $tasks, + "topics": "bench-events", + "s3.region": "us-east-1", + "s3.bucket.name": "bench-events", + "store.url": "http://localstack:4566", + "s3.part.size": $part, + "flush.size": $flush, + "rotate.interval.ms": "-1", + "rotate.schedule.interval.ms": $rotate, + "timezone": "UTC", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "schema.compatibility": "NONE", + "consumer.override.fetch.min.bytes": $fetch, + "consumer.override.fetch.max.wait.ms": "500", + "consumer.override.max.poll.records": $poll + } + }' > /tmp/connector-cfg.json + RESP=$(curl -s -X POST {{.CONNECT_URL}}/connectors \ + -H "Content-Type: application/json" --data-binary @/tmp/connector-cfg.json) + rm -f /tmp/connector-cfg.json + echo "$RESP" | jq -r 'if .error_code then "ERROR \(.error_code): \(.message)" else "registered: \(.name)" end' + echo "$RESP" | jq -e '.error_code' > /dev/null 2>&1 && exit 1 || true + + connector:status: + desc: Show connector and task status + cmds: + - curl -s {{.CONNECT_URL}}/connectors/{{.CONNECTOR_NAME}}/status | jq . 
+ + connector:tasks: + desc: Show status of all connectors and their tasks (useful for debugging failures) + cmds: + - | + for name in $(curl -s {{.CONNECT_URL}}/connectors | jq -r '.[]'); do + echo "── $name ──" + curl -s {{.CONNECT_URL}}/connectors/$name/status | jq -r ' + " connector: \(.connector.state) \(if .connector.trace then "– \(.connector.trace | split("\n")[0])" else "" end)", + (if .tasks | length > 0 then + .tasks[] | " task \(.id): \(.state) \(if .trace then "– \(.trace | split("\n")[0])" else "" end)" + else + " (no tasks started)" + end) + ' + done + + connector:delete: + desc: Delete the S3 sink connector (keeps infrastructure running) + cmds: + - curl -s -X DELETE {{.CONNECT_URL}}/connectors/{{.CONNECTOR_NAME}} || true + + # ── Data loading ────────────────────────────────────────────────────────────── + + topic:reset: + desc: Delete and recreate the bench-events Kafka topic (clears all accumulated messages) + cmds: + - docker exec s3kc-kafka kafka-topics --bootstrap-server localhost:29092 --delete --topic bench-events 2>/dev/null || true + - sleep 2 + - docker exec s3kc-kafka kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic bench-events --partitions 16 --replication-factor 1 + - echo "topic reset" + + data:load: + desc: "Produce events to bench-events topic (e.g. 
task data:load COUNT=5000000)" + vars: + COUNT: '{{.COUNT | default "1000000"}}' + env: + COUNT: '{{.COUNT}}' + KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP}}' + cmds: + - go run ../../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml + + # ── Observation ─────────────────────────────────────────────────────────────── + + bench:lag: + desc: Show current consumer lag for the S3 sink connector group + cmds: + - docker exec s3kc-kafka kafka-consumer-groups --bootstrap-server localhost:29092 --describe --group {{.CONSUMER_GROUP}} + + bench:offsets: + desc: Show end offsets (total messages) for bench-events + cmds: + - docker exec s3kc-kafka kafka-get-offsets --bootstrap-server localhost:29092 --topic bench-events + + bench:s3-count: + desc: Count objects written to the bench-events S3 bucket + cmds: + - docker exec s3kc-localstack awslocal s3 ls s3://bench-events --recursive | wc -l + + bench:measure: + desc: "Poll lag and print msg/s until topic is drained (e.g. task bench:measure TOTAL=1000000 INTERVAL=5)" + vars: + TOTAL: '{{.TOTAL | default "1000000"}}' + INTERVAL: '{{.INTERVAL | default "5"}}' + GROUP: '{{.GROUP | default "connect-s3-sink-bench"}}' + cmds: + - | + GROUP={{.GROUP}} + INTERVAL={{.INTERVAL}} + TOTAL={{.TOTAL}} + + get_lag() { + docker exec s3kc-kafka kafka-consumer-groups \ + --bootstrap-server localhost:29092 \ + --describe --group "$1" 2>/dev/null \ + | awk '/bench-events/ { + cur = ($4 == "-") ? 0 : $4 + sum += $5 - cur + } END {print (NR == 0 ? -1 : sum)}' + } + + # Three phases: + # 0 – waiting for consumer group to appear (lag == -1 or 0) + # 1 – group appeared; waiting for first commit (lag starts decreasing) + # 2 – timing active (measures from first commit to zero lag) + # APPEAR_TIME is set in phase 0β†’1 so that a batch-flush (lag jumps straight + # to 0 without an intermediate decrease) still gets an accurate elapsed time. 
+ PHASE=0 INITIAL_LAG="" PREV_LAG="" START="" PREV_TIME="" APPEAR_TIME="" + printf "%-10s %-14s %s\n" "TIME" "LAG" "MSG/S" + + while true; do + sleep "$INTERVAL" + LAG=$(get_lag "$GROUP") + NOW=$(date +%s) + + if [ "$PHASE" -eq 0 ]; then + if [ "${LAG:--1}" -gt 0 ] 2>/dev/null; then + INITIAL_LAG=$LAG; PREV_LAG=$LAG; APPEAR_TIME=$NOW; PHASE=1 + printf "%-10s %-14s (starting up...)\n" "$(date +%H:%M:%S)" "$LAG" + else + printf "%-10s %-14s (waiting for group...)\n" "$(date +%H:%M:%S)" "${LAG:--}" + fi + continue + fi + + if [ "$PHASE" -eq 1 ]; then + if [ "$LAG" -eq 0 ]; then + # Batch flush: all messages committed at once β€” time from group appearance + ELAPSED=$((NOW - APPEAR_TIME)) + AVG=$(( INITIAL_LAG / (ELAPSED > 0 ? ELAPSED : 1) )) + printf "%-10s %-14s (batch flush)\n" "$(date +%H:%M:%S)" "$LAG" + echo "---" + echo "Total messages : $INITIAL_LAG" + echo "Elapsed : ${ELAPSED}s" + echo "Avg throughput : ${AVG} msg/s" + break + elif [ "$LAG" -lt "$INITIAL_LAG" ]; then + START=$NOW; PREV_LAG=$LAG; PREV_TIME=$NOW; PHASE=2 + printf "%-10s %-14s (timing started)\n" "$(date +%H:%M:%S)" "$LAG" + else + printf "%-10s %-14s (waiting for first commit...)\n" "$(date +%H:%M:%S)" "$LAG" + fi + continue + fi + + [ "${LAG:--1}" -lt 0 ] && LAG=0 + DELTA_T=$((NOW - PREV_TIME)) + DELTA_LAG=$((PREV_LAG - LAG)) + RATE=0 + [ "$DELTA_T" -gt 0 ] && RATE=$((DELTA_LAG / DELTA_T)) + printf "%-10s %-14s %s msg/s\n" "$(date +%H:%M:%S)" "$LAG" "$RATE" + + if [ "$LAG" -eq 0 ]; then + ELAPSED=$((NOW - START)) + AVG=$(( INITIAL_LAG / (ELAPSED > 0 ? ELAPSED : 1) )) + echo "---" + echo "Total messages : $INITIAL_LAG" + echo "Elapsed : ${ELAPSED}s" + echo "Avg throughput : ${AVG} msg/s" + break + fi + + PREV_LAG=$LAG; PREV_TIME=$NOW + done + + # ── Single parameterised run ────────────────────────────────────────────────── + + bench:run: + desc: | + Single parameterised benchmark run: delete connector, load fresh data, create connector, measure. + e.g. 
task bench:run COUNT=1000000 TASKS_MAX=8 FLUSH_SIZE=5000 POLL_RECORDS=10000 + vars: + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - task: connector:delete + - sleep 2 + - docker exec s3kc-localstack awslocal s3 rm s3://bench-events --recursive || true + - task: topic:reset + - task: data:load + vars: { COUNT: '{{.COUNT}}' } + - | + # Create connector and start timer immediately β€” don't wait for first poll. + # INITIAL_LAG is COUNT (known), START is connector registration time. + jq -n \ + --arg name "{{.CONNECTOR_NAME}}" \ + --arg tasks "{{.TASKS_MAX}}" \ + --arg flush "{{.FLUSH_SIZE}}" \ + --arg poll "{{.POLL_RECORDS}}" \ + --arg fetch "{{.FETCH_MIN_BYTES}}" \ + --arg part "{{.PART_SIZE}}" \ + --arg rotate "{{.ROTATE_SCHEDULE_MS}}" \ + '{ + name: $name, + config: { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "tasks.max": $tasks, + "topics": "bench-events", + "s3.region": "us-east-1", + "s3.bucket.name": "bench-events", + "store.url": "http://localstack:4566", + "s3.part.size": $part, + "flush.size": $flush, + "rotate.interval.ms": "-1", + "rotate.schedule.interval.ms": $rotate, + "timezone": "UTC", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "schema.compatibility": "NONE", + "consumer.override.fetch.min.bytes": $fetch, + "consumer.override.fetch.max.wait.ms": "500", + "consumer.override.max.poll.records": $poll + } + }' > /tmp/connector-cfg.json + RESP=$(curl -s -X POST {{.CONNECT_URL}}/connectors \ + -H "Content-Type: application/json" --data-binary @/tmp/connector-cfg.json) + rm -f /tmp/connector-cfg.json + if echo "$RESP" | jq -e '.error_code' > /dev/null 2>&1; then + echo "ERROR: $(echo "$RESP" | jq -r '.message')"; exit 1 + fi + echo "registered: $(echo "$RESP" 
| jq -r '.name')" + + START=$(date +%s) + INITIAL_LAG={{.COUNT}} + GROUP="connect-{{.CONNECTOR_NAME}}" + INTERVAL={{.MATRIX_INTERVAL}} + PREV_LAG=$INITIAL_LAG + PREV_TIME=$START + + get_lag() { + docker exec s3kc-kafka kafka-consumer-groups \ + --bootstrap-server localhost:29092 \ + --describe --group "$1" 2>/dev/null \ + | awk '/bench-events/ { + cur = ($4 == "-") ? 0 : $4 + sum += $5 - cur + } END {print (NR == 0 ? -1 : sum)}' + } + + printf "%-10s %-14s %s\n" "TIME" "LAG" "MSG/S" + printf "%-10s %-14s (registered)\n" "$(date +%H:%M:%S)" "$INITIAL_LAG" + + while true; do + sleep "$INTERVAL" + LAG=$(get_lag "$GROUP") + NOW=$(date +%s) + + if [ "${LAG:--1}" -lt 0 ] 2>/dev/null; then + printf "%-10s %-14s (warming up...)\n" "$(date +%H:%M:%S)" "-" + continue + fi + + [ "$LAG" -lt 0 ] && LAG=0 + DELTA_T=$((NOW - PREV_TIME)) + DELTA_LAG=$((PREV_LAG - LAG)) + RATE=0 + [ "$DELTA_T" -gt 0 ] && RATE=$((DELTA_LAG / DELTA_T)) + printf "%-10s %-14s %s msg/s\n" "$(date +%H:%M:%S)" "$LAG" "$RATE" + + if [ "$LAG" -eq 0 ]; then + ELAPSED=$((NOW - START)) + AVG=$(( INITIAL_LAG / (ELAPSED > 0 ? ELAPSED : 1) )) + echo "---" + echo "Total messages : $INITIAL_LAG" + echo "Elapsed : ${ELAPSED}s" + echo "Avg throughput : ${AVG} msg/s" + break + fi + + PREV_LAG=$LAG; PREV_TIME=$NOW + done + + # ── Matrix sweep ────────────────────────────────────────────────────────────── + + bench:matrix: + desc: | + Sweep all parameter combinations and print a comparison table. + Loads COUNT messages once, then runs every combination with a unique connector + (each gets its own consumer group reading from offset 0). + Override lists: TASKS_LIST="4 16" FLUSH_LIST="1000 50000" POLL_LIST="5000" + e.g. 
task bench:matrix COUNT=1000000 + vars: + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - task: connector:delete + - sleep 1 + - docker exec s3kc-localstack awslocal s3 rm s3://bench-events --recursive || true + - task: topic:reset + - task: data:load + vars: { COUNT: '{{.COUNT}}' } + - | + TASKS_LIST="{{.TASKS_LIST}}" + FLUSH_LIST="{{.FLUSH_LIST}}" + POLL_LIST="{{.POLL_LIST}}" + FETCH_MIN_LIST="{{.FETCH_MIN_LIST}}" + CONNECT_URL="{{.CONNECT_URL}}" + TOTAL="{{.COUNT}}" + INTERVAL="{{.MATRIX_INTERVAL}}" + ROTATE_SCHEDULE_MS="{{.ROTATE_SCHEDULE_MS}}" + + # Warn if COUNT is smaller than the largest flush.size. + # The connector only writes a file when flush.size records accumulate OR + # rotate.schedule.interval.ms fires. Small COUNT still works (the schedule + # flushes the remainder) but throughput numbers won't reflect pure flush-driven + # performance. Recommend COUNT >> max(FLUSH_LIST) for realistic results. + MAX_FLUSH=0 + for f in $FLUSH_LIST; do [ "$f" -gt "$MAX_FLUSH" ] && MAX_FLUSH=$f; done + if [ "$TOTAL" -lt "$MAX_FLUSH" ]; then + echo "WARNING: COUNT=$TOTAL is less than max FLUSH_SIZE=$MAX_FLUSH." + echo " Results will be dominated by rotate.schedule.interval.ms ($ROTATE_SCHEDULE_MS ms)." + echo " For throughput benchmarks use COUNT >> $MAX_FLUSH (e.g. COUNT=1000000)." + echo "" + fi + + get_lag() { + docker exec s3kc-kafka kafka-consumer-groups \ + --bootstrap-server localhost:29092 \ + --describe --group "$1" 2>/dev/null \ + | awk '/bench-events/ { + cur = ($4 == "-") ? 0 : $4 + sum += $5 - cur + } END {print (NR == 0 ? 
-1 : sum)}' + } + + RESULTS_FILE=$(mktemp) + printf "%-8s %-10s %-8s %-12s %-12s %s\n" \ + "TASKS" "FLUSH" "POLL" "FETCH_MIN" "ELAPSED(s)" "MSG/S" > "$RESULTS_FILE" + + RUN=0 + for tasks in $TASKS_LIST; do + for flush in $FLUSH_LIST; do + for poll in $POLL_LIST; do + for fetch_min in $FETCH_MIN_LIST; do + RUN=$((RUN + 1)) + NAME="s3-bench-$(printf '%03d' $RUN)" + GROUP="connect-${NAME}" + FETCH_LABEL="${fetch_min}" + [ "$fetch_min" -ge 1048576 ] && FETCH_LABEL="$((fetch_min / 1048576))MB" + + printf "\n=== run %d: tasks=%-3s flush=%-6s poll=%-6s fetch_min=%s ===\n" \ + "$RUN" "$tasks" "$flush" "$poll" "$FETCH_LABEL" + + # Register connector with this combination + jq -n \ + --arg name "$NAME" \ + --arg tasks "$tasks" \ + --arg flush "$flush" \ + --arg poll "$poll" \ + --arg fetch "$fetch_min" \ + --arg rotate "$ROTATE_SCHEDULE_MS" \ + '{ + name: $name, + config: { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "tasks.max": $tasks, + "topics": "bench-events", + "s3.region": "us-east-1", + "s3.bucket.name": "bench-events", + "store.url": "http://localstack:4566", + "s3.part.size": "5242880", + "flush.size": $flush, + "rotate.interval.ms": "-1", + "rotate.schedule.interval.ms": $rotate, + "timezone": "UTC", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "schema.compatibility": "NONE", + "consumer.override.fetch.min.bytes": $fetch, + "consumer.override.fetch.max.wait.ms": "500", + "consumer.override.max.poll.records": $poll + } + }' > /tmp/connector-cfg.json + RESP=$(curl -s -X POST "$CONNECT_URL/connectors" \ + -H "Content-Type: application/json" --data-binary @/tmp/connector-cfg.json) + rm -f /tmp/connector-cfg.json + if echo "$RESP" | jq -e '.error_code' > /dev/null 2>&1; then + 
echo "  ERROR: $(echo "$RESP" | jq -r '.message')"
+            continue
+          fi
+          echo "  connector registered: $(echo "$RESP" | jq -r '.name')"
+
+          # Timer starts at connector registration; INITIAL_LAG=TOTAL (known).
+          COMBO_START=$(date +%s)
+          INITIAL_LAG=$TOTAL
+          PREV_LAG=$TOTAL
+          PREV_TIME=$COMBO_START
+          printf "  %-10s %-14s %s\n" "TIME" "LAG" "MSG/S"
+          printf "  %-10s %-14s (registered)\n" "$(date +%H:%M:%S)" "$INITIAL_LAG"
+
+          while true; do
+            sleep "$INTERVAL"
+            LAG=$(get_lag "$GROUP")
+            NOW=$(date +%s)
+
+            if [ "${LAG:--1}" -lt 0 ] 2>/dev/null; then
+              printf "  %-10s %-14s (warming up...)\n" "$(date +%H:%M:%S)" "-"
+              continue
+            fi
+
+            [ "$LAG" -lt 0 ] && LAG=0
+            DELTA_T=$((NOW - PREV_TIME))
+            DELTA_LAG=$((PREV_LAG - LAG))
+            RATE=0
+            [ "$DELTA_T" -gt 0 ] && RATE=$((DELTA_LAG / DELTA_T))
+            printf "  %-10s %-14s %s msg/s\n" "$(date +%H:%M:%S)" "$LAG" "$RATE"
+
+            if [ "$LAG" -eq 0 ]; then
+              ELAPSED=$((NOW - COMBO_START))
+              AVG=$(( INITIAL_LAG / (ELAPSED > 0 ? ELAPSED : 1) ))
+              printf "  → %ds, %s msg/s\n" "$ELAPSED" "$AVG"
+              printf "%-8s %-10s %-8s %-12s %-12s %s\n" \
+                "$tasks" "$flush" "$poll" "$FETCH_LABEL" "$ELAPSED" "$AVG" \
+                >> "$RESULTS_FILE"
+              break
+            fi
+
+            PREV_LAG=$LAG; PREV_TIME=$NOW
+          done
+
+          # Clean up connector (leaves consumer group offsets intact as history)
+          curl -s -X DELETE "$CONNECT_URL/connectors/$NAME" > /dev/null
+          sleep 2
+        done
+        done
+        done
+        done
+
+        echo ""
+        echo "════════════════════════════════════════════════════════"
+        echo "  Matrix Results ($RUN combinations, COUNT=$TOTAL)"
+        echo "════════════════════════════════════════════════════════"
+        cat "$RESULTS_FILE"
+        rm -f "$RESULTS_FILE"
+
+  # ── Logs ────────────────────────────────────────────────────────────────────
+
+  logs:connect:
+    desc: Follow Kafka Connect worker logs
+    cmds:
+      - docker compose logs -f kafka-connect
+
+  logs:localstack:
+    desc: Follow LocalStack logs
+    cmds:
+      - docker compose logs -f localstack
diff --git a/internal/impl/aws/s3/bench/write/kafka-connect/connector.json b/internal/impl/aws/s3/bench/write/kafka-connect/connector.json
new file mode 100644
index 0000000000..418087ff28
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/kafka-connect/connector.json
@@ -0,0 +1,31 @@
+{
+  "name": "s3-sink-bench",
+  "config": {
+    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+    "tasks.max": "16",
+    "topics": "bench-events",
+
+    "s3.region": "us-east-1",
+    "s3.bucket.name": "bench-events",
+    "store.url": "http://localstack:4566",
+    "s3.part.size": "5242880",
+    "s3.path.style.access.enabled": "true",
+
+    "flush.size": "10000",
+    "rotate.interval.ms": "60000",
+    "rotate.schedule.interval.ms": "-1",
+
+    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+
+    "schema.compatibility": "NONE",
+
+    "consumer.override.fetch.min.bytes": "1048576",
+    "consumer.override.fetch.max.wait.ms": "500",
+    "consumer.override.max.poll.records": "5000"
+  }
+}
diff --git a/internal/impl/aws/s3/bench/write/kafka-connect/docker-compose.yaml b/internal/impl/aws/s3/bench/write/kafka-connect/docker-compose.yaml
new file mode 100644
index 0000000000..5185360886
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/kafka-connect/docker-compose.yaml
@@ -0,0 +1,120 @@
+services:
+  # ── Kafka (KRaft, no ZooKeeper) ──────────────────────────────────────────────
+  kafka:
+    image: confluentinc/cp-kafka:7.7.8
+    container_name: s3kc-kafka
+    cpus: 3
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_PROCESS_ROLES: broker,controller
+      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
+      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
+    healthcheck:
+      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
+      interval: 10s
+      timeout: 10s
+      retries: 12
+
+  # ── LocalStack (S3 sink target) ──────────────────────────────────────────────
+  localstack:
+    image: localstack/localstack:3
+    container_name: s3kc-localstack
+    ports:
+      - "4566:4566"
+    environment:
+      SERVICES: s3
+      DEFAULT_REGION: us-east-1
+      AWS_DEFAULT_REGION: us-east-1
+    healthcheck:
+      test: ["CMD", "curl", "-sf", "http://localhost:4566/_localstack/health"]
+      interval: 5s
+      timeout: 5s
+      retries: 12
+
+  # ── Kafka Connect + S3 Sink plugin (built from Dockerfile) ───────────────────
+  kafka-connect:
+    build: .
+    container_name: s3kc-connect
+    ports:
+      - "8083:8083"
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
+      CONNECT_REST_PORT: 8083
+      CONNECT_GROUP_ID: s3-sink-bench-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
+      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
+      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
+      # Flush Kafka consumer offsets every 5s (default 60s) so lag polling reflects S3 progress
+      CONNECT_OFFSET_FLUSH_INTERVAL_MS: "5000"
+      # Give the JVM enough heap for 16 tasks each buffering large polls + S3 part buffers
+      KAFKA_HEAP_OPTS: "-Xms512m -Xmx2g"
+      # Static credentials for LocalStack
+      AWS_ACCESS_KEY_ID: test
+      AWS_SECRET_ACCESS_KEY: test
+      AWS_DEFAULT_REGION: us-east-1
+    depends_on:
+      kafka:
+        condition: service_healthy
+      localstack:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8083/"]
+      interval: 10s
+      timeout: 10s
+      retries: 15
+
+  # ── Create bench-events Kafka topic ──────────────────────────────────────────
+  kafka-setup:
+    image: confluentinc/cp-kafka:7.7.8
+    container_name: s3kc-kafka-setup
+    depends_on:
+      kafka:
+        condition: service_healthy
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        kafka-topics --bootstrap-server kafka:29092 \
+          --create --if-not-exists \
+          --topic bench-events \
+          --partitions 16 \
+          --replication-factor 1
+        echo "topic ready"
+
+  # ── Create bench-events S3 bucket in LocalStack ──────────────────────────────
+  localstack-setup:
+    image: amazon/aws-cli:latest
+    container_name: s3kc-localstack-setup
+    depends_on:
+      localstack:
+        condition: service_healthy
+    environment:
+      AWS_ACCESS_KEY_ID: test
+      AWS_SECRET_ACCESS_KEY: test
+      AWS_DEFAULT_REGION: us-east-1
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        aws --endpoint-url http://localstack:4566 s3 mb s3://bench-events
+        echo "bucket ready"
diff --git a/internal/impl/aws/s3/bench/write/kafka-connect/producer.yaml b/internal/impl/aws/s3/bench/write/kafka-connect/producer.yaml
new file mode 100644
index 0000000000..40182e1c4e
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/kafka-connect/producer.yaml
@@ -0,0 +1,30 @@
+# Produces COUNT synthetic events to the bench-events Kafka topic.
+#
+# Usage:
+#   go run ../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml
+# Override count/brokers:
+#   COUNT=5000000 KAFKA_BOOTSTRAP=localhost:9092 \
+#     go run ../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml
+
+input:
+  generate:
+    count: ${COUNT:1000000}
+    interval: ""
+    mapping: |
+      root = {
+        "id": count("events"),
+        "category": "cat_" + random_int(min: 1, max: 10).string(),
+        "value": random_int(min: 1, max: 1000000).float64() / 100.0,
+        "ts": now().ts_unix_micro()
+      }
+
+output:
+  kafka_franz:
+    seed_brokers:
+      - ${KAFKA_BOOTSTRAP:localhost:9092}
+    topic: bench-events
+    compression: snappy
+    batching:
+      count: 3000
+      byte_size: 4194304 # 4 MiB
+      period: 5s
diff --git a/internal/impl/aws/s3/bench/write/redpanda-connect/README.md b/internal/impl/aws/s3/bench/write/redpanda-connect/README.md
new file mode 100644
index 0000000000..bc0162ac75
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/redpanda-connect/README.md
@@ -0,0 +1,101 @@
+# Redpanda Connect S3 Sink — Benchmark
+
+Benchmarks Redpanda Connect (`kafka_franz` input → `aws_s3` output) reading from Kafka and writing to S3 (LocalStack). Results are directly comparable to the sibling [Kafka Connect benchmark](../kafka-connect/).
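Both benchmarks measure progress by summing per-partition consumer lag from `kafka-consumer-groups --describe` output. As an illustration of that parsing step, here is the awk logic from the Taskfiles' lag helper run against sample rows (the rows and exact column layout are invented for the example; the assumed columns are `GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...`):

```bash
# Two sample partitions; a "-" current offset means the group has not
# committed anything on that partition yet.
sample='rpc-s3-bench bench-events 0 100 250 150
rpc-s3-bench bench-events 1 - 300 -'

# Sum lag as log-end-offset minus current-offset across matching rows;
# print -1 when no rows were seen (group not yet known to the broker).
printf '%s\n' "$sample" | awk '/bench-events/ {
  cur = ($4 == "-") ? 0 : $4
  sum += $5 - cur
} END { print (NR == 0 ? -1 : sum) }'
```

With the sample rows above this prints `450` (150 for partition 0 plus 300 for partition 1, where the uncommitted partition counts its full log-end offset as lag).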
+
+See [`docs/benchmark-results/aws-s3.md`](../../../../../docs/benchmark-results/aws-s3.md) for results and comparison with Kafka Connect.
+
+## Architecture
+
+```
+Redpanda Connect producer → bench-events (16 partitions) → N pipeline instances → LocalStack S3
+```
+
+- **Kafka** — KRaft broker, no ZooKeeper (`confluentinc/cp-kafka:7.7.8`)
+- **LocalStack** — local S3 endpoint (`bench-events` bucket)
+- **Pipeline** — pre-built Go binary; N instances share the same consumer group, so Kafka distributes the 16 partitions across them (equivalent to Kafka Connect `tasks.max`)
+- **Message format** — plain JSON, `archive: lines` batching (one file per flush)
+
+## Prerequisites
+
+- Docker
+- `task` (Taskfile runner)
+- `go` (to build producer and pipeline)
+
+## Quickstart
+
+```bash
+task up
+task bench:run COUNT=3000000
+task down
+```
+
+## Pipeline Parameters
+
+All parameters are configurable as Taskfile vars:
+
+| Var | Default | Description |
+|---|---|---|
+| `INSTANCES` | `1` | Number of pipeline processes (Kafka distributes partitions across them) |
+| `FLUSH_SIZE` | `10000` | Records per S3 object (`batching.count`) |
+| `FLUSH_PERIOD` | `10s` | Time-based flush (`batching.period`) |
+| `FETCH_MIN_BYTES` | `1048576` | Min bytes before broker responds (1 MiB) |
+
+Pass any of them to `bench:run`:
+
+```bash
+task bench:run COUNT=3000000 FLUSH_SIZE=50000 INSTANCES=4
+```
+
+## Matrix Benchmark
+
+Loads data once and runs every combination of `INSTANCES × FLUSH_SIZE × FETCH_MIN_BYTES`:
+
+```bash
+task bench:matrix COUNT=3000000
+```
+
+Default lists (24 combinations):
+
+| Var | Default values |
+|---|---|
+| `INSTANCES_LIST` | `1 2 4 8` |
+| `FLUSH_LIST` | `5000 10000 50000` |
+| `FETCH_MIN_LIST` | `1048576 4194304` |
+
+Override any list to narrow the sweep:
+
+```bash
+task bench:matrix COUNT=3000000 INSTANCES_LIST="1 2 4" FLUSH_LIST="10000 50000"
+```
+
+Each combination gets a unique consumer group (`rpc-rpc-bench-NNN`) starting from offset 0, so data is loaded once and all combinations read the same messages.
+
+## Comparison with Kafka Connect
+
+| Parameter | Redpanda Connect | Kafka Connect equivalent |
+|---|---|---|
+| `INSTANCES` | N pipeline processes, same consumer group | `tasks.max` |
+| `FLUSH_SIZE` | `batching.count` | `flush.size` |
+| `FLUSH_PERIOD` | `batching.period` | `rotate.schedule.interval.ms` |
+| `FETCH_MIN_BYTES` | `fetch_min_bytes` | `consumer.override.fetch.min.bytes` |
+
+Key architectural difference: each Redpanda Connect instance is a single Go process, and Kafka distributes the 16 topic partitions evenly across all instances sharing the consumer group. Kafka Connect instead runs its N tasks as threads inside the Connect worker JVM, each task owning a partition subset.
+
+## Tasks Reference
+
+| Task | Description |
+|---|---|
+| `task up` | Start Kafka and LocalStack, wait for readiness |
+| `task down` | Stop and remove all containers and volumes |
+| `task bench:run COUNT=N [params]` | Full single run: reset, load, measure, stop |
+| `task bench:matrix COUNT=N [lists]` | Sweep all combinations, print table |
+| `task pipeline:start [params]` | Start pipeline instance(s) in background |
+| `task pipeline:stop` | Stop all background pipeline instances |
+| `task pipeline:logs` | Tail pipeline log (`/tmp/rpc-bench.log`) |
+| `task topic:reset` | Delete and recreate `bench-events` topic |
+| `task data:load COUNT=N` | Produce N events to bench-events |
+| `task bench:lag` | Show current consumer group lag |
+| `task bench:offsets` | Show end offsets for bench-events |
+| `task bench:s3-count` | Count objects written to S3 bucket |
+| `task logs:connect` | Tail Redpanda Connect pipeline log |
+| `task logs:localstack` | Follow LocalStack container logs |
diff --git a/internal/impl/aws/s3/bench/write/redpanda-connect/Taskfile.yaml b/internal/impl/aws/s3/bench/write/redpanda-connect/Taskfile.yaml
new file mode 100644
index 0000000000..238993dfbe
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/redpanda-connect/Taskfile.yaml
@@ -0,0 +1,259 @@
+version: "3"
+
+silent: true
+
+vars:
+  KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP | default "localhost:9092"}}'
+  S3_ENDPOINT: '{{.S3_ENDPOINT | default "http://localhost:4566"}}'
+  CONSUMER_GROUP: '{{.CONSUMER_GROUP | default "rpc-s3-bench"}}'
+
+  # ── Pipeline parameters ──────────────────────────────────────────────────────
+  INSTANCES: '{{.INSTANCES | default "1"}}'
+  FLUSH_SIZE: '{{.FLUSH_SIZE | default "10000"}}'
+  FLUSH_PERIOD: '{{.FLUSH_PERIOD | default "10s"}}'
+  FETCH_MIN_BYTES: '{{.FETCH_MIN_BYTES | default "1048576"}}'
+
+  # ── Matrix sweep lists ───────────────────────────────────────────────────────
+  INSTANCES_LIST: '{{.INSTANCES_LIST | default "1 2 4 8"}}'
+  FLUSH_LIST: '{{.FLUSH_LIST | default "5000 10000 50000"}}'
+  FETCH_MIN_LIST: '{{.FETCH_MIN_LIST | default "1048576 4194304"}}'
+  MATRIX_INTERVAL: '{{.MATRIX_INTERVAL | default "7"}}'
+
+  # Pre-built pipeline binary (avoids go run wrapper process)
+  PIPELINE_BIN: /tmp/rpc-bench-pipeline
+
+  # PID file for the background pipeline process(es) — one PID per line
+  PID_FILE: /tmp/rpc-bench.pid
+  LOG_FILE: /tmp/rpc-bench.log
+
+tasks:
+  # ── Infrastructure ───────────────────────────────────────────────────────────
+
+  up:
+    desc: Start Kafka and LocalStack, wait until ready
+    cmds:
+      - docker compose up -d
+      - until curl -sf http://localhost:4566/_localstack/health > /dev/null 2>&1; do sleep 3; done
+      - until docker exec s3rpc-kafka kafka-broker-api-versions --bootstrap-server localhost:9092 > /dev/null 2>&1; do sleep 3; done
+      - echo "Infrastructure ready"
+
+  down:
+    desc: Stop and remove all containers and volumes
+    cmds:
+      - docker compose down -v --remove-orphans
+
+  # ── Pipeline lifecycle ───────────────────────────────────────────────────────
+
+  pipeline:build:
+    desc: Compile the pipeline binary (cached after first build)
+    cmds:
+      - go build -o {{.PIPELINE_BIN}} ../../../../../../../cmd/redpanda-connect/
+      - echo "pipeline binary ready at {{.PIPELINE_BIN}}"
+
+  pipeline:start:
+    desc: |
+      Start N Redpanda Connect pipeline instances as background processes (same consumer group).
+      e.g. task pipeline:start FLUSH_SIZE=50000 INSTANCES=4
+    deps: [pipeline:build]
+    cmds:
+      - |
+        rm -f {{.PID_FILE}}
+        for i in $(seq 1 {{.INSTANCES}}); do
+          CONSUMER_GROUP={{.CONSUMER_GROUP}} \
+          FLUSH_SIZE={{.FLUSH_SIZE}} \
+          FLUSH_PERIOD={{.FLUSH_PERIOD}} \
+          FETCH_MIN_BYTES={{.FETCH_MIN_BYTES}} \
+          KAFKA_BOOTSTRAP={{.KAFKA_BOOTSTRAP}} \
+          S3_ENDPOINT={{.S3_ENDPOINT}} \
+          {{.PIPELINE_BIN}} run ./pipeline.yaml \
+            >> {{.LOG_FILE}} 2>&1 &
+          echo $! >> {{.PID_FILE}}
+        done
+        echo "started {{.INSTANCES}} instance(s) (PIDs: $(tr '\n' ' ' < {{.PID_FILE}}))"
+
+  pipeline:stop:
+    desc: Stop all background pipeline instances
+    cmds:
+      - |
+        if [ -f {{.PID_FILE}} ]; then
+          while IFS= read -r pid; do
+            kill -9 "$pid" 2>/dev/null || true
+            wait "$pid" 2>/dev/null || true
+          done < {{.PID_FILE}}
+          rm -f {{.PID_FILE}}
+          echo "pipeline stopped"
+        fi
+
+  pipeline:logs:
+    desc: Tail the pipeline log
+    cmds:
+      - tail -f {{.LOG_FILE}}
+
+  # ── Data management ──────────────────────────────────────────────────────────
+
+  topic:reset:
+    desc: Delete and recreate bench-events topic (clears all messages)
+    cmds:
+      - docker exec s3rpc-kafka kafka-topics --bootstrap-server localhost:29092 --delete --topic bench-events 2>/dev/null || true
+      - sleep 2
+      - docker exec s3rpc-kafka kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic bench-events --partitions 16 --replication-factor 1
+      - echo "topic reset"
+
+  data:load:
+    desc: "Produce events to bench-events topic (e.g. task data:load COUNT=5000000)"
+    vars:
+      COUNT: '{{.COUNT | default "1000000"}}'
+    env:
+      COUNT: '{{.COUNT}}'
+      KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP}}'
+    cmds:
+      - go run ../../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml
+
+  # ── Observation ──────────────────────────────────────────────────────────────
+
+  bench:lag:
+    desc: Show current consumer group lag
+    cmds:
+      - docker exec s3rpc-kafka kafka-consumer-groups --bootstrap-server localhost:29092 --describe --group {{.CONSUMER_GROUP}}
+
+  bench:offsets:
+    desc: Show end offsets for bench-events
+    cmds:
+      - docker exec s3rpc-kafka kafka-get-offsets --bootstrap-server localhost:29092 --topic bench-events
+
+  bench:s3-count:
+    desc: Count objects written to the S3 bench-events bucket
+    cmds:
+      - docker exec s3rpc-localstack awslocal s3 ls s3://bench-events --recursive | wc -l
+
+  # ── Single parameterised run ─────────────────────────────────────────────────
+
+  bench:run:
+    desc: Full benchmark run
+    vars:
+      COUNT: '{{.COUNT | default "1000000"}}'
+    cmds:
+      - task: pipeline:stop
+      - task: pipeline:build
+      - docker exec s3rpc-localstack awslocal s3 rm s3://bench-events --recursive || true
+      - task: topic:reset
+      - task: data:load
+        vars: { COUNT: '{{.COUNT}}' }
+      - |
+        set -e
+
+        INSTANCES={{.INSTANCES}}
+        FLUSH_SIZE={{.FLUSH_SIZE}}
+        FLUSH_PERIOD={{.FLUSH_PERIOD}}
+        FETCH_MIN_BYTES={{.FETCH_MIN_BYTES}}
+        KAFKA_BOOTSTRAP={{.KAFKA_BOOTSTRAP}}
+        S3_ENDPOINT={{.S3_ENDPOINT}}
+        CONSUMER_GROUP={{.CONSUMER_GROUP}}
+        INTERVAL={{.MATRIX_INTERVAL}}
+        PIPELINE_BIN={{.PIPELINE_BIN}}
+
+        get_lag() {
+          docker exec s3rpc-kafka kafka-consumer-groups \
+            --bootstrap-server localhost:29092 \
+            --describe --group "$1" 2>/dev/null \
+            | awk '/bench-events/ {
+                cur = ($4 == "-") ? 0 : $4
+                sum += $5 - cur
+              } END {print (NR == 0 ? -1 : sum)}'
+        }
+
+        # start pipelines
+        for i in $(seq 1 "$INSTANCES"); do
+          CONSUMER_GROUP=$CONSUMER_GROUP \
+          FLUSH_SIZE=$FLUSH_SIZE \
+          FLUSH_PERIOD=$FLUSH_PERIOD \
+          FETCH_MIN_BYTES=$FETCH_MIN_BYTES \
+          KAFKA_BOOTSTRAP=$KAFKA_BOOTSTRAP \
+          S3_ENDPOINT=$S3_ENDPOINT \
+          "$PIPELINE_BIN" run ./pipeline.yaml \
+            >> {{.LOG_FILE}} 2>&1 &
+        done
+
+        echo "started $INSTANCES instance(s)"
+
+        START=$(date +%s)
+        INITIAL_LAG={{.COUNT}}
+
+        while true; do
+          sleep "$INTERVAL"
+
+          LAG=$(get_lag "$CONSUMER_GROUP")
+
+          if [ "${LAG:--1}" -lt 0 ] 2>/dev/null; then
+            continue
+          fi
+
+          if [ "$LAG" -eq 0 ]; then
+            NOW=$(date +%s)
+            ELAPSED=$((NOW - START))
+
+            if [ "$ELAPSED" -gt 0 ]; then
+              AVG=$((INITIAL_LAG / ELAPSED))
+            else
+              AVG=$INITIAL_LAG
+            fi
+
+            echo "---"
+            echo "Total messages : $INITIAL_LAG"
+            echo "Elapsed        : ${ELAPSED}s"
+            echo "Avg throughput : ${AVG} msg/s"
+
+            # kill the background pipeline children of this shell
+            pkill -P $$ 2>/dev/null || true
+
+            exit 0
+          fi
+        done
+
+  # ── Logs ─────────────────────────────────────────────────────────────────────
+
+  logs:connect:
+    desc: Tail the Redpanda Connect pipeline log
+    cmds:
+      - tail -f {{.LOG_FILE}}
+
+  logs:localstack:
+    desc: Follow LocalStack logs
+    cmds:
+      - docker compose logs -f localstack
+
+  bench:matrix:
+    desc: Sweep via bench:run
+    vars:
+      COUNT: '{{.COUNT | default "1000000"}}'
+    cmds:
+      - |
+        INSTANCES_LIST="{{.INSTANCES_LIST}}"
+        FLUSH_LIST="{{.FLUSH_LIST}}"
+        FETCH_MIN_LIST="{{.FETCH_MIN_LIST}}"
+
+        for instances in $INSTANCES_LIST; do
+          for flush in $FLUSH_LIST; do
+            for fetch_min in $FETCH_MIN_LIST; do
+              echo ""
+              echo "=== instances=$instances flush=$flush fetch_min=$fetch_min ==="
+              task bench:run \
+                COUNT={{.COUNT}} \
+                INSTANCES=$instances \
+                FLUSH_SIZE=$flush \
+                FETCH_MIN_BYTES=$fetch_min
+            done
+          done
+        done
diff --git a/internal/impl/aws/s3/bench/write/redpanda-connect/docker-compose.yaml b/internal/impl/aws/s3/bench/write/redpanda-connect/docker-compose.yaml
new file mode 100644
index 0000000000..9d9d150d7f
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/redpanda-connect/docker-compose.yaml
@@ -0,0 +1,77 @@
+services:
+  kafka:
+    image: confluentinc/cp-kafka:7.7.8
+    container_name: s3rpc-kafka
+    hostname: kafka
+    cpus: 3
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_PROCESS_ROLES: broker,controller
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
+      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
+    healthcheck:
+      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
+      interval: 10s
+      timeout: 10s
+      retries: 12
+
+  localstack:
+    image: localstack/localstack:3
+    container_name: s3rpc-localstack
+    ports:
+      - "4566:4566"
+    environment:
+      SERVICES: s3
+      DEFAULT_REGION: us-east-1
+      AWS_DEFAULT_REGION: us-east-1
+    healthcheck:
+      test: ["CMD", "curl", "-sf", "http://localhost:4566/_localstack/health"]
+      interval: 5s
+      timeout: 5s
+      retries: 12
+
+  kafka-setup:
+    image: confluentinc/cp-kafka:7.7.8
+    container_name: s3rpc-kafka-setup
+    depends_on:
+      kafka:
+        condition: service_healthy
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        kafka-topics --bootstrap-server kafka:29092 \
+          --create --if-not-exists \
+          --topic bench-events \
+          --partitions 16 \
+          --replication-factor 1
+        echo "topic ready"
+
+  localstack-setup:
+    image: amazon/aws-cli:latest
+    container_name: s3rpc-localstack-setup
+    depends_on:
+      localstack:
+        condition: service_healthy
+    environment:
+      AWS_ACCESS_KEY_ID: test
+      AWS_SECRET_ACCESS_KEY: test
+      AWS_DEFAULT_REGION: us-east-1
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        aws --endpoint-url http://localstack:4566 s3 mb s3://bench-events
+        echo "bucket ready"
diff --git a/internal/impl/aws/s3/bench/write/redpanda-connect/pipeline.yaml b/internal/impl/aws/s3/bench/write/redpanda-connect/pipeline.yaml
new file mode 100644
index 0000000000..875ce35afc
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/redpanda-connect/pipeline.yaml
@@ -0,0 +1,47 @@
+# Reads from bench-events Kafka topic and writes batched JSON to S3 (LocalStack).
+#
+# Usage:
+#   CONSUMER_GROUP=rpc-s3-bench FLUSH_SIZE=10000 \
+#     go run ../../../../../../cmd/redpanda-connect/main.go run ./pipeline.yaml
+#
+# Env vars:
+#   CONSUMER_GROUP    Kafka consumer group (default: rpc-s3-bench)
+#   FLUSH_SIZE        Records per S3 object (default: 10000)
+#   FLUSH_PERIOD      Time-based flush, e.g. "10s" (default: 10s)
+#   THREADS           pipeline.threads — parallel output workers (default: 1)
+#   FETCH_MIN_BYTES   Kafka fetch_min_bytes (default: 1048576 = 1 MiB)
+#   KAFKA_BOOTSTRAP   Broker address (default: localhost:9092)
+#   S3_ENDPOINT       S3 endpoint URL (default: http://localhost:4566)
+
+http:
+  enabled: false
+
+pipeline:
+  threads: ${THREADS:1}
+
+input:
+  kafka_franz:
+    seed_brokers:
+      - ${KAFKA_BOOTSTRAP:localhost:9092}
+    topics:
+      - bench-events
+    consumer_group: ${CONSUMER_GROUP:rpc-s3-bench}
+    fetch_min_bytes: ${FETCH_MIN_BYTES:1048576}
+    fetch_max_wait: 500ms
+
+output:
+  aws_s3:
+    bucket: bench-events
+    path: 'topics/bench-events/partition=${! meta("kafka_partition") }/${! count("objects") }.json'
+    endpoint: ${S3_ENDPOINT:http://localhost:4566}
+    force_path_style_urls: true
+    region: us-east-1
+    credentials:
+      id: test
+      secret: test
+    batching:
+      count: ${FLUSH_SIZE:10000}
+      period: ${FLUSH_PERIOD:10s}
+      processors:
+        - archive:
+            format: lines
diff --git a/internal/impl/aws/s3/bench/write/redpanda-connect/producer.yaml b/internal/impl/aws/s3/bench/write/redpanda-connect/producer.yaml
new file mode 100644
index 0000000000..40182e1c4e
--- /dev/null
+++ b/internal/impl/aws/s3/bench/write/redpanda-connect/producer.yaml
@@ -0,0 +1,30 @@
+# Produces COUNT synthetic events to the bench-events Kafka topic.
+#
+# Usage:
+#   go run ../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml
+# Override count/brokers:
+#   COUNT=5000000 KAFKA_BOOTSTRAP=localhost:9092 \
+#     go run ../../../../../../cmd/redpanda-connect/main.go run ./producer.yaml
+
+input:
+  generate:
+    count: ${COUNT:1000000}
+    interval: ""
+    mapping: |
+      root = {
+        "id": count("events"),
+        "category": "cat_" + random_int(min: 1, max: 10).string(),
+        "value": random_int(min: 1, max: 1000000).float64() / 100.0,
+        "ts": now().ts_unix_micro()
+      }
+
+output:
+  kafka_franz:
+    seed_brokers:
+      - ${KAFKA_BOOTSTRAP:localhost:9092}
+    topic: bench-events
+    compression: snappy
+    batching:
+      count: 3000
+      byte_size: 4194304 # 4 MiB
+      period: 5s
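Both Taskfiles reduce a run to a single headline number the same way: average throughput is total messages divided by wall-clock seconds, using shell integer division with a guard against a zero-length run. A minimal sketch of that arithmetic (the sample numbers below are illustrative, not measured results):

```bash
# Average throughput as computed in bench:run: messages / elapsed seconds.
count=3000000   # illustrative message count
elapsed=41      # illustrative wall-clock seconds
avg=$(( count / (elapsed > 0 ? elapsed : 1) ))
echo "${avg} msg/s"
# prints: 73170 msg/s
```

Integer division truncates, so sub-second precision is deliberately discarded; at benchmark scale the rounding error is negligible relative to the polling interval.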