Skip to content

Commit

Permalink
Merge pull request #1009 from digitalservicebund/delete-after-publish…
Browse files Browse the repository at this point in the history
…-finished

delete-after-publish-finished
  • Loading branch information
malte-laukoetter authored Feb 18, 2025
2 parents 55322fb + 791f2e9 commit f8a1ee9
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import de.bund.digitalservice.ris.norms.utils.XmlMapper;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -51,8 +52,8 @@ public class BucketService
PublishPrivateNormPort,
DeletePublicNormPort,
DeletePrivateNormPort,
DeleteAllPublicNormsPort,
DeleteAllPrivateNormsPort,
DeleteAllPublicDokumentePort,
DeleteAllPrivateDokumentePort,
PublishChangelogsPort {

@Value("${otc.obs.private.bucket-name}")
Expand Down Expand Up @@ -103,13 +104,23 @@ public void deletePublicNorm(DeletePublicNormPort.Command command) throws Bucket
}

@Override
public void deleteAllPublicNorms() {
deleteAllExceptChangelog(publicS3Client, publicBucketName);
public void deleteAllPublicDokumente(DeleteAllPublicDokumentePort.Command command) {
deleteAllDokumenteLastModifiedBefore(
publicS3Client,
publicBucketName,
publicChangelog,
command.lastChangeBefore()
);
}

@Override
public void deleteAllPrivateNorms() {
deleteAllExceptChangelog(privateS3Client, privateBucketName);
public void deleteAllPrivateDokumente(DeleteAllPrivateDokumentePort.Command command) {
deleteAllDokumenteLastModifiedBefore(
privateS3Client,
privateBucketName,
privateChangelog,
command.lastChangeBefore()
);
}

@Override
Expand Down Expand Up @@ -235,7 +246,8 @@ private void deleteFromBucket(
}

/**
* Deletes all objects in the specified S3 bucket, except for the changelog files, which are contained within the "changelogs" folder
* Deletes all Dokumente (but not the changelog files) in the specified S3 bucket that were last modified before the
* given instant.
* The deletion process handles pagination automatically if there are more than 1,000 objects in the bucket.
* <p>
* AWS S3 allows a maximum of 1,000 keys to be processed per delete request. This method retrieves and deletes objects
Expand All @@ -244,10 +256,20 @@ private void deleteFromBucket(
*
* @param s3Client the S3 client used to interact with the S3 service
* @param bucketName the name of the S3 bucket where the objects are located
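* @param changelog the changelog in which the key of every deleted object is recorded as deleted
* @param lastChangeBefore only Dokumente last modified before this instant are deleted; Dokumente changed since then are ignored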
*/
private void deleteAllExceptChangelog(final S3Client s3Client, final String bucketName) {
private void deleteAllDokumenteLastModifiedBefore(
final S3Client s3Client,
final String bucketName,
Changelog changelog,
Instant lastChangeBefore
) {
try {
ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(bucketName).build();
ListObjectsV2Request listRequest = ListObjectsV2Request
.builder()
.bucket(bucketName)
.prefix("eli")
.build();
ListObjectsV2Response listResponse;
int objectsDeleted = 0;
do {
Expand All @@ -256,7 +278,10 @@ private void deleteAllExceptChangelog(final S3Client s3Client, final String buck

for (S3Object s3Object : listResponse.contents()) {
final String key = s3Object.key();
if (!key.startsWith(Changelog.FOLDER + "/")) {
if (
!key.startsWith(Changelog.FOLDER + "/") &&
s3Object.lastModified().isBefore(lastChangeBefore)
) {
objectsToDelete.add(ObjectIdentifier.builder().key(key).build());
}
}
Expand All @@ -268,6 +293,9 @@ private void deleteAllExceptChangelog(final S3Client s3Client, final String buck
.build();
s3Client.deleteObjects(deleteRequest);
objectsDeleted += objectsToDelete.size();
objectsToDelete.forEach(objectIdentifier ->
changelog.addContent(Changelog.DELETED, objectIdentifier.key())
);
}

listRequest =
Expand All @@ -278,7 +306,7 @@ private void deleteAllExceptChangelog(final S3Client s3Client, final String buck
throw new BucketException(
BucketException.Operation.DELETE,
bucketName,
"All norms could not be deleted",
"All dokumente could not be deleted",
e
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.Instant;
import java.util.*;
import lombok.Getter;

Expand All @@ -15,7 +15,7 @@
public class Changelog {

public static final String FOLDER = "changelogs";
public static final String FILE_NAME_FORMAT = "changelog-%s.json";
public static final String FILE_NAME_FORMAT = "%s-norms.json";

public static final String CHANGED = "changed";
public static final String DELETED = "deleted";
Expand All @@ -27,7 +27,8 @@ public class Changelog {
private final String fileName;

public Changelog() {
this.fileName = Paths.get(FOLDER, FILE_NAME_FORMAT.formatted(LocalDate.now())).toString();
this.fileName =
Paths.get(FOLDER, FILE_NAME_FORMAT.formatted(Instant.now().toString())).toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import jakarta.annotation.PostConstruct;
import java.io.*;
import java.nio.file.*;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
Expand Down Expand Up @@ -73,7 +74,11 @@ public ListObjectsV2Response listObjectsV2(ListObjectsV2Request listObjectsV2Req
// If the file is not under 'eli', return only the file name
key = path.getFileName().toString();
}
return S3Object.builder().key(key).build();
return S3Object
.builder()
.key(key)
.lastModified(Instant.ofEpochMilli(path.toFile().lastModified()))
.build();
})
.toList();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package de.bund.digitalservice.ris.norms.application.port.output;

import de.bund.digitalservice.ris.norms.domain.entity.Dokument;
import java.time.Instant;

/**
 * Output port for deleting all {@link Dokument} entities that have not been edited since a given instant from the
 * storage location designated for private data.
 */
public interface DeleteAllPrivateDokumentePort {
  /**
   * Command describing which Dokumente are to be deleted.
   *
   * @param lastChangeBefore cut-off instant; Dokumente last edited after it are not deleted
   */
  record Command(Instant lastChangeBefore) {}

  /**
   * Deletes every {@link Dokument} in the designated private storage location that has not been edited since the
   * instant carried by the command.
   *
   * @param command the command carrying the cut-off instant
   */
  void deleteAllPrivateDokumente(Command command);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package de.bund.digitalservice.ris.norms.application.port.output;

import de.bund.digitalservice.ris.norms.domain.entity.Dokument;
import java.time.Instant;

/**
 * Output port for deleting all {@link Dokument} entities that have not been edited since a given instant from the
 * storage location designated for public data.
 */
public interface DeleteAllPublicDokumentePort {
  /**
   * Command describing which Dokumente are to be deleted.
   *
   * @param lastChangeBefore cut-off instant; Dokumente last edited after it are not deleted
   */
  record Command(Instant lastChangeBefore) {}

  /**
   * Deletes every {@link Dokument} in the designated public storage location that has not been edited since the
   * instant carried by the command.
   *
   * @param command the command carrying the cut-off instant
   */
  void deleteAllPublicDokumente(Command command);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import de.bund.digitalservice.ris.norms.domain.entity.eli.NormManifestationEli;
import de.bund.digitalservice.ris.norms.utils.NodeParser;
import de.bund.digitalservice.ris.norms.utils.exceptions.StorageException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
Expand All @@ -33,8 +34,8 @@ public class PublishService implements PublishNormUseCase {
private final DeletePublicNormPort deletePublicNormPort;
private final DeletePrivateNormPort deletePrivateNormPort;
private final LoadLastMigrationLogPort loadLastMigrationLogPort;
private final DeleteAllPublicNormsPort deleteAllPublicNormsPort;
private final DeleteAllPrivateNormsPort deleteAllPrivateNormsPort;
private final DeleteAllPublicDokumentePort deleteAllPublicDokumentePort;
private final DeleteAllPrivateDokumentePort deleteAllPrivateDokumentePort;
private final PublishChangelogsPort publishChangelogsPort;

public PublishService(
Expand All @@ -46,8 +47,8 @@ public PublishService(
DeletePublicNormPort deletePublicNormPort,
DeletePrivateNormPort deletePrivateNormPort,
LoadLastMigrationLogPort loadLastMigrationLogPort,
DeleteAllPublicNormsPort deleteAllPublicNormsPort,
DeleteAllPrivateNormsPort deleteAllPrivateNormsPort,
DeleteAllPublicDokumentePort deleteAllPublicDokumentePort,
DeleteAllPrivateDokumentePort deleteAllPrivateDokumentePort,
PublishChangelogsPort publishChangelogsPort
) {
this.loadNormManifestationElisByPublishStatePort = loadNormManifestationElisByPublishStatePort;
Expand All @@ -58,55 +59,64 @@ public PublishService(
this.deletePublicNormPort = deletePublicNormPort;
this.deletePrivateNormPort = deletePrivateNormPort;
this.loadLastMigrationLogPort = loadLastMigrationLogPort;
this.deleteAllPublicNormsPort = deleteAllPublicNormsPort;
this.deleteAllPrivateNormsPort = deleteAllPrivateNormsPort;
this.deleteAllPublicDokumentePort = deleteAllPublicDokumentePort;
this.deleteAllPrivateDokumentePort = deleteAllPrivateDokumentePort;
this.publishChangelogsPort = publishChangelogsPort;
}

@Override
public void processQueuedFilesForPublish() {
final Instant startOfProcessing = Instant.now();
final LocalDate today = startOfProcessing.atZone(ZoneId.systemDefault()).toLocalDate();

List<NormManifestationEli> manifestationElis =
loadNormManifestationElisByPublishStatePort.loadNormManifestationElisByPublishState(
new LoadNormManifestationElisByPublishStatePort.Command(NormPublishState.QUEUED_FOR_PUBLISH)
);

log.info("Found {} norms that are queued for publishing", manifestationElis.size());

manifestationElis.forEach(manifestationEli -> {
log.info("Processing norm with manifestation eli {}", manifestationEli);
Optional<Norm> norm = loadNormPort.loadNorm(new LoadNormPort.Command(manifestationEli));
norm.ifPresent(this::processNorm);
if (norm.isEmpty()) {
log.error("Norm with manifestation eli {} not found", manifestationEli);
}
});

loadLastMigrationLogPort
.loadLastMigrationLog()
.ifPresent(migrationLog -> {
final LocalDate createdAtDate = migrationLog
.getCreatedAt()
.atZone(ZoneId.systemDefault())
.toLocalDate();
final LocalDate today = LocalDate.now();
final LocalDate yesterday = today.minusDays(1);
if (createdAtDate.equals(today) || createdAtDate.equals(yesterday)) {
log.info(
"Migration log found with timestamp {} (UTC). Deleting all norms in both buckets",
"Migration log found with timestamp {} (UTC) and {} dokumente.",
migrationLog
.getCreatedAt()
.atOffset(ZoneOffset.UTC)
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
migrationLog.getSize()
);
if (migrationLog.getSize() <= 0) {
throw new MigrationJobException();
}
deleteAllPublicNormsPort.deleteAllPublicNorms();
deleteAllPrivateNormsPort.deleteAllPrivateNorms();
log.info("Deleted all norms in both buckets");
log.info("Deleting all old dokumente in both buckets");
deleteAllPublicDokumentePort.deleteAllPublicDokumente(
new DeleteAllPublicDokumentePort.Command(startOfProcessing)
);
deleteAllPrivateDokumentePort.deleteAllPrivateDokumente(
new DeleteAllPrivateDokumentePort.Command(startOfProcessing)
);
log.info("Deleted all dokumente in both buckets");
}
});

List<NormManifestationEli> manifestationElis =
loadNormManifestationElisByPublishStatePort.loadNormManifestationElisByPublishState(
new LoadNormManifestationElisByPublishStatePort.Command(NormPublishState.QUEUED_FOR_PUBLISH)
);

log.info("Found {} norms that are queued for publishing", manifestationElis.size());

manifestationElis.forEach(manifestationEli -> {
log.info("Processing norm with manifestation eli {}", manifestationEli);
Optional<Norm> norm = loadNormPort.loadNorm(new LoadNormPort.Command(manifestationEli));
norm.ifPresent(this::processNorm);
if (norm.isEmpty()) {
log.error("Norm with manifestation eli {} not found", manifestationEli);
}
});
publishChangelogsPort.publishChangelogs(new PublishChangelogsPort.Command(true));
publishChangelogsPort.publishChangelogs(new PublishChangelogsPort.Command(false));
log.info("Publish job successfully completed.");
}

Expand Down Expand Up @@ -167,7 +177,7 @@ private void rollbackPrivatePublish(Norm norm) {
try {
deletePrivateNormPort.deletePrivateNorm(new DeletePrivateNormPort.Command(norm));
log.info(
"Deleted privated norm on rollback strategy: {}",
"Deleted private norm on rollback strategy: {}",
norm.getManifestationEli().toString()
);
} catch (StorageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.time.LocalDate;
import org.junit.jupiter.api.Test;

class ChangelogTest {
Expand All @@ -14,7 +13,7 @@ void createsChangelogWithGivenDate() {
final Changelog changelog = new Changelog();
assertThat(changelog.getFileName()).isNotEmpty();
assertThat(changelog.getFileName())
.isEqualTo("changelogs/changelog-%s.json".formatted(LocalDate.now().toString()));
.matches("changelogs/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d+Z-norms\\.json");
}

@Test
Expand Down
Loading

0 comments on commit f8a1ee9

Please sign in to comment.