Skip to content

Commit b1165fd

Browse files
authored
Add follow flag to watching console logs method (#8779)
1 parent 09bcfce commit b1165fd

File tree

18 files changed

+435
-99
lines changed

18 files changed

+435
-99
lines changed

src/Aspire.Dashboard/ResourceService/DashboardClient.cs

+30-8
Original file line numberDiff line numberDiff line change
@@ -492,15 +492,8 @@ async IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> IDashboardClient.Subscrib
492492
{
493493
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken: combinedTokens.Token).ConfigureAwait(false))
494494
{
495-
var logLines = new ResourceLogLine[response.LogLines.Count];
496-
497-
for (var i = 0; i < logLines.Length; i++)
498-
{
499-
logLines[i] = new ResourceLogLine(response.LogLines[i].LineNumber, response.LogLines[i].Text, response.LogLines[i].IsStdErr);
500-
}
501-
502495
// Channel is unbound so TryWrite always succeeds.
503-
channel.Writer.TryWrite(logLines);
496+
channel.Writer.TryWrite(CreateLogLines(response.LogLines));
504497
}
505498
}
506499
finally
@@ -524,6 +517,35 @@ async IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> IDashboardClient.Subscrib
524517
await readTask.ConfigureAwait(false);
525518
}
526519

520+
async IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> IDashboardClient.GetConsoleLogs(string resourceName, [EnumeratorCancellation] CancellationToken cancellationToken)
521+
{
522+
EnsureInitialized();
523+
524+
using var combinedTokens = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);
525+
526+
var call = _client!.WatchResourceConsoleLogs(
527+
new WatchResourceConsoleLogsRequest() { ResourceName = resourceName, SuppressFollow = true },
528+
headers: _headers,
529+
cancellationToken: combinedTokens.Token);
530+
531+
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken: combinedTokens.Token).ConfigureAwait(false))
532+
{
533+
yield return CreateLogLines(response.LogLines);
534+
}
535+
}
536+
537+
private static ResourceLogLine[] CreateLogLines(IList<ConsoleLogLine> logLines)
538+
{
539+
var resourceLogLines = new ResourceLogLine[logLines.Count];
540+
541+
for (var i = 0; i < logLines.Count; i++)
542+
{
543+
resourceLogLines[i] = new ResourceLogLine(logLines[i].LineNumber, logLines[i].Text, logLines[i].IsStdErr);
544+
}
545+
546+
return resourceLogLines;
547+
}
548+
527549
public async Task<ResourceCommandResponseViewModel> ExecuteResourceCommandAsync(string resourceName, string resourceType, CommandViewModel command, CancellationToken cancellationToken)
528550
{
529551
EnsureInitialized();

src/Aspire.Dashboard/ResourceService/IDashboardClient.cs

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public interface IDashboardClient : IDashboardClientStatus, IAsyncDisposable
4242
/// so that resources owned by the sequence and its consumers can be freed.</para>
4343
IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> SubscribeConsoleLogs(string resourceName, CancellationToken cancellationToken);
4444

45+
IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> GetConsoleLogs(string resourceName, CancellationToken cancellationToken);
46+
4547
Task<ResourceCommandResponseViewModel> ExecuteResourceCommandAsync(string resourceName, string resourceType, CommandViewModel command, CancellationToken cancellationToken);
4648
}
4749

src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs

+120-51
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class ResourceLoggerService
2020
internal TimeProvider TimeProvider { get; set; } = TimeProvider.System;
2121

2222
private readonly ConcurrentDictionary<string, ResourceLoggerState> _loggers = new();
23+
private IConsoleLogsService _consoleLogsService = new FakeConsoleLogsService();
2324
private Action<(string, ResourceLoggerState)>? _loggerAdded;
2425
private event Action<(string, ResourceLoggerState)> LoggerAdded
2526
{
@@ -146,6 +147,26 @@ internal Action<LogEntry> GetInternalLogger(string resourceName)
146147
return (logEntry) => state.AddLog(logEntry, inMemorySource: false);
147148
}
148149

