Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Ns4Kafka brings a namespace-based deployment model for Kafka resources, inspired
* [HTTP Client](#http-client)
* [Timeout](#timeout)
* [Retry](#retry)
* [Scheduler](#scheduler)
* [Sensitive Endpoints](#sensitive-endpoints)
* [RapiDoc](#rapidoc)
* [Administration](#administration)
Expand Down Expand Up @@ -570,6 +571,19 @@ ns4kafka:
multiplier: '2.0'
```

#### Scheduler

Ns4Kafka schedules the deployment of connectors and performs health checks on Kafka Connect platforms. The scheduling frequency of each task can be configured, in milliseconds, using the following properties:

```yaml
ns4kafka:
scheduler:
connector:
interval-ms: 30000
connect:
interval-ms: 60000
```

#### Sensitive Endpoints

Micronaut sensitive endpoints can be enabled or disabled through the application configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
public class Ns4KafkaProperties {
private AkhqProperties akhq = new AkhqProperties();
private ConfluentCloudProperties confluentCloud = new ConfluentCloudProperties();
private SchedulerProperties scheduler = new SchedulerProperties();
private SecurityProperties security = new SecurityProperties();
private StoreProperties store = new StoreProperties();
private String version;
Expand Down Expand Up @@ -86,6 +87,28 @@ public static class StreamCatalogProperties {
}
}

    /** Scheduling properties for the connector synchronization and Kafka Connect health check tasks. */
    @Getter
    @Setter
    @ConfigurationProperties("scheduler")
    public static class SchedulerProperties {
        // Scheduling properties of the connector synchronization task
        private ConnectorProperties connector = new ConnectorProperties();
        // Scheduling properties of the Kafka Connect health check task
        private ConnectProperties connect = new ConnectProperties();

        /** Scheduling properties for the connector synchronization task. */
        @Getter
        @Setter
        @ConfigurationProperties("connector")
        public static class ConnectorProperties {
            // Interval between two connector synchronizations, in milliseconds (defaults to 30 seconds)
            private int intervalMs = 30000;
        }

        /** Scheduling properties for the Kafka Connect health check task. */
        @Getter
        @Setter
        @ConfigurationProperties("connect")
        public static class ConnectProperties {
            // Interval between two Kafka Connect health checks, in milliseconds (defaults to 60 seconds)
            private int intervalMs = 60000;
        }
    }

@Getter
@Setter
@ConfigurationProperties("security")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.michelin.ns4kafka.service.executor;

import com.michelin.ns4kafka.property.Ns4KafkaProperties;
import io.micronaut.runtime.event.ApplicationStartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.micronaut.scheduling.annotation.Scheduled;
Expand All @@ -37,6 +38,7 @@ public class KafkaAsyncExecutorScheduler {
private final List<AccessControlEntryAsyncExecutor> accessControlEntryAsyncExecutors;
private final List<ConnectorAsyncExecutor> connectorAsyncExecutors;
private final List<UserAsyncExecutor> userAsyncExecutors;
private final Ns4KafkaProperties.SchedulerProperties schedulerProperties;

/**
* Constructor.
Expand All @@ -50,11 +52,13 @@ public KafkaAsyncExecutorScheduler(
List<TopicAsyncExecutor> topicAsyncExecutors,
List<AccessControlEntryAsyncExecutor> accessControlEntryAsyncExecutors,
List<ConnectorAsyncExecutor> connectorAsyncExecutors,
List<UserAsyncExecutor> userAsyncExecutors) {
List<UserAsyncExecutor> userAsyncExecutors,
Ns4KafkaProperties.SchedulerProperties schedulerProperties) {
this.topicAsyncExecutors = topicAsyncExecutors;
this.accessControlEntryAsyncExecutors = accessControlEntryAsyncExecutors;
this.connectorAsyncExecutors = connectorAsyncExecutors;
this.userAsyncExecutors = userAsyncExecutors;
this.schedulerProperties = schedulerProperties;
}

/**
Expand Down Expand Up @@ -83,7 +87,9 @@ public void schedule() {

/** Schedule connector synchronization. */
public void scheduleConnectorSynchronization() {
Flux.interval(Duration.ofSeconds(12), Duration.ofSeconds(30))
Flux.interval(
Duration.ofSeconds(12),
Duration.ofMillis(schedulerProperties.getConnector().getIntervalMs()))
.onBackpressureDrop(
_ -> log.debug("Skipping next connector synchronization. The previous one is still running."))
.concatMap(_ -> Flux.fromIterable(connectorAsyncExecutors).flatMap(ConnectorAsyncExecutor::run))
Expand All @@ -95,7 +101,9 @@ public void scheduleConnectorSynchronization() {

/** Schedule Kafka Connect cluster health checks. */
public void scheduleConnectHealthCheck() {
Flux.interval(Duration.ofSeconds(5), Duration.ofSeconds(60))
Flux.interval(
Duration.ofSeconds(5),
Duration.ofMillis(schedulerProperties.getConnect().getIntervalMs()))
.onBackpressureDrop(_ ->
log.debug("Skipping next Connect cluster health check. The previous one is still running."))
.concatMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,41 +155,11 @@ public void synchronizeTopics() {
createTopics.removeIf(
topic -> topicsToIgnore.contains(topic.getMetadata().getName()));

if (!createTopics.isEmpty()) {
log.debug(
"Topic(s) to create: {}",
String.join(
", ",
createTopics.stream()
.map(topic -> topic.getMetadata().getName())
.toList()));
}

if (!updateTopics.isEmpty()) {
log.debug(
"Topic(s) to update: {}",
String.join(
", ",
updateTopics.keySet().stream()
.map(ConfigResource::name)
.toList()));
for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> e : updateTopics.entrySet()) {
for (AlterConfigOp op : e.getValue()) {
log.debug(
"{} {} {}({})",
e.getKey().name(),
op.opType().toString(),
op.configEntry().name(),
op.configEntry().value());
}
}
}

if (managedClusterProperties.isSyncKstreamTopics()) {
HashSet<String> ns4KafkaTopicNames = ns4KafkaTopics.stream()
Set<String> ns4KafkaTopicNames = ns4KafkaTopics.stream()
.map(topic -> topic.getMetadata().getName())
.filter(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition"))
.collect(Collectors.toCollection(HashSet::new));
.collect(Collectors.toSet());

List<Topic> unsyncStreamInternalTopics =
getUnsyncStreamsInternalTopics(brokerTopics, ns4KafkaTopicNames);
Expand All @@ -202,12 +172,43 @@ public void synchronizeTopics() {
unsyncStreamInternalTopics.stream()
.map(topic -> topic.getMetadata().getName())
.toList()));
importTopics(unsyncStreamInternalTopics);
}
importTopics(unsyncStreamInternalTopics);
}

createTopics(createTopics);
alterTopics(updateTopics, checkTopics);
if (!createTopics.isEmpty()) {
log.debug(
"Topic(s) to create: {}",
String.join(
", ",
createTopics.stream()
.map(topic -> topic.getMetadata().getName())
.toList()));
createTopics(createTopics);
}

if (!updateTopics.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug(
"Topic(s) to update: {}",
String.join(
", ",
updateTopics.keySet().stream()
.map(ConfigResource::name)
.toList()));
for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> e : updateTopics.entrySet()) {
for (AlterConfigOp op : e.getValue()) {
log.debug(
"{} {} {}({})",
e.getKey().name(),
op.opType().toString(),
op.configEntry().name(),
op.configEntry().value());
}
}
}
alterTopics(updateTopics, checkTopics);
}
alterCatalogInfo(checkTopics, brokerTopics);
} catch (ExecutionException | TimeoutException | CancellationException | KafkaStoreException e) {
log.error("An error occurred during the topic synchronization", e);
Expand All @@ -225,16 +226,16 @@ public void synchronizeTopics() {
* @return A list of unsynchronized Kafka Streams internal topics
*/
private List<Topic> getUnsyncStreamsInternalTopics(
Map<String, Topic> brokerTopics, HashSet<String> ns4KafkaTopicNames) {
Map<String, Topic> brokerTopics, Set<String> ns4KafkaTopicNames) {
List<Namespace> namespaces = namespaceService.findAll();
return brokerTopics.values().stream()
// Keep topics that are not already in Ns4Kafka
.filter(topic ->
!ns4KafkaTopicNames.contains(topic.getMetadata().getName()))
// Keep Kafka Streams topics
.filter(topic -> topic.getMetadata().getName().endsWith("-changelog")
|| topic.getMetadata().getName().endsWith("-repartition"))
.filter(topic -> !topicsToIgnore.contains(topic.getMetadata().getName()))
// Keep topics that are not already in Ns4Kafka
.filter(topic ->
!ns4KafkaTopicNames.contains(topic.getMetadata().getName()))
.map(topic -> {
// Ignore internal cluster topics. Only keep topics covered by Ns4Kafka.
Optional<Namespace> namespace = namespaceService.findByTopicName(
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ ns4kafka:
attempt: "5"
delay: "2s"
multiplier: "2.0"
scheduler:
connector:
interval-ms: 30000
connect:
interval-ms: 60000
security:
admin-group: "_"
aes256-encryption-key: "changeitchangeitchangeitchangeit"
Expand Down