U
    /eJ                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlZd dlmZmZmZmZ d d	lmZ d d
lmZ d dlmZ d dlm Z  d dl!m"Z" d dl#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* e+e,Z-G dd de(Z.dS )    )annotationsN)suppress)isawaitable)Any)parse)IOLoop)_deprecatedformat_bytesparse_timedeltatypename)get_template)PeriodicCallback)Status)Adaptive)SchedulerInfo)LogLogs
LoopRunnerNoOpAwaitableSyncMethodMixinformat_dashboard_link
log_errorsc                   @  s  e Zd ZU dZdZdZded< d]dd	Zedd
ddZ	e	j
dddddZ	edd Zej
dd Zdd Zdd Zdd Zd^ddZejfddZdd Zd d! Zefd"d#d$d%d&d'Zd(dd)d*d+Zd,d- Zd_d.d/Zd`d0d1Zed2d3d4d5 Zd6d7 Zed8d9 Zd:d; Zd<d= Z dad>d?Z!d@dA Z"dBdC Z#dDdE Z$dFdG Z%dHdI Z&dJdK Z'edLd
dMdNZ(edOdP Z)dQdR Z*edSdT Z+edUdV Z,edWdX Z-dYdZ Z.d[d\ Z/dS )bClustera  Superclass for cluster objects

    This class contains common functionality for Dask Cluster manager classes.

    To implement this class, you must provide

    1.  A ``scheduler_comm`` attribute, which is a connection to the scheduler
        following the ``distributed.core.rpc`` API.
    2.  Implement ``scale``, which takes an integer and scales the cluster to
        that many workers, or else set ``_supports_scaling`` to False

    For that, you should get the following:

    1.  A standard ``__repr__``
    2.  A live IPython widget
    3.  Adaptive scaling
    4.  Integration with dask-labextension
    5.  A ``scheduler_info`` attribute which contains an up-to-date copy of
        ``Scheduler.identity()``, which is used for much of the above
    6.  Methods to gather logs
    TNzIOLoop | None_Cluster__loopF   c                 C  s   t ||d| _di i| _i | _d | _d | _g | _|| _d | _d | _	t
|dd| _d | _|d krrtt d d }|tt| d| _tj| _d S )N)loopasynchronousworkerssecondsdefault   )nametype)r   _loop_runnerscheduler_infoperiodic_callbacks_watch_worker_status_comm_watch_worker_status_task_cluster_manager_logsquietscheduler_comm	_adaptiver
   _sync_interval_sync_cluster_info_taskstruuidZuuid4r   r#   _cluster_infor   createdstatus)selfr   r   r*   r"   Zscheduler_sync_interval r5   >/tmp/pip-unpacked-wheel-g426oqom/distributed/deploy/cluster.py__init__>   s(    
 
zCluster.__init__)returnc                 C  s    | j }|d kr| jj | _ }|S N)r   r$   r   )r4   r   r5   r5   r6   r   ^   s    zCluster.loopr   None)valuer8   c                 C  s*   t jdtdd |d kr td|| _d S )Nz'setting the loop property is deprecated   )
stacklevelzexpected an IOLoop, got None)warningswarnDeprecationWarning
ValueErrorr   )r4   r;   r5   r5   r6   r   i   s      c                 C  s
   | j d S Nr"   r1   r4   r5   r5   r6   r"   r   s    zCluster.namec                 C  s   || j d< d S rB   rC   )r4   r"   r5   r5   r6   r"   v   s    c                   s   | j  I d H }d|_|ddiI d H  t| I d H | _|| _t	| 
|| _| j jdgi dI d H }| j| t	|  | _| j D ]}|  qtj| _d S )NzCluster worker statusopZsubscribe_worker_statuscluster-manager-info)keysr    )r+   Z	live_commr"   writer   readr%   r'   asyncioZensure_future_watch_worker_statusr(   get_metadatar1   update_sync_cluster_infor.   r&   valuesstartr   runningr3   )r4   comminfopcr5   r5   r6   _startz   s"     
zCluster._startc                   s   d}d}d| j  }| jtjkrz&| jjdg| j dI d H  d}W nJ tj	k
r^   Y qY n2 t
k
r   |d7 }||krtjddd	 Y nX t|| j d
|  }t|I d H  qd S )Nr      
   rF   )rG   r;   r   zWFailed to sync cluster info multiple times - perhaps there's a connection issue? Error:T)exc_infog      ?)r-   r3   r   rQ   r+   Zset_metadatar1   copyrJ   CancelledError	Exceptionloggerwarningminsleep)r4   Z	err_countZwarn_atZmax_intervalintervalr5   r5   r6   rN      s*    
