Skip to content
This repository was archived by the owner on Aug 15, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,9 @@ sysinfo.txt
/ForgeAlloy/ForgeServerRegistryService/obj/Debug/netcoreapp3.0
/ForgeAlloy/ForgeSampleGameServer/obj/Debug/netcoreapp3.0
/ForgeAlloy/ForgeNatHolePunch/obj/Debug/netcoreapp3.0
/ForgeAlloy/ForgeSampleGameServer/obj/Release/netcoreapp3.0/.NETCoreApp,Version=v3.0.AssemblyAttributes.cs
/ForgeAlloy/ForgeSampleGameServer/obj/Release/netcoreapp3.0/ForgeSampleGameServer.AssemblyInfo.cs
/ForgeAlloy/ForgeNatHolePunch/obj/Release/netcoreapp3.0/.NETCoreApp,Version=v3.0.AssemblyAttributes.cs
/ForgeAlloy/ForgeNatHolePunch/obj/Release/netcoreapp3.0/ForgeNatHolePunch.AssemblyInfo.cs
/ForgeAlloy/ForgeServerRegistryService/obj/Release/netcoreapp3.0/.NETCoreApp,Version=v3.0.AssemblyAttributes.cs
/ForgeAlloy/ForgeServerRegistryService/obj/Release/netcoreapp3.0/ForgeServerRegistryService.AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ public void SendReliableMessage(IMessage message)

public void SendReliableMessage(IMessage message, INetPlayer player)
{
MessageBus.SendMessage(message, SocketFacade.ManagedSocket, player.EndPoint);
MessageBus.SendReliableMessage(message, SocketFacade.ManagedSocket, player.EndPoint);
}

public void SendReliableMessage(IMessage message, EndPoint endpoint)
{
MessageBus.SendMessage(message, SocketFacade.ManagedSocket, endpoint);
MessageBus.SendReliableMessage(message, SocketFacade.ManagedSocket, endpoint);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@ public abstract class ForgeMessage : IMessage
public abstract IMessageInterpreter Interpreter { get; }
public abstract void Serialize(BMSByte buffer);
public abstract void Deserialize(BMSByte buffer);
public bool IsPooled { get; set; } = false;
public bool IsBuffered { get; set; } = false;
public bool IsSent { get; set; } = false;
public void Sent()
{
IsSent = true;
OnMessageSent?.Invoke(this);
}
public void Unbuffered()
{
IsBuffered = false;
OnMessageSent?.Invoke(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public static int GetCodeFromType(Type type)
return code;
}

public static T Instantiate<T>()
{
int code = GetCodeFromType(typeof(T));
return (T)Instantiate(code);
}

public static void Clear()
{
_messageTypes.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void Clear()
lock (_messages)
{
_messages.Clear();
// TODO: Need to unbuffer the messages, otherwise they will never return to message pool
}
}

Expand Down Expand Up @@ -74,6 +75,7 @@ public void AddMessage(IMessage message, EndPoint sender)
kv = new Dictionary<IMessageReceiptSignature, IMessage>();
_messages.Add(sender, kv);
}
message.IsBuffered = true;
kv.Add(message.Receipt, message);
}
}
Expand Down Expand Up @@ -104,11 +106,24 @@ public void AddMessage(IMessage message, EndPoint sender, int ttlMilliseconds)

public void RemoveAllFor(EndPoint sender)
{
var copy = new List<IMessage>();

lock (_messages)
{
var removals = new List<IMessageReceiptSignature>();
if (_messages.TryGetValue(sender, out var kv))
{
foreach (var mkv in kv)
copy.Add(mkv.Value);
}
_messages.Remove(sender);
}

try
{
foreach (var m in copy) m.Unbuffered();
}
catch { }
}

