U
    /e']                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZ d dlmZ d dlZd d	lmZmZ d d
lmZ d dlmZm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+m,Z,m-Z- er&d dl.m/Z/m0Z0 e1e2Z3G dd dZ4edZ5dddddZ6edddZ7G dd de$Z8dddd d!d"Z9ej:d#d$ Z;dS )%    )annotationsN)	Awaitable	Generator)suppress)isawaitable)TYPE_CHECKINGAnyClassVarTypeVar)gen)IOLoop)parse_bytesparse_timedelta)get_template)Statusrpc)Adaptive)Cluster)	Scheduler)Security)NoOpAwaitableTimeoutErrorimport_termsilence_logging)NannyWorkerc                   @  st   e Zd ZdZedd Zejdd ZdddZdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd ZdS )ProcessInterfacea)  
    An interface for Scheduler and Worker processes for use in SpecCluster

    This interface is responsible to submit a worker or scheduler process to a
    resource manager like Kubernetes, Yarn, or SLURM/PBS/SGE/...
    It should implement the methods below, like ``start`` and ``close``
    c                 C  s   | j S N)_statusself r!   ;/tmp/pip-unpacked-wheel-g426oqom/distributed/deploy/spec.pystatus,   s    zProcessInterface.statusc                 C  s"   t |tstd||| _d S )NzExpected Status; got )
isinstancer   	TypeErrorr   )r    Z
new_statusr!   r!   r"   r#   0   s    
Nc                 C  s4   t | dd | _d | _t | _tj| _t	 | _
d S )Naddress)getattrr&   external_addressasyncioLocklockr   createdr#   Event_event_finished)r    	schedulernamer!   r!   r"   __init__6   s
    
zProcessInterface.__init__c                   s    fdd}|   S )Nc                
     sN    j 4 I d H 0  jtjkr:  I d H   jtjks:tW 5 Q I d H R X  S r   )r+   r#   r   r,   startrunningAssertionErrorr!   r   r!   r"   _>   s
     z%ProcessInterface.__await__.<locals>._	__await__r    r5   r!   r   r"   r7   =   s    zProcessInterface.__await__c                   s   t j| _dS )a  Submit the process to the resource manager

        For workers this doesn't have to wait until the process actually starts,
        but can return once the resource manager has the request, and will work
        to make the job exist in the future

        For the scheduler we will expect the scheduler's ``.address`` attribute
        to be available after this completes.
        N)r   r3   r#   r   r!   r!   r"   r2   G   s    
zProcessInterface.startc                   s   t j| _| j  dS )a8  Close the process

        This will be called by the Cluster object when we scale down a node,
        but only after we ask the Scheduler to close the worker gracefully.
        This method should kill the process a bit more forcefully and does not
        need to worry about shutting down gracefully
        N)r   closedr#   r.   setr   r!   r!   r"   closeS   s    zProcessInterface.closec                   s   | j  I dH  dS )z"Wait until the server has finishedN)r.   waitr   r!   r!   r"   finished^   s    zProcessInterface.finishedc                 C  s"   dt jt|  d| jj dS )N<z	: status=>)daskutilstypenametyper#   r0   r   r!   r!   r"   __repr__b   s    zProcessInterface.__repr__c                 C  s   t dj| dS )Nzprocess_interface.html.j2)Zprocess_interface)r   renderr   r!   r!   r"   _repr_html_e   s    zProcessInterface._repr_html_c                   s   | I d H  | S r   r!   r   r!   r!   r"   
__aenter__h   s    
zProcessInterface.__aenter__c                   s   |   I d H  d S r   )r;   r    exc_type	exc_value	tracebackr!   r!   r"   	__aexit__l   s    zProcessInterface.__aexit__)NN)__name__
__module____qualname____doc__propertyr#   setterr1   r7   r2   r;   r=   rD   rF   rG   rL   r!   r!   r!   r"   r   #   s   



r   _TzAwaitable[_T])awreturnc                   s
   | I d H S r   r!   )rT   r!   r!   r"   _wrap_awaitables   s    rV   _T_spec_clusterSpecCluster)boundc                      s@  e Zd ZU dZe Zded< dA fdd		Z fd
