diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index c2bb0c6cd05..3065a305266 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -99,7 +99,6 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -113,7 +112,6 @@ @ExtendWith(SystemPropertiesExtension.class) @Timeout(60) // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication -@Disabled("https://issues.apache.org/jira/browse/IGNITE-23252") @WithSystemProperty(key = FEATURE_FLAG_NAME, value = "true") public class ItReplicaLifecycleTest extends IgniteAbstractTest { private static final int NODE_COUNT = 3; @@ -185,10 +183,11 @@ private void startNodes( int amount, @Nullable InvokeInterceptor invokeInterceptor ) throws NodeStoppingException, InterruptedException { - IntStream.range(0, amount) - .mapToObj(i -> newNode(testInfo, i, invokeInterceptor)) - .parallel() - .forEach(Node::start); + IntStream.range(0, amount).forEach(i -> newNode(testInfo, i, invokeInterceptor)); + + assert nodes.size() == amount : "Not all amount of nodes were created."; + + nodes.values().stream().parallel().forEach(Node::start); Node node0 = getNode(0); @@ -327,7 +326,6 @@ public void testZoneReplicaListener(TestInfo testInfo) throws Exception { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-22944") void testAlterReplicaTrigger(TestInfo testInfo) throws Exception { startNodes(testInfo, 3); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java index 57889857a5d..317ea9e935d 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.partition.replicator; -import org.apache.ignite.internal.event.EventParameters; +import org.apache.ignite.internal.event.CausalEventParameters; import org.apache.ignite.internal.replicator.ZonePartitionId; /** * Parameters for the events about zone partition replicas produced by {@link PartitionReplicaLifecycleManager}. */ -public class LocalPartitionReplicaEventParameters implements EventParameters { +public class LocalPartitionReplicaEventParameters extends CausalEventParameters { /** Zone partition id. */ private final ZonePartitionId zonePartitionId; @@ -31,8 +31,10 @@ public class LocalPartitionReplicaEventParameters implements EventParameters { * Constructor. * * @param zonePartitionId Zone partition id. + * @param revision Event's revision. */ - public LocalPartitionReplicaEventParameters(ZonePartitionId zonePartitionId) { + public LocalPartitionReplicaEventParameters(ZonePartitionId zonePartitionId, long revision) { + super(revision); this.zonePartitionId = zonePartitionId; } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index 98ef59118f6..3d3ec80630a 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -412,11 +412,15 @@ private CompletableFuture calculateZoneAssignmentsAndCreateReplicationNode return getOrCreateAssignments(zoneDescriptor, causalityToken, catalogVersion) .thenCompose(assignments -> writeZoneAssignmentsToMetastore(zoneId, assignments)) - .thenCompose(assignments -> createZoneReplicationNodes(zoneId, assignments)); + .thenCompose(assignments -> createZoneReplicationNodes(zoneId, assignments, causalityToken)); }); } - private CompletableFuture createZoneReplicationNodes(int zoneId, List assignments) { + private CompletableFuture createZoneReplicationNodes( + int zoneId, + List assignments, + long revision + ) { return inBusyLockAsync(busyLock, () -> { assert assignments != null : IgniteStringFormatter.format("Zone has empty assignments [id={}].", zoneId); @@ -427,7 +431,13 @@ private CompletableFuture createZoneReplicationNodes(int zoneId, List createZoneReplicationNodes(int zoneId, List createZonePartitionReplicationNode( int zoneId, int partId, @Nullable Assignment localMemberAssignment, - Assignments stableAssignments + Assignments stableAssignments, + long revision ) { if (localMemberAssignment == null) { return nullCompletedFuture(); @@ -490,7 +502,8 @@ private CompletableFuture createZonePartitionReplicationNode( replicationGroupIds.add(replicaGrpId); var eventParams = new LocalPartitionReplicaEventParameters( - new ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId()) + new ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId()), + revision ); return fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED, eventParams); @@ -835,7 +848,8 @@ private CompletableFuture handleChangeStableAssignmentEvent( zonePartitionId, stableAssignments, pendingAssignments, - isRecovery + isRecovery, + revision ); }, ioExecutor).thenCompose(identity()); } @@ -866,7 +880,8 @@ private CompletableFuture stopAndDestroyPartitionAndUpdateClients( ZonePartitionId zonePartitionId, Set stableAssignments, Assignments pendingAssignments, - boolean isRecovery + boolean isRecovery, + long revision ) { CompletableFuture clientUpdateFuture = isRecovery // Updating clients is not needed on recovery. @@ -880,14 +895,14 @@ private CompletableFuture stopAndDestroyPartitionAndUpdateClients( .noneMatch(assignment -> assignment.consistentId().equals(localNode().name())); if (shouldStopLocalServices) { - return clientUpdateFuture.thenCompose(v -> stopAndDestroyPartition(zonePartitionId)); + return clientUpdateFuture.thenCompose(v -> stopAndDestroyPartition(zonePartitionId, revision)); } else { return clientUpdateFuture; } } - private CompletableFuture stopAndDestroyPartition(ZonePartitionId zonePartitionId) { - return weakStopPartition(zonePartitionId); + private CompletableFuture stopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) { + return weakStopPartition(zonePartitionId, revision); } private CompletableFuture handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision) { @@ -996,7 +1011,8 @@ private CompletableFuture handleChangePendingAssignmentEvent( zoneId, partitionId, localMemberAssignment, - computedStableAssignments + computedStableAssignments, + revision ); } else if (pendingAssignmentsAreForced && localMemberAssignment != null) { localServicesStartFuture = runAsync(() -> { @@ -1194,11 +1210,11 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { return Assignments.fromBytes(entry.value()); } - private CompletableFuture weakStopPartition(ZonePartitionId zonePartitionId) { + private CompletableFuture weakStopPartition(ZonePartitionId zonePartitionId, long revision) { return replicaMgr.weakStopReplica( zonePartitionId, WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS, - () -> stopPartition(zonePartitionId).thenAccept(v -> {}) + () -> stopPartition(zonePartitionId, revision).thenAccept(v -> {}) ); } @@ -1208,7 +1224,7 @@ private CompletableFuture weakStopPartition(ZonePartitionId zonePartitionI * @param zonePartitionId Partition ID. * @return Future that will be completed after all resources have been closed. */ - private CompletableFuture stopPartition(ZonePartitionId zonePartitionId) { + private CompletableFuture stopPartition(ZonePartitionId zonePartitionId, long revision) { return executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> { try { return replicaMgr.stopReplica(zonePartitionId) @@ -1218,7 +1234,7 @@ private CompletableFuture stopPartition(ZonePartitionId zonePartitionId) { return fireEvent( LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, - new LocalPartitionReplicaEventParameters(zonePartitionId) + new LocalPartitionReplicaEventParameters(zonePartitionId, revision) ); } else { return nullCompletedFuture(); @@ -1237,14 +1253,17 @@ private CompletableFuture stopPartition(ZonePartitionId zonePartitionId) { * @param partitionIds Partitions to stop. */ private void cleanUpPartitionsResources(Set partitionIds) { + // TODO: Due to IGNITE-23741 we shouldn't destroy partitions on node stop thus the revision will be removed. + long revision = catalogMgr.latestCatalogVersion(); + CompletableFuture[] stopPartitionsFuture = partitionIds.stream() - .map(this::stopPartition) + .map(partId -> stopPartition(partId, revision)) .toArray(CompletableFuture[]::new); try { allOf(stopPartitionsFuture).get(30, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.error("Unable to clean zones resources", e); + LOG.error("Unable to clean up zones resources", e); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 05aefd14be3..781a52720b6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -754,7 +754,10 @@ private CompletableFuture onZoneReplicaStopped(LocalPartitionReplicaEve return mvGc.removeStorage(tablePartitionId) .thenComposeAsync( - v -> inBusyLockAsync(busyLock, () -> destroyPartitionStorages(tablePartitionId, table)), + v -> inBusyLockAsync(busyLock, () -> weakStopAndDestroyPartition( + tablePartitionId, + parameters.causalityToken()) + ), ioExecutor ); })