public void RemoveMessage(EndPoint sender, IMessage message)
Expand All @@ -127,7 +142,14 @@ private void RemoveFromMessageLookup(EndPoint sender, IMessageReceiptSignature r
lock (_messages)
{
if (_messages.TryGetValue(sender, out var kv))
{
try
{
kv[receipt].Unbuffered();
}
catch { } // Catch just in case message already removed
kv.Remove(receipt);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@ public interface IMessage
void Serialize(BMSByte buffer);
void Deserialize(BMSByte buffer);
void Sent();
void Unbuffered();
bool IsPooled { get; set; }
bool IsBuffered { get; set; }
bool IsSent { get; set; }

}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace Forge.Networking.Messaging
{
public class MessagePoolMulti
{
private readonly Dictionary<Type, Queue<IMessage>> _messagePools = new Dictionary<Type, Queue<IMessage>>();
private readonly Dictionary<Type, ConcurrentQueue<IMessage>> _messagePools = new Dictionary<Type, ConcurrentQueue<IMessage>>();

public IMessage Get(Type t)
{
var pool = GetPool(t);
if (pool.Count == 0)
return CreateNewMessageForPool(t, pool);
else
return pool.Dequeue();
{
// Try to dequeue, but if locked default to create new
IMessage item;
if (pool.TryDequeue(out item))
{
item.IsPooled = false;
item.Receipt = null;
return item;
}
else return CreateNewMessageForPool(t, pool);
}
}

public T Get<T>() where T : IMessage, new()
Expand All @@ -22,27 +33,37 @@ public IMessage Get(Type t)
if (pool.Count == 0)
return CreateNewMessageForPool<T>(pool);
else
return (T)pool.Dequeue();
{
// Try to dequeue, but if locked default to create new
IMessage item;
if (pool.TryDequeue(out item))
{
item.IsPooled = false;
item.Receipt = null;
return (T)item;
}
else return CreateNewMessageForPool<T>(pool);
}
}

private Queue<IMessage> GetPool(Type type)
private ConcurrentQueue<IMessage> GetPool(Type type)
{
if (!_messagePools.TryGetValue(type, out var pool))
{
pool = new Queue<IMessage>();
pool = new ConcurrentQueue<IMessage>();
_messagePools.Add(type, pool);
}
return pool;
}

private T CreateNewMessageForPool<T>(Queue<IMessage> pool) where T : IMessage, new()
private T CreateNewMessageForPool<T>(ConcurrentQueue<IMessage> pool) where T : IMessage, new()
{
T m = new T();
m.OnMessageSent += Release;
return m;
}

private IMessage CreateNewMessageForPool(Type t, Queue<IMessage> pool)
private IMessage CreateNewMessageForPool(Type t, ConcurrentQueue<IMessage> pool)
{
IMessage m = (IMessage)Activator.CreateInstance(t);
m.OnMessageSent += Release;
Expand All @@ -51,7 +72,12 @@ private IMessage CreateNewMessageForPool(Type t, Queue<IMessage> pool)

private void Release(IMessage message)
{
Queue<IMessage> pool = GetPool(message.GetType());
if (message.IsPooled) return; // Message has already been returned to pool
if (!message.IsSent) return; // Message has not been sent, not ready to return to pool
if (message.IsBuffered) return; // Message is still buffered, not ready to return to pool
message.IsSent = false;
message.IsPooled = true;
ConcurrentQueue<IMessage> pool = GetPool(message.GetType());
pool.Enqueue(message);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public bool Exists(EndPoint endpoint)
return _playerAddressLookup.TryGetValue(endpoint, out _);
}

public bool Exists(IPlayerSignature id)
{
return _playerLookup.TryGetValue(id, out _);
}

public IEnumerator<INetPlayer> GetEnumerator()
{
return _playerLookup.Values.GetEnumerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public interface IPlayerRepository
/// </param>
/// <returns>True if the player matching the endpoint was found, otherwise false</returns>
bool Exists(EndPoint endpoint);
bool Exists(IPlayerSignature id);

/// <summary>
/// Used to get the internal enumerator of the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,28 @@ public override void ShutDown()
base.ShutDown();
}

public void ChallengeSuccess(INetworkMediator netContainer, EndPoint endpoint)
public void ChallengeSuccess(INetworkMediator netContainer, EndPoint endpoint)
{
INetPlayer player;
ForgeNetworkIdentityMessage netIdentity;
ForgeNetworkIdentityMessage netIdentity = null;
lock (_challengedPlayers)
{
player = _challengedPlayers.GetPlayer(endpoint);
netContainer.PlayerRepository.AddPlayer(player);
netIdentity = new ForgeNetworkIdentityMessage
if (_challengedPlayers.Exists(endpoint))
{
Identity = player.Id
};
_challengedPlayers.RemovePlayer(player);
player = _challengedPlayers.GetPlayer(endpoint);
if (!netContainer.PlayerRepository.Exists(endpoint))
{
netContainer.PlayerRepository.AddPlayer(player);
netIdentity = new ForgeNetworkIdentityMessage
{
Identity = player.Id
};
}
_challengedPlayers.RemovePlayer(player);
}
}
netContainer.MessageBus.SendReliableMessage(netIdentity, ManagedSocket, endpoint);
if (netIdentity != null)
netContainer.MessageBus.SendReliableMessage(netIdentity, ManagedSocket, endpoint);
}

protected override void ProcessMessageRead(BMSByte buffer, EndPoint sender)
Expand All @@ -83,13 +90,26 @@ protected override void ProcessMessageRead(BMSByte buffer, EndPoint sender)

private void CreatePlayer(object state)
{
bool sendChallenge = false;
var sender = (EndPoint)state;
var newPlayer = AbstractFactory.Get<INetworkTypeFactory>().GetNew<INetPlayer>();
newPlayer.EndPoint = sender;
newPlayer.LastCommunication = DateTime.Now;
_challengedPlayers.AddPlayer(newPlayer);
var challengeMessage = AbstractFactory.Get<INetworkTypeFactory>().GetNew<IChallengeMessage>();
networkMediator.MessageBus.SendReliableMessage(challengeMessage, ManagedSocket, sender);

lock (_challengedPlayers)
{
if (!_challengedPlayers.Exists(sender))
{
var newPlayer = AbstractFactory.Get<INetworkTypeFactory>().GetNew<INetPlayer>();
newPlayer.EndPoint = sender;
newPlayer.LastCommunication = DateTime.Now;
_challengedPlayers.AddPlayer(newPlayer);
sendChallenge = true;
}
}

if (sendChallenge)
{
var challengeMessage = AbstractFactory.Get<INetworkTypeFactory>().GetNew<IChallengeMessage>();
networkMediator.MessageBus.SendReliableMessage(challengeMessage, ManagedSocket, sender);
}
}

protected void ProcessPlayerMessageRead(INetPlayer player, BMSByte buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,50 @@ public class BMSBytePool
private const int APPROX_SIZE_ZONE = 128;
private readonly List<BMSByte> _availableBuffers = new List<BMSByte>();
private readonly List<BMSByte> _inUseBuffers = new List<BMSByte>();
private System.Object syncLock = new object();

public BMSByte Get(int size)
{
if (_availableBuffers.Count == 0)
return CreateNewBuffer(size);
else
BMSByte buff;

lock (syncLock)
{
BMSByte buff = GetAvailableBuffer(size);
if (buff.byteArr.Length < size)
buff.SetArraySize(size);
return buff;
if (_availableBuffers.Count == 0)
buff = CreateNewBuffer(size);
else
{
buff = GetAvailableBuffer(size);
if (buff.byteArr.Length < size)
buff.SetArraySize(size);
}
}
return buff;
}

public void Release(BMSByte buffer)
{
// TODO: Figure out why _inUseBuffers.Remove(buffer) is returning false
bool found = false;
for (int i = 0; i < _inUseBuffers.Count; i++)

lock (syncLock)
{
if (_inUseBuffers[i] == buffer)
for (int i = 0; i < _inUseBuffers.Count; i++)
{
_inUseBuffers.RemoveAt(i);
found = true;
break;
if (_inUseBuffers[i] == buffer)
{
_inUseBuffers.RemoveAt(i);
found = true;
break;
}
}
if (found)
{
_availableBuffers.Add(buffer);
buffer.Clear();
}
}
if (!found)
throw new BMSBytePoolReleaseUnmanagedBufferException();
_availableBuffers.Add(buffer);
buffer.Clear();

}

private BMSByte GetAvailableBuffer(int size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ private void Start()

private void OnDestroy()
{
_engine.EntityRepository.Remove(this);
if (_engine.EntityRepository.Exists(this.Id))
_engine.EntityRepository.Remove(this);
}

private void OnValidate()
{

// OnValidate is only called in the editor
if (Application.isPlaying) return;

var entities = GameObject.FindObjectsOfType<NetworkEntity>();
List<string> _currentIds = entities.Where(e => e != this).Select(e => e._sceneIdentifier).ToList();
foreach (var e in entities)
Expand Down