Skip to content

Commit 719b93f

Browse files
authored
Add Redis (#20)
1 parent c049708 commit 719b93f

22 files changed

Lines changed: 1536 additions & 1 deletion

Directory.Packages.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<PackageVersion Include="Paramore.Brighter.MessagingGateway.Kafka" Version="$(BrighterVersion)" />
3131
<PackageVersion Include="Paramore.Brighter.MessagingGateway.MsSql" Version="$(BrighterVersion)" />
3232
<PackageVersion Include="Paramore.Brighter.MessagingGateway.Postgres" Version="$(BrighterVersion)" />
33+
<PackageVersion Include="Paramore.Brighter.MessagingGateway.Redis" Version="10.0.2" />
3334
<PackageVersion Include="Paramore.Brighter.MessagingGateway.RMQ.Sync" Version="$(BrighterVersion)" />
3435
<PackageVersion Include="Paramore.Brighter.MessagingGateway.RMQ.Async" Version="$(BrighterVersion)" />
3536
<PackageVersion Include="Paramore.Brighter.MessagingGateway.RocketMQ" Version="$(BrighterVersion)" />
@@ -53,4 +54,4 @@
5354
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
5455
</PackageVersion>
5556
</ItemGroup>
56-
</Project>
57+
</Project>

Fluent.Brighter.sln

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.SqlServer",
4747
EndProject
4848
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlServerSample", "samples\SqlServerSample\SqlServerSample.csproj", "{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}"
4949
EndProject
50+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.Redis", "src\Fluent.Brighter.Redis\Fluent.Brighter.Redis.csproj", "{8DC14D04-F8D0-41CB-8964-B75257CEFE68}"
51+
EndProject
52+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisSample", "samples\RedisSample\RedisSample.csproj", "{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}"
53+
EndProject
5054
Global
5155
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5256
Debug|Any CPU = Debug|Any CPU
@@ -285,6 +289,30 @@ Global
285289
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x64.Build.0 = Release|Any CPU
286290
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x86.ActiveCfg = Release|Any CPU
287291
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x86.Build.0 = Release|Any CPU
292+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
293+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|Any CPU.Build.0 = Debug|Any CPU
294+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x64.ActiveCfg = Debug|Any CPU
295+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x64.Build.0 = Debug|Any CPU
296+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x86.ActiveCfg = Debug|Any CPU
297+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x86.Build.0 = Debug|Any CPU
298+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|Any CPU.ActiveCfg = Release|Any CPU
299+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|Any CPU.Build.0 = Release|Any CPU
300+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x64.ActiveCfg = Release|Any CPU
301+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x64.Build.0 = Release|Any CPU
302+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x86.ActiveCfg = Release|Any CPU
303+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x86.Build.0 = Release|Any CPU
304+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
305+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|Any CPU.Build.0 = Debug|Any CPU
306+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x64.ActiveCfg = Debug|Any CPU
307+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x64.Build.0 = Debug|Any CPU
308+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x86.ActiveCfg = Debug|Any CPU
309+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x86.Build.0 = Debug|Any CPU
310+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|Any CPU.ActiveCfg = Release|Any CPU
311+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|Any CPU.Build.0 = Release|Any CPU
312+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.ActiveCfg = Release|Any CPU
313+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.Build.0 = Release|Any CPU
314+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.ActiveCfg = Release|Any CPU
315+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.Build.0 = Release|Any CPU
288316
EndGlobalSection
289317
GlobalSection(SolutionProperties) = preSolution
290318
HideSolutionNode = FALSE
@@ -309,5 +337,7 @@ Global
309337
{523946F8-241E-4DF6-92A3-875B1DDCDF39} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
310338
{1BBDF3C1-F4FF-404E-A5A9-6E3A743280BF} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
311339
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
340+
{8DC14D04-F8D0-41CB-8964-B75257CEFE68} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
341+
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
312342
EndGlobalSection
313343
EndGlobal
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using Paramore.Brighter;
2+
3+
namespace RedisSample.Commands;
4+
5+
public class FarewellEvent(string farewell) : Event(Id.Random())
6+
{
7+
public FarewellEvent() : this(string.Empty)
8+
{
9+
}
10+
11+
public string Farewell { get; set; } = farewell;
12+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using Paramore.Brighter;
2+
3+
namespace RedisSample.Commands;
4+
5+
public class GreetingEvent(string greeting) : Event(Id.Random())
6+
{
7+
public GreetingEvent() : this(string.Empty) { }
8+
9+
public string Greeting { get; set; } = greeting;
10+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Paramore.Brighter;
2+
3+
using RedisSample.Commands;
4+
5+
namespace RedisSample.Handlers;
6+
7+
public class FarewellEventHandler : RequestHandler<FarewellEvent>
8+
{
9+
public override FarewellEvent Handle(FarewellEvent @event)
10+
{
11+
Console.WriteLine(">>>>>> {0}", @event.Farewell);
12+
return base.Handle(@event);
13+
}
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Paramore.Brighter;
2+
3+
using RedisSample.Commands;
4+
5+
namespace RedisSample.Handlers;
6+
7+
public class GreetingEventHandler : RequestHandler<GreetingEvent>
8+
{
9+
public override GreetingEvent Handle(GreetingEvent @event)
10+
{
11+
Console.WriteLine("====== {0}", @event.Greeting);
12+
return base.Handle(@event);
13+
}
14+
}

samples/RedisSample/Program.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// See https://aka.ms/new-console-template for more information
2+
3+
using Fluent.Brighter;
4+
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Hosting;
7+
8+
using Paramore.Brighter;
9+
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;
10+
11+
using RedisSample.Commands;
12+
13+
using Serilog;
14+
15+
Log.Logger = new LoggerConfiguration()
16+
.MinimumLevel.Information()
17+
.Enrich.FromLogContext()
18+
.WriteTo.Console()
19+
.CreateLogger();
20+
21+
22+
var host = new HostBuilder()
23+
.UseSerilog()
24+
.ConfigureServices((_, services) =>
25+
{
26+
services
27+
.AddHostedService<ServiceActivatorHostedService>()
28+
.AddFluentBrighter(builder =>
29+
{
30+
builder
31+
.UsingRedis(cfg =>
32+
{
33+
cfg.SetConnection(c => c
34+
.SetRedisConnectionString("redis://localhost:6379?ConnectTimeout=1000&SendTimeout=1000")
35+
.SetMaxPoolSize(10)
36+
.SetMessageTimeToLive(TimeSpan.FromMinutes(10))
37+
.SetDefaultRetryTimeout(3_000));
38+
39+
cfg
40+
.UsePublications(pp => pp
41+
.AddPublication<GreetingEvent>(p => p.SetTopic("greeting.topic"))
42+
.AddPublication<FarewellEvent>(p => p.SetTopic("farewell.topic")))
43+
.UseSubscriptions(sb => sb
44+
.AddSubscription<GreetingEvent>(s => s
45+
.SetTopic("greeting.topic")
46+
.SetQueue("greeting.queue")
47+
.SetMessagePumpType(MessagePumpType.Reactor))
48+
.AddSubscription<FarewellEvent>(s => s
49+
.SetTopic("farewell.topic")
50+
.SetQueue("farewell.queue")
51+
.SetMessagePumpType(MessagePumpType.Reactor)));
52+
});
53+
});
54+
})
55+
.Build();
56+
57+
await host.StartAsync();
58+
59+
while (true)
60+
{
61+
await Task.Delay(TimeSpan.FromSeconds(10));
62+
Console.Write("Say your name (or q to quit): ");
63+
var name = Console.ReadLine();
64+
65+
if (string.IsNullOrEmpty(name))
66+
{
67+
continue;
68+
}
69+
70+
if (name == "q")
71+
{
72+
break;
73+
}
74+
75+
var process = host.Services.GetRequiredService<IAmACommandProcessor>();
76+
await process.PostAsync(new GreetingEvent(name));
77+
await process.PostAsync(new FarewellEvent(name));
78+
}
79+
80+
await host.StopAsync();
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net9.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
<IsPackable>false</IsPackable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\..\src\Fluent.Brighter.Redis\Fluent.Brighter.Redis.csproj" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" />
17+
<PackageReference Include="Serilog.AspNetCore" />
18+
</ItemGroup>
19+
</Project>
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
services:
2+
redis:
3+
image: redis
4+
ports:
5+
- "6379:6379"
6+
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System;
2+
3+
using Paramore.Brighter;
4+
using Paramore.Brighter.MessagingGateway.Redis;
5+
6+
namespace Fluent.Brighter.Redis.Extensions;
7+
8+
/// <summary>
9+
/// Provides extension methods for <see cref="ConsumerBuilder"/> to configure Redis-based message consumers.
10+
/// These extensions enable easy setup of Redis subscriptions and channel factories.
11+
/// </summary>
12+
public static class ConsumerBuilderExtensions
13+
{
14+
/// <summary>
15+
/// Adds a pre-configured Redis subscription to the consumer builder.
16+
/// </summary>
17+
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
18+
/// <param name="subscription">The pre-configured <see cref="RedisSubscription"/> to add.</param>
19+
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
20+
public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder, RedisSubscription subscription)
21+
{
22+
return builder.AddSubscription(subscription);
23+
}
24+
25+
/// <summary>
26+
/// Adds a Redis subscription to the consumer builder using a configuration action.
27+
/// This method creates a new <see cref="RedisSubscriptionBuilder"/>, applies the configuration, and builds the subscription.
28+
/// </summary>
29+
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
30+
/// <param name="configure">An action that configures the <see cref="RedisSubscriptionBuilder"/> with subscription settings.</param>
31+
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
32+
public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder,
33+
Action<RedisSubscriptionBuilder> configure)
34+
{
35+
var sub = new RedisSubscriptionBuilder();
36+
configure(sub);
37+
return builder.AddSubscription(sub.Build());
38+
}
39+
40+
/// <summary>
41+
/// Adds a Redis subscription for a specific request type to the consumer builder.
42+
/// This method automatically configures the subscription with the specified <typeparamref name="TRequest"/> type
43+
/// and then applies additional configuration provided by the action.
44+
/// </summary>
45+
/// <typeparam name="TRequest">The type of request/message that this subscription will handle. Must implement <see cref="IRequest"/>.</typeparam>
46+
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
47+
/// <param name="configure">An action that configures the <see cref="RedisSubscriptionBuilder"/> with additional settings.</param>
48+
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
49+
public static ConsumerBuilder AddRedisSubscription<TRequest>(this ConsumerBuilder builder,
50+
Action<RedisSubscriptionBuilder> configure)
51+
where TRequest : class, IRequest
52+
{
53+
var sub = new RedisSubscriptionBuilder();
54+
sub.SetDataType(typeof(TRequest));
55+
configure(sub);
56+
return builder.AddSubscription(sub.Build());
57+
}
58+
59+
/// <summary>
60+
/// Adds a Redis channel factory to the consumer builder using a configuration builder.
61+
/// Channel factories are responsible for creating message channels that consume messages from Redis.
62+
/// </summary>
63+
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
64+
/// <param name="configure">An action that configures the <see cref="RedisMessagingGatewayConfigurationBuilder"/> with connection details.</param>
65+
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
66+
public static ConsumerBuilder AdRedisChannelFactory(this ConsumerBuilder builder,
67+
Action<RedisMessagingGatewayConfigurationBuilder> configure)
68+
{
69+
var connection = new RedisMessagingGatewayConfigurationBuilder();
70+
configure(connection);
71+
return builder.AddRedisChannelFactory(connection.Build());
72+
}
73+
74+
/// <summary>
75+
/// Adds a Redis channel factory to the consumer builder using a pre-configured Redis messaging gateway configuration.
76+
/// Channel factories are responsible for creating message channels that consume messages from Redis.
77+
/// </summary>
78+
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
79+
/// <param name="connection">The pre-configured Redis messaging gateway configuration containing connection details.</param>
80+
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
81+
public static ConsumerBuilder AddRedisChannelFactory(this ConsumerBuilder builder, RedisMessagingGatewayConfiguration connection)
82+
{
83+
return builder
84+
.AddChannelFactory(new ChannelFactory(new RedisMessageConsumerFactory(connection)));
85+
}
86+
}

0 commit comments

Comments
 (0)