Skip to content

Commit c1e1ca7

Browse files
feat: Re-introduce structure fan-out as a toggled feature (#74)
1 parent 6ae2f98 commit c1e1ca7

16 files changed

Lines changed: 1876 additions & 9 deletions

File tree

docs/designs/023-structure-fan-out-feature-toggle/DESIGN.md

Lines changed: 950 additions & 0 deletions
Large diffs are not rendered by default.

sdmx-proxy-config/README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ fields default to `false` when omitted.
2121

2222
### Root: `ProxyConfiguration`
2323

24-
| Field | Required | Description | Available Values | Default |
25-
|------------|:--------:|-------------------------------------------------------------------------|----------------------------------|---------|
26-
| `configs` | Yes | List of registries the proxy can route to | Array of `RegistryConfiguration` | |
27-
| `agencies` | No | Explicit agency-to-registry routing overrides and sub-agency allowances | Array of `AgencyConfiguration` | (empty) |
24+
| Field | Required | Description | Available Values | Default |
25+
|--------------------------|:--------:|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|---------|
26+
| `configs` | Yes | List of registries the proxy can route to | Array of `RegistryConfiguration` | |
27+
| `agencies` | No | Explicit agency-to-registry routing overrides and sub-agency allowances | Array of `AgencyConfiguration` | (empty) |
28+
| `structureFanOutEnabled` | No | When true, `GET /structure/{type}/*/.../...` is fanned out to every registry that supports the requested structure type and the parsed structures are merged into one response. Comma-separated agency IDs remain rejected with HTTP 501 | `true`, `false` | `false` |
2829

2930
### `RegistryConfiguration`
3031

sdmx-proxy-config/src/main/java/com/epam/sdmxproxy/configuration/data/ProxyConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,12 @@ public class ProxyConfiguration {
1111

1212
private List<AgencyConfiguration> agencies;
1313

14+
/**
15+
* When true, a structure query with {@code agencyId="*"} is fanned out to every
16+
* configured registry that supports the requested structure type, and results
17+
* are merged. When false (default), wildcard agency requests return HTTP 501.
18+
* Comma-separated agency IDs are rejected with 501 in both modes.
19+
*/
20+
private boolean structureFanOutEnabled;
21+
1422
}

sdmx-proxy-config/src/main/resources/sdmx_registries_config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,5 +164,6 @@
164164
"primaryRegistry": "IMF",
165165
"allowSubAgencies": true
166166
}
167-
]
167+
],
168+
"structureFanOutEnabled": true
168169
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package com.epam.sdmxproxy.e2e.tests;
2+
3+
import com.epam.sdmxproxy.configuration.data.ProxyConfiguration;
4+
import com.epam.sdmxproxy.e2e.support.url.BaseUrlProvider;
5+
import com.epam.sdmxproxy.e2e.support.util.RestClient;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import io.restassured.response.Response;
8+
import lombok.SneakyThrows;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.junit.jupiter.api.BeforeAll;
11+
import org.junit.jupiter.api.DisplayName;
12+
import org.junit.jupiter.api.Tag;
13+
import org.junit.jupiter.api.TestInstance;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.Arguments;
16+
import org.junit.jupiter.params.provider.MethodSource;
17+
18+
import java.util.List;
19+
import java.util.stream.Stream;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
/**
24+
* Verifies the proxy's structure fan-out contract when
25+
* {@code structureFanOutEnabled=true} is pushed in the proxy configuration:
26+
* a wildcard agency structure query
27+
* ({@code /structure/{type}/*\/*\/*}) is fanned out to every configured
28+
* registry that supports the requested structure type and the parsed
29+
* structures are merged into a single response.
30+
*
31+
* <p>This suite uses a single registry (BIS) for setup simplicity -- fan-out
32+
* across one registry still exercises the parallel-execution and merge code
33+
* paths, just with {@code n=1}. The {@link StructureWildcardE2ETest} sibling
34+
* covers the toggle-off (HTTP 501) contract.
35+
*/
36+
@Slf4j
37+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
38+
@Tag("contract")
39+
class StructureFanOutE2ETest {
40+
41+
private static final String BASE_PATH = "/statgpt/sdmx-proxy/api/v0";
42+
private static final String CONFIG_PATH = BASE_PATH + "/config";
43+
private static final List<String> SDMX_3_0_STRUCTURE_TYPES = List.of(
44+
"datastructure",
45+
"conceptscheme",
46+
"codelist",
47+
"dataflow"
48+
);
49+
private static final List<String> ACCEPT_HEADERS = List.of(
50+
"application/vnd.sdmx.structure+xml;version=2.1",
51+
"application/vnd.sdmx.structure+json;version=2.0.0"
52+
);
53+
54+
private final ObjectMapper objectMapper = new ObjectMapper();
55+
private RestClient restClient;
56+
57+
@BeforeAll
58+
@SneakyThrows
59+
void setUp() {
60+
restClient = new RestClient(BaseUrlProvider.getBaseUrl());
61+
62+
ProxyConfiguration config = objectMapper.readValue(
63+
getClass().getResourceAsStream(
64+
"/com/epam/sdmxproxy/e2e/tests/registry/bis/3_0/bis_3_0_registry_config.json"
65+
).readAllBytes(),
66+
ProxyConfiguration.class
67+
);
68+
config.setStructureFanOutEnabled(true);
69+
restClient.postResponse(CONFIG_PATH, objectMapper.writeValueAsString(config));
70+
}
71+
72+
static Stream<Arguments> fanOutCases() {
73+
return SDMX_3_0_STRUCTURE_TYPES.stream().flatMap(type ->
74+
ACCEPT_HEADERS.stream().map(accept -> Arguments.of(type, accept))
75+
);
76+
}
77+
78+
@ParameterizedTest(name = "type: {0}, accept: {1}")
79+
@DisplayName("Wildcard agency structure query returns merged content when fan-out is enabled")
80+
@MethodSource("fanOutCases")
81+
void wildcardAgencyReturnsMergedResponse(String structureType, String acceptHeader) {
82+
String path = String.format("%s/sdmx/3.0/structure/%s/*/*/*?detail=full", BASE_PATH, structureType);
83+
84+
Response response = restClient.getResponseWithAccept(path, acceptHeader);
85+
86+
assertThat(response.getStatusCode())
87+
.as("Wildcard agency structure query must return HTTP 200 when fan-out is enabled (type=%s)", structureType)
88+
.isEqualTo(200);
89+
90+
// Assert on byte length, not decoded String -- codelist responses are large enough that
91+
// RestAssured's byte->String conversion can OOM the test JVM. The raw bytes are already
92+
// materialized by RestAssured's response-cache, so this adds no memory pressure.
93+
byte[] body = response.getBody().asByteArray();
94+
assertThat(body.length)
95+
.as("Fan-out response body must not be empty (type=%s)", structureType)
96+
.isGreaterThan(0);
97+
}
98+
}

