-
Notifications
You must be signed in to change notification settings - Fork 863
Expand file tree
/
Copy pathIngestionDocumentReader.cs
More file actions
121 lines (108 loc) · 6.03 KB
/
IngestionDocumentReader.cs
File metadata and controls
121 lines (108 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Mime;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Extensions.DataIngestion;
/// <summary>
/// Reads source content and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
/// <remarks>
/// Derived classes implement <see cref="ReadAsync(Stream, string, string, CancellationToken)"/>; the file,
/// directory, and file-collection overloads are all built on top of that single abstract member.
/// </remarks>
public abstract class IngestionDocumentReader
{
    /// <summary>
    /// Reads a file and converts it to an <see cref="IngestionDocument"/>.
    /// </summary>
    /// <param name="source">The file to read.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>A task representing the asynchronous read operation.</returns>
    /// <remarks>
    /// The document identifier is the file's full path. The media type is inferred from the file extension,
    /// falling back to <c>application/octet-stream</c> when the extension is not recognized.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
    public Task<IngestionDocument> ReadAsync(FileInfo source, CancellationToken cancellationToken = default)
    {
        string identifier = Throw.IfNull(source).FullName; // entire path is more unique than just part of it.
        return ReadAsync(source, identifier, GetMediaType(source), cancellationToken);
    }

    /// <summary>
    /// Reads a file and converts it to an <see cref="IngestionDocument"/>.
    /// </summary>
    /// <param name="source">The file to read.</param>
    /// <param name="identifier">The unique identifier for the document.</param>
    /// <param name="mediaType">
    /// The media type of the file. When <see langword="null"/> or empty, it is inferred from the file extension.
    /// </param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>A task representing the asynchronous read operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="identifier"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="identifier"/> is empty.</exception>
    public virtual async Task<IngestionDocument> ReadAsync(FileInfo source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
    {
        _ = Throw.IfNull(source);
        _ = Throw.IfNullOrEmpty(identifier);

        // A buffer size of 1 turns off FileStream's own buffering (presumably because stream readers buffer
        // on their own — confirm). FileShare.Read still allows other readers to open the file concurrently.
        using FileStream stream = new(source.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 1, FileOptions.Asynchronous);

        string effectiveMediaType = string.IsNullOrEmpty(mediaType) ? GetMediaType(source) : mediaType!;
        return await ReadAsync(stream, identifier, effectiveMediaType, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reads all files in the specified directory that match the given search pattern and option,
    /// and converts each to an <see cref="IngestionDocument"/>.
    /// </summary>
    /// <param name="directory">The directory to read.</param>
    /// <param name="searchPattern">The search pattern for file selection.</param>
    /// <param name="searchOption">The search option for directory traversal.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>An asynchronous sequence of <see cref="IngestionDocument"/> instances.</returns>
    /// <remarks>
    /// Arguments are validated when this method is called. The directory itself is only enumerated when the
    /// returned sequence is enumerated. A file that fails to read does not stop the sequence; it is yielded
    /// as a document whose <see cref="IngestionDocument.ReadException"/> holds the failure.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="directory"/> or <paramref name="searchPattern"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="searchPattern"/> is empty.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="searchOption"/> is not a defined <see cref="SearchOption"/> value.</exception>
    public IAsyncEnumerable<IngestionDocument> ReadAsync(
        DirectoryInfo directory,
        string searchPattern = "*.*",
        SearchOption searchOption = SearchOption.TopDirectoryOnly,
        CancellationToken cancellationToken = default)
    {
        // Validate eagerly: inside an async iterator these checks would not run until the first MoveNextAsync.
        _ = Throw.IfNull(directory);
        _ = Throw.IfNullOrEmpty(searchPattern);
        _ = Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories, nameof(searchOption));

        return ReadDirectoryCoreAsync(directory, searchPattern, searchOption, cancellationToken);
    }

    /// <summary>
    /// Reads the specified files and converts each to an <see cref="IngestionDocument"/>.
    /// </summary>
    /// <param name="files">The files to read.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>An asynchronous sequence of <see cref="IngestionDocument"/> instances.</returns>
    /// <remarks>
    /// Files are read one at a time, in enumeration order. Any failure other than an
    /// <see cref="OperationCanceledException"/> is captured in <see cref="IngestionDocument.ReadException"/>
    /// of the yielded document, whose identifier is the file's full path.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="files"/> is <see langword="null"/>.</exception>
    public IAsyncEnumerable<IngestionDocument> ReadAsync(
        IEnumerable<FileInfo> files,
        CancellationToken cancellationToken = default)
    {
        // Validate eagerly: inside an async iterator this check would not run until the first MoveNextAsync.
        _ = Throw.IfNull(files);

        return ReadFilesCoreAsync(files, cancellationToken);
    }

    /// <summary>
    /// Reads a stream and converts it to an <see cref="IngestionDocument"/>.
    /// </summary>
    /// <param name="source">The stream to read.</param>
    /// <param name="identifier">The unique identifier for the document.</param>
    /// <param name="mediaType">
    /// The media type of the content. The file-based overloads always pass a non-empty value here.
    /// </param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>A task representing the asynchronous read operation.</returns>
    public abstract Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default);

    // Async-iterator body for the directory overload; arguments were already validated by the caller.
    private async IAsyncEnumerable<IngestionDocument> ReadDirectoryCoreAsync(
        DirectoryInfo directory,
        string searchPattern,
        SearchOption searchOption,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Called here rather than in the public method so file-system access happens only on enumeration.
        IEnumerable<FileInfo> files = directory.EnumerateFiles(searchPattern, searchOption);

        await foreach (IngestionDocument document in ReadFilesCoreAsync(files, cancellationToken).ConfigureAwait(false))
        {
            yield return document;
        }
    }

    // Async-iterator body for the file-collection overload; files was already validated by the caller.
    private async IAsyncEnumerable<IngestionDocument> ReadFilesCoreAsync(
        IEnumerable<FileInfo> files,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        foreach (FileInfo file in files)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // C# forbids 'yield return' inside a try block that has a catch clause, so assign here and yield below.
            IngestionDocument document;
            try
            {
                document = await ReadAsync(file, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Same identifier scheme (full path) as ReadAsync(FileInfo, CancellationToken).
                document = new IngestionDocument(file.FullName) { ReadException = ex };
            }

            yield return document;
        }
    }

    // Maps the file extension to a media type, defaulting to a generic binary type when unknown.
    private static string GetMediaType(FileInfo source) =>
        MediaTypeMap.GetMediaType(source.Extension) ?? "application/octet-stream";
}