Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.HttpConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Locale;

import static com.azure.cosmos.implementation.Utils.getUTF8BytesOrNull;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -47,4 +49,52 @@ public void headerNamesAreCaseInsensitive() {
assertThat(sp.getHeaderValue("kEy2")).isEqualTo("value2");
assertThat(sp.getHeaderValue("KEY3")).isEqualTo("value3");
}

@Test(groups = { "unit" })
public void withRemappedStatusCode_updatesStatusAndCharge() {
HashMap<String, String> headerMap = new HashMap<>();
headerMap.put(HttpConstants.HttpHeaders.REQUEST_CHARGE.toLowerCase(Locale.ROOT), "5.0");
headerMap.put("x-ms-activity-id", "test-activity");

StoreResponse original = new StoreResponse("endpoint", 200, headerMap, null, 0);
StoreResponse remapped = original.withRemappedStatusCode(201, 2.0);

assertThat(remapped.getStatus()).isEqualTo(201);
assertThat(remapped.getRequestCharge()).isEqualTo(7.0);
assertThat(remapped.getHeaderValue("x-ms-activity-id")).isEqualTo("test-activity");

// original is unmodified
assertThat(original.getStatus()).isEqualTo(200);
assertThat(original.getRequestCharge()).isEqualTo(5.0);
}

@Test(groups = { "unit" })
public void withRemappedStatusCode_doesNotInsertChargeWhenAbsent() {
HashMap<String, String> headerMap = new HashMap<>();
headerMap.put("x-ms-activity-id", "test-activity");

StoreResponse original = new StoreResponse("endpoint", 200, headerMap, null, 0);
StoreResponse remapped = original.withRemappedStatusCode(304, 1.0);

assertThat(remapped.getStatus()).isEqualTo(304);
assertThat(remapped.getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE)).isNull();
}