zCluster._sync_cluster_infoc              	     s   | j tjkrd S tj| _ tt | j  W 5 Q R X | jrL| j	 I d H  | j
r^| j
I d H  | jr| j  ttj | jI d H  W 5 Q R X | jr| j I d H  | j D ]}|  qtj| _ d S r9   )r3   r   closedclosingr   AttributeErrorr,   stopr'   closer(   r.   cancelrJ   rZ   r+   Z	close_rpcr&   rO   )r4   rT   r5   r5   r6   _close   s$    


zCluster._closec              
   C  sL   | j tjkr| jrt S d S tt | j| j|dW  5 Q R  S Q R X d S )N)Zcallback_timeout)	r3   r   ra   r   r   r   RuntimeErrorsyncrg   )r4   timeoutr5   r5   r6   re      s    
zCluster.closec                 C  s^   t | dtjtjkrZzt| }W n$ tk
rD   dt|  }Y nX |d| t| d d S )Nr3   zwith a broken __repr__ zunclosed cluster )source)getattrr   ra   reprr[   object__repr__ResourceWarning)r4   _warnZself_rr5   r5   r6   __del__   s    zCluster.__del__c              	     sl   z|  I dH }W n tk
r*   Y qZY nX t   |D ]\}}| || q8W 5 Q R X q | I dH  dS )z>Listen to scheduler for updates on adding and removing workersN)rI   OSErrorr   _update_worker_statusre   )r4   rR   ZmsgsrE   msgr5   r5   r6   rK      s    
zCluster._watch_worker_statusc                 C  sV   |dkr0| d}| jd | | j| n"|dkrF| jd |= ntd||d S )Naddr   removez
Invalid op)popr%   rM   rA   )r4   rE   ru   r   r5   r5   r6   rt      s    
zCluster._update_worker_statusztype[Adaptive]r   r   )r   kwargsr8   c              	   K  sP   t t | j  W 5 Q R X t| ds.i | _| j| || f| j| _| jS )zTurn on adaptivity

        For keyword arguments see dask.distributed.Adaptive

        Examples
        --------
        >>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
        _adaptive_options)r   rc   r,   rd   hasattrrz   rM   )r4   r   ry   r5   r5   r6   adapt   s    	

zCluster.adaptint)nr8   c                 C  s
   t  dS )zScale cluster to n workers

        Parameters
        ----------
        n : int
            Target number of workers

        Examples
        --------
        >>> cluster.scale(10)  # scale cluster to ten workers
        N)NotImplementedError)r4   r~   r5   r5   r6   scale  s    zCluster.scalec                 C  s(   | j tj |f | js$t| dS )aC  Log a message.

        Output a message to the user and also store for future retrieval.

        For use in subclasses where initialisation may take a while and it would
        be beneficial to feed back to the user.

        Examples
        --------
        >>> self._log("Submitted job X to batch scheduler")
        N)r)   appenddatetimenowr*   print)r4   logr5   r5   r6   _log  s    zCluster._logc           	        s   t  }|r(tddd | jD |d< |rX| j I d H }tddd |D |d< |r|dkrhd }| jj|dI d H }| D ]$\}}tdd	d |D ||< q|S )
N
c                 s  s   | ]}|d  V  qdS )r   Nr5   ).0liner5   r5   r6   	<genexpr>/  s     z$Cluster._get_logs.<locals>.<genexpr>r   c                 s  s   | ]\}}|V  qd S r9   r5   r   levelr   r5   r5   r6   r   4  s     Z	SchedulerT)r   c                 s  s   | ]\}}|V  qd S r9   r5   r   r5   r5   r6   r   ;  s     )r   r   joinr)   r+   get_logsZworker_logsitems)	r4   cluster	schedulerr   logsLdkvr5   r5   r6   	_get_logs*  s    zCluster._get_logsc                 C  s   | j | j|||dS )ae  Return logs for the cluster, scheduler and workers

        Parameters
        ----------
        cluster : boolean
            Whether or not to collect logs for the cluster manager
        scheduler : boolean
            Whether or not to collect logs for the scheduler
        workers : boolean or Iterable[str], optional
            A list of worker addresses to select.
            Defaults to all workers if `True` or no workers if `False`

        Returns
        -------
        logs: Dict[str]
            A dictionary of logs, with one item for the scheduler and one for
            each worker
        )r   r   r   )ri   r   )r4   r   r   r   r5   r5   r6   r   ?  s       zCluster.get_logsr   )Zuse_insteadc                 O  s   | j ||S r9   )r   )r4   argsry   r5   r5   r6   r   V  s    zCluster.logsc                 C  sJ   ddl m} z | }|r*|j| kr*|W S W n tk
