|
12 | 12 | import org.elasticsearch.action.ActionListener;
|
13 | 13 | import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
|
14 | 14 | import org.elasticsearch.action.support.ActionFilters;
|
| 15 | +import org.elasticsearch.action.support.ActionTestUtils; |
15 | 16 | import org.elasticsearch.action.support.SubscribableListener;
|
16 | 17 | import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
17 | 18 | import org.elasticsearch.client.internal.node.NodeClient;
|
18 | 19 | import org.elasticsearch.cluster.ClusterChangedEvent;
|
19 | 20 | import org.elasticsearch.cluster.ClusterName;
|
20 | 21 | import org.elasticsearch.cluster.ClusterState;
|
| 22 | +import org.elasticsearch.cluster.ClusterStateUpdateTask; |
21 | 23 | import org.elasticsearch.cluster.metadata.IndexMetadata;
|
22 | 24 | import org.elasticsearch.cluster.metadata.Metadata;
|
23 | 25 | import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
|
26 | 28 | import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
27 | 29 | import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
|
28 | 30 | import org.elasticsearch.cluster.service.ClusterService;
|
| 31 | +import org.elasticsearch.common.Strings; |
29 | 32 | import org.elasticsearch.common.UUIDs;
|
30 | 33 | import org.elasticsearch.common.blobstore.BlobPath;
|
31 | 34 | import org.elasticsearch.common.blobstore.BlobStore;
|
|
42 | 45 | import org.elasticsearch.index.store.Store;
|
43 | 46 | import org.elasticsearch.indices.recovery.RecoverySettings;
|
44 | 47 | import org.elasticsearch.indices.recovery.RecoveryState;
|
| 48 | +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; |
45 | 49 | import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
|
46 | 50 | import org.elasticsearch.snapshots.SnapshotId;
|
47 | 51 | import org.elasticsearch.snapshots.SnapshotInfo;
|
@@ -377,6 +381,103 @@ public void testRegisterRepositorySuccessAfterCreationFailed() {
|
377 | 381 | assertThat(repositoriesService.repository(repoName), isA(TestRepository.class));
|
378 | 382 | }
|
379 | 383 |
|
| 384 | + public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { |
| 385 | + final var repoName = randomAlphaOfLengthBetween(10, 25); |
| 386 | + final long originalGeneration = randomFrom(RepositoryData.EMPTY_REPO_GEN, 0L, 1L, randomLongBetween(2, Long.MAX_VALUE - 1)); |
| 387 | + final long newGeneration = originalGeneration + 1; |
| 388 | + |
| 389 | + safeAwait( |
| 390 | + SubscribableListener |
| 391 | + |
| 392 | + .newForked( |
| 393 | + l -> repositoriesService.registerRepository( |
| 394 | + new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE), |
| 395 | + l.map(ignored -> null) |
| 396 | + ) |
| 397 | + ) |
| 398 | + .andThen(l -> updateGenerations(repoName, originalGeneration, newGeneration, l)) |
| 399 | + .andThenAccept(ignored -> { |
| 400 | + final var metadata = repositoriesService.repository(repoName).getMetadata(); |
| 401 | + assertEquals(originalGeneration, metadata.generation()); |
| 402 | + assertEquals(newGeneration, metadata.pendingGeneration()); |
| 403 | + assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); |
| 404 | + }) |
| 405 | + .andThen( |
| 406 | + l -> repositoriesService.registerRepository( |
| 407 | + new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) |
| 408 | + .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), |
| 409 | + ActionTestUtils.assertNoSuccessListener(e -> { |
| 410 | + assertEquals( |
| 411 | + Strings.format( |
| 412 | + """ |
| 413 | + [%s] trying to modify or unregister repository that is currently used \ |
| 414 | + (currently updating root blob generation from [%d] to [%d], cannot update readonly flag)""", |
| 415 | + repoName, |
| 416 | + originalGeneration, |
| 417 | + newGeneration |
| 418 | + ), |
| 419 | + asInstanceOf(RepositoryConflictException.class, e).getMessage() |
| 420 | + ); |
| 421 | + l.onResponse(null); |
| 422 | + }) |
| 423 | + ) |
| 424 | + ) |
| 425 | + .andThenAccept(ignored -> { |
| 426 | + final var metadata = repositoriesService.repository(repoName).getMetadata(); |
| 427 | + assertEquals(originalGeneration, metadata.generation()); |
| 428 | + assertEquals(newGeneration, metadata.pendingGeneration()); |
| 429 | + assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); |
| 430 | + }) |
| 431 | + .andThen(l -> updateGenerations(repoName, newGeneration, newGeneration, l)) |
| 432 | + .andThenAccept(ignored -> { |
| 433 | + final var metadata = repositoriesService.repository(repoName).getMetadata(); |
| 434 | + assertEquals(newGeneration, metadata.generation()); |
| 435 | + assertEquals(newGeneration, metadata.pendingGeneration()); |
| 436 | + assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); |
| 437 | + }) |
| 438 | + .andThen( |
| 439 | + l -> repositoriesService.registerRepository( |
| 440 | + new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) |
| 441 | + .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), |
| 442 | + l.map(ignored -> null) |
| 443 | + ) |
| 444 | + ) |
| 445 | + .andThenAccept( |
| 446 | + ignored -> assertTrue( |
| 447 | + repositoriesService.repository(repoName) |
| 448 | + .getMetadata() |
| 449 | + .settings() |
| 450 | + .getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null) |
| 451 | + ) |
| 452 | + ) |
| 453 | + ); |
| 454 | + } |
| 455 | + |
| 456 | + private void updateGenerations(String repositoryName, long safeGeneration, long pendingGeneration, ActionListener<?> listener) { |
| 457 | + clusterService.submitUnbatchedStateUpdateTask("update repo generations", new ClusterStateUpdateTask() { |
| 458 | + @Override |
| 459 | + public ClusterState execute(ClusterState currentState) { |
| 460 | + return new ClusterState.Builder(currentState).metadata( |
| 461 | + Metadata.builder(currentState.metadata()) |
| 462 | + .putCustom( |
| 463 | + RepositoriesMetadata.TYPE, |
| 464 | + RepositoriesMetadata.get(currentState).withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) |
| 465 | + ) |
| 466 | + ).build(); |
| 467 | + } |
| 468 | + |
| 469 | + @Override |
| 470 | + public void onFailure(Exception e) { |
| 471 | + listener.onFailure(e); |
| 472 | + } |
| 473 | + |
| 474 | + @Override |
| 475 | + public void clusterStateProcessed(ClusterState initialState, ClusterState newState) { |
| 476 | + listener.onResponse(null); |
| 477 | + } |
| 478 | + }); |
| 479 | + } |
| 480 | + |
380 | 481 | private ClusterState createClusterStateWithRepo(String repoName, String repoType) {
|
381 | 482 | ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
|
382 | 483 | Metadata.Builder mdBuilder = Metadata.builder();
|
@@ -406,7 +507,7 @@ private void assertThrowsOnRegister(String repoName) {
|
406 | 507 | private static class TestRepository implements Repository {
|
407 | 508 |
|
408 | 509 | private static final String TYPE = "internal";
|
409 |
| - private final RepositoryMetadata metadata; |
| 510 | + private RepositoryMetadata metadata; |
410 | 511 | private boolean isClosed;
|
411 | 512 | private boolean isStarted;
|
412 | 513 |
|
@@ -514,7 +615,9 @@ public IndexShardSnapshotStatus.Copy getShardSnapshotStatus(SnapshotId snapshotI
|
514 | 615 | }
|
515 | 616 |
|
516 | 617 | @Override
|
517 |
| - public void updateState(final ClusterState state) {} |
| 618 | + public void updateState(final ClusterState state) { |
| 619 | + metadata = RepositoriesMetadata.get(state).repository(metadata.name()); |
| 620 | + } |
518 | 621 |
|
519 | 622 | @Override
|
520 | 623 | public void cloneShardSnapshot(
|
|
0 commit comments