U
    /eh7                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZmZmZmZ d dlZd dlmZmZ d dlm Z m!Z!m"Z"m#Z#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/ e0e1Z2dZ3ej45ej67dZ8G dd deZ9G dd de Z:G dd de Z;G dd de;Z<G dd de$Z=G dd de=Z>G dd de"Z?G d d! d!e?Z@G d"d# d#e(ZAG d$d% d%e(ZBeA e&d&< eB e&d'< dS )(    )annotationsN)Callable)SSLError)Any)web)HTTPClientErrorHTTPRequest)
HTTPServer)StreamClosedError)WebSocketClientConnectionWebSocketClosedErrorWebSocketHandlerwebsocket_connect)parse_host_portunparse_host_port)CommCommClosedError	ConnectorFatalCommClosedErrorListener)backends)BaseTCPBackend_expect_tls_contextconvert_stream_closed_error)ensure_concrete_hostfrom_framesget_tcp_server_address	to_framesl    d(	 z!distributed.comm.websockets.shardc                      sd   e Zd Zdddd fddZdd Zd	d
 Zdd Zdd Z fddZe	ddddZ
  ZS )	WSHandlerNTbooldeserializeallow_offloadc                   s6   || _ || _|| _|| _| | _t j||f| d S N)handlerr!   r"   requestlistenersuper__init__)selfZapplicationr%   r$   r!   r"   r&   kwargs	__class__ 7/tmp/pip-unpacked-wheel-g426oqom/distributed/comm/ws.pyr(   8   s    
zWSHandler.__init__c                   s@   |  d t | _d| _t| | j| jd| _t	| 
  d S )NTFr    )Zset_nodelayasyncioQueueqclosedWSHandlerCommr!   r"   commZensure_futureon_openr)   r-   r-   r.   openI   s    

  zWSHandler.openc                   sN   z| j | jI d H  W n tk
r6   td Y nX | | jI d H  d S )Nz,Connection closed before handshake completed)r&   Zon_connectionr4   r   loggerdebugr$   r6   r-   r-   r.   r5   T   s
    zWSHandler.on_openc                   s   | j |I d H  d S r#   )r1   put)r)   msgr-   r-   r.   
on_message[   s    zWSHandler.on_messagec                 C  s   d| _ | jt d S NT)r2   r1   
put_nowaitr   r6   r-   r-   r.   on_close^   s    zWSHandler.on_closec                   s   t    d| _d S r=   )r'   closer2   r6   r+   r-   r.   r@   b   s    
zWSHandler.closeintreturnc                 C  s   | j dtS )NZwebsocket_max_message_size)settingsgetMAX_MESSAGE_SIZEr6   r-   r-   r.   max_message_sizef   s    zWSHandler.max_message_size)NTTN)__name__
__module____qualname__r(   r7   r5   r<   r?   r@   propertyrG   __classcell__r-   r-   r+   r.   r   7   s       r   c                      sx   e Zd Zddddd fddZddd	Zdd
dZdd ZeddddZeddddZ	dd Z
dd Z  ZS )r3   Tr   r   )r$   r!   r"   c                   s   || _ || _t j|d d S )Nr!   )r$   r"   r'   r(   )r)   r$   r!   r"   r+   r-   r.   r(   l   s    zWSHandlerComm.__init__Nc                   s   z j j I d H }W n tk
r0   t Y nX |tkrBt ntd|d } fddt|D I d H }t| j	| j
dI d H S )NQr   c                   s   g | ]} j j I d H qS r#   )r$   r1   rE   .0_r6   r-   r.   
<listcomp>   s     z&WSHandlerComm.read.<locals>.<listcomp>r!   deserializersr"   )r$   r1   rE   RuntimeErrorr   structunpackranger   r!   r"   )r)   rT   n_framesframesr-   r6   r.   readv   s    zWSHandlerComm.readc           	   
     s   t || j||| j| jd| jtdI d H }tdt|}d}zZ| j	j
|ddI d H  |D ]:}t|tk	rtt|}| j	j
|ddI d H  |t|7 }q\W n. tk
r } ztt|W 5 d }~X Y nX |S N)ZsenderZ	recipient)r"   serializerson_errorcontextZframe_split_sizerN   r   T)binary)r   r"   
local_inforemote_infohandshake_optionsBIG_BYTES_SHARD_SIZErV   packlenr$   write_messagetypebytesr   r   str	r)   r;   r]   r^   rZ   nZnbytes_framesframeer-   r-   r.   write   s0    zWSHandlerComm.writec                 C  s   | j   d S r#   r$   r@   r6   r-   r-   r.   abort   s    zWSHandlerComm.abortrj   rB   c                 C  s
   | j jjS r#   )r$   r%   hostr6   r-   r-   r.   local_address   s    zWSHandlerComm.local_addressc                 C  s    | j jj}t|tst|d S )Nz:0)r$   r%   Z	remote_ip
