Skip to content

Commit af97557

Browse files
CopiloteleanorjboydCopilot
authored
Implementing result pipe drainage on cancellation (#25981)
Pull request created by AI Agent --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Eleanor Boyd <26030610+eleanorjboyd@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5389068 commit af97557

4 files changed

Lines changed: 246 additions & 17 deletions

File tree

src/client/testing/testController/common/utils.ts

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,36 @@ export function createTestingDeferred(): Deferred<void> {
2828
return createDeferred<void>();
2929
}
3030

31+
// Maximum time (ms) to wait for the result pipe to drain after the subprocess exits.
32+
// Acts as a backstop in case `reader.onClose` never fires (e.g. abnormal subprocess exit,
33+
// platform-specific named-pipe quirks) so the adapter's `finally` block can never hang.
34+
export const RESULT_PIPE_DRAIN_TIMEOUT_MS = 5_000;
35+
36+
/**
37+
* Awaits `deferred.promise` but resolves after at most `timeoutMs` so callers
38+
* cannot hang indefinitely if the underlying event source never fires.
39+
*/
40+
export async function awaitDeferredWithTimeout<T>(deferred: Deferred<T>, timeoutMs: number): Promise<void> {
41+
let timeoutHandle: NodeJS.Timeout | undefined;
42+
try {
43+
await Promise.race([
44+
deferred.promise,
45+
new Promise<void>((resolve) => {
46+
timeoutHandle = setTimeout(() => {
47+
traceVerbose(
48+
`awaitDeferredWithTimeout: deferred did not settle within ${timeoutMs}ms; giving up and continuing.`,
49+
);
50+
resolve();
51+
}, timeoutMs);
52+
}),
53+
]);
54+
} finally {
55+
if (timeoutHandle) {
56+
clearTimeout(timeoutHandle);
57+
}
58+
}
59+
}
60+
3161
interface ExecutionResultMessage extends Message {
3262
params: ExecutionTestPayload;
3363
}
@@ -89,6 +119,8 @@ export async function startRunResultNamedPipe(
89119
traceVerbose('Starting Test Result named pipe');
90120
const pipeName: string = generateRandomPipeName('python-test-results');
91121

122+
// `cancellationToken` only cancels pipe creation; disposal is driven by
123+
// `reader.onClose` so buffered results are not dropped on cancel.
92124
const reader = await createReaderPipe(pipeName, cancellationToken);
93125
traceVerbose(`Test Results named pipe ${pipeName} connected`);
94126
let disposables: Disposable[] = [];
@@ -99,14 +131,6 @@ export async function startRunResultNamedPipe(
99131
deferredTillServerClose.resolve();
100132
});
101133

102-
if (cancellationToken) {
103-
disposables.push(
104-
cancellationToken?.onCancellationRequested(() => {
105-
traceLog(`Test Result named pipe ${pipeName} cancelled`);
106-
disposable.dispose();
107-
}),
108-
);
109-
}
110134
disposables.push(
111135
reader,
112136
reader.listen((data: Message) => {
@@ -115,9 +139,7 @@ export async function startRunResultNamedPipe(
115139
dataReceivedCallback((data as ExecutionResultMessage).params as ExecutionTestPayload);
116140
}),
117141
reader.onClose(() => {
118-
// this is called once the server close, once per run instance
119142
traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`);
120-
// dispose of all data listeners and cancelation listeners
121143
disposable.dispose();
122144
}),
123145
reader.onError((error) => {

src/client/testing/testController/pytest/pytestExecutionAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
7676
project,
7777
);
7878
} finally {
79-
await deferredTillServerClose.promise;
79+
await utils.awaitDeferredWithTimeout(deferredTillServerClose, utils.RESULT_PIPE_DRAIN_TIMEOUT_MS);
8080
}
8181
}
8282

src/client/testing/testController/unittest/testExecutionAdapter.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ export class UnittestTestExecutionAdapter implements ITestExecutionAdapter {
7070
cSource.token, // token to cancel
7171
);
7272
runInstance.token.onCancellationRequested(() => {
73-
console.log(`Test run cancelled, resolving 'till TillAllServerClose' deferred for ${uri.fsPath}.`);
74-
// if canceled, stop listening for results
75-
deferredTillServerClose.resolve();
73+
traceInfo(`Test run cancelled for ${uri.fsPath}; waiting for result pipe to drain.`);
74+
// Don't resolve the deferred here: the pipe must drain first.
75+
// `reader.onClose` in `startRunResultNamedPipe` will resolve it
76+
// once the subprocess closes its end of the pipe.
7677
});
7778
try {
7879
await this.runTestsNew(
@@ -89,7 +90,7 @@ export class UnittestTestExecutionAdapter implements ITestExecutionAdapter {
8990
} catch (error) {
9091
traceError(`Error in running unittest tests: ${error}`);
9192
} finally {
92-
await deferredTillServerClose.promise;
93+
await utils.awaitDeferredWithTimeout(deferredTillServerClose, utils.RESULT_PIPE_DRAIN_TIMEOUT_MS);
9394
}
9495
}
9596

src/test/testing/testController/utils.unit.test.ts

Lines changed: 208 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,29 @@ import * as assert from 'assert';
22
import * as sinon from 'sinon';
33
import * as fs from 'fs';
44
import * as path from 'path';
5-
import { CancellationToken, TestController, TestItem, Uri, Range, Position } from 'vscode';
6-
import { writeTestIdsFile, populateTestTree } from '../../../client/testing/testController/common/utils';
5+
import { CancellationToken, CancellationTokenSource, TestController, TestItem, Uri, Range, Position } from 'vscode';
6+
import {
7+
Emitter,
8+
Event,
9+
MessageReader,
10+
PartialMessageInfo,
11+
Disposable as RpcDisposable,
12+
DataCallback,
13+
} from 'vscode-jsonrpc';
14+
import { Message } from 'vscode-jsonrpc';
15+
import {
16+
writeTestIdsFile,
17+
populateTestTree,
18+
startRunResultNamedPipe,
19+
awaitDeferredWithTimeout,
20+
} from '../../../client/testing/testController/common/utils';
21+
import { createDeferred, Deferred } from '../../../client/common/utils/async';
22+
import * as namedPipes from '../../../client/common/pipes/namedPipes';
723
import { EXTENSION_ROOT_DIR } from '../../../client/constants';
824
import {
925
DiscoveredTestNode,
1026
DiscoveredTestItem,
27+
ExecutionTestPayload,
1128
ITestResultResolver,
1229
} from '../../../client/testing/testController/common/types';
1330
import { RunTestTag, DebugTestTag } from '../../../client/testing/testController/common/testItemUtilities';
@@ -752,3 +769,192 @@ suite('populateTestTree tests', () => {
752769
assert.deepStrictEqual(mockTestItem2.range, new Range(new Position(6, 0), new Position(7, 0)));
753770
});
754771
});
772+
773+
suite('startRunResultNamedPipe drain-on-cancel tests', () => {
774+
let sandbox: sinon.SinonSandbox;
775+
let createReaderPipeStub: sinon.SinonStub;
776+
777+
// Minimal `MessageReader` fake exposing only what `startRunResultNamedPipe` uses.
778+
class FakeMessageReader implements MessageReader {
779+
private _onClose = new Emitter<void>();
780+
781+
private _onError = new Emitter<Error>();
782+
783+
private _onPartialMessage = new Emitter<PartialMessageInfo>();
784+
785+
private _callback: DataCallback | undefined;
786+
787+
public disposed = false;
788+
789+
public onError: Event<Error> = this._onError.event;
790+
791+
public onClose: Event<void> = this._onClose.event;
792+
793+
public onPartialMessage: Event<PartialMessageInfo> = this._onPartialMessage.event;
794+
795+
public listen(callback: DataCallback): RpcDisposable {
796+
this._callback = callback;
797+
return {
798+
dispose: () => {
799+
this._callback = undefined;
800+
},
801+
};
802+
}
803+
804+
public dispose(): void {
805+
this.disposed = true;
806+
this._onClose.dispose();
807+
this._onError.dispose();
808+
this._onPartialMessage.dispose();
809+
}
810+
811+
// Test helpers.
812+
public emit(message: Message): void {
813+
this._callback?.(message);
814+
}
815+
816+
public hasListener(): boolean {
817+
return this._callback !== undefined;
818+
}
819+
820+
public fireClose(): void {
821+
this._onClose.fire();
822+
}
823+
}
824+
825+
setup(() => {
826+
sandbox = sinon.createSandbox();
827+
});
828+
829+
teardown(() => {
830+
sandbox.restore();
831+
});
832+
833+
function makeMessage(payload: Partial<ExecutionTestPayload>): Message {
834+
// Fill in required ExecutionTestPayload fields so tests exercise a shape close
835+
// to what real runners send and don't drift from the schema over time.
836+
const full: ExecutionTestPayload = {
837+
cwd: '',
838+
status: 'success',
839+
error: '',
840+
...payload,
841+
};
842+
return ({ jsonrpc: '2.0', params: full } as unknown) as Message;
843+
}
844+
845+
test('cancellation alone does NOT resolve deferredTillServerClose and does NOT detach the listener (drain not interrupted)', async () => {
846+
const reader = new FakeMessageReader();
847+
createReaderPipeStub = sandbox.stub(namedPipes, 'createReaderPipe').resolves(reader);
848+
849+
const received: ExecutionTestPayload[] = [];
850+
const deferredTillServerClose: Deferred<void> = createDeferred<void>();
851+
const cancelSource = new CancellationTokenSource();
852+
853+
await startRunResultNamedPipe((payload) => received.push(payload), deferredTillServerClose, cancelSource.token);
854+
855+
assert.ok(createReaderPipeStub.calledOnce, 'createReaderPipe should be called once');
856+
assert.ok(reader.hasListener(), 'reader should have a listener registered before cancel');
857+
858+
// Trigger cancellation.
859+
cancelSource.cancel();
860+
861+
// Yield to let any synchronous-then-microtask handlers run.
862+
await new Promise((r) => setImmediate(r));
863+
864+
assert.strictEqual(
865+
reader.disposed,
866+
false,
867+
'reader must NOT be disposed by cancellation alone (otherwise buffered data would be lost)',
868+
);
869+
assert.ok(reader.hasListener(), 'data listener must remain attached after cancel so the drain can continue');
870+
assert.strictEqual(
871+
(deferredTillServerClose as Deferred<void>).completed,
872+
false,
873+
'deferredTillServerClose must NOT resolve on cancellation; it should only resolve when the pipe closes',
874+
);
875+
876+
cancelSource.dispose();
877+
});
878+
879+
test('data emitted after cancellation is still delivered to the callback (drain works)', async () => {
880+
const reader = new FakeMessageReader();
881+
sandbox.stub(namedPipes, 'createReaderPipe').resolves(reader);
882+
883+
const received: ExecutionTestPayload[] = [];
884+
const deferredTillServerClose: Deferred<void> = createDeferred<void>();
885+
const cancelSource = new CancellationTokenSource();
886+
887+
await startRunResultNamedPipe((payload) => received.push(payload), deferredTillServerClose, cancelSource.token);
888+
889+
// Simulate the debug-path race: cancel fires while results are still buffered.
890+
cancelSource.cancel();
891+
await new Promise((r) => setImmediate(r));
892+
893+
// Buffered messages arrive after cancellation.
894+
reader.emit(makeMessage({ cwd: 'a' }));
895+
reader.emit(makeMessage({ cwd: 'b' }));
896+
897+
assert.strictEqual(received.length, 2, 'messages emitted after cancel must still reach the callback');
898+
assert.deepStrictEqual(
899+
received.map((p) => p.cwd),
900+
['a', 'b'],
901+
'all buffered results delivered in order',
902+
);
903+
904+
// Subprocess closes its end of the pipe -> onClose fires -> dispose.
905+
reader.fireClose();
906+
await deferredTillServerClose.promise;
907+
908+
assert.strictEqual(reader.disposed, true, 'reader disposed via onClose path');
909+
910+
cancelSource.dispose();
911+
});
912+
913+
test('reader.onClose resolves deferredTillServerClose and disposes the reader (natural completion path, no cancellation)', async () => {
914+
const reader = new FakeMessageReader();
915+
sandbox.stub(namedPipes, 'createReaderPipe').resolves(reader);
916+
917+
const deferredTillServerClose: Deferred<void> = createDeferred<void>();
918+
919+
await startRunResultNamedPipe(
920+
() => {
921+
/* no-op */
922+
},
923+
deferredTillServerClose,
924+
undefined,
925+
);
926+
927+
assert.strictEqual(
928+
(deferredTillServerClose as Deferred<void>).completed,
929+
false,
930+
'deferred unresolved before close',
931+
);
932+
933+
reader.fireClose();
934+
await deferredTillServerClose.promise;
935+
936+
assert.strictEqual(reader.disposed, true, 'reader disposed when onClose fires');
937+
});
938+
});
939+
940+
suite('awaitDeferredWithTimeout', () => {
941+
test('resolves promptly when the deferred resolves before the timeout', async () => {
942+
const deferred = createDeferred<void>();
943+
const started = Date.now();
944+
const waiter = awaitDeferredWithTimeout(deferred, 5000);
945+
setTimeout(() => deferred.resolve(), 10);
946+
await waiter;
947+
const elapsed = Date.now() - started;
948+
assert.ok(elapsed < 1000, `should resolve well before timeout, took ${elapsed}ms`);
949+
});
950+
951+
test('resolves after the timeout when the deferred never settles (no hang)', async () => {
952+
const deferred = createDeferred<void>();
953+
const started = Date.now();
954+
await awaitDeferredWithTimeout(deferred, 50);
955+
const elapsed = Date.now() - started;
956+
assert.ok(elapsed >= 50, `should wait at least the timeout, took ${elapsed}ms`);
957+
assert.ok(elapsed < 2000, `should not hang well beyond timeout, took ${elapsed}ms`);
958+
assert.strictEqual(deferred.completed, false, 'underlying deferred remains unresolved');
959+
});
960+
});

0 commit comments

Comments
 (0)