Skip to content

Remove APM use outside of TaskHost node provider / endpoint #11800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions src/Build/BackEnd/Client/MSBuildClientPacketPump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

#if NET
using System.Threading.Tasks;
#endif

using Microsoft.Build.Internal;
using Microsoft.Build.Shared;

Expand Down Expand Up @@ -204,11 +200,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
try
{
byte[] headerByte = new byte[5];
#if FEATURE_APM
IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
Task<int> readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
#endif

bool continueReading = true;
do
Expand All @@ -220,11 +212,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
WaitHandle[] handles =
[
localPacketPumpShutdownEvent,
#if FEATURE_APM
result.AsyncWaitHandle
#else
((IAsyncResult)readTask).AsyncWaitHandle
#endif
];
int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
Expand All @@ -239,11 +227,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
{
// Client recieved a packet header. Read the rest of it.
int headerBytesRead = 0;
#if FEATURE_APM
headerBytesRead = localStream.EndRead(result);
#else
headerBytesRead = readTask.Result;
#endif

if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0))
{
Expand Down Expand Up @@ -303,11 +287,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
else
{
// Start reading the next package header.
#if FEATURE_APM
result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
#endif
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,12 @@ public NodeContext(int nodeId, Process process,
/// </summary>
public void BeginAsyncPacketRead()
{
#if FEATURE_APM
_clientToServerStream.BeginRead(_headerByte, 0, _headerByte.Length, HeaderReadComplete, this);
#else
ThreadPool.QueueUserWorkItem(delegate
{
var ignored = RunPacketReadLoopAsync();
});
#endif
}

#if !FEATURE_APM
public async Task RunPacketReadLoopAsync()
{
while (true)
Expand All @@ -667,6 +662,9 @@ public async Task RunPacketReadLoopAsync()

NodePacketType packetType = (NodePacketType)_headerByte[0];
int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));
#if NETFRAMEWORK
MSBuildEventSource.Log.PacketReadSize(packetLength);
#endif

_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();
Expand Down Expand Up @@ -700,7 +698,6 @@ public async Task RunPacketReadLoopAsync()
}
}
}
#endif

/// <summary>
/// Sends the specified packet to this node asynchronously.
Expand Down Expand Up @@ -910,53 +907,6 @@ private bool ProcessHeaderBytesRead(int bytesRead)
return true;
}

#if FEATURE_APM
/// <summary>
/// Callback invoked by the completion of a read of a header byte on one of the named pipes.
/// </summary>
private void HeaderReadComplete(IAsyncResult result)
{
int bytesRead;
try
{
try
{
bytesRead = _clientToServerStream.EndRead(result);
}

// Workaround for CLR stress bug; it sporadically calls us twice on the same async
// result, and EndRead will throw on the second one. Pretend the second one never happened.
catch (ArgumentException)
{
CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
return;
}

if (!ProcessHeaderBytesRead(bytesRead))
{
return;
}
}
catch (IOException e)
{
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in HeaderReadComplete: {0}", e);
_packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
Close();
return;
}

int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));
MSBuildEventSource.Log.PacketReadSize(packetLength);

// Ensures the buffer is at least this length.
// It avoids reallocations if the buffer is already large enough.
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
}
#endif

private bool ProcessBodyBytesRead(int bytesRead, int packetLength, NodePacketType packetType)
{
if (bytesRead != packetLength)
Expand Down Expand Up @@ -990,64 +940,6 @@ private bool ReadAndRoutePacket(NodePacketType packetType, byte[] packetData, in
}
return true;
}

#if FEATURE_APM
/// <summary>
/// Method called when the body of a packet has been read.
/// </summary>
private void BodyReadComplete(IAsyncResult result)
{
NodePacketType packetType = (NodePacketType)_headerByte[0];
var state = (Tuple<byte[], int>)result.AsyncState;
byte[] packetData = state.Item1;
int packetLength = state.Item2;
int bytesRead;

try
{
try
{
bytesRead = _clientToServerStream.EndRead(result);
}

// Workaround for CLR stress bug; it sporadically calls us twice on the same async
// result, and EndRead will throw on the second one. Pretend the second one never happened.
catch (ArgumentException)
{
CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
return;
}

if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType))
{
return;
}
}
catch (IOException e)
{
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in BodyReadComplete (Reading): {0}", e);
_packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
Close();
return;
}

// Read and route the packet.
if (!ReadAndRoutePacket(packetType, packetData, packetLength))
{
return;
}