r@   Y nX || S )zReturn client for the cluster

        If a client has already been initialized for the cluster, return that
        otherwise initialize a new client object.
        r   )Client)Zdistributed.clientr   currentr   rA   )r4   r   Zcurrent_clientr5   r5   r6   
get_clientZ  s    
zCluster.get_clientc                 C  s\   z| j d d }W n tk
r(   Y dS X | jdd dd dd }t||S d S )	NZservicesZ	dashboard z://r   /r   :)r%   KeyErrorscheduler_addresssplitr   )r4   porthostr5   r5   r6   dashboard_linkj  s    $zCluster.dashboard_linkc                 C  s   | j r| j jrd}nd}t| jd }t| drJtdd | j D }nt| dr`t| j}n|}||krp|n| d| }d| d	| d
S )Nr   ZManualr   worker_specc                 s  s&   | ]}d |krdn
t |d  V  qdS )groupr   N)len)r   Zeachr5   r5   r6   r   {  s   z*Cluster._scaling_status.<locals>.<genexpr>z / zM
        <table>
            <tr><td style="text-align: left;">Scaling mode: zB</td></tr>
            <tr><td style="text-align: left;">Workers: z$</td></tr>
        </table>
        )	r,   Zperiodic_callbackr   r%   r{   sumr   rO   r   )r4   moder   	requestedZworker_countr5   r5   r6   _scaling_statust  s"    


zCluster._scaling_statusc                   s  zj W S  tk
r   Y nX z,ddlm}m}m}m}m}m}m	}m
} W n tk
rf   d_ Y dS X |dd}	| jrJ|dd|	d|d|	d}
|dd	|	d|dd
|	d|d|	d}|||
g||gg|ddd}d|_|dd |dd fdd}|| tfdd}|
| n|d}| | }||gg|_|dd |dd |_ fddttjjddd  fdd}j| |S ) z3Create IPython widget for display within a notebookr   )HTML	AccordionButtonHBoxIntTextLayoutTabVBoxNZ150px)widthZWorkers)descriptionlayoutZScaleZMinimumZMaximumZAdaptZ500px)Z	min_width)r   zManual Scalingr   zAdaptive Scalingc                   s   j j jd   d S )N)minimummaximum)r|   r;   )b)r   r   r4   rM   r5   r6   adapt_cb  s    z!Cluster._widget.<locals>.adapt_cbc              	     s8    j }tt j  W 5 Q R X |   d S r9   )r;   r   rc   r,   rd   r   )r   r~   )requestr4   rM   r5   r6   scale_cb  s
    

z!Cluster._widget.<locals>.scale_cbr   r   ZScalingc                     s     _  _d S r9   )_repr_html_r;   r   r5   )scale_statusr4   r3   r5   r6   rM     s    
zCluster._widget.<locals>.updatez(distributed.deploy.cluster-repr-intervalmsr   c                    s$   t  d } | jd< |   d S )Ni  zcluster-repr)r   r&   rP   )rT   )cluster_repr_intervalr4   rM   r5   r6   install  s    
z Cluster._widget.<locals>.install)Z_cached_widgetrc   
ipywidgetsr   r   r   r   r   r   r   r   ImportErrorr   _supports_scalingZselected_index	set_titleZon_clickr   r   childrenr
   daskconfiggetr   Zadd_callback)r4   r   r   r   r   r   r   r   r   r   r   r|   Z	accordionr   r   tabr   r5   )r   r   r   r   r   r4   r3   rM   r6   _widget  sV    ,


