Skip to content

Commit bca6ae7

Browse files
authored
Merge pull request #41 from santisq/40-have-pstask-return-the-runspace-to-the-runspacepool
Improves RunspacePool re-use logic
2 parents 7b47eae + 214fea1 commit bca6ae7

10 files changed

+146
-234
lines changed

module/PSParallelPipeline.psd1

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'
1212

1313
# Version number of this module.
14-
ModuleVersion = '1.1.9'
14+
ModuleVersion = '1.2.0'
1515

1616
# Supported PSEditions
1717
# CompatiblePSEditions = @()

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ protected override void BeginProcessing()
7272
_worker.CancelAfter(TimeSpan.FromSeconds(TimeOutSeconds));
7373
}
7474

75-
_worker.Start();
75+
_worker.Run();
7676
}
7777

7878
protected override void ProcessRecord()
@@ -90,12 +90,13 @@ protected override void ProcessRecord()
9090
}
9191
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
9292
{
93-
_worker.StopAndWait();
93+
_worker.Cancel();
94+
_worker.Wait();
9495
throw;
9596
}
9697
catch (OperationCanceledException exception)
9798
{
98-
_worker.WaitOperationCanceled();
99+
_worker.Wait();
99100
exception.WriteTimeoutError(this);
100101
}
101102
catch (Exception exception)
@@ -115,17 +116,17 @@ protected override void EndProcessing()
115116
{
116117
ProcessOutput(data);
117118
}
118-
119119
_worker.Wait();
120120
}
121121
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
122122
{
123-
_worker.StopAndWait();
123+
_worker.Cancel();
124+
_worker.Wait();
124125
throw;
125126
}
126127
catch (OperationCanceledException exception)
127128
{
128-
_worker.WaitOperationCanceled();
129+
_worker.Wait();
129130
exception.WriteTimeoutError(this);
130131
}
131132
catch (Exception exception)
@@ -147,7 +148,8 @@ private void ProcessOutput(PSOutputData data)
147148
break;
148149

149150
case Type.Debug:
150-
WriteDebug((string)data.Output);
151+
DebugRecord debug = (DebugRecord)data.Output;
152+
WriteDebug(debug.Message);
151153
break;
152154

153155
case Type.Information:
@@ -159,16 +161,18 @@ private void ProcessOutput(PSOutputData data)
159161
break;
160162

161163
case Type.Verbose:
162-
WriteVerbose((string)data.Output);
164+
VerboseRecord verbose = (VerboseRecord)data.Output;
165+
WriteVerbose(verbose.Message);
163166
break;
164167

165168
case Type.Warning:
166-
WriteWarning((string)data.Output);
169+
WarningRecord warning = (WarningRecord)data.Output;
170+
WriteWarning(warning.Message);
167171
break;
168172
}
169173
}
170174

171-
protected override void StopProcessing() => _worker?.StopAndWait();
175+
protected override void StopProcessing() => _worker?.Cancel();
172176

