Skip to content

Commit 9df3fa1

Browse files
committed
(#271) Semantic/Vector Search
1 parent f8275c5 commit 9df3fa1

File tree

2 files changed

+65
-51
lines changed

2 files changed

+65
-51
lines changed

src/Monolith/ClassifiedAds.Background/MessageBusConsumers/FileEmbeddingConsumer.cs

Lines changed: 29 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ private async Task ProcessFileAsync(FileEntry fileEntry, CancellationToken cance
7474
return;
7575
}
7676

77+
if (fileEntry.Deleted)
78+
{
79+
return;
80+
}
81+
7782
using var scope = _serviceProvider.CreateScope();
7883
var fileStorageManager = scope.ServiceProvider.GetService<IFileStorageManager>();
7984
var markdownService = scope.ServiceProvider.GetService<MarkdownService>();
@@ -91,19 +96,9 @@ private async Task ProcessFileAsync(FileEntry fileEntry, CancellationToken cance
9196

9297
var chunks = TextChunkingService.ChunkSentences(Encoding.UTF8.GetString(bytes));
9398

94-
var chunksFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "Chunks", fileEntry.Id.ToString());
99+
var chunksFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Chunks", fileEntry.Id.ToString()));
95100

96-
if (!Directory.Exists(chunksFolder))
97-
{
98-
Directory.CreateDirectory(chunksFolder);
99-
}
100-
101-
var embeddingsFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", fileEntry.Id.ToString());
102-
103-
if (!Directory.Exists(embeddingsFolder))
104-
{
105-
Directory.CreateDirectory(embeddingsFolder);
106-
}
101+
var embeddingsFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", fileEntry.Id.ToString()));
107102

108103
foreach (var chunk in chunks)
109104
{
@@ -119,12 +114,7 @@ private async Task ProcessFileAsync(FileEntry fileEntry, CancellationToken cance
119114
{
120115
_logger.LogInformation("Converting file to markdown for FileEntry Id: {FileEntryId}", fileEntry?.Id);
121116

122-
var markdownFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "Markdown");
123-
124-
if (!Directory.Exists(markdownFolder))
125-
{
126-
Directory.CreateDirectory(markdownFolder);
127-
}
117+
var markdownFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Markdown"));
128118

129119
var markdownFile = Path.Combine(markdownFolder, fileEntry.Id + ".md");
130120

@@ -137,19 +127,9 @@ private async Task ProcessFileAsync(FileEntry fileEntry, CancellationToken cance
137127

138128
var chunks = TextChunkingService.ChunkSentences(await File.ReadAllTextAsync(markdownFile, cancellationToken));
139129

140-
var chunksFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "Chunks", fileEntry.Id.ToString());
130+
var chunksFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Chunks", fileEntry.Id.ToString()));
141131

142-
if (!Directory.Exists(chunksFolder))
143-
{
144-
Directory.CreateDirectory(chunksFolder);
145-
}
146-
147-
var embeddingsFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", fileEntry.Id.ToString());
148-
149-
if (!Directory.Exists(embeddingsFolder))
150-
{
151-
Directory.CreateDirectory(embeddingsFolder);
152-
}
132+
var embeddingsFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", fileEntry.Id.ToString()));
153133

154134
foreach (var chunk in chunks)
155135
{
@@ -164,21 +144,12 @@ private async Task ProcessFileAsync(FileEntry fileEntry, CancellationToken cance
164144
{
165145
_logger.LogInformation("Processing image file for FileEntry Id: {FileEntryId}", fileEntry?.Id);
166146

167-
var imageAnalysisFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "ImageAnalysis");
147+
var imageAnalysisFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "ImageAnalysis"));
168148

169-
if (!Directory.Exists(imageAnalysisFolder))
170-
{
171-
Directory.CreateDirectory(imageAnalysisFolder);
172-
}
149+
var embeddingsFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", fileEntry.Id.ToString()));
173150

174-
var embeddingsFolder = Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", fileEntry.Id.ToString());
175-
176-
if (!Directory.Exists(embeddingsFolder))
177-
{
178-
Directory.CreateDirectory(embeddingsFolder);
179-
}
180-
181-
var imageAnalysisFile = Path.Combine(imageAnalysisFolder, fileEntry.Id + ".json");
151+
var imageAnalysisFile = Path.Combine(imageAnalysisFolder, $"{fileEntry.Id}.json");
152+
var embeddingFile = Path.Combine(embeddingsFolder, $"{fileEntry.Id}.json");
182153

183154
if (!File.Exists(imageAnalysisFile))
184155
{
@@ -189,12 +160,14 @@ private async Task ProcessFileAsync(FileEntry fileEntry, CancellationToken cance
189160
var json = JsonSerializer.Serialize(imageAnalysisResult);
190161

191162
await File.WriteAllTextAsync(imageAnalysisFile, json, cancellationToken);
163+
}
192164

165+
if (!File.Exists(embeddingFile))
166+
{
167+
var json = await File.ReadAllTextAsync(imageAnalysisFile, cancellationToken);
193168
var embedding = await embeddingService.GenerateAsync(json, cancellationToken);
194-
await File.WriteAllTextAsync(Path.Combine(embeddingsFolder, $"{fileEntry.Id}.json"), JsonSerializer.Serialize(embedding), cancellationToken);
169+
await File.WriteAllTextAsync(embeddingFile, JsonSerializer.Serialize(embedding), cancellationToken);
195170
}
196-
197-
198171
}
199172
}
200173

@@ -224,4 +197,14 @@ private async Task<byte[]> GetBytesAsync(IFileStorageManager fileStorageManager,
224197

225198
return content;
226199
}
200+
201+
private static string CreateDirectoryIfNotExist(string path)
202+
{
203+
if (!Directory.Exists(path))
204+
{
205+
Directory.CreateDirectory(path);
206+
}
207+
208+
return path;
209+
}
227210
}
Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
using ClassifiedAds.Application.Products.MessageBusEvents;
2+
using ClassifiedAds.Background.Services;
3+
using ClassifiedAds.Domain.Entities;
24
using ClassifiedAds.Domain.Infrastructure.Messaging;
35
using Microsoft.Extensions.Configuration;
6+
using Microsoft.Extensions.DependencyInjection;
47
using Microsoft.Extensions.Logging;
8+
using System;
9+
using System.IO;
10+
using System.Text.Json;
511
using System.Threading;
612
using System.Threading.Tasks;
713

@@ -14,29 +20,54 @@ public sealed class ProductEmbeddingConsumer :
1420
{
1521
private readonly ILogger<ProductEmbeddingConsumer> _logger;
1622
private readonly IConfiguration _configuration;
23+
private readonly IServiceProvider _serviceProvider;
1724

1825
public ProductEmbeddingConsumer(ILogger<ProductEmbeddingConsumer> logger,
19-
IConfiguration configuration)
26+
IConfiguration configuration,
27+
IServiceProvider serviceProvider)
2028
{
2129
_logger = logger;
2230
_configuration = configuration;
31+
_serviceProvider = serviceProvider;
2332
}
2433

25-
public Task HandleAsync(ProductCreatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
34+
public async Task HandleAsync(ProductCreatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
2635
{
2736
_logger.LogInformation("Handling ProductCreatedEvent for ProductId: {ProductId}", data?.Product?.Id);
28-
return Task.CompletedTask;
37+
38+
await ProcessProductAsync(data.Product, cancellationToken);
2939
}
3040

31-
public Task HandleAsync(ProductUpdatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
41+
public async Task HandleAsync(ProductUpdatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
3242
{
3343
_logger.LogInformation("Handling ProductUpdatedEvent for ProductId: {ProductId}", data?.Product?.Id);
34-
return Task.CompletedTask;
44+
45+
await ProcessProductAsync(data.Product, cancellationToken);
3546
}
3647

3748
public Task HandleAsync(ProductDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
3849
{
3950
_logger.LogInformation("Handling ProductDeletedEvent for ProductId: {ProductId}", data?.Product?.Id);
4051
return Task.CompletedTask;
4152
}
53+
54+
private async Task ProcessProductAsync(Product product, CancellationToken cancellationToken)
55+
{
56+
using var scope = _serviceProvider.CreateScope();
57+
var embeddingService = scope.ServiceProvider.GetService<EmbeddingService>();
58+
59+
var embedding = await embeddingService.GenerateAsync(product.Description, cancellationToken);
60+
var embeddingsFolder = CreateDirectoryIfNotExist(Path.Combine(_configuration["Storage:TempFolderPath"], "Embeddings", "Products"));
61+
await File.WriteAllTextAsync(Path.Combine(embeddingsFolder, $"{product.Id}.json"), JsonSerializer.Serialize(embedding), cancellationToken);
62+
}
63+
64+
private static string CreateDirectoryIfNotExist(string path)
65+
{
66+
if (!Directory.Exists(path))
67+
{
68+
Directory.CreateDirectory(path);
69+
}
70+
71+
return path;
72+
}
4273
}

0 commit comments

Comments
 (0)