zCluster._widgetc                 C  sR   z| j  }W n tk
r&   d}Y nX tdjt| j| j| j d | j||dS )NzScheduler not started yet.zcluster.html.j2r   )r#   r"   r   r   scheduler_info_reprcluster_status)	r%   r   rc   r   renderr#   __name__r"   r   )r4   r   r   r5   r5   r6   r     s    
zCluster._repr_html_c                 K  s   ddl m} |  }|rvddl}t|jtdkrh|jf |p@i }t| |d< |  |d< ||dd q||f| nt| |  d	}||dd dS )
z%Display the cluster rich IPython reprr   )displayNz8.0.0
text/plain	text/htmlT)raw)r   r   )	ZIPython.displayr   r   r   parse_version__version__Z_repr_mimebundle_rm   r   )r4   ry   r   Zwidgetr   Z
mimebundler5   r5   r6   _ipython_display_  s    zCluster._ipython_display_c                 C  s   |  | jS r9   )ri   
__aenter__rD   r5   r5   r6   	__enter__	  s    zCluster.__enter__c                 C  s   |  | j|||S r9   )ri   	__aexit__)r4   exc_type	exc_value	tracebackr5   r5   r6   __exit__  s    zCluster.__exit__c                 c  s   | S r9   r5   rD   r5   r5   r6   	__await__  s    zCluster.__await__c                   s   | I d H  | S r9   r5   rD   r5   r5   r6   r     s    
zCluster.__aenter__c                   s   |   }t|r|I d H  d S r9   )re   r   )r4   r   r   r   fr5   r5   r6   r     s    zCluster.__aexit__r/   c                 C  s   | j s
dS | j jS )Nz<Not Connected>)r+   addressrD   r5   r5   r6   r     s    zCluster.scheduler_addressc                 C  s   t | dt| jS )N_name)rl   r#   r   rD   r5   r5   r6   _cluster_class_name"  s    zCluster._cluster_class_namec              	   C  sz   d| j | j| jt| jd tdd | jd  D f }dd | jd  D }t|rn|dtt| 7 }|d7 }|S )	Nz!%s(%s, %r, workers=%d, threads=%dr   c                 s  s   | ]}|d  V  qdS )ZnthreadsNr5   r   wr5   r5   r6   r   ,  s     z#Cluster.__repr__.<locals>.<genexpr>c                 S  s   g | ]}|d  qS )Zmemory_limitr5   r   r5   r5   r6   
<listcomp>/  s     z$Cluster.__repr__.<locals>.<listcomp>z	, memory=))	r   r"   r   r   r%   r   rO   allr	   )r4   textZmemoryr5   r5   r6   ro   &  s    zCluster.__repr__c                 C  s
   t | jS r9   setr   rD   r5   r5   r6   plan6  s    zCluster.planc                 C  s
   t | jS r9   r   rD   r5   r5   r6   r   :  s    zCluster.requestedc                 C  s   dd | j d  D S )Nc                 S  s   h | ]}|d  qS )r"   r5   )r   r   r5   r5   r6   	<setcomp>@  s     z#Cluster.observed.<locals>.<setcomp>r   )r%   rO   rD   r5   r5   r6   observed>  s    zCluster.observedc                 C  s   t |t | ko| j|jkS r9   )r#   r"   )r4   otherr5   r5   r6   __eq__B  s    zCluster.__eq__c                 C  s   t | S r9   )idrD   r5   r5   r6   __hash__E  s    zCluster.__hash__)FNFNr   )N)TTT)TTT)N)0r   
__module____qualname____doc__r   r   __annotations__r7   propertyr   setterr"   rU   rN   rg   re   r>   r?   rr   rK   rt   r   r|   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   ro   r   r   r   r   r   r5   r5   r5   r6   r   $   sp   
     
 


%





	U




r   )/
__future__r   rJ   r   loggingr0   r>   
contextlibr   inspectr   typingr   Zpackaging.versionr   r   Ztornado.ioloopr   Zdask.configr   Z
dask.utilsr   r	   r
   r   Zdask.widgetsr   Zdistributed.compatibilityr   Zdistributed.corer   Zdistributed.deploy.adaptiver   Zdistributed.objectsr   Zdistributed.utilsr   r   r   r   r   r   r   	getLoggerr   r\   r   r5   r5   r5   r6   <module>   s(   $

