1818 */
1919package org .dependencytrack .csaf ;
2020
21- import alpine .event .framework .Event ;
22- import alpine .event .framework .Subscriber ;
2321import io .csaf .retrieval .RetrievedDocument ;
24- import org .dependencytrack .common .pagination .PageIterator ;
22+ import org .dependencytrack .dex .api .Activity ;
23+ import org .dependencytrack .dex .api .ActivityContext ;
24+ import org .dependencytrack .dex .api .ActivitySpec ;
25+ import org .dependencytrack .dex .api .failure .TerminalApplicationFailureException ;
2526import org .dependencytrack .model .Advisory ;
2627import org .dependencytrack .model .Vulnerability ;
2728import org .dependencytrack .persistence .QueryManager ;
29+ import org .dependencytrack .proto .internal .workflow .v1 .ImportCsafDocumentsArg ;
2830import org .jdbi .v3 .core .Handle ;
31+ import org .jspecify .annotations .Nullable ;
2932import org .slf4j .Logger ;
3033import org .slf4j .LoggerFactory ;
3134import org .slf4j .MDC ;
3538import java .util .HashSet ;
3639import java .util .Iterator ;
3740import java .util .List ;
41+ import java .util .UUID ;
3842import java .util .concurrent .ExecutionException ;
3943import java .util .stream .Stream ;
4044
4347import static org .dependencytrack .persistence .jdbi .JdbiFactory .withJdbiHandle ;
4448
4549/**
46- * TODO: Refactor to dex workflow + activity once dex engine is integrated.
47- * * Use workflow instance ID to ensure only one workflow run per provider can exist.
48- * * Use same concurrency key across all providers to serialize their execution.
49- * * Create an "uber-workflow" that that triggers import for all providers.
50- * * Schedule the uber-workflow to run at least once daily.
51- *
5250 * @since 5.7.0
5351 */
54- public class CsafDocumentImportTask implements Subscriber {
52+ @ ActivitySpec (name = "import-csaf-documents" )
53+ public final class ImportCsafDocumentsActivity implements Activity <ImportCsafDocumentsArg , Void > {
5554
56- private static final Logger LOGGER = LoggerFactory .getLogger (CsafDocumentImportTask .class );
55+ private static final Logger LOGGER = LoggerFactory .getLogger (ImportCsafDocumentsActivity .class );
5756
5857 private final CsafClient csafClient ;
5958
60- CsafDocumentImportTask (CsafClient csafClient ) {
59+ ImportCsafDocumentsActivity (CsafClient csafClient ) {
6160 this .csafClient = csafClient ;
6261 }
6362
64- @ SuppressWarnings ("unused" )
65- public CsafDocumentImportTask () {
63+ public ImportCsafDocumentsActivity () {
6664 this (new CsafClient ());
6765 }
6866
6967 @ Override
70- public void inform (Event e ) {
71- if (!(e instanceof CsafDocumentImportEvent )) {
72- return ;
68+ public @ Nullable Void execute (
69+ ActivityContext ctx ,
70+ @ Nullable ImportCsafDocumentsArg arg ) throws Exception {
71+ if (arg == null ) {
72+ throw new TerminalApplicationFailureException ("No argument provided" );
7373 }
7474
75- final List <CsafProvider > providers = getEnabledProviders ();
76- if (providers .isEmpty ()) {
77- LOGGER .info ("No providers available to import documents from" );
78- return ;
75+ final CsafProvider provider = withJdbiHandle (
76+ handle -> handle .attach (CsafProviderDao .class ).getById (UUID .fromString (arg .getProviderId ())));
77+ if (provider == null ) {
78+ throw new TerminalApplicationFailureException (
79+ "Provider with ID %s does not exist" .formatted (arg .getProviderId ()));
7980 }
8081
81- for (final CsafProvider provider : providers ) {
82- try (var ignored = MDC .putCloseable ("csafProvider" , provider .getName ())) {
83- importDocuments (provider );
84- } catch (ExecutionException | RuntimeException ex ) {
85- LOGGER .error ("Failed to import CSAF documents" , ex );
86- } catch (InterruptedException ex ) {
87- LOGGER .warn ("Interrupted while importing CSAF documents" );
88- Thread .currentThread ().interrupt ();
89- break ;
82+ try (var ignored = MDC .putCloseable ("csafProvider" , provider .getName ())) {
83+ if (!provider .isEnabled ()) {
84+ LOGGER .info ("Provider is disabled" );
85+ return null ;
9086 }
87+
88+ importDocuments (ctx , provider );
9189 }
92- }
9390
94- private List <CsafProvider > getEnabledProviders () {
95- return withJdbiHandle (handle -> {
96- final var dao = handle .attach (CsafProviderDao .class );
97-
98- return PageIterator .stream (
99- pageToken -> dao .list (
100- new ListCsafProvidersQuery ()
101- .withEnabled (true )
102- .withPageToken (pageToken )))
103- .toList ();
104- });
91+ return null ;
10592 }
10693
107- private void importDocuments (CsafProvider provider ) throws ExecutionException , InterruptedException {
94+ private void importDocuments (
95+ ActivityContext ctx ,
96+ CsafProvider provider ) throws ExecutionException , InterruptedException {
10897 Instant latestDocumentReleaseDate = provider .getLatestDocumentReleaseDate ();
10998 if (latestDocumentReleaseDate != null ) {
11099 LOGGER .info ("Importing CSAF documents modified since {}" , latestDocumentReleaseDate );
@@ -119,6 +108,7 @@ private void importDocuments(CsafProvider provider) throws ExecutionException, I
119108 final var documentBatch = new ArrayList <RetrievedDocument >(25 );
120109 while (documentIterator .hasNext ()) {
121110 final RetrievedDocument doc = documentIterator .next ();
111+ ctx .maybeHeartbeat ();
122112
123113 final Instant docReleaseDate = doc .getJson ().getDocument ().getTracking ().getCurrent_release_date ().getValue$kotlinx_datetime ();
124114 if (latestDocumentReleaseDate == null || latestDocumentReleaseDate .isBefore (docReleaseDate )) {
@@ -133,6 +123,7 @@ private void importDocuments(CsafProvider provider) throws ExecutionException, I
133123 }
134124
135125 if (!documentBatch .isEmpty ()) {
126+ ctx .maybeHeartbeat ();
136127 processDocuments (documentBatch );
137128 documentBatch .clear ();
138129 }
0 commit comments