150+
/// <summary>
151+
/// Get all logs for a resource. This will return all logs that have been written to the log stream for the resource and then complete.
152+
/// </summary>
153+
/// <param name="resource">The resource to get all logs for.</param>
154+
/// <returns>An async enumerable that returns all logs that have been written to the log stream and then completes.</returns>
155+
public IAsyncEnumerable<IReadOnlyList<LogLine>> GetAllAsync(IResource resource)
156+
{
157+
ArgumentNullException.ThrowIfNull(resource);
158+
159+
var resourceNames = resource.GetResolvedResourceNames();
160+
if (resourceNames.Length > 1)
161+
{
162+
return CombineMultipleAsync(resourceNames, GetAllAsync);
163+
}
164+
else
165+
{
166+
return GetAllAsync(resourceNames[0]);
167+
}
168+
}
169+
149170
/// <summary>
150171
/// Watch for changes to the log stream for a resource.
151172
/// </summary>
@@ -158,43 +179,24 @@ public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
158179
var resourceNames = resource.GetResolvedResourceNames();
159180
if (resourceNames.Length > 1)
160181
{
161-
return WatchMultipleAsync(resourceNames, WatchAsync);
182+
return CombineMultipleAsync(resourceNames, WatchAsync);
162183
}
163184
else
164185
{
165186
return WatchAsync(resourceNames[0]);
166187
}
188+
}
167189

168-
static async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchMultipleAsync(string[] resourceNames, Func<string, IAsyncEnumerable<IReadOnlyList<LogLine>>> watch)
169-
{
170-
var channel = Channel.CreateUnbounded<IReadOnlyList<LogLine>>();
171-
var readTasks = resourceNames.Select(async (name) =>
172-
{
173-
await foreach (var logLines in watch(name).ConfigureAwait(false))
174-
{
175-
channel.Writer.TryWrite(logLines);
176-
}
177-
});
178-
179-
var completionTask = Task.Run(async () =>
180-
{
181-
try
182-
{
183-
await Task.WhenAll(readTasks).ConfigureAwait(false);
184-
}
185-
finally
186-
{
187-
channel.Writer.Complete();
188-
}
189-
});
190-
191-
await foreach (var item in channel.Reader.ReadAllAsync().ConfigureAwait(false))
192-
{
193-
yield return item;
194-
}
190+
/// <summary>
191+
/// Get all logs for a resource. This will return all logs that have been written to the log stream for the resource and then complete.
192+
/// </summary>
193+
/// <param name="resourceName">The resource name</param>
194+
/// <returns>An async enumerable that returns all logs that have been written to the log stream and then completes.</returns>
195+
public IAsyncEnumerable<IReadOnlyList<LogLine>> GetAllAsync(string resourceName)
196+
{
197+
ArgumentNullException.ThrowIfNull(resourceName);
195198

196-
await completionTask.ConfigureAwait(false);
197-
}
199+
return GetResourceLoggerState(resourceName).GetAllAsync();
198200
}
199201

200202
/// <summary>
@@ -290,11 +292,42 @@ public void ClearBacklog(string resourceName)
290292
}
291293
}
292294