@Test(groups = { "unit" })
public void getResponseHeaders_isUnmodifiable() {
HashMap<String, String> headerMap = new HashMap<>();
headerMap.put("key1", "value1");

StoreResponse sp = new StoreResponse("endpoint", 200, headerMap, null, 0);

try {
sp.getResponseHeaders().put("key2", "value2");
assertThat(false).as("Expected UnsupportedOperationException").isTrue();
} catch (UnsupportedOperationException e) {
// expected
}

// internal state is not affected
assertThat(sp.getHeaderValue("key2")).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.assertj.core.api.Condition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -39,7 +38,7 @@ public Builder hasHeader(String headerKey) {
validators.add(new StoreResponseValidator() {
@Override
public void validate(StoreResponse resp) {
assertThat(Arrays.asList(resp.getResponseHeaderNames()).contains(headerKey)).isTrue();
assertThat(resp.getHeaderValue(headerKey)).isNotNull();
}
});
return this;
Expand All @@ -49,9 +48,7 @@ public Builder withHeader(String headerKey, String headerValue) {
validators.add(new StoreResponseValidator() {
@Override
public void validate(StoreResponse resp) {
assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey);
int index = Arrays.asList(resp.getResponseHeaderNames()).indexOf(headerKey);
assertThat(resp.getResponseHeaderValues()[index]).isEqualTo(headerValue);
assertThat(resp.getHeaderValue(headerKey)).isEqualTo(headerValue);
}
});
return this;
Expand All @@ -62,9 +59,8 @@ public Builder withHeaderValueCondition(String headerKey, Condition<String> cond
validators.add(new StoreResponseValidator() {
@Override
public void validate(StoreResponse resp) {
assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey);
int index = Arrays.asList(resp.getResponseHeaderNames()).indexOf(headerKey);
String value = resp.getResponseHeaderValues()[index];
String value = resp.getHeaderValue(headerKey);
assertThat(value).isNotNull();
condition.matches(value);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,14 @@ public class RxDocumentServiceResponse {
private CosmosDiagnostics cosmosDiagnostics;

public RxDocumentServiceResponse(DiagnosticsClientContext diagnosticsClientContext, StoreResponse response) {
String[] headerNames = response.getResponseHeaderNames();
String[] headerValues = response.getResponseHeaderValues();

this.headersMap = new HashMap<>(headerNames.length);
// StoreResponse.getResponseHeaders() returns an unmodifiable view over lowercase-keyed map.
// Copy it for RxDocumentServiceResponse's own mutable use (callers may put into headersMap).
this.headersMap = new HashMap<>(response.getResponseHeaders());

// Gets status code.
this.statusCode = response.getStatus();

// Extracts headers.
for (int i = 0; i < headerNames.length; i++) {
this.headersMap.put(headerNames[i], headerValues[i]);
}

this.storeResponse = response;
this.diagnosticsClientContext = diagnosticsClientContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -69,13 +71,13 @@ public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerialize
private static final boolean leakDetectionDebuggingEnabled = ResourceLeakDetector.getLevel().ordinal() >=
ResourceLeakDetector.Level.ADVANCED.ordinal();
private static final boolean HTTP_CONNECTION_WITHOUT_TLS_ALLOWED = Configs.isHttpConnectionWithoutTLSAllowed();
private static final List<String> headersNeedToBeEscaped = Arrays.asList(
private static final Set<String> headersNeedToBeEscaped = new HashSet<>(Arrays.asList(
HttpConstants.HttpHeaders.PARTITION_KEY,
HttpConstants.HttpHeaders.POST_TRIGGER_EXCLUDE,
HttpConstants.HttpHeaders.POST_TRIGGER_INCLUDE,
HttpConstants.HttpHeaders.PRE_TRIGGER_EXCLUDE,
HttpConstants.HttpHeaders.PRE_TRIGGER_INCLUDE
);
));

private final DiagnosticsClientContext clientContext;
private final Logger logger = LoggerFactory.getLogger(RxGatewayStoreModel.class);
Expand Down Expand Up @@ -211,7 +213,7 @@ public StoreResponse unwrapToStoreResponse(
String endpoint,
RxDocumentServiceRequest request,
int statusCode,
HttpHeaders headers,
Map<String, String> headers,
ByteBuf retainedContent) {

checkNotNull(headers, "Argument 'headers' must not be null.");
Expand All @@ -235,20 +237,20 @@ public StoreResponse unwrapToStoreResponse(
logger.debug("RxGatewayStoreModel before creating StoreResponse - refCnt: {}", retainedContent.refCnt());
}

return new StoreResponse(
return StoreResponse.withLowerCasedHeaders(
endpoint,
statusCode,
HttpUtils.unescape(headers.toLowerCaseMap()),
headers,
new ByteBufInputStream(retainedContent, true),
size);
} else {
retainedContent.release();
}

return new StoreResponse(
return StoreResponse.withLowerCasedHeaders(
endpoint,
statusCode,
HttpUtils.unescape(headers.toLowerCaseMap()),
headers,
null,
0);
}
Expand Down Expand Up @@ -437,8 +439,9 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
.publishOn(CosmosSchedulers.TRANSPORT_RESPONSE_BOUNDED_ELASTIC)
.flatMap(httpResponse -> {

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
// Build lowercase header map directly from transport headers.
// For HTTP/2, keys are already lowercase (no toLowerCase overhead).
Map<String, String> responseHeaders = HttpUtils.unescape(httpResponse.headersAsLowerCaseMap());
int httpResponseStatus = httpResponse.statusCode();

// Track the retained ByteBuf so we can release it as a safety net in doFinally
Expand Down Expand Up @@ -503,7 +506,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
}
StoreResponse rsp = request
.getEffectiveHttpTransportSerializer(this)
.unwrapToStoreResponse(httpRequest.uri().toString(), request, httpResponseStatus, httpResponseHeaders, content);
.unwrapToStoreResponse(httpRequest.uri().toString(), request, httpResponseStatus, responseHeaders, content);

// Only clear retainedBufRef AFTER StoreResponse successfully takes ownership.
// If unwrapToStoreResponse throws, retainedBufRef remains set so doFinally
Expand Down Expand Up @@ -707,7 +710,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo

private void validateOrThrow(RxDocumentServiceRequest request,
HttpResponseStatus status,
HttpHeaders headers,
Map<String, String> headers,
ByteBuf retainedBodyAsByteBuf) {

int statusCode = status.code();
Expand All @@ -729,7 +732,7 @@ private void validateOrThrow(RxDocumentServiceRequest request,
String.format("%s, StatusCode: %s", cosmosError.getMessage(), statusCodeString),
cosmosError.getPartitionedQueryExecutionInfo());

CosmosException dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, statusCode, cosmosError, headers.toLowerCaseMap());
CosmosException dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, statusCode, cosmosError, headers);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
throw dce;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.cosmos.implementation;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdFramer;
Expand Down Expand Up @@ -99,7 +100,7 @@ public StoreResponse unwrapToStoreResponse(
String endpoint,
RxDocumentServiceRequest request,
int statusCode,
HttpHeaders headers,
Map<String, String> headers,
ByteBuf content) {

if (content == null) {
Expand Down Expand Up @@ -141,7 +142,7 @@ public StoreResponse unwrapToStoreResponse(
endpoint,
request,
response.getStatus().code(),
new HttpHeaders(response.getHeaders().asMap(request.getActivityId())),
HttpUtils.unescape(new HashMap<>(response.getHeaders().asMap(request.getActivityId()))),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Blocking · Correctness: Mixed-case RNTBD header keys bypass lowercase normalization

new HashMap<>(...) copies keys verbatim from the ImmutableMap returned by RntbdResponseHeaders.asMap(). That map is built using WFConstants.BackendHeaders constants, which include mixed-case values:

// WFConstants.BackendHeaders
public static final String GLOBAL_COMMITTED_LSN = "x-ms-global-Committed-lsn";  // uppercase 'C'

The resulting HashMap therefore contains "x-ms-global-Committed-lsn" as a key. It is then passed to super.unwrapToStoreResponse()StoreResponse.withLowerCasedHeaders(…, keysAlreadyLowerCased=true), which stores the key as-is, without normalisation.

Subsequent lookups call getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN), which normalises the argument to "x-ms-global-committed-lsn" (all-lowercase) via attribute.toLowerCase(Locale.ROOT). That key is absent from the map, so the lookup returns null and globalCommittedLSN is always −1 in ThinClient mode — silently breaking quorum-read consistency semantics.

Root cause: before this PR, new HttpHeaders(immutableMap) called set() for every entry, which performed name.toLowerCase(Locale.ROOT). The new new HashMap<>(...) copy preserves the original casing.

Fix — normalise during the copy:

Map(String, String) rawHeaders = response.getHeaders().asMap(request.getActivityId());
Map(String, String) loweredHeaders = new HashMap<>(rawHeaders.size());
rawHeaders.forEach((k, v) -> loweredHeaders.put(k.toLowerCase(Locale.ROOT), v));
StoreResponse storeResponse = super.unwrapToStoreResponse(
    endpoint, request, response.getStatus().code(),
    HttpUtils.unescape(loweredHeaders), payloadBuf);

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

payloadBuf
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.azure.cosmos.implementation.ISessionToken;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
Expand Down Expand Up @@ -174,19 +173,7 @@ private RxDocumentServiceResponse completeResponse(
StoreResponse storeResponse,
RxDocumentServiceRequest request) throws InternalServerErrorException {

if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) {
throw new InternalServerErrorException(
Exceptions.getInternalServerErrorMessage(RMResources.InvalidBackendResponse),
HttpConstants.SubStatusCodes.INVALID_BACKEND_RESPONSE);
}

Map<String, String> headers = new HashMap<>(storeResponse.getResponseHeaderNames().length);
for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) {
String name = storeResponse.getResponseHeaderNames()[idx];
String value = storeResponse.getResponseHeaderValues()[idx];

headers.put(name, value);
}
Map<String, String> headers = new HashMap<>(storeResponse.getResponseHeaders());

this.updateResponseHeader(request, headers);
this.captureSessionToken(request, headers);
Expand Down
Loading
Loading