Skip to content

Commit 0b11324

Browse files
committed
Convert CSAF tasks to workflows
Enables provider discovery and document import to be addressable by workflow instance ID, while also effectively prevent multiple instances for the same aggregator / provider to run concurrently. https://github.com/DependencyTrack/hyades-apiserver/blob/main/dex/docs/PATTERNS.md#singletons Signed-off-by: nscuro <nscuro@protonmail.com>
1 parent 13a1104 commit 0b11324

20 files changed

Lines changed: 501 additions & 235 deletions

File tree

api/src/main/openapi/paths/csaf/aggregators__id__provider-discovery.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,7 @@ post:
4242
$ref: "../../components/responses/generic-forbidden-error.yaml"
4343
"404":
4444
$ref: "../../components/responses/generic-not-found-error.yaml"
45+
"409":
46+
$ref: "../../components/responses/generic-conflict-error.yaml"
4547
default:
4648
$ref: "../../components/responses/generic-error.yaml"

apiserver/src/main/java/org/dependencytrack/csaf/CsafProviderDiscoveryEvent.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

apiserver/src/main/java/org/dependencytrack/csaf/CsafProviderDiscoveryTask.java renamed to apiserver/src/main/java/org/dependencytrack/csaf/DiscoverCsafProvidersActivity.java

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,73 +18,72 @@
1818
*/
1919
package org.dependencytrack.csaf;
2020

21-
import alpine.event.framework.Event;
22-
import alpine.event.framework.Subscriber;
21+
import org.dependencytrack.dex.api.Activity;
22+
import org.dependencytrack.dex.api.ActivityContext;
23+
import org.dependencytrack.dex.api.ActivitySpec;
24+
import org.dependencytrack.dex.api.failure.TerminalApplicationFailureException;
25+
import org.dependencytrack.proto.internal.workflow.v1.DiscoverCsafProvidersArg;
26+
import org.jspecify.annotations.Nullable;
2327
import org.slf4j.Logger;
2428
import org.slf4j.LoggerFactory;
2529
import org.slf4j.MDC;
2630

2731
import java.time.Instant;
2832
import java.util.List;
29-
import java.util.concurrent.ExecutionException;
33+
import java.util.UUID;
3034

3135
import static org.dependencytrack.persistence.jdbi.JdbiFactory.inJdbiTransaction;
3236
import static org.dependencytrack.persistence.jdbi.JdbiFactory.useJdbiTransaction;
37+
import static org.dependencytrack.persistence.jdbi.JdbiFactory.withJdbiHandle;
3338

