Skip to content

Commit c972bf5

Browse files
authored
Merge pull request #45 from santisq/44-invoke-parallel-should-throw-when-passing-a-function-not-found
44 invoke parallel should throw when passing a function not found
2 parents 02a0144 + 6e058d9 commit c972bf5

11 files changed

+209
-192
lines changed

module/PSParallelPipeline.psd1

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'
1212

1313
# Version number of this module.
14-
ModuleVersion = '1.2.1'
14+
ModuleVersion = '1.2.2'
1515

1616
# Supported PSEditions
1717
# CompatiblePSEditions = @()
@@ -77,7 +77,7 @@
7777
VariablesToExport = @()
7878

7979
# Aliases to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no aliases to export.
80-
AliasesToExport = @('parallel')
80+
AliasesToExport = @('parallel', 'asparallel')
8181

8282
# DSC resources to export from this module
8383
# DscResourcesToExport = @()

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@
22
using System.Collections;
33
using System.Management.Automation;
44
using System.Management.Automation.Runspaces;
5+
using System.Threading;
56
using PSParallelPipeline.Poly;
67

78
namespace PSParallelPipeline.Commands;
89

910
[Cmdlet(VerbsLifecycle.Invoke, "Parallel")]
10-
[Alias("parallel")]
11+
[Alias("parallel", "asparallel")]
1112
[OutputType(typeof(object))]
1213
public sealed class InvokeParallelCommand : PSCmdlet, IDisposable
1314
{
1415
private Worker? _worker;
1516

17+
private readonly CancellationTokenSource _cts = new();
18+
1619
[Parameter(Position = 0, Mandatory = true)]
1720
public ScriptBlock ScriptBlock { get; set; } = null!;
1821

@@ -46,6 +49,11 @@ public sealed class InvokeParallelCommand : PSCmdlet, IDisposable
4649

4750
protected override void BeginProcessing()
4851
{
52+
if (TimeoutSeconds > 0)
53+
{
54+
_cts.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds));
55+
}
56+
4957
InitialSessionState iss = InitialSessionState
5058
.CreateDefault2()
5159
.AddFunctions(Functions, this)
@@ -55,48 +63,42 @@ protected override void BeginProcessing()
5563
{
5664
MaxRunspaces = ThrottleLimit,
5765
UseNewRunspace = UseNewRunspace,
58-
InitialSessionState = iss,
59-
UsingStatements = ScriptBlock.GetUsingParameters(this)
66+
InitialSessionState = iss
6067
};
6168

62-
_worker = new Worker(poolSettings);
63-
64-
if (TimeoutSeconds > 0)
69+
TaskSettings workerSettings = new()
6570
{
66-
_worker.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds));
67-
}
71+
Script = ScriptBlock.ToString(),
72+
UsingStatements = ScriptBlock.GetUsingParameters(this)
73+
};
6874

75+
_worker = new Worker(poolSettings, workerSettings, _cts.Token);
6976
_worker.Run();
7077
}
7178

7279
protected override void ProcessRecord()
7380
{
7481
Dbg.Assert(_worker is not null);
75-
this.ThrowIfInputObjectIsScriptBlock(InputObject);
82+
InputObject.ThrowIfInputObjectIsScriptBlock(this);
7683

7784
try
7885
{
79-
_worker.Enqueue(InputObject, ScriptBlock);
86+
_worker.Enqueue(InputObject);
8087
while (_worker.TryTake(out PSOutputData data))
8188
{
8289
ProcessOutput(data);
8390
}
8491
}
8592
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
8693
{
87-
_worker.Cancel();
88-
_worker.Wait();
94+
CancelAndWait();
8995
throw;
9096
}
9197
catch (OperationCanceledException exception)
9298
{
9399
_worker.Wait();
94100
exception.WriteTimeoutError(this);
95101
}
96-
catch (Exception exception)
97-
{
98-
exception.WriteUnspecifiedError(this);
99-
}
100102
}
101103

102104
protected override void EndProcessing()
@@ -110,23 +112,19 @@ protected override void EndProcessing()
110112
{
111113
ProcessOutput(data);
112114
}
115+
113116
_worker.Wait();
114117
}
115118
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
116119
{
117-
_worker.Cancel();
118-
_worker.Wait();
120+
CancelAndWait();
119121
throw;
120122
}
121123
catch (OperationCanceledException exception)
122124
{
123125
_worker.Wait();
124126
exception.WriteTimeoutError(this);
125127
}
126-
catch (Exception exception)
127-
{
128-
exception.WriteUnspecifiedError(this);
129-
}
130128
}
131129

132130
private void ProcessOutput(PSOutputData data)
@@ -166,11 +164,18 @@ private void ProcessOutput(PSOutputData data)
166164
}
167165
}
168166

169-
protected override void StopProcessing() => _worker?.Cancel();
167+
private void CancelAndWait()
168+
{
169+
_cts.Cancel();
170+
_worker?.Wait();
171+
}
172+
173+
protected override void StopProcessing() => CancelAndWait();
170174

171175
public void Dispose()
172176
{
173177
_worker?.Dispose();
178+
_cts.Dispose();
174179
GC.SuppressFinalize(this);
175180
}
176181
}

src/PSParallelPipeline/ExceptionHelpers.cs renamed to src/PSParallelPipeline/ExceptionHelper.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace PSParallelPipeline;
55

