U
    /e                     @   s  U d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ d
ZdZdaejed< daeed< dd Zd(ddZedfddZd)ddZdd Zedd Zdd Zdd Zd*d d!Zd"d# Z d$d% Z!G d&d' d'eZ"dS )+z%utilities for testing IPython kernels    N)contextmanager)Empty)STDOUT)TemporaryDirectory)time)manager)BlockingKernelClient<   d   KMKCc               	   K   sN   t | d< zddl}| | d< W n ttfk
r8   Y nX tjf dti| S )zostart a new kernel, and return its Manager and Client

    Integrates with our output capturing for tests.
    stderrr   NstdoutZstartup_timeout)r   noseiptest_stdstreams_filenoImportErrorAttributeErrorr   start_new_kernelSTARTUP_TIMEOUT)kwargsr    r   9/tmp/pip-unpacked-wheel-g43y689f/ipykernel/tests/utils.pyr      s    r   c              	   C   s`   ddl m} | dkrt} | j| jfD ]6}z|dd}W n tk
rN   Y q$Y q(X || q(q$dS )z'flush any messages waiting on the queue   validate_messageNg?timeout)test_message_specr   r   Zget_shell_msgget_iopub_msgr   )kcr   get_msgmsgr   r   r   flush_channels(   s    
r"   shellc                 C   sd   t  }t| d| d}||d}|d d |kr4q`td| d|  t  }||| 8 }|}q|S )Nget__msgr   Zparent_headermsg_idzIgnoring reply not to z: )r   getattrprint)r   r&   r   Zchannelt0r    replyt1r   r   r   	get_reply8   s    
r,    c                 K   s   ddl m} |dkrt}|jf d| i|}t||t}||d| |jtd}||d| |d d	 d
kspt|ds|jtd}||d| |d d | kst|d drt	d
|d d tjd ||d fS )zBwrapper for doing common steps for validating an execution requestr   r   NcodeZexecute_replyr   statuscontentexecution_statebusyZsilentexecute_input	traceback
)file)r   r   r   executer,   TIMEOUTr   AssertionErrorgetr(   joinsysr   )r.   r   r   r   r&   r*   r2   r3   r   r   r   r7   G   s     
r7   c                   C   s*   t dkrt \a att ntt tS )zCstart the global kernel (if it isn't running) and return its clientN)r   r   r   atexitregisterstop_global_kernelr"   r   r   r   r   start_global_kernel`   s
    
r@   c                   c   s   t  V  dS )zContext manager for the global kernel instance

    Should be used for most kernel tests

    Returns
    -------
    kernel_client: connected KernelClient instance
    N)r@   r   r   r   r   kernelk   s    
rA   c                    s     fdd} j |_  j|_|S )z.Decorator for tests that use the global kernelc               	      s   t  }  |  W 5 Q R X d S N)rA   )r   test_fr   r   wrapped_test{   s    z!uses_kernel.<locals>.wrapped_test)__doc____name__)rD   rE   r   rC   r   uses_kernelx   s    rH   c                   C   s,   t   da tdkrdS tjdd dadS )z4Stop the global shared kernel instance, if it existsNT)now)r   Zstop_channelsr   Zshutdown_kernelr   r   r   r   r?      s    r?   c              	   C   sV   dt i}zddl}| |d< W n ttfk
r8   Y nX | dk	rJ| |d< tjf |S )zContext manager for a new kernel in a subprocess

    Should only be used for tests where the kernel must not be re-used.

    Returns
    -------
    kernel_client: connected KernelClient instance
    r   r   Nr   Zextra_arguments)r   r   r   r   r   r   Z
run_kernel)argvr   r   r   r   r   
new_kernel   s    	rK   c                 C   s   d}d}| dd}|d }|d }|dkr:|d dkr:qq|d d	kr|d
 dkr`||d 7 }q|d
 dkrz||d 7 }qt d|d
  qq||fS )z%assemble stdout/err from an executionr-   r   r   msg_typer0   r/   r1   idlestreamnamer   textr   zbad stream: %r)KeyError)r    r   r   r!   rL   r0   r   r   r   assemble_output   s    
rR   c                 C   s8   | j dd}|d }|d }|dkr |d dkr q4q d S )Nr   r   rL   r0   r/   r1   rM   )r   )r   r!   rL   r0   r   r   r   wait_for_idle   s
    rS   c                       s,   e Zd ZdZ fddZ fddZ  ZS )TemporaryWorkingDirectoryz
    Creates a temporary directory and sets the cwd to that directory.
    Automatically reverts to previous cwd upon cleanup.
    Usage example:

        with TemporaryWorkingDirectory() as tmpdir:
            ...
    c                    s    t  | _t | j t  S rB   )osgetcwdold_wdchdirrO   super	__enter__)self	__class__r   r   rZ      s    
z#TemporaryWorkingDirectory.__enter__c                    s   t | j t |||S rB   )rU   rX   rW   rY   __exit__)r[   excvaluetbr\   r   r   r^      s    z"TemporaryWorkingDirectory.__exit__)rG   
__module____qualname__rF   rZ   r^   __classcell__r   r   r\   r   rT      s   	rT   )N)r-   N)N)#rF   r=   rU   r<   
contextlibr   queuer   
subprocessr   tempfiler   r   Zjupyter_clientr   Zjupyter_client.blocking.clientr   r   r8   r   ZKernelManager__annotations__r   r   r"   r,   r7   r@   rA   rH   r?   rK   rR   rS   rT   r   r   r   r   <module>   s6   



	