-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathBlob.java
More file actions
299 lines (269 loc) · 13.5 KB
/
Blob.java
File metadata and controls
299 lines (269 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
/*
* Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.
*/
package co.novu.utils;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import co.novu.utils.reactive.ReactiveUtils;
/**
* A utility class for creating data blobs from various input sources that implements {@code HttpRequest.BodyPublisher}.
* <p>
* This class provides convenient factory methods to create blobs from:
* <ul>
* <li>File paths ({@link #from(Path)})</li>
* <li>InputStreams ({@link #from(InputStream)})</li>
* <li>Strings ({@link #from(String)})</li>
* <li>Byte arrays ({@link #from(byte[])})</li>
* <li>ByteBuffers ({@link #from(ByteBuffer)})</li>
* <li>Lists of ByteBuffers ({@link #from(List)})</li>
* <li>Reactive publishers ({@link #from(Flow.Publisher)})</li>
* </ul>
* <p>
* Each blob can be used directly as a {@code HttpRequest.BodyPublisher} since this class implements that interface.
* <p>
* Additionally, this class provides consumption methods for reactive data processing:
* <ul>
* <li>Get the stream as a {@code Flow.Publisher<ByteBuffer>} ({@link #asPublisher()})</li>
* <li>Collect the entire stream into a byte array ({@link #toByteArray()})</li>
* <li>Write the stream directly to a file ({@link #toFile(Path)})</li>
* </ul>
* <p>
* <b>Single-use consumption:</b> When using consumption methods ({@code asPublisher()}, {@code toByteArray()},
* {@code toFile()}), each {@code Blob} instance can only be consumed once. After any consumption method
* is called, the instance is considered consumed and cannot be reused. Any further attempt to use a consumption method
* will result in an {@code IllegalStateException}.
* <p>
* <b>Retry compatibility:</b> Most blob types support HTTP request retries effectively. However, InputStream-backed
* blobs ({@link #from(InputStream)}) do not support retries as the stream gets consumed during the first attempt.
* For retry-compatible scenarios, prefer file-based ({@link #from(Path)}) or in-memory ({@link #from(byte[])}) alternatives.
*/
public class Blob implements HttpRequest.BodyPublisher {
private final Flow.Publisher<ByteBuffer> publisher;
private final long contentLength;
private final AtomicBoolean consumed = new AtomicBoolean(false); // Flag for single-use consumption
/**
* Private constructor that takes a publisher and content length.
*
* @param publisher the underlying publisher
* @param contentLength the content length, or -1 if unknown
*/
private Blob(Flow.Publisher<ByteBuffer> publisher, long contentLength) {
this.publisher = Objects.requireNonNull(publisher, "Publisher cannot be null");
this.contentLength = contentLength;
}
/**
* Creates a {@code Blob} from a file path.
* <p>
* This method uses the Java HTTP client's {@code HttpRequest.BodyPublishers.ofFile()} to create
* a reactive publisher from the file content.
*
* @param path the path to the file to read
* @return a new {@code Blob} instance
* @throws FileNotFoundException if the file does not exist or cannot be read
* @throws NullPointerException if {@code path} is {@code null}
*/
public static Blob from(Path path) throws FileNotFoundException {
Objects.requireNonNull(path, "Path cannot be null");
HttpRequest.BodyPublisher bodyPublisher = HttpRequest.BodyPublishers.ofFile(path);
return new Blob(bodyPublisher, bodyPublisher.contentLength());
}
/**
* Creates a {@code Blob} from an {@code InputStream}.
* <p>
* This method uses {@code HttpRequest.BodyPublishers.ofInputStream()} to create a reactive
* publisher that reads from the InputStream lazily, avoiding blocking I/O operations.
* <p>
* <b>Important:</b> InputStream-backed blobs do not support retries effectively. If the HTTP request
* fails and is retried, the InputStream will have already been consumed during the first attempt,
* causing subsequent retry attempts to send empty request bodies. For retry-compatible scenarios,
* consider using {@link #from(Path)} for file-based data or {@link #from(byte[])} for in-memory data.
*
* @param inputStream the InputStream to read from
* @return a new {@code Blob} instance
* @throws NullPointerException if {@code inputStream} is {@code null}
*/
public static Blob from(InputStream inputStream) {
Objects.requireNonNull(inputStream, "InputStream cannot be null");
HttpRequest.BodyPublisher bodyPublisher = HttpRequest.BodyPublishers.ofInputStream(() -> inputStream);
return new Blob(bodyPublisher, -1); // Unknown length for InputStream
}
/**
* Creates a {@code Blob} from a String using UTF-8 encoding.
*
* @param string the string to convert to a Blob
* @return a new {@code Blob} instance
* @throws NullPointerException if {@code string} is {@code null}
*/
public static Blob from(String string) {
Objects.requireNonNull(string, "String cannot be null");
HttpRequest.BodyPublisher bodyPublisher = HttpRequest.BodyPublishers.ofString(string, StandardCharsets.UTF_8);
return new Blob(bodyPublisher, bodyPublisher.contentLength());
}
/**
* Creates a {@code Blob} from a byte array.
* <p>
* This method uses HttpRequest.BodyPublishers.ofByteArray().
*
* @param data the byte array to wrap as a Blob
* @return a new {@code Blob} instance
* @throws NullPointerException if {@code data} is {@code null}
*/
public static Blob from(byte[] data) {
Objects.requireNonNull(data, "Data cannot be null");
HttpRequest.BodyPublisher bodyPublisher = HttpRequest.BodyPublishers.ofByteArray(data);
return new Blob(bodyPublisher, data.length); // Known length for byte array
}
/**
* Creates a {@code Blob} from a single {@code ByteBuffer}.
*
* @param buffer the ByteBuffer to wrap as a Blob
* @return a new {@code Blob} instance
* @throws NullPointerException if {@code buffer} is {@code null}
*/
public static Blob from(ByteBuffer buffer) {
Objects.requireNonNull(buffer, "ByteBuffer cannot be null");
SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>();
publisher.submit(buffer.duplicate()); // Use duplicate to avoid modifying original
publisher.close();
return new Blob(publisher, buffer.remaining()); // Known length from buffer
}
/**
* Creates a {@code Blob} from a list of {@code ByteBuffer}s.
*
* @param buffers the list of ByteBuffers to wrap as a Blob
* @return a new {@code Blob} instance
* @throws NullPointerException if {@code buffers} is {@code null}
*/
public static Blob from(List<ByteBuffer> buffers) {
Objects.requireNonNull(buffers, "ByteBuffer list cannot be null");
SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>();
long totalLength = 0;
for (ByteBuffer buffer : buffers) {
publisher.submit(buffer.duplicate()); // Use duplicate to avoid modifying original
totalLength += buffer.remaining();
}
publisher.close();
return new Blob(publisher, totalLength); // Known length from sum of buffer remainings
}
/**
* Creates a {@code Blob} from a {@code Flow.Publisher<List<ByteBuffer>>}.
* <p>
* This method uses {@code ReactiveUtils.flatten()} to convert the publisher of lists
* into a publisher of individual ByteBuffers.
*
* @param sourcePublisher the publisher that provides data as lists of ByteBuffers
* @return a new {@code Blob} instance
* @throws NullPointerException if {@code sourcePublisher} is {@code null}
*/
public static Blob from(Flow.Publisher<List<ByteBuffer>> sourcePublisher) {
Objects.requireNonNull(sourcePublisher, "Source publisher cannot be null");
Flow.Publisher<ByteBuffer> flattenedPublisher = ReactiveUtils.flatten(sourcePublisher);
return new Blob(flattenedPublisher, -1); // Unknown length for reactive publisher
}
// Consumption methods (single-use)
/**
* Returns a {@code Flow.Publisher<ByteBuffer>} that emits individual {@code ByteBuffer}
* from the underlying stream.
* <p>
* <b>Consumes this instance:</b> After calling this method, this {@code Blob} cannot be used again.
*
* @return a publisher of individual {@code ByteBuffer} items.
* @throws IllegalStateException if this instance has already been consumed.
*/
public Flow.Publisher<ByteBuffer> asPublisher() {
return ensureNotConsumedAndMark();
}
/**
* Collects the entire stream into a single byte array.
* <p>
* <b>Consumes this instance:</b> After calling this method, this {@code Blob} cannot be used again.
* <p>
* The returned {@code CompletableFuture} completes when all data has been received and assembled into the byte array,
* or completes exceptionally if an error occurs.
*
* @return a {@code CompletableFuture} containing the complete byte array.
* @throws IllegalStateException if this instance has already been consumed.
*/
public CompletableFuture<byte[]> toByteArray() {
Flow.Publisher<ByteBuffer> currentPublisher = ensureNotConsumedAndMark();
// Convert Flow.Publisher<ByteBuffer> to Flow.Publisher<List<ByteBuffer>> for BodySubscriber
Flow.Publisher<List<ByteBuffer>> listPublisher = ReactiveUtils.wrapped(currentPublisher);
HttpResponse.BodySubscriber<byte[]> bodySubscriber = HttpResponse.BodySubscribers.ofByteArray();
listPublisher.subscribe(bodySubscriber);
return bodySubscriber.getBody().toCompletableFuture();
}
/**
* Writes the entire stream to the specified file path.
* <p>
* <b>Consumes this instance:</b> After calling this method, this {@code Blob} cannot be used again.
* <p>
* The returned {@code CompletableFuture} completes with the {@code Path} to the written file when all data
* has been successfully written, or completes exceptionally if an error occurs.
*
* @param destinationPath the path where the stream data will be written. If the file exists, it will be truncated.
* @return a {@code CompletableFuture} containing the {@code Path} to the written file.
* @throws NullPointerException if {@code destinationPath} is {@code null}.
* @throws IllegalStateException if this instance has already been consumed.
*/
public CompletableFuture<Path> toFile(Path destinationPath) {
Objects.requireNonNull(destinationPath, "Destination path cannot be null");
Flow.Publisher<ByteBuffer> currentPublisher = ensureNotConsumedAndMark();
// Convert Flow.Publisher<ByteBuffer> to Flow.Publisher<List<ByteBuffer>> for BodySubscriber
Flow.Publisher<List<ByteBuffer>> listPublisher = ReactiveUtils.wrapped(currentPublisher);
HttpResponse.BodySubscriber<Path> bodySubscriber = HttpResponse.BodySubscribers.ofFile(destinationPath);
listPublisher.subscribe(bodySubscriber);
return bodySubscriber.getBody().toCompletableFuture();
}
/**
* Converts the entire stream into an {@code InputStream} for traditional I/O operations.
* <p>
* <b>Consumes this instance:</b> After calling this method, this {@code Blob} cannot be used again.
* <p>
* The returned {@code CompletableFuture} completes with an {@code InputStream} containing all the data
* from the stream when ready, or completes exceptionally if an error occurs. The resulting InputStream
* can be used with traditional Java I/O APIs.
*
* @return a {@code CompletableFuture} containing an {@code InputStream} with the stream data.
* @throws IllegalStateException if this instance has already been consumed.
*/
public CompletableFuture<InputStream> toInputStream() {
Flow.Publisher<ByteBuffer> currentPublisher = ensureNotConsumedAndMark();
// Convert Flow.Publisher<ByteBuffer> to Flow.Publisher<List<ByteBuffer>> for BodySubscriber
Flow.Publisher<List<ByteBuffer>> listPublisher = ReactiveUtils.wrapped(currentPublisher);
HttpResponse.BodySubscriber<InputStream> bodySubscriber = HttpResponse.BodySubscribers.ofInputStream();
listPublisher.subscribe(bodySubscriber);
return bodySubscriber.getBody().toCompletableFuture();
}
/**
* Ensures this instance has not already been consumed, marks it as consumed, and returns the publisher.
*
* @return the {@code Flow.Publisher<ByteBuffer>} to be consumed.
* @throws IllegalStateException if this instance has already been consumed.
*/
private Flow.Publisher<ByteBuffer> ensureNotConsumedAndMark() {
if (!consumed.compareAndSet(false, true)) {
throw new IllegalStateException("This Blob instance has already been consumed and cannot be reused.");
}
return this.publisher;
}
// HttpRequest.BodyPublisher implementation
@Override
public long contentLength() {
return contentLength;
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(subscriber);
}
}