U
    /e                     @  s~   d dl mZ d dlZd dlmZ d dlmZmZ d dlm	Z	 d dl
mZ d dlmZ eeZG dd	 d	ZG d
d dZdS )    )annotationsN)	stringify)
futures_ofwait)sync)	pack_data)_deserializec                   @  s<   e Zd ZdZdd Zdd Zdddd	Zd
dddZd
S )ReplayTaskSchedulerzA plugin for the scheduler to recreate tasks locally

    This adds the following routes to the scheduler

    *  get_runspec
    *  get_error_cause
    c                 C  s&   || _ | j| j jd< | j| j jd< d S )Nget_runspecget_error_cause)	schedulerr
   handlersr   )selfr    r   >/tmp/pip-unpacked-wheel-g426oqom/distributed/recreate_tasks.py__init__   s    zReplayTaskScheduler.__init__c                 C  s   t |trt|}t|}|S N)
isinstancelisttupler   )r   keyr   r   r   _process_key   s    
z ReplayTaskScheduler._process_keyr   keysc                O  sD   |D ]:}|  |}| jj|}|d k	r|jd k	r|jj  S qd S r   )r   r   tasksgetZexception_blamer   )r   r   argskwargsr   tsr   r   r   r   #   s
    
z#ReplayTaskScheduler.get_error_causeNr   c                O  s0   |  |}| jj|}|jdd |jD dS )Nc                 S  s   g | ]
}|j qS r   r   ).0Zdtsr   r   r   
<listcomp>-   s     z3ReplayTaskScheduler.get_runspec.<locals>.<listcomp>)taskdeps)r   r   r   r   Zrun_specZdependencies)r   r   r   r   r   r   r   r   r
   *   s    
zReplayTaskScheduler.get_runspec)__name__
__module____qualname____doc__r   r   r   r
   r   r   r   r   r	      s
   r	   c                   @  sT   e Zd ZdZdd Zedd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd ZdS )ReplayTaskClienta  
    A plugin for the client allowing replay of remote tasks locally

    Adds the following methods to the given client:

    - ``recreate_error_locally``: main user method for replaying failed tasks
    - ``recreate_task_locally``: main user method for replaying any task
    c                 C  sR   || _ | | j jd< | j| j _| j| j _| j| j _| j| j _| j| j _| j| j _d S )Nzreplay-tasks)client
extensions_get_raw_components_from_future_prepare_raw_components_get_components_from_future_get_errored_futurerecreate_task_locallyrecreate_error_locally)r   r)   r   r   r   r   :   s    



zReplayTaskClient.__init__c                 C  s   | j jS r   )r)   r   )r   r   r   r   r   G   s    zReplayTaskClient.schedulerc           	        s   t |tr|}nt|I dH  |j}| jj|dI dH }|d |d  }}t |trptf |\}}}||||fS t|d\}}}||||fS dS )z~
        For a given future return the func, args and kwargs and future
        deps that would be executed remotely.
        Nr   r#   r"   )r"   )r   strr   r   r   r
   dictr   )	r   futurer   specr#   r"   functionr   r   r   r   r   r+   K   s    

z0ReplayTaskClient._get_raw_components_from_futurec                   sJ   |\}}}}| j i |}| j |I dH }t||}t||}|||fS )zF
        Take raw components and resolve future dependencies.
        N)r)   Z_graph_to_futuresZ_gatherr   )r   raw_componentsr5   r   r   r#   futuresdatar   r   r   r,   ^   s    

z(ReplayTaskClient._prepare_raw_componentsc                   s    |  |I dH }| |I dH S )z
        For a given future return the func, args and kwargs that would be
        executed remotely. Any args/kwargs that are themselves futures will
        be resolved to the return value of those futures.
        N)r+   r,   )r   r3   r6   r   r   r   r-   i   s    z,ReplayTaskClient._get_components_from_futurec                 C  s"   t | jj| j|\}}}|||S )aK  
        For any calculation, whether it succeeded or failed, perform the task
        locally for debugging.

        This operation should be performed after a future (result of ``gather``,
        ``compute``, etc) comes back with a status other than "pending". Cases
        where you might want to debug a successfully completed future could
        include a calculation that returns an unexpected results. A common
        debugging process might include running the task locally in debug mode,
        with `pdb.runcall`.

        Examples
        --------
        >>> import pdb                                    # doctest: +SKIP
        >>> future = c.submit(div, 1, 1)                  # doctest: +SKIP
        >>> future.status                                 # doctest: +SKIP
        'finished'
        >>> pdb.runcall(c.recreate_task_locally, future)  # doctest: +SKIP

        Parameters
        ----------
        future : future
            The same thing as was given to ``gather``.

        Returns
        -------
        Any; will return the result of the task future.
        )r   r)   loopr-   )r   r3   funcr   r   r   r   r   r/   r   s      
z&ReplayTaskClient.recreate_task_locallyc                   sD   t |I dH  dd t|D }|s,td| jj|dI dH }|S )zf
        For a given future collection, return the first future that raised
        an error.
        Nc                 S  s   g | ]}|j d kr|jqS )error)statusr   )r    fr   r   r   r!      s     
 z8ReplayTaskClient._get_errored_future.<locals>.<listcomp>zNo errored futures passedr   )r   r   
ValueErrorr   r   )r   r3   r7   Z	cause_keyr   r   r   r.      s    z$ReplayTaskClient._get_errored_futurec                 C  s   t | jj| j|}| |S )a  
        For a failed calculation, perform the blamed task locally for debugging.

        This operation should be performed after a future (result of ``gather``,
        ``compute``, etc) comes back with a status of "error", if the stack-
        trace is not informative enough to diagnose the problem. The specific
        task (part of the graph pointing to the future) responsible for the
        error will be fetched from the scheduler, together with the values of
        its inputs. The function will then be executed, so that ``pdb`` can
        be used for debugging.

        Examples
        --------
        >>> future = c.submit(div, 1, 0)         # doctest: +SKIP
        >>> future.status                        # doctest: +SKIP
        'error'
        >>> c.recreate_error_locally(future)     # doctest: +SKIP
        ZeroDivisionError: division by zero

        If you're in IPython you might take this opportunity to use pdb

        >>> %pdb                                 # doctest: +SKIP
        Automatic pdb calling has been turned ON

        >>> c.recreate_error_locally(future)     # doctest: +SKIP
        ZeroDivisionError: division by zero
              1 def div(x, y):
        ----> 2     return x / y
        ipdb>

        Parameters
        ----------
        future : future or collection that failed
            The same thing as was given to ``gather``, but came back with
            an exception/stack-trace. Can also be a (persisted) dask collection
            containing any errored futures.

        Returns
        -------
        Nothing; the function runs and should raise an exception, allowing
        the debugger to run.
        )r   r)   r9   r.   r/   )r   r3   Zerrored_future_keyr   r   r   r0      s    +z'ReplayTaskClient.recreate_error_locallyN)r$   r%   r&   r'   r   propertyr   r+   r,   r-   r/   r.   r0   r   r   r   r   r(   0   s   	
	"r(   )
__future__r   loggingZ
dask.utilsr   Zdistributed.clientr   r   Zdistributed.utilsr   Zdistributed.utils_commr   Zdistributed.workerr   	getLoggerr$   loggerr	   r(   r   r   r   r   <module>   s   
!