Skip to content

Commit 0d82bb5

Browse files
committed
optimzie relay
1 parent 1c54748 commit 0d82bb5

13 files changed

+221
-167
lines changed

Relay/AdminChatBot.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ private void WebSocketHandlerOnNewConnection(object? sender, string newConnectio
7272
{
7373
string message =
7474
$"This relay has not yet been configured. We have generated a temporary admin key that you can use to configure. Simply import the following {TemporaryAdminPrivateKey.ToNIP19()} and send a DM to itself with \"/admin config\" to see config and \"/admin update {{CONFIG}} to set config.";
75-
_stateManager.PendingMessages.Writer.TryWrite((newConnection,
76-
JsonSerializer.Serialize(new[] {"NOTICE", message})));
75+
_ = _stateManager.Send(newConnection,
76+
JsonSerializer.Serialize(new[] {"NOTICE", message}));
7777
}
7878
}
7979
}
@@ -147,7 +147,7 @@ await ReplyToEvent(evt.Id,evt.PublicKey,
147147
{
148148
case "info":
149149
await ReplyToEvent(evt.Id,evt.PublicKey, @"
150-
Active connections: " + _connectionManager.Connections.Count + @"
150+
Active connections: " + _stateManager.Connections.Count + @"
151151
Active subscriptions: " + _stateManager.ConnectionSubscriptionsToFilters.Count );
152152
break;
153153
case "config":

Relay/ConnectionManager.cs

+4-56
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
using System;
21
using System.Collections.Concurrent;
3-
using System.Linq;
42
using System.Net.WebSockets;
5-
using System.Text.Json;
63
using System.Threading;
74
using System.Threading.Tasks;
85
using Microsoft.Extensions.Hosting;
96
using Microsoft.Extensions.Logging;
10-
using Microsoft.Extensions.Options;
11-
using NNostr.Client;
127

138
namespace Relay
149
{
@@ -17,9 +12,7 @@ public class ConnectionManager : IHostedService
1712
private readonly StateManager _stateManager;
1813
private readonly ILogger<ConnectionManager> _logger;
1914
private readonly NostrEventService _nostrEventService;
20-
private Task _processingSendMessages = Task.CompletedTask;
21-
private CancellationTokenSource _cts;
22-
public ConcurrentDictionary<string, WebSocket> Connections { get; set; } = new();
15+
2316

2417

2518
public ConnectionManager(StateManager stateManager, ILogger<ConnectionManager> logger,
@@ -32,74 +25,29 @@ public ConnectionManager(StateManager stateManager, ILogger<ConnectionManager> l
3225

3326
public virtual Task StartAsync(CancellationToken cancellationToken)
3427
{
35-
_cts = new CancellationTokenSource();
36-
_processingSendMessages = ProcessSendMessages(_cts.Token);
3728
_nostrEventService.EventsMatched += (sender, matched) => { _ = NostrEventServiceOnEventsMatched(matched); };
3829
return Task.CompletedTask;
3930
}
4031

4132
private async Task NostrEventServiceOnEventsMatched(NostrEventsMatched e)
4233
{
43-
if (!Connections.ContainsKey(e.ConnectionId))
34+
if (!_stateManager.Connections.ContainsKey(e.ConnectionId))
4435
{
4536
_stateManager.RemoveConnection(e.ConnectionId);
4637
return;
4738
}
4839

4940
foreach (var nostrEvent in e.Events)
5041
{
51-
await _stateManager.PendingMessages.Writer.WriteAsync((e.ConnectionId,
52-
JsonSerializer.Serialize(new object[]
53-
{
54-
"EVENT",
55-
e.SubscriptionId, nostrEvent
56-
})));
42+
await _stateManager.SendEvent(e.ConnectionId,e.SubscriptionId, nostrEvent);
5743
}
58-
e.OnSent.SetResult();
5944
_logger.LogInformation($"Sent {e.Events.Length} events to {e.ConnectionId} for subscription {e.SubscriptionId}");
6045
}
6146

62-
private async Task ProcessSendMessages(CancellationToken cancellationToken)
63-
{
64-
while (await _stateManager.PendingMessages.Reader.WaitToReadAsync(cancellationToken))
65-
{
66-
if (_stateManager.PendingMessages.Reader.TryRead(out var evt))
67-
{
68-
try
69-
{
70-
if (Connections.TryGetValue(evt.Item1, out var conn))
71-
{
72-
_logger.LogTrace($"sending message to connection {evt.connectionId}\n{evt.message}");
73-
await conn.SendMessageAsync(evt.Item2, cancellationToken);
74-
}
75-
else
76-
{
77-
_logger.LogWarning(
78-
$"Had to send a message to a connection that no longer exists {evt.Item1}");
79-
}
80-
}
81-
catch when (cancellationToken.IsCancellationRequested)
82-
{
83-
throw;
84-
}
85-
catch (Exception ex)
86-
{
87-
_logger.LogWarning(ex, $"Unhandled exception in {this.GetType().Name}");
88-
}
89-
}
90-
}
91-
}
47+
9248

9349
public virtual async Task StopAsync(CancellationToken cancellationToken)
9450
{
95-
_cts?.Cancel();
96-
try
97-
{
98-
await _processingSendMessages;
99-
}
100-
catch (OperationCanceledException)
101-
{
102-
}
10351
}
10452
}
10553
}

Relay/Extensions.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Data.Entity;
12
using System.Linq;
23
using LinqKit;
34
using Microsoft.EntityFrameworkCore;
@@ -11,7 +12,7 @@ public static IQueryable<TNostrEvent> Filter<TNostrEvent, TEventTag>(this IQuery
1112
NostrSubscriptionFilter[] filters) where TNostrEvent : BaseNostrEvent<TEventTag>
1213
where TEventTag : NostrEventTag
1314
{
14-
var results = filters.Select(filter => { return events.Filter<TNostrEvent, TEventTag>(filter); });
15+
var results = filters.Select(events.Filter<TNostrEvent, TEventTag>);
1516
// union the results
1617
return results.Aggregate((a, b) => a.Union(b));
1718
}
@@ -78,6 +79,6 @@ public static IQueryable<TNostrEvent> Filter<TNostrEvent, TEventTag>(this IQuery
7879
filterQuery = filterQuery.OrderByDescending(e => e.CreatedAt).Take(filter.Limit.Value);
7980
}
8081

81-
return filterQuery;
82+
return filterQuery.AsStreaming();
8283
}
8384
}

Relay/INostrMessageHandler.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
using System;
12
using System.Threading.Tasks;
3+
using Microsoft.Extensions.Logging;
24

35
namespace Relay
46
{
57
public interface INostrMessageHandler
68
{
9+
ILogger Logger { get; }
710
public Task Handle(string connectionId, string msg) {
811
try {
912
return HandleCore(connectionId, msg);
10-
} catch {
13+
} catch(Exception e){
14+
Logger.LogError($"Error handling message {msg}", e);
1115
return Task.CompletedTask;
1216
}
1317
}

Relay/MessageHandlers/CloseNostrMessageHandler.cs

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public CloseNostrMessageHandler(StateManager stateManager, NostrEventService nos
1919
_logger = logger;
2020
}
2121

22+
public ILogger Logger => _logger;
23+
2224
public async Task HandleCore(string connectionId, string msg)
2325
{
2426
if (!msg.StartsWith($"[\"{PREFIX}"))

Relay/MessageHandlers/EventNostrMessageHandler.cs

+5-13
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,18 @@ private async Task ProcessEventMessages(CancellationToken cancellationToken)
5151

5252
if (count < _options.CurrentValue.Nip13Difficulty)
5353
{
54-
WriteOkMessage(evt.Item1, e.Id, false, $"pow: difficulty {count} is less than {_options.CurrentValue.Nip13Difficulty}");
54+
await _stateManager.SendOk(evt.Item1, e.Id, false, $"pow: difficulty {count} is less than {_options.CurrentValue.Nip13Difficulty}");
5555
}
5656
}else if (e.Verify<RelayNostrEvent, RelayNostrEventTag>())
5757
{
5858
var tuple = await _nostrEventService.AddEvent(e);
5959

60-
WriteOkMessage(evt.Item1, tuple.eventId, tuple.success, tuple.reason);
60+
await _stateManager.SendOk(evt.Item1, tuple.eventId, tuple.success, tuple.reason);
6161

6262
}
6363
else
6464
{
65-
WriteOkMessage(evt.Item1, e.Id, false, "invalid: event could not be verified");
65+
await _stateManager.SendOk(evt.Item1, e.Id, false, "invalid: event could not be verified");
6666
}
6767
}
6868
catch (Exception exception)
@@ -73,16 +73,8 @@ private async Task ProcessEventMessages(CancellationToken cancellationToken)
7373
}
7474
}
7575

76-
private void WriteOkMessage(string connection, string eventId, bool success, string reason)
77-
{
78-
_stateManager.PendingMessages.Writer.TryWrite((connection, JsonSerializer.Serialize(new object[]
79-
{
80-
"OK",
81-
eventId,
82-
success,
83-
reason
84-
})));
85-
}
76+
77+
public ILogger Logger => _logger;
8678

8779
public async Task HandleCore(string connectionId, string msg)
8880
{
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
using System;
2-
using System.Collections.Concurrent;
31
using System.Collections.Generic;
4-
using System.Linq;
52
using System.Text.Json;
63
using System.Threading.Tasks;
74
using Microsoft.Extensions.Logging;
85
using Microsoft.Extensions.Options;
96
using NNostr.Client;
10-
using Relay.Data;
11-
127
namespace Relay
138
{
149
public class RequestNostrMessageHandler : INostrMessageHandler
@@ -32,12 +27,15 @@ public RequestNostrMessageHandler(
3227
_logger = logger;
3328
}
3429

30+
public Microsoft.Extensions.Logging.ILogger Logger => _logger;
31+
3532
public async Task HandleCore(string connectionId, string msg)
3633
{
3734
if (!msg.StartsWith($"[\"{PREFIX}"))
3835
{
3936
return;
4037
}
38+
4139
var json = JsonDocument.Parse(msg).RootElement;
4240

4341
var id = json[1].GetString();
@@ -48,40 +46,26 @@ public async Task HandleCore(string connectionId, string msg)
4846
}
4947

5048
_stateManager.AddSubscription(connectionId, id, filters.ToArray());
49+
_logger.LogInformation($"Added subscription {id} for {connectionId}");
5150
var matchedEvents = await _nostrEventService.GetFromDB(filters.ToArray());
52-
if (matchedEvents.Length > 0)
51+
using (matchedEvents.Item1)
5352
{
54-
var matched = new NostrEventsMatched()
55-
{
56-
Events = matchedEvents,
57-
ConnectionId = connectionId,
58-
SubscriptionId = id
59-
};
60-
_nostrEventService.InvokeMatched(matched);
61-
62-
if (_options.CurrentValue.EnableNip15)
53+
var count = 0;
54+
await foreach (var e in matchedEvents.Item2)
6355
{
64-
matched.OnSent.Task.ContinueWith(task =>
65-
{
66-
return _stateManager.PendingMessages.Writer.WaitToWriteAsync().AsTask().ContinueWith(_ =>
67-
SendEOSE(connectionId, id));
68-
});
56+
await _stateManager.SendEvent(connectionId, id, e);
57+
count++;
6958
}
59+
60+
_logger.LogInformation($"sent {count} initial events to {connectionId} for subscription {id}");
7061
}
71-
else if (_options.CurrentValue.EnableNip15)
62+
63+
if (_options.CurrentValue.EnableNip15)
7264
{
73-
await SendEOSE(connectionId, id);
65+
await _stateManager.SendEOSE(connectionId, id);
7466
}
7567
}
7668

77-
private async Task SendEOSE(string connectionId, string subscriptionId)
78-
{
79-
await _stateManager.PendingMessages.Writer.WriteAsync((connectionId,
80-
JsonSerializer.Serialize(new[]
81-
{
82-
"EOSE",
83-
subscriptionId
84-
})));
85-
}
69+
8670
}
8771
}

Relay/MultiValueDictionary.cs

+11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ public bool AddOrReplace(TKey key, TValue value)
4949
public bool Remove(TKey key)
5050
{
5151
return _dictionary.TryRemove(key, out _);
52+
}
53+
public bool Remove(TKey key, out TValue[]? items)
54+
{
55+
if(_dictionary.TryRemove(key, out var x))
56+
{
57+
items = x.ToArray();
58+
return true;
59+
60+
}
61+
items = null;
62+
return false;
5263
}
5364
public bool Remove(TKey key, TValue value)
5465
{

0 commit comments

Comments
 (0)