1515import io .quarkus .runtime .Startup ;
1616import io .quarkus .runtime .configuration .ConfigUtils ;
1717import io .smallrye .mutiny .Multi ;
18+ import io .smallrye .mutiny .Uni ;
19+ import io .smallrye .mutiny .subscription .MultiEmitter ;
20+ import it .pagopa .selfcare .onboarding .common .PartyRole ;
21+ import it .pagopa .selfcare .product .entity .ProductRoleInfo ;
22+ import it .pagopa .selfcare .product .service .ProductService ;
1823import it .pagopa .selfcare .user .UserUtils ;
1924import it .pagopa .selfcare .user .client .EventHubFdRestClient ;
2025import it .pagopa .selfcare .user .client .EventHubRestClient ;
26+ import it .pagopa .selfcare .user .event .client .InternalDelegationApiClient ;
27+ import it .pagopa .selfcare .user .event .client .InternalUserApiClient ;
28+ import it .pagopa .selfcare .user .event .client .InternalUserGroupApiClient ;
2129import it .pagopa .selfcare .user .event .entity .UserInstitution ;
2230import it .pagopa .selfcare .user .event .mapper .NotificationMapper ;
2331import it .pagopa .selfcare .user .event .repository .UserInstitutionRepository ;
2836import it .pagopa .selfcare .user .model .constants .OnboardedProductState ;
2937import jakarta .enterprise .context .ApplicationScoped ;
3038import jakarta .inject .Inject ;
39+ import jakarta .ws .rs .core .Response ;
3140import lombok .extern .slf4j .Slf4j ;
3241import org .bson .BsonDocument ;
3342import org .bson .conversions .Bson ;
3443import org .eclipse .microprofile .config .inject .ConfigProperty ;
3544import org .eclipse .microprofile .rest .client .inject .RestClient ;
3645import org .jboss .resteasy .reactive .ClientWebApplicationException ;
46+ import org .openapi .quarkus .internal_json .model .AddMembersToUserGroupDto ;
47+ import org .openapi .quarkus .internal_json .model .AddUserRoleDto ;
48+ import org .openapi .quarkus .internal_json .model .DelegationResponse ;
49+ import org .openapi .quarkus .internal_json .model .Product ;
3750import org .openapi .quarkus .user_registry_json .api .UserApi ;
3851
3952import java .time .Duration ;
@@ -67,7 +80,7 @@ public class UserInstitutionCdcService {
6780
6881
6982 private final TelemetryClient telemetryClient ;
70-
83+ private final ProductService productService ;
7184 private final TableClient tableClient ;
7285 private final String mongodbDatabase ;
7386 private final ReactiveMongoClient mongoClient ;
@@ -78,11 +91,26 @@ public class UserInstitutionCdcService {
7891 private final Integer maxRetry ;
7992 private final boolean sendEventsEnabled ;
8093 private final boolean sendFdEventsEnabled ;
94+ private final boolean addOnAggregatesEnabled ;
95+ private final List <String > addOnAggregatesGroupProducts ;
8196
8297
8398 @ RestClient
8499 @ Inject
85100 UserApi userRegistryApi ;
101+
102+ @ RestClient
103+ @ Inject
104+ InternalDelegationApiClient delegationApi ;
105+
106+ @ RestClient
107+ @ Inject
108+ InternalUserApiClient userApi ;
109+
110+ @ RestClient
111+ @ Inject
112+ InternalUserGroupApiClient userGroupApi ;
113+
86114 @ RestClient
87115 @ Inject
88116 EventHubRestClient eventHubRestClient ;
@@ -101,8 +129,10 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
101129 @ ConfigProperty (name = "user-cdc.retry" ) Integer maxRetry ,
102130 @ ConfigProperty (name = "user-cdc.send-events.watch.enabled" ) Boolean sendEventsEnabled ,
103131 @ ConfigProperty (name = "user-cdc.send-events-fd.watch.enabled" ) Boolean sendFdEventsEnabled ,
132+ @ ConfigProperty (name = "user-cdc.add-on-aggregates.watch.enabled" ) Boolean addOnAggregatesEnabled ,
133+ @ ConfigProperty (name = "user-cdc.add-on-aggregates.group.products" ) List <String > addOnAggregatesGroupProducts ,
104134 UserInstitutionRepository userInstitutionRepository ,
105- TelemetryClient telemetryClient ,
135+ TelemetryClient telemetryClient , ProductService productService ,
106136 TableClient tableClient , NotificationMapper notificationMapper ) {
107137 this .mongoClient = mongoClient ;
108138 this .mongodbDatabase = mongodbDatabase ;
@@ -111,10 +141,13 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
111141 this .retryMaxBackOff = retryMaxBackOff ;
112142 this .retryMinBackOff = retryMinBackOff ;
113143 this .telemetryClient = telemetryClient ;
144+ this .productService = productService ;
114145 this .tableClient = tableClient ;
115146 this .notificationMapper = notificationMapper ;
116147 this .sendEventsEnabled = sendEventsEnabled ;
117148 this .sendFdEventsEnabled = sendFdEventsEnabled ;
149+ this .addOnAggregatesEnabled = addOnAggregatesEnabled ;
150+ this .addOnAggregatesGroupProducts = addOnAggregatesGroupProducts ;
118151 telemetryClient .getContext ().getOperation ().setName (OPERATION_NAME );
119152 initOrderStream ();
120153 }
@@ -174,17 +207,21 @@ public void propagateDocumentToConsumers(ChangeStreamDocument<UserInstitution> d
174207
175208 boolean userMailIsChanged = isUserMailChanged (userInstitutionChanged );
176209
177- if (Boolean . FALSE . equals ( userMailIsChanged ) ) {
210+ if (! userMailIsChanged ) {
178211 consumerUserInstitutionRepositoryEvent (document );
179212 }
180213
181- if (Boolean . TRUE . equals ( sendEventsEnabled ) ) {
214+ if (sendEventsEnabled ) {
182215 consumerToSendScUserEvent (document );
183216 }
184217
185- if (Boolean . TRUE . equals ( sendFdEventsEnabled ) && hasFdProduct ) {
218+ if (sendFdEventsEnabled && hasFdProduct ) {
186219 consumerToSendUserEventForFD (document , userMailIsChanged );
187220 }
221+
222+ if (addOnAggregatesEnabled ) {
223+ consumerToAddOnAggregates (userInstitutionChanged );
224+ }
188225 }
189226
190227 private boolean isUserMailChanged (UserInstitution userInstitutionChanged ) {
@@ -341,4 +378,139 @@ private TrackEventInput toTrackEventInputByUserInstitution(UserInstitution userI
341378 .institutionId (userInstitution .getInstitutionId ())
342379 .build ();
343380 }
381+
382+ public void consumerToAddOnAggregates (UserInstitution userInstitutionChanged ) {
383+ log .info ("Starting consumerToAddOnAggregates on UserInstitution with id {}" , userInstitutionChanged .getId ());
384+ userInstitutionChanged .getProducts ().stream ()
385+ .filter (p -> p .getStatus () == OnboardedProductState .ACTIVE && Boolean .TRUE .equals (p .getToAddOnAggregates ()))
386+ .forEach (p -> {
387+ final String parentInstitutionId = userInstitutionChanged .getInstitutionId ();
388+ final String userId = userInstitutionChanged .getUserId ();
389+ final String userMailUuid = userInstitutionChanged .getUserMailUuid ();
390+ getDelegations (parentInstitutionId , p .getProductId ())
391+ .onItem ().transformToUniAndConcatenate (d ->
392+ createUserOnAggregate (d .getInstitutionId (), d .getInstitutionName (), d .getInstitutionType (), parentInstitutionId , userId , userMailUuid , p )
393+ .onItem ().transformToUni (r ->
394+ addUserToAggregatorGroup (d .getInstitutionId (), parentInstitutionId , p .getProductId (), userId )
395+ .onFailure ().recoverWithNull ()
396+ )
397+ .onFailure ().recoverWithNull ()
398+ )
399+ .subscribe ().with (
400+ r -> {},
401+ t -> log .error ("Error in consumerToAddOnAggregates on UserInstitution with id {}" , userInstitutionChanged .getId (), t ),
402+ () -> log .info ("Completed consumerToAddOnAggregates on UserInstitution with id {}" , userInstitutionChanged .getId ())
403+ );
404+ });
405+ }
406+
407+ private Multi <DelegationResponse > getDelegations (String parentInstitutionId , String productId ) {
408+ log .info ("Getting delegations toAddOnAggregates of parentInstitutionId {} with productId {}" , parentInstitutionId , productId );
409+ return Multi .createFrom ().emitter (emitter ->
410+ fetchDelegationPages (parentInstitutionId , productId , 0 , emitter ));
411+ }
412+
413+ private void fetchDelegationPages (String parentInstitutionId , String productId , int page ,
414+ MultiEmitter <? super DelegationResponse > emitter ) {
415+ delegationApi .getDelegationsUsingGET2 (null , parentInstitutionId , productId , null , null , null , page , null )
416+ .onFailure ().retry ().withBackOff (Duration .ofSeconds (retryMinBackOff ), Duration .ofSeconds (retryMaxBackOff )).atMost (maxRetry )
417+ .subscribe ().with (r -> {
418+ r .getDelegations ().stream ()
419+ .filter (d -> DelegationResponse .StatusEnum .ACTIVE .equals (d .getStatus ())
420+ && DelegationResponse .TypeEnum .EA .equals (d .getType ()))
421+ .forEach (d -> {
422+ log .info ("Found ACTIVE EA delegation with id {} (from: {}, to: {}, product: {})" , d .getId (), d .getInstitutionId (), d .getBrokerId (), d .getProductId ());
423+ emitter .emit (d );
424+ });
425+ if (r .getPageInfo ().getPageNo () < (r .getPageInfo ().getTotalPages () - 1 )) {
426+ final int nextPage = page + 1 ;
427+ log .debug ("Switching to page {} to get more delegations for parentInstitutionId {} and productId {}" , nextPage , parentInstitutionId , productId );
428+ fetchDelegationPages (parentInstitutionId , productId , nextPage , emitter );
429+ } else {
430+ log .debug ("No more delegation pages for parentInstitutionId {} and productId {}" , parentInstitutionId , productId );
431+ emitter .complete ();
432+ }
433+ }, t -> {
434+ log .error ("Error fetching delegations page {} for parentInstitutionId {} and productId {}: {}" , page , parentInstitutionId , productId , t .getMessage ());
435+ emitter .fail (t );
436+ });
437+ }
438+
439+ private Uni <Response > createUserOnAggregate (String institutionId , String institutionDescription , String institutionType , String parentInstitutionId ,
440+ String userId , String userMailUuid , OnboardedProduct product ) {
441+ log .info ("Creating user {} to institution {} and product {} from parentInstitutionId {}" , userId , institutionId , product .getProductId (), parentInstitutionId );
442+ final AddUserRoleDto addUserRoleDto = AddUserRoleDto .builder ()
443+ .institutionId (institutionId )
444+ .institutionDescription (institutionDescription )
445+ .product (Product .builder ()
446+ .role (getRoleToPropagate (product .getRole ()))
447+ .productId (product .getProductId ())
448+ .productRoles (getProductRolesToPropagate (product .getRole (), product .getProductRole (), product .getProductId (), institutionType ))
449+ .tokenId (product .getTokenId ())
450+ .build ())
451+ .hasToSendEmail (false )
452+ .userMailUuid (userMailUuid )
453+ .build ();
454+ return userApi .createUserByUserId (userId , addUserRoleDto )
455+ .onFailure ().retry ().withBackOff (Duration .ofSeconds (retryMinBackOff ), Duration .ofSeconds (retryMaxBackOff )).atMost (maxRetry )
456+ .onFailure ().invoke (t -> log .error ("Error creating user {} on aggregate {} for parentInstitutionId {}: {}" , userId , institutionId , parentInstitutionId , t .getMessage ()))
457+ .onItem ().invoke (r -> log .info ("Created user on aggregate {} for parentInstitutionId {} - status {}" , institutionId , parentInstitutionId , r .getStatus ()));
458+ }
459+
460+ /**
461+ * Determines the role to assign to the new user on the aggregate.
462+ * If the parent's role is OPERATOR, returns OPERATOR.
463+ * For any other role (e.g., MANAGER, DELEGATE, etc.), returns ADMIN_EA.
464+ *
465+ * @param parentRole the role of the user on the aggregator
466+ * @return the role to assign to the new user on the aggregate
467+ */
468+ private String getRoleToPropagate (PartyRole parentRole ) {
469+ return PartyRole .OPERATOR .equals (parentRole ) ? PartyRole .OPERATOR .name () : PartyRole .ADMIN_EA .name ();
470+ }
471+
472+ /**
473+ * Determines the productRole to assign to the new user on the aggregate.
474+ * If the parentRole is OPERATOR, we simply propagate the same parentProductRole, otherwise
475+ * we retrieve the productRole associated to the ADMIN_EA role for the product using the ProductService.
476+ *
477+ * @param parentRole the role of the user on the aggregator
478+ * @param parentProductRole the productRole of the user on the aggregator
479+ * @param productId the productId of the user on the aggregator
480+ * @param targetInstitutionType the institutionType of the aggregate
481+ * @return the productRole to assign to the new user on the aggregate
482+ */
483+ private List <String > getProductRolesToPropagate (PartyRole parentRole , String parentProductRole , String productId , String targetInstitutionType ) {
484+ if (PartyRole .OPERATOR .equals (parentRole )) {
485+ return List .of (parentProductRole );
486+ } else {
487+ final it .pagopa .selfcare .product .entity .Product product = productService .getProduct (productId );
488+ final Map <PartyRole , ProductRoleInfo > roleMappings = product .getRoleMappings (targetInstitutionType );
489+ return Optional .ofNullable (roleMappings .get (PartyRole .ADMIN_EA ))
490+ .map (pri -> pri .getRoles ().stream ().findFirst ()
491+ .map (r -> List .of (r .getCode ()))
492+ .orElse (List .of ()))
493+ .orElse (List .of ());
494+ }
495+ }
496+
497+ private Uni <Response > addUserToAggregatorGroup (String institutionId , String parentInstitutionId , String productId , String userId ) {
498+ if (!addOnAggregatesGroupProducts .contains (productId )) {
499+ log .info ("Skipping adding user {} to group of institution {}, parentInstitutionId {} and product {}" , userId , institutionId , parentInstitutionId , productId );
500+ return Uni .createFrom ().item (Response .status (Response .Status .OK ).build ());
501+ }
502+
503+ log .info ("Adding user {} to group of institution {}, parentInstitutionId {} and product {}" , userId , institutionId , parentInstitutionId , productId );
504+ final AddMembersToUserGroupDto addMembersToUserGroupDto = AddMembersToUserGroupDto .builder ()
505+ .institutionId (institutionId )
506+ .parentInstitutionId (parentInstitutionId )
507+ .productId (productId )
508+ .members (Set .of (UUID .fromString (userId )))
509+ .build ();
510+ return userGroupApi .addMembersToUserGroupWithParentInstitutionIdUsingPUT (addMembersToUserGroupDto )
511+ .onFailure ().retry ().withBackOff (Duration .ofSeconds (retryMinBackOff ), Duration .ofSeconds (retryMaxBackOff )).atMost (maxRetry )
512+ .onFailure ().invoke (t -> log .error ("Error adding user {} to group of institution {} with parentInstitutionId {}: {}" , userId , institutionId , parentInstitutionId , t .getMessage ()))
513+ .onItem ().invoke (r -> log .info ("Added user {} on group of institution {} with parentInstitutionId {} - status {}" , userId , institutionId , parentInstitutionId , r .getStatus ()));
514+ }
515+
344516}
0 commit comments