Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,15 @@ protected boolean tryTransfer(
clearEventsReferenceCount();
connector.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
try {
client.returnSelf();
} catch (final IllegalStateException e) {
LOGGER.info(
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
}
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
LOGGER.info(
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
return true;
}
return false;
});
this.client = null;
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,15 @@ private void returnClientIfNecessary() {
}

client.setShouldReturnSelf(true);
try {
client.returnSelf();
} catch (final IllegalStateException e) {
LOGGER.info(
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
}
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
LOGGER.info(
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
return true;
}
return false;
});
client = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.function.Function;

public class ClientManager<K, V> implements IClientManager<K, V> {

Expand Down Expand Up @@ -79,6 +80,30 @@ public void returnClient(K node, V client) {
}
}

/**
* return a client V for node K to the {@link ClientManager}, and ignore some exception
*
* <p>Note: We do not define this interface in {@link IClientManager} to make you aware that the
* return of a client is automatic whenever a particular client is used.
*/
public void returnClient(K node, V client, Function<Exception, Boolean> ignoreError) {
if (node != null) {
try {
pool.returnObject(node, client);
} catch (Exception e) {
if (!Boolean.TRUE.equals(ignoreError.apply(e))) {
LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e);
}
}
} else if (client instanceof ThriftClient) {
((ThriftClient) client).invalidateAll();
LOGGER.warn(
"Return client {} to pool failed because the node is null. "
+ "This may cause resource leak, please check your code.",
client);
}
}

@Override
public void clear(K node) {
Optional.ofNullable(node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient
implements ThriftClient {
Expand Down Expand Up @@ -84,7 +85,8 @@ public void onComplete() {
public void onError(final Exception e) {
super.onError(e);
ThriftClient.resolveException(e, this);
returnSelf();
returnSelf(
(i) -> i instanceof IllegalStateException && "Client has an error!".equals(i.getMessage()));
}

@Override
Expand Down Expand Up @@ -114,6 +116,16 @@ public void returnSelf() {
}
}

/**
* return self, the method doesn't need to be called by the user and will be triggered after the
* RPC is finished.
*/
public void returnSelf(Function<Exception, Boolean> ignoreError) {
if (shouldReturnSelf.get()) {
clientManager.returnClient(endpoint, this, ignoreError);
}
}

public void setShouldReturnSelf(final boolean shouldReturnSelf) {
this.shouldReturnSelf.set(shouldReturnSelf);
}
Expand Down
Loading