Skip to content

Commit 488a886

Browse files
committed
(#270) Convert documents to Markdown
1 parent dace5c1 commit 488a886

File tree

6 files changed

+267
-12
lines changed

6 files changed

+267
-12
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
using ClassifiedAds.Application.FileEntries.MessageBusEvents;
2+
using ClassifiedAds.Domain.Entities;
3+
using ClassifiedAds.Domain.Infrastructure.Messaging;
4+
using ClassifiedAds.Domain.Infrastructure.Storages;
5+
using CryptographyHelper;
6+
using CryptographyHelper.SymmetricAlgorithms;
7+
using Microsoft.Extensions.Configuration;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Logging;
10+
using System;
11+
using System.Net.Http;
12+
using System.Net.Http.Headers;
13+
using System.Security.Cryptography;
14+
using System.Threading;
15+
using System.Threading.Tasks;
16+
17+
namespace ClassifiedAds.Background.MessageBusConsumers;
18+
19+
public sealed class FileEmbeddingConsumer :
20+
IMessageBusConsumer<FileEmbeddingConsumer, FileCreatedEvent>,
21+
IMessageBusConsumer<FileEmbeddingConsumer, FileUpdatedEvent>,
22+
IMessageBusConsumer<FileEmbeddingConsumer, FileDeletedEvent>
23+
{
24+
private static readonly HttpClient _httpClient = new HttpClient();
25+
26+
private readonly ILogger<FileEmbeddingConsumer> _logger;
27+
private readonly IConfiguration _configuration;
28+
private readonly IServiceProvider _serviceProvider;
29+
30+
public FileEmbeddingConsumer(ILogger<FileEmbeddingConsumer> logger,
31+
IConfiguration configuration,
32+
IServiceProvider serviceProvider)
33+
{
34+
_logger = logger;
35+
_configuration = configuration;
36+
_serviceProvider = serviceProvider;
37+
}
38+
39+
public async Task HandleAsync(FileCreatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
40+
{
41+
_logger.LogInformation("Handling FileCreatedEvent for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
42+
43+
if (string.IsNullOrEmpty(data?.FileEntry?.FileLocation))
44+
{
45+
return;
46+
}
47+
48+
if (data.FileEntry.FileName.EndsWith(".txt") ||
49+
data.FileEntry.FileName.EndsWith(".md") ||
50+
data.FileEntry.FileName.EndsWith(".markdown"))
51+
{
52+
_logger.LogInformation("Skipping text file for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
53+
return;
54+
}
55+
56+
if (data.FileEntry.FileName.EndsWith(".pdf") ||
57+
data.FileEntry.FileName.EndsWith(".docx"))
58+
{
59+
_logger.LogInformation("Converting file to markdown for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
60+
61+
var markdown = await ConvertToMarkdownAsync(data.FileEntry, cancellationToken);
62+
63+
return;
64+
}
65+
66+
return;
67+
}
68+
69+
public async Task HandleAsync(FileUpdatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
70+
{
71+
_logger.LogInformation("Handling FileUpdatedEvent for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
72+
73+
if (string.IsNullOrEmpty(data?.FileEntry?.FileLocation))
74+
{
75+
return;
76+
}
77+
78+
if (data.FileEntry.FileName.EndsWith(".txt") ||
79+
data.FileEntry.FileName.EndsWith(".md") ||
80+
data.FileEntry.FileName.EndsWith(".markdown"))
81+
{
82+
_logger.LogInformation("Skipping text file for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
83+
return;
84+
}
85+
86+
if (data.FileEntry.FileName.EndsWith(".pdf") ||
87+
data.FileEntry.FileName.EndsWith(".docx"))
88+
{
89+
_logger.LogInformation("Converting file to markdown for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
90+
91+
var markdown = await ConvertToMarkdownAsync(data.FileEntry, cancellationToken);
92+
93+
return;
94+
}
95+
96+
return;
97+
}
98+
99+
public Task HandleAsync(FileDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
100+
{
101+
_logger.LogInformation("Handling FileDeletedEvent for FileEntry Id: {FileEntryId}", data?.FileEntry?.Id);
102+
return Task.CompletedTask;
103+
}
104+
105+
private async Task<string> ConvertToMarkdownAsync(FileEntry fileEntry, CancellationToken cancellationToken = default)
106+
{
107+
// TODO: xxx
108+
var content = await _serviceProvider.CreateScope().ServiceProvider.GetRequiredService<IFileStorageManager>().ReadAsync(fileEntry, cancellationToken);
109+
110+
if (fileEntry.Encrypted)
111+
{
112+
var masterEncryptionKey = _configuration["Storage:MasterEncryptionKey"];
113+
var encryptionKey = fileEntry.EncryptionKey.FromBase64String()
114+
.UseAES(masterEncryptionKey.FromBase64String())
115+
.WithCipher(CipherMode.CBC)
116+
.WithIV(fileEntry.EncryptionIV.FromBase64String())
117+
.WithPadding(PaddingMode.PKCS7)
118+
.Decrypt();
119+
120+
content = fileEntry.FileLocation != "Fake.txt"
121+
? content
122+
.UseAES(encryptionKey)
123+
.WithCipher(CipherMode.CBC)
124+
.WithIV(fileEntry.EncryptionIV.FromBase64String())
125+
.WithPadding(PaddingMode.PKCS7)
126+
.Decrypt()
127+
: content;
128+
}
129+
130+
using var form = new MultipartFormDataContent();
131+
using var fileContent = new ByteArrayContent(content);
132+
fileContent.Headers.ContentType = MediaTypeHeaderValue.Parse("multipart/form-data");
133+
form.Add(fileContent, "formFile", fileEntry.FileName);
134+
form.Add(new StringContent("Test Name"), "name");
135+
136+
var response = await _httpClient.PostAsync(_configuration["TextExtracting:MarkItDownServer:Endpoint"], form, cancellationToken);
137+
response.EnsureSuccessStatusCode();
138+
139+
var markdown = await response.Content.ReadAsStringAsync(cancellationToken);
140+
141+
return markdown;
142+
}
143+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using ClassifiedAds.Application.Products.MessageBusEvents;
2+
using ClassifiedAds.Domain.Infrastructure.Messaging;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.Logging;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
namespace ClassifiedAds.Background.MessageBusConsumers;
9+
10+
public sealed class ProductEmbeddingConsumer :
11+
IMessageBusConsumer<ProductEmbeddingConsumer, ProductCreatedEvent>,
12+
IMessageBusConsumer<ProductEmbeddingConsumer, ProductUpdatedEvent>,
13+
IMessageBusConsumer<ProductEmbeddingConsumer, ProductDeletedEvent>
14+
{
15+
private readonly ILogger<ProductEmbeddingConsumer> _logger;
16+
private readonly IConfiguration _configuration;
17+
18+
public ProductEmbeddingConsumer(ILogger<ProductEmbeddingConsumer> logger,
19+
IConfiguration configuration)
20+
{
21+
_logger = logger;
22+
_configuration = configuration;
23+
}
24+
25+
public Task HandleAsync(ProductCreatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
26+
{
27+
_logger.LogInformation("Handling ProductCreatedEvent for ProductId: {ProductId}", data?.Product?.Id);
28+
return Task.CompletedTask;
29+
}
30+
31+
public Task HandleAsync(ProductUpdatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
32+
{
33+
_logger.LogInformation("Handling ProductUpdatedEvent for ProductId: {ProductId}", data?.Product?.Id);
34+
return Task.CompletedTask;
35+
}
36+
37+
public Task HandleAsync(ProductDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
38+
{
39+
_logger.LogInformation("Handling ProductDeletedEvent for ProductId: {ProductId}", data?.Product?.Id);
40+
return Task.CompletedTask;
41+
}
42+
}

src/Monolith/ClassifiedAds.Background/OutBoxEventPublishers/FileEntryOutboxMessagePublisher.cs renamed to src/Monolith/ClassifiedAds.Background/OutBoxPublishers/FileEntryOutboxPublisher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
using System.Threading;
88
using System.Threading.Tasks;
99

10-
namespace ClassifiedAds.Background.OutboxMessagePublishers;
10+
namespace ClassifiedAds.Background.OutBoxPublishers;
1111

12-
public class FileEntryOutboxMessagePublisher : IOutboxMessagePublisher
12+
public class FileEntryOutboxPublisher : IOutboxMessagePublisher
1313
{
1414
private readonly IMessageBus _messageBus;
1515

@@ -23,7 +23,7 @@ public static string CanHandleEventSource()
2323
return typeof(PublishOutboxMessagesCommand).Assembly.GetName().Name;
2424
}
2525

26-
public FileEntryOutboxMessagePublisher(IMessageBus messageBus)
26+
public FileEntryOutboxPublisher(IMessageBus messageBus)
2727
{
2828
_messageBus = messageBus;
2929
}

src/Monolith/ClassifiedAds.Background/OutBoxEventPublishers/ProductOutboxMessagePublisher.cs renamed to src/Monolith/ClassifiedAds.Background/OutBoxPublishers/ProductOutboxPublisher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
using System.Threading;
88
using System.Threading.Tasks;
99

10-
namespace ClassifiedAds.Background.OutboxMessagePublishers;
10+
namespace ClassifiedAds.Background.OutBoxPublishers;
1111

12-
public class ProductOutboxMessagePublisher : IOutboxMessagePublisher
12+
public class ProductOutboxPublisher : IOutboxMessagePublisher
1313
{
1414
private readonly IMessageBus _messageBus;
1515

@@ -23,7 +23,7 @@ public static string CanHandleEventSource()
2323
return typeof(PublishOutboxMessagesCommand).Assembly.GetName().Name;
2424
}
2525

26-
public ProductOutboxMessagePublisher(IMessageBus messageBus)
26+
public ProductOutboxPublisher(IMessageBus messageBus)
2727
{
2828
_messageBus = messageBus;
2929
}

src/Monolith/ClassifiedAds.Background/Program.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
.AddApplicationServices()
5757
.AddMessageHandlers();
5858

59+
services.AddStorageManager(appSettings.Storage);
60+
5961
services.AddTransient<IMessageBus, MessageBus>();
6062
services.AddMessageBusSender<FileCreatedEvent>(appSettings.Messaging);
6163
services.AddMessageBusSender<FileUpdatedEvent>(appSettings.Messaging);
@@ -69,6 +71,12 @@
6971
services.AddMessageBusReceiver<WebhookConsumer, ProductCreatedEvent>(appSettings.Messaging);
7072
services.AddMessageBusReceiver<WebhookConsumer, ProductUpdatedEvent>(appSettings.Messaging);
7173
services.AddMessageBusReceiver<WebhookConsumer, ProductDeletedEvent>(appSettings.Messaging);
74+
services.AddMessageBusReceiver<FileEmbeddingConsumer, FileCreatedEvent>(appSettings.Messaging);
75+
services.AddMessageBusReceiver<FileEmbeddingConsumer, FileUpdatedEvent>(appSettings.Messaging);
76+
services.AddMessageBusReceiver<FileEmbeddingConsumer, FileDeletedEvent>(appSettings.Messaging);
77+
services.AddMessageBusReceiver<ProductEmbeddingConsumer, ProductCreatedEvent>(appSettings.Messaging);
78+
services.AddMessageBusReceiver<ProductEmbeddingConsumer, ProductUpdatedEvent>(appSettings.Messaging);
79+
services.AddMessageBusReceiver<ProductEmbeddingConsumer, ProductDeletedEvent>(appSettings.Messaging);
7280
services.AddMessageBusConsumers(Assembly.GetExecutingAssembly());
7381
services.AddOutboxMessagePublishers(Assembly.GetExecutingAssembly());
7482

@@ -106,6 +114,12 @@ static void AddHostedServices(IServiceCollection services)
106114
services.AddHostedService<MessageBusConsumerBackgroundService<WebhookConsumer, ProductCreatedEvent>>();
107115
services.AddHostedService<MessageBusConsumerBackgroundService<WebhookConsumer, ProductUpdatedEvent>>();
108116
services.AddHostedService<MessageBusConsumerBackgroundService<WebhookConsumer, ProductDeletedEvent>>();
117+
services.AddHostedService<MessageBusConsumerBackgroundService<FileEmbeddingConsumer, FileCreatedEvent>>();
118+
services.AddHostedService<MessageBusConsumerBackgroundService<FileEmbeddingConsumer, FileUpdatedEvent>>();
119+
services.AddHostedService<MessageBusConsumerBackgroundService<FileEmbeddingConsumer, FileDeletedEvent>>();
120+
services.AddHostedService<MessageBusConsumerBackgroundService<ProductEmbeddingConsumer, ProductCreatedEvent>>();
121+
services.AddHostedService<MessageBusConsumerBackgroundService<ProductEmbeddingConsumer, ProductUpdatedEvent>>();
122+
services.AddHostedService<MessageBusConsumerBackgroundService<ProductEmbeddingConsumer, ProductDeletedEvent>>();
109123
services.AddHostedService<PublishOutboxWorker>();
110124
services.AddHostedService<SendEmailWorker>();
111125
services.AddHostedService<SendSmsWorker>();

src/Monolith/ClassifiedAds.Background/appsettings.json

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,23 @@
2727
}
2828
}
2929
},
30+
"Storage": {
31+
"Provider": "Local",
32+
"MasterEncryptionKey": "+2ZC9wrwlvPswPxCND0BjrKJ3CfOpImGtn4hloVwo2I=",
33+
"Local": {
34+
"Path": "C:\\Data\\files"
35+
},
36+
"Azure": {
37+
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=xxx;EndpointSuffix=core.windows.net",
38+
"Container": "classifiedadds"
39+
},
40+
"Amazon": {
41+
"AccessKeyID": "xxx",
42+
"SecretAccessKey": "xxx",
43+
"BucketName": "classifiedadds",
44+
"RegionEndpoint": "ap-southeast-1"
45+
}
46+
},
3047
"Messaging": {
3148
"Provider": "RabbitMQ",
3249
"RabbitMQ": {
@@ -44,12 +61,22 @@
4461
},
4562
"Consumers": {
4663
"WebhookConsumer": {
47-
"FileCreatedEvent": "classifiedadds_file_created",
48-
"FileUpdatedEvent": "classifiedadds_file_updated",
49-
"FileDeletedEvent": "classifiedadds_file_deleted",
50-
"ProductCreatedEvent": "classifiedadds_product_created",
51-
"ProductUpdatedEvent": "classifiedadds_product_updated",
52-
"ProductDeletedEvent": "classifiedadds_product_deleted"
64+
"FileCreatedEvent": "webhook_classifiedadds_file_created",
65+
"FileUpdatedEvent": "webhook_classifiedadds_file_updated",
66+
"FileDeletedEvent": "webhook_classifiedadds_file_deleted",
67+
"ProductCreatedEvent": "webhook_classifiedadds_product_created",
68+
"ProductUpdatedEvent": "webhook_classifiedadds_product_updated",
69+
"ProductDeletedEvent": "webhook_classifiedadds_product_deleted"
70+
},
71+
"FileEmbeddingConsumer": {
72+
"FileCreatedEvent": "file_embedding_classifiedadds_file_created",
73+
"FileUpdatedEvent": "file_embedding_classifiedadds_file_updated",
74+
"FileDeletedEvent": "file_embedding_classifiedadds_file_deleted"
75+
},
76+
"ProductEmbeddingConsumer": {
77+
"ProductCreatedEvent": "product_embedding_classifiedadds_product_created",
78+
"ProductUpdatedEvent": "product_embedding_classifiedadds_product_updated",
79+
"ProductDeletedEvent": "product_embedding_classifiedadds_product_deleted"
5380
}
5481
}
5582
},
@@ -180,5 +207,34 @@
180207
"PayloadUrl": "https://ddddotnet-webhook-server.azurewebsites.net/test",
181208
"Secret": ""
182209
}
210+
},
211+
"MessageBusConsumers": {
212+
"WebhookConsumer": {
213+
"Enabled": true
214+
},
215+
"FileEmbeddingConsumer": {
216+
"Enabled": true
217+
},
218+
"ProductEmbeddingConsumer": {
219+
"Enabled": true
220+
}
221+
},
222+
"TextExtracting": {
223+
"MarkItDownServer": {
224+
"Endpoint": "https://localhost:7110/"
225+
}
226+
},
227+
"ImageAnalysis": {
228+
"AzureAIVision": {
229+
"Endpoint": "https://xxx.cognitiveservices.azure.com/",
230+
"ApiKey": "ApiKey"
231+
}
232+
},
233+
"Embedding": {
234+
"OpenAI": {
235+
"Endpoint": "https://models.github.ai/inference",
236+
"ApiKey": "<OpenAI ApiKey or GitHub Token>",
237+
"EmbeddingModelId": "openai/text-embedding-3-small"
238+
}
183239
}
184240
}

0 commit comments

Comments
 (0)