1
1
package org .prebid .cache .handlers ;
2
2
3
- import com .codahale .metrics .Timer ;
4
3
import com .github .benmanes .caffeine .cache .Caffeine ;
5
4
import io .github .resilience4j .circuitbreaker .CircuitBreaker ;
6
5
import io .github .resilience4j .reactor .circuitbreaker .operator .CircuitBreakerOperator ;
10
9
import org .apache .http .client .utils .URIBuilder ;
11
10
import org .prebid .cache .builders .PrebidServerResponseBuilder ;
12
11
import org .prebid .cache .exceptions .UnsupportedMediaTypeException ;
13
- import org .prebid .cache .metrics .GraphiteMetricsRecorder ;
14
12
import org .prebid .cache .metrics .MetricsRecorder ;
13
+ import org .prebid .cache .metrics .MetricsRecorder .MetricsRecorderTimer ;
15
14
import org .prebid .cache .model .PayloadWrapper ;
16
15
import org .prebid .cache .repository .CacheConfig ;
17
16
import org .prebid .cache .repository .ReactiveRepository ;
@@ -47,7 +46,7 @@ public class GetCacheHandler extends CacheHandler {
47
46
public GetCacheHandler (final ReactiveRepository <PayloadWrapper , String > repository ,
48
47
final CacheConfig config ,
49
48
final ApiConfig apiConfig ,
50
- final GraphiteMetricsRecorder metricsRecorder ,
49
+ final MetricsRecorder metricsRecorder ,
51
50
final PrebidServerResponseBuilder builder ,
52
51
final CircuitBreaker circuitBreaker ) {
53
52
this .metricsRecorder = metricsRecorder ;
@@ -72,8 +71,7 @@ private static Map<String, WebClient> createClientsCache(final int ttl, final in
72
71
public Mono <ServerResponse > fetch (ServerRequest request ) {
73
72
// metrics
74
73
metricsRecorder .markMeterForTag (this .metricTagPrefix , MetricsRecorder .MeasurementTag .REQUEST );
75
- final var timerContext =
76
- metricsRecorder .createRequestContextTimerOptionalForServiceType (this .type ).orElse (null );
74
+ final var timerContext = metricsRecorder .createRequestTimerForServiceType (this .type );
77
75
78
76
return request .queryParam (ID_KEY ).map (id -> fetch (request , id , timerContext )).orElseGet (() -> {
79
77
final var responseMono = ErrorHandler .createInvalidParameters ();
@@ -83,14 +81,14 @@ public Mono<ServerResponse> fetch(ServerRequest request) {
83
81
84
82
private Mono <ServerResponse > fetch (final ServerRequest request ,
85
83
final String id ,
86
- final Timer . Context timerContext ) {
84
+ final MetricsRecorderTimer timerContext ) {
87
85
88
86
final var cacheUrl = resolveCacheUrl (request );
89
87
90
88
final var responseMono =
91
89
StringUtils .containsIgnoreCase (cacheUrl , config .getAllowedProxyHost ())
92
- ? processProxyRequest (request , id , cacheUrl )
93
- : processRequest (request , id );
90
+ ? processProxyRequest (request , id , cacheUrl )
91
+ : processRequest (request , id );
94
92
95
93
return finalizeResult (responseMono , request , timerContext );
96
94
}
@@ -99,10 +97,10 @@ private String resolveCacheUrl(final ServerRequest request) {
99
97
final var cacheHostParam = request .queryParam (CACHE_HOST_KEY ).orElse (null );
100
98
if (StringUtils .isNotBlank (cacheHostParam )) {
101
99
return new URIBuilder ()
102
- .setHost (cacheHostParam )
103
- .setPath (apiConfig .getPath ())
104
- .setScheme (config .getHostParamProtocol ())
105
- .toString ();
100
+ .setHost (cacheHostParam )
101
+ .setPath (apiConfig .getPath ())
102
+ .setScheme (config .getHostParamProtocol ())
103
+ .toString ();
106
104
}
107
105
108
106
return null ;
@@ -115,66 +113,56 @@ private Mono<ServerResponse> processProxyRequest(final ServerRequest request,
115
113
final var webClient = clientsCache .computeIfAbsent (cacheUrl , WebClient ::create );
116
114
117
115
return webClient .get ()
118
- .uri (uriBuilder -> uriBuilder .queryParam (ID_KEY , idKeyParam ).build ())
119
- .headers (httpHeaders -> httpHeaders .addAll (request .headers ().asHttpHeaders ()))
120
- .exchange ()
121
- .transform (CircuitBreakerOperator .of (circuitBreaker ))
122
- .timeout (Duration .ofMillis (config .getTimeoutMs ()))
123
- .subscribeOn (Schedulers .parallel ())
124
- .handle (this ::updateProxyMetrics )
125
- .flatMap (GetCacheHandler ::fromClientResponse )
126
- .doOnError (error -> {
127
- metricsRecorder .getProxyFailure ().mark ();
128
- log .info ("Failed to send request: '{}', cause: '{}'" ,
129
- ExceptionUtils .getMessage (error ), ExceptionUtils .getMessage (error ));
130
-
131
- });
116
+ .uri (uriBuilder -> uriBuilder .queryParam (ID_KEY , idKeyParam ).build ())
117
+ .headers (httpHeaders -> httpHeaders .addAll (request .headers ().asHttpHeaders ()))
118
+ .exchange ()
119
+ .transform (CircuitBreakerOperator .of (circuitBreaker ))
120
+ .timeout (Duration .ofMillis (config .getTimeoutMs ()))
121
+ .subscribeOn (Schedulers .parallel ())
122
+ .handle (this ::updateProxyMetrics )
123
+ .flatMap (GetCacheHandler ::fromClientResponse )
124
+ .doOnError (error -> {
125
+ metricsRecorder .getProxyFailure ().increment ();
126
+ log .info ("Failed to send request: '{}', cause: '{}'" ,
127
+ ExceptionUtils .getMessage (error ), ExceptionUtils .getMessage (error ));
128
+ });
132
129
}
133
130
134
131
private void updateProxyMetrics (final ClientResponse clientResponse ,
135
132
final SynchronousSink <ClientResponse > sink ) {
136
133
if (HttpStatus .OK .equals (clientResponse .statusCode ())) {
137
- metricsRecorder .getProxySuccess ().mark ();
134
+ metricsRecorder .getProxySuccess ().increment ();
138
135
} else {
139
- metricsRecorder .getProxyFailure ().mark ();
136
+ metricsRecorder .getProxyFailure ().increment ();
140
137
}
141
138
142
139
sink .next (clientResponse );
143
140
}
144
141
145
142
private static Mono <ServerResponse > fromClientResponse (final ClientResponse clientResponse ) {
146
143
return ServerResponse .status (clientResponse .statusCode ())
147
- .headers (headerConsumer -> clientResponse .headers ().asHttpHeaders ().forEach (headerConsumer ::addAll ))
148
- .body (clientResponse .bodyToMono (String .class ), String .class );
144
+ .headers (headerConsumer -> clientResponse .headers ().asHttpHeaders ().forEach (headerConsumer ::addAll ))
145
+ .body (clientResponse .bodyToMono (String .class ), String .class );
149
146
}
150
147
151
148
private Mono <ServerResponse > processRequest (final ServerRequest request , final String keyIdParam ) {
152
149
final var normalizedId = String .format ("%s%s" , config .getPrefix (), keyIdParam );
153
150
return repository .findById (normalizedId )
154
- .transform (CircuitBreakerOperator .of (circuitBreaker ))
155
- .timeout (Duration .ofMillis (config .getTimeoutMs ()))
156
- .subscribeOn (Schedulers .parallel ())
157
- .transform (this ::validateErrorResult )
158
- .flatMap (wrapper -> createServerResponse (wrapper , request ))
159
- .switchIfEmpty (ErrorHandler .createResourceNotFound (normalizedId ));
151
+ .transform (CircuitBreakerOperator .of (circuitBreaker ))
152
+ .timeout (Duration .ofMillis (config .getTimeoutMs ()))
153
+ .subscribeOn (Schedulers .parallel ())
154
+ .transform (this ::validateErrorResult )
155
+ .flatMap (wrapper -> createServerResponse (wrapper , request ))
156
+ .switchIfEmpty (ErrorHandler .createResourceNotFound (normalizedId ));
160
157
}
161
158
162
159
private Mono <ServerResponse > createServerResponse (final PayloadWrapper wrapper , final ServerRequest request ) {
163
-
164
160
if (wrapper .getPayload ().getType ().equals (PayloadType .JSON .toString ())) {
165
- metricsRecorder .markMeterForTag (this .metricTagPrefix ,
166
- MetricsRecorder .MeasurementTag .JSON );
167
- return builder .createResponseMono (request ,
168
- MediaType .APPLICATION_JSON_UTF8 ,
169
- wrapper );
170
- } else if (wrapper .getPayload ()
171
- .getType ()
172
- .equals (PayloadType .XML .toString ())) {
173
- metricsRecorder .markMeterForTag (this .metricTagPrefix ,
174
- MetricsRecorder .MeasurementTag .XML );
175
- return builder .createResponseMono (request ,
176
- MediaType .APPLICATION_XML ,
177
- wrapper );
161
+ metricsRecorder .markMeterForTag (this .metricTagPrefix , MetricsRecorder .MeasurementTag .JSON );
162
+ return builder .createResponseMono (request , MediaType .APPLICATION_JSON_UTF8 , wrapper );
163
+ } else if (wrapper .getPayload ().getType ().equals (PayloadType .XML .toString ())) {
164
+ metricsRecorder .markMeterForTag (this .metricTagPrefix , MetricsRecorder .MeasurementTag .XML );
165
+ return builder .createResponseMono (request , MediaType .APPLICATION_XML , wrapper );
178
166
}
179
167
180
168
return Mono .error (new UnsupportedMediaTypeException (UNSUPPORTED_MEDIATYPE ));
0 commit comments