295+
private static async IAsyncEnumerable<IReadOnlyList<LogLine>> CombineMultipleAsync(string[] resourceNames, Func<string, IAsyncEnumerable<IReadOnlyList<LogLine>>> fetch)
296+
{
297+
var channel = Channel.CreateUnbounded<IReadOnlyList<LogLine>>();
298+
var readTasks = resourceNames.Select(async (name) =>
299+
{
300+
await foreach (var logLines in fetch(name).ConfigureAwait(false))
301+
{
302+
channel.Writer.TryWrite(logLines);
303+
}
304+
});
305+
306+
var completionTask = Task.Run(async () =>
307+
{
308+
try
309+
{
310+
await Task.WhenAll(readTasks).ConfigureAwait(false);
311+
}
312+
finally
313+
{
314+
channel.Writer.Complete();
315+
}
316+
});
317+
318+
await foreach (var item in channel.Reader.ReadAllAsync().ConfigureAwait(false))
319+
{
320+
yield return item;
321+
}
322+
323+
await completionTask.ConfigureAwait(false);
324+
}
325+
293326
// Internal for testing.
294327
internal ResourceLoggerState GetResourceLoggerState(string resourceName) =>
295328
_loggers.GetOrAdd(resourceName, (name, context) =>
296329
{
297-
var state = new ResourceLoggerState(TimeProvider);
330+
var state = new ResourceLoggerState(name, TimeProvider, _consoleLogsService);
298331
context._loggerAdded?.Invoke((name, state));
299332
return state;
300333
},
@@ -314,15 +347,19 @@ internal sealed class ResourceLoggerState
314347

315348
private readonly CircularBuffer<LogEntry> _inMemoryEntries = new(MaxLogCount);
316349
private readonly LogEntries _backlog = new(MaxLogCount) { BaseLineNumber = 0 };
350+
private readonly string _name;
317351
private readonly TimeProvider _timeProvider;
352+
private readonly IConsoleLogsService _consoleLogsService;
318353

319354
/// <summary>
320355
/// Creates a new <see cref="ResourceLoggerState"/>.
321356
/// </summary>
322-
public ResourceLoggerState(TimeProvider timeProvider)
357+
public ResourceLoggerState(string name, TimeProvider timeProvider, IConsoleLogsService consoleLogsService)
323358
{
324359
_logger = new ResourceLogger(this);
360+
_name = name;
325361
_timeProvider = timeProvider;
362+
_consoleLogsService = consoleLogsService;
326363
}
327364

328365
private Action<bool>? _onSubscribersChanged;
@@ -353,6 +390,25 @@ public event Action<bool> OnSubscribersChanged
353390
}
354391
}
355392

393+
public async IAsyncEnumerable<IReadOnlyList<LogLine>> GetAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
394+
{
395+
var consoleLogsEnumerable = _consoleLogsService.GetAllLogsAsync(_name, cancellationToken);
396+
397+
List<LogEntry> inMemoryEntries;
398+
lock (_lock)
399+
{
400+
inMemoryEntries = _inMemoryEntries.ToList();
401+
}
402+
403+
var lineNumber = 0;
404+
yield return CreateLogLines(ref lineNumber, inMemoryEntries);
405+
406+
await foreach (var item in consoleLogsEnumerable.ConfigureAwait(false))
407+
{
408+
yield return CreateLogLines(ref lineNumber, item);
409+
}
410+
}
411+
356412
/// <summary>
357413
/// Watch for changes to the log stream for a resource.
358414
/// </summary>
@@ -408,25 +464,6 @@ public async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync([EnumeratorCanc
408464
channel.Writer.TryComplete();
409465
}
410466
}
411-
412-
static LogLine[] CreateLogLines(ref int lineNumber, IReadOnlyList<LogEntry> entries)
413-
{
414-
var logs = new LogLine[entries.Count];
415-
for (var i = 0; i < entries.Count; i++)
416-
{
417-
var entry = entries[i];
418-
var content = entry.Content ?? string.Empty;
419-
if (entry.Timestamp != null)
420-
{
421-
content = entry.Timestamp.Value.ToString(KnownFormats.ConsoleLogsTimestampFormat, CultureInfo.InvariantCulture) + " " + content;
422-
}
423-
424-
logs[i] = new LogLine(lineNumber, content, entry.Type == LogEntryType.Error);
425-
lineNumber++;
426-
}
427-
428-
return logs;
429-
}
430467
}
431468

