Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,18 @@ private void OnChat(ReceivedMessage<Decentraland.Kernel.Comms.Rfc4.Chat> receive
if(!isCommunitiesIncluded && channelType == ChatChannel.ChatChannelType.COMMUNITY)
return;

if (messageDeduplication.TryPass(receivedMessage.FromWalletId, receivedMessage.Payload.Timestamp) == false
|| IsUserBlockedAndMessagesHidden(receivedMessage.FromWalletId))
string walletId = receivedMessage.Payload.HasForwardedFrom ? receivedMessage.Payload.ForwardedFrom
: receivedMessage.FromWalletId;

if (messageDeduplication.TryPass(receivedMessage.FromWalletId, receivedMessage.Payload.Timestamp) == false)
return;

// If the user that sends the message is banned from the current scene, we ignore the message
if (channelType == ChatChannel.ChatChannelType.NEARBY && BannedUsersFromCurrentScene.Instance.IsUserBanned(walletId))
return;

// If the user that sends the message is blocked and the settings indicate to hide messages from blocked users, we ignore the message
if (IsUserBlockedAndMessagesHidden(walletId))
return;

ChatChannel.ChannelId parsedChannelId;
Expand All @@ -124,13 +134,6 @@ private void OnChat(ReceivedMessage<Decentraland.Kernel.Comms.Rfc4.Chat> receive
break;
}

string walletId = receivedMessage.Payload.HasForwardedFrom ? receivedMessage.Payload.ForwardedFrom
: receivedMessage.FromWalletId;

// If the user that sends the message is banned from the current scene, we ignore the message
if (channelType == ChatChannel.ChatChannelType.NEARBY && BannedUsersFromCurrentScene.Instance.IsUserBanned(walletId))
return;

ChatMessage newMessage = messageFactory.CreateChatMessage(walletId, false, receivedMessage.Payload.Message, null, receivedMessage.Payload.Timestamp);

MessageAdded?.Invoke(parsedChannelId, channelType, newMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
using DCL.Communities.CommunitiesDataProvider;
using DCL.Communities.CommunitiesDataProvider.DTOs;
using DCL.Diagnostics;
using DCL.Friends.UserBlocking;
using DCL.Optimization.Pools;
using DCL.Utilities;
using DCL.Utilities.Extensions;
using DCL.Utility.Types;
using Decentraland.SocialService.V2;
Expand Down Expand Up @@ -33,19 +35,21 @@ public class CommunityUserStateService : ICurrentChannelUserStateService, IDispo
private readonly CommunitiesEventBus communitiesEventBus;
private readonly IEventBus eventBus;
private readonly IWeb3IdentityCache web3IdentityCache;
private readonly ObjectProxy<IUserBlockingCache> userBlockingCache;

private readonly IChatHistory chatHistory;

private readonly Dictionary<ChatChannel.ChannelId, HashSet<string>> onlineParticipantsPerChannel = new (10);
private readonly Dictionary<ChatChannel.ChannelId, (HashSet<string> normal, HashSet<string> blocked)> onlineParticipantsPerChannel = new (10);

public CommunityUserStateService(CommunitiesDataProvider communitiesDataProvider, CommunitiesEventBus communitiesEventBus, IEventBus eventBus, IChatHistory chatHistory,
IWeb3IdentityCache web3IdentityCache)
IWeb3IdentityCache web3IdentityCache, ObjectProxy<IUserBlockingCache> userBlockingCache)
{
this.communitiesDataProvider = communitiesDataProvider;
this.communitiesEventBus = communitiesEventBus;
this.eventBus = eventBus;
this.chatHistory = chatHistory;
this.web3IdentityCache = web3IdentityCache;
this.userBlockingCache = userBlockingCache;

OnlineParticipants = Array.Empty<string>();
SubscribeToEvents();
Expand All @@ -58,23 +62,24 @@ private void OnChannelAdded(ChatChannel addedChannel)
if (onlineParticipantsPerChannel.ContainsKey(addedChannel.Id))
return;

onlineParticipantsPerChannel.SyncAdd(addedChannel.Id, HASHSET_POOL.Get());
onlineParticipantsPerChannel.SyncAdd(addedChannel.Id, (HASHSET_POOL.Get(), HASHSET_POOL.Get()));
InitializeOnlineMembersAsync(addedChannel.Id, lifeTimeCts.Token).Forget();
}

private void OnChannelRemoved(ChatChannel.ChannelId id, ChatChannel.ChatChannelType channelType)
{
if (onlineParticipantsPerChannel.SyncTryGetValue(id, out HashSet<string>? onlineParticipants))
if (onlineParticipantsPerChannel.SyncTryGetValue(id, out var onlineParticipants))
{
HASHSET_POOL.Release(onlineParticipants);
HASHSET_POOL.Release(onlineParticipants.normal);
HASHSET_POOL.Release(onlineParticipants.blocked);
onlineParticipantsPerChannel.SyncRemove(id);
}
}

public void Activate(ChatChannel.ChannelId communityChannelId)
{
OnlineParticipants = onlineParticipantsPerChannel.SyncTryGetValue(communityChannelId, out HashSet<string>? onlineParticipants)
? onlineParticipants
OnlineParticipants = onlineParticipantsPerChannel.SyncTryGetValue(communityChannelId, out var onlineParticipants)
? onlineParticipants.normal
: Array.Empty<string>();

currentChannelId = communityChannelId;
Expand All @@ -86,27 +91,31 @@ private async UniTaskVoid InitializeOnlineMembersAsync(ChatChannel.ChannelId com
if (!result.Success) return;

// At this point the channel can be already removed
if (!onlineParticipantsPerChannel.SyncTryGetValue(communityChannelId, out HashSet<string>? onlineParticipants))
if (!onlineParticipantsPerChannel.SyncTryGetValue(communityChannelId, out var onlineParticipants))
return;

onlineParticipants.Clear();
onlineParticipants.normal.Clear();
onlineParticipants.blocked.Clear();

GetCommunityMembersResponse response = result.Value;

string? localPlayerAddress = web3IdentityCache.Identity?.Address;

foreach (GetCommunityMembersResponse.MemberData memberData in response.data.results)
{
if (!string.IsNullOrEmpty(localPlayerAddress) && memberData.memberAddress == localPlayerAddress)
if ((!string.IsNullOrEmpty(localPlayerAddress) && memberData.memberAddress == localPlayerAddress) || (userBlockingCache.Configured && userBlockingCache.StrictObject.UserIsBlocked(memberData.memberAddress)))
continue;

onlineParticipants.Add(memberData.memberAddress);
if (userBlockingCache.Configured && userBlockingCache.StrictObject.UserIsBlocked(memberData.memberAddress))
onlineParticipants.blocked.Add(memberData.memberAddress);
else
onlineParticipants.normal.Add(memberData.memberAddress);
}

// Edge case - the channel is initialized AFTER the community is selected
// (on the moment of the community selection the online users collection was empty)
if (currentChannelId.Equals(communityChannelId))
eventBus.Publish(new ChatEvents.ChannelUsersStatusUpdated(communityChannelId, ChatChannel.ChatChannelType.COMMUNITY, onlineParticipants));
eventBus.Publish(new ChatEvents.ChannelUsersStatusUpdated(communityChannelId, ChatChannel.ChatChannelType.COMMUNITY, onlineParticipants.normal));
}

public void Deactivate()
Expand All @@ -126,32 +135,64 @@ public void Dispose()
private void UserConnectedToCommunity(CommunityMemberConnectivityUpdate userConnectivity)
{
ChatChannel.ChannelId communityChannelId = ChatChannel.NewCommunityChannelId(userConnectivity.CommunityId);

if (!onlineParticipantsPerChannel.TryGetValue(communityChannelId, out var onlineParticipants))
return;

if (userBlockingCache.Configured && userBlockingCache.StrictObject.UserIsBlocked(userConnectivity.Member.Address))
{
onlineParticipants.blocked.Add(userConnectivity.Member.Address);
return;
}

SetOnline(communityChannelId, userConnectivity.Member.Address);
}

private void UserDisconnectedFromCommunity(CommunityMemberConnectivityUpdate userConnectivity)
{
ChatChannel.ChannelId communityChannelId = ChatChannel.NewCommunityChannelId(userConnectivity.CommunityId);

if (!onlineParticipantsPerChannel.TryGetValue(communityChannelId, out var onlineParticipants))
return;

if (userBlockingCache.Configured && userBlockingCache.StrictObject.UserIsBlocked(userConnectivity.Member.Address))
onlineParticipants.blocked.Remove(userConnectivity.Member.Address);

SetOffline(communityChannelId, userConnectivity.Member.Address);
}

private void UnblockedTrySetOnline(string userId)
{
foreach (var entry in onlineParticipantsPerChannel)
if (entry.Value.blocked.Remove(userId))
SetOnline(entry.Key, userId);
}

private void BlockedTrySetOffline(string userId)
{
foreach (var entry in onlineParticipantsPerChannel)
if (entry.Value.normal.Contains(userId))
if (entry.Value.blocked.Add(userId))
SetOffline(entry.Key, userId);
}

private void SetOnline(ChatChannel.ChannelId channelId, string userId)
{
if (!onlineParticipantsPerChannel.TryGetValue(channelId, out HashSet<string>? onlineParticipants))
if (!onlineParticipantsPerChannel.TryGetValue(channelId, out var onlineParticipants))
return;

// Notifications for non-current channel are not sent as it's not needed from the design standpoint (it's possible to open only one community at a time)
if (onlineParticipants.Add(userId) && currentChannelId.Equals(channelId))
if (onlineParticipants.normal.Add(userId) && currentChannelId.Equals(channelId))
eventBus.Publish(new ChatEvents.UserStatusUpdatedEvent(channelId, ChatChannel.ChatChannelType.COMMUNITY, userId, true));
}

private void SetOffline(ChatChannel.ChannelId channelId, string userId)
{
if (!onlineParticipantsPerChannel.TryGetValue(channelId, out HashSet<string>? onlineParticipants))
if (!onlineParticipantsPerChannel.TryGetValue(channelId, out var onlineParticipants))
return;

// Notifications for non-current channel are not sent as it's not needed from the design standpoint (it's possible to open only one community at a time)
if (onlineParticipants.Remove(userId) && currentChannelId.Equals(channelId))
if (onlineParticipants.normal.Remove(userId) && currentChannelId.Equals(channelId))
eventBus.Publish(new ChatEvents.UserStatusUpdatedEvent(channelId, ChatChannel.ChatChannelType.COMMUNITY, userId, false));
}

Expand All @@ -162,7 +203,8 @@ public void Reset()

foreach (var onlineList in onlineParticipantsPerChannel.Values)
{
HASHSET_POOL.Release(onlineList);
HASHSET_POOL.Release(onlineList.normal);
HASHSET_POOL.Release(onlineList.blocked);
}

onlineParticipantsPerChannel.Clear();
Expand All @@ -175,6 +217,14 @@ public void SubscribeToEvents()
chatHistory.ChannelRemoved += OnChannelRemoved;
communitiesEventBus.UserConnectedToCommunity += UserConnectedToCommunity;
communitiesEventBus.UserDisconnectedFromCommunity += UserDisconnectedFromCommunity;

userBlockingCache.OnObjectSet += cache =>
{
cache.UserBlocked += BlockedTrySetOffline;
cache.UserBlocksYou += BlockedTrySetOffline;
cache.UserUnblocked += UnblockedTrySetOnline;
cache.UserUnblocksYou += UnblockedTrySetOnline;
};
}

private void UnsubscribeFromEvents()
Expand All @@ -183,6 +233,13 @@ private void UnsubscribeFromEvents()
chatHistory.ChannelRemoved -= OnChannelRemoved;
communitiesEventBus.UserConnectedToCommunity -= UserConnectedToCommunity;
communitiesEventBus.UserDisconnectedFromCommunity -= UserDisconnectedFromCommunity;

if (!userBlockingCache.Configured) return;

userBlockingCache.StrictObject.UserBlocked -= BlockedTrySetOffline;
userBlockingCache.StrictObject.UserBlocksYou -= BlockedTrySetOffline;
userBlockingCache.StrictObject.UserUnblocked -= UnblockedTrySetOnline;
userBlockingCache.StrictObject.UserUnblocksYou -= UnblockedTrySetOnline;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using DCL.Friends.UserBlocking;
using DCL.Multiplayer.Connections.RoomHubs;
using DCL.Optimization.Pools;
using DCL.Utilities;
using LiveKit.Proto;
using LiveKit.Rooms;
using LiveKit.Rooms.Participants;
Expand All @@ -18,8 +19,10 @@ public class NearbyUserStateService : ICurrentChannelUserStateService
{
private readonly IRoomHub roomHub;
private readonly IEventBus eventBus;
private readonly ObjectProxy<IUserBlockingCache> userBlockingCache;

private readonly HashSet<string> onlineParticipants = new (PoolConstants.AVATARS_COUNT);
private readonly HashSet<string> blockedOnlineParticipants = new (PoolConstants.AVATARS_COUNT);

public IReadOnlyCollection<string> OnlineParticipants
{
Expand All @@ -29,10 +32,12 @@ public IReadOnlyCollection<string> OnlineParticipants
}
}

public NearbyUserStateService(IRoomHub roomHub, IEventBus eventBus)
public NearbyUserStateService(IRoomHub roomHub, IEventBus eventBus,
ObjectProxy<IUserBlockingCache> userBlockingCache)
{
this.roomHub = roomHub;
this.eventBus = eventBus;
this.userBlockingCache = userBlockingCache;
}

public void Activate()
Expand All @@ -47,6 +52,13 @@ public void Activate()

roomHub.IslandRoom().ConnectionStateChanged += OnRoomConnectionStateChange;
roomHub.SceneRoom().Room().ConnectionStateChanged += OnRoomConnectionStateChange;

if (!userBlockingCache.Configured) return;

userBlockingCache.StrictObject.UserBlocked += BlockedSetOffline;
userBlockingCache.StrictObject.UserBlocksYou += BlockedSetOffline;
userBlockingCache.StrictObject.UserUnblocked += UnblockedSetOnline;
userBlockingCache.StrictObject.UserUnblocksYou += UnblockedSetOnline;
}

