Skip to content

Commit 8abfeec

Browse files
committed
Adopt Azure sdk for Objectstore
1 parent d064584 commit 8abfeec

File tree

13 files changed

+676
-89
lines changed

13 files changed

+676
-89
lines changed

multiapps-controller-persistence/pom.xml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,12 @@
9595
<artifactId>aws-s3</artifactId>
9696
</dependency>
9797
<dependency>
98-
<groupId>org.apache.jclouds.provider</groupId>
99-
<artifactId>azureblob</artifactId>
98+
<groupId>com.azure</groupId>
99+
<artifactId>azure-storage-blob</artifactId>
100+
</dependency>
101+
<dependency>
102+
<groupId>com.azure</groupId>
103+
<artifactId>azure-core-http-okhttp</artifactId>
100104
</dependency>
101105
<dependency>
102106
<groupId>org.apache.jclouds</groupId>

multiapps-controller-persistence/src/main/java/module-info.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
requires transitive org.cloudfoundry.multiapps.controller.api;
2929

3030
requires aliyun.sdk.oss;
31+
requires com.azure.core;
32+
requires com.azure.core.http.okhttp;
33+
requires com.azure.storage.blob;
3134
requires com.fasterxml.jackson.annotation;
3235
requires com.fasterxml.jackson.databind;
3336
requires com.google.auth;

multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/Messages.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public final class Messages {
5454

5555
// ERROR log messages:
5656
public static final String UPLOAD_STREAM_FAILED_TO_CLOSE = "Cannot close file upload stream";
57+
public static final String CANNOT_PARSE_CONTAINER_URI_OF_OBJECT_STORE = "Cannot parse container_uri of object store";
5758

5859
// WARN log messages:
5960
public static final String COULD_NOT_CLOSE_RESULT_SET = "Could not close result set.";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package org.cloudfoundry.multiapps.controller.persistence.services;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.net.MalformedURLException;
6+
import java.net.URL;
7+
import java.time.LocalDateTime;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Set;
12+
import java.util.function.Predicate;
13+
import java.util.stream.Collectors;
14+
15+
import com.azure.core.http.HttpClient;
16+
import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder;
17+
import com.azure.core.http.policy.ExponentialBackoffOptions;
18+
import com.azure.core.http.policy.RetryOptions;
19+
import com.azure.storage.blob.BlobClient;
20+
import com.azure.storage.blob.BlobContainerClient;
21+
import com.azure.storage.blob.BlobServiceClient;
22+
import com.azure.storage.blob.BlobServiceClientBuilder;
23+
import com.azure.storage.blob.models.BlobItem;
24+
import com.azure.storage.blob.models.BlobListDetails;
25+
import com.azure.storage.blob.models.BlobRange;
26+
import com.azure.storage.blob.models.BlobStorageException;
27+
import com.azure.storage.blob.models.ListBlobsOptions;
28+
import com.azure.storage.blob.options.BlobParallelUploadOptions;
29+
import org.cloudfoundry.multiapps.controller.persistence.Messages;
30+
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
31+
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreConstants;
32+
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreFilter;
33+
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreMapper;
34+
35+
public class AzureObjectStoreFileStorage implements FileStorage {
36+
37+
private static final String SAS_TOKEN = "sas_token";
38+
private static final String CONTAINER_NAME = "container_name";
39+
private static final String CONTAINER_URI = "container_uri";
40+
private final HttpClient httpClient;
41+
private final BlobContainerClient containerClient;
42+
43+
public AzureObjectStoreFileStorage(Map<String, Object> credentials) {
44+
this.containerClient = createContainerClient(credentials);
45+
this.httpClient = new OkHttpAsyncHttpClientBuilder().build();
46+
}
47+
48+
@Override
49+
public void addFile(FileEntry fileEntry, InputStream content) throws FileStorageException {
50+
BlobClient blobClient = containerClient.getBlobClient(fileEntry.getId());
51+
try {
52+
BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(content);
53+
blobParallelUploadOptions.setMetadata(ObjectStoreMapper.createFileEntryMetadata(fileEntry));
54+
55+
blobClient.uploadWithResponse(blobParallelUploadOptions, ObjectStoreConstants.OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES,
56+
null);
57+
} catch (BlobStorageException e) {
58+
throw new FileStorageException(e);
59+
}
60+
}
61+
62+
@Override
63+
public List<FileEntry> getFileEntriesWithoutContent(List<FileEntry> fileEntries) throws FileStorageException {
64+
Set<String> existingFiles = getAllEntriesNames();
65+
return fileEntries.stream()
66+
.filter(fileEntry -> !existingFiles.contains(fileEntry.getId()))
67+
.toList();
68+
}
69+
70+
@Override
71+
public void deleteFile(String id, String space) throws FileStorageException {
72+
BlobClient blobClient = containerClient.getBlobClient(id);
73+
try {
74+
blobClient.deleteIfExists();
75+
} catch (BlobStorageException e) {
76+
throw new FileStorageException(e);
77+
}
78+
}
79+
80+
@Override
81+
public void deleteFilesBySpaceIds(List<String> spaceIds) throws FileStorageException {
82+
removeBlobsByFilter(blob -> ObjectStoreFilter.filterBySpaceIds(blob.getMetadata(), spaceIds));
83+
}
84+
85+
@Override
86+
public void deleteFilesBySpaceAndNamespace(String space, String namespace) {
87+
removeBlobsByFilter(blob -> ObjectStoreFilter.filterBySpaceAndNamespace(blob.getMetadata(), space, namespace));
88+
}
89+
90+
@Override
91+
public int deleteFilesModifiedBefore(LocalDateTime modificationTime) throws FileStorageException {
92+
return removeBlobsByFilter(
93+
blob -> ObjectStoreFilter.filterByModificationTime(blob.getMetadata(), blob.getName(), modificationTime));
94+
}
95+
96+
@Override
97+
public <T> T processFileContent(String space, String id, FileContentProcessor<T> fileContentProcessor) throws FileStorageException {
98+
FileEntry fileEntry = ObjectStoreMapper.createFileEntry(space, id);
99+
try (InputStream inputStream = openBlobInputStream(fileEntry)) {
100+
return fileContentProcessor.process(inputStream);
101+
} catch (Exception e) {
102+
throw new FileStorageException(e);
103+
}
104+
}
105+
106+
private InputStream openBlobInputStream(FileEntry fileEntry) throws FileStorageException {
107+
BlobClient blobClient = containerClient.getBlobClient(fileEntry.getId());
108+
try {
109+
return blobClient.openInputStream();
110+
} catch (BlobStorageException e) {
111+
throw new FileStorageException(e);
112+
}
113+
}
114+
115+
@Override
116+
public InputStream openInputStream(String space, String id) throws FileStorageException {
117+
FileEntry fileEntry = ObjectStoreMapper.createFileEntry(space, id);
118+
return openBlobInputStream(fileEntry);
119+
}
120+
121+
@Override
122+
public void testConnection() {
123+
containerClient.getBlobClient("test");
124+
}
125+
126+
@Override
127+
public void deleteFilesByIds(List<String> fileIds) throws FileStorageException {
128+
removeBlobsByFilter(blob -> fileIds.contains(blob.getName()));
129+
}
130+
131+
@Override
132+
public <T> T processArchiveEntryContent(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor)
133+
throws FileStorageException {
134+
FileEntry fileEntry = ObjectStoreMapper.createFileEntry(fileContentToProcess.getSpaceGuid(), fileContentToProcess.getGuid());
135+
BlobClient blobClient = containerClient.getBlobClient(fileEntry.getId());
136+
long contentSize = fileContentToProcess.getEndOffset() - fileContentToProcess.getStartOffset();
137+
BlobRange blobRange = new BlobRange(fileContentToProcess.getStartOffset(), contentSize);
138+
139+
try {
140+
return fileContentProcessor.process(blobClient.openInputStream(blobRange, null));
141+
} catch (IOException e) {
142+
throw new FileStorageException(e);
143+
}
144+
}
145+
146+
protected BlobContainerClient createContainerClient(Map<String, Object> credentials) {
147+
BlobServiceClient serviceClient = new BlobServiceClientBuilder().endpoint(getContainerUriEndpoint(credentials))
148+
.retryOptions(createRetryOptions())
149+
.httpClient(httpClient)
150+
.sasToken((String) credentials.get(SAS_TOKEN))
151+
.buildClient();
152+
153+
return serviceClient.getBlobContainerClient((String) credentials.get(CONTAINER_NAME));
154+
}
155+
156+
public String getContainerUriEndpoint(Map<String, Object> credentials) {
157+
if (!credentials.containsKey(CONTAINER_URI)) {
158+
return null;
159+
}
160+
try {
161+
URL containerUri = new URL((String) credentials.get(CONTAINER_URI));
162+
return new URL(containerUri.getProtocol(), containerUri.getHost(), containerUri.getPort(), "").toString();
163+
} catch (MalformedURLException e) {
164+
throw new IllegalStateException(Messages.CANNOT_PARSE_CONTAINER_URI_OF_OBJECT_STORE, e);
165+
}
166+
}
167+
168+
private RetryOptions createRetryOptions() {
169+
ExponentialBackoffOptions exponentialBackoffOptions = new ExponentialBackoffOptions().setBaseDelay(
170+
ObjectStoreConstants.OBJECT_STORE_INITIAL_RETRY_DELAY_CONFIG_IN_MILLIS)
171+
.setMaxDelay(
172+
ObjectStoreConstants.OBJECT_STORE_MAX_RETRY_DELAY_CONFIG_IN_SECONDS)
173+
.setMaxRetries(
174+
ObjectStoreConstants.OBJECT_STORE_MAX_ATTEMPTS_CONFIG);
175+
176+
return new RetryOptions(exponentialBackoffOptions);
177+
}
178+
179+
private int removeBlobsByFilter(Predicate<? super BlobItem> filter) {
180+
Set<String> blobNames = getEntryNames(filter);
181+
List<Boolean> deletedBlobsResults = new ArrayList<>();
182+
183+
if (blobNames.isEmpty()) {
184+
return 0;
185+
}
186+
for (String blobName : blobNames) {
187+
BlobClient blobClient = containerClient.getBlobClient(blobName);
188+
deletedBlobsResults.add(blobClient.deleteIfExists());
189+
}
190+
191+
deletedBlobsResults.removeIf(Boolean.FALSE::equals);
192+
193+
return deletedBlobsResults.size();
194+
}
195+
196+
protected Set<String> getEntryNames(Predicate<? super BlobItem> filter) {
197+
ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
198+
BlobListDetails blobListDetails = new BlobListDetails();
199+
blobListDetails.setRetrieveMetadata(true);
200+
listBlobsOptions.setDetails(blobListDetails);
201+
202+
return containerClient.listBlobs(listBlobsOptions, ObjectStoreConstants.OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES)
203+
.stream()
204+
.filter(filter)
205+
.map(BlobItem::getName)
206+
.collect(Collectors.toSet());
207+
}
208+
209+
public Set<String> getAllEntriesNames() {
210+
return containerClient.listBlobs()
211+
.stream()
212+
.map(BlobItem::getName)
213+
.collect(Collectors.toSet());
214+
}
215+
}

multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/GcpObjectStoreFileStorage.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.io.InputStream;
66
import java.nio.channels.Channels;
77
import java.text.MessageFormat;
8-
import java.time.Duration;
98
import java.time.LocalDateTime;
109
import java.util.ArrayList;
1110
import java.util.Base64;
@@ -28,6 +27,7 @@
2827
import com.google.cloud.storage.StorageRetryStrategy;
2928
import org.cloudfoundry.multiapps.controller.persistence.Messages;
3029
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
30+
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreConstants;
3131
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreFilter;
3232
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreMapper;
3333
import org.springframework.http.MediaType;
@@ -36,13 +36,8 @@ public class GcpObjectStoreFileStorage implements FileStorage {
3636

3737
private final String bucketName;
3838
private final Storage storage;
39-
private static final String BUCKET = "bucket";
40-
private static final int OBJECT_STORE_MAX_ATTEMPTS_CONFIG = 6;
41-
private static final double OBJECT_STORE_RETRY_DELAY_MULTIPLIER_CONFIG = 2.0;
42-
private static final Duration OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES = Duration.ofMinutes(10);
43-
private static final Duration OBJECT_STORE_MAX_RETRY_DELAY_CONFIG_IN_SECONDS = Duration.ofSeconds(10);
44-
private static final Duration OBJECT_STORE_INITIAL_RETRY_DELAY_CONFIG_IN_MILLIS = Duration.ofMillis(250);
4539
private static final String BASE_64_ENCODED_PRIVATE_KEY_DATA = "base64EncodedPrivateKeyData";
40+
private static final String BUCKET = "bucket";
4641

4742
public GcpObjectStoreFileStorage(Map<String, Object> credentials) {
4843
this.bucketName = (String) credentials.get(BUCKET);
@@ -55,11 +50,12 @@ protected Storage createObjectStoreStorage(Map<String, Object> credentials) {
5550
.setStorageRetryStrategy(StorageRetryStrategy.getUniformStorageRetryStrategy())
5651
.setRetrySettings(
5752
RetrySettings.newBuilder()
58-
.setMaxAttempts(OBJECT_STORE_MAX_ATTEMPTS_CONFIG)
59-
.setTotalTimeoutDuration(OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES)
60-
.setMaxRetryDelayDuration(OBJECT_STORE_MAX_RETRY_DELAY_CONFIG_IN_SECONDS)
61-
.setInitialRetryDelayDuration(OBJECT_STORE_INITIAL_RETRY_DELAY_CONFIG_IN_MILLIS)
62-
.setRetryDelayMultiplier(OBJECT_STORE_RETRY_DELAY_MULTIPLIER_CONFIG)
53+
.setMaxAttempts(ObjectStoreConstants.OBJECT_STORE_MAX_ATTEMPTS_CONFIG)
54+
.setTotalTimeoutDuration(ObjectStoreConstants.OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES)
55+
.setMaxRetryDelayDuration(ObjectStoreConstants.OBJECT_STORE_MAX_RETRY_DELAY_CONFIG_IN_SECONDS)
56+
.setInitialRetryDelayDuration(
57+
ObjectStoreConstants.OBJECT_STORE_INITIAL_RETRY_DELAY_CONFIG_IN_MILLIS)
58+
.setRetryDelayMultiplier(ObjectStoreConstants.OBJECT_STORE_RETRY_DELAY_MULTIPLIER_CONFIG)
6359
.build())
6460
.build()
6561
.getService();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.cloudfoundry.multiapps.controller.persistence.util;
2+
3+
import java.time.Duration;
4+
5+
public class ObjectStoreConstants {
6+
7+
private ObjectStoreConstants() {
8+
}
9+
10+
public static final int OBJECT_STORE_MAX_ATTEMPTS_CONFIG = 6;
11+
public static final double OBJECT_STORE_RETRY_DELAY_MULTIPLIER_CONFIG = 2.0;
12+
public static final Duration OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES = Duration.ofMinutes(10);
13+
public static final Duration OBJECT_STORE_MAX_RETRY_DELAY_CONFIG_IN_SECONDS = Duration.ofSeconds(10);
14+
public static final Duration OBJECT_STORE_INITIAL_RETRY_DELAY_CONFIG_IN_MILLIS = Duration.ofMillis(250);
15+
}

0 commit comments

Comments
 (0)