U
    /e                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlZd dl	m
Z
 d dlZd dlmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ G d
d deZdS )    )annotationsN)suppress)
HTTPServer)get_address_hostget_tcp_server_addresses)Server)RoutingApplication)DequeHandlerclean_dashboard_addressget_versionsc                   @  sZ   e Zd ZdZdddZdd Zdd Zed	d
 ZdddddZ	dddZ
dddZdS )
ServerNodez?
    Base class for server nodes in a distributed cluster.
    Nc                 C  s
   t |dS )N)packagesr   )selfr    r   4/tmp/pip-unpacked-wheel-g426oqom/distributed/node.pyversions   s    zServerNode.versionsc           	      C  sN  |dkrd}| j  D ]0\}}d }t|tr8|\}}nd}t|trP|d}t|ttfrt|dkr|d t|d  }}n"t|dkr|d \}}nt	|t|tr|\}}ni }z>|| fd| j
i|}||d k	r|n||f || j|< W q tk
rF } z,tjd| d	| d
d t| dd W 5 d }~X Y qX qd S )Nz0.0.0.0 r   :      Zio_loopz
Could not launch service 'z
' on port z. zGot the following message:

   )
stacklevel)Zservice_specsitems
isinstancetuplestrsplitlistlenint
ValueErrorZlooplistenservices	Exceptionwarningswarn)	r   Zdefault_listen_ipkvZ	listen_ipportkwargsserviceer   r   r   start_services"   sB    





zServerNode.start_servicesc                 C  sP   t | dr4| jjD ] }t |drt|jr|  q| j D ]}|  q>d S )Nhttp_applicationstop)hasattrr.   Zapplicationscallabler/   r#   values)r   Zapplicationr+   r   r   r   stop_servicesK   s    

zServerNode.stop_servicesc                 C  s   dd | j  D S )Nc                 S  s   i | ]\}}||j qS r   r)   ).0r'   r(   r   r   r   
<dictcomp>U   s      z,ServerNode.service_ports.<locals>.<dictcomp>)r#   r   )r   r   r   r   service_portsS   s    zServerNode.service_portszlogging.LoggerNone)loggerreturnc                 C  sP   t tjdd| _| jttjd || j t	
| |j| j d S )Nzdistributed.admin.log-length)nzdistributed.admin.log-format)r	   daskconfigget_deque_handlersetFormatterlogging	Formatter
addHandlerweakreffinalizeremoveHandler)r   r9   r   r   r   _setup_loggingW   s    
zServerNode._setup_loggingr   Fc                 C  sr   | j }g }t|jD ]X\}}|r(||ks2|j|k r6 qn|rV||j|j||f q||j||f q|S )a!  
        Fetch log entries for this node

        Parameters
        ----------
        start : float, optional
            A time (in seconds) to begin filtering log entries from
        n : int, optional
            Maximum number of log entries to return from filtered results
        timestamps : bool, default False
            Do we want log entries to include the time they were generated?

        Returns
        -------
        List of tuples containing the log level, message, and (optional) timestamp for each filtered entry
        )r?   	enumeratedequecreatedappend	levelnameformat)r   startr;   Z
timestampsZdeque_handlerLcountmsgr   r   r   get_logsa   s    zServerNode.get_logsc              
   C  s  t || _tjd}tjd}tjd}|rRtj|tjjd}|j	||d t
| j|d| _t|pj|}|D ]}	|	d dkr| j}
t|
ttfr|
d	 }
|
rtt t|
|	d< W 5 Q R X d
}d}z6|s| jjf |	 n| jjf t|	dd	i W qrW q tk
r.   d}|d }|dk r* Y qX qqrt| j}|d	 d | j_| j| jd< tdd |D dd |D D ]6\}}||krx|d	krxtd| d| d qxdS )z0This creates an HTTP Server running on this nodez'distributed.scheduler.dashboard.tls.keyz(distributed.scheduler.dashboard.tls.certz+distributed.scheduler.dashboard.tls.ca-file)cafilepurpose)keyfile)ssl_optionsaddressNr   Fr   r)   Tr   Z	dashboardc                 S  s   g | ]}|d  qS r4   r   )r5   ar   r   r   
<listcomp>   s     z0ServerNode.start_http_server.<locals>.<listcomp>c                 S  s   g | ]}|d  qS )r   r   )r5   br   r   r   rY      s     zPort z` is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port z instead)r   r.   r<   r=   r>   sslcreate_default_contextPurposeCLIENT_AUTHload_cert_chainr   Zhttp_serverr
   Z_start_addressr   r   r   r   r!   r   r"   tlzmerger$   r   r)   r#   zipr%   r&   )r   ZroutesZdashboard_addressdefault_portrV   Ztls_keyZtls_certZtls_ca_fileZhttp_addressesZhttp_addressrW   Zchange_portZretries_leftZbound_addressesexpectedactualr   r   r   start_http_server~   sV    
 


 zServerNode.start_http_server)N)r   NF)r   N)__name__
__module____qualname____doc__r   r-   r3   propertyr7   rG   rR   rf   r   r   r   r   r      s   	
)


   r   )
__future__r   rA   r[   r%   rD   
contextlibr   r`   Ztornado.httpserverr   r<   Zdistributed.commr   r   Zdistributed.corer   Zdistributed.http.routingr   Zdistributed.utilsr	   r
   Zdistributed.versionsr   r   r   r   r   r   <module>   s   