I am using DrainingControl to toggle my kafka subscribe state.
public pause() {
this.drainingControl
.drainAndShutdown(system.executionContext())
.....
}
public start() {
this.drainingControl = this.kafkaGraph.run(this.system);
}
when I call the pause() method, the actor system complains StopFromStage wasn't delivered to kafka-consumer-actor...