isinstancerj   AssertionError)r)   ipr-   r-   r.   peer_address   s    
zWSHandlerComm.peer_addressc                 C  s,   | j jp*| j j p*| j jjjo*| j jjjjS r#   )r$   r2   Zws_connectionr%   
connectionstreamr6   r-   r-   r.   r2      s    zWSHandlerComm.closedc                   s   | j   d S r#   rp   r6   r-   r-   r.   r@      s    zWSHandlerComm.close)TT)N)NN)rH   rI   rJ   r(   r[   ro   rq   rK   rs   rw   r2   r@   rL   r-   r-   r+   r.   r3   k   s     


r3   c                      s   e Zd ZdZddddd fddZdd	 Zd ddZd!ddZdd Zdd Z	dd Z
eddddZeddddZdd Zedd Z  ZS )"WSws://Tr   r   )sockr!   r"   c                   sp   d| _ t j|d || _| j | jjj | _| j | jjj | _|| _	t
| |  | _i | _|   d S )NFrM   )_closedr'   r(   r|   prefixparsednetlocZ_local_addr
_peer_addrr"   weakreffinalize_get_finalizer
_finalizer_extra_read_extra)r)   r|   r!   r"   r+   r-   r.   r(      s    zWS.__init__c                 C  s   t | }| j|fdd}|S )Nc                 S  s   | j std| |   d S )Nz Closing dangling websocket in %s)
close_coder8   infor@   )r|   rr-   r-   r.   r      s    z#WS._get_finalizer.<locals>.finalize)reprr|   )r)   r   r   r-   r-   r.   r      s    zWS._get_finalizerNc              
     s   z: j  I d H }|d kr(   t td|d }W n* tk
rd } zt|W 5 d }~X Y nX  fddt|D I d H }t| j	| j
dI d H }|S )NrN   r   c                   s   g | ]} j  I d H qS r#   )r|   read_messagerO   r6   r-   r.   rR      s     zWS.read.<locals>.<listcomp>rS   )r|   r   rq   r   rV   rW   r   rX   r   r!   r"   )r)   rT   rY   rn   rZ   r;   r-   r6   r.   r[      s     zWS.readc           	   
     s   t || j||| j| jd| jtdI d H }tdt|}d}zZ| j	j
|ddI d H  |D ]:}t|tk	rtt|}| j	j
|ddI d H  |t|7 }q\W n* tk
r } zt|W 5 d }~X Y nX |S r\   )r   r"   ra   rb   rc   rd   rV   re   rf   r|   rg   rh   ri   r   r   rk   r-   r-   r.   ro      s0    zWS.writec                   s&   | j js| j  | j   d| _d S r=   r|   r   r   detachr@   r}   r6   r-   r-   r.   r@   	  s    

zWS.closec                 C  s&   | j js| j  | j   d| _d S r=   r   r6   r-   r-   r.   rq     s    

zWS.abortc                 C  s   | j  p| j jp| jS r#   )r|   r   r}   r6   r-   r-   r.   r2     s    z	WS.closedrj   rB   c                 C  s   | j  | jjj S r#   r~   r|   r   r   r6   r-   r-   r.   rs     s    zWS.local_addressc                 C  s   | j  | jjj S r#   r   r6   r-   r-   r.   rw     s    zWS.peer_addressc                 C  s   d S r#   r-   r6   r-   r-   r.   r      s    zWS._read_extrac                 C  s   | j S r#   )r   r6   r-   r-   r.   
extra_info#  s    zWS.extra_info)TT)N)NN)rH   rI   rJ   r~   r(   r   r[   ro   r@   rq   r2   rK   rs   rw   r   r   rL   r-   r-   r+   r.   rz      s"     


rz   c                   @  s   e Zd ZdZdd ZdS )WSSwss://c                 C  s\   t |  | jjj}|d k	rX| jj| | d | jd \}}}t	
d| j||| d S )N)Zpeercertcipherr   z7TLS connection with %r: protocol=%s, cipher=%s, bits=%d)rz   r   r|   ry   socketr   updategetpeercertr   r8   r9   r   )r)   r|   r   protobitsr-   r-   r.   r   +  s    

zWSS._read_extraN)rH   rI   rJ   r~   r   r-   r-   r-   r.   r   (  s   r   c                   @  s~   e Zd ZdZdddddddd	d
Zdd ZeddddZdd Zdd Z	dd Z
eddddZeddddZdS )
WSListenerr{   TFrj   r   r   r   )addressr$   r!   r"   connection_argsc                 K  sd   | | js| j | }t|dd\| _| _|| _|| _|| _|| _d | _	d| _
| jf || _d S )Nr   )default_portT)
startswithr~   r   rv   portr$   r!   r"   r   bound_addressnew_comm_server_get_server_argsserver_args)r)   r   r$   r!   r"   r   r-   r-   r.   r(   =  s    zWSListener.__init__c                 K  s   i S r#   r-   r)   r   r-   r-   r.   r   Q  s    zWSListener._get_server_argsrB   c                 C  s   | j  | j d| j S )N:)r~   rv   r   r6   r-   r-   r.   r   T  s    zWSListener.addressc              
     s  dt | j| j| jt| dfg}zz| jjj| _| jj| jkrd| _td| j  | j