173177
public void Dispose()
174178
{

src/PSParallelPipeline/ExceptionHelpers.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ internal static void WriteUnspecifiedError(this Exception exception, PSCmdlet cm
1919
cmdlet.WriteError(new ErrorRecord(
2020
exception, "UnspecifiedCmdletError", ErrorCategory.NotSpecified, cmdlet));
2121

22-
internal static ErrorRecord CreateProcessingTaskError(this Exception exception, object context) =>
23-
new(exception, "ProcessingTask", ErrorCategory.NotSpecified, context);
22+
internal static PSOutputData CreateProcessingTaskError(this Exception exception, object context) =>
23+
PSOutputData.WriteError(new ErrorRecord(
24+
exception, "ProcessingTask", ErrorCategory.NotSpecified, context));
2425

2526
private static bool ValueIsNotScriptBlock(object? value) =>
2627
value is not ScriptBlock and not PSObject { BaseObject: ScriptBlock };

src/PSParallelPipeline/PSOutputData.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,21 @@ internal record struct PSOutputData(Type Type, object Output)
1818
internal static PSOutputData WriteObject(object sendToPipeline) =>
1919
new(Type.Success, sendToPipeline);
2020

21-
internal static PSOutputData WriteError(ErrorRecord error) =>
21+
internal static PSOutputData WriteError(object error) =>
2222
new(Type.Error, error);
2323

24-
internal static PSOutputData WriteDebug(DebugRecord debug) =>
25-
new(Type.Debug, debug.Message);
24+
internal static PSOutputData WriteDebug(object debug) =>
25+
new(Type.Debug, debug);
2626

27-
internal static PSOutputData WriteInformation(InformationRecord information) =>
27+
internal static PSOutputData WriteInformation(object information) =>
2828
new(Type.Information, information);
2929

30-
internal static PSOutputData WriteProgress(ProgressRecord progress) =>
30+
internal static PSOutputData WriteProgress(object progress) =>
3131
new(Type.Progress, progress);
3232

33-
internal static PSOutputData WriteVerbose(VerboseRecord verbose) =>
34-
new(Type.Verbose, verbose.Message);
33+
internal static PSOutputData WriteVerbose(object verbose) =>
34+
new(Type.Verbose, verbose);
3535

36-
internal static PSOutputData WriteWarning(WarningRecord warning) =>
37-
new(Type.Warning, warning.Message);
36+
internal static PSOutputData WriteWarning(object warning) =>
37+
new(Type.Warning, warning);
3838
}

src/PSParallelPipeline/PSOutputStreams.cs

Lines changed: 22 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -33,91 +33,41 @@ internal PSOutputStreams(Worker worker)
3333
SetStreamHandlers();
3434
}
3535

36+
internal void AddOutput(PSOutputData data) => OutputPipe.Add(data);
37+
3638
private void SetStreamHandlers()
3739
{
38-
Success.DataAdded += (s, e) =>
39-
{
40-
foreach (PSObject data in Success.ReadAll())
41-
{
42-
WriteObject(data);
43-
}
44-
};
45-
46-
Error.DataAdded += (s, e) =>
47-
{
48-
foreach (ErrorRecord error in Error.ReadAll())
49-
{
50-
WriteError(error);
51-
}
52-
};
53-
54-
Debug.DataAdded += (s, e) =>
55-
{
56-
foreach (DebugRecord debug in Debug.ReadAll())
57-
{
58-
WriteDebug(debug);
59-
}
60-
};
61-
62-
63-
Information.DataAdded += (s, e) =>
64-
{
65-
foreach (InformationRecord information in Information.ReadAll())
66-
{
67-
WriteInformation(information);
68-
}
69-
};
70-
71-
Progress.DataAdded += (s, e) =>
72-
{
73-
foreach (ProgressRecord progress in Progress.ReadAll())
74-
{
75-
WriteProgress(progress);
76-
}
77-
};
78-
79-
Verbose.DataAdded += (s, e) =>
80-
{
81-
foreach (VerboseRecord verbose in Verbose.ReadAll())
82-
{
83-
WriteVerbose(verbose);
84-
}
85-
};
86-
87-
Warning.DataAdded += (s, e) =>
88-
{
89-
foreach (WarningRecord warning in Warning.ReadAll())
90-
{
91-
WriteWarning(warning);
92-
}
93-
};
94-
}
40+
Success.DataAdding += (s, e) =>
41+
AddOutput(PSOutputData.WriteObject(e.ItemAdded));
9542

96-
private void WriteObject(PSObject data) =>
97-
OutputPipe.Add(PSOutputData.WriteObject(data), Token);
43+
Error.DataAdding += (s, e) =>
44+
AddOutput(PSOutputData.WriteError(e.ItemAdded));
9845

99-
internal void WriteError(ErrorRecord error) =>
100-
OutputPipe.Add(PSOutputData.WriteError(error), Token);
46+
Debug.DataAdding += (s, e) =>
47+
AddOutput(PSOutputData.WriteDebug(e.ItemAdded));
10148

