Skip to content

Commit 2448beb

Browse files
authored
Fix ExecuteClusterAppendLog (#1187)
* Ensure flush at ExecuteAppendLog happens optimistically outside epoch protection * log unexpected flush * bump version * remove logging and make network buffer settings more clear
1 parent 8153dcd commit 2448beb

File tree

6 files changed

+32
-9
lines changed

6 files changed

+32
-9
lines changed

Version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<!-- VersionPrefix property for builds and packages -->
33
<PropertyGroup>
4-
<VersionPrefix>1.0.63</VersionPrefix>
4+
<VersionPrefix>1.0.64</VersionPrefix>
55
</PropertyGroup>
66
</Project>

libs/client/ClientSession/GarnetClientSession.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,8 @@ public unsafe void ExecuteClusterAppendLog(string nodeId, long previousAddress,
328328
{
329329
Debug.Assert(nodeId != null);
330330

331-
byte* curr = offset;
332-
int arraySize = 7;
331+
var curr = offset;
332+
var arraySize = 7;
333333

334334
while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
335335
{
@@ -389,7 +389,6 @@ public unsafe void ExecuteClusterAppendLog(string nodeId, long previousAddress,
389389
curr = offset;
390390
}
391391
offset = curr;
392-
Flush();
393392
}
394393

395394
/// <summary>

libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddr
8787

8888
public void Throttle()
8989
{
90+
// Trigger flush while we are out of epoch protection
91+
garnetClient.CompletePending(false);
9092
garnetClient.Throttle();
9193
}
9294

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
114114
this.storeWrapper = clusterProvider.storeWrapper;
115115
this.pageSizeBits = storeWrapper.appendOnlyFile == null ? 0 : storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits();
116116

117+
networkBufferSettings.Log(logger, nameof(ReplicationManager));
117118
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
118119
ValidateNetworkBufferSettings();
119120

libs/cluster/Server/Replication/ReplicationNetworkBufferSettings.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ internal sealed partial class ReplicationManager : IDisposable
1212
/// <summary>
1313
/// NetworkBufferSettings for the buffer pool maintained by the ReplicationManager
1414
/// </summary>
15-
const int defaultSendBufferSize = 1 << 22;
16-
const int defaultInitialReceiveBufferSize = 1 << 12;
17-
readonly NetworkBufferSettings networkBufferSettings = new(defaultSendBufferSize, defaultInitialReceiveBufferSize);
15+
NetworkBufferSettings networkBufferSettings => NetworkBufferSettings.GetInclusive([GetRSSNetworkBufferSettings, GetIRSNetworkBufferSettings, GetAofSyncNetworkBufferSettings]);
1816

1917
/// <summary>
2018
/// Network pool maintained by the ReplicationManager
@@ -38,10 +36,11 @@ internal sealed partial class ReplicationManager : IDisposable
3836

3937
/// <summary>
4038
/// NetworkBufferSettings for the AOF sync task clients
39+
/// NOTE: double buffer size for send page to ensure payload (command header + page size) always fits into client buffer.
4140
/// </summary>
42-
const int aofSyncSendBufferSize = 1 << 22;
41+
int aofSyncSendBufferSize => 2 << clusterProvider.storeWrapper.serverOptions.AofPageSizeBits();
4342
const int aofSyncInitialReceiveBufferSize = 1 << 17;
44-
public NetworkBufferSettings GetAofSyncNetworkBufferSettings { get; } = new(aofSyncSendBufferSize, aofSyncInitialReceiveBufferSize);
43+
public NetworkBufferSettings GetAofSyncNetworkBufferSettings => new(aofSyncSendBufferSize, aofSyncInitialReceiveBufferSize);
4544

4645
void ValidateNetworkBufferSettings()
4746
{

libs/common/NetworkBufferSettings.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,25 @@ public NetworkBufferSettings(int sendBufferSize = 1 << 17, int initialReceiveBuf
4747
this.maxReceiveBufferSize = maxReceiveBufferSize;
4848
}
4949

50+
/// <summary>
51+
/// Return inclusive size for array of settings
52+
/// </summary>
53+
/// <param name="settings"></param>
54+
/// <returns></returns>
55+
public static NetworkBufferSettings GetInclusive(NetworkBufferSettings[] settings)
56+
{
57+
var maxSendBufferSize = 1 << 17;
58+
var minInitialReceiveBufferSize = 1 << 17;
59+
var maxReceiveBufferSize = 1 << 20;
60+
foreach (var setting in settings)
61+
{
62+
maxSendBufferSize = Math.Max(maxSendBufferSize, setting.sendBufferSize);
63+
minInitialReceiveBufferSize = Math.Min(minInitialReceiveBufferSize, setting.initialReceiveBufferSize);
64+
maxReceiveBufferSize = Math.Min(maxReceiveBufferSize, setting.maxReceiveBufferSize);
65+
}
66+
return new NetworkBufferSettings(maxSendBufferSize, minInitialReceiveBufferSize, maxReceiveBufferSize);
67+
}
68+
5069
/// <summary>
5170
/// Allocate network buffer pool
5271
/// </summary>
@@ -63,5 +82,8 @@ public LimitedFixedBufferPool CreateBufferPool(int maxEntriesPerLevel = 16, ILog
6382
levels = Math.Max(4, levels);
6483
return new LimitedFixedBufferPool(minSize, maxEntriesPerLevel: maxEntriesPerLevel, numLevels: levels, logger: logger);
6584
}
85+
86+
public void Log(ILogger logger, string category)
87+
=> logger?.LogInformation("[{category}] network settings: {sendBufferSize}, {initialReceiveBufferSize}, {maxReceiveBufferSize}", category, sendBufferSize, initialReceiveBufferSize, maxReceiveBufferSize);
6688
}
6789
}

0 commit comments

Comments
 (0)