if (packetType != NodePacketType.NodeShutdown)
{
// Read the next packet.
BeginAsyncPacketRead();
}
else
{
Close();
}
}
#endif
}
}
}
10 changes: 6 additions & 4 deletions src/Shared/CommunicationsUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

#if !CLR2COMPATIBILITY
using Microsoft.Build.Shared.Debugging;
#endif
#if !FEATURE_APM
using System.Threading.Tasks;
#endif

Expand Down Expand Up @@ -588,13 +586,17 @@ internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAcce
}
#nullable disable

#if !FEATURE_APM
#if !TASKHOST
internal static async ValueTask<int> ReadAsync(Stream stream, byte[] buffer, int bytesToRead)
{
int totalBytesRead = 0;
while (totalBytesRead < bytesToRead)
{
int bytesRead = await stream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead), CancellationToken.None);
#if NET
int bytesRead = await stream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead)).ConfigureAwait(false);
#else
int bytesRead = await stream.ReadAsync(buffer, totalBytesRead, bytesToRead - totalBytesRead).ConfigureAwait(false);
#endif
if (bytesRead == 0)
{
return totalBytesRead;
Expand Down
24 changes: 8 additions & 16 deletions src/Shared/NodeEndpointOutOfProcBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
using System.Security.Principal;

#endif
Copy link
Preview

Copilot AI May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding an inline comment here to clarify that '!TASKHOST' triggers TPL-based concurrency and that APM logic is reserved for TaskHost builds.

Suggested change
#endif
#endif
// The '!TASKHOST' directive triggers TPL-based concurrency by including System.Threading.Tasks.
// APM (Asynchronous Programming Model) logic is reserved for TaskHost builds.

Copilot uses AI. Check for mistakes.

#if NET451_OR_GREATER || NETCOREAPP
#if !TASKHOST
using System.Threading.Tasks;
#endif

Expand Down Expand Up @@ -368,7 +368,7 @@ private void PacketPumpProc()
try
{
// Wait for a connection
Copy link
Preview

Copilot AI May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Ensure that TASKHOST is defined only for TaskHost scenarios since it now controls the use of legacy APM code in this section.

Suggested change
// Wait for a connection
// Wait for a connection
// Ensure TASKHOST is defined only for TaskHost scenarios, as it controls the use of legacy APM code.

Copilot uses AI. Check for mistakes.

#if FEATURE_APM
#if TASKHOST
IAsyncResult resultForConnection = localPipeServer.BeginWaitForConnection(null, null);
CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining);
bool connected = resultForConnection.AsyncWaitHandle.WaitOne(waitTimeRemaining, false);
Expand All @@ -385,7 +385,7 @@ private void PacketPumpProc()
}

CommunicationsUtilities.Trace("Parent started connecting. Reading handshake from parent");
#if FEATURE_APM
#if TASKHOST
localPipeServer.EndWaitForConnection(resultForConnection);
#endif

Expand Down Expand Up @@ -521,9 +521,7 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream
// spammed to the endpoint and it never gets an opportunity to shutdown.
CommunicationsUtilities.Trace("Entering read loop.");
byte[] headerByte = new byte[5];
#if NET451_OR_GREATER
Task<int> readTask = localReadPipe.ReadAsync(headerByte, 0, headerByte.Length, CancellationToken.None);
#elif NETCOREAPP
#if !TASKHOST
Task<int> readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length).AsTask();
#else
IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
Expand All @@ -533,7 +531,7 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream
// packets to be sent by other threads which are shutting down, such as the logging thread.
WaitHandle[] handles = new WaitHandle[]
{
#if NET451_OR_GREATER || NETCOREAPP
#if !TASKHOST
((IAsyncResult)readTask).AsyncWaitHandle,
#else
result.AsyncWaitHandle,
Expand All @@ -553,7 +551,7 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream
int bytesRead = 0;
try
{
#if NET451_OR_GREATER || NETCOREAPP
#if !TASKHOST
bytesRead = readTask.Result;
#else
bytesRead = localReadPipe.EndRead(result);
Expand Down Expand Up @@ -613,17 +611,11 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream
break;
}

#if NET451_OR_GREATER
readTask = localReadPipe.ReadAsync(headerByte, 0, headerByte.Length, CancellationToken.None);
#elif NETCOREAPP
#if !TASKHOST
readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length).AsTask();
#else
result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#endif

#if NET451_OR_GREATER || NETCOREAPP
handles[0] = ((IAsyncResult)readTask).AsyncWaitHandle;
#else
result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
handles[0] = result.AsyncWaitHandle;
#endif
}
Expand Down
Loading