Ensure SuspendRecovery is called only once #1110

Merged · 21 commits · Mar 21, 2025
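This PR renames ReplicationManager.StartRecovery/SuspendRecovery to BeginRecovery/EndRecovery (and Recovering/recoverStatus to IsRecovering/currentRecoveryStatus) and tightens every caller so recovery is ended exactly once, only by the caller that successfully began it. A minimal sketch of the pairing being enforced; RecoveryGuard and its members are illustrative and not Garnet's actual ReplicationManager:

using System;
using System.Threading;

public enum RecoveryStatus : byte
{
    NoRecovery,
    ClusterReplicate,
    ClusterFailover,
    CheckpointRecoveredAtReplica
}

public sealed class RecoveryGuard
{
    int recovering; // 0 = idle, 1 = recovery in progress
    public RecoveryStatus CurrentStatus { get; private set; } = RecoveryStatus.NoRecovery;

    public bool BeginRecovery(RecoveryStatus status)
    {
        // Only the caller that wins this compare-and-swap owns the recovery session.
        if (Interlocked.CompareExchange(ref recovering, 1, 0) != 0)
            return false;
        CurrentStatus = status;
        return true;
    }

    public void EndRecovery(RecoveryStatus nextStatus)
    {
        // Ending twice, or without a matching begin, indicates a caller bug.
        if (Interlocked.CompareExchange(ref recovering, 0, 1) != 1)
            throw new InvalidOperationException("EndRecovery called without a matching BeginRecovery");
        CurrentStatus = nextStatus;
    }
}

The diffs below follow that contract: TryAddReplica, TakeOverAsPrimary, and the replica sync paths call EndRecovery only on paths where their own BeginRecovery returned true.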
1 change: 0 additions & 1 deletion libs/cluster/CmdStrings.cs
@@ -43,7 +43,6 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UNKNOWN_ENDPOINT => "ERR Unknown endpoint"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS => "ERR Primary has been assigned slots and cannot be a replica"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK => "ERR Recovery in progress, could not acquire recoverLock"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_ACQUIRE_REPLICATE_LOCK => "ERR Replicate already in progress"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_TAKEOVER_FROM_PRIMARY => "ERR Could not take over from primary"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_REPLICATE_SELF => "ERR Can't replicate myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_NOT_ASSIGNED_PRIMARY_ERROR => "ERR Don't have primary"u8;
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterManagerWorkerState.cs
@@ -188,7 +188,7 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro

// Transition to recovering state
// Only one caller will succeed in becoming a replica for the provided node-id
if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ClusterReplicate))
if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterReplicate))
{
logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;
@@ -200,7 +200,7 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro
break;

// If we reach here then we failed to update config so we need to suspend recovery and retry to update the config
clusterProvider.replicationManager.SuspendRecovery();
clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery);
}
FlushConfig();
return true;
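For reference, the TryAddReplica hunks above are fragments of a retry loop. A condensed, illustrative reconstruction of its shape, where TryUpdateConfig is a hypothetical stand-in for the actual config-update step:

// Illustrative shape only; the real loop lives in TryAddReplica.
while (true)
{
    // Only one caller can become a replica for the provided node-id.
    if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterReplicate))
    {
        errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;
        return false;
    }

    if (TryUpdateConfig(nodeId))
        break; // success: recovery stays active and the replica sync path ends it later

    // Config update failed: release recovery ownership before retrying.
    clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery);
}
FlushConfig();
return true;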
13 changes: 6 additions & 7 deletions libs/cluster/Server/ClusterProvider.cs
@@ -119,7 +119,7 @@ public void Dispose()

/// <inheritdoc />
public bool IsReplica()
=> clusterManager?.CurrentConfig.LocalNodeRole == NodeRole.REPLICA || replicationManager?.Recovering == true;
=> clusterManager?.CurrentConfig.LocalNodeRole == NodeRole.REPLICA || replicationManager?.IsRecovering == true;

/// <inheritdoc />
public bool IsReplica(string nodeId)
@@ -236,8 +236,9 @@ public MetricsItem[] GetReplicationInfo()
new("store_current_safe_aof_address", clusterEnabled ? replicationManager.StoreCurrentSafeAofAddress.ToString() : "N/A"),
new("store_recovered_safe_aof_address", clusterEnabled ? replicationManager.StoreRecoveredSafeAofTailAddress.ToString() : "N/A"),
new("object_store_current_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreCurrentSafeAofAddress.ToString() : "N/A"),
new("object_store_recovered_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreRecoveredSafeAofTailAddress.ToString() : "N/A")

new("object_store_recovered_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreRecoveredSafeAofTailAddress.ToString() : "N/A"),
new("recover_status", replicationManager.currentRecoveryStatus.ToString()),
new("last_failover_state", !clusterEnabled ? FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER) : failoverManager.GetLastFailoverStatus())
};

if (clusterEnabled)
@@ -251,16 +252,14 @@ public MetricsItem[] GetReplicationInfo()
replicationInfo.Add(new("master_port", port.ToString()));
replicationInfo.Add(primaryLinkStatus[0]);
replicationInfo.Add(primaryLinkStatus[1]);
replicationInfo.Add(new("master_sync_in_progress", replicationManager.Recovering.ToString()));
replicationInfo.Add(new("master_sync_in_progress", replicationManager.IsRecovering.ToString()));
replicationInfo.Add(new("slave_read_repl_offset", replication_offset));
replicationInfo.Add(new("slave_priority", "100"));
replicationInfo.Add(new("slave_read_only", "1"));
replicationInfo.Add(new("replica_announced", "1"));
replicationInfo.Add(new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString()));
replicationInfo.Add(new("replication_offset_lag", replicationOffsetLag.ToString()));
replicationInfo.Add(new("replication_offset_max_lag", storeWrapper.serverOptions.ReplicationOffsetMaxLag.ToString()));
replicationInfo.Add(new("recover_status", replicationManager.recoverStatus.ToString()));
replicationInfo.Add(new("last_failover_state", !clusterEnabled ? FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER) : failoverManager.GetLastFailoverStatus()));
}
else
{
@@ -302,7 +301,7 @@ public RoleInfo GetReplicaInfo()
address = address,
port = port,
replication_offset = replicationManager.ReplicationOffset,
replication_state = replicationManager.Recovering ? "sync" :
replication_state = replicationManager.IsRecovering ? "sync" :
connection.connected ? "connected" : "connect"
};

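The GetReplicationInfo hunks above report recover_status (backed by ReplicationManager.currentRecoveryStatus) and last_failover_state for every node instead of only inside the replica branch. A hypothetical helper, not part of Garnet, for reading the new field from an INFO-style key:value payload:

using System;

static class ReplicationInfoReader
{
    // Returns the value of "recover_status" from an INFO REPLICATION payload,
    // or null if the field is absent (for example, an older server).
    public static string GetRecoverStatus(string payload)
    {
        foreach (var line in payload.Split('\n'))
        {
            var trimmed = line.Trim();
            if (trimmed.StartsWith("recover_status:", StringComparison.Ordinal))
                return trimmed.Substring("recover_status:".Length);
        }
        return null;
    }
}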
1 change: 1 addition & 0 deletions libs/cluster/Server/Failover/FailoverManager.cs
@@ -64,6 +64,7 @@ public bool TryStartReplicaFailover(FailoverOption option, TimeSpan failoverTime
if (!failoverTaskLock.TryWriteLock())
return false;

lastFailoverStatus = FailoverStatus.BEGIN_FAILOVER;
currentFailoverSession = new FailoverSession(
clusterProvider,
option,
8 changes: 4 additions & 4 deletions libs/cluster/Server/Failover/ReplicaFailoverSession.cs
@@ -150,17 +150,17 @@ private bool TakeOverAsPrimary()
{
// Take over as primary and inform old primary
status = FailoverStatus.TAKING_OVER_AS_PRIMARY;
var acquiredLock = true;
var acquiredLock = false;

try
{
// Make replica syncing unavailable by setting recovery flag
if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ClusterFailover))
if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterFailover))
{
logger?.LogWarning($"{nameof(TakeOverAsPrimary)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
acquiredLock = false;
return false;
}
acquiredLock = true;
_ = clusterProvider.BumpAndWaitForEpochTransition();

// Take over slots from old primary
@@ -180,7 +180,7 @@ private bool TakeOverAsPrimary()
finally
{
// Disable recovering as now this node has become a primary or failed in its attempt earlier
if (acquiredLock) clusterProvider.replicationManager.SuspendRecovery();
if (acquiredLock) clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery);
}

return true;
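The acquiredLock change above flips the default from true to false, so the finally block can no longer end a recovery session that TakeOverAsPrimary never began. The same shape in isolation, reusing the illustrative RecoveryGuard sketch from the top of this page:

var acquired = false;
try
{
    if (!guard.BeginRecovery(RecoveryStatus.ClusterFailover))
        return false;   // nothing was acquired, so there is nothing to release
    acquired = true;

    // ... bump epoch and take over slots from the old primary ...
    return true;
}
finally
{
    // Release exactly once, and only from the caller that acquired it.
    if (acquired) guard.EndRecovery(RecoveryStatus.NoRecovery);
}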
@@ -264,6 +264,9 @@ public async Task BeginAofSync()

logger?.LogSyncMetadata(LogLevel.Trace, "BeginAofSync", replicaSyncMetadata, recoverSyncMetadata);

// Check what happens if we fail after recovery and start AOF stream
ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.Replication_Fail_Before_Background_AOF_Stream_Task_Start);

// We have already added the iterator for the covered address above but replica might request an address
// that is ahead of the covered address so we should start streaming from that address in order not to
// introduce duplicate insertions.
@@ -275,8 +278,8 @@
catch (Exception ex)
{
logger?.LogError(ex, "{method}", $"{nameof(ReplicaSyncSession.BeginAofSync)}");
SetStatus(SyncStatus.FAILED, "Failed to begin AOF sync");
AofSyncTask?.Dispose();
SetStatus(SyncStatus.FAILED, ex.Message);
_ = clusterProvider.replicationManager.TryRemoveReplicationTask(AofSyncTask);
}
}
}
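The Replication_Fail_Before_Background_AOF_Stream_Task_Start trigger added in this hunk (and again in SendCheckpoint below) exercises the cleanup path when the primary fails after the replica has recovered but before the background AOF streaming task starts. I have not verified the exact API of ExceptionInjectionHelper; a common shape for such a test-only hook, with illustrative names, is:

using System;
using System.Collections.Concurrent;

enum FaultPoint { ReplicationFailBeforeBackgroundAofStreamTaskStart }

static class FaultInjection
{
    static readonly ConcurrentDictionary<FaultPoint, bool> armed = new();

    // Tests arm a fault point; production paths leave everything disarmed.
    public static void Arm(FaultPoint point) => armed[point] = true;
    public static void Disarm(FaultPoint point) => armed.TryRemove(point, out _);

    // Called at the instrumented point; throws only when a test armed it.
    public static void Trigger(FaultPoint point)
    {
        if (armed.TryGetValue(point, out var on) && on)
            throw new InvalidOperationException($"Injected fault at {point}");
    }
}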
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/PrimaryOps/PrimarySync.cs
@@ -19,7 +19,7 @@ internal sealed partial class ReplicationManager : IDisposable
/// <returns></returns>
public bool TryAttachSync(SyncMetadata replicaSyncMetadata, out ReadOnlySpan<byte> errorMessage)
{
errorMessage = null;
errorMessage = [];
if (clusterProvider.serverOptions.ReplicaDisklessSync)
{
if (!replicationSyncManager.AddReplicaSyncSession(replicaSyncMetadata, out var replicaSyncSession))
@@ -293,6 +293,9 @@ public async Task<bool> SendCheckpoint()
}
}

// Check what happens if we fail after recovery and start AOF stream
ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.Replication_Fail_Before_Background_AOF_Stream_Task_Start);

// We have already added the iterator for the covered address above but replica might request an address
// that is ahead of the covered address so we should start streaming from that address in order not to
// introduce duplicate insertions.
@@ -305,7 +308,7 @@
{
logger?.LogError(ex, "acquiredEntry: {cEntryDump}", localEntry.GetCheckpointEntryDump());
if (aofSyncTaskInfo != null) _ = clusterProvider.replicationManager.TryRemoveReplicationTask(aofSyncTaskInfo);
errorMsg = "ERR " + ex.Message;// this is error sent to remote client
errorMsg = ex.Message;// this is error sent to remote client
return false;
}
finally
6 changes: 5 additions & 1 deletion libs/cluster/Server/Replication/RecoveryStatus.cs
@@ -27,6 +27,10 @@ public enum RecoveryStatus : byte
/// <summary>
/// Recovery at replica of no one
/// </summary>
ReplicaOfNoOne
ReplicaOfNoOne,
/// <summary>
/// Replica has recovered the checkpoint after signal from primary
/// </summary>
CheckpointRecoveredAtReplica,
}
}
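The new CheckpointRecoveredAtReplica value lets the replica record why recovery ended: in the ReplicaRecoverDiskless hunk below, the finally block now ends recovery with this status instead of NoRecovery, so recover_status can distinguish a replica that finished applying the primary's checkpoint from one that was never recovering. Against the illustrative RecoveryGuard sketch above:

// Replica side, after applying the primary's diskless checkpoint:
guard.EndRecovery(RecoveryStatus.CheckpointRecoveredAtReplica);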
56 changes: 37 additions & 19 deletions libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs
@@ -12,41 +12,53 @@ namespace Garnet.cluster
{
internal sealed partial class ReplicationManager : IDisposable
{
/// <summary>
/// Try to replicate using diskless sync
/// </summary>
/// <param name="session"></param>
/// <param name="nodeId"></param>
/// <param name="background"></param>
/// <param name="force"></param>
/// <param name="tryAddReplica"></param>
/// <param name="errorMessage"></param>
/// <returns></returns>
public bool TryReplicateDisklessSync(
ClusterSession session,
string nodeId,
bool background,
bool force,
bool tryAddReplica,
out ReadOnlySpan<byte> errorMessage)
{
errorMessage = default;
// Ensure two replicate commands do not execute at the same time.
if (!replicateLock.TryWriteLock())
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_REPLICATE_LOCK;
return false;
}

try
{
logger?.LogTrace("CLUSTER REPLICATE {nodeid}", nodeId);
if (!clusterProvider.clusterManager.TryAddReplica(nodeId, force: force, out errorMessage, logger: logger))
{
replicateLock.WriteUnlock();
return false;
}

// Wait for threads to agree configuration change of this node
session.UnsafeBumpAndWaitForEpochTransition();
_ = Task.Run(() => TryBeginReplicaSync());
if (background)
_ = Task.Run(() => TryBeginReplicaSync());
else
{
var result = TryBeginReplicaSync().Result;
if (result != null)
{
errorMessage = Encoding.ASCII.GetBytes(result);
return false;
}
}
}
catch (Exception ex)
{
logger?.LogError(ex, $"{nameof(TryReplicateDisklessSync)}");
replicateLock.WriteUnlock();
}
return true;

async Task TryBeginReplicaSync()
async Task<string> TryBeginReplicaSync()
{
var disklessSync = clusterProvider.serverOptions.ReplicaDisklessSync;
var disableObjects = clusterProvider.serverOptions.DisableObjects;
@@ -84,7 +96,7 @@ async Task TryBeginReplicaSync()
{
var errorMsg = Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_NOT_ASSIGNED_PRIMARY_ERROR);
logger?.LogError("{msg}", errorMsg);
return;
return errorMsg;
}

gcs = new(
@@ -118,23 +130,23 @@ async Task TryBeginReplicaSync()
{
logger?.LogError(ex, $"{nameof(TryBeginReplicaSync)}");
clusterProvider.clusterManager.TryResetReplica();
SuspendRecovery();
return ex.Message;
}
finally
{
replicateLock.WriteUnlock();
EndRecovery(RecoveryStatus.NoRecovery);
gcs?.Dispose();
recvCheckpointHandler?.Dispose();
}
return null;
}

return true;
}

public long ReplicaRecoverDiskless(SyncMetadata primarySyncMetadata)
public long ReplicaRecoverDiskless(SyncMetadata primarySyncMetadata, out ReadOnlySpan<byte> errorMessage)
{
try
{
errorMessage = [];
logger?.LogSyncMetadata(LogLevel.Trace, nameof(ReplicaRecoverDiskless), primarySyncMetadata);

var aofBeginAddress = primarySyncMetadata.currentAofBeginAddress;
@@ -162,10 +174,16 @@ public long ReplicaRecoverDiskless(SyncMetadata primarySyncMetadata)
ReplicationOffset = replicationOffset;
return ReplicationOffset;
}
catch (Exception ex)
{
logger?.LogError(ex, $"{nameof(ReplicaRecoverDiskless)}");
errorMessage = Encoding.ASCII.GetBytes(ex.Message);
return -1;
}
finally
{
// Done with recovery at this point
SuspendRecovery();
EndRecovery(RecoveryStatus.CheckpointRecoveredAtReplica);
}
}
}
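TryReplicateDisklessSync now distinguishes background from synchronous invocation: with background set to false it waits for TryBeginReplicaSync and returns any failure to the caller as the command's error message, instead of always firing a background task. A condensed, illustrative sketch of that dispatch (the full method also handles logging and cleanup):

if (background)
{
    // Fire and forget: the command returns immediately and errors are only logged.
    _ = Task.Run(() => TryBeginReplicaSync());
}
else
{
    // Wait for the sync attempt and surface its failure to the caller.
    var error = TryBeginReplicaSync().Result;
    if (error != null)
    {
        errorMessage = Encoding.ASCII.GetBytes(error);
        return false;
    }
}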