sdmx-proxy/src/main/java/com/epam/sdmxproxy/controller/SdmxStructure30Controller.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,28 @@
33
import com.epam.sdmxproxy.api.SdmxStructure30Api;
44
import com.epam.sdmxproxy.common.data.SdmxMediaType;
55
import com.epam.sdmxproxy.common.data.TranslatedStructureQuery;
6+
import com.epam.sdmxproxy.configuration.data.ProxyConfiguration;
7+
import com.epam.sdmxproxy.registry.configuration.ProxyConfigurationProvider;
68
import com.epam.sdmxproxy.services.adapter.AdapterRouter;
9+
import com.epam.sdmxproxy.services.cache.CacheKeyGenerator;
10+
import com.epam.sdmxproxy.services.cache.CacheService;
711
import com.epam.sdmxproxy.services.translator.QueryTranslator;
812
import jakarta.annotation.Nullable;
913
import lombok.RequiredArgsConstructor;
1014
import lombok.extern.slf4j.Slf4j;
1115
import org.apache.commons.lang3.Strings;
16+
import org.springframework.http.MediaType;
1217
import org.springframework.http.ResponseEntity;
1318
import org.springframework.web.bind.annotation.PathVariable;
1419
import org.springframework.web.bind.annotation.RequestHeader;
1520
import org.springframework.web.bind.annotation.RequestParam;
1621
import org.springframework.web.bind.annotation.RestController;
1722
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
1823

24+
import java.util.List;
25+
import java.util.Optional;
26+
27+
import static com.epam.sdmxproxy.common.data.SdmxMediaType.parseMediaType;
1928
import static com.epam.sdmxproxy.controller.utils.ControllerUtils.ControllerType.STRUCTURE;
2029
import static com.epam.sdmxproxy.controller.utils.ControllerUtils.logRequestUrl;
2130