432469
private bool HasSubscribers
@@ -554,6 +591,38 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
554591
}
555592
}
556593
}
594+
595+
private static LogLine[] CreateLogLines(ref int lineNumber, IReadOnlyList<LogEntry> entries)
596+
{
597+
var logs = new LogLine[entries.Count];
598+
for (var i = 0; i < entries.Count; i++)
599+
{
600+
var entry = entries[i];
601+
var content = entry.Content ?? string.Empty;
602+
if (entry.Timestamp != null)
603+
{
604+
content = entry.Timestamp.Value.ToString(KnownFormats.ConsoleLogsTimestampFormat, CultureInfo.InvariantCulture) + " " + content;
605+
}
606+
607+
logs[i] = new LogLine(lineNumber, content, entry.Type == LogEntryType.Error);
608+
lineNumber++;
609+
}
610+
611+
return logs;
612+
}
613+
614+
internal void SetConsoleLogsService(IConsoleLogsService consoleLogsService)
615+
{
616+
_consoleLogsService = consoleLogsService;
617+
}
618+
619+
private sealed class FakeConsoleLogsService : IConsoleLogsService
620+
{
621+
public IAsyncEnumerable<IReadOnlyList<LogEntry>> GetAllLogsAsync(string resourceName, CancellationToken cancellationToken)
622+
{
623+
throw new InvalidOperationException($"Getting all logs requires the {nameof(ResourceLoggerService)} instance created by DI.");
624+
}
625+
}
557626
}
558627

559628
/// <summary>

src/Aspire.Hosting/Dashboard/DashboardService.cs

+7-5
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,21 @@ public override async Task WatchResourceConsoleLogs(
108108
ServerCallContext context)
109109
{
110110
await ExecuteAsync(
111-
WatchResourceConsoleLogsInternal,
111+
cancellationToken => WatchResourceConsoleLogsInternal(request.SuppressFollow, cancellationToken),
112112
context).ConfigureAwait(false);
113113

114-
async Task WatchResourceConsoleLogsInternal(CancellationToken cancellationToken)
114+
async Task WatchResourceConsoleLogsInternal(bool suppressFollow, CancellationToken cancellationToken)
115115
{
116-
var subscription = serviceData.SubscribeConsoleLogs(request.ResourceName);
116+
var enumerable = suppressFollow
117+
? serviceData.GetConsoleLogs(request.ResourceName)
118+
: serviceData.SubscribeConsoleLogs(request.ResourceName);
117119

118-
if (subscription is null)
120+
if (enumerable is null)
119121
{
120122
return;
121123
}
122124

123-
await foreach (var group in subscription.WithCancellation(cancellationToken).ConfigureAwait(false))
125+
await foreach (var group in enumerable.WithCancellation(cancellationToken).ConfigureAwait(false))
124126
{
125127
var sentLines = 0;
126128

src/Aspire.Hosting/Dashboard/DashboardServiceData.cs

+17-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ public DashboardServiceData(
2727
_resourceLoggerService = resourceLoggerService;
2828
_resourcePublisher = new ResourcePublisher(_cts.Token);
2929
_commandExecutor = commandExecutor;
30-
3130
var cancellationToken = _cts.Token;
3231

3332
Task.Run(async () =>
@@ -149,4 +148,21 @@ async IAsyncEnumerable<IReadOnlyList<LogLine>> Enumerate([EnumeratorCancellation
149148
}
150149
}
151150
}
151+
152+
internal IAsyncEnumerable<IReadOnlyList<LogLine>>? GetConsoleLogs(string resourceName)
153+
{
154+
var sequence = _resourceLoggerService.GetAllAsync(resourceName);
155+
156+
return sequence is null ? null : Enumerate();
157+
158+
async IAsyncEnumerable<IReadOnlyList<LogLine>> Enumerate([EnumeratorCancellation] CancellationToken cancellationToken = default)
159+
{
160+
using var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
161+
162+
await foreach (var item in sequence.WithCancellation(linked.Token).ConfigureAwait(false))
163+
{
164+
yield return item;
165+
}
166+
}
167+
}
152168
}

src/Aspire.Hosting/Dashboard/proto/resource_service.proto

+3
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ message ConsoleLogLine {
297297
message WatchResourceConsoleLogsRequest {
298298
// Specifies the resource to watch logs from.
299299
string resource_name = 1;
300+
// A flag that indicates whether to suppress following new logs.
301+
// The method completes once the current set of logs is received.
302+
bool suppress_follow = 2;
300303
}
301304

302305
// A message received from the server when watching resource logs.

0 commit comments

Comments
 (0)