Skip to content

Commit a72ac75

Browse files
feat: P4ADEV-3720 implement SEND stream consume workflow (1/2) (#367)
Added new endpoint for starting stream event consume workflow (modified openapi); added SendNotificationStreamConsumeWFImpl skeleton (will add implementation in P4ADEV-3720 after activity pr on same ticket); added TaskQueue SendNotificationStreamConsumeWF (used by SendNotificationStreamConsumeWFImpl) and SendNotificationStreamConsumeWF_LOCAL (will be used by PublishSendNotificationPaymentEventActivityImpl).
1 parent d0536a9 commit a72ac75

File tree

14 files changed

+330
-43
lines changed

14 files changed

+330
-43
lines changed

openapi/generated.openapi.json

Lines changed: 84 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,6 +1682,47 @@
16821682
} ]
16831683
}
16841684
},
1685+
"/workflowhub/workflow/send-notification/streams/{streamId}/consume" : {
1686+
"get" : {
1687+
"tags" : [ "SendNotification" ],
1688+
"description" : "Start send stream events consume process.",
1689+
"operationId" : "consumeSendStream",
1690+
"parameters" : [ {
1691+
"name" : "streamId",
1692+
"in" : "path",
1693+
"description" : "The id of send stream.",
1694+
"required" : true,
1695+
"schema" : {
1696+
"type" : "string"
1697+
}
1698+
} ],
1699+
"responses" : {
1700+
"200" : {
1701+
"description" : "Consume of stream started",
1702+
"content" : {
1703+
"application/json" : {
1704+
"schema" : {
1705+
"$ref" : "#/components/schemas/WorkflowCreatedDTO"
1706+
}
1707+
}
1708+
}
1709+
},
1710+
"500" : {
1711+
"description" : "Server Error",
1712+
"content" : {
1713+
"application/json" : {
1714+
"schema" : {
1715+
"$ref" : "#/components/schemas/WorkflowErrorDTO"
1716+
}
1717+
}
1718+
}
1719+
}
1720+
},
1721+
"security" : [ {
1722+
"BearerAuth" : [ ]
1723+
} ]
1724+
}
1725+
},
16851726
"/workflowhub/schedules/{scheduleId}/info" : {
16861727
"get" : {
16871728
"tags" : [ "Schedule" ],
@@ -1832,27 +1873,6 @@
18321873
}
18331874
}
18341875
},
1835-
"PageMetadata" : {
1836-
"type" : "object",
1837-
"properties" : {
1838-
"size" : {
1839-
"type" : "integer",
1840-
"format" : "int64"
1841-
},
1842-
"totalElements" : {
1843-
"type" : "integer",
1844-
"format" : "int64"
1845-
},
1846-
"totalPages" : {
1847-
"type" : "integer",
1848-
"format" : "int64"
1849-
},
1850-
"number" : {
1851-
"type" : "integer",
1852-
"format" : "int64"
1853-
}
1854-
}
1855-
},
18561876
"DebtPositionSyncWfName" : {
18571877
"type" : "string",
18581878
"enum" : [ "fineWf" ]
@@ -1899,6 +1919,27 @@
18991919
}
19001920
}
19011921
},
1922+
"PageMetadata" : {
1923+
"type" : "object",
1924+
"properties" : {
1925+
"size" : {
1926+
"type" : "integer",
1927+
"format" : "int64"
1928+
},
1929+
"totalElements" : {
1930+
"type" : "integer",
1931+
"format" : "int64"
1932+
},
1933+
"totalPages" : {
1934+
"type" : "integer",
1935+
"format" : "int64"
1936+
},
1937+
"number" : {
1938+
"type" : "integer",
1939+
"format" : "int64"
1940+
}
1941+
}
1942+
},
19021943
"WfExecutionConfig" : {
19031944
"type" : "object",
19041945
"discriminator" : {
@@ -2573,6 +2614,28 @@
25732614
},
25742615
"required" : [ "createdAt", "lastExecution", "lastUpdatedAt", "nextActionTimes", "numActions", "numActionsMissedCatchupWindow", "numActionsSkippedOverlap", "recentActions", "runningActions", "scheduleId" ]
25752616
},
2617+
"PagedModelWorkflowTypeOrg" : {
2618+
"type" : "object",
2619+
"properties" : {
2620+
"_embedded" : {
2621+
"type" : "object",
2622+
"properties" : {
2623+
"workflowTypeOrgs" : {
2624+
"type" : "array",
2625+
"items" : {
2626+
"$ref" : "#/components/schemas/WorkflowTypeOrg"
2627+
}
2628+
}
2629+
}
2630+
},
2631+
"_links" : {
2632+
"$ref" : "#/components/schemas/Links"
2633+
},
2634+
"page" : {
2635+
"$ref" : "#/components/schemas/PageMetadata"
2636+
}
2637+
}
2638+
},
25762639
"DebtPositionWorkflowType" : {
25772640
"type" : "object",
25782641
"properties" : {
@@ -2687,28 +2750,6 @@
26872750
}
26882751
}
26892752
},
2690-
"PagedModelWorkflowTypeOrg" : {
2691-
"type" : "object",
2692-
"properties" : {
2693-
"_embedded" : {
2694-
"type" : "object",
2695-
"properties" : {
2696-
"workflowTypeOrgs" : {
2697-
"type" : "array",
2698-
"items" : {
2699-
"$ref" : "#/components/schemas/WorkflowTypeOrg"
2700-
}
2701-
}
2702-
}
2703-
},
2704-
"_links" : {
2705-
"$ref" : "#/components/schemas/Links"
2706-
},
2707-
"page" : {
2708-
"$ref" : "#/components/schemas/PageMetadata"
2709-
}
2710-
}
2711-
},
27122753
"Link" : {
27132754
"type" : "object",
27142755
"properties" : {

openapi/p4pa-workflow-hub.openapi.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,32 @@ paths:
572572
application/json:
573573
schema:
574574
$ref: '#/components/schemas/WorkflowErrorDTO'
575+
/workflow/send-notification/streams/{streamId}/consume:
576+
get:
577+
tags:
578+
- SendNotification
579+
operationId: consumeSendStream
580+
description: Start send stream events consume process.
581+
parameters:
582+
- name: streamId
583+
in: path
584+
required: true
585+
description: The id of send stream.
586+
schema:
587+
type: string
588+
responses:
589+
'200':
590+
description: "Consume of stream started"
591+
content:
592+
application/json:
593+
schema:
594+
$ref: '#/components/schemas/WorkflowCreatedDTO'
595+
'500':
596+
description: Server Error
597+
content:
598+
application/json:
599+
schema:
600+
$ref: '#/components/schemas/WorkflowErrorDTO'
575601
/workflow/assessments/receipt/{receiptId}:
576602
post:
577603
tags:

src/main/java/it/gov/pagopa/pu/workflow/controller/wf/SendNotificationControllerImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,11 @@ public ResponseEntity<WorkflowCreatedDTO> retrieveNotificationDate(String sendNo
3232
return new ResponseEntity<>(createWorkflowResponseDTO, HttpStatus.OK);
3333
}
3434

35+
@Override
36+
public ResponseEntity<WorkflowCreatedDTO> consumeSendStream(String sendStreamId) {
37+
log.info("Starting stream consuming workflow for sendStreamId: {}", sendStreamId);
38+
WorkflowCreatedDTO createWorkflowResponseDTO = service.sendNotificationStreamConsume(sendStreamId);
39+
return new ResponseEntity<>(createWorkflowResponseDTO, HttpStatus.OK);
40+
}
41+
3542
}

src/main/java/it/gov/pagopa/pu/workflow/service/wf/send/SendNotificationService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
public interface SendNotificationService {
77
WorkflowCreatedDTO sendNotificationProcess(String sendNotificationId);
88
WorkflowCreatedDTO sendNotificationDateRetrieve(String sendNotificationId);
9+
WorkflowCreatedDTO sendNotificationStreamConsume(String sendStreamId);
910
}

src/main/java/it/gov/pagopa/pu/workflow/service/wf/send/SendNotificationServiceImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,9 @@ public WorkflowCreatedDTO sendNotificationProcess(String sendNotificationId) {
2525
public WorkflowCreatedDTO sendNotificationDateRetrieve(String sendNotificationId) {
2626
return sendNotificationWFClient.startSendNotificationDateRetrieve(sendNotificationId);
2727
}
28+
29+
@Override
30+
public WorkflowCreatedDTO sendNotificationStreamConsume(String sendStreamId) {
31+
return sendNotificationWFClient.startSendNotificationStreamConsume(sendStreamId);
32+
}
2833
}

src/main/java/it/gov/pagopa/pu/workflow/utilities/TaskQueueConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ private TaskQueueConstants(){}
3737
public static final String TASK_QUEUE_SEND_RESERVED_NOTIFICATION = "SendNotificationProcessWF";
3838
public static final String TASK_QUEUE_SEND_RESERVED_NOTIFICATION_LOCAL = "SendNotificationProcessWF_LOCAL";
3939

40+
public static final String TASK_QUEUE_SEND_RESERVED_STREAM = "SendNotificationStreamConsumeWF";
41+
public static final String TASK_QUEUE_SEND_RESERVED_STREAM_LOCAL = "SendNotificationStreamConsumeWF_LOCAL";
42+
4043
public static final String TASK_QUEUE_SEND_LOW_PRIORITY = "SendWF";
4144
//endregion
4245

src/main/java/it/gov/pagopa/pu/workflow/wf/pagopa/send/SendNotificationWFClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import it.gov.pagopa.pu.workflow.utilities.TaskQueueConstants;
77
import it.gov.pagopa.pu.workflow.wf.pagopa.send.wfretrievedt.SendNotificationDateRetrieveWF;
88
import it.gov.pagopa.pu.workflow.wf.pagopa.send.wfsendnotification.SendNotificationProcessWF;
9+
import it.gov.pagopa.pu.workflow.wf.pagopa.send.wfsendnotification.SendNotificationStreamConsumeWF;
910
import lombok.extern.slf4j.Slf4j;
1011
import org.springframework.stereotype.Service;
1112

@@ -60,4 +61,17 @@ public void scheduleSendNotificationDateRetrieve(String sendNotificationId, Dura
6061
nextSchedule);
6162
workflowClientService.start(workflow::sendNotificationDateRetrieve, sendNotificationId);
6263
}
64+
65+
public WorkflowCreatedDTO startSendNotificationStreamConsume(String sendStreamId) {
66+
String taskQueue = TaskQueueConstants.TASK_QUEUE_SEND_RESERVED_STREAM;
67+
String workflowId = generateWorkflowId(sendStreamId, SendNotificationStreamConsumeWF.class);
68+
69+
SendNotificationStreamConsumeWF workflow = workflowService.buildWorkflowStubToStartNew(
70+
SendNotificationStreamConsumeWF.class,
71+
taskQueue,
72+
workflowId
73+
);
74+
return workflowClientService.start(workflow::readSendStream, sendStreamId);
75+
}
76+
6377
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package it.gov.pagopa.pu.workflow.wf.pagopa.send.wfsendnotification;
2+
3+
import io.temporal.workflow.WorkflowInterface;
4+
import io.temporal.workflow.WorkflowMethod;
5+
6+
/**
7+
* Workflow interface for managing the stream events consume process.
8+
* <p>
9+
* This workflow coordinates a series of activities for consuming the notification
10+
* events in the SEND stream.
11+
* </p>
12+
* <p>
13+
* The process is designed to handle notification events from SEND stream.
14+
* </p>
15+
* @see <a href=https://pagopa.atlassian.net/wiki/spaces/SPAC/pages/2626388056/Notifica+SEND+New>Confluence page</a>
16+
* */
17+
@WorkflowInterface
18+
public interface SendNotificationStreamConsumeWF {
19+
/**
20+
* Workflow method to handle notification events from stream.
21+
*
22+
* @param sendStreamId the unique identifier of the stream to read from
23+
*/
24+
@WorkflowMethod
25+
void readSendStream(String sendStreamId);
26+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package it.gov.pagopa.pu.workflow.wf.pagopa.send.wfsendnotification;
2+
3+
import io.temporal.spring.boot.WorkflowImpl;
4+
import it.gov.pagopa.pu.workflow.config.temporal.TemporalWFImplementationCustomizer;
5+
import it.gov.pagopa.pu.workflow.utilities.TaskQueueConstants;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.beans.BeansException;
8+
import org.springframework.context.ApplicationContext;
9+
import org.springframework.context.ApplicationContextAware;
10+
11+
@Slf4j
12+
@WorkflowImpl(taskQueues = TaskQueueConstants.TASK_QUEUE_SEND_RESERVED_STREAM)
13+
public class SendNotificationStreamConsumeWFImpl implements SendNotificationStreamConsumeWF, ApplicationContextAware {
14+
15+
/**
16+
* Temporal workflow will not allow to use injection in order to avoid <a href="https://docs.temporal.io/workflows#non-deterministic-change">non-deterministic changes</a> due to dynamic reconfiguration.<BR />
17+
* Anyway it allows to override ActivityOptions, but actually it's not supporting the override based on the particular workflow.<BR />
18+
* In {@link TemporalWFImplementationCustomizer} we are already setting defaults to all workflows.<BR />
19+
* Use this as an example to override based on the particular workflow.
20+
*/
21+
@Override
22+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
23+
//TODO P4ADEV-3720 add implementation after activity pr on same ticket
24+
}
25+
26+
@Override
27+
public void readSendStream(String sendStreamId) {
28+
// TODO P4ADEV-3720 add implementation after activity pr on same ticket
29+
}
30+
31+
}