@@ -24,9 +33,13 @@
2433
@RequiredArgsConstructor
2534
public class SdmxStructure30Controller implements SdmxStructure30Api {
2635

36+
private static final String WILDCARD_AGENCY = "*";
37+
private static final String ACCEPT_HEADER_FALLBACK = SdmxMediaType.STRUCTURE_SDMX_JSON_2_0_0_VALUE;
38+
2739
private final QueryTranslator queryTranslator;
2840
private final AdapterRouter adapterRouter;
29-
private static final String ACCEPT_HEADER_FALLBACK = SdmxMediaType.STRUCTURE_SDMX_JSON_2_0_0_VALUE;
41+
private final ProxyConfigurationProvider configurationProvider;
42+
private final CacheService cacheService;
3043

3144
@Override
3245
public ResponseEntity<StreamingResponseBody> getResources(
@@ -45,6 +58,11 @@ public ResponseEntity<StreamingResponseBody> getResources(
4558
accept = ACCEPT_HEADER_FALLBACK;
4659
}
4760

61+
ProxyConfiguration config = configurationProvider.getConfiguration();
62+
if (WILDCARD_AGENCY.equals(agencyId) && config.isStructureFanOutEnabled()) {
63+
return fanOutResponse(structureType, resourceId, version, references, detail, accept, config);
64+
}
65+
4866
TranslatedStructureQuery structureQuery = queryTranslator.translateStructureQuery(
4967
structureType, agencyId, resourceId, version, references, detail, accept, sourceArtefactUrn
5068
);
@@ -53,4 +71,42 @@ public ResponseEntity<StreamingResponseBody> getResources(
5371
.contentType(structureQuery.getContentType())
5472
.body(adapterRouter.getStructures(structureQuery));
5573
}
74+
75+
private ResponseEntity<StreamingResponseBody> fanOutResponse(
76+
String structureType,
77+
String resourceId,
78+
String version,
79+
@Nullable String references,
80+
String detail,
81+
@Nullable String accept,
82+
ProxyConfiguration config
83+
) {
84+
MediaType contentType = parseMediaType(accept).getMediaType();
85+
int configsHash = config.getConfigs() != null ? config.getConfigs().hashCode() : 0;
86+
String cacheKey = CacheKeyGenerator.generateFanOutResponseKey(
87+
structureType, resourceId, version, references, detail, contentType, configsHash
88+
);
89+
90+
Optional<byte[]> cached = cacheService.getReadyResponse(cacheKey);
91+
if (cached.isPresent()) {
92+
log.debug("Fan-out cache hit: {}", cacheKey);
93+
byte[] body = cached.get();
94+
return ResponseEntity.ok()
95+
.contentType(contentType)
96+
.body(outputStream -> outputStream.write(body));
97+
}
98+
log.debug("Fan-out cache miss: {}", cacheKey);
99+
100+
List<TranslatedStructureQuery> queries = queryTranslator.translateWildcardStructureFanOut(
101+
structureType, resourceId, version, references, detail, accept
102+
);
103+
if (queries.isEmpty()) {
104+
log.debug("Fan-out produced no queries for type {} -- no configured registry supports it", structureType);
105+
return ResponseEntity.ok().contentType(contentType).body(outputStream -> {
106+
});
107+
}
108+
return ResponseEntity.ok()
109+
.contentType(queries.getFirst().getContentType())
110+
.body(adapterRouter.getStructuresWithFanOut(queries, cacheKey));
111+
}
56112
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.epam.sdmxproxy.exception;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Thrown when every registry leg of a structure fan-out request failed.
7+
* Maps to HTTP 503 via the {@link ServiceUnavailableException} family --
8+
* this is an upstream-infrastructure failure, not a client error.
9+
*
10+
* <p>The detail message lists the failed registry names; that is curated
11+
* and safe to echo, so {@link #getClientMessage()} returns it instead of
12+
* the family-default generic message.
13+
*/
14+
public class StructureFanOutException extends ServiceUnavailableException {
15+
16+
private final List<String> failedRegistries;
17+
18+
public StructureFanOutException(String message, List<String> failedRegistries) {
19+
super(message);
20+
this.failedRegistries = List.copyOf(failedRegistries);
21+
}
22+
23+
public List<String> getFailedRegistries() {
24+
return failedRegistries;
25+
}
26+
27+
@Override
28+
public String getClientMessage() {
29+
return getMessage();
30+
}
31+
}

sdmx-proxy/src/main/java/com/epam/sdmxproxy/services/adapter/AdapterRouter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,26 @@
66
import io.sdmx.api.sdmx.model.beans.SdmxBeans;
77
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
88

9+
import java.util.List;
10+
911
public interface AdapterRouter {
1012
StreamingResponseBody getStructures(TranslatedStructureQuery query);
1113

1214
SdmxBeans getSdmxBeans(TranslatedStructureQuery query);
1315

16+
/**
17+
* Fan-out structure request: fetch + parse each per-registry query in parallel, merge the
18+
* resulting {@link SdmxBeans}, and stream the serialized response. The merged byte buffer
19+
* is written to the ready-response cache under {@code responseKey} only if every leg
20+
* succeeded; partial responses are still streamed but not cached.
21+
*
22+
* @param queries non-empty list of per-registry queries; the controller is responsible
23+
* for short-circuiting empty fan-outs
24+
* @param responseKey cache key for the merged response (computed via
25+
* {@link com.epam.sdmxproxy.services.cache.CacheKeyGenerator#generateFanOutResponseKey})
26+
*/
27+
StreamingResponseBody getStructuresWithFanOut(List<TranslatedStructureQuery> queries, String responseKey);
28+
1429
StreamingResponseBody getData(TranslatedDataQuery query);
1530

1631
StreamingResponseBody getAvailability(TranslatedAvailabilityQuery query);

sdmx-proxy/src/main/java/com/epam/sdmxproxy/services/adapter/AdapterRouterImpl.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@
44
import java.io.ByteArrayOutputStream;
55
import java.io.IOException;
66
import java.io.InputStream;
7+
import java.util.ArrayList;
78
import java.util.Collections;
89
import java.util.List;
910
import java.util.Optional;
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
1014
import java.util.stream.Collectors;
1115

1216
import com.epam.sdmxproxy.common.data.TranslatedAvailabilityQuery;
@@ -22,6 +26,8 @@
2226
import com.epam.sdmxproxy.exception.AvailabilityConversionException;
2327
import com.epam.sdmxproxy.exception.DataConversionException;
2428
import com.epam.sdmxproxy.exception.StructureConversionException;
29+
import com.epam.sdmxproxy.exception.StructureFanOutException;
30+
import com.epam.sdmxproxy.exception.UnexpectedStateException;
2531
import com.epam.sdmxproxy.services.adapter.conversion.StreamingAvailabilityConversionService;
2632
import com.epam.sdmxproxy.services.adapter.conversion.StreamingDataConversionService;
2733
import com.epam.sdmxproxy.services.adapter.conversion.StreamingStructureConversionService;
@@ -224,6 +230,68 @@ private byte[] getStructureBytes(TranslatedStructureQuery query, String structur
224230
}
225231
}
226232

233+
@Override
234+
public StreamingResponseBody getStructuresWithFanOut(List<TranslatedStructureQuery> queries, String responseKey) {
235+
if (queries == null || queries.isEmpty()) {
236+
throw new UnexpectedStateException("getStructuresWithFanOut called with empty queries");
237+
}
238+
MediaType contentType = queries.getFirst().getContentType();
239+
return outputStream -> {
240+
try (ExecutorService executor = Executors.newFixedThreadPool(queries.size())) {
241+
FanOutLegResult legs = runFanOutLegs(queries, executor);
242+
SdmxBeans merged = mergeFanOutResults(legs.successes());
243+
244+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
245+
streamingStructureConversionService.convert(merged, buffer, contentType);
246+
byte[] bytes = buffer.toByteArray();
247+
outputStream.write(bytes);
248+
249+
if (legs.failedRegistries().isEmpty()) {
250+
cacheService.putReadyResponse(responseKey, bytes);
251+
} else {
252+
log.debug("Fan-out response NOT cached for {} -- {} leg(s) failed: {}", responseKey, legs.failedRegistries().size(), legs.failedRegistries());
253+
}
254+
}
255+
};
256+
}
257+
258+
private FanOutLegResult runFanOutLegs(List<TranslatedStructureQuery> queries, ExecutorService executor) {
259+
List<CompletableFuture<Optional<SdmxBeans>>> futures = queries.stream()
260+
.map(q -> CompletableFuture.supplyAsync(() -> {
261+
try {
262+
return Optional.of(getSdmxBeans(q));
263+
} catch (Exception e) {
264+
log.warn("Fan-out leg failed for registry {}: {}", q.getRegistryConfiguration().getName(), e.getMessage(), e);
265+
return Optional.<SdmxBeans>empty();
266+
}
267+
}, executor))
268+
.toList();
269+
270+
List<SdmxBeans> ok = new ArrayList<>();
271+
List<String> failed = new ArrayList<>();
272+
for (int i = 0; i < futures.size(); i++) {
273+
Optional<SdmxBeans> result = futures.get(i).join();
274+
if (result.isPresent()) {
275+
ok.add(result.get());
276+
} else {
277+
failed.add(queries.get(i).getRegistryConfiguration().getName());
278+
}
279+
}
280+
if (ok.isEmpty()) {
281+
throw new StructureFanOutException("Structure fan-out failed: every registry leg failed (" + String.join(", ", failed) + ")", failed);
282+
}
283+
return new FanOutLegResult(ok, failed);
284+
}
285+
286+
private static SdmxBeans mergeFanOutResults(List<SdmxBeans> partials) {
287+
SdmxBeans merged = new SdmxBeansImpl();
288+
partials.forEach(merged::merge);
289+
return merged;
290+
}
291+
292+
private record FanOutLegResult(List<SdmxBeans> successes, List<String> failedRegistries) {
293+
}
294+
227295
@Override
228296
public StreamingResponseBody getData(TranslatedDataQuery query) {
229297
VersionSpecificRegistryConfiguration versionConfig = query.getVersionConfiguration();

0 commit comments

Comments
 (0)