U
    /eR                     @   s   d dl Z d dlmZ d dlmZ d dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d d	lmZ e	jZd
d Zedd Ze dd Zdd Zdd Zdd Zejjde jkdddd Zejjdddd Z dd Z!d d! Z"dS )"    N)contextmanager)StringIO)capture_output)Session)BlockingInProcessKernelClient)InProcessKernel)InProcessKernelManager)assemble_outputc                 O   s>   t | tstt| f||}d|d ks.td|d d< |S )zg
    This patch jupyter_client.session:Session.msg to add a cell_id to the return message metadata
    ZcellIdmetadataZtest_cell_id)
isinstancer   AssertionErrororig_msg)_selfargskwargsres r   I/tmp/pip-unpacked-wheel-g43y689f/ipykernel/inprocess/tests/test_kernel.py_inject_cell_id   s
    r   c                   c   s   ztt_d V  W 5 t t_X d S N)r   r   msgr   r   r   r   r   patch_cell_id   s    
r   c                  c   s0   t  } |   |  }|  |  |V  d S r   )r   Zstart_kernelclientZstart_channelsZwait_for_ready)kmkcr   r   r   r   (   s    r   c              	   C   s    t   | d W 5 Q R X d S )Nz1+1)r   execute)r   r   r   r   test_with_cell_id2   s    r   c                 C   s6   t jddd}| d t| j\}}d|ks2tdS )z*Does %pylab work in the in-process kernel?Z
matplotlibzThis test requires matplotlibreasonz%pylabN)pytestZimportorskipr   r	   get_iopub_msgr   )r   _outerrr   r   r   
test_pylab7   s    
r$   c                 C   sH   t d}tj}|t_z| d W 5 |t_X | jjjddksDtdS )z6Does the in-process kernel handle raw_input correctly?zfoobar
zx = input()xZfoobarN)	r   sysstdinr   kernelshelluser_nsgetr   )r   ioZ	sys_stdinr   r   r   test_raw_input?   s    r-   Z__pypy__zfails on pypyr   c              	   C   st   t  }t }|jd W 5 Q R X |jdks2tt||jd} |j	|  | 
d t| j\}}|dksptdS )z0Does the in-process kernel correctly capture IO?print("foo")foo
r(   sessionzprint("bar")zbar
N)r   r   r)   run_cellstdoutr   r   r1   	frontendsappendr   r	   r    r   r(   r,   r"   r#   r   r   r   test_stdoutK   s    
r7   zDCurrently don't capture during test as pytest does its own capturingc              	   C   s~   t  }t }|jd W 5 Q R X |jdks2tt||jd} |j	|  | 
d | 
d t| j\}}|dksztdS )zDoes correctly capture fdr.   r/   r0   z	import oszos.system("echo capfd")zcapfd
N)r   r   r)   r2   r3   r   r   r1   r4   r5   r   r	   Ziopub_channelr6   r   r   r   
test_capfd[   s    

r8   c                 C   s&   t  }d|_dd |_|jdd dS )z5Tests that kernel getpass accept the stream parameterTc                  _   s   d S r   r   )r   r   r   r   r   <lambda>p       z%test_getpass_stream.<locals>.<lambda>z	non empty)streamN)r   Z_allow_stdinZ_input_requestgetpassr   r(   r   r   r   test_getpass_streaml   s    
r>   c                    s0   t  }|ddI d H  |jjd dks,td S )Nza=1Ta   )r   Z
do_executer)   r*   r   r=   r   r   r   test_do_executeu   s    rA   )#r&   
contextlibr   r,   r   r   ZIPython.utils.ior   Zjupyter_client.sessionr   Zipykernel.inprocess.blockingr   Zipykernel.inprocess.ipkernelr   Zipykernel.inprocess.managerr   Zipykernel.tests.utilsr	   r   r   r   r   Zfixturer   r   r$   r-   markZskipifbuiltin_module_namesr7   skipr8   r>   rA   r   r   r   r   <module>   s0   

	

	