Skip to content

Commit 3bba529

Browse files
committed
Complete notification publishing migration
* Enables notification router and removes feature flag. * Implements dex workflow for publishing. * Removes direct publishing to Kafka. * Adds partial rule filtering at notification emission time. Signed-off-by: nscuro <nscuro@protonmail.com>
1 parent 28613d3 commit 3bba529

150 files changed

Lines changed: 5317 additions & 2877 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/src/main/openapi/components/schemas/secrets/create-secret-request.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ properties:
2424
value:
2525
type: string
2626
minLength: 1
27-
maxLength: 1024
27+
maxLength: 4096
2828
required:
2929
- name
3030
- value

api/src/main/openapi/components/schemas/secrets/update-secret-request.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ properties:
2626
description: >-
2727
The new value. Omit this field to retain the current value.
2828
minLength: 1
29-
maxLength: 1024
29+
maxLength: 4096

apiserver/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,7 @@
590590
--add-opens java.base/java.net=ALL-UNNAMED
591591
--add-opens java.base/java.lang=ALL-UNNAMED
592592
--add-opens java.base/java.util=ALL-UNNAMED
593+
--add-opens java.base/java.util.concurrent=ALL-UNNAMED
593594
-javaagent:${settings.localRepository}/org/mockito/mockito-core/${lib.mockito.version}/mockito-core-${lib.mockito.version}.jar
594595
-Xshare:off
595596
</argLine>

apiserver/src/main/java/org/dependencytrack/common/MdcKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public final class MdcKeys {
3838
public static final String MDC_KAFKA_RECORD_PARTITION = "kafkaRecordPartition";
3939
public static final String MDC_KAFKA_RECORD_OFFSET = "kafkaRecordOffset";
4040
public static final String MDC_KAFKA_RECORD_KEY = "kafkaRecordKey";
41+
public static final String MDC_NOTIFICATION_GROUP = "notificationGroup";
42+
public static final String MDC_NOTIFICATION_ID = "notificationId";
43+
public static final String MDC_NOTIFICATION_LEVEL = "notificationLevel";
44+
public static final String MDC_NOTIFICATION_RULE_NAME = "notificationRuleName";
45+
public static final String MDC_NOTIFICATION_SCOPE = "notificationScope";
4146
public static final String MDC_PLUGIN = "plugin";
4247
public static final String MDC_PROJECT_NAME = "projectName";
4348
public static final String MDC_PROJECT_UUID = "projectUuid";

apiserver/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,6 @@ public void contextInitialized(final ServletContextEvent event) {
154154
}
155155

156156
final var topicsToCreate = new ArrayList<>(List.of(
157-
new NewTopic(KafkaTopics.NOTIFICATION_ANALYZER.name(), 1, (short) 1),
158-
new NewTopic(KafkaTopics.NOTIFICATION_BOM.name(), 1, (short) 1),
159-
new NewTopic(KafkaTopics.NOTIFICATION_CONFIGURATION.name(), 1, (short) 1),
160-
new NewTopic(KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING.name(), 1, (short) 1),
161-
new NewTopic(KafkaTopics.NOTIFICATION_FILE_SYSTEM.name(), 1, (short) 1),
162-
new NewTopic(KafkaTopics.NOTIFICATION_INTEGRATION.name(), 1, (short) 1),
163-
new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name(), 1, (short) 1),
164-
new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name(), 1, (short) 1),
165-
new NewTopic(KafkaTopics.NOTIFICATION_POLICY_VIOLATION.name(), 1, (short) 1),
166-
new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name(), 1, (short) 1),
167-
new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name(), 1, (short) 1),
168-
new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name(), 1, (short) 1),
169-
new NewTopic(KafkaTopics.NOTIFICATION_REPOSITORY.name(), 1, (short) 1),
170-
new NewTopic(KafkaTopics.NOTIFICATION_VEX.name(), 1, (short) 1),
171157
new NewTopic(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name(), 1, (short) 1),
172158
new NewTopic(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(), 1, (short) 1),
173159
new NewTopic(KafkaTopics.VULN_ANALYSIS_COMMAND.name(), 1, (short) 1),

apiserver/src/main/java/org/dependencytrack/dex/DexEngineInitializer.java

