Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/io/undertow/UndertowLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,8 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String
@LogMessage(level = WARN)
@Message(id = 5107, value = "Failed to set web socket timeout.")
void failedToSetWSTimeout(@Cause Exception e);

@LogMessage(level = WARN)
@Message(id = 5108, value = "Failed to transition to '%s' state in '%s'.")
void failedToTransitionToState(String state, Object src);
}
35 changes: 21 additions & 14 deletions core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import io.undertow.client.ClientStatistics;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -103,23 +104,30 @@ public void handleEvent(AjpClientResponseStreamSourceChannel channel) {
private static final int CLOSE_REQ = 1 << 30;
private static final int CLOSED = 1 << 31;

private int state;
@SuppressWarnings("unused")
private volatile int state;
private static final AtomicIntegerFieldUpdater<AjpClientConnection> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AjpClientConnection.class, "state");

private final ChannelListener.SimpleSetter<AjpClientConnection> closeSetter = new ChannelListener.SimpleSetter<>();
private final ClientStatistics clientStatistics;
private final List<ChannelListener<ClientConnection>> closeListeners = new CopyOnWriteArrayList<>();

AjpClientConnection(final AjpClientChannel connection, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) {
AjpClientConnection(final AjpClientChannel channel, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) {
this.clientStatistics = clientStatistics;
this.options = options;
this.connection = connection;
this.connection = channel;
this.bufferPool = bufferPool;

connection.addCloseTask(new ChannelListener<AjpClientChannel>() {
channel.addCloseTask(new ChannelListener<AjpClientChannel>() {
@Override
public void handleEvent(AjpClientChannel channel) {
log.debugf("connection to %s closed", getPeerAddress());
AjpClientConnection.this.state |= CLOSED;
final int oldVal = stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSED, (currentState, flag)-> currentState | flag);
if(anyAreSet(oldVal, CLOSED)) {
//this was closed already?
UndertowLogger.ROOT_LOGGER.failedToTransitionToState("CLOSED", AjpClientConnection.this);
return;
}
ChannelListeners.invokeChannelListener(AjpClientConnection.this, closeSetter.get());
for(ChannelListener<ClientConnection> listener : closeListeners) {
listener.handleEvent(AjpClientConnection.this);
Expand All @@ -135,8 +143,8 @@ public void handleEvent(AjpClientChannel channel) {
}
}
});
connection.getReceiveSetter().set(new ClientReceiveListener());
connection.resumeReceives();
channel.getReceiveSetter().set(new ClientReceiveListener());
channel.resumeReceives();
}

@Override
Expand Down Expand Up @@ -236,7 +244,7 @@ public void sendRequest(final ClientRequest request, final ClientCallback<Client
if (anyAreSet(state, UPGRADE_REQUESTED | UPGRADED)) {
clientCallback.failed(UndertowClientMessages.MESSAGES.invalidConnectionState());
return;
} else if (anyAreSet(state, CLOSE_REQ | CLOSED)) {
} else if (anyAreSet(state, CLOSED | CLOSE_REQ)) {
clientCallback.failed(UndertowClientMessages.MESSAGES.closedConnectionState());
return;
}
Expand Down Expand Up @@ -266,13 +274,13 @@ private void initiateRequest(AjpClientExchange AjpClientExchange) {
String connectionString = request.getRequestHeaders().getFirst(CONNECTION);
if (connectionString != null) {
if (CLOSE.equalToString(connectionString)) {
state |= CLOSE_REQ;
stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
}
} else if (request.getProtocol() != Protocols.HTTP_1_1) {
state |= CLOSE_REQ;
stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
}
if (request.getRequestHeaders().contains(UPGRADE)) {
state |= UPGRADE_REQUESTED;
stateUpdater.getAndAccumulate(AjpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag);
}

long length = 0;
Expand Down Expand Up @@ -327,7 +335,7 @@ public void close() throws IOException {
if (anyAreSet(state, CLOSED)) {
return;
}
state |= CLOSED | CLOSE_REQ;
stateUpdater.accumulateAndGet(this, CLOSED | CLOSE_REQ, (currentState, flag)-> currentState | flag);
connection.close();
}

Expand All @@ -336,7 +344,6 @@ public void close() throws IOException {
*/
public void requestDone() {
currentRequest = null;

if (anyAreSet(state, CLOSE_REQ)) {
safeClose(connection);
} else if (anyAreSet(state, UPGRADE_REQUESTED)) {
Expand All @@ -352,7 +359,7 @@ public void requestDone() {
}

public void requestClose() {
state |= CLOSE_REQ;
stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
}


Expand Down
17 changes: 10 additions & 7 deletions core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.xnio.channels.StreamSourceChannel;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static org.xnio.Bits.anyAreSet;

Expand All @@ -58,7 +59,9 @@ class AjpClientExchange extends AbstractAttachable implements ClientExchange {
private AjpClientResponseStreamSourceChannel responseChannel;
private AjpClientRequestClientStreamSinkChannel requestChannel;

private int state = 0;
@SuppressWarnings("unused")
private volatile int state = 0;
private static final AtomicIntegerFieldUpdater<AjpClientExchange> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AjpClientExchange.class, "state");
private static final int REQUEST_TERMINATED = 1;
private static final int RESPONSE_TERMINATED = 1 << 1;

Expand All @@ -78,19 +81,19 @@ class AjpClientExchange extends AbstractAttachable implements ClientExchange {
}

void terminateRequest() {
state |= REQUEST_TERMINATED;
stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag);
if(!clientConnection.isOpen()) {
state |= RESPONSE_TERMINATED;
stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag);
}
if (anyAreSet(state, RESPONSE_TERMINATED)) {
clientConnection.requestDone();
}
}

void terminateResponse() {
state |= RESPONSE_TERMINATED;
stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag);
if(!clientConnection.isOpen()) {
state |= REQUEST_TERMINATED;
stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag);
}
if (anyAreSet(state, REQUEST_TERMINATED)) {
clientConnection.requestDone();
Expand Down Expand Up @@ -155,7 +158,7 @@ public StreamSinkChannel getRequestChannel() {
return new DetachableStreamSinkChannel(requestChannel) {
@Override
protected boolean isFinished() {
return anyAreSet(state, REQUEST_TERMINATED);
return anyAreSet(AjpClientExchange.this.state, REQUEST_TERMINATED);
}
};
}
Expand All @@ -165,7 +168,7 @@ public StreamSourceChannel getResponseChannel() {
return new DetachableStreamSourceChannel(responseChannel) {
@Override
protected boolean isFinished() {
return anyAreSet(state, RESPONSE_TERMINATED);
return anyAreSet(AjpClientExchange.this.state, RESPONSE_TERMINATED);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.undertow.client.UndertowClientMessages.MESSAGES;
import static org.xnio.Bits.allAreClear;
Expand Down Expand Up @@ -132,7 +133,9 @@ public void handleEvent(StreamSourceConduit channel) {
private static final int CLOSE_REQ = 1 << 30;
private static final int CLOSED = 1 << 31;

private int state;
@SuppressWarnings("unused")
private volatile int state;
private static final AtomicIntegerFieldUpdater<HttpClientConnection> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpClientConnection.class, "state");
private final ChannelListener.SimpleSetter<HttpClientConnection> closeSetter = new ChannelListener.SimpleSetter<>();

private final ClientStatistics clientStatistics;
Expand Down Expand Up @@ -178,7 +181,13 @@ public void activity(long bytes) {

public void handleEvent(StreamConnection channel) {
log.debugf("connection to %s closed", getPeerAddress());
HttpClientConnection.this.state |= CLOSED;
final int oldVal = stateUpdater.getAndAccumulate(HttpClientConnection.this, CLOSED, (currentState, flag)-> currentState | flag);
if(anyAreSet(oldVal, CLOSED)) {
//this was closed already?
UndertowLogger.ROOT_LOGGER.failedToTransitionToState("CLOSED", HttpClientConnection.this);
//NOTE: cant do that....
//return;
}
ChannelListeners.invokeChannelListener(HttpClientConnection.this, closeSetter.get());
try {
if (pooledBuffer != null) {
Expand Down Expand Up @@ -256,7 +265,7 @@ public boolean isOpen() {
if(http2Delegate != null) {
return http2Delegate.isOpen();
}
return connection.isOpen() && allAreClear(state, CLOSE_REQ | CLOSED);
return connection.isOpen() && allAreClear(state, CLOSED | CLOSE_REQ);
}

@Override
Expand Down Expand Up @@ -348,10 +357,10 @@ public void sendRequest(final ClientRequest request, final ClientCallback<Client
http2Delegate.sendRequest(request, clientCallback);
return;
}
if (anyAreSet(state, UPGRADE_REQUESTED | UPGRADED)) {
if (anyAreSet(state,UPGRADE_REQUESTED | UPGRADED)) {
clientCallback.failed(UndertowClientMessages.MESSAGES.invalidConnectionState());
return;
} else if (anyAreSet(state, CLOSE_REQ | CLOSED)) {
} else if (anyAreSet(state, CLOSED | CLOSE_REQ)) {
clientCallback.failed(UndertowClientMessages.MESSAGES.closedConnectionState());
return;
}
Expand Down Expand Up @@ -381,19 +390,19 @@ private void initiateRequest(HttpClientExchange httpClientExchange) {
String connectionString = request.getRequestHeaders().getFirst(Headers.CONNECTION);
if (connectionString != null) {
if (Headers.CLOSE.equalToString(connectionString)) {
state |= CLOSE_REQ;
stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
} else if (Headers.UPGRADE.equalToString(connectionString)) {
state |= UPGRADE_REQUESTED;
stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag);
}
} else if (request.getProtocol() != Protocols.HTTP_1_1) {
state |= CLOSE_REQ;
stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
}
if (request.getRequestHeaders().contains(Headers.UPGRADE)) {
state |= UPGRADE_REQUESTED;
stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag);
}
if(request.getMethod().equals(Methods.CONNECT)) {
//we treat CONNECT like upgrade requests
state |= UPGRADE_REQUESTED;
stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag);
}

//setup the client request conduits
Expand Down Expand Up @@ -476,7 +485,7 @@ public StreamConnection performUpgrade() throws IOException {
if (allAreSet(state, UPGRADED | CLOSE_REQ | CLOSED)) {
throw new IOException(UndertowClientMessages.MESSAGES.connectionClosed());
}
state |= UPGRADED;
stateUpdater.accumulateAndGet(this, UPGRADED, (currentState, flag)-> currentState | flag);
connection.getSinkChannel().setConduit(originalSinkConduit);
connection.getSourceChannel().setConduit(pushBackStreamSourceConduit);
return connection;
Expand All @@ -490,7 +499,7 @@ public void close() throws IOException {
if (anyAreSet(state, CLOSED)) {
return;
}
state |= CLOSED | CLOSE_REQ;
stateUpdater.accumulateAndGet(this, CLOSED | CLOSE_REQ, (currentState, flag)-> currentState | flag);
ConnectionUtils.cleanClose(connection);
}

Expand All @@ -508,7 +517,7 @@ public void exchangeDone() {
if (anyAreSet(state, CLOSE_REQ)) {
currentRequest = null;
pendingResponse = null;
this.state |= CLOSED;
stateUpdater.accumulateAndGet(this, CLOSED, (currentState, flag)-> currentState | flag);
safeClose(connection);
} else if (anyAreSet(state, UPGRADE_REQUESTED)) {
connection.getSourceChannel().suspendReads();
Expand Down Expand Up @@ -630,7 +639,7 @@ public void handleEvent(StreamSourceChannel channel) {
if ((connectionString == null || !Headers.UPGRADE.equalToString(connectionString)) && !response.getResponseHeaders().contains(Headers.UPGRADE)) {
if(!currentRequest.getRequest().getMethod().equals(Methods.CONNECT) || response.getResponseCode() != 200) { //make sure it was not actually a connect request
//just unset the upgrade requested flag
HttpClientConnection.this.state &= ~UPGRADE_REQUESTED;
stateUpdater.accumulateAndGet(HttpClientConnection.this, ~UPGRADE_REQUESTED, (currentState, flag)-> currentState & flag);
}
}
}
Expand All @@ -647,7 +656,7 @@ public void handleEvent(StreamSourceChannel channel) {
close = true;
}
if(close) {
HttpClientConnection.this.state |= CLOSE_REQ;
stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
//we are going to close, kill any queued connections
HttpClientExchange ex = pendingQueue.poll();
while (ex != null) {
Expand All @@ -673,7 +682,7 @@ public void handleEvent(StreamSourceChannel channel) {
currentRequest.setResponse(response);
if(response.getResponseCode() == StatusCodes.EXPECTATION_FAILED) {
if(HttpContinue.requiresContinueResponse(currentRequest.getRequest().getRequestHeaders())) {
HttpClientConnection.this.state |= CLOSE_REQ;
stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
ConduitStreamSinkChannel sinkChannel = HttpClientConnection.this.connection.getSinkChannel();
sinkChannel.shutdownWrites();
if(!sinkChannel.flush()) {
Expand Down Expand Up @@ -749,7 +758,7 @@ private void prepareResponseChannel(ClientResponse response, ClientExchange exch
connection.getSourceChannel().setConduit(new FixedLengthStreamSourceConduit(connection.getSourceChannel().getConduit(), 0, responseFinishedListener));
} else {
connection.getSourceChannel().setConduit(new FinishableStreamSourceConduit(connection.getSourceChannel().getConduit(), responseFinishedListener));
state |= CLOSE_REQ;
stateUpdater.accumulateAndGet(this, CLOSE_REQ, (currentState, flag)-> currentState | flag);
}
}

Expand Down
23 changes: 13 additions & 10 deletions core/src/main/java/io/undertow/client/http/HttpClientExchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.xnio.channels.StreamSourceChannel;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static org.xnio.Bits.anyAreSet;

Expand All @@ -57,7 +58,9 @@ class HttpClientExchange extends AbstractAttachable implements ClientExchange {
private IOException failedReason;
private HttpRequestConduit requestConduit;

private int state = 0;
@SuppressWarnings("unused")
private volatile int state;
private static final AtomicIntegerFieldUpdater<HttpClientExchange> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpClientExchange.class, "state");
private static final int REQUEST_TERMINATED = 1;
private static final int RESPONSE_TERMINATED = 1 << 1;

Expand All @@ -81,28 +84,28 @@ public void setRequestConduit(HttpRequestConduit requestConduit) {
}

void terminateRequest() {
if(anyAreSet(state, REQUEST_TERMINATED)) {
if(anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED)) {
return;
}
log.debugf("request terminated for request to %s %s", clientConnection.getPeerAddress(), getRequest().getPath());
state |= REQUEST_TERMINATED;
stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag);
clientConnection.requestDataSent();
if (anyAreSet(state, RESPONSE_TERMINATED)) {
if (anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED)) {
clientConnection.exchangeDone();
}
}

boolean isRequestDataSent() {
return anyAreSet(state, REQUEST_TERMINATED);
return anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED);
}

void terminateResponse() {
if(anyAreSet(state, RESPONSE_TERMINATED)) {
if(anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED)) {
return;
}
log.debugf("response terminated for request to %s %s", clientConnection.getPeerAddress(), getRequest().getPath());
state |= RESPONSE_TERMINATED;
if (anyAreSet(state, REQUEST_TERMINATED)) {
stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag);
if (anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED)) {
clientConnection.exchangeDone();
}
}
Expand Down Expand Up @@ -168,7 +171,7 @@ public StreamSinkChannel getRequestChannel() {
return new DetachableStreamSinkChannel(clientConnection.getConnection().getSinkChannel()) {
@Override
protected boolean isFinished() {
return anyAreSet(state, REQUEST_TERMINATED);
return anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED);
}
};
}
Expand All @@ -178,7 +181,7 @@ public StreamSourceChannel getResponseChannel() {
return new DetachableStreamSourceChannel(clientConnection.getConnection().getSourceChannel()) {
@Override
protected boolean isFinished() {
return anyAreSet(state, RESPONSE_TERMINATED);
return anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED);
}
};
}
Expand Down
Loading