3439
/**
35-
* TODO: Refactor to dex workflow + activity once dex engine is integrated.
36-
* * Use workflow instance ID to ensure only one workflow run per aggregator can exist.
37-
* * Use same concurrency key across all aggregators to serialize their execution.
38-
*
3940
* @since 5.7.0
4041
*/
41-
public class CsafProviderDiscoveryTask implements Subscriber {
42+
@ActivitySpec(name = "discover-csaf-providers")
43+
public final class DiscoverCsafProvidersActivity implements Activity<DiscoverCsafProvidersArg, Void> {
4244

43-
private static final Logger LOGGER = LoggerFactory.getLogger(CsafProviderDiscoveryTask.class);
45+
private static final Logger LOGGER = LoggerFactory.getLogger(DiscoverCsafProvidersActivity.class);
4446

4547
private final CsafClient csafClient;
4648

47-
CsafProviderDiscoveryTask(CsafClient csafClient) {
49+
DiscoverCsafProvidersActivity(CsafClient csafClient) {
4850
this.csafClient = csafClient;
4951
}
5052

51-
@SuppressWarnings("unused") // Used by event system.
52-
public CsafProviderDiscoveryTask() {
53+
public DiscoverCsafProvidersActivity() {
5354
this(new CsafClient());
5455
}
5556

5657
@Override
57-
public void inform(Event e) {
58-
if (!(e instanceof final CsafProviderDiscoveryEvent event)) {
59-
return;
58+
public @Nullable Void execute(
59+
ActivityContext ctx,
60+
@Nullable DiscoverCsafProvidersArg arg) throws Exception {
61+
if (arg == null) {
62+
throw new TerminalApplicationFailureException("No argument provided");
6063
}
6164

62-
final CsafAggregator aggregator = event.getAggregator();
65+
final CsafAggregator aggregator = withJdbiHandle(
66+
handle -> handle.attach(CsafAggregatorDao.class).getById(UUID.fromString(arg.getAggregatorId())));
67+
if (aggregator == null) {
68+
throw new TerminalApplicationFailureException(
69+
"Aggregator with ID %s does not exist".formatted(arg.getAggregatorId()));
70+
}
6371

6472
try (var ignored = MDC.putCloseable("csafAggregator", aggregator.getNamespace().toString())) {
6573
LOGGER.info("Discovering providers");
6674

67-
final List<CsafProvider> discoveredProviders;
68-
try {
69-
discoveredProviders =
70-
csafClient.discoverProviders(aggregator)
71-
.peek(provider -> {
72-
provider.setEnabled(false);
73-
provider.setDiscoveredFrom(aggregator.getId());
74-
provider.setDiscoveredAt(Instant.now());
75-
})
76-
.toList();
77-
} catch (ExecutionException | RuntimeException ex) {
78-
throw new IllegalStateException("Failed to discover providers", ex);
79-
} catch (InterruptedException ex) {
80-
LOGGER.warn("Interrupted while discovering providers", ex);
81-
Thread.currentThread().interrupt();
82-
return;
83-
}
75+
final List<CsafProvider> discoveredProviders =
76+
csafClient.discoverProviders(aggregator)
77+
.peek(provider -> {
78+
provider.setEnabled(false);
79+
provider.setDiscoveredFrom(aggregator.getId());
80+
provider.setDiscoveredAt(Instant.now());
81+
})
82+
.toList();
8483

8584
if (discoveredProviders.isEmpty()) {
8685
LOGGER.info("No providers discovered");
87-
return;
86+
return null;
8887
}
8988

9089
LOGGER.info("Discovered {} providers", discoveredProviders.size());
@@ -98,6 +97,8 @@ public void inform(Event e) {
9897
useJdbiTransaction(handle -> handle.attach(CsafAggregatorDao.class)
9998
.updateLastDiscoveryAtById(aggregator.getId(), Instant.now()));
10099
}
100+
101+
return null;
101102
}
102103

103104
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* This file is part of Dependency-Track.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* Copyright (c) OWASP Foundation. All Rights Reserved.
18+
*/
19+
package org.dependencytrack.csaf;
20+
21+
import org.dependencytrack.dex.api.Workflow;
22+
import org.dependencytrack.dex.api.WorkflowContext;
23+
import org.dependencytrack.dex.api.WorkflowSpec;
24+
import org.dependencytrack.dex.api.failure.TerminalApplicationFailureException;
25+
import org.dependencytrack.proto.internal.workflow.v1.DiscoverCsafProvidersArg;
26+
import org.jspecify.annotations.Nullable;
27+
28+
/**
29+
* @since 5.7.0
30+
*/
31+
@WorkflowSpec(name = "discover-csaf-providers")
32+
public final class DiscoverCsafProvidersWorkflow implements Workflow<DiscoverCsafProvidersArg, Void> {
33+
34+
@Override
35+
public @Nullable Void execute(
36+
WorkflowContext<DiscoverCsafProvidersArg> ctx,
37+
@Nullable DiscoverCsafProvidersArg arg) throws Exception {
38+
if (arg == null) {
39+
throw new TerminalApplicationFailureException("No argument provided");
40+
}
41+
42+
ctx.activity(DiscoverCsafProvidersActivity.class).call(arg).await();
43+
return null;
44+
}
45+
46+
}

apiserver/src/main/java/org/dependencytrack/csaf/CsafDocumentImportTask.java renamed to apiserver/src/main/java/org/dependencytrack/csaf/ImportCsafDocumentsActivity.java

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
*/
1919
package org.dependencytrack.csaf;
2020

21-
import alpine.event.framework.Event;
22-
import alpine.event.framework.Subscriber;
2321
import io.csaf.retrieval.RetrievedDocument;
24-
import org.dependencytrack.common.pagination.PageIterator;
22+
import org.dependencytrack.dex.api.Activity;
23+
import org.dependencytrack.dex.api.ActivityContext;
24+
import org.dependencytrack.dex.api.ActivitySpec;
25+
import org.dependencytrack.dex.api.failure.TerminalApplicationFailureException;
2526
import org.dependencytrack.model.Advisory;
2627
import org.dependencytrack.model.Vulnerability;
2728
import org.dependencytrack.persistence.QueryManager;
29+
import org.dependencytrack.proto.internal.workflow.v1.ImportCsafDocumentsArg;
2830
import org.jdbi.v3.core.Handle;
31+
import org.jspecify.annotations.Nullable;
2932
import org.slf4j.Logger;
3033
import org.slf4j.LoggerFactory;
3134
import org.slf4j.MDC;
@@ -35,6 +38,7 @@
3538
import java.util.HashSet;
3639
import java.util.Iterator;
3740
import java.util.List;
41+
import java.util.UUID;
3842
import java.util.concurrent.ExecutionException;
3943
import java.util.stream.Stream;
4044

@@ -43,68 +47,53 @@
4347
import static org.dependencytrack.persistence.jdbi.JdbiFactory.withJdbiHandle;
4448

4549
/**
46-
* TODO: Refactor to dex workflow + activity once dex engine is integrated.
47-
* * Use workflow instance ID to ensure only one workflow run per provider can exist.
48-
* * Use same concurrency key across all providers to serialize their execution.
49-
* * Create an "uber-workflow" that that triggers import for all providers.
50-
* * Schedule the uber-workflow to run at least once daily.
51-
*
5250
* @since 5.7.0
5351
*/
54-
public class CsafDocumentImportTask implements Subscriber {
52+
@ActivitySpec(name = "import-csaf-documents")
53+
public final class ImportCsafDocumentsActivity implements Activity<ImportCsafDocumentsArg, Void> {
5554

56-
private static final Logger LOGGER = LoggerFactory.getLogger(CsafDocumentImportTask.class);
55+
private static final Logger LOGGER = LoggerFactory.getLogger(ImportCsafDocumentsActivity.class);
5756

5857
private final CsafClient csafClient;
5958

60-
CsafDocumentImportTask(CsafClient csafClient) {
59+
ImportCsafDocumentsActivity(CsafClient csafClient) {
6160
this.csafClient = csafClient;
6261
}
6362

64-
@SuppressWarnings("unused")
65-
public CsafDocumentImportTask() {
63+
public ImportCsafDocumentsActivity() {
6664
this(new CsafClient());
6765
}
6866

6967
@Override
70-
public void inform(Event e) {
71-
if (!(e instanceof CsafDocumentImportEvent)) {
72-
return;
68+
public @Nullable Void execute(
69+
ActivityContext ctx,
70+
@Nullable ImportCsafDocumentsArg arg) throws Exception {
71+
if (arg == null) {
72+
throw new TerminalApplicationFailureException("No argument provided");
7373
}
7474

75-
final List<CsafProvider> providers = getEnabledProviders();
76-
if (providers.isEmpty()) {
77-
LOGGER.info("No providers available to import documents from");
78-
return;
75+
final CsafProvider provider = withJdbiHandle(
76+
handle -> handle.attach(CsafProviderDao.class).getById(UUID.fromString(arg.getProviderId())));
77+
if (provider == null) {
78+
throw new TerminalApplicationFailureException(
79+
"Provider with ID %s does not exist".formatted(arg.getProviderId()));
7980
}
8081

81-
for (final CsafProvider provider : providers) {
82-
try (var ignored = MDC.putCloseable("csafProvider", provider.getName())) {
83-
importDocuments(provider);
84-
} catch (ExecutionException | RuntimeException ex) {
85-
LOGGER.error("Failed to import CSAF documents", ex);
86-
} catch (InterruptedException ex) {
87-
LOGGER.warn("Interrupted while importing CSAF documents");
88-
Thread.currentThread().interrupt();
89-
break;
82+
try (var ignored = MDC.putCloseable("csafProvider", provider.getName())) {
83+
if (!provider.isEnabled()) {
84+
LOGGER.info("Provider is disabled");
85+
return null;
9086
}
87+
88+
importDocuments(ctx, provider);
9189
}
92-
}
9390

94-
private List<CsafProvider> getEnabledProviders() {
95-
return withJdbiHandle(handle -> {
96-
final var dao = handle.attach(CsafProviderDao.class);
97-
98-
return PageIterator.stream(
99-
pageToken -> dao.list(
100-
new ListCsafProvidersQuery()
101-
.withEnabled(true)
102-
.withPageToken(pageToken)))
103-
.toList();
104-
});
91+
return null;
10592
}
10693

107-
private void importDocuments(CsafProvider provider) throws ExecutionException, InterruptedException {
94+
private void importDocuments(
95+
ActivityContext ctx,
96+
CsafProvider provider) throws ExecutionException, InterruptedException {
10897
Instant latestDocumentReleaseDate = provider.getLatestDocumentReleaseDate();
10998
if (latestDocumentReleaseDate != null) {
11099
LOGGER.info("Importing CSAF documents modified since {}", latestDocumentReleaseDate);
@@ -119,6 +108,7 @@ private void importDocuments(CsafProvider provider) throws ExecutionException, I
119108
final var documentBatch = new ArrayList<RetrievedDocument>(25);
120109
while (documentIterator.hasNext()) {
121110
final RetrievedDocument doc = documentIterator.next();
111+
ctx.maybeHeartbeat();
122112

123113
final Instant docReleaseDate = doc.getJson().getDocument().getTracking().getCurrent_release_date().getValue$kotlinx_datetime();
124114
if (latestDocumentReleaseDate == null || latestDocumentReleaseDate.isBefore(docReleaseDate)) {
@@ -133,6 +123,7 @@ private void importDocuments(CsafProvider provider) throws ExecutionException, I
133123
}
134124

135125
if (!documentBatch.isEmpty()) {
126+
ctx.maybeHeartbeat();
136127
processDocuments(documentBatch);
137128
documentBatch.clear();
138129
}

0 commit comments

Comments
 (0)