diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 08e15dd..6502f9c 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -13,11 +13,11 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4.1.0 + - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v3.2.0 + uses: actions/setup-dotnet@v4 with: - dotnet-version: 6.0.x + dotnet-version: 8.0.x - name: Install dependencies run: | sudo apt-get update diff --git a/.github/workflows/nuget_publish.yml b/.github/workflows/nuget_publish.yml index 71e44b4..10186bb 100644 --- a/.github/workflows/nuget_publish.yml +++ b/.github/workflows/nuget_publish.yml @@ -16,11 +16,11 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4.1.0 + - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v3.2.0 + uses: actions/setup-dotnet@v4 with: - dotnet-version: 6.0.x + dotnet-version: 8.0.x - name: Restore dependencies run: dotnet restore - name: Build diff --git a/README.md b/README.md index 8cd1313..be28e8f 100644 --- a/README.md +++ b/README.md @@ -612,21 +612,17 @@ By default sync servers are obtained from `SyncServers` array and filtered by na - `MaxErrors` maximum number of errors in `Interval` by instance - `SwitchOffTime` time of synchronization switch off -Also you must add sync endpoints in `Configure` method of your `Startup` class. +Also you must add memcached endpoints in `Configure` method of your `Startup` class. ```c# app.UseEndpoints(endpoints => { - endpoints.AddMemcachedSyncEndpoint(this.Configuration); - endpoints.AddMemcachedSyncEndpoint(this.Configuration); endpoints.AddMemcachedEndpoints(this.Configuration); endpoints.MapControllers(); }); ``` `Configuration` argument here is a property on a `Startup` instance -`AddMemcachedSyncEndpoint` - to store data. The generic parameter type must be the same as in corresponding `GetAsync` / `MultiGetAsync` / `StoreAsync` / `MultiStoreAsync` method calls. -`AddMemcachedEndpoints` - for delete and flush endpoints When using cache synchronization feature, the `MemcachedClientResult.SyncSuccess` property can be inspected to determine whether the sync operation succeeded. When cache synchronization is not used this property is set to `false`. diff --git a/benchmarks/Aer.ConsistentHash.Benchmarks/Aer.ConsistentHash.Benchmarks.csproj b/benchmarks/Aer.ConsistentHash.Benchmarks/Aer.ConsistentHash.Benchmarks.csproj index 55e6d50..190a688 100644 --- a/benchmarks/Aer.ConsistentHash.Benchmarks/Aer.ConsistentHash.Benchmarks.csproj +++ b/benchmarks/Aer.ConsistentHash.Benchmarks/Aer.ConsistentHash.Benchmarks.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 enable diff --git a/samples/Aer.Memcached.Samples.ConsoleApp/Aer.Memcached.Samples.ConsoleApp.csproj b/samples/Aer.Memcached.Samples.ConsoleApp/Aer.Memcached.Samples.ConsoleApp.csproj index 529e68e..a6b13d0 100644 --- a/samples/Aer.Memcached.Samples.ConsoleApp/Aer.Memcached.Samples.ConsoleApp.csproj +++ b/samples/Aer.Memcached.Samples.ConsoleApp/Aer.Memcached.Samples.ConsoleApp.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 enable diff --git a/samples/Aer.Memcached.Samples.Shared/Aer.Memcached.Samples.Shared.csproj b/samples/Aer.Memcached.Samples.Shared/Aer.Memcached.Samples.Shared.csproj index 78c05ce..27dae03 100644 --- a/samples/Aer.Memcached.Samples.Shared/Aer.Memcached.Samples.Shared.csproj +++ b/samples/Aer.Memcached.Samples.Shared/Aer.Memcached.Samples.Shared.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable diff --git a/samples/Aer.Memcached.Samples.Shared/ComplexModel.cs b/samples/Aer.Memcached.Samples.Shared/ComplexModel.cs index e20dcff..9e2ead1 100644 --- a/samples/Aer.Memcached.Samples.Shared/ComplexModel.cs +++ b/samples/Aer.Memcached.Samples.Shared/ComplexModel.cs @@ -3,4 +3,12 @@ public class ComplexModel { public Dictionary TestValues { get; set; } + + public SomeEnum SomeEnum { get; set; } +} + +public enum SomeEnum +{ + FirstValue, + SecondValue } \ No newline at end of file diff --git a/samples/Aer.Memcached.Samples.WebApi/Aer.Memcached.Samples.WebApi.csproj b/samples/Aer.Memcached.Samples.WebApi/Aer.Memcached.Samples.WebApi.csproj index 80c5ba5..ed1d13a 100644 --- a/samples/Aer.Memcached.Samples.WebApi/Aer.Memcached.Samples.WebApi.csproj +++ b/samples/Aer.Memcached.Samples.WebApi/Aer.Memcached.Samples.WebApi.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable diff --git a/samples/Aer.Memcached.Samples.WebApi/Program.cs b/samples/Aer.Memcached.Samples.WebApi/Program.cs index e9c1ba2..32a823e 100644 --- a/samples/Aer.Memcached.Samples.WebApi/Program.cs +++ b/samples/Aer.Memcached.Samples.WebApi/Program.cs @@ -43,8 +43,6 @@ public static void Main(string[] args) app.UseEndpoints(endpoints => { - endpoints.AddMemcachedSyncEndpoint(builder.Configuration); - endpoints.AddMemcachedSyncEndpoint(builder.Configuration); endpoints.AddMemcachedEndpoints(builder.Configuration); endpoints.MapControllers(); }); diff --git a/samples/Aer.Memcached.Samples.WebApi/appsettings.Development.json b/samples/Aer.Memcached.Samples.WebApi/appsettings.Development.json index 76f324d..66f09a5 100644 --- a/samples/Aer.Memcached.Samples.WebApi/appsettings.Development.json +++ b/samples/Aer.Memcached.Samples.WebApi/appsettings.Development.json @@ -24,6 +24,10 @@ "MaxErrors": 3, "SwitchOffTime": "00:00:01" } + }, + "HashRing": { + "NumberOfVirtualNodes": 512, + "MaxDegreeOfParallelismForGettingNodes": 2 } } } diff --git a/samples/Aer.Memcached.Samples.WepApiToSync/Aer.Memcached.Samples.WepApiToSync.csproj b/samples/Aer.Memcached.Samples.WepApiToSync/Aer.Memcached.Samples.WepApiToSync.csproj index 80c5ba5..ed1d13a 100644 --- a/samples/Aer.Memcached.Samples.WepApiToSync/Aer.Memcached.Samples.WepApiToSync.csproj +++ b/samples/Aer.Memcached.Samples.WepApiToSync/Aer.Memcached.Samples.WepApiToSync.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable diff --git a/samples/Aer.Memcached.Samples.WepApiToSync/Program.cs b/samples/Aer.Memcached.Samples.WepApiToSync/Program.cs index 6e7479d..b0ceda0 100644 --- a/samples/Aer.Memcached.Samples.WepApiToSync/Program.cs +++ b/samples/Aer.Memcached.Samples.WepApiToSync/Program.cs @@ -43,10 +43,6 @@ public static void Main(string[] args) app.UseEndpoints(endpoints => { - endpoints.AddMemcachedSyncEndpoint(builder.Configuration); - endpoints.AddMemcachedSyncEndpoint(builder.Configuration); - endpoints.AddMemcachedSyncEndpoint>(builder.Configuration); - endpoints.AddMemcachedSyncEndpoint>(builder.Configuration); endpoints.AddMemcachedEndpoints(builder.Configuration); endpoints.MapControllers(); }); diff --git a/src/Aer.ConsistentHash/Aer.ConsistentHash.csproj b/src/Aer.ConsistentHash/Aer.ConsistentHash.csproj index 0bed360..664845e 100644 --- a/src/Aer.ConsistentHash/Aer.ConsistentHash.csproj +++ b/src/Aer.ConsistentHash/Aer.ConsistentHash.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable latest true @@ -16,6 +16,7 @@ + diff --git a/src/Aer.ConsistentHash/Config/HashRingSettings.cs b/src/Aer.ConsistentHash/Config/HashRingSettings.cs new file mode 100644 index 0000000..5e9fb0c --- /dev/null +++ b/src/Aer.ConsistentHash/Config/HashRingSettings.cs @@ -0,0 +1,19 @@ +namespace Aer.ConsistentHash.Config; + +public class HashRingSettings +{ + /// + /// Default number of virtual nodes for HashRing + /// + public static readonly int DefaultNumberOfVirtualNodes = 256; + + /// + /// Number of virtual nodes for HashRing + /// + public int NumberOfVirtualNodes { get; set; } = DefaultNumberOfVirtualNodes; + + /// + /// Degree of parallelism while getting nodes + /// + public int? MaxDegreeOfParallelismForGettingNodes { get; set; } +} \ No newline at end of file diff --git a/src/Aer.ConsistentHash/HashRing.cs b/src/Aer.ConsistentHash/HashRing.cs index 28f3b9c..5cf05d6 100644 --- a/src/Aer.ConsistentHash/HashRing.cs +++ b/src/Aer.ConsistentHash/HashRing.cs @@ -1,6 +1,8 @@ using System.Collections.Concurrent; using Aer.ConsistentHash.Abstractions; +using Aer.ConsistentHash.Config; using Aer.ConsistentHash.Infrastructure; +using Microsoft.Extensions.Options; namespace Aer.ConsistentHash; @@ -15,24 +17,32 @@ public class HashRing : INodeLocator private readonly IHashCalculator _hashCalculator; private readonly int _numberOfVirtualNodes; + private readonly int _maxDegreeOfParallelismForGettingNodes; private readonly ConcurrentDictionary _hashToNodeMap = new(); private readonly ConcurrentDictionary _nodeHashToVirtualNodeHashesMap = new(); - + // since we are only writing to this collection from inside the writer lock, // we can safely use with non-thread-safe version private readonly HashSet _deadNodes = new(); - + private ulong[] _sortedNodeHashKeys; /// Calculates hash for nodes and keys - /// Number of virtual nodes by one physical node - public HashRing(IHashCalculator hashCalculator, int numberOfVirtualNodes = 256) + /// HashRing settings + public HashRing(IHashCalculator hashCalculator, IOptions settings = null) { _locker = new ReaderWriterLockSlim(); _hashCalculator = hashCalculator; - _numberOfVirtualNodes = numberOfVirtualNodes; + _numberOfVirtualNodes = settings?.Value == null + ? HashRingSettings.DefaultNumberOfVirtualNodes + : settings.Value.NumberOfVirtualNodes == 0 + ? HashRingSettings.DefaultNumberOfVirtualNodes + : settings.Value.NumberOfVirtualNodes; + _maxDegreeOfParallelismForGettingNodes = settings?.Value == null + ? Environment.ProcessorCount + : settings.Value.MaxDegreeOfParallelismForGettingNodes ?? Environment.ProcessorCount; } public TNode GetNode(string key) @@ -48,6 +58,7 @@ public TNode GetNode(string key) } var node = GetNodeInternal(key); + return node; } finally @@ -75,7 +86,7 @@ public IDictionary, ConcurrentBag> GetNodes( Parallel.ForEach( keys, - new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount}, + new ParallelOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelismForGettingNodes }, key => { var replicatedNode = GetReplicatedNodeInternal(key, replicationFactor); @@ -137,7 +148,7 @@ public void AddNodes(IEnumerable nodes) public void AddNodes(params TNode[] nodes) { - AddNodes((IEnumerable) nodes); + AddNodes((IEnumerable)nodes); } public void RemoveNode(TNode node) @@ -202,22 +213,22 @@ public void MarkNodeDead(TNode node) _locker.ExitWriteLock(); } } - + public IReadOnlyCollection DrainDeadNodes() { try { _locker.EnterWriteLock(); - + // return defensive copy var currentDeadNodes = _deadNodes.ToArray(); - + _deadNodes.Clear(); return currentDeadNodes; } finally - { + { _locker.ExitWriteLock(); } } @@ -249,7 +260,7 @@ private ReplicatedNode GetReplicatedNodeInternal(string key, uint replica // means that replication factor is greater than total nodes count // return all other nodes as replicas - var replicaNodes = _hashToNodeMap.Values.Except(new[] {primaryNode}); + var replicaNodes = _hashToNodeMap.Values.Except(new[] { primaryNode }); foreach (var replicaNode in replicaNodes) { @@ -270,6 +281,7 @@ private ReplicatedNode GetReplicatedNodeInternal(string key, uint replica { // this is our primary node - start getting replica nodes from this one startingNodeFound = true; + continue; } @@ -365,7 +377,7 @@ private void AddNodeToCollections(TNode node) _hashToNodeMap.GetOrAdd(nodeHash, node); _nodeHashToVirtualNodeHashesMap.GetOrAdd(nodeHash, virtualNodeHashes); - + foreach (var virtualNodeHash in virtualNodeHashes) { _hashToNodeMap.GetOrAdd(virtualNodeHash, node); diff --git a/src/Aer.Memcached.Client/Aer.Memcached.Client.csproj b/src/Aer.Memcached.Client/Aer.Memcached.Client.csproj index 16d33ec..bf9dc0d 100644 --- a/src/Aer.Memcached.Client/Aer.Memcached.Client.csproj +++ b/src/Aer.Memcached.Client/Aer.Memcached.Client.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable latest true @@ -18,8 +18,8 @@ - - + + diff --git a/src/Aer.Memcached.Client/CacheSync/CacheSyncClient.cs b/src/Aer.Memcached.Client/CacheSync/CacheSyncClient.cs index df301a9..3b05ff7 100644 --- a/src/Aer.Memcached.Client/CacheSync/CacheSyncClient.cs +++ b/src/Aer.Memcached.Client/CacheSync/CacheSyncClient.cs @@ -45,9 +45,9 @@ public CacheSyncClient( } /// - public async Task SyncAsync( + public async Task SyncAsync( MemcachedConfiguration.SyncServer syncServer, - CacheSyncModel data, + CacheSyncModel data, CancellationToken token) { try @@ -58,7 +58,7 @@ public async Task SyncAsync( MediaTypeNames.Application.Json); var baseUri = new Uri(syncServer.Address); - var endpointUri = new Uri(baseUri, _config.SyncSettings.SyncEndpoint + TypeExtensions.GetTypeName()); + var endpointUri = new Uri(baseUri, _config.SyncSettings.SyncEndpoint + TypeExtensions.GetTypeName()); await RequestAsync(content, endpointUri, token); } @@ -69,7 +69,7 @@ public async Task SyncAsync( throw; } } - + /// public async Task DeleteAsync( MemcachedConfiguration.SyncServer syncServer, diff --git a/src/Aer.Memcached.Client/CacheSync/CacheSynchronizer.cs b/src/Aer.Memcached.Client/CacheSync/CacheSynchronizer.cs index 41ab82a..b01f12e 100644 --- a/src/Aer.Memcached.Client/CacheSync/CacheSynchronizer.cs +++ b/src/Aer.Memcached.Client/CacheSync/CacheSynchronizer.cs @@ -41,8 +41,8 @@ public CacheSynchronizer( public bool IsCacheSyncEnabled() => _syncServersProvider.IsConfigured(); /// - public async Task TrySyncCacheAsync( - CacheSyncModel model, + public async Task TrySyncCacheAsync( + CacheSyncModel model, CancellationToken token) { if (model.KeyValues == null) @@ -60,39 +60,15 @@ public async Task TrySyncCacheAsync( var source = new CancellationTokenSource(_config.SyncSettings.TimeToSync); var syncCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, source.Token); var utcNow = DateTimeOffset.UtcNow; - + if (model.KeyValues.Count == 0) { return true; } - await Parallel.ForEachAsync( - _syncServers, - new ParallelOptions() - { - MaxDegreeOfParallelism = Environment.ProcessorCount, - CancellationToken = syncCancellationTokenSource.Token - }, async (syncServer, cancellationToken) => - { - var serverKey = syncServer.Address; - - try - { - if (_serverBySwitchOffTime.TryGetValue(serverKey, out var switchOffTime) && - switchOffTime > utcNow) - { - return; - } - - await _cacheSyncClient.SyncAsync(syncServer, model, cancellationToken); - } - catch (Exception) - { - await CheckCircuitBreaker(serverKey, utcNow); - - throw; - } - }); + await Task.WhenAll(_syncServers + .Select(syncServer => + SyncData(syncServer, model, utcNow, syncCancellationTokenSource.Token))); } catch (Exception) { @@ -103,7 +79,7 @@ await Parallel.ForEachAsync( return true; } - + public async Task TryDeleteCacheAsync( IEnumerable keys, CancellationToken token) @@ -123,34 +99,10 @@ public async Task TryDeleteCacheAsync( var source = new CancellationTokenSource(_config.SyncSettings.TimeToSync); var syncCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, source.Token); var utcNow = DateTimeOffset.UtcNow; - - await Parallel.ForEachAsync( - _syncServers, - new ParallelOptions() - { - MaxDegreeOfParallelism = Environment.ProcessorCount, - CancellationToken = syncCancellationTokenSource.Token - }, async (syncServer, cancellationToken) => - { - var serverKey = syncServer.Address; - - try - { - if (_serverBySwitchOffTime.TryGetValue(serverKey, out var switchOffTime) && - switchOffTime > utcNow) - { - return; - } - - await _cacheSyncClient.DeleteAsync(syncServer, keys, cancellationToken); - } - catch (Exception) - { - await CheckCircuitBreaker(serverKey, utcNow); - - throw; - } - }); + + await Task.WhenAll(_syncServers + .Select(syncServer => + SyncDelete(syncServer, keys, utcNow, syncCancellationTokenSource.Token))); } catch (Exception) { @@ -196,4 +148,56 @@ private async Task CheckCircuitBreaker(string serverKey, DateTimeOffset utcNow) } } } + + private async Task SyncData( + MemcachedConfiguration.SyncServer syncServer, + CacheSyncModel model, + DateTimeOffset utcNow, + CancellationToken token) + { + var serverKey = syncServer.Address; + + try + { + if (_serverBySwitchOffTime.TryGetValue(serverKey, out var switchOffTime) && + switchOffTime > utcNow) + { + return; + } + + await _cacheSyncClient.SyncAsync(syncServer, model, token); + } + catch (Exception) + { + await CheckCircuitBreaker(serverKey, utcNow); + + throw; + } + } + + private async Task SyncDelete( + MemcachedConfiguration.SyncServer syncServer, + IEnumerable keys, + DateTimeOffset utcNow, + CancellationToken token) + { + var serverKey = syncServer.Address; + + try + { + if (_serverBySwitchOffTime.TryGetValue(serverKey, out var switchOffTime) && + switchOffTime > utcNow) + { + return; + } + + await _cacheSyncClient.DeleteAsync(syncServer, keys, token); + } + catch (Exception) + { + await CheckCircuitBreaker(serverKey, utcNow); + + throw; + } + } } \ No newline at end of file diff --git a/src/Aer.Memcached.Client/Config/MemcachedConfiguration.cs b/src/Aer.Memcached.Client/Config/MemcachedConfiguration.cs index 0ff94d6..5c58fc7 100644 --- a/src/Aer.Memcached.Client/Config/MemcachedConfiguration.cs +++ b/src/Aer.Memcached.Client/Config/MemcachedConfiguration.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using Aer.ConsistentHash.Abstractions; +using Aer.ConsistentHash.Config; using Microsoft.Extensions.Logging; namespace Aer.Memcached.Client.Config; @@ -33,6 +34,11 @@ public class MemcachedConfiguration /// public const string DefaultFlushEndpoint = "/memcached/flush"; + /// + /// The default get endpoint. + /// + public const string DefaultGetEndpoint = "/memcached/multi-get-typed"; + /// /// List of servers with hosted memcached. /// @@ -100,6 +106,8 @@ public class MemcachedConfiguration /// If set to true, external cancellations will be logged in terse manner - only as operation name. /// public bool IsTerseCancellationLogging { get; set; } + + public HashRingSettings HashRing { get; set; } /// /// Checks that either or are specified. @@ -302,6 +310,11 @@ public class SynchronizationSettings /// public string FlushEndpoint { get; set; } = DefaultFlushEndpoint; + /// + /// Endpoint that is created by current service to allow other services to get data. + /// + public string GetEndpoint { get; set; } = DefaultGetEndpoint; + /// /// Name of environment variable to get current cluster name. /// It is needed to filter out sync servers and don't try to send data diff --git a/src/Aer.Memcached.Client/Interfaces/ICacheSyncClient.cs b/src/Aer.Memcached.Client/Interfaces/ICacheSyncClient.cs index 06cae34..d1e0989 100644 --- a/src/Aer.Memcached.Client/Interfaces/ICacheSyncClient.cs +++ b/src/Aer.Memcached.Client/Interfaces/ICacheSyncClient.cs @@ -11,9 +11,9 @@ public interface ICacheSyncClient /// Server to sync data /// Data to sync /// Cancellation token - Task SyncAsync( + Task SyncAsync( MemcachedConfiguration.SyncServer syncServer, - CacheSyncModel data, + CacheSyncModel data, CancellationToken token); /// diff --git a/src/Aer.Memcached.Client/Interfaces/ICacheSynchronizer.cs b/src/Aer.Memcached.Client/Interfaces/ICacheSynchronizer.cs index 1143c6c..200bf92 100644 --- a/src/Aer.Memcached.Client/Interfaces/ICacheSynchronizer.cs +++ b/src/Aer.Memcached.Client/Interfaces/ICacheSynchronizer.cs @@ -15,7 +15,7 @@ public interface ICacheSynchronizer /// /// Data to sync. /// Cancellation token. - Task TrySyncCacheAsync(CacheSyncModel model, CancellationToken token); + Task TrySyncCacheAsync(CacheSyncModel model, CancellationToken token); /// /// Deletes data on the servers that are specified in . diff --git a/src/Aer.Memcached.Client/Interfaces/IMemcachedClient.cs b/src/Aer.Memcached.Client/Interfaces/IMemcachedClient.cs index a5d711a..39d4a1e 100644 --- a/src/Aer.Memcached.Client/Interfaces/IMemcachedClient.cs +++ b/src/Aer.Memcached.Client/Interfaces/IMemcachedClient.cs @@ -64,6 +64,20 @@ Task MultiStoreAsync( CacheSyncOptions cacheSyncOptions = null, uint replicationFactor = 0); + /// + /// Lean version of MultiStore method to synchronize cache data + /// keyValues must have already serialized data in values + /// + /// Values by keys. + /// Flags for the data. + /// Expiration time. + /// Cancellation token. + Task MultiStoreSynchronizeDataAsync( + IDictionary keyValues, + uint flags, + DateTimeOffset? expirationTime, + CancellationToken token); + /// /// Gets one value by key. /// diff --git a/src/Aer.Memcached.Client/MemcachedClient.cs b/src/Aer.Memcached.Client/MemcachedClient.cs index 5c68c19..6ffe428 100644 --- a/src/Aer.Memcached.Client/MemcachedClient.cs +++ b/src/Aer.Memcached.Client/MemcachedClient.cs @@ -98,12 +98,13 @@ public async Task StoreAsync( if (IsCacheSyncEnabledInternal(cacheSyncOptions)) { syncSuccess = await _cacheSynchronizer.TrySyncCacheAsync( - new CacheSyncModel + new CacheSyncModel { - KeyValues = new Dictionary() + KeyValues = new Dictionary() { - [key] = value + [key] = cacheItem.Data.Array }, + Flags = cacheItem.Flags, ExpirationTime = expirationTime.HasValue ? utcNow.Add(expirationTime.Value) : null @@ -149,15 +150,22 @@ public async Task MultiStoreAsync( var keyToExpirationMap = _expirationCalculator.GetExpiration(keyValues.Keys, expirationTime); - await MultiStoreInternalAsync(nodes, keyToExpirationMap, keyValues, token, storeMode, batchingOptions); + var serializedKeyValues = new Dictionary(); + foreach (var keyValue in keyValues) + { + serializedKeyValues[keyValue.Key] = _binarySerializer.Serialize(keyValue.Value); + } + + await MultiStoreInternalAsync(nodes, keyToExpirationMap, serializedKeyValues, token, storeMode, batchingOptions); var syncSuccess = false; if (IsCacheSyncEnabledInternal(cacheSyncOptions)) { syncSuccess = await _cacheSynchronizer.TrySyncCacheAsync( - new CacheSyncModel + new CacheSyncModel { - KeyValues = keyValues, + KeyValues = serializedKeyValues.ToDictionary(key => key.Key, value => value.Value.Data.Array), + Flags = serializedKeyValues.First().Value.Flags, ExpirationTime = expirationTime.HasValue ? utcNow.Add(expirationTime.Value) : null @@ -210,16 +218,23 @@ public async Task MultiStoreAsync( return MemcachedClientResult.Unsuccessful( $"Memcached nodes for keys {string.Join(",", keyValues.Keys)} not found"); } + + var serializedKeyValues = new Dictionary(); + foreach (var keyValue in keyValues) + { + serializedKeyValues[keyValue.Key] = _binarySerializer.Serialize(keyValue.Value); + } - await MultiStoreInternalAsync(nodes, keyToExpirationMap, keyValues, token, storeMode, batchingOptions); + await MultiStoreInternalAsync(nodes, keyToExpirationMap, serializedKeyValues, token, storeMode, batchingOptions); var syncSuccess = false; if (IsCacheSyncEnabledInternal(cacheSyncOptions)) { syncSuccess = await _cacheSynchronizer.TrySyncCacheAsync( - new CacheSyncModel + new CacheSyncModel { - KeyValues = keyValues, + KeyValues = serializedKeyValues.ToDictionary(key => key.Key, value => value.Value.Data.Array), + Flags = serializedKeyValues.First().Value.Flags, ExpirationTime = expirationTime }, token); @@ -237,6 +252,57 @@ public async Task MultiStoreAsync( $"An exception happened during {nameof(MultiStoreAsync)} execution.\nException details: {e}"); } } + + /// + public async Task MultiStoreSynchronizeDataAsync( + IDictionary keyValues, + uint flags, + DateTimeOffset? expirationTime, + CancellationToken token) + { + try + { + if (keyValues is null or { Count: 0 }) + { + return MemcachedClientResult.Successful; + } + + var keyToExpirationMap = _expirationCalculator.GetExpiration(keyValues.Keys, expirationTime); + + // this check is first since it shortcuts all the following logic + if (keyToExpirationMap is null) + { + return MemcachedClientResult.Unsuccessful( + $"Expiration date time offset {expirationTime} lies in the past. No keys stored"); + } + + var nodes = _nodeLocator.GetNodes(keyValues.Keys, 0); + if (nodes.Keys.Count == 0) + { + return MemcachedClientResult.Unsuccessful( + $"Memcached nodes for keys {string.Join(",", keyValues.Keys)} not found"); + } + + var serializedKeyValues = new Dictionary(); + foreach (var keyValue in keyValues) + { + serializedKeyValues[keyValue.Key] = new CacheItemForRequest(flags, keyValue.Value); + } + + await MultiStoreInternalAsync(nodes, keyToExpirationMap, serializedKeyValues, token); + + return MemcachedClientResult.Successful.WithSyncSuccess(true); + } + catch (OperationCanceledException) when (_memcachedConfiguration.IsTerseCancellationLogging) + { + return MemcachedClientResult.Cancelled(nameof(MultiStoreAsync)); + } + catch (Exception e) + { + return MemcachedClientResult.Unsuccessful( + $"An exception happened during {nameof(MultiStoreAsync)} execution.\nException details: {e}"); + } + } /// public async Task> GetAsync(string key, CancellationToken token) @@ -643,10 +709,10 @@ public async Task FlushAsync(CancellationToken token) } } - private async Task MultiStoreInternalAsync( + private async Task MultiStoreInternalAsync( IDictionary, ConcurrentBag> nodes, Dictionary keyToExpirationMap, - IDictionary keyValues, + IDictionary keyValues, CancellationToken token, StoreMode storeMode = StoreMode.Set, BatchingOptions batchingOptions = null) @@ -677,7 +743,7 @@ await MultiStoreBatchedInternalAsync( foreach (var key in keys) { - keyValuesToStore[key] = _binarySerializer.Serialize(keyValues[key]); + keyValuesToStore[key] = keyValues[key]; } var command = new MultiStoreCommand( @@ -706,9 +772,9 @@ await MultiStoreBatchedInternalAsync( public bool IsCacheSyncEnabled() => _cacheSynchronizer != null && _cacheSynchronizer.IsCacheSyncEnabled(); - private async Task MultiStoreBatchedInternalAsync( + private async Task MultiStoreBatchedInternalAsync( IDictionary, ConcurrentBag> nodes, - IDictionary keyValues, + IDictionary keyValues, BatchingOptions batchingOptions, Dictionary keyToExpirationMap, StoreMode storeMode, @@ -739,7 +805,7 @@ await Parallel.ForEachAsync( var keyValuesToStore = new Dictionary(); foreach (var key in keysBatch) { - keyValuesToStore[key] = _binarySerializer.Serialize(keyValues[key]); + keyValuesToStore[key] = keyValues[key]; } using (var command = new MultiStoreCommand( diff --git a/src/Aer.Memcached.Client/Models/CacheSyncModel.cs b/src/Aer.Memcached.Client/Models/CacheSyncModel.cs index d09db65..6363e26 100644 --- a/src/Aer.Memcached.Client/Models/CacheSyncModel.cs +++ b/src/Aer.Memcached.Client/Models/CacheSyncModel.cs @@ -3,13 +3,17 @@ namespace Aer.Memcached.Client.Models; /// /// Represents a multi-cluster cache synchronization key-value DTO. /// -/// The type of the value. -public class CacheSyncModel +public class CacheSyncModel { /// /// Gets or sets the key-values to sync. /// - public IDictionary KeyValues { get; set; } + public IDictionary KeyValues { get; set; } + + /// + /// Flags for sync data. + /// + public uint Flags { get; set; } /// /// Gets or sets the key-value items expiration time. diff --git a/src/Aer.Memcached.Client/Serializers/Binary/BinarySerializer.cs b/src/Aer.Memcached.Client/Serializers/Binary/BinarySerializer.cs index 30844e1..205726a 100644 --- a/src/Aer.Memcached.Client/Serializers/Binary/BinarySerializer.cs +++ b/src/Aer.Memcached.Client/Serializers/Binary/BinarySerializer.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Runtime.CompilerServices; +using System.Text; using Aer.Memcached.Client.Commands.Infrastructure; using Aer.Memcached.Client.Interfaces; using Aer.Memcached.Client.Models; @@ -54,10 +55,10 @@ internal CacheItemForRequest Serialize(object value) data = new ArraySegment(BitConverter.GetBytes((bool) value)); break; case TypeCode.SByte: - data = new ArraySegment(BitConverter.GetBytes((sbyte) value)); + data = new ArraySegment(GetBytes((sbyte) value)); break; case TypeCode.Byte: - data = new ArraySegment(BitConverter.GetBytes((byte) value)); + data = new ArraySegment([(byte)value]); break; case TypeCode.Int16: data = new ArraySegment(BitConverter.GetBytes((short) value)); @@ -208,4 +209,9 @@ private static uint TypeCodeToFlag(TypeCode code) { return (uint) ((int) code | TypeCodeSerializationMask); } + + private static byte[] GetBytes(short value) + { + return BitConverter.GetBytes(value); + } } diff --git a/src/Aer.Memcached/Aer.Memcached.csproj b/src/Aer.Memcached/Aer.Memcached.csproj index cedf5c4..abdf97c 100644 --- a/src/Aer.Memcached/Aer.Memcached.csproj +++ b/src/Aer.Memcached/Aer.Memcached.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable latest true @@ -18,9 +18,9 @@ - + - + diff --git a/src/Aer.Memcached/Infrastructure/MemcachedMaintainer.cs b/src/Aer.Memcached/Infrastructure/MemcachedMaintainer.cs index 62f8db6..d2714fb 100644 --- a/src/Aer.Memcached/Infrastructure/MemcachedMaintainer.cs +++ b/src/Aer.Memcached/Infrastructure/MemcachedMaintainer.cs @@ -284,20 +284,8 @@ private void CheckNodesHealth(object timerState) { _locker.ExitReadLock(); } - - var parallelDeadNodesCheckTask = Parallel.ForEachAsync( - nodesInLocator, - new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount}, - async (node, _) => - { - // pass no cancellation token since the caller method is synchronous - if (await _nodeHealthChecker.CheckNodeIsDeadAsync(node)) - { - _deadNodes.Add(node); - } - }); - parallelDeadNodesCheckTask.GetAwaiter().GetResult(); + Task.WhenAll(nodesInLocator.Select(CheckNode)).GetAwaiter().GetResult(); if (!_deadNodes.IsEmpty) { @@ -327,4 +315,13 @@ public void Dispose() _nodeRebuildingTimer?.Dispose(); _nodeHealthCheckTimer?.Dispose(); } + + private async Task CheckNode(TNode node) + { + // pass no cancellation token since the caller method is synchronous + if (await _nodeHealthChecker.CheckNodeIsDeadAsync(node)) + { + _deadNodes.Add(node); + } + } } \ No newline at end of file diff --git a/src/Aer.Memcached/ServiceCollectionExtensions.cs b/src/Aer.Memcached/ServiceCollectionExtensions.cs index 2f09828..96016f3 100644 --- a/src/Aer.Memcached/ServiceCollectionExtensions.cs +++ b/src/Aer.Memcached/ServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using Aer.ConsistentHash; using Aer.ConsistentHash.Abstractions; +using Aer.ConsistentHash.Config; using Aer.Memcached.Abstractions; using Aer.Memcached.Client; using Aer.Memcached.Client.Authentication; @@ -15,6 +16,7 @@ using Aer.Memcached.Diagnostics.Listeners; using Aer.Memcached.Infrastructure; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.Configuration; @@ -35,6 +37,7 @@ public static IServiceCollection AddMemcached( ArgumentNullException.ThrowIfNull(services); services.Configure(configuration.GetSection(nameof(MemcachedConfiguration))); + services.Configure(configuration.GetSection($"{nameof(MemcachedConfiguration)}:{nameof(MemcachedConfiguration.HashRing)}")); services.AddSingleton(); services.AddSingleton, HeadlessServiceDnsLookupNodeProvider>(); services.AddSingleton, HashRing>(); @@ -129,31 +132,6 @@ public static IApplicationBuilder EnableMemcachedDiagnostics( return applicationBuilder; } - public static void AddMemcachedSyncEndpoint(this IEndpointRouteBuilder endpoints, IConfiguration configuration) - { - var config = configuration.GetSection(nameof(MemcachedConfiguration)).Get(); - - if (config.SyncSettings != null) - { - endpoints.MapPost( - config.SyncSettings.SyncEndpoint + TypeExtensions.GetTypeName(), - async ( - [FromBody] CacheSyncModel model, - IMemcachedClient memcachedClient, - CancellationToken token) => - { - await memcachedClient.MultiStoreAsync( - model.KeyValues, - model.ExpirationTime, - token, - cacheSyncOptions: new CacheSyncOptions - { - IsManualSyncOn = false - }); - }); - } - } - public static void AddMemcachedEndpoints(this IEndpointRouteBuilder endpoints, IConfiguration configuration) { var config = configuration.GetSection(nameof(MemcachedConfiguration)).Get(); @@ -163,6 +141,9 @@ public static void AddMemcachedEndpoints(this IEndpointRouteBuilder endpoints, I var flushEndpoint = config.SyncSettings == null ? MemcachedConfiguration.DefaultFlushEndpoint : config.SyncSettings.FlushEndpoint; + var getEndpoint = config.SyncSettings == null + ? MemcachedConfiguration.DefaultGetEndpoint + : config.SyncSettings.GetEndpoint; endpoints.MapPost( deleteEndpoint, @@ -186,5 +167,66 @@ await memcachedClient.MultiDeleteAsync( { await memcachedClient.FlushAsync(token); }); + + if (config.SyncSettings != null) + { + endpoints.MapPost( + config.SyncSettings.SyncEndpoint + TypeExtensions.GetTypeName(), + async ( + [FromBody] CacheSyncModel model, + IMemcachedClient memcachedClient, + CancellationToken token) => + { + await memcachedClient.MultiStoreSynchronizeDataAsync( + model.KeyValues, + model.Flags, + model.ExpirationTime, + token); + }); + } + + endpoints.MapPost( + getEndpoint, + (MultiGetTypedRequest request, IMemcachedClient memcachedClient, CancellationToken token) => + { + try + { + var resolvedType = Type.GetType(request.Type); + if (resolvedType == null) + { + return Results.Ok( + $"Type is not found. Try {typeof(string).FullName} or {typeof(object).FullName}"); + } + + var method = typeof(MemcachedClient).GetMethod(nameof(MemcachedClient.MultiGetAsync)); + if (method == null) + { + return Results.Ok($"Method for the type {resolvedType} is not found"); + } + + var genericMethod = method.MakeGenericMethod(resolvedType); + + var task = genericMethod.Invoke(memcachedClient, parameters: [request.Keys, token, null, (uint)0]) as Task; + if (task == null) + { + return Results.Ok($"Method for the type {resolvedType} is not found"); + } + + var result = task.GetType().GetProperty("Result")?.GetValue(task); + + return Results.Ok(Newtonsoft.Json.JsonConvert.SerializeObject(result)); + } + catch (Exception e) + { + return Results.BadRequest(e); + } + }); + } + + private class MultiGetTypedRequest + { + public string[] Keys { get; set; } + + public string Type { get; set; } } } \ No newline at end of file diff --git a/tests/Aer.ConsistentHash.Tests/Aer.ConsistentHash.Tests.csproj b/tests/Aer.ConsistentHash.Tests/Aer.ConsistentHash.Tests.csproj index 4913888..69e6daa 100644 --- a/tests/Aer.ConsistentHash.Tests/Aer.ConsistentHash.Tests.csproj +++ b/tests/Aer.ConsistentHash.Tests/Aer.ConsistentHash.Tests.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable false diff --git a/tests/Aer.Memcached.Tests/Aer.Memcached.Tests.csproj b/tests/Aer.Memcached.Tests/Aer.Memcached.Tests.csproj index eb88ba3..b063f4b 100644 --- a/tests/Aer.Memcached.Tests/Aer.Memcached.Tests.csproj +++ b/tests/Aer.Memcached.Tests/Aer.Memcached.Tests.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 enable false diff --git a/tests/Aer.Memcached.Tests/TestClasses/CacheSynchronizerTests.cs b/tests/Aer.Memcached.Tests/TestClasses/CacheSynchronizerTests.cs index 8e3a395..350be0f 100644 --- a/tests/Aer.Memcached.Tests/TestClasses/CacheSynchronizerTests.cs +++ b/tests/Aer.Memcached.Tests/TestClasses/CacheSynchronizerTests.cs @@ -46,13 +46,13 @@ public async Task Sync_NotConfigured_NoCalls() { var cacheSynchronizer = GetCacheSynchronizer(); - var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ - KeyValues = _fixture.Create>(), + var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ + KeyValues = _fixture.Create>(), ExpirationTime = _fixture.Create() }, CancellationToken.None); await _cacheSyncClient.Received(0).SyncAsync(Arg.Any(), - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(0) .GetErrorStatisticsAsync(Arg.Any(), Arg.Any(), Arg.Any()); syncSuccess.Should().BeFalse(); @@ -82,13 +82,13 @@ public async Task Sync_Configured_NoErrors() var cacheSynchronizer = GetCacheSynchronizer(); - var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ - KeyValues = _fixture.Create>(), + var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ + KeyValues = _fixture.Create>(), ExpirationTime = _fixture.Create() }, CancellationToken.None); await _cacheSyncClient.Received(syncServers.Length).SyncAsync(Arg.Any(), - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(0) .GetErrorStatisticsAsync(Arg.Any(), Arg.Any(), Arg.Any()); syncSuccess.Should().BeTrue(); @@ -121,18 +121,18 @@ public async Task Sync_Configured_Error_NoCircuitBreakerConfigured_NoCalls() _syncServersProvider.IsConfigured().Returns(true); _syncServersProvider.GetSyncServers().Returns(syncServers); _cacheSyncClient - .SyncAsync(Arg.Any(), Arg.Any>(), + .SyncAsync(Arg.Any(), Arg.Any(), Arg.Any()).Throws(new Exception()); var cacheSynchronizer = GetCacheSynchronizer(); - var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ - KeyValues = _fixture.Create>(), + var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ + KeyValues = _fixture.Create>(), ExpirationTime = _fixture.Create() }, CancellationToken.None); await _cacheSyncClient.Received(syncServers.Length).SyncAsync(Arg.Any(), - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(0) .GetErrorStatisticsAsync(Arg.Any(), Arg.Any(), Arg.Any()); syncSuccess.Should().BeFalse(); @@ -168,7 +168,7 @@ public async Task Sync_Configured_Error_CircuitBreakerConfigured_CallToErrorStat _syncServersProvider.IsConfigured().Returns(true); _syncServersProvider.GetSyncServers().Returns(syncServers); _cacheSyncClient - .SyncAsync(Arg.Any(), Arg.Any>(), + .SyncAsync(Arg.Any(), Arg.Any(), Arg.Any()).Throws(new Exception()); var cacheSynchronizer = GetCacheSynchronizer(new MemcachedConfiguration @@ -179,14 +179,14 @@ public async Task Sync_Configured_Error_CircuitBreakerConfigured_CallToErrorStat } }); - var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel + var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel { - KeyValues = _fixture.Create>(), + KeyValues = _fixture.Create>(), ExpirationTime = _fixture.Create() }, CancellationToken.None); await _cacheSyncClient.Received(syncServers.Length).SyncAsync(Arg.Any(), - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(syncServers.Length) .GetErrorStatisticsAsync(Arg.Any(), Arg.Any(), Arg.Any()); syncSuccess.Should().BeFalse(); @@ -231,7 +231,7 @@ public async Task Sync_Configured_Error_CircuitBreakerConfigured_SwitchOffCheck( _syncServersProvider.IsConfigured().Returns(true); _syncServersProvider.GetSyncServers().Returns(syncServers); _cacheSyncClient - .SyncAsync(Arg.Any(), Arg.Any>(), + .SyncAsync(Arg.Any(), Arg.Any(), Arg.Any()).Throws(new Exception()); _errorStatisticsStore.GetErrorStatisticsAsync(syncServerToTurnOff.Address, Arg.Any(), Arg.Any()) .Returns( @@ -248,30 +248,30 @@ public async Task Sync_Configured_Error_CircuitBreakerConfigured_SwitchOffCheck( } }); - var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ - KeyValues = _fixture.Create>(), + var syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ + KeyValues = _fixture.Create>(), ExpirationTime = _fixture.Create() }, CancellationToken.None); await _cacheSyncClient.Received(1).SyncAsync(syncServerNotTurnedOff, - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _cacheSyncClient.Received(1).SyncAsync(syncServerToTurnOff, - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(1) .GetErrorStatisticsAsync(syncServerToTurnOff.Address, Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(1) .GetErrorStatisticsAsync(syncServerNotTurnedOff.Address, Arg.Any(), Arg.Any()); syncSuccess.Should().BeFalse(); - syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ - KeyValues = _fixture.Create>(), + syncSuccess = await cacheSynchronizer.TrySyncCacheAsync(new CacheSyncModel{ + KeyValues = _fixture.Create>(), ExpirationTime = _fixture.Create() }, CancellationToken.None); await _cacheSyncClient.Received(2).SyncAsync(syncServerNotTurnedOff, - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _cacheSyncClient.Received(1).SyncAsync(syncServerToTurnOff, - Arg.Any>(), Arg.Any()); + Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(1) .GetErrorStatisticsAsync(syncServerToTurnOff.Address, Arg.Any(), Arg.Any()); await _errorStatisticsStore.Received(2) diff --git a/tests/Aer.Memcached.Tests/TestClasses/MemcachedE2ETests.cs b/tests/Aer.Memcached.Tests/TestClasses/MemcachedE2ETests.cs index 9d70297..e8d6c1c 100644 --- a/tests/Aer.Memcached.Tests/TestClasses/MemcachedE2ETests.cs +++ b/tests/Aer.Memcached.Tests/TestClasses/MemcachedE2ETests.cs @@ -238,6 +238,199 @@ await client1.MultiDelete(new MultiDeleteRequest // ignored } } + + [TestMethodWithIgnoreIfSupport] + [IgnoreIf(nameof(IsWindows))] + public async Task WepApi_E2E_MultiDelete_NonExistentKeys_Success() + { + var port1 = GeneratePort(); + var port2 = GeneratePort(); + + var httpServerFixture1 = new HttpServerFixture + { + Port = port1 + }.WithWebHostBuilder(builder => + { + builder.ConfigureTestServices(services => + { + services.Configure(configuration => + { + configuration.SyncSettings = new MemcachedConfiguration.SynchronizationSettings + { + SyncServers = new[] + { + new MemcachedConfiguration.SyncServer + { + Address = $"http://localhost:{port2}", + ClusterName = "test2" + } + }, + CacheSyncCircuitBreaker = new MemcachedConfiguration.CacheSyncCircuitBreakerSettings + { + Interval = TimeSpan.FromSeconds(2), + SwitchOffTime = TimeSpan.FromSeconds(1), + MaxErrors = 3 + } + }; + }); + }); + }); + + var httpServerFixture2 = new HttpServerFixture + { + Port = port2 + }.WithWebHostBuilder(builder => + { + builder.ConfigureTestServices(services => + { + services.Configure(configuration => + { + configuration.SyncSettings = new MemcachedConfiguration.SynchronizationSettings + { + SyncServers = new[] + { + new MemcachedConfiguration.SyncServer + { + Address = $"http://localhost:{port1}", + ClusterName = "test1" + } + }, + CacheSyncCircuitBreaker = new MemcachedConfiguration.CacheSyncCircuitBreakerSettings + { + Interval = TimeSpan.FromSeconds(2), + SwitchOffTime = TimeSpan.FromSeconds(1), + MaxErrors = 3 + } + }; + }); + }); + }); + ; + + var client1 = new MemcachedWebApiClient(httpServerFixture1.CreateDefaultClient()); + var client2 = new MemcachedWebApiClient(httpServerFixture2.CreateDefaultClient()); + + var keyValues = Enumerable.Range(0, 5) + .ToDictionary(_ => Guid.NewGuid().ToString(), _ => Guid.NewGuid().ToString()); + + var result = await client1.MultiDelete(new MultiDeleteRequest + { + Keys = keyValues.Keys.ToArray() + }); + + result.SyncSuccess.Should().BeTrue(); + + try + { + await httpServerFixture1.DisposeAsync(); + await httpServerFixture2.DisposeAsync(); + } + catch (Exception) + { + // ignored + } + } + + [TestMethodWithIgnoreIfSupport] + [IgnoreIf(nameof(IsWindows))] + public async Task WepApi_E2E_MultiStore_SameKeysTwice_Success() + { + var port1 = GeneratePort(); + var port2 = GeneratePort(); + + var httpServerFixture1 = new HttpServerFixture + { + Port = port1 + }.WithWebHostBuilder(builder => + { + builder.ConfigureTestServices(services => + { + services.Configure(configuration => + { + configuration.SyncSettings = new MemcachedConfiguration.SynchronizationSettings + { + SyncServers = new[] + { + new MemcachedConfiguration.SyncServer + { + Address = $"http://localhost:{port2}", + ClusterName = "test2" + } + }, + CacheSyncCircuitBreaker = new MemcachedConfiguration.CacheSyncCircuitBreakerSettings + { + Interval = TimeSpan.FromSeconds(2), + SwitchOffTime = TimeSpan.FromSeconds(1), + MaxErrors = 3 + } + }; + }); + }); + }); + + var httpServerFixture2 = new HttpServerFixture + { + Port = port2 + }.WithWebHostBuilder(builder => + { + builder.ConfigureTestServices(services => + { + services.Configure(configuration => + { + configuration.SyncSettings = new MemcachedConfiguration.SynchronizationSettings + { + SyncServers = new[] + { + new MemcachedConfiguration.SyncServer + { + Address = $"http://localhost:{port1}", + ClusterName = "test1" + } + }, + CacheSyncCircuitBreaker = new MemcachedConfiguration.CacheSyncCircuitBreakerSettings + { + Interval = TimeSpan.FromSeconds(2), + SwitchOffTime = TimeSpan.FromSeconds(1), + MaxErrors = 3 + } + }; + }); + }); + }); + ; + + var client1 = new MemcachedWebApiClient(httpServerFixture1.CreateDefaultClient()); + var client2 = new MemcachedWebApiClient(httpServerFixture2.CreateDefaultClient()); + + var keyValues = Enumerable.Range(0, 5) + .ToDictionary(_ => Guid.NewGuid().ToString(), _ => Guid.NewGuid().ToString()); + + var multiStoreResult = await client1.MultiStore(new MultiStoreRequest + { + KeyValues = keyValues, + ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(2) + }); + + multiStoreResult.SyncSuccess.Should().BeTrue(); + + multiStoreResult = await client1.MultiStore(new MultiStoreRequest + { + KeyValues = keyValues, + ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(2) + }); + + multiStoreResult.SyncSuccess.Should().BeTrue(); + + try + { + await httpServerFixture1.DisposeAsync(); + await httpServerFixture2.DisposeAsync(); + } + catch (Exception) + { + // ignored + } + } [TestMethodWithIgnoreIfSupport] [IgnoreIf(nameof(IsWindows))]