|
3 | 3 | import functools
|
4 | 4 | import inspect
|
5 | 5 | import io
|
6 |
| -import os |
7 |
| -import subprocess |
8 | 6 | import sys
|
9 |
| -import tempfile |
10 | 7 | import textwrap
|
11 | 8 | import types
|
12 | 9 | import pytest
|
@@ -58,34 +55,6 @@ def check_timings(prof):
|
58 | 55 | assert any(timings.values()), f'Expected timing entries, got {timings!r}'
|
59 | 56 |
|
60 | 57 |
|
61 |
| -def run_in_subproc(code): |
62 |
| - """ |
63 |
| - Run Python code in a subprocess so that it doesn't pollute the |
64 |
| - state of the current interpretor. |
65 |
| -
|
66 |
| - Notes |
67 |
| - ----- |
68 |
| - The code is written to a tempfile and run in a subprocess; |
69 |
| - compared with using `runpy.run_path()`, this results in more |
70 |
| - informative error messages. |
71 |
| - """ |
72 |
| - code = strip(code) |
73 |
| - with tempfile.TemporaryDirectory() as tmpdir: |
74 |
| - curdir = os.path.abspath(os.curdir) |
75 |
| - os.chdir(tmpdir) |
76 |
| - try: |
77 |
| - fname = 'my_test.py' |
78 |
| - with open(fname, mode='w') as fobj: |
79 |
| - print(code, file=fobj) |
80 |
| - proc = subprocess.run([sys.executable, fname], |
81 |
| - capture_output=True, text=True) |
82 |
| - finally: |
83 |
| - os.chdir(curdir) |
84 |
| - print(proc.stdout) |
85 |
| - print(proc.stderr, file=sys.stderr) |
86 |
| - proc.check_returncode() |
87 |
| - |
88 |
| - |
89 | 58 | def test_init():
|
90 | 59 | lp = LineProfiler()
|
91 | 60 | assert lp.functions == []
|
@@ -133,153 +102,6 @@ def test_enable_disable():
|
133 | 102 | assert lp.last_time == {}
|
134 | 103 |
|
135 | 104 |
|
136 |
| -def test_trace_callback_preservation(): |
137 |
| - """ |
138 |
| - Test in a subprocess that the profiler restores the active trace |
139 |
| - callback after it's disabled. |
140 |
| - """ |
141 |
| - run_in_subproc(r""" |
142 |
| - import sys |
143 |
| - from typing import Any, Callable, Literal, Union |
144 |
| - from types import FrameType |
145 |
| -
|
146 |
| - from line_profiler import LineProfiler |
147 |
| -
|
148 |
| -
|
149 |
| - Event = Literal['call', 'line', 'return', 'exception', 'opcode'] |
150 |
| - TracingFunc = Callable[[FrameType, Event, Any], Union['TracingFunc', None]] |
151 |
| -
|
152 |
| -
|
153 |
| - def test_restoring_trace_callback(call: Union[TracingFunc, None]) -> None: |
154 |
| - sys.settrace(call) |
155 |
| - assert sys.gettrace() is call, f'can\'t set trace to {call!r}' |
156 |
| - profile = LineProfiler(wrap_trace=False) |
157 |
| - profile.enable_by_count() |
158 |
| - assert sys.gettrace() is profile, f'can\'t set trace to the profiler' |
159 |
| - profile.disable_by_count() |
160 |
| - assert sys.gettrace() is call, f'trace not restored to {call!r}' |
161 |
| - sys.settrace(None) |
162 |
| -
|
163 |
| -
|
164 |
| - def main() -> None: |
165 |
| - test_restoring_trace_callback(None) |
166 |
| - test_restoring_trace_callback(lambda frame, event, arg: None) |
167 |
| -
|
168 |
| -
|
169 |
| - if __name__ == '__main__': |
170 |
| - main() |
171 |
| - """) |
172 |
| - |
173 |
| - |
174 |
| -@pytest.mark.parametrize('wrap_trace', [True, False]) |
175 |
| -def test_trace_callback_wrapping(wrap_trace: bool) -> None: |
176 |
| - """ |
177 |
| - Test in a subprocess that the profiler can wrap around an existing |
178 |
| - trace callback such that we both profile the code and do whatever |
179 |
| - the existing callback does. |
180 |
| - """ |
181 |
| - code = strip(r""" |
182 |
| - import contextlib |
183 |
| - import sys |
184 |
| - from io import StringIO |
185 |
| - from types import FrameType |
186 |
| - from typing import Any, Callable, Generator, List, Literal, Union |
187 |
| -
|
188 |
| - from line_profiler import LineProfiler |
189 |
| -
|
190 |
| -
|
191 |
| - DEBUG = False |
192 |
| -
|
193 |
| - Event = Literal['call', 'line', 'return', 'exception', 'opcode'] |
194 |
| - TracingFunc = Callable[[FrameType, Event, Any], Union['TracingFunc', None]] |
195 |
| -
|
196 |
| -
|
197 |
| - def foo(n: int) -> int: |
198 |
| - result = 0 |
199 |
| - for i in range(1, n + 1): |
200 |
| - result += i |
201 |
| - return result |
202 |
| -
|
203 |
| -
|
204 |
| - def get_logger(logs: List[str]) -> TracingFunc: |
205 |
| - ''' |
206 |
| - Append a `foo: i = <...>` message whenever we hit the line in |
207 |
| - `foo()` containing the incrementation of `result`. |
208 |
| - ''' |
209 |
| - def callback(frame: FrameType, event: Event, |
210 |
| - _) -> Union[TracingFunc, None]: |
211 |
| - if DEBUG and callback.emit_debug: |
212 |
| - print('{0.co_filename}:{1.f_lineno} - {0.co_name} ({2})' |
213 |
| - .format(frame.f_code, frame, event)) |
214 |
| - if event == 'call': |
215 |
| - # Set up tracing for nested scopes |
216 |
| - return callback |
217 |
| - if event != 'line': |
218 |
| - return # Only trace line events |
219 |
| - if frame.f_code.co_name == 'foo' and frame.f_lineno == _LINENO_: |
220 |
| - # Add log entry whenever the target line is hit |
221 |
| - logs.append(f'foo: i = {frame.f_locals.get("i")}') |
222 |
| - return callback |
223 |
| -
|
224 |
| - callback.emit_debug = False |
225 |
| - return callback |
226 |
| -
|
227 |
| -
|
228 |
| - def main(wrap_trace: bool = _WRAP_TRACE_) -> None: |
229 |
| - logs = [] |
230 |
| - my_callback = get_logger(logs) |
231 |
| - sys.settrace(my_callback) |
232 |
| -
|
233 |
| - profile = LineProfiler(wrap_trace=wrap_trace) |
234 |
| - foo_wrapped = profile(foo) |
235 |
| -
|
236 |
| - for stage, wrap, expect_output in [ |
237 |
| - ('sanity check, no profiling', False, True), |
238 |
| - ('profiled', True, wrap_trace)]: |
239 |
| - assert sys.gettrace() is my_callback, ( |
240 |
| - stage + ': can\'t set custom trace') |
241 |
| - foo_like = profile(foo) if wrap else foo |
242 |
| - my_callback.emit_debug = True |
243 |
| - x = foo_like(5) |
244 |
| - my_callback.emit_debug = False |
245 |
| - assert x == 15, f'{stage}: expected `foo(5) = 15`, got {x!r}' |
246 |
| - print(f'Logs ({stage}): {logs!r}') |
247 |
| - assert sys.gettrace() is my_callback, ( |
248 |
| - stage + ': trace not restored afterwards') |
249 |
| -
|
250 |
| - # Check that the existing trace function has been called |
251 |
| - # where appropriate |
252 |
| - expected = [f'foo: i = {i}' for i in range(1, 6)] |
253 |
| - if expect_output: |
254 |
| - assert logs == expected, ( |
255 |
| - f'{stage}: expected logs = {expected!r}, ' |
256 |
| - f'got {logs!r}') |
257 |
| - else: |
258 |
| - assert not logs, f'{stage}: expected no logs, got {logs!r}' |
259 |
| - logs.clear() |
260 |
| - sys.settrace(None) |
261 |
| -
|
262 |
| - # Check that the profiling is as expected: 5 hits on the |
263 |
| - # incrementation line |
264 |
| - with StringIO() as sio: |
265 |
| - profile.print_stats(stream=sio, summarize=True) |
266 |
| - out = sio.getvalue() |
267 |
| - print(out) |
268 |
| - line, = (line for line in out.splitlines() if '+=' in line) |
269 |
| - nhits = int(line.split()[1]) |
270 |
| - assert nhits == 5, f'expected 5 profiler hits, got {nhits!r}' |
271 |
| -
|
272 |
| -
|
273 |
| - if __name__ == '__main__': |
274 |
| - main() |
275 |
| - """) |
276 |
| - incr_lineno = 1 + next(i for i, line in enumerate(code.splitlines()) |
277 |
| - if line.endswith('result += i')) |
278 |
| - run_in_subproc(code |
279 |
| - .replace('_LINENO_', str(incr_lineno)) |
280 |
| - .replace('_WRAP_TRACE_', str(wrap_trace))) |
281 |
| - |
282 |
| - |
283 | 105 | def test_double_decoration():
|
284 | 106 | """
|
285 | 107 | Test that wrapping the same function twice does not result in
|
|
0 commit comments