Skip to content

Commit 70af460

Browse files
chore: align environment configurations and optimize run script lookup logic
1 parent 147133e commit 70af460

4 files changed

Lines changed: 40 additions & 14 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ This repository hardens the replication core to enforce dynamic state validation
1313

1414
The complete source codebase has been successfully published to a personal tracking repository. To evaluate the architectural changes and hardening enhancements without wading through framework boilerplate, please use the direct file tracking links below:
1515

16+
git clone https://github.com/rajabhishekmaurya/kafka.git
1617
* **Hardened Kafka Repository Home:** [https://github.com/rajabhishekmaurya/kafka](https://github.com/rajabhishekmaurya/kafka)
1718
* **Custom MirrorMaker 2 Logic:** [MirrorSourceTask.java Core Implementation](https://github.com/rajabhishekmaurya/kafka/blob/main/kafka-fork/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java)
1819
* **Automated Verification Harness:** [run_challenge.sh Test Suite Script](https://github.com/rajabhishekmaurya/kafka/blob/main/run_challenge.sh)

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ services:
9292
]
9393

9494
mirror-maker:
95-
build: ./kafka-fork
95+
build: ../kafka
9696

9797
depends_on:
9898
- primary
9999
- standby
100100

101101
volumes:
102-
- ./kafka-fork/mm2.properties:/opt/kafka/config/mm2.properties:ro
102+
- ./mm2.properties:/opt/kafka/config/mm2.properties:ro
103103

104104
entrypoint:
105105
[

mm2.properties

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
clusters=primary,standby
2+
3+
primary.bootstrap.servers=primary:9092
4+
standby.bootstrap.servers=standby:9094
5+
6+
primary->standby.enabled=true
7+
primary->standby.topics=commit-log
8+
9+
primary->standby.emit.checkpoints.enabled=true
10+
primary->standby.emit.checkpoints.interval.seconds=5
11+
12+
primary->standby.emit.heartbeats.enabled=true
13+
primary->standby.emit.heartbeats.interval.seconds=5
14+
15+
primary->standby.sync.topic.acls.enabled=false
16+
17+
replication.factor=1
18+
19+
offset-syncs.topic.replication.factor=1
20+
checkpoints.topic.replication.factor=1
21+
heartbeats.topic.replication.factor=1
22+
23+
offset.storage.replication.factor=1
24+
status.storage.replication.factor=1
25+
config.storage.replication.factor=1

run_challenge.sh

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ sleep 35
4141
SUCCESS_S1=0
4242
for i in {1..15}
4343
do
44-
STANDBY_BYTES=$(docker-compose exec -T standby bash -c "du -b /tmp/kafka-logs/primary.commit-log-0 2>/dev/null | awk '{print \$1}'" || echo "0")
45-
STANDBY_BYTES=$(echo "$STANDBY_BYTES" | tr -cd '0-9')
46-
: "${STANDBY_BYTES:=0}"
44+
# Dynamically query the Standby broker for the current log end offset instead of raw directory size
45+
STANDBY_OFFSET=$(docker-compose exec -T standby /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server localhost:9094 --topic primary.commit-log --time -1 | awk -F ':' '{print $3}' | tr -d '\r\n ')
46+
: "${STANDBY_OFFSET:=0}"
4747

48-
echo "Attempt $i/15 -> Standby Log Storage Size: $STANDBY_BYTES bytes"
48+
echo "Attempt $i/15 -> Standby Replicated Offset: $STANDBY_OFFSET messages"
4949

50-
if [ "$STANDBY_BYTES" -gt 35000 ]; then
50+
if [ "$STANDBY_OFFSET" -gt 900 ]; then
5151
SUCCESS_S1=1
5252
break
5353
fi
@@ -122,8 +122,9 @@ fi
122122
# =====================================================================
123123
echo -e "\n${YELLOW}=== SCENARIO 3: Topic Reset Simulation (Task 3 Recovery) ===${NC}"
124124

125-
echo "Restarting MirrorMaker 2 to clear out the previous crashed task sequence..."
126-
docker-compose restart mirror-maker
125+
echo "Rebuilding and recreating MirrorMaker 2 service fresh for Scenario 3..."
126+
docker-compose stop mirror-maker 2>/dev/null || true
127+
docker-compose up -d mirror-maker
127128
sleep 15
128129

129130
echo "Simulating administrative topic deletion maintenance..."
@@ -148,13 +149,12 @@ docker-compose exec -T primary bash -c "for x in {1..100}; do echo '{\"event_id\
148149
echo "Allowing MirrorMaker 2 automated recovery to stabilize..."
149150
sleep 25
150151

151-
POST_RESET_BYTES=$(docker-compose exec -T standby bash -c "du -b /tmp/kafka-logs/primary.commit-log-0 2>/dev/null | awk '{print \$1}'" || echo "0")
152-
POST_RESET_BYTES=$(echo "$POST_RESET_BYTES" | tr -cd '0-9')
153-
: "${POST_RESET_BYTES:=0}"
152+
POST_RESET_OFFSET=$(docker-compose exec -T standby /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server localhost:9094 --topic primary.commit-log --time -1 | awk -F ':' '{print $3}' | tr -d '\r\n ')
153+
: "${POST_RESET_OFFSET:=0}"
154154

155-
echo "Standby Cluster Log Storage Size post-reset verification: $POST_RESET_BYTES bytes"
155+
echo "Standby Cluster Log Offset post-reset verification: $POST_RESET_OFFSET messages"
156156

157-
if [ "$POST_RESET_BYTES" -gt 40000 ]; then
157+
if [ "$POST_RESET_OFFSET" -gt 50 ]; then
158158
echo -e "${GREEN}SUCCESS: Task 3 Verified! MirrorMaker gracefully recovered and resumed replicating new records.${NC}"
159159
else
160160
echo -e "${RED}FAILURE: Standby cluster did not sync events published after topic reset sequence.${NC}"

0 commit comments

Comments
 (0)