102-
private void WriteDebug(DebugRecord debug) =>
103-
OutputPipe.Add(PSOutputData.WriteDebug(debug), Token);
49+
Information.DataAdding += (s, e) =>
50+
AddOutput(PSOutputData.WriteInformation(e.ItemAdded));
10451

105-
private void WriteInformation(InformationRecord information) =>
106-
OutputPipe.Add(PSOutputData.WriteInformation(information), Token);
52+
Progress.DataAdding += (s, e) =>
53+
AddOutput(PSOutputData.WriteProgress(e.ItemAdded));
10754

108-
private void WriteProgress(ProgressRecord progress) =>
109-
OutputPipe.Add(PSOutputData.WriteProgress(progress), Token);
55+
Verbose.DataAdding += (s, e) =>
56+
AddOutput(PSOutputData.WriteVerbose(e.ItemAdded));
11057

111-
private void WriteVerbose(VerboseRecord verbose) =>
112-
OutputPipe.Add(PSOutputData.WriteVerbose(verbose), Token);
113-
114-
private void WriteWarning(WarningRecord warning) =>
115-
OutputPipe.Add(PSOutputData.WriteWarning(warning), Token);
58+
Warning.DataAdding += (s, e) =>
59+
AddOutput(PSOutputData.WriteWarning(e.ItemAdded));
60+
}
11661

11762
public void Dispose()
11863
{
11964
Success.Dispose();
12065
Error.Dispose();
66+
Debug.Dispose();
67+
Information.Dispose();
68+
Progress.Dispose();
69+
Verbose.Dispose();
70+
Warning.Dispose();
12171
OutputPipe.Dispose();
12272
GC.SuppressFinalize(this);
12373
}

src/PSParallelPipeline/PSTask.cs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ namespace PSParallelPipeline;
99

1010
internal sealed class PSTask : IDisposable
1111
{
12-
private PSOutputStreams OutputStreams { get => _pool.PSOutputStreams; }
13-
1412
private readonly PowerShell _powershell;
1513

1614
private readonly PSDataStreams _internalStreams;
1715

1816
private readonly RunspacePool _pool;
1917

18+
private PSOutputStreams OutputStreams { get => _pool.PSOutputStreams; }
19+
2020
internal Runspace Runspace
2121
{
2222
get => _powershell.Runspace;
@@ -70,30 +70,39 @@ internal PSTask AddScript(ScriptBlock script)
7070
return this;
7171
}
7272

73-
internal PSTask AddUsingStatements(Dictionary<string, object?>? usingParams)
73+
internal void AddUsingStatements(Dictionary<string, object?> usingParams)
7474
{
75-
if (usingParams is { Count: > 0 })
75+
if (usingParams.Count > 0 )
7676
{
7777
_powershell.AddParameters(new Dictionary<string, Dictionary<string, object?>>
7878
{
7979
["--%"] = usingParams
8080
});
8181
}
82-
83-
return this;
8482
}
8583

86-
internal async Task<PSTask> InvokeAsync()
84+
internal async Task InvokeAsync()
8785
{
88-
using CancellationTokenRegistration _ = _pool.RegisterCancellation(CancelCallback(this));
89-
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
90-
return this;
86+
try
87+
{
88+
using CancellationTokenRegistration _ = _pool.RegisterCancellation(CancelCallback(this));
89+
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
90+
}
91+
catch (Exception exception)
92+
{
93+
OutputStreams.AddOutput(exception.CreateProcessingTaskError(this));
94+
}
95+
finally
96+
{
97+
_pool.CompleteTask(this);
98+
_pool.Release();
99+
}
91100
}
92101

93-
private static Action CancelCallback(PSTask task) => delegate
102+
private static Action CancelCallback(PSTask psTask) => delegate
94103
{
95-
task.Dispose();
96-
task.Runspace.Dispose();
104+
psTask.Dispose();
105+
psTask.Runspace.Dispose();
97106
};
98107

99108
public void Dispose()

0 commit comments

Comments
 (0)