Lines changed: 151 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,22 @@
2626
import org.dependencytrack.common.EncryptedPageTokenEncoder;
2727
import org.dependencytrack.common.datasource.DataSourceRegistry;
2828
import org.dependencytrack.common.health.HealthCheckRegistry;
29+
import org.dependencytrack.dex.activity.DeleteFilesActivity;
2930
import org.dependencytrack.dex.engine.api.DexEngine;
3031
import org.dependencytrack.dex.engine.api.DexEngineConfig;
3132
import org.dependencytrack.dex.engine.api.DexEngineFactory;
33+
import org.dependencytrack.dex.engine.api.TaskType;
34+
import org.dependencytrack.dex.engine.api.TaskWorkerOptions;
35+
import org.dependencytrack.dex.engine.api.request.CreateTaskQueueRequest;
36+
import org.dependencytrack.notification.PublishNotificationActivity;
37+
import org.dependencytrack.notification.PublishNotificationWorkflow;
38+
import org.dependencytrack.notification.templating.pebble.PebbleNotificationTemplateRendererFactory;
39+
import org.dependencytrack.persistence.jdbi.ConfigPropertyDao;
40+
import org.dependencytrack.plugin.PluginManager;
41+
import org.dependencytrack.proto.internal.workflow.v1.DeleteFilesArgument;
42+
import org.dependencytrack.proto.internal.workflow.v1.PublishNotificationActivityArg;
43+
import org.dependencytrack.proto.internal.workflow.v1.PublishNotificationWorkflowArg;
44+
import org.dependencytrack.secret.management.SecretManager;
3245
import org.eclipse.microprofile.config.Config;
3346
import org.eclipse.microprofile.config.ConfigProvider;
3447
import org.jspecify.annotations.Nullable;
@@ -38,10 +51,19 @@
3851
import javax.sql.DataSource;
3952
import java.io.IOException;
4053
import java.time.Duration;
54+
import java.util.Collection;
55+
import java.util.List;
56+
import java.util.Map;
4157
import java.util.Optional;
4258
import java.util.ServiceLoader;
59+
import java.util.regex.Pattern;
60+
import java.util.stream.StreamSupport;
4361

4462
import static java.util.Objects.requireNonNull;
63+
import static org.dependencytrack.dex.api.payload.PayloadConverters.protoConverter;
64+
import static org.dependencytrack.dex.api.payload.PayloadConverters.voidConverter;
65+
import static org.dependencytrack.model.ConfigPropertyConstants.GENERAL_BASE_URL;
66+
import static org.dependencytrack.persistence.jdbi.JdbiFactory.withJdbiHandle;
4567

