@@ -19,7 +19,10 @@ public class HistoricalMarketService implements AutoCloseable {
19
19
private final Map <String , HistoricalData > historicalCache ;
20
20
@ Getter
21
21
private final MarketDataProvider provider ;
22
+
22
23
private final ExecutorService requestExecutor ;
24
+ private final ExecutorService fetchExecutor ;
25
+
23
26
private final int maxRetries ;
24
27
private volatile boolean isInitialized = false ;
25
28
private final Object initLock = new Object ();
@@ -30,6 +33,7 @@ public HistoricalMarketService(MarketDataProvider provider, int maxRetries, Path
30
33
this .provider = provider ;
31
34
this .historicalCache = new ConcurrentHashMap <>();
32
35
this .requestExecutor = Executors .newSingleThreadExecutor ();
36
+ this .fetchExecutor = Executors .newFixedThreadPool (Runtime .getRuntime ().availableProcessors ());
33
37
this .maxRetries = maxRetries ;
34
38
this .cacheDirectory = Optional .ofNullable (cacheDirectory );
35
39
@@ -166,28 +170,39 @@ public CompletableFuture<Map<String, List<MarketDataPoint>>> fetchHistoricalData
166
170
LocalDateTime start ,
167
171
LocalDateTime end ) {
168
172
169
- return CompletableFuture .supplyAsync (() -> {
170
- if (!isInitialized ) {
171
- throw new IllegalStateException ("HistoricalMarketService not initialized" );
172
- }
173
-
174
- Map <String , List <MarketDataPoint >> result = new ConcurrentHashMap <>();
173
+ if (!isInitialized ) {
174
+ throw new IllegalStateException ("HistoricalMarketService not initialized" );
175
+ }
175
176
176
- for (String ticker : tickers ) {
177
- HistoricalData cachedData = historicalCache .get (ticker );
178
- if (cachedData == null && cacheDirectory .isPresent ()) {
179
- // Try loading from file cache as fallback if caching is enabled
180
- cachedData = loadFromCache (ticker , start , end );
181
- if (cachedData == null ) {
182
- throw new IllegalStateException ("No cached data for ticker: " + ticker );
177
+ Map <String , List <MarketDataPoint >> result = new ConcurrentHashMap <>();
178
+ List <CompletableFuture <Void >> futures = new ArrayList <>();
179
+
180
+ for (String ticker : tickers ) {
181
+ CompletableFuture <Void > future = CompletableFuture .runAsync (() -> {
182
+ try {
183
+ HistoricalData cachedData = historicalCache .get (ticker );
184
+ if (cachedData == null && cacheDirectory .isPresent ()) {
185
+ cachedData = loadFromCache (ticker , start , end );
186
+ if (cachedData == null ) {
187
+ throw new IllegalStateException ("No cached data for ticker: " + ticker );
188
+ }
189
+ historicalCache .put (ticker , cachedData );
183
190
}
184
- historicalCache .put (ticker , cachedData );
191
+ if (cachedData != null ) {
192
+ result .put (ticker , cachedData .getDataPoints (start , end ));
193
+ } else {
194
+ throw new IllegalStateException ("No data available for ticker: " + ticker );
195
+ }
196
+ } catch (Exception e ) {
197
+ throw new CompletionException (e );
185
198
}
186
- result .put (ticker , cachedData .getDataPoints (start , end ));
187
- }
199
+ }, fetchExecutor );
188
200
189
- return result ;
190
- });
201
+ futures .add (future );
202
+ }
203
+
204
+ return CompletableFuture .allOf (futures .toArray (new CompletableFuture [0 ]))
205
+ .thenApply (v -> result );
191
206
}
192
207
193
208
public void clearCache () {
@@ -211,12 +226,17 @@ public void clearCache() {
211
226
@ Override
212
227
public void close () {
213
228
requestExecutor .shutdown ();
229
+ fetchExecutor .shutdown ();
214
230
try {
215
231
if (!requestExecutor .awaitTermination (30 , TimeUnit .SECONDS )) {
216
232
requestExecutor .shutdownNow ();
217
233
}
234
+ if (!fetchExecutor .awaitTermination (30 , TimeUnit .SECONDS )) {
235
+ fetchExecutor .shutdownNow ();
236
+ }
218
237
} catch (InterruptedException e ) {
219
238
requestExecutor .shutdownNow ();
239
+ fetchExecutor .shutdownNow ();
220
240
Thread .currentThread ().interrupt ();
221
241
}
222
242
}
0 commit comments