Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion MQTTnet.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@


Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31919.166
Expand Down Expand Up @@ -32,6 +32,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp", "Source\M
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.AspTestApp", "Source\MQTTnet.AspTestApp\MQTTnet.AspTestApp.csproj", "{72867E4C-4E15-4E8E-8FAB-AE9253286BBC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Hosting", "Source\MQTTnet.Extensions.Hosting\MQTTnet.Extensions.Hosting.csproj", "{B53FC20B-862C-4F8F-B9FF-E8C8D76A870D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -82,6 +84,10 @@ Global
{72867E4C-4E15-4E8E-8FAB-AE9253286BBC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{72867E4C-4E15-4E8E-8FAB-AE9253286BBC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{72867E4C-4E15-4E8E-8FAB-AE9253286BBC}.Release|Any CPU.Build.0 = Release|Any CPU
{B53FC20B-862C-4F8F-B9FF-E8C8D76A870D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B53FC20B-862C-4F8F-B9FF-E8C8D76A870D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B53FC20B-862C-4F8F-B9FF-E8C8D76A870D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B53FC20B-862C-4F8F-B9FF-E8C8D76A870D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions Samples/MQTTnet.Samples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<ProjectReference Include="..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj" />
<ProjectReference Include="..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj" />
<ProjectReference Include="..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj" />
<ProjectReference Include="..\Source\MQTTnet.Hosting\MQTTnet.Hosting.csproj" />
<ProjectReference Include="..\Source\MQTTnet\MQTTnet.csproj" />
</ItemGroup>

Expand Down
46 changes: 46 additions & 0 deletions Samples/Server/Server_Hosting_Extensions_Samples.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// ReSharper disable UnusedType.Global
// ReSharper disable UnusedMember.Global
// ReSharper disable InconsistentNaming
// ReSharper disable EmptyConstructor
// ReSharper disable MemberCanBeMadeStatic.Local

using Microsoft.Extensions.Hosting;
//using MQTTnet.AspNetCore;
using MQTTnet.Server;

namespace MQTTnet.Samples.Server;

public static class Server_Hosting_Extensions_Samples
{

// This could be called as a top-level statement in a Program.cs file
public static Task Start_Single_Line_Server()
=> new HostBuilder().UseMqttServer().Build().RunAsync();

public static Task Start_Simple_Server()
{
var host = new HostBuilder()
.UseMqttServer()
.Build();

return host.RunAsync();
}

public static Task Start_Server()
{
var builder = new HostBuilder();

builder
.UseMqttServer(mqtt =>
{
mqtt.WithDefaultEndpoint();
});

var host = builder.Build();
return host.RunAsync();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.Hosting.Events
{
public delegate Task<bool> HttpWebSocketClientAuthenticationCallback();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Extensions.Hosting;
using MQTTnet.Extensions.Hosting.Implementations;
using MQTTnet.Extensions.Hosting.Options;
using MQTTnet.Implementations;
using MQTTnet.Server;
using System;
using System.Collections.Generic;

namespace Microsoft.Extensions.Hosting
{
public static class HostBuilderExtensions
{

public static IHostBuilder UseMqttServer(this IHostBuilder hostBuilder)
=> hostBuilder.UseMqttServer(builder =>
{
builder.WithDefaultEndpoint();
});


public static IHostBuilder UseMqttServer(this IHostBuilder hostBuilder, Action<MqttServerHostingBuilder> configure)
{
var startActions = new List<Action<MqttServer>>();
var stopActions = new List<Action<MqttServer>>();
hostBuilder.ConfigureServices((context, services) =>
{
services.AddSingleton(s =>
{
var builder = new MqttServerHostingBuilder(s, startActions, stopActions);
configure(builder);
return builder.Build();
});

var logger = new MqttNetEventLogger();

services
.AddSingleton<IMqttNetLogger>(logger)
.AddSingleton<MqttHostedServer>()
.AddSingleton<MqttServerHostingOptions>()
.AddSingleton<IHostedService>(s => s.GetRequiredService<MqttHostedServer>())
.AddSingleton<IHostedService>(s => new MqttServerConfigurationHostedService(s, startActions, stopActions))
.AddSingleton<MqttServer>(s => s.GetRequiredService<MqttHostedServer>())

.AddSingleton<MqttTcpServerAdapter>()
.AddSingleton<IMqttServerAdapter>(s => s.GetRequiredService<MqttTcpServerAdapter>())

.AddSingleton<MqttServerWebSocketConnectionHandler>()

.AddSingleton<MqttWebSocketServerAdapter>()
.AddSingleton<IMqttServerAdapter>(s => s.GetRequiredService<MqttWebSocketServerAdapter>());

});
return hostBuilder;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Extensions.Hosting;
using MQTTnet.Extensions.Hosting.Implementations;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.WebSockets;
using System.Text;

namespace MQTTnet.Server
{
public static class HostingMqttServerExtensions
{

public static void HandleWebSocketConnection(this MqttServer server, HttpListenerWebSocketContext webSocketContext, HttpListenerContext httpListenerContext)
{
if (!(server is MqttHostedServer mqttHostedServer))
throw new InvalidOperationException("The server must be started through hosting extensions.");

mqttHostedServer.ServiceProvider.GetRequiredService<MqttServerWebSocketConnectionHandler>().HandleWebSocketConnection(webSocketContext, httpListenerContext);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using Microsoft.Extensions.Hosting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.Hosting.Implementations
{
public class MqttServerWebSocketConnectionHandler : IHostedService, IDisposable
{
readonly CancellationTokenSource _cts = new CancellationTokenSource();
readonly MqttWebSocketServerAdapter _adapter;
readonly IMqttNetLogger _logger;

public MqttServerWebSocketConnectionHandler(MqttWebSocketServerAdapter adapter, IMqttNetLogger logger)
{
_adapter = adapter;
_logger = logger;
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();

return Task.CompletedTask;
}

public void HandleWebSocketConnection(HttpListenerWebSocketContext webSocketContext, HttpListenerContext httpListenerContext, X509Certificate2 clientCertificate = null)
{
_ = Task.Factory.StartNew(() => TryHandleWebSocketConnectionAsync(webSocketContext, httpListenerContext, clientCertificate));
}

async Task TryHandleWebSocketConnectionAsync(HttpListenerWebSocketContext webSocketContext, HttpListenerContext httpListenerContext, X509Certificate2 clientCertificate)
{
if (webSocketContext == null) throw new ArgumentNullException(nameof(webSocketContext));
var endpoint = $"{httpListenerContext.Request.RemoteEndPoint.Address}:{httpListenerContext.Request.RemoteEndPoint.Port}";

try
{
var clientHandler = _adapter.ClientHandler;
if (clientHandler != null)
{
var formatter = new MqttPacketFormatterAdapter(new MqttBufferWriter(4096, 65535));
var channel = new MqttWebSocketChannel(webSocketContext.WebSocket, endpoint, webSocketContext.IsSecureConnection, clientCertificate);
using (var channelAdapter = new MqttChannelAdapter(channel, formatter, null, _logger))
{
await clientHandler(channelAdapter).ConfigureAwait(false);
}
}
}
finally
{
clientCertificate?.Dispose();
}
}

public void Dispose()
{
_cts.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.Net.Http;
using MQTTnet.Formatter;
using MQTTnet.Extensions.Hosting;
using MQTTnet.Extensions.Hosting.Options;
using MQTTnet.Extensions.Hosting.Implementations;
using Microsoft.Extensions.DependencyInjection;

namespace MQTTnet.Implementations
{
public class MqttWebSocketServerAdapter : IMqttServerAdapter
{
readonly List<MqttWebSocketServerListener> _listeners = new List<MqttWebSocketServerListener>();
readonly IServiceProvider _services;
readonly MqttServerHostingOptions _hostingOptions;
MqttServerOptions _serverOptions;
IMqttNetLogger _logger;

public MqttWebSocketServerAdapter(IServiceProvider services, MqttServerHostingOptions hostingOptions)
{
_services = services;
_hostingOptions = hostingOptions;
}

public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }

public Task StartAsync(MqttServerOptions options, IMqttNetLogger logger)
{
_serverOptions = options;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

if (_hostingOptions.DefaultWebSocketEndpointOptions.IsEnabled)
{
_listeners.Add(ActivatorUtilities.CreateInstance<MqttWebSocketServerListener>(_services, options, _hostingOptions.DefaultWebSocketEndpointOptions));
}

if (_hostingOptions.DefaultTlsWebSocketEndpointOptions.IsEnabled)
{
_listeners.Add(ActivatorUtilities.CreateInstance<MqttWebSocketServerListener>(_services, options, _hostingOptions.DefaultTlsWebSocketEndpointOptions));
}

foreach (var listener in _listeners)
{
listener.Start(CancellationToken.None);
}

return CompletedTask.Instance;
}

public Task StopAsync()
{
foreach (var listener in _listeners)
{
listener.Dispose();
}

_listeners.Clear();

return CompletedTask.Instance;
}

public void Dispose()
{
foreach (var listener in _listeners)
{
listener.Dispose();
}
}

}
}
Loading