Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17688: Http2SolrClient: use Request.Listener not HttpListenerFactory #3233

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
final class HttpSolrClientProvider implements AutoCloseable {

static final String METRIC_SCOPE_NAME = "defaultHttpSolrClientProvider";

private final Http2SolrClient httpSolrClient;

private final InstrumentedHttpListenerFactory trackHttpSolrMetrics;
Expand All @@ -44,7 +43,7 @@ final class HttpSolrClientProvider implements AutoCloseable {
initializeMetrics(parentContext);

Http2SolrClient.Builder httpClientBuilder =
new Http2SolrClient.Builder().withListenerFactory(List.of(trackHttpSolrMetrics));
new Http2SolrClient.Builder().withRequestListeners(List.of(trackHttpSolrMetrics));

if (cfg != null) {
httpClientBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ public void init(PluginInfo info) {
.withIdleTimeout(soTimeout, TimeUnit.MILLISECONDS)
.withExecutor(commExecutor)
.withMaxConnectionsPerHost(maxConnectionsPerHost)
.withRequestListeners(List.of(this.httpListenerFactory))
.build();
this.defaultClient.addListenerFactory(this.httpListenerFactory);
this.loadbalancer = new LBHttp2SolrClient.Builder<Http2SolrClient>(defaultClient).build();

initReplicaListTransformers(getParameter(args, "replicaRouting", null, sb));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.http.protocol.HttpContext;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpListenerFactory;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
Expand Down Expand Up @@ -377,8 +376,8 @@ PublicKey fetchPublicKeyFromRemote(String nodename) {

@Override
public void setup(Http2SolrClient client) {
final HttpListenerFactory.RequestResponseListener listener =
new HttpListenerFactory.RequestResponseListener() {
final var listener =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this listener no per-request state so can simply add this listener to client level.

new Request.Listener() {
private static final String CACHED_REQUEST_USER_KEY = "cachedRequestUser";

@Override
Expand Down Expand Up @@ -432,7 +431,7 @@ private Optional<String> getUserFromJettyRequest(Request request) {
(String) request.getAttributes().get(CACHED_REQUEST_USER_KEY));
}
};
client.addListenerFactory(() -> listener);
client.getHttpClient().getRequestListeners().add(listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
Expand Down Expand Up @@ -127,21 +128,21 @@ public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
Http2SolrClient.Builder recoveryOnlyClientBuilder = new Http2SolrClient.Builder();
if (cfg != null) {
updateOnlyClientBuilder
.withRequestListeners(List.of(trackHttpSolrMetrics))
.withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
.withIdleTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
.withMaxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
recoveryOnlyClientBuilder
.withRequestListeners(List.of(trackHttpSolrMetrics))
.withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
.withIdleTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
.withMaxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
}

updateOnlyClientBuilder.withTheseParamNamesInTheUrl(urlParamNames);
updateOnlyClient = updateOnlyClientBuilder.build();
updateOnlyClient.addListenerFactory(trackHttpSolrMetrics);

recoveryOnlyClient = recoveryOnlyClientBuilder.build();
recoveryOnlyClient.addListenerFactory(trackHttpSolrMetrics);

ThreadFactory recoveryThreadFactory = new SolrNamedThreadFactory("recoveryExecutor");
if (cfg != null && cfg.getMaxRecoveryThreads() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@
import io.opentelemetry.api.trace.Span;
import java.util.Locale;
import java.util.Map;
import org.apache.solr.client.solrj.impl.HttpListenerFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.util.tracing.TraceUtils;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;

/**
* A HttpListenerFactory tracking metrics and distributed tracing. The Metrics are inspired and
* A Jetty request listener tracking metrics and distributed tracing. The Metrics are inspired and
* partially copied from dropwizard httpclient library.
*/
public class InstrumentedHttpListenerFactory implements SolrMetricProducer, HttpListenerFactory {
public class InstrumentedHttpListenerFactory implements Request.Listener, SolrMetricProducer {
// TODO rename to maybe InstrumentionJettyRequestListener

public interface NameStrategy {
String getNameFor(String scope, Request request);
Expand Down Expand Up @@ -86,38 +87,44 @@ private static String methodNameString(Request request) {
}

@Override
public RequestResponseListener get() {
return new RequestResponseListener() {
Timer.Context timerContext;
Span span = Span.getInvalid();

@Override
public void onQueued(Request request) {
// do tracing onQueued because it's called from Solr's thread
span = Span.current();
TraceUtils.injectTraceContext(request);
}
public final void onQueued(Request req) {
var listener = new PerRequestListener(req);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this scenario is maybe a bit more awkward because on onQueued, we need to register a listener specific for this request because the listener has per-request state (fields). This was previously happening for all listeners in Http2SolrClient but you'll see it gets removed there now.

req.onRequestBegin(listener);
req.onComplete(listener);
}

/** A per-request instantiated listener. */
private class PerRequestListener implements Request.BeginListener, Response.CompleteListener {

final Span span;
Timer.Context timerContext;

// called onQueued
PerRequestListener(Request request) {
// do tracing onQueued because it's called from Solr's thread
span = Span.current();
TraceUtils.injectTraceContext(request);
}

@Override
public void onBegin(Request request) {
if (solrMetricsContext != null) {
timerContext = timer(request).time();
}
if (span.isRecording()) {
span.addEvent("Client Send"); // perhaps delayed a bit after the span started in enqueue
}
@Override
public void onBegin(Request request) {
if (solrMetricsContext != null) {
timerContext = timer(request).time();
}
if (span.isRecording()) {
span.addEvent("Client Send"); // perhaps delayed a bit after the span started in enqueue
}
}

@Override
public void onComplete(Result result) {
if (timerContext != null) {
timerContext.stop();
}
if (result.isFailed() && span.isRecording()) {
span.addEvent(result.toString()); // logs failure info and interesting stuff
}
@Override
public void onComplete(Result result) {
if (timerContext != null) {
timerContext.stop();
}
};
if (result.isFailed() && span.isRecording()) {
span.addEvent(result.toString()); // logs failure info and interesting stuff
}
}
}

private Timer timer(Request request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.CookieStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand All @@ -47,7 +46,6 @@
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.SSLConfig;
import org.apache.solr.client.solrj.impl.HttpListenerFactory.RequestResponseListener;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
Expand All @@ -64,13 +62,11 @@
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin.Address;
import org.eclipse.jetty.client.Origin.Protocol;
import org.eclipse.jetty.client.ProtocolHandlers;
import org.eclipse.jetty.client.ProxyConfiguration;
import org.eclipse.jetty.client.Socks4Proxy;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
Expand All @@ -94,15 +90,10 @@
import org.slf4j.MDC;

/**
* Difference between this {@link Http2SolrClient} and {@link HttpSolrClient}:
* A {@link SolrClient} based on Jetty {@link HttpClient}. The class name is unfortunate, as it
* supports both HTTP/1.1 AND HTTP/2.
*
* <ul>
* <li>{@link Http2SolrClient} sends requests in HTTP/2
* <li>{@link Http2SolrClient} can point to multiple urls
* <li>{@link Http2SolrClient} does not expose its internal httpClient like {@link
* HttpSolrClient#getHttpClient()}, sharing connection pools should be done by {@link
* Http2SolrClient.Builder#withHttpClient(Http2SolrClient)}
* </ul>
* <p>Use {@link Builder#Builder(String)} to create one.
*/
public class Http2SolrClient extends HttpSolrClientBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand All @@ -114,7 +105,6 @@ public class Http2SolrClient extends HttpSolrClientBase {

private final HttpClient httpClient;

private List<HttpListenerFactory> listenerFactory = new ArrayList<>();
protected AsyncTracker asyncTracker = new AsyncTracker();

private final boolean closeClient;
Expand All @@ -140,9 +130,7 @@ protected Http2SolrClient(String serverBaseUrl, Builder builder) {
this.httpClient = createHttpClient(builder);
this.closeClient = true;
}
if (builder.listenerFactory != null) {
this.listenerFactory.addAll(builder.listenerFactory);
}

updateDefaultMimeTypeForParser();

this.httpClient.setFollowRedirects(Boolean.TRUE.equals(builder.followRedirects));
Expand All @@ -158,24 +146,12 @@ private void initAuthStoreFromExistingClient(HttpClient httpClient) {
this.authenticationStore = (AuthenticationStoreHolder) httpClient.getAuthenticationStore();
}

@Deprecated(since = "9.7")
public void addListenerFactory(HttpListenerFactory factory) {
this.listenerFactory.add(factory);
}

// internal usage only
HttpClient getHttpClient() {
/** internal use only */
public HttpClient getHttpClient() {
return httpClient;
}

// internal usage only
ProtocolHandlers getProtocolHandlers() {
return httpClient.getProtocolHandlers();
}

private HttpClient createHttpClient(Builder builder) {
HttpClient httpClient;

executor = builder.executor;
if (executor == null) {
BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
Expand Down Expand Up @@ -224,6 +200,7 @@ private HttpClient createHttpClient(Builder builder) {
clientConnector.setSelectors(2);

HttpClientTransport transport;
HttpClient httpClient;
if (builder.useHttp1_1) {
if (log.isDebugEnabled()) {
log.debug("Create Http2SolrClient with HTTP/1.1 transport");
Expand Down Expand Up @@ -265,6 +242,10 @@ private HttpClient createHttpClient(Builder builder) {

setupProxy(builder, httpClient);

if (builder.requestListeners != null) {
httpClient.getRequestListeners().addAll(builder.requestListeners);
}

try {
httpClient.start();
} catch (Exception e) {
Expand Down Expand Up @@ -465,16 +446,16 @@ public void onHeaders(Response response) {
try {
NamedList<Object> body =
processErrorsAndResponse(solrRequest, response, is, url);
mdcCopyHelper.onBegin(null);
mdcCopyHelper.onBegin();
log.debug("response processing success");
future.complete(body);
} catch (SolrClient.RemoteSolrException | SolrServerException e) {
mdcCopyHelper.onBegin(null);
mdcCopyHelper.onBegin();
log.debug("response processing failed", e);
future.completeExceptionally(e);
} finally {
log.debug("response processing completed");
mdcCopyHelper.onComplete(null);
mdcCopyHelper.onComplete();
}
});
}
Expand Down Expand Up @@ -632,12 +613,6 @@ private void decorateRequest(Request req, SolrRequest<?> solrRequest, boolean is
}

setBasicAuthHeader(solrRequest, req);
for (HttpListenerFactory factory : listenerFactory) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer need to do this; the listeners are now at httpClient level, not request level. Granted the listener onQueued can choose to register another listener (and one does do this).

HttpListenerFactory.RequestResponseListener listener = factory.get();
listener.onQueued(req);
req.onRequestBegin(listener);
req.onComplete(listener);
}

if (isAsync) {
req.onRequestQueued(asyncTracker.queuedListener);
Expand Down Expand Up @@ -900,7 +875,7 @@ public static class Builder

protected Long keyStoreReloadIntervalSecs;

private List<HttpListenerFactory> listenerFactory;
private List<Request.Listener> requestListeners;

public Builder() {
super();
Expand All @@ -926,8 +901,9 @@ public Builder(String baseSolrUrl) {
this.baseSolrUrl = baseSolrUrl;
}

public Http2SolrClient.Builder withListenerFactory(List<HttpListenerFactory> listenerFactory) {
this.listenerFactory = listenerFactory;
/** These Jetty {@link Request.Listener}s are registered on the internal {@link HttpClient}. */
public Http2SolrClient.Builder withRequestListeners(List<Request.Listener> requestListeners) {
this.requestListeners = requestListeners;
return this;
}

Expand Down Expand Up @@ -1103,10 +1079,6 @@ public Builder withHttpClient(Http2SolrClient http2SolrClient) {
if (this.urlParamNames == null) {
this.urlParamNames = http2SolrClient.urlParamNames;
}
if (this.listenerFactory == null) {
this.listenerFactory = new ArrayList<HttpListenerFactory>();
http2SolrClient.listenerFactory.forEach(this.listenerFactory::add);
}
if (this.executor == null) {
this.executor = http2SolrClient.executor;
}
Expand Down Expand Up @@ -1165,22 +1137,19 @@ static SslContextFactory.Client getDefaultSslContextFactory() {
}

/**
* Helper class in change of copying MDC context across all threads involved in processing a
* request. This does not strictly need to be a RequestResponseListener, but using it since it
* already provides hooks into the request processing lifecycle.
* Helper class in charge of copying MDC context across all threads involved in processing a
* request.
*/
private static class MDCCopyHelper extends RequestResponseListener {
private static class MDCCopyHelper {
private final Map<String, String> submitterContext = MDC.getCopyOfContextMap();
private Map<String, String> threadContext;

@Override
public void onBegin(Request request) {
void onBegin() {
threadContext = MDC.getCopyOfContextMap();
updateContextMap(submitterContext);
}

@Override
public void onComplete(Result result) {
void onComplete() {
updateContextMap(threadContext);
}

Expand Down
Loading
Loading