# coding: utf-8 ''' The idea is that we record the commands sent to the debugger and reproduce them from this script (so, this works as the client, which spawns the debugger as a separate process and communicates to it as if it was run from the outside) Note that it's a python script but it'll spawn a process to run as jython, ironpython and as python. ''' import time import pytest from tests_python import debugger_unittest from tests_python.debugger_unittest import (CMD_SET_PROPERTY_TRACE, REASON_CAUGHT_EXCEPTION, REASON_UNCAUGHT_EXCEPTION, REASON_STOP_ON_BREAKPOINT, REASON_THREAD_SUSPEND, overrides, CMD_THREAD_CREATE, CMD_GET_THREAD_STACK, REASON_STEP_INTO_MY_CODE, CMD_GET_EXCEPTION_DETAILS, IS_IRONPYTHON, IS_JYTHON, IS_CPYTHON, IS_APPVEYOR, wait_for_condition, CMD_GET_FRAME, CMD_GET_BREAKPOINT_EXCEPTION, CMD_THREAD_SUSPEND, CMD_STEP_OVER, REASON_STEP_OVER, CMD_THREAD_SUSPEND_SINGLE_NOTIFICATION, CMD_THREAD_RESUME_SINGLE_NOTIFICATION, REASON_STEP_RETURN, REASON_STEP_RETURN_MY_CODE, REASON_STEP_OVER_MY_CODE, REASON_STEP_INTO, CMD_THREAD_KILL, IS_PYPY, REASON_STOP_ON_START, CMD_SMART_STEP_INTO, CMD_GET_VARIABLE) from _pydevd_bundle.pydevd_constants import IS_WINDOWS, IS_PY38_OR_GREATER, \ IS_MAC from _pydevd_bundle.pydevd_comm_constants import CMD_RELOAD_CODE, CMD_INPUT_REQUESTED, \ CMD_RUN_CUSTOM_OPERATION import json import pydevd_file_utils import subprocess import threading from _pydev_bundle import pydev_log from urllib.parse import unquote, unquote_plus from tests_python.debug_constants import * # noqa pytest_plugins = [ str('tests_python.debugger_fixtures'), ] builtin_qualifier = "builtins" @pytest.mark.skipif(not IS_CPYTHON, reason='Test needs gc.get_referrers/reference counting to really check anything.') def test_case_referrers(case_setup): with case_setup.test_file('_debugger_case1.py') as writer: writer.log.append('writing add breakpoint') writer.write_add_breakpoint(6, 'set_up') writer.log.append('making initial run') writer.write_make_initial_run() writer.log.append('waiting for breakpoint hit') hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.log.append('get frame') writer.write_get_frame(thread_id, frame_id) writer.log.append('step over') writer.write_step_over(thread_id) writer.log.append('get frame') writer.write_get_frame(thread_id, frame_id) writer.log.append('run thread') writer.write_run_thread(thread_id) writer.log.append('asserting') try: assert 13 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence except: writer.log.append('assert failed!') raise writer.log.append('asserted') writer.finished_ok = True def test_case_2(case_setup): with case_setup.test_file('_debugger_case2.py') as writer: writer.write_add_breakpoint(3, 'Call4') # seq = 3 writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) # Note: write get frame but not waiting for it to be gotten. writer.write_add_breakpoint(14, 'Call2') writer.write_run_thread(thread_id) hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) # Note: write get frame but not waiting for it to be gotten. writer.write_run_thread(thread_id) writer.log.append('Checking sequence. Found: %s' % (writer._sequence)) assert 15 == writer._sequence, 'Expected 15. Had: %s' % writer._sequence writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.parametrize( 'skip_suspend_on_breakpoint_exception, skip_print_breakpoint_exception', ( [['NameError'], []], [['NameError'], ['NameError']], [[], []], # Empty means it'll suspend/print in any exception [[], ['NameError']], [['ValueError'], ['Exception']], [['Exception'], ['ValueError']], # ValueError will also suspend/print since we're dealing with a NameError ) ) def test_case_breakpoint_condition_exc(case_setup, skip_suspend_on_breakpoint_exception, skip_print_breakpoint_exception): msgs_in_stderr = ( 'Error while evaluating expression in conditional breakpoint: i > 5', 'Traceback (most recent call last):', 'File "", line 1, in ', ) # It could be one or the other in PyPy depending on the version. msgs_one_in_stderr = ( "NameError: name 'i' is not defined", "global name 'i' is not defined", ) def _ignore_stderr_line(line): if original_ignore_stderr_line(line): return True for msg in msgs_in_stderr + msgs_one_in_stderr: if msg in line: return True return False with case_setup.test_file('_debugger_case_breakpoint_condition_exc.py') as writer: original_ignore_stderr_line = writer._ignore_stderr_line writer._ignore_stderr_line = _ignore_stderr_line writer.write_suspend_on_breakpoint_exception(skip_suspend_on_breakpoint_exception, skip_print_breakpoint_exception) expect_print = skip_print_breakpoint_exception in ([], ['ValueError']) expect_suspend = skip_suspend_on_breakpoint_exception in ([], ['ValueError']) breakpoint_id = writer.write_add_breakpoint( writer.get_line_index_with_content('break here'), 'Call', condition='i > 5') if not expect_print: _original = writer.reader_thread.on_message_found def on_message_found(found_msg): for msg in msgs_in_stderr + msgs_one_in_stderr: assert msg not in found_msg writer.reader_thread.on_message_found = on_message_found writer.write_make_initial_run() def check_error_msg(stderr): for msg in msgs_in_stderr: assert msg in stderr for msg in msgs_one_in_stderr: if msg in stderr: break else: raise AssertionError('Did not find any of: %s in stderr: %s' % ( msgs_one_in_stderr, stderr)) if expect_print: msg, ctx = writer.wait_for_output() check_error_msg(msg.replace('>', '>')) if expect_suspend: writer.wait_for_message(CMD_GET_BREAKPOINT_EXCEPTION) hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) if IS_JYTHON: # Jython will break twice. if expect_suspend: writer.wait_for_message(CMD_GET_BREAKPOINT_EXCEPTION) hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) msg = writer.wait_for_message(CMD_GET_FRAME) name_to_value = {} for var in msg.var: name_to_value[var['name']] = var['value'] assert name_to_value == {'i': 'int: 6', 'last_i': 'int: 6'} writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(thread_id) writer.finished_ok = True def test_case_remove_breakpoint(case_setup): with case_setup.test_file('_debugger_case_remove_breakpoint.py') as writer: breakpoint_id = writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_double_remove_breakpoint(case_setup): with case_setup.test_file('_debugger_case_remove_breakpoint.py') as writer: breakpoint_id = writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_remove_breakpoint(breakpoint_id) writer.write_remove_breakpoint(breakpoint_id) # Double-remove (just check that we don't have an error). writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON, reason='This test fails once in a while due to timing issues on IronPython, so, skipping it.') def test_case_3(case_setup): with case_setup.test_file('_debugger_case3.py') as writer: writer.write_make_initial_run() time.sleep(.5) breakpoint_id = writer.write_add_breakpoint(4, '') hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) writer.write_run_thread(thread_id) hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(thread_id) writer.finished_ok = True def test_case_suspend_thread(case_setup): with case_setup.test_file('_debugger_case4.py') as writer: writer.write_make_initial_run() thread_id = writer.wait_for_new_thread() writer.write_suspend_thread(thread_id) while True: hit = writer.wait_for_breakpoint_hit((REASON_THREAD_SUSPEND, REASON_STOP_ON_BREAKPOINT)) if hit.name == 'sleep': break # Ok, broke on 'sleep'. else: # i.e.: if it doesn't hit on 'sleep', release and pause again. writer.write_run_thread(thread_id) time.sleep(.1) writer.write_suspend_thread(thread_id) assert hit.thread_id == thread_id writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'exit_while_loop()') writer.wait_for_evaluation([ [ '%0A'.format(builtin_qualifier), '%0A%0A'.format(builtin_qualifier), '%0A%0A', # jython ] ]) writer.write_run_thread(hit.thread_id) assert 17 == writer._sequence, 'Expected 17. Had: %s' % writer._sequence writer.finished_ok = True def test_case_8(case_setup): with case_setup.test_file('_debugger_case89.py') as writer: writer.write_add_breakpoint(10, 'Method3') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit('109', line=15) writer.write_run_thread(hit.thread_id) assert 9 == writer._sequence, 'Expected 9. Had: %s' % writer._sequence writer.finished_ok = True def test_case_9(case_setup): with case_setup.test_file('_debugger_case89.py') as writer: writer.write_add_breakpoint(10, 'Method3') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') # Note: no active exception (should not give an error and should return no # exception details as there's no exception). writer.write_get_current_exception(hit.thread_id) msg = writer.wait_for_message(CMD_GET_EXCEPTION_DETAILS) assert msg.thread['id'] == hit.thread_id assert not hasattr(msg.thread, 'frames') # No frames should be found. writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=11) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=12) writer.write_run_thread(hit.thread_id) assert 13 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence writer.finished_ok = True def test_case_10(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_add_breakpoint(2, 'None') # None or Method should make hit. writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit('109', line=11) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=12) writer.write_run_thread(hit.thread_id) assert 11 == writer._sequence, 'Expected 11. Had: %s' % writer._sequence writer.finished_ok = True def test_case_11(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_add_breakpoint(2, 'Method1') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=2) assert hit.name == 'Method1' writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_STEP_OVER, line=3) assert hit.name == 'Method1' writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_STEP_OVER, line=12) # Reverts to step in assert hit.name == 'Method2' writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_STEP_OVER, line=13) assert hit.name == 'Method2' writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_STEP_OVER, line=18) # Reverts to step in assert hit.name == '' # Finish with a step over writer.write_step_over(hit.thread_id) if IS_JYTHON: writer.write_run_thread(hit.thread_id) else: # Finish with a step over writer.write_step_over(hit.thread_id) writer.finished_ok = True def test_case_12(case_setup): # Note: In CPython we now ignore the function names, so, we'll stop at the breakpoint in line 2 # regardless of the function name (we decide whether to stop in a line or not through the function # lines). with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_add_breakpoint(2, '') # Should not be hit: setting empty function (not None) should only hit global. writer.write_add_breakpoint(6, 'Method1a') writer.write_add_breakpoint(11, 'Method2') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111', line=11) writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit('111', line=6 if IS_JYTHON else 2) # not a return (it stopped in the other breakpoint) writer.write_run_thread(hit.thread_id) if not IS_JYTHON: hit = writer.wait_for_breakpoint_hit('111', line=6) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON, reason='Failing on IronPython (needs to be investigated).') def test_case_13(case_setup): with case_setup.test_file('_debugger_case13.py') as writer: def _ignore_stderr_line(line): if original_ignore_stderr_line(line): return True if IS_JYTHON: for expected in ( "RuntimeWarning: Parent module '_pydevd_bundle' not found while handling absolute import", "import __builtin__"): if expected in line: return True return False original_ignore_stderr_line = writer._ignore_stderr_line writer._ignore_stderr_line = _ignore_stderr_line writer.write_add_breakpoint(35, 'main') writer.write("%s\t%s\t%s" % (CMD_SET_PROPERTY_TRACE, writer.next_seq(), "true;false;false;true")) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_get_frame(hit.thread_id, hit.frame_id) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=25) # Should go inside setter method writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=36) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=21) # Should go inside getter method writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107') # Disable property tracing writer.write("%s\t%s\t%s" % (CMD_SET_PROPERTY_TRACE, writer.next_seq(), "true;true;true;true")) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=39) # Should Skip step into properties setter # Enable property tracing writer.write("%s\t%s\t%s" % (CMD_SET_PROPERTY_TRACE, writer.next_seq(), "true;false;false;true")) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=8) # Should go inside getter method writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_14(case_setup): # Interactive Debug Console with case_setup.test_file('_debugger_case14.py') as writer: writer.write_add_breakpoint(22, 'main') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') assert hit.thread_id, '%s not valid.' % hit.thread_id assert hit.frame_id, '%s not valid.' % hit.frame_id # Access some variable writer.write_debug_console_expression("%s\t%s\tEVALUATE\tcarObj.color" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['False', '%27Black%27']) assert 7 == writer._sequence, 'Expected 9. Had: %s' % writer._sequence # Change some variable writer.write_debug_console_expression("%s\t%s\tEVALUATE\tcarObj.color='Red'" % (hit.thread_id, hit.frame_id)) writer.write_debug_console_expression("%s\t%s\tEVALUATE\tcarObj.color" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['False', '%27Red%27']) assert 11 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence # Iterate some loop writer.write_debug_console_expression("%s\t%s\tEVALUATE\tfor i in range(3):" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['True']) writer.write_debug_console_expression("%s\t%s\tEVALUATE\t print(i)" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['True']) writer.write_debug_console_expression("%s\t%s\tEVALUATE\t" % (hit.thread_id, hit.frame_id)) writer.wait_for_var( [ 'False' ] ) assert 17 == writer._sequence, 'Expected 19. Had: %s' % writer._sequence writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_15(case_setup): with case_setup.test_file('_debugger_case15.py') as writer: writer.write_add_breakpoint(22, 'main') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) # Access some variable writer.write_custom_operation("%s\t%s\tEXPRESSION\tcarObj.color" % (hit.thread_id, hit.frame_id), "EXEC", "f=lambda x: 'val=%s' % x", "f") writer.wait_for_custom_operation('val=Black') assert 7 == writer._sequence, 'Expected 7. Had: %s' % writer._sequence writer.write_custom_operation("%s\t%s\tEXPRESSION\tcarObj.color" % (hit.thread_id, hit.frame_id), "EXECFILE", debugger_unittest._get_debugger_test_file('_debugger_case15_execfile.py'), "f") writer.wait_for_custom_operation('val=Black') assert 9 == writer._sequence, 'Expected 9. Had: %s' % writer._sequence writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_16_resolve_numpy_array(case_setup): # numpy.ndarray resolver try: import numpy except ImportError: pytest.skip('numpy not available') with case_setup.test_file('_debugger_case16.py') as writer: writer.write_add_breakpoint(9, 'main') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) # In this test we check that the three arrays of different shapes, sizes and types # are all resolved properly as ndarrays. # First pass check is that we have all three expected variables defined writer.write_get_frame(hit.thread_id, hit.frame_id) writer.wait_for_multiple_vars(( ( '', '' ), ( '', '' ), # Any of the ones below will do. ( '', '' ) )) # For each variable, check each of the resolved (meta data) attributes... writer.write_get_variable(hit.thread_id, hit.frame_id, 'smallarray') writer.wait_for_multiple_vars(( ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ], [ ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ], '%0A'.format(builtin_qualifier,)) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_19(case_setup): # Check evaluate '__' attributes with case_setup.test_file('_debugger_case19.py') as writer: writer.write_add_breakpoint(8, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=8) writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'a.__var') writer.wait_for_evaluation([ [ 'Hello' in contents assert 'Flask-Jinja-Test' in contents writer.finished_ok = True @pytest.mark.skipif(not TEST_DJANGO, reason='No django available') def test_case_django_a(case_setup_django): def get_environ(writer): env = os.environ.copy() env.update({ 'PYDEVD_FILTER_LIBRARIES': '1', # Global setting for in project or not }) return env with case_setup_django.test_file(EXPECTED_RETURNCODE='any', get_environ=get_environ) as writer: writer.write_make_initial_run() # Wait for the first request that works... for i in range(4): try: t = writer.create_request_thread('my_app') t.start() contents = t.wait_for_contents() contents = contents.replace(' ', '').replace('\r', '').replace('\n', '') assert contents == '
  • v1:v1
  • v2:v2