dZ	dd Z
ddddZ fddZdddddZ fddZdd Z fddZddd d!Zddd"d#ZdBd%d&Zd'd( Zd)d* Zed+d, Zd-d. ZeZed/d0 Zed1d2 Zed$ejddddfd3d4d4d5d5d6d6d7d8d9	 fd:d;Ze d<d=d>d?d@Z!  Z"S )CrX   a  Cluster that requires a full specification of workers

    The SpecCluster class expects a full specification of the Scheduler and
    Workers to use.  It removes any handling of user inputs (like threads vs
    processes, number of cores, and so on) and any handling of cluster resource
    managers (like pods, jobs, and so on).  Instead, it expects this
    information to be passed in scheduler and worker specifications.  This
    class does handle all of the logic around asynchronously cleanly setting up
    and tearing things down at the right times.  Hopefully it can form a base
    for other more user-centric classes.

    Parameters
    ----------
    workers: dict
        A dictionary mapping names to worker classes and their specifications
        See example below
    scheduler: dict, optional
        A similar mapping for a scheduler
    worker: dict
        A specification of a single worker.
        This is used for any new workers that are created.
    asynchronous: bool
        If this is intended to be used directly within an event loop with
        async/await
    silence_logs: bool
        Whether or not we should silence logging when setting up the cluster.
    name: str, optional
        A name to use when printing out the cluster, defaults to type name

    Examples
    --------
    To create a SpecCluster you specify how to set up a Scheduler and Workers

    >>> from dask.distributed import Scheduler, Worker, Nanny
    >>> scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
    >>> workers = {
    ...     'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
    ...     'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
    ... }
    >>> cluster = SpecCluster(scheduler=scheduler, workers=workers)

    The worker spec is stored as the ``.worker_spec`` attribute

    >>> cluster.worker_spec
    {
       'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
       'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
    }

    While the instantiation of this spec is stored in the ``.workers``
    attribute

    >>> cluster.workers
    {
        'my-worker': <Worker ...>
        'my-nanny': <Nanny ...>
    }

    Should the spec change, we can await the cluster or call the
    ``._correct_state`` method to align the actual state to the specified
    state.

    We can also ``.scale(...)`` the cluster, which adds new workers of a given
    form.

    >>> worker = {'cls': Worker, 'options': {}}
    >>> cluster = SpecCluster(scheduler=scheduler, worker=worker)
    >>> cluster.worker_spec
    {}

    >>> cluster.scale(3)
    >>> cluster.worker_spec
    {
        0: {'cls': Worker, 'options': {}},
        1: {'cls': Worker, 'options': {}},
        2: {'cls': Worker, 'options': {}},
    }

    Note that above we are using the standard ``Worker`` and ``Nanny`` classes,
    however in practice other classes could be used that handle resource
    management like ``KubernetesPod`` or ``SLURMJob``.  The spec does not need
    to conform to the expectations of the standard Dask Worker class.  It just
    needs to be called with the provided options, support ``__await__`` and
    ``close`` methods and the ``worker_address`` property..

    Also note that uniformity of the specification is not required.  Other API
    could be added externally (in subclasses) that adds workers of different
    specifications into the same dictionary.

    If a single entry in the spec will generate multiple dask workers then
    please provide a `"group"` element to the spec, that includes the suffixes
    that will be added to each name (this should be handled by your worker
    class).

    >>> cluster.worker_spec
    {
        0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]}
        1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]}
    }

    These suffixes should correspond to the names used by the workers when
    they deploy.

    >>> [ws.name for ws in cluster.scheduler.workers.values()]
    ["0-0", "0-1", "0-2", "1-0", "1-1"]
    z&ClassVar[weakref.WeakSet[SpecCluster]]
_instancesNFT   c                   sN  |d kr|rt  }t | _t|| _t|p6i | _t|| _d | _	i | _
d| _|p`t | _t | _|rt|d| _t|dd| _| j|  d | _|pt| j| _|	| _t j||||
d zt|dd t k}W n tk
r   |}Y nX |sJ| j !  | "| j# z| "| j$ W n$ t%k
rH   | "| j&  Y nX d S )Nr   )levelbokeh)r\   root)asynchronousloopr0   scheduler_sync_intervalZasyncio_loop)'r   currentweakrefWeakSet_createdcopyscheduler_specworker_specnew_specr/   workers_ir   securityr:   _futuresr   _old_logging_level_old_bokeh_logging_levelrZ   add_correct_state_waitingrC   rM   _nameshutdown_on_closesuperr1   r'   r)   get_running_loopRuntimeError_loop_runnerr2   sync_start_correct_state	Exceptionr;   )r    rj   r/   workerr_   r`   rl   Zsilence_logsr0   rs   ra   Zcalled_from_running_loop	__class__r!   r"   r1      sP    
 

