-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathProgram.cs
204 lines (173 loc) · 6.64 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
using MQTTnet.Server;
using Meshtastic.Protobufs;
using Google.Protobuf;
using Serilog;
using MQTTnet.Protocol;
using System.Runtime.Loader;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog.Formatting.Compact;
using Meshtastic.Crypto;
using Meshtastic;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Reflection;
await RunMqttServer(args);
async Task RunMqttServer(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console(new RenderedCompactJsonFormatter())
// .WriteTo.File(new RenderedCompactJsonFormatter(), "log.json", rollingInterval: RollingInterval.Hour)
.CreateLogger();
using var mqttServer = new MqttServerFactory()
.CreateMqttServer(BuildMqttServerOptions());
ConfigureMqttServer(mqttServer);
// Set up host
using var host = CreateHostBuilder(args).Build();
await host.StartAsync();
var lifetime = host.Services.GetRequiredService<IHostApplicationLifetime>();
await mqttServer.StartAsync();
// Configure graceful shutdown
await SetupGracefulShutdown(mqttServer, lifetime, host);
}
MqttServerOptions BuildMqttServerOptions()
{
var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!;
#pragma warning disable SYSLIB0057 // Type or member is obsolete
var certificate = new X509Certificate2(
Path.Combine(currentPath, "certificate.pfx"),
"large4cats",
X509KeyStorageFlags.Exportable);
#pragma warning restore SYSLIB0057
var options = new MqttServerOptionsBuilder()
.WithoutDefaultEndpoint()
.WithEncryptedEndpoint()
.WithEncryptedEndpointPort(8883)
.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
.WithEncryptionSslProtocol(SslProtocols.Tls12)
.Build();
Log.Logger.Information("Using SSL certificate for MQTT server");
return options;
}
void ConfigureMqttServer(MqttServer mqttServer)
{
mqttServer.InterceptingPublishAsync += HandleInterceptingPublish;
mqttServer.InterceptingSubscriptionAsync += HandleInterceptingSubscription;
mqttServer.ValidatingConnectionAsync += HandleValidatingConnection;
}
async Task HandleInterceptingPublish(InterceptingPublishEventArgs args)
{
try
{
if (args.ApplicationMessage.Payload.Length == 0)
{
Log.Logger.Warning("Received empty payload on topic {@Topic} from {@ClientId}", args.ApplicationMessage.Topic, args.ClientId);
args.ProcessPublish = false;
return;
}
var serviceEnvelope = ServiceEnvelope.Parser.ParseFrom(args.ApplicationMessage.Payload);
if (!IsValidServiceEnvelope(serviceEnvelope))
{
Log.Logger.Warning("Service envelope or packet is malformed. Blocking packet on topic {@Topic} from {@ClientId}",
args.ApplicationMessage.Topic, args.ClientId);
args.ProcessPublish = false;
return;
}
// Spot for any async operations we might want to perform
await Task.FromResult(0);
var data = DecryptMeshPacket(serviceEnvelope);
// Uncomment to block unrecognized packets
// if (data == null)
// {
// Log.Logger.Warning("Service envelope does not contain a valid packet. Blocking packet");
// args.ProcessPublish = false;
// return;
// }
LogReceivedMessage(args.ApplicationMessage.Topic, args.ClientId, data);
args.ProcessPublish = true;
}
catch (InvalidProtocolBufferException)
{
Log.Logger.Warning("Failed to decode presumed protobuf packet. Blocking");
args.ProcessPublish = false;
}
catch (Exception ex)
{
Log.Logger.Error("Exception occurred while processing packet on {@Topic} from {@ClientId}: {@Exception}",
args.ApplicationMessage.Topic, args.ClientId, ex.Message);
args.ProcessPublish = false;
}
}
Task HandleInterceptingSubscription(InterceptingSubscriptionEventArgs args)
{
// Add filtering logic here if needed
args.ProcessSubscription = true;
return Task.CompletedTask;
}
Task HandleValidatingConnection(ValidatingConnectionEventArgs args)
{
// Add connection / authentication logic here if needed
args.ReasonCode = MqttConnectReasonCode.Success;
return Task.CompletedTask;
}
bool IsValidServiceEnvelope(ServiceEnvelope serviceEnvelope)
{
return !(String.IsNullOrWhiteSpace(serviceEnvelope.ChannelId) ||
String.IsNullOrWhiteSpace(serviceEnvelope.GatewayId) ||
serviceEnvelope.Packet == null ||
serviceEnvelope.Packet.Id < 1 ||
serviceEnvelope.Packet.From < 1 ||
serviceEnvelope.Packet.Encrypted == null ||
serviceEnvelope.Packet.Encrypted.Length < 1 ||
serviceEnvelope.Packet.Decoded != null);
}
void LogReceivedMessage(string topic, string clientId, Data? data)
{
if (data?.Portnum == PortNum.TextMessageApp)
{
Log.Logger.Information("Received text message on topic {@Topic} from {@ClientId}: {@Message}",
topic, clientId, data.Payload.ToStringUtf8());
}
else
{
Log.Logger.Information("Received packet on topic {@Topic} from {@ClientId} with port number: {@Portnum}",
topic, clientId, data?.Portnum);
}
}
static Data? DecryptMeshPacket(ServiceEnvelope serviceEnvelope)
{
var nonce = new NonceGenerator(serviceEnvelope.Packet.From, serviceEnvelope.Packet.Id).Create();
var decrypted = PacketEncryption.TransformPacket(serviceEnvelope.Packet.Encrypted.ToByteArray(), nonce, Resources.DEFAULT_PSK);
var payload = Data.Parser.ParseFrom(decrypted);
if (payload.Portnum > PortNum.UnknownApp && payload.Payload.Length > 0)
return payload;
return null;
}
async Task SetupGracefulShutdown(MqttServer mqttServer, IHostApplicationLifetime lifetime, IHost host)
{
var ended = new ManualResetEventSlim();
var starting = new ManualResetEventSlim();
AssemblyLoadContext.Default.Unloading += ctx =>
{
starting.Set();
Log.Logger.Debug("Waiting for completion");
ended.Wait();
};
starting.Wait();
Log.Logger.Debug("Received signal gracefully shutting down");
await mqttServer.StopAsync();
Thread.Sleep(500);
ended.Set();
lifetime.StopApplication();
await host.WaitForShutdownAsync();
}
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args)
.UseConsoleLifetime()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton(Console.Out);
});
}