4668
/**
4769
* @since 5.7.0
@@ -74,14 +96,69 @@ public void contextInitialized(ServletContextEvent event) {
7496
final var healthCheckRegistry = (HealthCheckRegistry) servletContext.getAttribute(HealthCheckRegistry.class.getName());
7597
requireNonNull(healthCheckRegistry, "healthCheckRegistry has not been initialized");
7698

99+
final var pluginManager = (PluginManager) servletContext.getAttribute(PluginManager.class.getName());
100+
requireNonNull(pluginManager, "pluginManager has not been initialized");
101+
102+
final var secretManager = (SecretManager) servletContext.getAttribute(SecretManager.class.getName());
103+
requireNonNull(pluginManager, "secretManager has not been initialized");
104+
105+
final var templateRendererFactory = new PebbleNotificationTemplateRendererFactory(
106+
Map.of("baseUrl", () -> withJdbiHandle(
107+
handle -> handle
108+
.attach(ConfigPropertyDao.class)
109+
.getOptionalValue(GENERAL_BASE_URL)
110+
.orElse(null))));
111+
77112
final var engineFactory = ServiceLoader.load(DexEngineFactory.class).findFirst().orElseThrow();
78113
engine = engineFactory.create(engineConfig);
79114

80-
// Register workflows and activities here.
115+
engine.registerWorkflow(
116+
new PublishNotificationWorkflow(),
117+
protoConverter(PublishNotificationWorkflowArg.class),
118+
voidConverter(),
119+
Duration.ofMinutes(1));
120+
engine.registerActivity(
121+
new DeleteFilesActivity(pluginManager),
122+
protoConverter(DeleteFilesArgument.class),
123+
voidConverter(),
124+
Duration.ofMinutes(1));
125+
engine.registerActivity(
126+
new PublishNotificationActivity(
127+
pluginManager,
128+
secretManager::getSecretValue,
129+
templateRendererFactory),
130+
protoConverter(PublishNotificationActivityArg.class),
131+
voidConverter(),
132+
Duration.ofMinutes(1));
133+
134+
ensureTaskQueues(engine, List.of(
135+
new CreateTaskQueueRequest(TaskType.WORKFLOW, "default", 1000),
136+
new CreateTaskQueueRequest(TaskType.ACTIVITY, "default", 1000),
137+
new CreateTaskQueueRequest(TaskType.ACTIVITY, "notifications", 25)));
138+
139+
for (final String workerName : getWorkflowWorkerNames(config)) {
140+
if (!isTaskWorkerEnabled(config, TaskType.WORKFLOW, workerName)) {
141+
LOGGER.info("Not registering workflow worker '{}' because it is disabled", workerName);
142+
continue;
143+
}
144+
LOGGER.info("Registering workflow worker '{}'", workerName);
145+
146+
final TaskWorkerOptions workerOptions =
147+
getTaskWorkerOptions(config, TaskType.WORKFLOW, workerName);
148+
engine.registerTaskWorker(workerOptions);
149+
}
81150

82-
// Create task queues here.
151+
for (final String workerName : getActivityWorkerNames(config)) {
152+
if (!isTaskWorkerEnabled(config, TaskType.ACTIVITY, workerName)) {
153+
LOGGER.info("Not registering activity worker '{}' because it is disabled", workerName);
154+
continue;
155+
}
156+
LOGGER.info("Registering activity worker '{}'", workerName);
83157

84-
// Register task workers here.
158+
final TaskWorkerOptions workerOptions =
159+
getTaskWorkerOptions(config, TaskType.ACTIVITY, workerName);
160+
engine.registerTaskWorker(workerOptions);
161+
}
85162

86163
LOGGER.info("Starting durable execution engine");
87164
healthCheckRegistry.addCheck(new DexEngineHealthCheck(engine));
@@ -177,6 +254,77 @@ private DexEngineConfig createEngineConfig() {
177254
return engineConfig;
178255
}
179256

257+
private void ensureTaskQueues(DexEngine engine, Collection<CreateTaskQueueRequest> requests) {
258+
for (final var request : requests) {
259+
final boolean created = engine.createTaskQueue(request);
260+
if (created) {
261+
LOGGER.info(
262+
"Created {} task queue '{}' with capacity {}",
263+
request.type().name().toLowerCase(),
264+
request.name(),
265+
request.capacity());
266+
}
267+
}
268+
}
269+
270+
private static final Pattern WORKFLOW_WORKER_PROPERTY_PATTERN =
271+
Pattern.compile("^dt\\.dex-engine\\.workflow-worker\\..+\\..+$");
272+
273+
private static List<String> getWorkflowWorkerNames(Config config) {
274+
return StreamSupport.stream(config.getPropertyNames().spliterator(), false)
275+
.filter(name -> WORKFLOW_WORKER_PROPERTY_PATTERN.matcher(name).matches())
276+
.map(name -> name.split("\\.", 5)[3])
277+
.distinct()
278+
.toList();
279+
}
280+
281+
private static final Pattern ACTIVITY_WORKER_PROPERTY_PATTERN =
282+
Pattern.compile("^dt\\.dex-engine\\.activity-worker\\..+\\..+$");
283+
284+
private static List<String> getActivityWorkerNames(Config config) {
285+
return StreamSupport.stream(config.getPropertyNames().spliterator(), false)
286+
.filter(name -> ACTIVITY_WORKER_PROPERTY_PATTERN.matcher(name).matches())
287+
.map(name -> name.split("\\.", 5)[3])
288+
.distinct()
289+
.toList();
290+
}
291+
292+
private static boolean isTaskWorkerEnabled(Config config, TaskType taskType, String name) {
293+
return config
294+
.getOptionalValue(
295+
switch (taskType) {
296+
case ACTIVITY -> "dt.dex-engine.activity-worker.%s.enabled".formatted(name);
297+
case WORKFLOW -> "dt.dex-engine.workflow-worker.%s.enabled".formatted(name);
298+
},
299+
boolean.class)
300+
.orElse(true);
301+
}
302+
303+
private static TaskWorkerOptions getTaskWorkerOptions(Config config, TaskType type, String name) {
304+
final var prefix = switch (type) {
305+
case ACTIVITY -> "dt.dex-engine.activity-worker.%s.".formatted(name);
306+
case WORKFLOW -> "dt.dex-engine.workflow-worker.%s.".formatted(name);
307+
};
308+
309+
final var queueName = config.getValue(prefix + "queue-name", String.class);
310+
final var maxConcurrency = config.getValue(prefix + "max-concurrency", int.class);
311+
final var minPollInterval = config
312+
.getOptionalValue(prefix + "min-poll-interval-ms", long.class)
313+
.map(Duration::ofMillis)
314+
.orElse(null);
315+
final IntervalFunction pollBackoffFunction = getBackoffFunction(config, prefix).orElse(null);
316+
317+
var options = new TaskWorkerOptions(type, name, queueName, maxConcurrency);
318+
if (minPollInterval != null) {
319+
options = options.withMinPollInterval(minPollInterval);
320+
}
321+
if (pollBackoffFunction != null) {
322+
options = options.withPollBackoffFunction(pollBackoffFunction);
323+
}
324+
325+
return options;
326+
}
327+
180328
private static Optional<IntervalFunction> getBackoffFunction(Config config, String prefix) {
181329
final Optional<Long> initialDelayMillis = config.getOptionalValue(prefix + ".initial-delay-ms", long.class);
182330
final Optional<Double> multiplier = config.getOptionalValue(prefix + ".multiplier", double.class);
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* This file is part of Dependency-Track.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* Copyright (c) OWASP Foundation. All Rights Reserved.
18+
*/
19+
package org.dependencytrack.dex.activity;
20+
21+
import org.dependencytrack.dex.api.Activity;
22+
import org.dependencytrack.dex.api.ActivityContext;
23+
import org.dependencytrack.dex.api.ActivitySpec;
24+
import org.dependencytrack.dex.api.failure.TerminalApplicationFailureException;
25+
import org.dependencytrack.filestorage.api.FileStorage;
26+
import org.dependencytrack.filestorage.proto.v1.FileMetadata;
27+
import org.dependencytrack.plugin.PluginManager;
28+
import org.dependencytrack.proto.internal.workflow.v1.DeleteFilesArgument;
29+
import org.jspecify.annotations.Nullable;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.stream.Collectors;
36+
37+
/**
38+
* @since 5.7.0
39+
*/
40+
@ActivitySpec(name = "delete-files")
41+
public final class DeleteFilesActivity implements Activity<DeleteFilesArgument, Void> {
42+
43+
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteFilesActivity.class);
44+
45+
private final PluginManager pluginManager;
46+
47+
public DeleteFilesActivity(PluginManager pluginManager) {
48+
this.pluginManager = pluginManager;
49+
}
50+
51+
@Override
52+
public @Nullable Void execute(
53+
ActivityContext ctx,
54+
@Nullable DeleteFilesArgument argument) throws Exception {
55+
if (argument == null) {
56+
throw new TerminalApplicationFailureException("No argument provided");
57+
}
58+
if (argument.getFileMetadataCount() == 0) {
59+
return null;
60+
}
61+
62+
final Map<String, List<FileMetadata>> fileMetadataByProvider =
63+
argument.getFileMetadataList().stream()
64+
.collect(Collectors.groupingBy(FileMetadata::getProviderName));
65+
66+
for (final String providerName : fileMetadataByProvider.keySet()) {
67+
try (final var fileStorage = pluginManager.getExtension(FileStorage.class, providerName)) {
68+
69+
// TODO: Call fileStorage#deleteMany here once available.
70+
for (final FileMetadata fileMetadata : fileMetadataByProvider.get(providerName)) {
71+
LOGGER.debug("Deleting file {}", fileMetadata.getLocation());
72+
fileStorage.delete(fileMetadata);
73+
}
74+
}
75+
}
76+
77+
return null;
78+
}
79+
80+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* This file is part of Dependency-Track.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* Copyright (c) OWASP Foundation. All Rights Reserved.
18+
*/
19+
@NullMarked
20+
package org.dependencytrack.dex.activity;
21+
22+
import org.jspecify.annotations.NullMarked;

0 commit comments

Comments
 (0)