Skip to content

Commit e8169d5

Browse files
committed
wip
1 parent d84143a commit e8169d5

File tree

7 files changed

+31
-16
lines changed

7 files changed

+31
-16
lines changed

libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicaSyncSession.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ public async Task BeginAofSync()
264264

265265
logger?.LogSyncMetadata(LogLevel.Trace, "BeginAofSync", replicaSyncMetadata, recoverSyncMetadata);
266266

267+
// Check what happens if we fail after recovery and start AOF stream
268+
ExceptionScenarioHelper.TriggerException(ExceptionScenario.FAIL_RIGHT_BEFORE_AOF_STREAM_STARTS);
269+
267270
// We have already added the iterator for the covered address above but replica might request an address
268271
// that is ahead of the covered address so we should start streaming from that address in order not to
269272
// introduce duplicate insertions.
@@ -275,7 +278,7 @@ public async Task BeginAofSync()
275278
catch (Exception ex)
276279
{
277280
logger?.LogError(ex, "{method}", $"{nameof(ReplicaSyncSession.BeginAofSync)}");
278-
SetStatus(SyncStatus.FAILED, "Failed to begin AOF sync");
281+
SetStatus(SyncStatus.FAILED, ex.Message);
279282
AofSyncTask?.Dispose();
280283
}
281284
}

libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public async Task<bool> SendCheckpoint()
308308
{
309309
logger?.LogError(ex, "acquiredEntry: {cEntryDump}", localEntry.GetCheckpointEntryDump());
310310
if (aofSyncTaskInfo != null) _ = clusterProvider.replicationManager.TryRemoveReplicationTask(aofSyncTaskInfo);
311-
errorMsg = "ERR " + ex.Message;// this is error sent to remote client
311+
errorMsg = ex.Message;// this is error sent to remote client
312312
return false;
313313
}
314314
finally

libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,26 @@ public bool TryReplicateDisklessSync(
3838

3939
// Wait for threads to agree configuration change of this node
4040
session.UnsafeBumpAndWaitForEpochTransition();
41-
_ = Task.Run(() => TryBeginReplicaSync());
41+
42+
if (background)
43+
_ = Task.Run(() => TryBeginReplicaSync());
44+
else
45+
{
46+
var resp = TryBeginReplicaSync().GetAwaiter().GetResult();
47+
if (resp != null)
48+
{
49+
errorMessage = Encoding.ASCII.GetBytes(resp);
50+
return false;
51+
}
52+
}
4253
}
4354
catch (Exception ex)
4455
{
4556
logger?.LogError(ex, $"{nameof(TryReplicateDisklessSync)}");
4657
replicateLock.WriteUnlock();
4758
}
4859

49-
async Task TryBeginReplicaSync()
60+
async Task<string> TryBeginReplicaSync()
5061
{
5162
var disklessSync = clusterProvider.serverOptions.ReplicaDisklessSync;
5263
var disableObjects = clusterProvider.serverOptions.DisableObjects;
@@ -84,7 +95,7 @@ async Task TryBeginReplicaSync()
8495
{
8596
var errorMsg = Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_NOT_ASSIGNED_PRIMARY_ERROR);
8697
logger?.LogError("{msg}", errorMsg);
87-
return;
98+
return errorMsg;
8899
}
89100

90101
gcs = new(
@@ -118,6 +129,7 @@ async Task TryBeginReplicaSync()
118129
{
119130
logger?.LogError(ex, $"{nameof(TryBeginReplicaSync)}");
120131
clusterProvider.clusterManager.TryResetReplica();
132+
return ex.Message;
121133
}
122134
finally
123135
{
@@ -126,6 +138,7 @@ async Task TryBeginReplicaSync()
126138
gcs?.Dispose();
127139
recvCheckpointHandler?.Dispose();
128140
}
141+
return null;
129142
}
130143

131144
return true;

libs/cluster/Session/RespClusterReplicationCommands.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private bool NetworkClusterReplicate(out bool invalidParameters)
5959
return true;
6060
}
6161

62-
var background = false;
62+
var background = true;
6363
var nodeId = parseState.GetString(0);
6464

6565
if (parseState.Count > 1)

