|
19 | 19 | import io.airlift.testing.TestingTicker; |
20 | 20 | import io.trino.hive.thrift.metastore.Table; |
21 | 21 | import org.apache.thrift.TException; |
| 22 | +import org.apache.thrift.transport.TTransportException; |
22 | 23 | import org.junit.jupiter.api.Test; |
23 | 24 |
|
24 | 25 | import java.net.SocketTimeoutException; |
| 26 | +import java.net.URI; |
| 27 | +import java.util.HashMap; |
25 | 28 | import java.util.Map; |
26 | 29 | import java.util.Optional; |
27 | 30 |
|
@@ -183,6 +186,27 @@ public void testReturnsToDefaultClientAfterErrorOnFallback() |
183 | 186 | assertEqualHiveClient(metastoreClient3, DEFAULT_CLIENT); |
184 | 187 | } |
185 | 188 |
|
| 189 | + @Test |
| 190 | + public void testBackoffAppliedOnConnectionFailure() |
| 191 | + throws TException |
| 192 | + { |
| 193 | + ConnectionCountingThriftMetastoreClientFactory clientFactory = new ConnectionCountingThriftMetastoreClientFactory( |
| 194 | + ImmutableMap.of(DEFAULT_URI, Optional.empty(), FALLBACK_URI, Optional.of(FALLBACK_CLIENT))); |
| 195 | + StaticMetastoreConfig config = new StaticMetastoreConfig().setMetastoreUris(ImmutableList.of(DEFAULT_URI, FALLBACK_URI)); |
| 196 | + StaticTokenAwareMetastoreClientFactory metastoreClientFactory = new StaticTokenAwareMetastoreClientFactory( |
| 197 | + config, new ThriftMetastoreAuthenticationConfig(), clientFactory, new TestingTicker()); |
| 198 | + |
| 199 | + assertEqualHiveClient(metastoreClientFactory.createMetastoreClient(Optional.empty()), FALLBACK_CLIENT); |
| 200 | + assertThat(clientFactory.connectionAttempts()).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of( |
| 201 | + URI.create(DEFAULT_URI), 1, |
| 202 | + URI.create(FALLBACK_URI), 1)); |
| 203 | + clientFactory.resetConnectionAttempts(); |
| 204 | + assertEqualHiveClient(metastoreClientFactory.createMetastoreClient(Optional.empty()), FALLBACK_CLIENT); |
| 205 | + // The previous connection failure on DEFAULT_URI is not attempted first again on the second call |
| 206 | + assertThat(clientFactory.connectionAttempts()).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of( |
| 207 | + URI.create(FALLBACK_URI), 1)); |
| 208 | + } |
| 209 | + |
186 | 210 | private static void assertGetTableException(ThriftMetastoreClient client) |
187 | 211 | { |
188 | 212 | assertThatThrownBy(() -> client.getTable("foo", "bar")) |
@@ -230,4 +254,34 @@ private static void assertEqualHiveClient(ThriftMetastoreClient actual, ThriftMe |
230 | 254 | } |
231 | 255 | assertThat(actual).isEqualTo(expected); |
232 | 256 | } |
| 257 | + |
| 258 | + private static class ConnectionCountingThriftMetastoreClientFactory |
| 259 | + implements ThriftMetastoreClientFactory |
| 260 | + { |
| 261 | + private final ThriftMetastoreClientFactory delegate; |
| 262 | + private final Map<URI, Integer> attempts = new HashMap<>(); |
| 263 | + |
| 264 | + public ConnectionCountingThriftMetastoreClientFactory(Map<String, Optional<ThriftMetastoreClient>> clients) |
| 265 | + { |
| 266 | + this.delegate = new MockThriftMetastoreClientFactory(clients); |
| 267 | + } |
| 268 | + |
| 269 | + @Override |
| 270 | + public ThriftMetastoreClient create(URI uri, Optional<String> delegationToken) |
| 271 | + throws TTransportException |
| 272 | + { |
| 273 | + attempts.merge(uri, 1, Integer::sum); |
| 274 | + return delegate.create(uri, delegationToken); |
| 275 | + } |
| 276 | + |
| 277 | + public void resetConnectionAttempts() |
| 278 | + { |
| 279 | + attempts.clear(); |
| 280 | + } |
| 281 | + |
| 282 | + public Map<URI, Integer> connectionAttempts() |
| 283 | + { |
| 284 | + return ImmutableMap.copyOf(attempts); |
| 285 | + } |
| 286 | + } |
233 | 287 | } |
0 commit comments