-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPerformerQuery.cs
More file actions
77 lines (67 loc) · 2.34 KB
/
PerformerQuery.cs
File metadata and controls
77 lines (67 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
using Couchbase.AnalyticsClient.Results;
using Couchbase.Grpc.Protocol.Columnar;
namespace Couchbase.Analytics.Performer.Internal.Utils;
public class PerformerQuery
{
public PerformerQuery(Task<IQueryResult> queryTask, ContentAs contentAs, CancellationTokenSource cancellationTokenSource)
{
QueryTask = queryTask;
ContentAs = contentAs;
CancellationTokenSource = cancellationTokenSource;
}
private Task<IQueryResult> QueryTask { get; }
private IQueryResult? QueryResult { get; set; }
public CancellationTokenSource? CancellationTokenSource { get; }
public ContentAs ContentAs { get; set; }
private IAsyncEnumerator<AnalyticsRow>? _cachedEnumerator;
public async Task<IQueryResult> GetQueryResult()
{
return QueryResult ??= await QueryTask.ConfigureAwait(false);
}
public async Task<QueryRowResponse> GetNextRow()
{
if (QueryResult is null)
{
// Do not wrap in try/catch since we want to bubble the Exception to the caller
// so it can convert it appropriately (e.g. "Invalid Credentials" and such)
QueryResult = await QueryTask.ConfigureAwait(false);
}
if (_cachedEnumerator is null)
{
try
{
// ReSharper disable once MethodSupportsCancellation
_cachedEnumerator = QueryResult.GetAsyncEnumerator();
}
catch (Exception ex)
{
throw new InvalidOperationException("Failed to get async enumerator from query task.", ex);
}
}
QueryRowResponse response;
if (await _cachedEnumerator!.MoveNextAsync().ConfigureAwait(false))
{
response = new QueryRowResponse()
{
Success = new QueryRowResponse.Types.Result()
{
Row = new QueryRowResponse.Types.Row()
{
RowContent = _cachedEnumerator.Current.ContentAsToAnalyticsRow(ContentAs)
}
}
};
}
else
{
response = new QueryRowResponse()
{
Success = new QueryRowResponse.Types.Result()
{
EndOfStream = true
}
};
}
return response;
}
}