test/Garnet.test.cluster/ClusterTestContext.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void TearDown()
6565
waiter?.Dispose();
6666
clusterTestUtils?.Dispose();
6767
loggerFactory?.Dispose();
68-
var timeoutSeconds = 5;
68+
var timeoutSeconds = 60;
6969
if (!Task.Run(() => DisposeCluster()).Wait(TimeSpan.FromSeconds(timeoutSeconds)))
7070
logger?.LogError("Timed out waiting for DisposeCluster");
7171
if (!Task.Run(() => TestUtils.DeleteDirectory(TestFolder, true)).Wait(TimeSpan.FromSeconds(timeoutSeconds)))

test/Garnet.test.cluster/ClusterTestUtils.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1879,7 +1879,7 @@ public List<SlotItem> ClusterSlots(IPEndPoint endPoint, ILogger logger = null)
18791879
}
18801880
}
18811881

1882-
public string ClusterReplicate(int replicaNodeIndex, int primaryNodeIndex, bool failEx = true, ILogger logger = null)
1882+
public string ClusterReplicate(int replicaNodeIndex, int primaryNodeIndex, bool async = false, bool failEx = true, ILogger logger = null)
18831883
{
18841884
var primaryId = ClusterMyId(primaryNodeIndex, logger: logger);
18851885
return ClusterReplicate(replicaNodeIndex, primaryId, failEx: failEx, logger: logger);
@@ -1893,7 +1893,7 @@ public string ClusterReplicate(IPEndPoint endPoint, string primaryNodeId, bool a
18931893
try
18941894
{
18951895
var server = redis.GetServer(endPoint);
1896-
List<object> args = async ? ["replicate", primaryNodeId, "async"] : ["replicate", primaryNodeId];
1896+
List<object> args = async ? ["replicate", primaryNodeId, "async"] : ["replicate", primaryNodeId, "sync"];
18971897
var result = (string)server.Execute("cluster", args);
18981898
ClassicAssert.AreEqual("OK", result);
18991899
return result;

test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs

+6-7
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using System.Diagnostics;
76
using System.IO;
87
using System.Linq;
98
using System.Net;
@@ -1174,16 +1173,16 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW)
11741173
}
11751174
}
11761175

1177-
[Test, Order(23)]
1178-
[Conditional("DEBUG")]
1179-
public void ClusterExceptionAtReplicaAofSyncStart()
1176+
//[Test, Order(23)]
1177+
//[Conditional("DEBUG")]
1178+
public void ClusterExceptionAtReplicaAofSyncStart([Values] bool enableDisklessSync)
11801179
{
11811180
ExceptionScenarioHelper.EnableException(ExceptionScenario.FAIL_RIGHT_BEFORE_AOF_STREAM_STARTS);
11821181

11831182
var primaryIndex = 0;
11841183
var replicaIndex = 1;
11851184
var nodes_count = 2;
1186-
context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: useTLS);
1185+
context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: useTLS, enableDisklessSync: enableDisklessSync, timeout: timeout);
11871186
context.CreateConnection(useTLS: useTLS);
11881187

11891188
_ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
@@ -1195,13 +1194,13 @@ public void ClusterExceptionAtReplicaAofSyncStart()
11951194
context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);
11961195

11971196
var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, failEx: false, logger: context.logger);
1198-
ClassicAssert.AreEqual("ERR Debug scenario triggered FAIL_AFTER_RECOVERY_AND_BEFORE_ATTACH", resp);
1197+
ClassicAssert.AreEqual("Debug scenario triggered FAIL_RIGHT_BEFORE_AOF_STREAM_STARTS", resp);
11991198

12001199
var role = context.clusterTestUtils.RoleCommand(replicaIndex, logger: context.logger);
12011200
ClassicAssert.AreEqual("master", role.Value);
12021201

12031202
resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, failEx: false, logger: context.logger);
1204-
ClassicAssert.AreEqual("ERR Debug scenario triggered FAIL_AFTER_RECOVERY_AND_BEFORE_ATTACH", resp);
1203+
ClassicAssert.AreEqual("Debug scenario triggered FAIL_RIGHT_BEFORE_AOF_STREAM_STARTS", resp);
12051204
}
12061205
}
12071206
}

0 commit comments

Comments
 (0)