zSpecCluster.__init__c              
     sj  | j tjkrtdI d H  q | j tjkr.d S | j tjkrBtdt | _	tj| _ | j
d krzdd l}W n tk
r~   Y n
X ddi}t|d| _
z| jd kr| j
d }t|trt|}|f | j
di | _| jI d H | _tt| jd	d p| jj| jd
d| _t  I d H  W nJ tk
rd } z*tj| _ |  I d H  td| |W 5 d }~X Y nX d S )Ng{Gz?zCluster is closedr   Z	dashboardT)clsoptionsr   r   r(   client)Zconnection_argszCluster failed to start: )r#   r   Zstartingr)   sleepr3   r9   
ValueErrorr*   _lockrg   Zdistributed.dashboardImportErrorr   r/   r$   strr   getr   r'   r&   rl   Zget_connection_argsscheduler_commrt   ry   r{   failed_closerv   )r    distributedr   r   er}   r!   r"   ry   %  s@    





zSpecCluster._startc                 C  s(   | j r| j S t|  }|| _ |S d S r   )rq   r)   ensure_future_correct_state_internal)r    Ztaskr!   r!   r"   rz   L  s
    zSpecCluster._correct_stateNonerU   c              
     s   j 4 I d H  d  _t jt j }|rx jjtjkrV j	j
t|dI d H   fdd|D }tj| I d H  |D ]}| jkr| j|= q|t jt j }g }|D ]} j| }|d |di  }}d|kr| }||d< t|tr t|}|t jdd p jjf|}	 j|	 ||	 q|rztdd |D I d H  |D ]}
t |
_|
I d H  q\ jtt|| W 5 Q I d H R X d S )	N)rj   c                   s*   g | ]"}| j krt j |  qS r!   )rj   r)   create_taskr;   .0wr   r!   r"   
<listcomp>]  s   
z7SpecCluster._correct_state_internal.<locals>.<listcomp>r   r   r0   Zcontact_addressc                 S  s   g | ]}t t|qS r!   r)   r   rV   r   r!   r!   r"   r   z  s     )r   rq   r:   rj   rh   r/   r#   r   r3   r   Zretire_workerslistr)   gatherr   rf   r$   r   r   r'   r&   re   rp   appendr<   rc   refZ_clusterupdatedictzip)r    to_closeZtasksr0   Zto_openrj   dr   optsr|   r   r!   r   r"   r   U  sN    




z#SpecCluster._correct_state_internalc                   s\   |dkrJj d   d  fdd}ttjd}t || t 	|  d S )Nremoverj   r0   c                     s`   j kr\ jd kr\tfddjd  D s\jtj    j = d S )Nrj   c                 3  s   | ]}|d   kV  qdS )r0   Nr!   )r   r   r0   r!   r"   	<genexpr>  s   z?SpecCluster._update_worker_status.<locals>.f.<locals>.<genexpr>)	rj   scheduler_infoanyvaluesrm   rp   r)   r   r;   r!   msgr0   r    r!   r"   f  s    z,SpecCluster._update_worker_status.<locals>.fz&distributed.deploy.lost-worker-timeout)
r   r   r@   configr   r)   ru   Z
call_laterrt   _update_worker_status)r    opr   r   delayr}   r   r"   r     s    
z!SpecCluster._update_worker_statusrW   z$Generator[Any, Any, _T_spec_cluster])r    rU   c                   s   dd fdd}|   S )NrW   r   c                     s^    j tjkr  I d H   jI d H    I d H   jrZtdd  j	 D I d H   S )Nc                 S  s   g | ]}t t|qS r!   r   r   r!   r!   r"   r     s   z4SpecCluster.__await__.<locals>._.<locals>.<listcomp>)
r#   r   r,   ry   r/   rz   rj   r)   r<   r   r!   r   r!   r"   r5     s    
z SpecCluster.__await__.<locals>._r6   r8   r!   r   r"   r7     s    zSpecCluster.__await__c              
     s  | j tjkrtdI d H  q | j tjkr.d S | j tjksH| j tjkrZtj| _ tt	 | j
  W 5 Q R X | d}t|r|I d H  |  I d H  tj| j I d H  | jr| j4 I d H : tt | j I d H  W 5 Q R X | j I d H  W 5 Q I d H R X n