6-
internal static class ExceptionHelpers
6+
internal static class ExceptionHelper
77
{
88
private const string _notsupported =
99
"Passed-in script block variables are not supported, and can result in undefined behavior.";
@@ -15,17 +15,33 @@ internal static void WriteTimeoutError(this Exception exception, PSCmdlet cmdlet
1515
ErrorCategory.OperationTimeout,
1616
cmdlet));
1717

18-
internal static void WriteUnspecifiedError(this Exception exception, PSCmdlet cmdlet) =>
19-
cmdlet.WriteError(new ErrorRecord(
20-
exception, "UnspecifiedCmdletError", ErrorCategory.NotSpecified, cmdlet));
21-
2218
internal static PSOutputData CreateProcessingTaskError(this Exception exception, object context) =>
2319
PSOutputData.WriteError(new ErrorRecord(
2420
exception, "ProcessingTask", ErrorCategory.NotSpecified, context));
2521

22+
internal static void ThrowFunctionNotFoundError(
23+
this CommandNotFoundException exception,
24+
Cmdlet cmdlet,
25+
string function) =>
26+
cmdlet.ThrowTerminatingError(new ErrorRecord(
27+
exception, "FunctionNotFound", ErrorCategory.ObjectNotFound, function));
28+
2629
private static bool ValueIsNotScriptBlock(object? value) =>
2730
value is not ScriptBlock and not PSObject { BaseObject: ScriptBlock };
2831

32+
internal static CommandInfo ThrowIfFunctionNotFoundError(
33+
this CommandInfo? command,
34+
string function)
35+
{
36+
if (command is not null)
37+
{
38+
return command;
39+
}
40+
41+
throw new CommandNotFoundException(
42+
$"The function with name '{function}' could not be found.");
43+
}
44+
2945
internal static void ThrowIfVariableIsScriptBlock(this PSCmdlet cmdlet, object? value)
3046
{
3147
if (ValueIsNotScriptBlock(value))
@@ -40,7 +56,7 @@ internal static void ThrowIfVariableIsScriptBlock(this PSCmdlet cmdlet, object?
4056
value));
4157
}
4258

43-
internal static void ThrowIfInputObjectIsScriptBlock(this PSCmdlet cmdlet, object? value)
59+
internal static void ThrowIfInputObjectIsScriptBlock(this object? value, PSCmdlet cmdlet)
4460
{
4561
if (ValueIsNotScriptBlock(value))
4662
{

src/PSParallelPipeline/Extensions.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,22 @@ internal static InitialSessionState AddFunctions(
2020
{
2121
foreach (string function in functionsToAdd)
2222
{
23-
CommandInfo? commandInfo = cmdlet
24-
.InvokeCommand
25-
.GetCommand(function, CommandTypes.Function);
26-
27-
if (commandInfo is null)
23+
try
2824
{
29-
continue;
25+
CommandInfo commandInfo = cmdlet
26+
.InvokeCommand
27+
.GetCommand(function, CommandTypes.Function)
28+
.ThrowIfFunctionNotFoundError(function);
29+
30+
initialSessionState.Commands.Add(
31+
new SessionStateFunctionEntry(
32+
name: function,
33+
definition: commandInfo.Definition));
34+
}
35+
catch (CommandNotFoundException exception)
36+
{
37+
exception.ThrowFunctionNotFoundError(cmdlet, function);
3038
}
31-
32-
initialSessionState.Commands.Add(new SessionStateFunctionEntry(
33-
name: function,
34-
definition: commandInfo.Definition));
3539
}
3640
}
3741

src/PSParallelPipeline/PSOutputStreams.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Management.Automation;
4-
using System.Threading;
54

65
namespace PSParallelPipeline;
76

87
internal sealed class PSOutputStreams : IDisposable
98
{
10-
private BlockingCollection<PSOutputData> OutputPipe { get => _worker.OutputPipe; }
9+
private BlockingCollection<PSOutputData> Output { get; }
1110

1211
internal PSDataCollection<PSObject> Success { get; } = [];
1312

@@ -23,17 +22,15 @@ internal sealed class PSOutputStreams : IDisposable
2322

2423
internal PSDataCollection<WarningRecord> Warning { get; } = [];
2524

26-
private readonly Worker _worker;
27-
28-
internal PSOutputStreams(Worker worker)
25+
internal PSOutputStreams(BlockingCollection<PSOutputData> output)
2926
{
30-
_worker = worker;
31-
SetStreamHandlers();
27+
Output = output;
28+
SetHandlers();
3229
}
3330

34-
internal void AddOutput(PSOutputData data) => OutputPipe.Add(data);
31+
internal void AddOutput(PSOutputData data) => Output.Add(data);
3532

36-
private void SetStreamHandlers()
33+
private void SetHandlers()
3734
{
3835
Success.DataAdding += (s, e) =>
3936
AddOutput(PSOutputData.WriteObject(e.ItemAdded));
@@ -66,7 +63,6 @@ public void Dispose()
6663
Progress.Dispose();
6764
Verbose.Dispose();
6865
Warning.Dispose();
69-
OutputPipe.Dispose();
7066
GC.SuppressFinalize(this);
7167
}
7268
}

0 commit comments

Comments
 (0)