public void Deactivate()
Expand All @@ -58,16 +70,27 @@ public void Deactivate()
roomHub.SceneRoom().Room().ConnectionStateChanged -= OnRoomConnectionStateChange;

lock (onlineParticipants) { onlineParticipants.Clear(); }

if (!userBlockingCache.Configured) return;

userBlockingCache.StrictObject.UserBlocked -= BlockedSetOffline;
userBlockingCache.StrictObject.UserBlocksYou -= BlockedSetOffline;
userBlockingCache.StrictObject.UserUnblocked -= UnblockedSetOnline;
userBlockingCache.StrictObject.UserUnblocksYou -= UnblockedSetOnline;
}

private void RefreshAllOnlineParticipants(IReadOnlyCollection<string> participantIdentities)
{
lock (onlineParticipants)
{
onlineParticipants.Clear();
blockedOnlineParticipants.Clear();

foreach (string participantIdentity in participantIdentities)
onlineParticipants.Add(participantIdentity);
if (userBlockingCache.Configured && userBlockingCache.StrictObject.UserIsBlocked(participantIdentity))
blockedOnlineParticipants.Add(participantIdentity);
else
onlineParticipants.Add(participantIdentity);
}
}

Expand All @@ -85,14 +108,25 @@ private void OnRoomUpdatesFromParticipant(Participant participant, UpdateFromPar
{
lock (onlineParticipants)
{
if (update == UpdateFromParticipant.Connected) { SetOnline(participant.Identity); }
bool participantIsBlocked = userBlockingCache.Configured && userBlockingCache.StrictObject.UserIsBlocked(participant.Identity);

if (update == UpdateFromParticipant.Connected)
{
if (participantIsBlocked)
blockedOnlineParticipants.Add(participant.Identity);
else
SetOnline(participant.Identity);
}
else if (update == UpdateFromParticipant.Disconnected)
{
// User is not disconnected if they are still in another room
if (otherRoom.Participants.RemoteParticipant(participant.Identity) != null)
return;

SetOffline(participant.Identity);
if (participantIsBlocked)
blockedOnlineParticipants.Remove(participant.Identity);
else
SetOffline(participant.Identity);
}
}
}
Expand Down Expand Up @@ -131,5 +165,29 @@ private void SetOffline(string userId)
if (onlineParticipants.Remove(userId))
eventBus.Publish(new ChatEvents.UserStatusUpdatedEvent(ChatChannel.NEARBY_CHANNEL_ID, ChatChannel.ChatChannelType.NEARBY, userId, false));
}

private void UnblockedSetOnline(string userId)
{
lock (onlineParticipants)
{
if (blockedOnlineParticipants.Contains(userId) && onlineParticipants.Add(userId))
{
blockedOnlineParticipants.Remove(userId);
eventBus.Publish(new ChatEvents.UserStatusUpdatedEvent(ChatChannel.NEARBY_CHANNEL_ID, ChatChannel.ChatChannelType.NEARBY, userId, true));
}
}
}

private void BlockedSetOffline(string userId)
{
lock (onlineParticipants)
{
if (onlineParticipants.Remove(userId))
{
blockedOnlineParticipants.Add(userId);
eventBus.Publish(new ChatEvents.UserStatusUpdatedEvent(ChatChannel.NEARBY_CHANNEL_ID, ChatChannel.ChatChannelType.NEARBY, userId, false));
}
}
}
}
}
Loading
Loading