td | jr*| j I d H  | jD ](}|j tjtjtjhks0t|j q0t| drpt| j t| drt| jdd t  ! I d H  d S )Ng?r   z"Cluster closed without starting uprn   ro   r]   )r^   )"r#   r   closingr)   r   r9   r3   r   r   AttributeErrorZ	_adaptivestopscaler   rz   r   rm   r   r   OSError	terminateZ	close_rpcloggerwarningr/   r;   re   r4   hasattrr   rn   ro   rt   r   )r    r   r   r}   r!   r"   r     sD    



"



zSpecCluster._closec                   s,   | I d H  |   I d H  | jtjks(t| S r   )rz   r#   r   r3   r4   r   r!   r!   r"   rG     s    
zSpecCluster.__aenter__c                   s   t  ||| | j  d S r   )rt   __exit__rw   r   rH   r}   r!   r"   r     s    zSpecCluster.__exit__intc                 C  sR   | j stddD ]2}tt  | j d | W  5 Q R    S Q R X qtddS )z7Return the number of threads per worker for new workersz4To scale by cores= you must specify cores per worker)ZnthreadsZncoresthreadscoresr   ZunreachableN)ri   r   r   KeyErrorrv   r    r0   r!   r!   r"   _threads_per_worker  s    
&zSpecCluster._threads_per_workerc                 C  sV   | j stddD ]6}tt$ t| j d | W  5 Q R    S Q R X qtddS )z2Return the memory limit per worker for new workerszQto scale by memory= your worker definition must include a memory_limit definition)Zmemory_limitmemoryr   zVto use scale(memory=...) your worker definition must include a memory_limit definitionN)ri   r   r   r   r   r   r!   r!   r"   _memory_per_worker  s    
*zSpecCluster._memory_per_workerr   c                 C  s  |d k	r(t |ttt||   }|d k	rLt |tt||   }t| j|krt	| jdd | j
d  D  }t| j|kr|r| j| = q|t| j|kr| j  q| jtjtjfkrt| j|k r| j|   q| j| j | jrt S d S )Nc                 S  s   h | ]}|d  qS r   r!   )r   vr!   r!   r"   	<setcomp>  s    z$SpecCluster.scale.<locals>.<setcomp>rj   )maxr   mathceilr   r   r   lenrh   r:   r   r   poppopitemr#   r   r   r9   r   new_worker_specr`   Zadd_callbackrz   r_   r   )r    nr   r   Znot_yet_launchedr!   r!   r"   r     s$     zSpecCluster.scalec                 C  s   |S )zReturns new worker name.

        This can be overridden in SpecCluster derived classes to customise the
        worker names.
        r!   )r    Zworker_numberr!   r!   r"   _new_worker_name  s    zSpecCluster._new_worker_namec                 C  s<   |  | j}|| jkr2|  jd7  _|  | j}q|| jiS )zReturn name and spec for the next worker

        Returns
        -------
        d: dict mapping names to worker specs

        See Also
        --------
        scale
        r[   )r   rk   rh   ri   )r    Znew_worker_namer!   r!   r"   r     s
    
zSpecCluster.new_worker_specc                 C  s
   t | jS r   )boolri   r   r!   r!   r"   _supports_scaling-  s    zSpecCluster._supports_scalingc                   s   t fdd|D spi  j D ]8\}}d|krT|d D ]}| t|| < q<q$| |< q$ fdd|D }|D ]}|jkrtj|= qtI d H  d S )Nc                 3  s   | ]}| j kV  qd S r   )rh   r   r   r!   r"   r   3  s     z)SpecCluster.scale_down.<locals>.<genexpr>groupc                   s   h | ]}  ||qS r!   )r   r   )mappingr!   r"   r   <  s     z)SpecCluster.scale_down.<locals>.<setcomp>)allrh   itemsr   )r    rj   r0   specsuffixr   r!   )r   r    r"   