src/main/resources/application.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,22 @@ spring:
149149
initial-concurrent-task-pollers: "\${WF_SEND_RESERVED_NOTIFICATION_LOCAL_MIN_POLLER_SIZE:1}"
150150
min-concurrent-task-pollers: "\${WF_SEND_RESERVED_NOTIFICATION_LOCAL_MIN_POLLER_SIZE:1}"
151151
max-concurrent-task-pollers: "\${WF_SEND_RESERVED_NOTIFICATION_LOCAL_MAX_POLLER_SIZE:2}"
152+
- task-queue: SendNotificationStreamConsumeWF
153+
capacity:
154+
workflow-task-pollers-configuration:
155+
poller-behavior-autoscaling:
156+
enabled: true
157+
initial-concurrent-task-pollers: "\${WF_SEND_RESERVED_STREAM_MIN_POLLER_SIZE:1}"
158+
min-concurrent-task-pollers: "\${WF_SEND_RESERVED_STREAM_MIN_POLLER_SIZE:1}"
159+
max-concurrent-task-pollers: "\${WF_SEND_RESERVED_STREAM_MAX_POLLER_SIZE:5}"
160+
- task-queue: SendNotificationStreamConsumeWF_LOCAL
161+
capacity:
162+
activity-task-pollers-configuration:
163+
poller-behavior-autoscaling:
164+
enabled: true
165+
initial-concurrent-task-pollers: "\${WF_SEND_RESERVED_STREAM_LOCAL_MIN_POLLER_SIZE:1}"
166+
min-concurrent-task-pollers: "\${WF_SEND_RESERVED_STREAM_LOCAL_MIN_POLLER_SIZE:1}"
167+
max-concurrent-task-pollers: "\${WF_SEND_RESERVED_STREAM_LOCAL_MAX_POLLER_SIZE:5}"
152168
- task-queue: SendWF
153169
capacity:
154170
workflow-task-pollers-configuration:

0 commit comments

Comments
 (0)