From 55cdd20e213f5f75286ab338065919909ff425b6 Mon Sep 17 00:00:00 2001 From: zaddo67 Date: Tue, 25 May 2021 19:40:40 +1000 Subject: [PATCH 1/2] - Messages added back to pool while still active in message bus stored messages and repeater lists. - Make MessagePoolMulti thread safe as it is called from alloy networking thread and main thread - Make BMSBytePool thread safe - Two of the SendReliableMessage methods were not sending reliably - ForgeUDPSocketServerFacade ChallengeSuccess assumed challenged players still exists in list. Changed to check entry exists. (Might be an issue unique to my project, ok to ignore unless you have same issue) - Helper method in ForgeMessageCodes to instantiate message from message type (Can be ignored) - Helper Method in IPlayerRepository to check if player signature exists (Can be ignored) --- .../Networking/ForgeNetworkMediator.cs | 4 +- .../Networking/Messaging/ForgeMessage.cs | 9 ++++ .../Networking/Messaging/ForgeMessageCodes.cs | 6 +++ .../Messaging/ForgeMessageRepository.cs | 22 ++++++++ .../Networking/Messaging/IMessage.cs | 5 ++ .../Networking/Messaging/MessagePoolMulti.cs | 41 ++++++++++++--- .../Players/ForgePlayerRepository.cs | 5 ++ .../Networking/Players/IPlayerRepository.cs | 1 + .../Sockets/ForgeUDPSocketServerFacade.cs | 50 +++++++++++++------ .../Serialization/BMSBytePool.cs | 43 ++++++++++------ .../src/Entity/NetworkEntity.cs | 7 ++- 11 files changed, 152 insertions(+), 41 deletions(-) diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/ForgeNetworkMediator.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/ForgeNetworkMediator.cs index 93f46038..dd48cf60 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/ForgeNetworkMediator.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/ForgeNetworkMediator.cs @@ -91,12 +91,12 @@ public void SendReliableMessage(IMessage message) public void SendReliableMessage(IMessage message, INetPlayer player) { - MessageBus.SendMessage(message, SocketFacade.ManagedSocket, player.EndPoint); + MessageBus.SendReliableMessage(message, SocketFacade.ManagedSocket, player.EndPoint); } public void SendReliableMessage(IMessage message, EndPoint endpoint) { - MessageBus.SendMessage(message, SocketFacade.ManagedSocket, endpoint); + MessageBus.SendReliableMessage(message, SocketFacade.ManagedSocket, endpoint); } } } diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessage.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessage.cs index 84d844fc..bbaac2b3 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessage.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessage.cs @@ -9,8 +9,17 @@ public abstract class ForgeMessage : IMessage public abstract IMessageInterpreter Interpreter { get; } public abstract void Serialize(BMSByte buffer); public abstract void Deserialize(BMSByte buffer); + public bool IsPooled { get; set; } = false; + public bool IsBuffered { get; set; } = false; + public bool IsSent { get; set; } = false; public void Sent() { + IsSent = true; + OnMessageSent?.Invoke(this); + } + public void Unbuffered() + { + IsBuffered = false; OnMessageSent?.Invoke(this); } } diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageCodes.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageCodes.cs index c8afe777..c6bbd75d 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageCodes.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageCodes.cs @@ -53,6 +53,12 @@ public static int GetCodeFromType(Type type) return code; } + public static T Instantiate() + { + int code = GetCodeFromType(typeof(T)); + return (T)Instantiate(code); + } + public static void Clear() { _messageTypes.Clear(); diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageRepository.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageRepository.cs index eebbb2a6..355d0e84 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageRepository.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/ForgeMessageRepository.cs @@ -30,6 +30,7 @@ public void Clear() lock (_messages) { _messages.Clear(); + // TODO: Need to unbuffer the messages, otherwise they will never return to message pool } } @@ -74,6 +75,7 @@ public void AddMessage(IMessage message, EndPoint sender) kv = new Dictionary(); _messages.Add(sender, kv); } + message.IsBuffered = true; kv.Add(message.Receipt, message); } } @@ -104,11 +106,24 @@ public void AddMessage(IMessage message, EndPoint sender, int ttlMilliseconds) public void RemoveAllFor(EndPoint sender) { + var copy = new List(); + lock (_messages) { var removals = new List(); + if (_messages.TryGetValue(sender, out var kv)) + { + foreach (var mkv in kv) + copy.Add(mkv.Value); + } _messages.Remove(sender); } + + try + { + foreach (var m in copy) m.Unbuffered(); + } + catch { } } public void RemoveMessage(EndPoint sender, IMessage message) @@ -127,7 +142,14 @@ private void RemoveFromMessageLookup(EndPoint sender, IMessageReceiptSignature r lock (_messages) { if (_messages.TryGetValue(sender, out var kv)) + { + try + { + kv[receipt].Unbuffered(); + } + catch { } // Catch just in case message already removed kv.Remove(receipt); + } } } diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/IMessage.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/IMessage.cs index c2111ffa..62aebc61 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/IMessage.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/IMessage.cs @@ -12,5 +12,10 @@ public interface IMessage void Serialize(BMSByte buffer); void Deserialize(BMSByte buffer); void Sent(); + void Unbuffered(); + bool IsPooled { get; set; } + bool IsBuffered { get; set; } + bool IsSent { get; set; } + } } diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs index e2c7040e..7d92cc5b 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs @@ -1,11 +1,12 @@ using System; using System.Collections.Generic; +using System.Collections.Concurrent; namespace Forge.Networking.Messaging { public class MessagePoolMulti { - private readonly Dictionary> _messagePools = new Dictionary>(); + private readonly Dictionary> _messagePools = new Dictionary>(); public IMessage Get(Type t) { @@ -13,7 +14,16 @@ public IMessage Get(Type t) if (pool.Count == 0) return CreateNewMessageForPool(t, pool); else - return pool.Dequeue(); + { + // Try to dequeue, but if locked default to create new + IMessage item; + if (pool.TryDequeue(out item)) + { + item.IsPooled = false; + return item; + } + else return CreateNewMessageForPool(t, pool); + } } public T Get() where T : IMessage, new() @@ -22,27 +32,36 @@ public IMessage Get(Type t) if (pool.Count == 0) return CreateNewMessageForPool(pool); else - return (T)pool.Dequeue(); + { + // Try to dequeue, but if locked default to create new + IMessage item; + if (pool.TryDequeue(out item)) + { + item.IsPooled = false; + return (T)item; + } + else return CreateNewMessageForPool(pool); + } } - private Queue GetPool(Type type) + private ConcurrentQueue GetPool(Type type) { if (!_messagePools.TryGetValue(type, out var pool)) { - pool = new Queue(); + pool = new ConcurrentQueue(); _messagePools.Add(type, pool); } return pool; } - private T CreateNewMessageForPool(Queue pool) where T : IMessage, new() + private T CreateNewMessageForPool(ConcurrentQueue pool) where T : IMessage, new() { T m = new T(); m.OnMessageSent += Release; return m; } - private IMessage CreateNewMessageForPool(Type t, Queue pool) + private IMessage CreateNewMessageForPool(Type t, ConcurrentQueue pool) { IMessage m = (IMessage)Activator.CreateInstance(t); m.OnMessageSent += Release; @@ -51,7 +70,13 @@ private IMessage CreateNewMessageForPool(Type t, Queue pool) private void Release(IMessage message) { - Queue pool = GetPool(message.GetType()); + if (message.IsPooled) return; // Message has already been returned to pool + if (!message.IsSent) return; // Message has not been sent, not ready to return to pool + if (message.IsBuffered) return; // Message is still buffered, not ready to return to pool + message.IsSent = false; + message.IsPooled = true; + message.Receipt = null; + ConcurrentQueue pool = GetPool(message.GetType()); pool.Enqueue(message); } } diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/ForgePlayerRepository.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/ForgePlayerRepository.cs index 3d42ab2d..c1d09902 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/ForgePlayerRepository.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/ForgePlayerRepository.cs @@ -57,6 +57,11 @@ public bool Exists(EndPoint endpoint) return _playerAddressLookup.TryGetValue(endpoint, out _); } + public bool Exists(IPlayerSignature id) + { + return _playerLookup.TryGetValue(id, out _); + } + public IEnumerator GetEnumerator() { return _playerLookup.Values.GetEnumerator(); diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/IPlayerRepository.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/IPlayerRepository.cs index 8243a142..6cc90897 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/IPlayerRepository.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Players/IPlayerRepository.cs @@ -102,6 +102,7 @@ public interface IPlayerRepository /// /// True if the player matching the endpoint was found, otherwise false bool Exists(EndPoint endpoint); + bool Exists(IPlayerSignature id); /// /// Used to get the internal enumerator of the repository diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Sockets/ForgeUDPSocketServerFacade.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Sockets/ForgeUDPSocketServerFacade.cs index 59df53d8..dcebab64 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Sockets/ForgeUDPSocketServerFacade.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Sockets/ForgeUDPSocketServerFacade.cs @@ -45,21 +45,28 @@ public override void ShutDown() base.ShutDown(); } - public void ChallengeSuccess(INetworkMediator netContainer, EndPoint endpoint) + public void ChallengeSuccess(INetworkMediator netContainer, EndPoint endpoint) { INetPlayer player; - ForgeNetworkIdentityMessage netIdentity; + ForgeNetworkIdentityMessage netIdentity = null; lock (_challengedPlayers) { - player = _challengedPlayers.GetPlayer(endpoint); - netContainer.PlayerRepository.AddPlayer(player); - netIdentity = new ForgeNetworkIdentityMessage + if (_challengedPlayers.Exists(endpoint)) { - Identity = player.Id - }; - _challengedPlayers.RemovePlayer(player); + player = _challengedPlayers.GetPlayer(endpoint); + if (!netContainer.PlayerRepository.Exists(endpoint)) + { + netContainer.PlayerRepository.AddPlayer(player); + netIdentity = new ForgeNetworkIdentityMessage + { + Identity = player.Id + }; + } + _challengedPlayers.RemovePlayer(player); + } } - netContainer.MessageBus.SendReliableMessage(netIdentity, ManagedSocket, endpoint); + if (netIdentity != null) + netContainer.MessageBus.SendReliableMessage(netIdentity, ManagedSocket, endpoint); } protected override void ProcessMessageRead(BMSByte buffer, EndPoint sender) @@ -83,13 +90,26 @@ protected override void ProcessMessageRead(BMSByte buffer, EndPoint sender) private void CreatePlayer(object state) { + bool sendChallenge = false; var sender = (EndPoint)state; - var newPlayer = AbstractFactory.Get().GetNew(); - newPlayer.EndPoint = sender; - newPlayer.LastCommunication = DateTime.Now; - _challengedPlayers.AddPlayer(newPlayer); - var challengeMessage = AbstractFactory.Get().GetNew(); - networkMediator.MessageBus.SendReliableMessage(challengeMessage, ManagedSocket, sender); + + lock (_challengedPlayers) + { + if (!_challengedPlayers.Exists(sender)) + { + var newPlayer = AbstractFactory.Get().GetNew(); + newPlayer.EndPoint = sender; + newPlayer.LastCommunication = DateTime.Now; + _challengedPlayers.AddPlayer(newPlayer); + sendChallenge = true; + } + } + + if (sendChallenge) + { + var challengeMessage = AbstractFactory.Get().GetNew(); + networkMediator.MessageBus.SendReliableMessage(challengeMessage, ManagedSocket, sender); + } } protected void ProcessPlayerMessageRead(INetPlayer player, BMSByte buffer) diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Serialization/BMSBytePool.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Serialization/BMSBytePool.cs index cb17bf10..b96a1eb0 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Serialization/BMSBytePool.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Serialization/BMSBytePool.cs @@ -8,37 +8,50 @@ public class BMSBytePool private const int APPROX_SIZE_ZONE = 128; private readonly List _availableBuffers = new List(); private readonly List _inUseBuffers = new List(); + private System.Object syncLock = new object(); public BMSByte Get(int size) { - if (_availableBuffers.Count == 0) - return CreateNewBuffer(size); - else + BMSByte buff; + + lock (syncLock) { - BMSByte buff = GetAvailableBuffer(size); - if (buff.byteArr.Length < size) - buff.SetArraySize(size); - return buff; + if (_availableBuffers.Count == 0) + buff = CreateNewBuffer(size); + else + { + buff = GetAvailableBuffer(size); + if (buff.byteArr.Length < size) + buff.SetArraySize(size); + } } + return buff; } public void Release(BMSByte buffer) { - // TODO: Figure out why _inUseBuffers.Remove(buffer) is returning false bool found = false; - for (int i = 0; i < _inUseBuffers.Count; i++) + + lock (syncLock) { - if (_inUseBuffers[i] == buffer) + for (int i = 0; i < _inUseBuffers.Count; i++) { - _inUseBuffers.RemoveAt(i); - found = true; - break; + if (_inUseBuffers[i] == buffer) + { + _inUseBuffers.RemoveAt(i); + found = true; + break; + } + } + if (found) + { + _availableBuffers.Add(buffer); + buffer.Clear(); } } if (!found) throw new BMSBytePoolReleaseUnmanagedBufferException(); - _availableBuffers.Add(buffer); - buffer.Clear(); + } private BMSByte GetAvailableBuffer(int size) diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/src/Entity/NetworkEntity.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/src/Entity/NetworkEntity.cs index 0b7de74b..4dcf5e9f 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/src/Entity/NetworkEntity.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/src/Entity/NetworkEntity.cs @@ -35,11 +35,16 @@ private void Start() private void OnDestroy() { - _engine.EntityRepository.Remove(this); + if (_engine.EntityRepository.Exists(this.Id)) + _engine.EntityRepository.Remove(this); } private void OnValidate() { + + // OnValidate is only called in the editor + if (Application.isPlaying) return; + var entities = GameObject.FindObjectsOfType(); List _currentIds = entities.Where(e => e != this).Select(e => e._sceneIdentifier).ToList(); foreach (var e in entities) From 6e9fb5d81faf1fda5db2394d6b596db11128a707 Mon Sep 17 00:00:00 2001 From: zaddo67 Date: Sat, 29 May 2021 07:21:56 +1000 Subject: [PATCH 2/2] Fixed mistake I made with clearing receipt --- .gitignore | 6 ++++++ .../Networking/Messaging/MessagePoolMulti.cs | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 67e633d3..a0f4da5c 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,9 @@ sysinfo.txt /ForgeAlloy/ForgeServerRegistryService/obj/Debug/netcoreapp3.0 /ForgeAlloy/ForgeSampleGameServer/obj/Debug/netcoreapp3.0 /ForgeAlloy/ForgeNatHolePunch/obj/Debug/netcoreapp3.0 +/ForgeAlloy/ForgeSampleGameServer/obj/Release/netcoreapp3.0/.NETCoreApp,Version=v3.0.AssemblyAttributes.cs +/ForgeAlloy/ForgeSampleGameServer/obj/Release/netcoreapp3.0/ForgeSampleGameServer.AssemblyInfo.cs +/ForgeAlloy/ForgeNatHolePunch/obj/Release/netcoreapp3.0/.NETCoreApp,Version=v3.0.AssemblyAttributes.cs +/ForgeAlloy/ForgeNatHolePunch/obj/Release/netcoreapp3.0/ForgeNatHolePunch.AssemblyInfo.cs +/ForgeAlloy/ForgeServerRegistryService/obj/Release/netcoreapp3.0/.NETCoreApp,Version=v3.0.AssemblyAttributes.cs +/ForgeAlloy/ForgeServerRegistryService/obj/Release/netcoreapp3.0/ForgeServerRegistryService.AssemblyInfo.cs diff --git a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs index 7d92cc5b..6167c71b 100644 --- a/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs +++ b/ForgeAlloy/ForgeAlloyUnity/Assets/ForgeNetworking/Networking/Messaging/MessagePoolMulti.cs @@ -20,6 +20,7 @@ public IMessage Get(Type t) if (pool.TryDequeue(out item)) { item.IsPooled = false; + item.Receipt = null; return item; } else return CreateNewMessageForPool(t, pool); @@ -38,6 +39,7 @@ public IMessage Get(Type t) if (pool.TryDequeue(out item)) { item.IsPooled = false; + item.Receipt = null; return (T)item; } else return CreateNewMessageForPool(pool); @@ -75,7 +77,6 @@ private void Release(IMessage message) if (message.IsBuffered) return; // Message is still buffered, not ready to return to pool message.IsSent = false; message.IsPooled = true; - message.Receipt = null; ConcurrentQueue pool = GetPool(message.GetType()); pool.Enqueue(message); }