' break except: if i == 3: raise continue writer.write_add_breakpoint_django(5, None, 'index.html') t = writer.create_request_thread('my_app') t.start() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=5) writer.write_get_variable(hit.thread_id, hit.frame_id, 'entry') writer.wait_for_vars([ '
  • v1:v1
  • v2:v2
  • ' % (contents,)) writer.finished_ok = True @pytest.mark.skipif(not TEST_DJANGO, reason='No django available') def test_case_django_b(case_setup_django): with case_setup_django.test_file(EXPECTED_RETURNCODE='any') as writer: writer.write_add_breakpoint_django(4, None, 'name.html') writer.write_add_exception_breakpoint_django() writer.write_remove_exception_breakpoint_django() writer.write_make_initial_run() t = writer.create_request_thread('my_app/name') time.sleep(5) # Give django some time to get to startup before requesting the page t.start() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=4) writer.write_get_frame(hit.thread_id, hit.frame_id) writer.wait_for_var('', 'f', ''] else: expected_frame_names = ['', 'f', ''] writer.write_get_current_exception(hit.thread_id) msg = writer.wait_for_message(accept_message=lambda msg:'exc_type="' in msg and 'exc_desc="' in msg, unquote_msg=False) frame_names = [unquote(f['name']).replace('<', '<').replace('>', '>') for f in msg.thread.frame] assert frame_names == expected_frame_names writer.write_run_thread(hit.thread_id) if not unhandled: expected_lines = [ writer.get_line_index_with_content('# exc line'), writer.get_line_index_with_content('# call exc'), ] for expected_line in expected_lines: hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) assert hit.line == expected_line writer.write_get_current_exception(hit.thread_id) msg = writer.wait_for_message(accept_message=lambda msg:'exc_type="' in msg and 'exc_desc="' in msg, unquote_msg=False) frame_names = [unquote(f['name']).replace('<', '<').replace('>', '>') for f in msg.thread.frame] assert frame_names == expected_frame_names writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_basic(case_setup): def check_test_suceeded_msg(writer, stdout, stderr): # Don't call super (we have an unhandled exception in the stack trace). return 'TEST SUCEEDED' in ''.join(stdout) and 'TEST SUCEEDED' in ''.join(stderr) def additional_output_checks(writer, stdout, stderr): if 'raise Exception' not in stderr: raise AssertionError('Expected test to have an unhandled exception.\nstdout:\n%s\n\nstderr:\n%s' % ( stdout, stderr)) with case_setup.test_file( '_debugger_case_unhandled_exceptions.py', check_test_suceeded_msg=check_test_suceeded_msg, additional_output_checks=additional_output_checks, EXPECTED_RETURNCODE=1, ) as writer: writer.write_add_exception_breakpoint_with_policy('Exception', "0", "1", "0") writer.write_make_initial_run() def check(hit, exc_type, exc_desc): writer.write_get_current_exception(hit.thread_id) msg = writer.wait_for_message(accept_message=lambda msg:exc_type in msg and 'exc_type="' in msg and 'exc_desc="' in msg, unquote_msg=False) assert unquote(msg.thread['exc_desc']) == exc_desc assert unquote(msg.thread['exc_type']) in ( "<type 'exceptions.%s'>" % (exc_type,), # py2 "<class '%s'>" % (exc_type,) # py3 ) if len(msg.thread.frame) == 0: assert unquote(unquote(msg.thread.frame['file'])).endswith('_debugger_case_unhandled_exceptions.py') else: assert unquote(unquote(msg.thread.frame[0]['file'])).endswith('_debugger_case_unhandled_exceptions.py') writer.write_run_thread(hit.thread_id) # Will stop in 2 background threads hit0 = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) thread_id1 = hit0.thread_id hit1 = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) thread_id2 = hit1.thread_id if hit0.name == 'thread_func2': check(hit0, 'ValueError', 'in thread 2') check(hit1, 'Exception', 'in thread 1') else: check(hit0, 'Exception', 'in thread 1') check(hit1, 'ValueError', 'in thread 2') writer.write_run_thread(thread_id1) writer.write_run_thread(thread_id2) # Will stop in main thread hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) assert hit.name == '' thread_id3 = hit.thread_id # Requesting the stack in an unhandled exception should provide the stack of the exception, # not the current location of the program. writer.write_get_thread_stack(thread_id3) msg = writer.wait_for_message(CMD_GET_THREAD_STACK) assert len(msg.thread.frame) == 0 # In main thread (must have no back frames). assert msg.thread.frame['name'] == '' check(hit, 'IndexError', 'in main') writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level1(case_setup_unhandled_exceptions): with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level.py', EXPECTED_RETURNCODE=1, ) as writer: writer.write_add_exception_breakpoint_with_policy('Exception', "0", "1", "0") writer.write_make_initial_run() # Will stop in main thread hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level2(case_setup_unhandled_exceptions): # Note: expecting unhandled exception to be printed to stderr. def get_environ(writer): env = os.environ.copy() curr_pythonpath = env.get('PYTHONPATH', '') pydevd_dirname = os.path.dirname(writer.get_pydevd_file()) curr_pythonpath = pydevd_dirname + os.pathsep + curr_pythonpath env['PYTHONPATH'] = curr_pythonpath return env def update_command_line_args(writer, args): # Start pydevd with '-m' to see how it deal with being called with # runpy at the start. assert args[0].endswith('pydevd.py') args = ['-m', 'pydevd'] + args[1:] return args with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level.py', get_environ=get_environ, update_command_line_args=update_command_line_args, EXPECTED_RETURNCODE='any', ) as writer: writer.write_add_exception_breakpoint_with_policy('Exception', "0", "1", "0") writer.write_make_initial_run() # Should stop (only once) in the main thread. hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level3(case_setup_unhandled_exceptions): with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level.py', EXPECTED_RETURNCODE=1 ) as writer: # Handled and unhandled # PySide2 has a bug in shibokensupport which will try to do: sys._getframe(1). # during the teardown (which will fail as there's no back frame in this case). # So, mark ignore libraries in this case. writer.write_add_exception_breakpoint_with_policy('Exception', "1", "1", ignore_libraries="1") writer.write_make_initial_run() # Will stop in main thread twice: once one we find that the exception is being # thrown and another in postmortem mode when we discover it's uncaught. hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level4(case_setup_unhandled_exceptions): # Note: expecting unhandled exception to be printed to stderr. with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level2.py', EXPECTED_RETURNCODE=1, ) as writer: # Handled and unhandled writer.write_add_exception_breakpoint_with_policy('Exception', "1", "1", "0") writer.write_make_initial_run() # We have an exception thrown and handled and another which is thrown and is then unhandled. hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='Only for Python.') def test_case_set_next_statement(case_setup): with case_setup.test_file('_debugger_case_set_next_statement.py') as writer: breakpoint_id = writer.write_add_breakpoint(6, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=6) # Stop in line a=3 (before setting it) writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'a') writer.wait_for_evaluation('' assert msg.thread.frame['line'] == str(writer.get_line_index_with_content('break line on unhandled exception')) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(not IS_PY36_OR_GREATER, reason='Requires Python 3.') def test_case_throw_exc_reason_xml(case_setup): def check_test_suceeded_msg(self, stdout, stderr): return 'TEST SUCEEDED' in ''.join(stderr) def additional_output_checks(writer, stdout, stderr): assert "raise RuntimeError('TEST SUCEEDED')" in stderr assert "raise RuntimeError from e" in stderr assert "raise Exception('another while handling')" in stderr with case_setup.test_file( '_debugger_case_raise_with_cause.py', EXPECTED_RETURNCODE=1, check_test_suceeded_msg=check_test_suceeded_msg, additional_output_checks=additional_output_checks ) as writer: writer.write_add_exception_breakpoint_with_policy('Exception', "0", "1", "0") writer.write_make_initial_run() el = writer.wait_for_curr_exc_stack() name_and_lines = [] for frame in el.thread.frame: name_and_lines.append((frame['name'], frame['line'])) assert name_and_lines == [ ('foobar', '20'), ('', '23'), ('[Chained Exc: another while handling] foobar', '18'), ('[Chained Exc: another while handling] handle', '10'), ('[Chained Exc: TEST SUCEEDED] foobar', '16'), ('[Chained Exc: TEST SUCEEDED] method', '6'), ('[Chained Exc: TEST SUCEEDED] method2', '2'), ] hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_get_thread_stack(hit.thread_id) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='Only for Python.') def test_case_get_next_statement_targets(case_setup): with case_setup.test_file('_debugger_case_get_next_statement_targets.py') as writer: breakpoint_id = writer.write_add_breakpoint(21, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=21) writer.write_get_next_statement_targets(hit.thread_id, hit.frame_id) targets = writer.wait_for_get_next_statement_targets() # Note: 20 may appear as a side-effect of the frame eval # mode (so, we have to ignore it here) -- this isn't ideal, but # it's also not that bad (that line has no code in the source and # executing it will just set the tracing for the method). targets.discard(20) # On Python 3.11 there's now a line 1 (which should be harmless). targets.discard(1) expected = set((2, 3, 5, 8, 9, 10, 12, 13, 14, 15, 17, 18, 19, 21)) assert targets == expected, 'Expected targets to be %s, was: %s' % (expected, targets) writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON or IS_JYTHON, reason='Failing on IronPython and Jython (needs to be investigated).') def test_case_type_ext(case_setup): # Custom type presentation extensions def get_environ(self): env = os.environ.copy() python_path = env.get("PYTHONPATH", "") ext_base = debugger_unittest._get_debugger_test_file('my_extensions') env['PYTHONPATH'] = ext_base + os.pathsep + python_path if python_path else ext_base return env with case_setup.test_file('_debugger_case_type_ext.py', get_environ=get_environ) as writer: writer.get_environ = get_environ writer.write_add_breakpoint(7, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_get_frame(hit.thread_id, hit.frame_id) assert writer.wait_for_var([ [ r'', r''.format(builtin_qualifier)) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_variable_access(case_setup, pyfile, data_regression): @pyfile def case_custom(): obj = [ tuple(range(9)), [ tuple(range(5)), ] ] print('TEST SUCEEDED') with case_setup.test_file(case_custom) as writer: line = writer.get_line_index_with_content('TEST SUCEEDED') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_get_frame(hit.thread_id, hit.frame_id) frame_vars = writer.wait_for_untangled_message( accept_message=lambda cmd_id, untangled: cmd_id == CMD_GET_FRAME) obj_var = [v for v in frame_vars.var if v['name'] == 'obj'][0] assert obj_var['type'] == 'list' assert unquote_plus(obj_var['value']) == ": [(0, 1, 2, 3, 4, 5, 6, 7, 8), [(0, 1, 2, 3, 4)]]" assert obj_var['isContainer'] == "True" def _skip_key_in_dict(key): try: int(key) except ValueError: if 'more' in key or '[' in key: return False return True return False def collect_vars(locator, level=0): writer.write("%s\t%s\t%s\t%s" % (CMD_GET_VARIABLE, writer.next_seq(), hit.thread_id, locator)) obj_vars = writer.wait_for_untangled_message( accept_message=lambda cmd_id, _untangled: cmd_id == CMD_GET_VARIABLE) for v in obj_vars.var: if _skip_key_in_dict(v['name']): continue new_locator = locator + '\t' + v['name'] yield level, v, new_locator if v['isContainer'] == 'True': yield from collect_vars(new_locator, level + 1) found = [] for level, val, _locator in collect_vars('%s\tFRAME\tobj' % hit.frame_id): found.append(((' ' * level) + val['name'] + ': ' + unquote_plus(val['value']))) data_regression.check(found) # Check referrers full_loc = '%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'FRAME\tobj\t1\t0') writer.write_custom_operation(full_loc, 'EXEC', "from _pydevd_bundle.pydevd_referrers import get_referrer_info", "get_referrer_info") msg = writer.wait_for_untangled_message( double_unquote=True, accept_message=lambda cmd_id, _untangled: cmd_id == CMD_RUN_CUSTOM_OPERATION) msg_vars = msg.var try: msg_vars['found_as'] msg_vars = [msg_vars] except: pass # it's a container. for v in msg_vars: if v['found_as'] == 'list[0]': # In pypy we may have more than one reference, find out the one referrer_id = v['id'] assert int(referrer_id) assert unquote_plus(v['value']) == ": [(0, 1, 2, 3, 4)]" break else: raise AssertionError("Unable to find ref with list[0]. Found: %s" % (msg_vars,)) found = [] by_id_locator = '%s\t%s' % (referrer_id, 'BY_ID') for level, val, _locator in collect_vars(by_id_locator): found.append(((' ' * level) + val['name'] + ': ' + unquote_plus(val['value']))) data_regression.check(found, basename='test_case_variable_access_by_id') writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON or IS_JYTHON, reason='Failing on IronPython and Jython (needs to be investigated).') def test_case_event_ext(case_setup): def get_environ(self): env = os.environ.copy() python_path = env.get("PYTHONPATH", "") ext_base = debugger_unittest._get_debugger_test_file('my_extensions') env['PYTHONPATH'] = ext_base + os.pathsep + python_path if python_path else ext_base env["VERIFY_EVENT_TEST"] = "1" return env # Test initialize event for extensions with case_setup.test_file('_debugger_case_event_ext.py', get_environ=get_environ) as writer: original_additional_output_checks = writer.additional_output_checks @overrides(writer.additional_output_checks) def additional_output_checks(stdout, stderr): original_additional_output_checks(stdout, stderr) if 'INITIALIZE EVENT RECEIVED' not in stdout: raise AssertionError('No initialize event received') writer.additional_output_checks = additional_output_checks writer.write_make_initial_run() writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Jython does not seem to be creating thread started inside tracing (investigate).') def test_case_writer_creation_deadlock(case_setup): # check case where there was a deadlock evaluating expressions with case_setup.test_file('_debugger_case_thread_creation_deadlock.py') as writer: writer.write_add_breakpoint(26, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') assert hit.line == 26, 'Expected return to be in line 26, was: %s' % (hit.line,) writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'create_thread()') writer.wait_for_evaluation('= 3: binary_junk = binary_junk.decode('utf-8', 'replace') return line.startswith(( 'text', 'binary', 'a', binary_junk, )) writer._ignore_stderr_line = _ignore_stderr_line # Note: writes to stdout and stderr are now synchronous (so, the order # must always be consistent and there's a message for each write). expected = [ 'text\n', 'binary or text\n', 'ação1\n', ] if sys.version_info[0] >= 3: expected.extend(( 'binary\n', 'ação2\n'.encode(encoding='latin1').decode('utf-8', 'replace'), 'ação3\n', )) binary_junk = '\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\n\n' if sys.version_info[0] >= 3: binary_junk = "\ufffd\ufffd\ufffd\ufffd\ufffd\n\n" expected.append(binary_junk) new_expected = [(x, 'stdout') for x in expected] new_expected.extend([(x, 'stderr') for x in expected]) writer.write_start_redirect() writer.write_make_initial_run() msgs = [] ignored = [] while len(msgs) < len(new_expected): try: msg = writer.wait_for_output() except AssertionError: for msg in msgs: sys.stderr.write('Found: %s\n' % (msg,)) for msg in new_expected: sys.stderr.write('Expected: %s\n' % (msg,)) for msg in ignored: sys.stderr.write('Ignored: %s\n' % (msg,)) raise if msg not in new_expected: ignored.append(msg) continue msgs.append(msg) if msgs != new_expected: print(msgs) print(new_expected) assert msgs == new_expected writer.finished_ok = True def _path_equals(path1, path2): path1 = pydevd_file_utils.normcase(path1) path2 = pydevd_file_utils.normcase(path2) return path1 == path2 @pytest.mark.parametrize('mixed_case', [True, False] if sys.platform == 'win32' else [False]) def test_path_translation(case_setup, mixed_case): def get_file_in_client(writer): # Instead of using: test_python/_debugger_case_path_translation.py # we'll set the breakpoints at foo/_debugger_case_path_translation.py file_in_client = os.path.dirname(os.path.dirname(writer.TEST_FILE)) return os.path.join(os.path.dirname(file_in_client), 'foo', '_debugger_case_path_translation.py') def get_environ(writer): import json env = os.environ.copy() env["PYTHONIOENCODING"] = 'utf-8' assert writer.TEST_FILE.endswith('_debugger_case_path_translation.py') file_in_client = get_file_in_client(writer) if mixed_case: new_file_in_client = ''.join([file_in_client[i].upper() if i % 2 == 0 else file_in_client[i].lower() for i in range(len(file_in_client))]) assert _path_equals(file_in_client, new_file_in_client) env["PATHS_FROM_ECLIPSE_TO_PYTHON"] = json.dumps([ ( os.path.dirname(file_in_client), os.path.dirname(writer.TEST_FILE) ) ]) return env with case_setup.test_file('_debugger_case_path_translation.py', get_environ=get_environ) as writer: from tests_python.debugger_unittest import CMD_LOAD_SOURCE writer.write_start_redirect() file_in_client = get_file_in_client(writer) assert 'tests_python' not in file_in_client writer.write_add_breakpoint( writer.get_line_index_with_content('break here'), 'call_this', filename=file_in_client) writer.write_make_initial_run() xml = writer.wait_for_message(lambda msg:'stop_reason="111"' in msg) assert xml.thread.frame[0]['file'] == file_in_client thread_id = xml.thread['id'] # Request a file that exists files_to_match = [file_in_client] if IS_WINDOWS: files_to_match.append(file_in_client.upper()) for f in files_to_match: writer.write_load_source(f) writer.wait_for_message( lambda msg: '%s\t' % CMD_LOAD_SOURCE in msg and \ "def main():" in msg and \ "print('break here')" in msg and \ "print('TEST SUCEEDED!')" in msg , expect_xml=False) # Request a file that does not exist writer.write_load_source(file_in_client + 'not_existent.py') writer.wait_for_message( lambda msg:'901\t' in msg and ('FileNotFoundError' in msg or 'IOError' in msg), expect_xml=False) writer.write_run_thread(thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_linecache_xml(case_setup, tmpdir): from _pydevd_bundle.pydevd_comm_constants import CMD_LOAD_SOURCE_FROM_FRAME_ID with case_setup.test_file('_debugger_case_linecache.py') as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('breakpoint')) writer.write_make_initial_run() # First hit is for breakpoint reached via a stack frame that doesn't have source. hit = writer.wait_for_breakpoint_hit() writer.write_get_thread_stack(hit.thread_id) msg = writer.wait_for_get_thread_stack_message() frame_ids = set() for frame in msg.thread.frame: if frame['file'] == '': frame_ids.add(frame['id']) assert len(frame_ids) == 2 for frame_id in frame_ids: writer.write_load_source_from_frame_id(frame_id) writer.wait_for_message( lambda msg: '%s\t' % CMD_LOAD_SOURCE_FROM_FRAME_ID in msg and ( "[x for x in range(10)]" in msg and "def somemethod():" in msg ) , expect_xml=False) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_show_bytecode_xml(case_setup, tmpdir): from _pydevd_bundle.pydevd_comm_constants import CMD_LOAD_SOURCE_FROM_FRAME_ID with case_setup.test_file('_debugger_case_show_bytecode.py') as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('breakpoint')) writer.write_make_initial_run() # First hit is for breakpoint reached via a stack frame that doesn't have source. hit = writer.wait_for_breakpoint_hit() writer.write_get_thread_stack(hit.thread_id) msg = writer.wait_for_get_thread_stack_message() frame_ids = set() for frame in msg.thread.frame: if frame['file'] == '': frame_ids.add(frame['id']) assert len(frame_ids) == 2 for frame_id in frame_ids: writer.write_load_source_from_frame_id(frame_id) writer.wait_for_message( lambda msg: '%s\t' % CMD_LOAD_SOURCE_FROM_FRAME_ID in msg and ( "MyClass" in msg or "foo()" in msg ) , expect_xml=False) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_evaluate_errors(case_setup): with case_setup.test_file('_debugger_case_local_variables.py') as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('Break here'), 'Call') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_evaluate_expression('%s\t%s\t%s' % (thread_id, frame_id, 'LOCAL'), 'name_error') writer.wait_for_evaluation('' else: assert len(msg.thread.frame) > 1 # Stopped in threading (must have back frames). assert msg.thread.frame[0]['name'] == 'method' writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_dump_threads_to_stderr(case_setup): from tests_python.debugger_unittest import wait_for_condition def additional_output_checks(writer, stdout, stderr): assert is_stderr_ok(stderr), make_error_msg(stderr) def make_error_msg(stderr): return 'Did not find thread dump in stderr. stderr:\n%s' % (stderr,) def is_stderr_ok(stderr): return 'Thread Dump' in stderr and 'Thread pydevd.CommandThread (daemon: True, pydevd thread: True)' in stderr with case_setup.test_file( '_debugger_case_get_thread_stack.py', additional_output_checks=additional_output_checks) as writer: writer.write_add_breakpoint(12, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) writer.write_dump_threads() wait_for_condition( lambda: is_stderr_ok(writer.get_stderr()), lambda: make_error_msg(writer.get_stderr()) ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_stop_on_start_regular(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_stop_on_start() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_START, file='_debugger_case_simple_calls.py', line=1) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def _get_breakpoint_cases(): if sys.version_info >= (3, 7): # Just check breakpoint() return ('_debugger_case_breakpoint.py',) else: # Check breakpoint() and sys.__breakpointhook__ replacement. return ('_debugger_case_breakpoint.py', '_debugger_case_breakpoint2.py') @pytest.mark.parametrize("filename", _get_breakpoint_cases()) def test_py_37_breakpoint(case_setup, filename): with case_setup.test_file(filename) as writer: writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(file=filename, line=3) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def _get_generator_cases(): # On py3 we should check both versions. return ( '_debugger_case_generator_py2.py', '_debugger_case_generator_py3.py', ) @pytest.mark.parametrize("filename", _get_generator_cases()) def test_generator_cases(case_setup, filename): with case_setup.test_file(filename) as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_stop_on_start_m_switch(case_setup_m_switch): with case_setup_m_switch.test_file() as writer: writer.write_stop_on_start() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_START, file='_debugger_case_m_switch.py', line=1) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_stop_on_start_entry_point(case_setup_m_switch_entry_point): with case_setup_m_switch_entry_point.test_file() as writer: writer.write_stop_on_start() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_START, file='_debugger_case_module_entry_point.py', line=1) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Not working properly on Jython (needs investigation).') def test_debug_zip_files(case_setup, tmpdir): def get_environ(writer): env = os.environ.copy() curr_pythonpath = env.get('PYTHONPATH', '') curr_pythonpath = str(tmpdir.join('myzip.zip')) + os.pathsep + curr_pythonpath curr_pythonpath = str(tmpdir.join('myzip2.egg!')) + os.pathsep + curr_pythonpath env['PYTHONPATH'] = curr_pythonpath env["IDE_PROJECT_ROOTS"] = str(tmpdir.join('myzip.zip')) return env import zipfile zip_file = zipfile.ZipFile( str(tmpdir.join('myzip.zip')), 'w') zip_file.writestr('zipped/__init__.py', '') zip_file.writestr('zipped/zipped_contents.py', 'def call_in_zip():\n return 1') zip_file.close() zip_file = zipfile.ZipFile( str(tmpdir.join('myzip2.egg!')), 'w') zip_file.writestr('zipped2/__init__.py', '') zip_file.writestr('zipped2/zipped_contents2.py', 'def call_in_zip2():\n return 1') zip_file.close() with case_setup.test_file('_debugger_case_zip_files.py', get_environ=get_environ) as writer: writer.write_add_breakpoint( 2, 'None', filename=os.path.join(str(tmpdir.join('myzip.zip')), 'zipped', 'zipped_contents.py') ) writer.write_add_breakpoint( 2, 'None', filename=os.path.join(str(tmpdir.join('myzip2.egg!')), 'zipped2', 'zipped_contents2.py') ) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() assert hit.name == 'call_in_zip' writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit() assert hit.name == 'call_in_zip2' writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') @pytest.mark.parametrize('file_to_check', [ '_debugger_case_multiprocessing_2.py', '_debugger_case_multiprocessing.py', '_debugger_case_python_c.py', '_debugger_case_multiprocessing_pool.py' ]) def test_multiprocessing_simple(case_setup_multiprocessing, file_to_check): import threading from tests_python.debugger_unittest import AbstractWriterThread with case_setup_multiprocessing.test_file(file_to_check) as writer: break1_line = writer.get_line_index_with_content('break 1 here') break2_line = writer.get_line_index_with_content('break 2 here') writer.write_add_breakpoint(break1_line) writer.write_add_breakpoint(break2_line) server_socket = writer.server_socket class SecondaryProcessWriterThread(AbstractWriterThread): TEST_FILE = writer.get_main_filename() _sequence = -1 class SecondaryProcessThreadCommunication(threading.Thread): def run(self): from tests_python.debugger_unittest import ReaderThread expected_connections = 1 for _ in range(expected_connections): server_socket.listen(1) self.server_socket = server_socket new_sock, addr = server_socket.accept() reader_thread = ReaderThread(new_sock) reader_thread.name = ' *** Multiprocess Reader Thread' reader_thread.start() writer2 = SecondaryProcessWriterThread() writer2.reader_thread = reader_thread writer2.sock = new_sock writer2.write_version() writer2.write_add_breakpoint(break1_line) writer2.write_add_breakpoint(break2_line) writer2.write_make_initial_run() hit = writer2.wait_for_breakpoint_hit() writer2.write_run_thread(hit.thread_id) secondary_process_thread_communication = SecondaryProcessThreadCommunication() secondary_process_thread_communication.start() writer.write_make_initial_run() hit2 = writer.wait_for_breakpoint_hit() secondary_process_thread_communication.join(10) if secondary_process_thread_communication.is_alive(): raise AssertionError('The SecondaryProcessThreadCommunication did not finish') writer.write_run_thread(hit2.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') @pytest.mark.parametrize('count', range(5)) # Call multiple times to exercise timing issues. def test_multiprocessing_with_stopped_breakpoints(case_setup_multiprocessing, count, debugger_runner_simple): import threading from tests_python.debugger_unittest import AbstractWriterThread with case_setup_multiprocessing.test_file('_debugger_case_multiprocessing_stopped_threads.py') as writer: break_main_line = writer.get_line_index_with_content('break in main here') break_thread_line = writer.get_line_index_with_content('break in thread here') break_process_line = writer.get_line_index_with_content('break in process here') writer.write_add_breakpoint(break_main_line) writer.write_add_breakpoint(break_thread_line) writer.write_add_breakpoint(break_process_line) server_socket = writer.server_socket listening_event = threading.Event() class SecondaryProcessWriterThread(AbstractWriterThread): TEST_FILE = writer.get_main_filename() _sequence = -1 class SecondaryProcessThreadCommunication(threading.Thread): def run(self): from tests_python.debugger_unittest import ReaderThread server_socket.listen(1) self.server_socket = server_socket listening_event.set() writer.log.append(' *** Multiprocess waiting on server_socket.accept()') new_sock, addr = server_socket.accept() reader_thread = ReaderThread(new_sock) reader_thread.name = ' *** Multiprocess Reader Thread' reader_thread.start() writer.log.append(' *** Multiprocess started ReaderThread') writer2 = SecondaryProcessWriterThread() writer2._WRITE_LOG_PREFIX = ' *** Multiprocess write: ' writer2.log = writer.log writer2.reader_thread = reader_thread writer2.sock = new_sock writer2.write_version() writer2.write_add_breakpoint(break_main_line) writer2.write_add_breakpoint(break_thread_line) writer2.write_add_breakpoint(break_process_line) writer2.write_make_initial_run() hit = writer2.wait_for_breakpoint_hit() writer2.write_run_thread(hit.thread_id) secondary_process_thread_communication = SecondaryProcessThreadCommunication() secondary_process_thread_communication.start() ok = listening_event.wait(timeout=10) assert ok writer.write_make_initial_run() hit2 = writer.wait_for_breakpoint_hit() # Breaks in thread. writer.write_step_over(hit2.thread_id) hit2 = writer.wait_for_breakpoint_hit(REASON_STEP_OVER) # line == event.set() # paused on breakpoint, will start process and pause on main thread # in the main process too. writer.write_step_over(hit2.thread_id) # Note: ignore the step over hit (go only for the breakpoint hit). main_hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) secondary_process_thread_communication.join(10) if secondary_process_thread_communication.is_alive(): raise AssertionError('The SecondaryProcessThreadCommunication did not finish') writer.write_run_thread(hit2.thread_id) writer.write_run_thread(main_hit.thread_id) # We must have found at least 2 debug files when doing multiprocessing (one for # each pid). assert len(pydev_log.list_log_files(debugger_runner_simple.pydevd_debug_file)) == 2 writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') @pytest.mark.parametrize('target', [ '_debugger_case_quoting.py', '_debugger_case_subprocess_zip.py' ]) def test_subprocess_quoted_args(case_setup_multiprocessing, target): from tests_python.debugger_unittest import AbstractWriterThread with case_setup_multiprocessing.test_file(target) as writer: break_subprocess_line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(break_subprocess_line) server_socket = writer.server_socket class SecondaryProcessWriterThread(AbstractWriterThread): TEST_FILE = writer.get_main_filename() _sequence = -1 class SecondaryProcessThreadCommunication(threading.Thread): def run(self): from tests_python.debugger_unittest import ReaderThread # Note: on linux on Python 2 because on Python 2 CPython subprocess.call will actually # create a fork first (at which point it'll connect) and then, later on it'll # call the main (as if it was a clean process as if PyDB wasn't created # the first time -- the debugger will still work, but it'll do an additional # connection. expected_connections = 1 for _ in range(expected_connections): server_socket.listen(1) self.server_socket = server_socket new_sock, addr = server_socket.accept() reader_thread = ReaderThread(new_sock) reader_thread.name = ' *** Multiprocess Reader Thread' reader_thread.start() writer2 = SecondaryProcessWriterThread() writer2.reader_thread = reader_thread writer2.sock = new_sock writer2.write_version() writer2.write_add_breakpoint(break_subprocess_line) writer2.write_make_initial_run() hit = writer2.wait_for_breakpoint_hit() writer2.write_run_thread(hit.thread_id) secondary_process_thread_communication = SecondaryProcessThreadCommunication() secondary_process_thread_communication.start() writer.write_make_initial_run() secondary_process_thread_communication.join(10) if secondary_process_thread_communication.is_alive(): raise AssertionError('The SecondaryProcessThreadCommunication did not finish') writer.finished_ok = True def _attach_to_writer_pid(writer): import pydevd assert writer.process is not None def attach(): attach_pydevd_file = os.path.join(os.path.dirname(pydevd.__file__), 'pydevd_attach_to_process', 'attach_pydevd.py') subprocess.call([sys.executable, attach_pydevd_file, '--pid', str(writer.process.pid), '--port', str(writer.port)]) threading.Thread(target=attach).start() wait_for_condition(lambda: writer.finished_initialization) @pytest.mark.skipif(not IS_CPYTHON or IS_MAC, reason='CPython only test (brittle on Mac).') @pytest.mark.parametrize('reattach', [True, False]) def test_attach_to_pid_no_threads(case_setup_remote, reattach): with case_setup_remote.test_file('_debugger_case_attach_to_pid_simple.py', wait_for_port=False) as writer: time.sleep(1) # Give it some time to initialize to get to the while loop. _attach_to_writer_pid(writer) bp_line = writer.get_line_index_with_content('break here') bp_id = writer.write_add_breakpoint(bp_line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=bp_line) if reattach: # This would be the same as a second attach to pid, so, the idea is closing the current # connection and then doing a new attach to pid. writer.write_remove_breakpoint(bp_id) writer.write_run_thread(hit.thread_id) writer.do_kill() # This will simply close the open sockets without doing anything else. time.sleep(1) t = threading.Thread(target=writer.start_socket) t.start() wait_for_condition(lambda: hasattr(writer, 'port')) time.sleep(1) writer.process = writer.process _attach_to_writer_pid(writer) wait_for_condition(lambda: hasattr(writer, 'reader_thread')) time.sleep(1) bp_id = writer.write_add_breakpoint(bp_line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=bp_line) writer.write_change_variable(hit.thread_id, hit.frame_id, 'wait', 'False') writer.wait_for_var(' 1 else: assert len(msg) == 1 writer.finished_ok = True @pytest.mark.skipif(not TEST_GEVENT, reason='Gevent not installed.') def test_gevent_remote(case_setup_remote): def get_environ(writer): env = os.environ.copy() env['GEVENT_SUPPORT'] = 'True' return env with case_setup_remote.test_file('_debugger_case_gevent.py', get_environ=get_environ, append_command_line_args=['remote']) as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() for _i in range(10): hit = writer.wait_for_breakpoint_hit(name='run') writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_return_value(case_setup): with case_setup.test_file('_debugger_case_return_value.py') as writer: break_line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(break_line) writer.write_show_return_vars() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(name='main', line=break_line) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_STEP_OVER, name='main', line=break_line + 1) writer.write_get_frame(hit.thread_id, hit.frame_id) writer.wait_for_vars([ [ '' writer.write_step_over_my_code(hit.thread_id) hit = writer.wait_for_breakpoint_hit(reason=REASON_STEP_OVER_MY_CODE) assert hit.name == '' writer.write_step_over_my_code(hit.thread_id) writer.finished_ok = True @pytest.fixture( params=[ 'step_over', 'step_return', 'step_in', ] ) def step_method(request): return request.param def test_sysexit_on_filtered_file(case_setup): def get_environ(writer): env = os.environ.copy() env.update({'PYDEVD_FILTERS': json.dumps({'**/_debugger_case_sysexit.py': True})}) return env with case_setup.test_file('_debugger_case_sysexit.py', get_environ=get_environ, EXPECTED_RETURNCODE=1) as writer: writer.write_add_exception_breakpoint_with_policy( 'SystemExit', notify_on_handled_exceptions=1, # Notify multiple times notify_on_unhandled_exceptions=1, ignore_libraries=0 ) writer.write_make_initial_run() writer.finished_ok = True @pytest.mark.parametrize("scenario", [ 'handled_once', 'handled_multiple', 'unhandled', ]) def test_exception_not_on_filtered_file(case_setup, scenario): def get_environ(writer): env = os.environ.copy() env.update({'PYDEVD_FILTERS': json.dumps({'**/other.py': True})}) return env def check_test_suceeded_msg(writer, stdout, stderr): return 'TEST SUCEEDED' in ''.join(stderr) def additional_output_checks(writer, stdout, stderr): if 'raise RuntimeError' not in stderr: raise AssertionError('Expected test to have an unhandled exception.\nstdout:\n%s\n\nstderr:\n%s' % ( stdout, stderr)) with case_setup.test_file( 'my_code/my_code_exception.py', get_environ=get_environ, EXPECTED_RETURNCODE='any', check_test_suceeded_msg=check_test_suceeded_msg, additional_output_checks=additional_output_checks, ) as writer: if scenario == 'handled_once': writer.write_add_exception_breakpoint_with_policy( 'RuntimeError', notify_on_handled_exceptions=2, # Notify only once notify_on_unhandled_exceptions=0, ignore_libraries=0 ) elif scenario == 'handled_multiple': writer.write_add_exception_breakpoint_with_policy( 'RuntimeError', notify_on_handled_exceptions=1, # Notify multiple times notify_on_unhandled_exceptions=0, ignore_libraries=0 ) elif scenario == 'unhandled': writer.write_add_exception_breakpoint_with_policy( 'RuntimeError', notify_on_handled_exceptions=0, notify_on_unhandled_exceptions=1, ignore_libraries=0 ) writer.write_make_initial_run() for _i in range(3 if scenario == 'handled_multiple' else 1): hit = writer.wait_for_breakpoint_hit( REASON_UNCAUGHT_EXCEPTION if scenario == 'unhandled' else REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_exception_on_filtered_file(case_setup): def get_environ(writer): env = os.environ.copy() env.update({'PYDEVD_FILTERS': json.dumps({'**/other.py': True})}) return env def check_test_suceeded_msg(writer, stdout, stderr): return 'TEST SUCEEDED' in ''.join(stderr) def additional_output_checks(writer, stdout, stderr): if 'raise RuntimeError' not in stderr: raise AssertionError('Expected test to have an unhandled exception.\nstdout:\n%s\n\nstderr:\n%s' % ( stdout, stderr)) with case_setup.test_file( 'my_code/my_code_exception_on_other.py', get_environ=get_environ, EXPECTED_RETURNCODE='any', check_test_suceeded_msg=check_test_suceeded_msg, additional_output_checks=additional_output_checks, ) as writer: writer.write_add_exception_breakpoint_with_policy( 'RuntimeError', notify_on_handled_exceptions=2, # Notify only once notify_on_unhandled_exceptions=1, ignore_libraries=0 ) writer.write_make_initial_run() # Note: the unhandled exception was initially raised in a file which is filtered out, but we # should be able to see the frames which are part of the project. hit = writer.wait_for_breakpoint_hit( REASON_UNCAUGHT_EXCEPTION, file='my_code_exception_on_other.py', line=writer.get_line_index_with_content('other.raise_exception()') ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.parametrize("environ", [ {'PYDEVD_FILTER_LIBRARIES': '1'}, # Global setting for step over {'PYDEVD_FILTERS': json.dumps({'**/other.py': True})}, # specify as json {'PYDEVD_FILTERS': '**/other.py'}, # specify ';' separated list ]) @pytest.mark.skipif(IS_JYTHON, reason='Flaky on Jython.') def test_step_over_my_code_global_settings(case_setup, environ, step_method): def get_environ(writer): env = os.environ.copy() env.update(environ) return env def do_step(): if step_method == 'step_over': writer.write_step_over(hit.thread_id) return REASON_STEP_OVER # Note: goes from step over to step into elif step_method == 'step_return': writer.write_step_return(hit.thread_id) return REASON_STEP_RETURN else: assert step_method == 'step_in' writer.write_step_in(hit.thread_id) return REASON_STEP_INTO with case_setup.test_file('my_code/my_code.py', get_environ=get_environ) as writer: writer.write_set_project_roots([debugger_unittest._get_debugger_test_file('my_code')]) writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit(reason=REASON_STEP_INTO) assert hit.name == 'callback1' writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit(reason=REASON_STEP_INTO) assert hit.name == 'callback2' stop_reason = do_step() hit = writer.wait_for_breakpoint_hit(reason=stop_reason) assert hit.name == 'callback1' stop_reason = do_step() hit = writer.wait_for_breakpoint_hit(reason=stop_reason) assert hit.name == '' if IS_JYTHON: # Jython may get to exit functions, so, just resume the thread. writer.write_run_thread(hit.thread_id) else: stop_reason = do_step() if step_method != 'step_return': stop_reason = do_step() if step_method == 'step_over': stop_reason = REASON_STEP_OVER hit = writer.wait_for_breakpoint_hit(reason=stop_reason) assert hit.name == '' writer.write_step_over(hit.thread_id) writer.finished_ok = True def test_step_over_my_code_global_setting_and_explicit_include(case_setup): def get_environ(writer): env = os.environ.copy() env.update({ 'PYDEVD_FILTER_LIBRARIES': '1', # Global setting for in project or not # specify as json (force include). 'PYDEVD_FILTERS': json.dumps({'**/other.py': False}) }) return env with case_setup.test_file('my_code/my_code.py', get_environ=get_environ) as writer: writer.write_set_project_roots([debugger_unittest._get_debugger_test_file('my_code')]) writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit(reason=REASON_STEP_INTO) # Although we filtered out non-project files, other.py is explicitly included. assert hit.name == 'call_me_back1' writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_access_token(case_setup): def update_command_line_args(self, args): i = args.index('--client') assert i > 0 args.insert(i, '--access-token') args.insert(i + 1, 'bar123') args.insert(i, '--client-access-token') args.insert(i + 1, 'foo234') return args with case_setup.test_file('_debugger_case_print.py', update_command_line_args=update_command_line_args) as writer: writer.write_add_breakpoint(1, 'None') # I.e.: should not work (not authenticated). writer.wait_for_message(lambda msg:'Client not authenticated.' in msg, expect_xml=False) writer.write_authenticate(access_token='bar123', client_access_token='foo234') writer.write_version() writer.write_make_initial_run() writer.finished_ok = True def test_namedtuple(case_setup): ''' Check that we don't step into in the namedtuple constructor. ''' with case_setup.test_file('_debugger_case_namedtuple.py') as writer: line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() expected_line = line for _ in range(2): expected_line += 1 writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_INTO, file='_debugger_case_namedtuple.py', line=expected_line) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_matplotlib_activation(case_setup): try: import matplotlib except ImportError: return def get_environ(writer): env = os.environ.copy() env.update({ 'IPYTHONENABLE': 'True', }) return env with case_setup.test_file('_debugger_case_matplotlib.py', get_environ=get_environ) as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() for _ in range(3): hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) writer.finished_ok = True _GENERATOR_FILES = [ '_debugger_case_generator3.py', '_debugger_case_generator.py', '_debugger_case_generator2.py', ] @pytest.mark.parametrize('target_filename', _GENERATOR_FILES) @pytest.mark.skipif(IS_JYTHON, reason='We do not detect generator returns on Jython.') def test_generator_step_over_basic(case_setup, target_filename): with case_setup.test_file(target_filename) as writer: line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() # Note: not using for so that we know which step failed in the ci if it fails. writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, file=target_filename, line=writer.get_line_index_with_content('step 1') ) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, file=target_filename, line=writer.get_line_index_with_content('step 2') ) if IS_PY38_OR_GREATER and target_filename == '_debugger_case_generator2.py': # On py 3.8 it goes back to the return line. writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, file=target_filename, line=writer.get_line_index_with_content('return \\') ) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, file=target_filename, line=writer.get_line_index_with_content('step 3') ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.parametrize('target_filename', _GENERATOR_FILES) @pytest.mark.skipif(IS_JYTHON, reason='We do not detect generator returns on Jython.') def test_generator_step_return(case_setup, target_filename): with case_setup.test_file(target_filename) as writer: line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() # Note: not using for so that we know which step failed in the ci if it fails. writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_RETURN, file=target_filename, line=writer.get_line_index_with_content('generator return') ) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, file=target_filename, line=writer.get_line_index_with_content('step 3') ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_stepin_not_my_code_coroutine(case_setup): def get_environ(writer): environ = {'PYDEVD_FILTERS': '{"**/not_my_coroutine.py": true}'} env = os.environ.copy() env.update(environ) return env with case_setup.test_file('my_code/my_code_coroutine.py', get_environ=get_environ) as writer: writer.write_set_project_roots([debugger_unittest._get_debugger_test_file('my_code')]) writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit(reason=REASON_STEP_INTO) assert hit.name == 'main' writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Flaky on Jython') def test_generator_step_in(case_setup): with case_setup.test_file('_debugger_case_generator_step_in.py') as writer: line = writer.get_line_index_with_content('stop 1') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() for i in range(2, 5): writer.write_step_in(hit.thread_id) kwargs = {} if not IS_JYTHON: kwargs['line'] = writer.get_line_index_with_content('stop %s' % (i,)) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_INTO, file='_debugger_case_generator_step_in.py', **kwargs ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.parametrize( 'target_filename', [ '_debugger_case_asyncio.py', '_debugger_case_trio.py', ] ) @pytest.mark.skipif(not IS_CPYTHON or not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_asyncio_step_over_basic(case_setup, target_filename): with case_setup.test_file(target_filename) as writer: line = writer.get_line_index_with_content('break main') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, file=target_filename, line=writer.get_line_index_with_content('step main') ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.parametrize( 'target_filename', [ '_debugger_case_asyncio.py', '_debugger_case_trio.py', ] ) @pytest.mark.skipif(not IS_CPYTHON or not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_asyncio_step_over_end_of_function(case_setup, target_filename): with case_setup.test_file(target_filename) as writer: line = writer.get_line_index_with_content('break count 2') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_OVER, name=('sleep', 'wait_task_rescheduled'), ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.parametrize( 'target_filename', [ '_debugger_case_asyncio.py', '_debugger_case_trio.py', ] ) @pytest.mark.skipif(not IS_CPYTHON or not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_asyncio_step_in(case_setup, target_filename): with case_setup.test_file(target_filename) as writer: line = writer.get_line_index_with_content('break count 1') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_RETURN, file=target_filename, line=writer.get_line_index_with_content('break main') ) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_INTO, name=('sleep', 'wait_task_rescheduled'), ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.parametrize( 'target_filename', [ '_debugger_case_asyncio.py', '_debugger_case_trio.py', ] ) @pytest.mark.skipif(not IS_CPYTHON or not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_asyncio_step_return(case_setup, target_filename): with case_setup.test_file(target_filename) as writer: line = writer.get_line_index_with_content('break count 1') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit( reason=REASON_STEP_RETURN, file=target_filename, line=writer.get_line_index_with_content('break main') ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_notify_stdin(case_setup, pyfile): @pyfile def case_stdin(): import sys print('Write something:') contents = sys.stdin.readline() print('Found: ' + contents) print('TEST SUCEEDED') def additional_output_checks(writer, stdout, stderr): assert 'Found: foo' in stdout with case_setup.test_file( case_stdin, additional_output_checks=additional_output_checks, ) as writer: writer.write_make_initial_run() msg = writer.wait_for_message(CMD_INPUT_REQUESTED, expect_xml=False) assert msg.split('\t')[-1] == 'True' process = writer.process process.stdin.write(b'foo\n') process.stdin.flush() msg = writer.wait_for_message(CMD_INPUT_REQUESTED, expect_xml=False) assert msg.split('\t')[-1] == 'False' writer.finished_ok = True def test_frame_eval_mode_corner_case_01(case_setup): with case_setup.test_file( 'wrong_bytecode/_debugger_case_wrong_bytecode.py', ) as writer: line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=writer.get_line_index_with_content('break here'), file='_debugger_case_wrong_bytecode.py') writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=writer.get_line_index_with_content('step 1'), file='_debugger_case_wrong_bytecode.py', reason=REASON_STEP_OVER) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=writer.get_line_index_with_content('step 2'), file='_debugger_case_wrong_bytecode.py', reason=REASON_STEP_OVER) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=writer.get_line_index_with_content('step 3'), file='_debugger_case_wrong_bytecode.py', reason=REASON_STEP_OVER) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=writer.get_line_index_with_content('step 4'), file='_debugger_case_wrong_bytecode.py', reason=REASON_STEP_OVER) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=writer.get_line_index_with_content('step 5'), file='_debugger_case_wrong_bytecode.py', reason=REASON_STEP_OVER) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_frame_eval_mode_corner_case_02(case_setup): with case_setup.test_file( '_bytecode_super.py', ) as writer: line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=line, file='_bytecode_super.py') writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_frame_eval_mode_corner_case_03(case_setup): with case_setup.test_file( '_bytecode_constructs.py', ) as writer: line = writer.get_line_index_with_content('break while') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=line) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=line + 1, reason=REASON_STEP_OVER) writer.write_step_over(hit.thread_id) # i.e.: check that the jump target is still ok. hit = writer.wait_for_breakpoint_hit(line=line, reason=REASON_STOP_ON_BREAKPOINT) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=line + 1, reason=REASON_STEP_OVER) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=line, reason=REASON_STOP_ON_BREAKPOINT) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') def test_frame_eval_mode_corner_case_04(case_setup): with case_setup.test_file( '_bytecode_constructs.py', ) as writer: line = writer.get_line_index_with_content('break for') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=line) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=line) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(line=line) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_PY36_OR_GREATER, reason='Only CPython 3.6 onwards') @pytest.mark.parametrize( 'break_name', [ 'break except', 'break with', 'break try 1', 'break try 2', 'break finally 1', 'break except 2', 'break finally 2', 'break finally 3', 'break finally 4', 'break in dict', 'break else', ] ) def test_frame_eval_mode_corner_case_many(case_setup, break_name): if break_name == 'break finally 4' and sys.version_info[:2] == (3, 9): # This case is currently failing in Python 3.9 return # Check the constructs where we stop only once and proceed. with case_setup.test_file( '_bytecode_constructs.py', ) as writer: line = writer.get_line_index_with_content(break_name) writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=line) writer.write_run_thread(hit.thread_id) if break_name == 'break with': if sys.version_info[:2] >= (3, 10): # On Python 3.10 it'll actually backtrack for the # with and thus will execute the line where the # 'with' statement was started again. hit = writer.wait_for_breakpoint_hit(line=line) writer.write_run_thread(hit.thread_id) writer.finished_ok = True check_shadowed = [ ( u''' if __name__ == '__main__': import queue print(queue) ''', 'queue.py', u'shadowed = True\n' ), ( u''' if __name__ == '__main__': import queue print(queue) ''', 'queue.py', u'raise AssertionError("error on import")' ) ] @pytest.mark.parametrize('module_name_and_content', check_shadowed) def test_debugger_shadowed_imports(case_setup, tmpdir, module_name_and_content): main_content, module_name, content = module_name_and_content target = tmpdir.join('main.py') shadowed = tmpdir.join(module_name) target.write_text(main_content, encoding='utf-8') shadowed.write_text(content, encoding='utf-8') def get_environ(writer): env = os.environ.copy() env.update({ 'PYTHONPATH': str(tmpdir), }) return env try: with case_setup.test_file( str(target), get_environ=get_environ, wait_for_initialization=False, ) as writer: writer.write_make_initial_run() except AssertionError: pass # This is expected as pydevd didn't start-up. assert ('the module "%s" could not be imported because it is shadowed by:' % (module_name.split('.')[0])) in writer.get_stderr() def test_debugger_hide_pydevd_threads(case_setup, pyfile): @pyfile def target_file(): import threading from _pydevd_bundle import pydevd_constants found_pydevd_thread = False for t in threading.enumerate(): if getattr(t, 'is_pydev_daemon_thread', False): found_pydevd_thread = True if pydevd_constants.IS_CPYTHON: assert not found_pydevd_thread else: assert found_pydevd_thread print('TEST SUCEEDED') with case_setup.test_file(target_file) as writer: line = writer.get_line_index_with_content('TEST SUCEEDED') writer.write_add_breakpoint(line) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(line=line) writer.write_run_thread(hit.thread_id) writer.finished_ok = True # Jython needs some vars to be set locally. # set JAVA_HOME=c:\bin\jdk1.8.0_172 # set PATH=%PATH%;C:\bin\jython2.7.0\bin # set PATH=%PATH%;%JAVA_HOME%\bin # c:\bin\jython2.7.0\bin\jython.exe -m py.test tests_python if __name__ == '__main__': pytest.main(['-k', 'test_case_12'])