Skip to content
Merged
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@
<artifactId>janino</artifactId>
</dependency>

<dependency>
<groupId>com.azure.resourcemanager</groupId>
<artifactId>azure-resourcemanager-datafactory</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>

<!--TEST-->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import it.gov.pagopa.idpay.transactions.dto.ReportDTO;
import it.gov.pagopa.idpay.transactions.dto.ReportListDTO;
import it.gov.pagopa.idpay.transactions.dto.ReportRequest;
import it.gov.pagopa.idpay.transactions.dto.report.Report2RunDto;
import it.gov.pagopa.idpay.transactions.dto.report.ReportGenerateForce;
import jakarta.validation.Valid;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PageableDefault;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

import java.util.List;

@RequestMapping("/idpay/merchant/portal")
public interface ReportController {

Expand All @@ -27,6 +31,9 @@ Mono<ReportDTO> generateReport(@RequestHeader("x-merchant-id") String merchantId
@PathVariable("initiativeId") String initiativeId,
@RequestBody @Valid ReportRequest request);

@PostMapping("/reports/transaction/force")
Mono<List<Report2RunDto>> forceGenerateReports(@RequestBody ReportGenerateForce reportGenerateForce);


@PatchMapping("/initiatives/{initiativeId}/reports/{reportId}")
Mono<ReportDTO> patchReport(@PathVariable("initiativeId") String initiativeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import it.gov.pagopa.idpay.transactions.dto.ReportDTO;
import it.gov.pagopa.idpay.transactions.dto.ReportListDTO;
import it.gov.pagopa.idpay.transactions.dto.ReportRequest;
import it.gov.pagopa.idpay.transactions.dto.report.Report2RunDto;
import it.gov.pagopa.idpay.transactions.dto.report.ReportGenerateForce;
import it.gov.pagopa.idpay.transactions.service.ReportService;
import it.gov.pagopa.idpay.transactions.dto.mapper.ReportMapper;
import it.gov.pagopa.idpay.transactions.utils.Utilities;
Expand All @@ -12,6 +14,8 @@
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.List;

@RestController
@Slf4j
public class ReportControllerImpl implements ReportController {
Expand Down Expand Up @@ -55,4 +59,9 @@ public Mono<ReportDTO> patchReport(String initiativeId,
) {
return reportService.patchReport(initiativeId, reportId, request);
}

@Override
public Mono<List<Report2RunDto>> forceGenerateReports(ReportGenerateForce reportGenerateForce) {
return reportService.forceGenerateReports(reportGenerateForce);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package it.gov.pagopa.idpay.transactions.data.factory;

import com.azure.core.management.AzureEnvironment;
import com.azure.core.management.profile.AzureProfile;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.azure.resourcemanager.datafactory.DataFactoryManager;


@Configuration
public class DataFactoryManagerConfig {
private final String tenantId;
private final String subscriptionId;

public DataFactoryManagerConfig(@Value("${app.data-factory.tenant-id}") String tenantId,
@Value("${app.data-factory.subscription-id}") String subscriptionId) {
this.tenantId = tenantId;
this.subscriptionId = subscriptionId;
}

@Bean
public DataFactoryManager dataFactoryManager () {
DefaultAzureCredential azureCredentials = new DefaultAzureCredentialBuilder().build();
AzureProfile azureProfile = new AzureProfile(tenantId, subscriptionId, AzureEnvironment.AZURE);
return DataFactoryManager.authenticate(azureCredentials, azureProfile);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.gov.pagopa.idpay.transactions.data.factory;

import it.gov.pagopa.idpay.transactions.model.Report;
import reactor.core.publisher.Mono;

public interface DataFactoryService {
Mono<String> triggerTransactionReportPipeline(Report report);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package it.gov.pagopa.idpay.transactions.data.factory;

import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.resourcemanager.datafactory.models.CreateRunResponse;
import it.gov.pagopa.idpay.transactions.exception.AzureConnectingErrorException;
import it.gov.pagopa.idpay.transactions.model.Report;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.azure.resourcemanager.datafactory.DataFactoryManager;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
public class DataFactoryServiceImpl implements DataFactoryService{
private final DataFactoryManager dataFactoryManager;
private final String resourceGroup;
private final String factoryName;
private final String pipelineName;
private final int maxRetries;

public DataFactoryServiceImpl(DataFactoryManager dataFactoryManager,
@Value("${app.data-factory.resource-group}") String resourceGroup,
@Value("${app.data-factory.factory-name}") String factoryName,
@Value("${app.data-factory.pipeline-name}") String pipelineName,
@Value("${app.data-factory.max-retries}") int maxRetries) {
this.dataFactoryManager = dataFactoryManager;
this.resourceGroup = resourceGroup;
this.factoryName = factoryName;
this.pipelineName = pipelineName;
this.maxRetries = maxRetries;
}

@Override
public Mono<String> triggerTransactionReportPipeline(Report report) {
Mono<String> callMono = Mono.fromCallable(() -> {
log.info("[CALLING_DATA_FACTORY] Starting pipeline execution for Report {}", report.getId());
Response<CreateRunResponse> resp = dataFactoryManager.pipelines().createRunWithResponse(
resourceGroup,
factoryName,
pipelineName,
null,
false,
null,
false,
createPipelineParameters(report),
Context.NONE);

int status = resp.getStatusCode();
if (status < 200 || status >= 300) {
throw new IllegalStateException("ADF createRun failed. HTTP status: " + status);
}

CreateRunResponse body = resp.getValue();
if (body == null) {
throw new IllegalStateException("ADF createRun returned empty body");
}
log.info("[CALLING_DATA_FACTORY] Report {} generation request sent successfully. Run ID: {}", report.getId(), body.runId());
return body.runId();
})
.subscribeOn(Schedulers.boundedElastic());

return callMono
.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, signal) ->
new AzureConnectingErrorException(
"Failed to trigger ADF pipeline after " + (maxRetries + 1) + " attempts",
signal.failure()
)
)
);
}

private Map<String, Object> createPipelineParameters(Report report) {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("reportId", report.getId());
parameters.put("merchantId", report.getMerchantId());
parameters.put("initiativeId", report.getInitiativeId());
parameters.put("startDate", report.getStartPeriod());
parameters.put("endDate", report.getEndPeriod());
parameters.put("reportName", report.getFileName());

return parameters;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package it.gov.pagopa.idpay.transactions.dto.report;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Report2RunDto {
String reportId;
String runId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package it.gov.pagopa.idpay.transactions.dto.report;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ReportGenerateForce {
List<String> reportsId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.gov.pagopa.idpay.transactions.exception;

public class AzureConnectingErrorException extends RuntimeException{

public AzureConnectingErrorException(String message, Throwable failure) {
super(message, failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import it.gov.pagopa.idpay.transactions.dto.PatchReportRequest;
import it.gov.pagopa.idpay.transactions.dto.ReportDTO;
import it.gov.pagopa.idpay.transactions.dto.ReportRequest;
import it.gov.pagopa.idpay.transactions.dto.report.Report2RunDto;
import it.gov.pagopa.idpay.transactions.dto.report.ReportGenerateForce;
import it.gov.pagopa.idpay.transactions.model.Report;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import reactor.core.publisher.Mono;

import java.util.List;

public interface ReportService {
Mono<Page<Report>> getTransactionsReports(String merchantId, String organizationRole, String initiativeId, Pageable pageable);

Expand All @@ -19,4 +23,6 @@ Mono<ReportDTO> generateReport(String merchantId,
Mono<ReportDTO> patchReport(String initiativeId,
String reportId,
PatchReportRequest request);

Mono<List<Report2RunDto>> forceGenerateReports(ReportGenerateForce reportGenerateForce);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import it.gov.pagopa.common.web.exception.ClientExceptionWithBody;
import it.gov.pagopa.idpay.transactions.dto.PatchReportRequest;
import it.gov.pagopa.idpay.transactions.data.factory.DataFactoryService;
import it.gov.pagopa.idpay.transactions.dto.ReportDTO;
import it.gov.pagopa.idpay.transactions.dto.ReportRequest;
import it.gov.pagopa.idpay.transactions.dto.mapper.ReportMapper;
import it.gov.pagopa.idpay.transactions.dto.report.Report2RunDto;
import it.gov.pagopa.idpay.transactions.dto.report.ReportGenerateForce;
import it.gov.pagopa.idpay.transactions.enums.ReportStatus;
import it.gov.pagopa.idpay.transactions.enums.ReportType;
import it.gov.pagopa.idpay.transactions.enums.RewardBatchAssignee;
import it.gov.pagopa.idpay.transactions.exception.AzureConnectingErrorException;
import it.gov.pagopa.idpay.transactions.model.Report;
import it.gov.pagopa.idpay.transactions.repository.ReportRepository;
import it.gov.pagopa.idpay.transactions.utils.Utilities;
Expand All @@ -23,7 +27,6 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;

import static it.gov.pagopa.idpay.transactions.utils.ExceptionConstants.ExceptionCode.*;
import static it.gov.pagopa.idpay.transactions.utils.ExceptionConstants.ExceptionMessage.*;
Expand All @@ -39,10 +42,13 @@ public class ReportServiceImpl implements ReportService {

private final ReportMapper reportMapper;

public ReportServiceImpl(ReportRepository reportRepository, MerchantRestClient merchantRestClient, ReportMapper reportMapper) {
private final DataFactoryService dataFactoryService;

public ReportServiceImpl(ReportRepository reportRepository, MerchantRestClient merchantRestClient, ReportMapper reportMapper, DataFactoryService dataFactoryService) {
this.reportRepository = reportRepository;
this.merchantRestClient = merchantRestClient;
this.reportMapper = reportMapper;
this.dataFactoryService = dataFactoryService;
}

private static final List<String> ALLOWED_ROLES = List.of(
Expand Down Expand Up @@ -157,6 +163,14 @@ public Mono<ReportDTO> generateMerchantTransactionsReport(String merchantId,

return reportRepository.save(reportEntity);
})
.flatMap(report ->
triggerTransactionReportPipeline(report)
.thenReturn(report)
.onErrorResume(AzureConnectingErrorException.class, ex -> {
report.setReportStatus(ReportStatus.FAILED);
return reportRepository.save(report);
})
)
.map(reportMapper::toDTO)
.doOnSuccess(saved -> log.info("[GENERATE_REPORT] Saved report {} for merchant {}",
saved.getFileName(), Utilities.sanitizeString(merchantId)));
Expand Down Expand Up @@ -186,11 +200,29 @@ public Mono<ReportDTO> patchReport(String initiativeId,
if (request.getReportStatus() != null) {
report.setReportStatus(request.getReportStatus());
}
if(ReportStatus.GENERATED.equals(request.getReportStatus())){
report.setElaborationDate(LocalDateTime.now());
}

return reportRepository.save(report);
})
.map(reportMapper::toDTO);
}


@Override
public Mono<List<Report2RunDto>> forceGenerateReports(ReportGenerateForce reportGenerateForce) {
log.info("[RUN_GENERATE_REPORT] Request generate report {}", Utilities.sanitizeString(String.valueOf(reportGenerateForce.getReportsId())));
return reportRepository.findAllById(reportGenerateForce.getReportsId())
.flatMap(this::triggerTransactionReportPipeline)
.collectList();
}

private Mono<Report2RunDto> triggerTransactionReportPipeline(Report report) {
return dataFactoryService.triggerTransactionReportPipeline(report)
.map(runId ->
Report2RunDto.builder()
.reportId(report.getId())
.runId(runId).build());
}
}
9 changes: 8 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,11 @@ app:
delete:
paginationSize: ${DELETE_PAGINATION_SIZE:45}
delayTime: ${DELETE_DELAY_TIME:1000}
sampling: ${SAMPLING_HASH_SEED:15121984}
sampling: ${SAMPLING_HASH_SEED:15121984}
data-factory:
resource-group: ${DATA_FACTORY_RESOURCE_GROUP:}
factory-name: ${DATA_FACTORY_NAME:}
pipeline-name: ${DATA_FACTORY_PIPELINE_NAME:idpay_transaction_report}
max-retries: ${DATA_FACTORY_RUN_PIPELINE_MAX_RETRIES:3}
tenant-id: ${DATA_FACTORY_TENANT_ID:}
subscription-id: ${DATA_FACTORY_SUBSCRIPTION_ID:}
Loading
Loading