@@ -51,7 +51,7 @@ public class TopicOperatorMain implements Liveness, Readiness {
5151 /* test */ final BatchingTopicController controller ;
5252
5353 private SharedIndexInformer <KafkaTopic > informer ; // guarded by this
54- Thread shutdownHook ; // guarded by this
54+ /* test */ volatile Thread shutdownHook ;
5555
5656 private final ResourceEventHandler <KafkaTopic > resourceEventHandler ;
5757 private final HealthCheckAndMetricsServer healthAndMetricsServer ;
@@ -93,7 +93,7 @@ synchronized void start() {
9393 throw new IllegalStateException ();
9494 }
9595
96- shutdownHook = new Thread (this ::shutdown , "TopicOperator-shutdown-hook" );
96+ shutdownHook = new Thread (this ::stop , "TopicOperator-shutdown-hook" );
9797 LOGGER .infoOp ("Installing shutdown hook" );
9898 Runtime .getRuntime ().addShutdownHook (shutdownHook );
9999 LOGGER .infoOp ("Starting health and metrics" );
@@ -119,28 +119,17 @@ synchronized void start() {
119119 }
120120
121121 synchronized void stop () {
122+ // Execute the actual shutdown sequence (idempotent).
122123 if (shutdownHook == null ) {
123- throw new IllegalStateException ();
124+ LOGGER .debugOp ("Already shut down." );
125+ return ;
124126 }
125- // shutdown(), will be be invoked indirectly by calling
126- // hook.run() has the side effect of nullifying this.shutdownHook
127- // so retain a reference now so we have something to call
128- // removeShutdownHook() with.
129- var hook = shutdownHook ;
130- // Call run (not start()) on the thread so that shutdown() is executed
131- // on this thread.
132- shutdown ();
133- // stop() is _not_ called from the shutdown hook, so calling
134- // removeShutdownHook() should not cause IAE.
135- Runtime .getRuntime ().removeShutdownHook (hook );
136- }
137-
138- private synchronized void shutdown () {
127+ shutdownHook = null ;
128+ LOGGER .infoOp ("Shutdown initiated" );
139129 // Note: This method can be invoked on either via the shutdown hook thread or
140130 // on the thread(s) on which stop()/start() are called
141- LOGGER .infoOp ("Shutdown initiated" );
142131 try {
143- shutdownHook = null ;
132+ // Idempotent resource teardown.
144133 if (informer != null ) {
145134 informer .stop ();
146135 informer = null ;
@@ -152,6 +141,7 @@ private synchronized void shutdown() {
152141 this .healthAndMetricsServer .stop ();
153142 LOGGER .infoOp ("Shutdown completed normally" );
154143 } catch (InterruptedException e ) {
144+ Thread .currentThread ().interrupt ();
155145 LOGGER .infoOp ("Interrupted during shutdown" );
156146 throw new RuntimeException (e );
157147 }
@@ -170,7 +160,7 @@ public static void main(String[] args) {
170160 public boolean isAlive () {
171161 boolean running ;
172162 synchronized (this ) {
173- running = informer .isRunning ();
163+ running = informer != null && informer .isRunning ();
174164 }
175165 if (!running ) {
176166 LOGGER .infoOp ("isAlive returning false because informer is not running" );
@@ -184,7 +174,7 @@ public boolean isAlive() {
184174 public boolean isReady () {
185175 boolean running ;
186176 synchronized (this ) {
187- running = informer .isRunning ();
177+ running = informer != null && informer .isRunning ();
188178 }
189179 if (!running ) {
190180 LOGGER .infoOp ("isReady returning false because informer is not running" );
0 commit comments