d}| jjr||d kr|td|rtd| j dt || j_| jjjd	| W n tk
r   td
 Y nX W 5 | jrtt	|f| j
| _| j| j X d S )N/)r$   r!   r"   r&   Fz Sharing the same server on port ssl_optionsz&No ssl context found for the Schedulerz:Dashboard and Scheduler are using the same server on port zU, defaulting to the Scheduler's ssl context. Your dashboard could become inaccessiblez.*z'No server available. Creating a new one)r   r$   r!   r"   r   refr   r	   r   Applicationr   serverlistenr   __self__Zhttp_serverr8   r9   rE   r   rU   warningswarnRuntimeWarningZhttp_applicationadd_handlersAttributeError)r)   Zroutesr   r-   r-   r.   startX  s:    zWSListener.startc                 C  s   | j   d S r#   )r   stopr6   r-   r-   r.   r   ~  s    zWSListener.stopc                 C  s$   | j dkrt| j| _ | j dd S )z@
        The listening address as a (host, port) tuple.
        N   )r   r   r   r6   r-   r-   r.   get_host_port  s    
zWSListener.get_host_portc                 C  s   | j t|    S r#   )r~   r   r   r6   r-   r-   r.   listen_address  s    zWSListener.listen_addressc                 C  s$   |   \}}t|}| jt|| S r#   )r   r   r~   r   )r)   rr   r   r-   r-   r.   contact_address  s    zWSListener.contact_addressN)TF)rH   rI   rJ   r~   r(   r   rK   r   r   r   r   r   r   r-   r-   r-   r.   r   :  s     &	r   c                   @  s   e Zd ZdZdd ZdS )WSSListenerr   c                 K  s   t |}d|iS )Nr   )r   )r)   r   ctxr-   r-   r.   r     s    zWSSListener._get_server_argsN)rH   rI   rJ   r~   r   r-   r-   r-   r.   r     s   r   c                   @  s&   e Zd ZdZeZdddZdd ZdS )	WSConnectorr{   Tc           	   
     s   | j f |}zJt| j | f|}t|tdI d H }|j rT|jjrTt|jjW n tk
r } zt	| | W 5 d }~X Y nb t
k
r } ztd|W 5 d }~X Y n8 tk
r } ztd|  d| |W 5 d }~X Y nX | j||dS )N)rG   zcTLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)zin z: rM   )_get_connect_argsr   r~   r   rF   ry   r2   errorr
   r   r   r   r   r   
comm_class)	r)   r   r!   r   r*   r%   r|   rn   errr-   r-   r.   connect  s"    (zWSConnector.connectc                 K  s   | di S )Nextra_conn_argsrE   r   r-   r-   r.   r     s    zWSConnector._get_connect_argsN)T)rH   rI   rJ   r~   rz   r   r   r   r-   r-   r-   r.   r     s   
r   c                   @  s   e Zd ZdZeZdd ZdS )WSSConnectorr   c                 K  sD   d| di| di }| dr@| di d|d i|d< |S )Nr   ssl_contextr   server_hostnameheadersHostr   )r)   r   Zwss_argsr-   r-   r.   r     s     



zWSSConnector._get_connect_argsN)rH   rI   rJ   r~   r   r   r   r-   r-   r-   r.   r     s   r   c                   @  s   e Zd ZeZeZdS )	WSBackendN)rH   rI   rJ   r   _connector_classr   _listener_classr-   r-   r-   r.   r     s   r   c                   @  s   e Zd ZeZeZdS )
WSSBackendN)rH   rI   rJ   r   r   r   r   r-   r-   r-   r.   r     s   r   wswss)C
__future__r   r/   loggingrV   r   r   collections.abcr   sslr   typingr   Ztornador   Ztornado.httpclientr   r   Ztornado.httpserverr	   Ztornado.iostreamr
   Ztornado.websocketr   r   r   r   ZdaskZdistributed.comm.addressingr   r   Zdistributed.comm.corer   r   r   r   r   Zdistributed.comm.registryr   Zdistributed.comm.tcpr   r   r   Zdistributed.comm.utilsr   r   r   r   	getLoggerrH   r8   rF   utilsZparse_bytesconfigrE   rd   r   r3   rz   r   r   r   r   r   r   r   r-   r-   r-   r.   <module>   sH   

4Ql[
