Skip to content

Commit 902371a

Browse files
feat: segmental caching
1 parent e6c22a0 commit 902371a

File tree

4 files changed

+203
-19
lines changed

4 files changed

+203
-19
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>net.swofty</groupId>
88
<artifactId>stockmarkettester</artifactId>
9-
<version>1.1.7</version>
9+
<version>1.1.8</version>
1010

1111
<properties>
1212
<maven.compiler.source>21</maven.compiler.source>

src/main/java/net/swofty/stockmarkettester/data/HistoricalMarketService.java

+17-18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import net.swofty.stockmarkettester.MarketConfig;
55
import net.swofty.stockmarkettester.orders.HistoricalData;
66
import net.swofty.stockmarkettester.orders.MarketDataPoint;
7+
import net.swofty.stockmarkettester.data.cache.SegmentedHistoricalCache;
78

89
import java.io.IOException;
910
import java.io.ObjectInputStream;
@@ -16,9 +17,9 @@
1617
import java.util.concurrent.*;
1718

1819
public class HistoricalMarketService implements AutoCloseable {
19-
private final Map<String, HistoricalData> historicalCache;
2020
@Getter
2121
private final MarketDataProvider provider;
22+
private final SegmentedHistoricalCache segmentedCache;
2223

2324
private final ExecutorService requestExecutor;
2425
private final ExecutorService fetchExecutor;
@@ -31,11 +32,11 @@ public class HistoricalMarketService implements AutoCloseable {
3132

3233
public HistoricalMarketService(MarketDataProvider provider, int maxRetries, Path cacheDirectory) {
3334
this.provider = provider;
34-
this.historicalCache = new ConcurrentHashMap<>();
3535
this.requestExecutor = Executors.newSingleThreadExecutor();
3636
this.fetchExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
3737
this.maxRetries = maxRetries;
3838
this.cacheDirectory = Optional.ofNullable(cacheDirectory);
39+
this.segmentedCache = cacheDirectory != null ? new SegmentedHistoricalCache(cacheDirectory) : null;
3940

4041
if (this.cacheDirectory.isPresent()) {
4142
try {
@@ -106,11 +107,10 @@ public CompletableFuture<Void> initialize(Set<String> tickers, int previousDays,
106107
boolean success = false;
107108
int attempts = 0;
108109

109-
// Try to load from cache if caching is enabled
110-
if (cacheDirectory.isPresent()) {
111-
HistoricalData cachedData = loadFromCache(ticker, start, end);
112-
if (cachedData != null) {
113-
historicalCache.put(ticker, cachedData);
110+
// Try to load from segmented cache if enabled
111+
if (segmentedCache != null) {
112+
Optional<HistoricalData> cachedData = segmentedCache.get(ticker, start, end);
113+
if (cachedData.isPresent()) {
114114
System.out.println("Loaded cached data for " + ticker);
115115
success = true;
116116
continue;
@@ -123,13 +123,12 @@ public CompletableFuture<Void> initialize(Set<String> tickers, int previousDays,
123123
System.out.println("Fetching historical data for " + ticker);
124124
HistoricalData data = provider.fetchHistoricalData(
125125
Set.of(ticker), start, end, marketConfig).get();
126-
historicalCache.put(ticker, data);
127126

128127
long cooldown = (60 / provider.getRateLimit()) * 1000;
129128

130-
// Save to cache if enabled
131-
if (cacheDirectory.isPresent()) {
132-
saveToCache(ticker, data, start, end);
129+
// Save to segmented cache if enabled
130+
if (segmentedCache != null) {
131+
segmentedCache.put(ticker, start, end, data);
133132
System.out.println("Successfully fetched and cached data for " + ticker);
134133
} else {
135134
System.out.println("Successfully fetched data for " + ticker);
@@ -180,18 +179,18 @@ public CompletableFuture<Map<String, List<MarketDataPoint>>> fetchHistoricalData
180179
for (String ticker : tickers) {
181180
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
182181
try {
183-
HistoricalData cachedData = historicalCache.get(ticker);
184-
if (cachedData == null && cacheDirectory.isPresent()) {
185-
cachedData = loadFromCache(ticker, start, end);
186-
if (cachedData == null) {
187-
throw new IllegalStateException("No cached data for ticker: " + ticker);
182+
HistoricalData cachedData = null;
183+
if (segmentedCache != null) {
184+
Optional<HistoricalData> segmentedData = segmentedCache.get(ticker, start, end);
185+
if (segmentedData.isPresent()) {
186+
cachedData = segmentedData.get();
188187
}
189-
historicalCache.put(ticker, cachedData);
190188
}
189+
191190
if (cachedData != null) {
192191
result.put(ticker, cachedData.getDataPoints(start, end));
193192
} else {
194-
throw new IllegalStateException("No data available for ticker: " + ticker);
193+
throw new IllegalStateException("No cached data available for ticker: " + ticker);
195194
}
196195
} catch (Exception e) {
197196
throw new CompletionException(e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
package net.swofty.stockmarkettester.data;

import net.swofty.stockmarkettester.orders.HistoricalData;
import net.swofty.stockmarkettester.orders.MarketDataPoint;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Disk-backed cache of historical market data, stored as independent
 * time-range "segments" per ticker. A query is answered either by a single
 * segment that fully contains the requested range, or by stitching together
 * a set of overlapping segments that jointly cover it with no gaps.
 *
 * <p>Segments are persisted with Java native serialization as one
 * {@code .cache} file each under {@code cacheDirectory} and re-indexed on
 * construction. The in-memory index is a per-ticker {@link TreeMap} keyed by
 * segment start time, wrapped in a {@link ConcurrentHashMap} for concurrent
 * ticker-level access.
 *
 * <p>NOTE(review): cache files are deserialized with plain
 * {@link ObjectInputStream}. That is acceptable only because the directory is
 * local and trusted; never point this at untrusted data (or add an
 * {@code ObjectInputFilter}).
 */
public class SegmentedHistoricalCache {
    /** Date-only format used in segment file names. */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private final Path cacheDirectory;
    /** ticker -> (segment start -> segment), sorted by start time. */
    private final Map<String, TreeMap<LocalDateTime, CacheSegment>> segmentIndex;

    /**
     * One contiguous run of cached data for a single ticker, covering
     * [start, end] inclusive.
     */
    private static class CacheSegment implements Serializable {
        @Serial
        private static final long serialVersionUID = 1L;
        private final LocalDateTime start;
        private final LocalDateTime end;
        private final String ticker;
        private final HistoricalData data;

        public CacheSegment(String ticker, LocalDateTime start, LocalDateTime end, HistoricalData data) {
            this.ticker = ticker;
            this.start = start;
            this.end = end;
            this.data = data;
        }

        /** True if this segment fully covers [queryStart, queryEnd]. */
        public boolean containsTimeRange(LocalDateTime queryStart, LocalDateTime queryEnd) {
            return !start.isAfter(queryStart) && !end.isBefore(queryEnd);
        }

        /** True if this segment intersects [queryStart, queryEnd] at all. */
        public boolean overlaps(LocalDateTime queryStart, LocalDateTime queryEnd) {
            return !end.isBefore(queryStart) && !start.isAfter(queryEnd);
        }

        public HistoricalData getData() {
            return data;
        }

        public LocalDateTime getStart() {
            return start;
        }

        public LocalDateTime getEnd() {
            return end;
        }
    }

    /**
     * Creates a cache rooted at {@code cacheDirectory} and eagerly loads any
     * previously persisted segments into the in-memory index.
     */
    public SegmentedHistoricalCache(Path cacheDirectory) {
        this.cacheDirectory = cacheDirectory;
        this.segmentIndex = new ConcurrentHashMap<>();
        initializeFromDisk();
    }

    /**
     * Scans the cache directory for {@code .cache} files and indexes each one.
     * Creates the directory if it does not yet exist.
     *
     * @throws RuntimeException if the directory cannot be created or walked
     */
    private void initializeFromDisk() {
        if (!Files.exists(cacheDirectory)) {
            try {
                Files.createDirectories(cacheDirectory);
            } catch (IOException e) {
                throw new RuntimeException("Failed to create cache directory", e);
            }
            return;
        }

        // Files.walk returns a Stream that holds directory handles; it must
        // be closed, so wrap it in try-with-resources.
        try (Stream<Path> paths = Files.walk(cacheDirectory)) {
            paths.filter(Files::isRegularFile)
                    .filter(p -> p.toString().endsWith(".cache"))
                    .forEach(this::loadSegment);
        } catch (IOException e) {
            throw new RuntimeException("Failed to initialize cache from disk", e);
        }
    }

    /**
     * Deserializes one segment file into the index. A corrupt or unreadable
     * file is logged and deleted (best-effort) rather than failing startup.
     */
    private void loadSegment(Path path) {
        try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(path))) {
            CacheSegment segment = (CacheSegment) ois.readObject();
            addToIndex(segment);
        } catch (IOException | ClassNotFoundException e) {
            System.err.println("Failed to load cache segment: " + path);
            try {
                Files.delete(path);
            } catch (IOException ignored) {
                // Best-effort cleanup; a stale file only wastes disk space.
            }
        }
    }

    /** Inserts a segment into the per-ticker sorted index. */
    private void addToIndex(CacheSegment segment) {
        segmentIndex.computeIfAbsent(segment.ticker, k -> new TreeMap<>())
                .put(segment.start, segment);
    }

    /**
     * Returns cached data covering [start, end] for {@code ticker}, or empty
     * if the cached segments do not fully cover the range.
     *
     * <p>Resolution order: a single containing segment wins; otherwise
     * overlapping segments are merged if — and only if — they cover the range
     * contiguously from {@code start} to {@code end}.
     */
    public Optional<HistoricalData> get(String ticker, LocalDateTime start, LocalDateTime end) {
        TreeMap<LocalDateTime, CacheSegment> segments = segmentIndex.get(ticker);
        if (segments == null) return Optional.empty();

        // Fast path: one segment already contains the entire range.
        for (CacheSegment segment : segments.values()) {
            if (segment.containsTimeRange(start, end)) {
                return Optional.of(segment.getData());
            }
        }

        // Slow path: collect every segment that touches the range, in start order.
        List<CacheSegment> overlappingSegments = segments.values().stream()
                .filter(s -> s.overlaps(start, end))
                .sorted(Comparator.comparing(CacheSegment::getStart))
                .collect(Collectors.toList());

        if (overlappingSegments.isEmpty()) return Optional.empty();

        // The earliest segment must reach back to the query start; otherwise
        // the head of the range is uncovered and this is a miss.
        if (overlappingSegments.get(0).getStart().isAfter(start)) {
            return Optional.empty();
        }

        // Verify contiguous coverage. currentEnd only ever advances so that a
        // segment nested inside an earlier, longer one cannot drag coverage
        // backwards and cause a false "gap".
        LocalDateTime currentEnd = overlappingSegments.get(0).getEnd();
        for (int i = 1; i < overlappingSegments.size(); i++) {
            CacheSegment segment = overlappingSegments.get(i);
            if (segment.getStart().isAfter(currentEnd)) {
                return Optional.empty(); // Gap in the data
            }
            if (segment.getEnd().isAfter(currentEnd)) {
                currentEnd = segment.getEnd();
            }
        }

        if (currentEnd.isBefore(end)) return Optional.empty();

        // Merge the segments into a single HistoricalData.
        // NOTE(review): overlapping segments may contribute the same point
        // twice; this assumes HistoricalData.addDataPoint de-duplicates (or
        // tolerates duplicates) — confirm against its implementation.
        HistoricalData mergedData = new HistoricalData(ticker);
        for (CacheSegment segment : overlappingSegments) {
            List<MarketDataPoint> points = segment.getData().getDataPoints(start, end);
            points.forEach(mergedData::addDataPoint);
        }

        return Optional.of(mergedData);
    }

    /**
     * Indexes and persists a new segment for {@code ticker} spanning
     * [start, end]. Persistence failures are logged but do not fail the call;
     * the segment remains usable in memory.
     */
    public void put(String ticker, LocalDateTime start, LocalDateTime end, HistoricalData data) {
        CacheSegment segment = new CacheSegment(ticker, start, end, data);
        addToIndex(segment);
        saveSegment(segment);
    }

    /** Serializes one segment to its derived file path (best-effort). */
    private void saveSegment(CacheSegment segment) {
        Path path = getSegmentPath(segment);
        try (ObjectOutputStream oos = new ObjectOutputStream(Files.newOutputStream(path))) {
            oos.writeObject(segment);
        } catch (IOException e) {
            System.err.println("Failed to save cache segment: " + e.getMessage());
        }
    }

    /**
     * Builds the on-disk file name {@code TICKER_yyyy-MM-dd_to_yyyy-MM-dd.cache}.
     *
     * <p>NOTE(review): the name carries date precision only, so two segments
     * for the same ticker starting on the same day (different times) overwrite
     * each other on disk — verify callers never do that, or add time precision.
     */
    private Path getSegmentPath(CacheSegment segment) {
        String filename = String.format("%s_%s_to_%s.cache",
                segment.ticker,
                segment.start.format(DATE_FORMAT),
                segment.end.format(DATE_FORMAT));
        return cacheDirectory.resolve(filename);
    }

    /**
     * Deletes every regular file under the cache directory and clears the
     * in-memory index. Individual deletion failures are logged and skipped.
     *
     * @throws RuntimeException if the directory itself cannot be walked
     */
    public void clearCache() {
        // Close the walk stream (try-with-resources) — see initializeFromDisk.
        try (Stream<Path> paths = Files.walk(cacheDirectory)) {
            paths.filter(Files::isRegularFile)
                    .forEach(file -> {
                        try {
                            Files.delete(file);
                        } catch (IOException e) {
                            System.err.println("Failed to delete cache file: " + file);
                        }
                    });
            segmentIndex.clear();
        } catch (IOException e) {
            throw new RuntimeException("Failed to clear cache", e);
        }
    }
}

src/main/java/net/swofty/stockmarkettester/fetchers/AlphaVantageFetcher.java

+7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.fasterxml.jackson.databind.JsonNode;
66
import com.fasterxml.jackson.databind.ObjectMapper;
77
import com.fasterxml.jackson.databind.SerializationFeature;
8+
import net.swofty.stockmarkettester.exceptions.MarketDataException;
89

910
import java.io.*;
1011
import java.net.URI;
@@ -19,6 +20,7 @@
1920
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.time.Duration;
23+
import java.util.logging.Logger;
2224

2325
public class AlphaVantageFetcher {
2426
private static String apiKey;
@@ -343,6 +345,11 @@ public CompletableFuture<List<NewsSentiment>> getNewsSentiments(String ticker) {
343345
JsonNode root = mapper.readTree(response.body());
344346
List<NewsSentiment> sentiments = new ArrayList<>();
345347

348+
if (!root.has("feed")) {
349+
System.out.println(root);
350+
throw new MarketDataException("No data found in response", null);
351+
}
352+
346353
JsonNode feed = root.get("feed");
347354
for (JsonNode article : feed) {
348355
List<TickerSentiment> tickerSentiments = new ArrayList<>();

0 commit comments

Comments
 (0)