scale_down1  s    


zSpecCluster.scale_downc                   sN   t  }| j D ]8\ }d|kr>| fdd|d D  q|  q|S )Nr   c                   s   h | ]}t  | qS r!   r   r   r   r   r!   r"   r   J  s     z#SpecCluster.plan.<locals>.<setcomp>)r:   rh   r   r   rp   r    outr   r!   r   r"   planE  s    zSpecCluster.planc              	     sn   t  }| jD ]\ z| j  }W n tk
r6   Y qY nX d|kr^| fdd|d D  q|  q|S )Nr   c                   s   h | ]}t  | qS r!   r   r   r   r!   r"   r   X  s     z(SpecCluster.requested.<locals>.<setcomp>)r:   rj   rh   r   r   rp   r   r!   r   r"   	requestedO  s    

zSpecCluster.requestedztype[Adaptive]floatz
int | Nonez
str | Noner   r   )	r   minimummaximumminimum_coresmaximum_coresminimum_memorymaximum_memorykwargsrU   c           	        s   |dk	r$t |pdt||   }|dk	rLt |p4dtt||   }|dk	rlt|t||   }|dk	rt|tt||   }t j	f |||d|S )a  Turn on adaptivity

        This scales Dask clusters automatically based on scheduler activity.

        Parameters
        ----------
        minimum : int
            Minimum number of workers
        maximum : int
            Maximum number of workers
        minimum_cores : int
            Minimum number of cores/threads to keep around in the cluster
        maximum_cores : int
            Maximum number of cores/threads to keep around in the cluster
        minimum_memory : str
            Minimum amount of memory to keep around in the cluster
            Expressed as a string like "100 GiB"
        maximum_memory : str
            Maximum amount of memory to keep around in the cluster
            Expressed as a string like "100 GiB"

        Examples
        --------
        >>> cluster.adapt(minimum=0, maximum_memory="100 GiB", interval='500ms')

        See Also
        --------
        dask.distributed.Adaptive : for more keyword arguments
        Nr   )r   r   r   )
r   r   r   r   r   r   minfloorrt   adapt)	r    r   r   r   r   r   r   r   r   r}   r!   r"   r   ]  s6    (    zSpecCluster.adaptr   r   )r0   rU   c                 C  s
   t  dS )zJCreate an instance of this class to represent an existing cluster by name.N)NotImplementedError)r   r0   r!   r!   r"   	from_name  s    zSpecCluster.from_name)
NNNFNNFNTr[   )r   NN)#rM   rN   rO   rP   rc   rd   rZ   __annotations__r1   ry   rz   r   r   r7   r   rG   r   r   r   r   r   r   rQ   r   r   Zscale_upr   r   r   r   infr   classmethodr   __classcell__r!   r!   r}   r"   rX   z   sX   
k          ='	,+



	
$?zdict[str, Any]r   zdict[str, Worker | Nanny])r   argsrU   c                   sz   i }|   D ]8\}}|d }t|tr.t|}|||di ||< q|rvtj|  I d H  | D ]}|I d H  qf|S )Nr   r   )r   r$   r   r   r   r)   r   r   )r   r   rj   kr   r   r   r!   r!   r"   run_spec  s    
r   c               
   C  sX   t tjD ]H} t| ddr
ttjt& t| dtjtjkrH| j	dd W 5 Q R X q
d S )Nrs   Fr#   
   )timeout)
r   rX   rZ   r'   r   r   r   r   r9   r;   )Zclusterr!   r!   r"   close_clusters  s
    r   )<
__future__r   r)   atexitrf   loggingr   rc   collections.abcr   r   
contextlibr   inspectr   typingr   r   r	   r
   Ztornador   Ztornado.ioloopr   r@   Z
dask.utilsr   r   Zdask.widgetsr   Zdistributed.corer   r   Zdistributed.deploy.adaptiver   Zdistributed.deploy.clusterr   Zdistributed.schedulerr   Zdistributed.securityr   Zdistributed.utilsr   r   r   r   r   r   r   	getLoggerrM   r   r   rS   rV   rW   rX   r   registerr   r!   r!   r!   r"   <module>   sH   
M    ,