Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23252 ItReplicaLifecycleTest is unstable #4956

Merged
merged 15 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
Expand All @@ -113,7 +112,6 @@
@ExtendWith(SystemPropertiesExtension.class)
@Timeout(60)
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Remove this test after switching to zone-based replication.
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23252")
@WithSystemProperty(key = FEATURE_FLAG_NAME, value = "true")
public class ItReplicaLifecycleTest extends IgniteAbstractTest {
private static final int NODE_COUNT = 3;
Expand Down Expand Up @@ -185,10 +183,11 @@ private void startNodes(
int amount,
@Nullable InvokeInterceptor invokeInterceptor
) throws NodeStoppingException, InterruptedException {
IntStream.range(0, amount)
.mapToObj(i -> newNode(testInfo, i, invokeInterceptor))
.parallel()
.forEach(Node::start);
IntStream.range(0, amount).forEach(i -> newNode(testInfo, i, invokeInterceptor));

assert nodes.size() == amount : "Not all amount of nodes were created.";

nodes.values().stream().parallel().forEach(Node::start);

Node node0 = getNode(0);

Expand Down Expand Up @@ -327,7 +326,6 @@ public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22944")
void testAlterReplicaTrigger(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@

package org.apache.ignite.internal.partition.replicator;

import org.apache.ignite.internal.event.EventParameters;
import org.apache.ignite.internal.event.CausalEventParameters;
import org.apache.ignite.internal.replicator.ZonePartitionId;

/**
* Parameters for the events about zone partition replicas produced by {@link PartitionReplicaLifecycleManager}.
*/
public class LocalPartitionReplicaEventParameters implements EventParameters {
public class LocalPartitionReplicaEventParameters extends CausalEventParameters {
/** Zone partition id. */
private final ZonePartitionId zonePartitionId;

/**
* Constructor.
*
* @param zonePartitionId Zone partition id.
* @param revision Event's revision.
*/
public LocalPartitionReplicaEventParameters(ZonePartitionId zonePartitionId) {
public LocalPartitionReplicaEventParameters(ZonePartitionId zonePartitionId, long revision) {
super(revision);
this.zonePartitionId = zonePartitionId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,15 @@ private CompletableFuture<Void> calculateZoneAssignmentsAndCreateReplicationNode

return getOrCreateAssignments(zoneDescriptor, causalityToken, catalogVersion)
.thenCompose(assignments -> writeZoneAssignmentsToMetastore(zoneId, assignments))
.thenCompose(assignments -> createZoneReplicationNodes(zoneId, assignments));
.thenCompose(assignments -> createZoneReplicationNodes(zoneId, assignments, causalityToken));
});
}

private CompletableFuture<Void> createZoneReplicationNodes(int zoneId, List<Assignments> assignments) {
private CompletableFuture<Void> createZoneReplicationNodes(
int zoneId,
List<Assignments> assignments,
long revision
) {
return inBusyLockAsync(busyLock, () -> {
assert assignments != null : IgniteStringFormatter.format("Zone has empty assignments [id={}].", zoneId);

Expand All @@ -427,7 +431,13 @@ private CompletableFuture<Void> createZoneReplicationNodes(int zoneId, List<Assi

Assignment localMemberAssignment = localMemberAssignment(zoneAssignment);

partitionsStartFutures[partId] = createZonePartitionReplicationNode(zoneId, partId, localMemberAssignment, zoneAssignment);
partitionsStartFutures[partId] = createZonePartitionReplicationNode(
zoneId,
partId,
localMemberAssignment,
zoneAssignment,
revision
);
}

return allOf(partitionsStartFutures);
Expand All @@ -442,13 +452,15 @@ private CompletableFuture<Void> createZoneReplicationNodes(int zoneId, List<Assi
* @param partId Partition id.
* @param localMemberAssignment Assignment of the local member, or null if local member is not part of the assignment.
* @param stableAssignments Stable assignments.
* @param revision Event's revision.
* @return Future that completes when a replica is started.
*/
private CompletableFuture<Void> createZonePartitionReplicationNode(
int zoneId,
int partId,
@Nullable Assignment localMemberAssignment,
Assignments stableAssignments
Assignments stableAssignments,
long revision
) {
if (localMemberAssignment == null) {
return nullCompletedFuture();
Expand Down Expand Up @@ -490,7 +502,8 @@ private CompletableFuture<Void> createZonePartitionReplicationNode(
replicationGroupIds.add(replicaGrpId);

var eventParams = new LocalPartitionReplicaEventParameters(
new ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
new ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId()),
revision
);

return fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED, eventParams);
Expand Down Expand Up @@ -835,7 +848,8 @@ private CompletableFuture<Void> handleChangeStableAssignmentEvent(
zonePartitionId,
stableAssignments,
pendingAssignments,
isRecovery
isRecovery,
revision
);
}, ioExecutor).thenCompose(identity());
}
Expand Down Expand Up @@ -866,7 +880,8 @@ private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
ZonePartitionId zonePartitionId,
Set<Assignment> stableAssignments,
Assignments pendingAssignments,
boolean isRecovery
boolean isRecovery,
long revision
) {
CompletableFuture<Void> clientUpdateFuture = isRecovery
// Updating clients is not needed on recovery.
Expand All @@ -880,14 +895,14 @@ private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
.noneMatch(assignment -> assignment.consistentId().equals(localNode().name()));

if (shouldStopLocalServices) {
return clientUpdateFuture.thenCompose(v -> stopAndDestroyPartition(zonePartitionId));
return clientUpdateFuture.thenCompose(v -> stopAndDestroyPartition(zonePartitionId, revision));
} else {
return clientUpdateFuture;
}
}

private CompletableFuture<Void> stopAndDestroyPartition(ZonePartitionId zonePartitionId) {
return weakStopPartition(zonePartitionId);
private CompletableFuture<Void> stopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
return weakStopPartition(zonePartitionId, revision);
}

private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision) {
Expand Down Expand Up @@ -996,7 +1011,8 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
zoneId,
partitionId,
localMemberAssignment,
computedStableAssignments
computedStableAssignments,
revision
);
} else if (pendingAssignmentsAreForced && localMemberAssignment != null) {
localServicesStartFuture = runAsync(() -> {
Expand Down Expand Up @@ -1194,11 +1210,11 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
return Assignments.fromBytes(entry.value());
}

private CompletableFuture<Void> weakStopPartition(ZonePartitionId zonePartitionId) {
private CompletableFuture<Void> weakStopPartition(ZonePartitionId zonePartitionId, long revision) {
return replicaMgr.weakStopReplica(
zonePartitionId,
WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
() -> stopPartition(zonePartitionId).thenAccept(v -> {})
() -> stopPartition(zonePartitionId, revision).thenAccept(v -> {})
);
}

Expand All @@ -1208,7 +1224,7 @@ private CompletableFuture<Void> weakStopPartition(ZonePartitionId zonePartitionI
* @param zonePartitionId Partition ID.
* @param revision Event's revision.
* @return Future that will be completed after all resources have been closed.
*/
private CompletableFuture<?> stopPartition(ZonePartitionId zonePartitionId) {
private CompletableFuture<?> stopPartition(ZonePartitionId zonePartitionId, long revision) {
return executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
try {
return replicaMgr.stopReplica(zonePartitionId)
Expand All @@ -1218,7 +1234,7 @@ private CompletableFuture<?> stopPartition(ZonePartitionId zonePartitionId) {

return fireEvent(
LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
new LocalPartitionReplicaEventParameters(zonePartitionId)
new LocalPartitionReplicaEventParameters(zonePartitionId, revision)
);
} else {
return nullCompletedFuture();
Expand All @@ -1237,14 +1253,17 @@ private CompletableFuture<?> stopPartition(ZonePartitionId zonePartitionId) {
* @param partitionIds Partitions to stop.
*/
private void cleanUpPartitionsResources(Set<ZonePartitionId> partitionIds) {
// TODO: According to IGNITE-23741, partitions shouldn't be destroyed on node stop, so this revision will be removed.
long revision = catalogMgr.latestCatalogVersion();

CompletableFuture<?>[] stopPartitionsFuture = partitionIds.stream()
.map(this::stopPartition)
.map(partId -> stopPartition(partId, revision))
.toArray(CompletableFuture[]::new);

try {
allOf(stopPartitionsFuture).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Unable to clean zones resources", e);
LOG.error("Unable to clean up zones resources", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,10 @@ private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEve

return mvGc.removeStorage(tablePartitionId)
.thenComposeAsync(
v -> inBusyLockAsync(busyLock, () -> destroyPartitionStorages(tablePartitionId, table)),
v -> inBusyLockAsync(busyLock, () -> weakStopAndDestroyPartition(
tablePartitionId,
parameters.causalityToken())
),
ioExecutor
);
})
Expand Down