U
    /e_                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ d d	lmZ d dlZd d
lmZ d dl m!Z!m"Z" d dl#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0 d dl1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7m8Z8m9Z9m:Z:m;Z; e<e=Z>e
j?dk rde@ejA d d ZBnde@ejC d ZBe5d ZDdd ZEdd ZFdd ZGdd ZHG dd de$ZIG d d! d!eIZJd"d# ZKG d$d% d%ZLejMejNB ZOejPd&d'd(ZQG d)d* d*ejRZSG d+d, d,e&eLZTG d-d. d.eTZUG d/d0 d0eTZVG d1d2 d2e(eLZWG d3d4 d4eWZXG d5d6 d6eWZYG d7d8 d8e*ZZG d9d: d:eZZ[G d;d< d<eZZ\dS )=    )annotationsN)SSLCertVerificationErrorSSLError)AnyClassVar)sliding_window)gennetutil)IOStreamStreamClosedError)	TCPClient)	TCPServer)parse_timedelta)parse_host_portunparse_host_port)CommCommClosedError	ConnectorFatalCommClosedErrorListener)Backend)ensure_concrete_hostfrom_framesget_tcp_server_address
host_array	to_frames)pack_frames_preludeunpack_frames)MEMORY_LIMIT)	ensure_ipensure_memoryviewget_ipget_ipv6nbytes)   
            c           
      C  s  |   rdS tjd}tt|dd}| j}d}||d ksFtdtd|d	 }td|| | }|||  }|d
ks~tzt	j
drtd|| |tjd|d |d f n|tjtjd ztj}tj}tj}W n4 tk
r   t	j
dkrd}d}d}nd}Y nX |dk	rbtd||| |tj|| |tj|| |tj|| t	j
drtd|d  d}	|tj|	|d  W n  tk
r   td Y nX dS )z5
    Set kernel-level TCP timeout on the stream.
    Nzdistributed.comm.timeouts.tcpseconds)defaultr%   r(   zTimeout too lowr'      r   winz+Setting TCP keepalive: idle=%d, interval=%di  darwin   i  i  z7Setting TCP keepalive: nprobes=%d, idle=%d, interval=%dlinuxzSetting TCP user timeout: %d ms   z$Could not set timeout on TCP stream.)closeddaskconfiggetintr   socketAssertionErrormaxsysplatform
startswithloggerdebugZioctlZSIO_KEEPALIVE_VALS
setsockopt
SOL_SOCKETSO_KEEPALIVETCP_KEEPIDLETCP_KEEPINTVLTCP_KEEPCNTAttributeErrorSOL_TCPOSError	exception)
commtimeoutsockZnprobesZidleintervalrA   rB   rC   TCP_USER_TIMEOUT rM   8/tmp/pip-unpacked-wheel-g426oqom/distributed/comm/tcp.pyset_tcp_timeout;   sT    


rO   c                 C  s@   |   rdS zt| j dd  W S  tk
r:   Y dS X dS )z'
    Get a stream's local address.
    z<closed>Nr'   )r1   r   r6   getsocknamerF   )rH   rM   rM   rN   get_stream_addressv   s    rQ   c                 C  s   |j dk	rl|j }t|tjrJ|jrJd|jkrJtd|  d|jj d| td|  d|jj d| |ntd|  d| |dS )z8
    Re-raise StreamClosedError as CommClosedError.
    N
UNKNOWN_CAzin z: )	
real_error
isinstancesslr   reasonr   	__class____name__r   )objexcrM   rM   rN   convert_stream_closed_error   s    
"r[   c                 C  s   |  }|rd|_ dS )zCallback to close Dask Comm when Tornado Stream closes

    Parameters
    ----------
        ref: weak reference to a Dask comm
    TN_closed)refrH   rM   rM   rN   _close_comm   s    r_   c                      s   e Zd ZU dZejejdZ	de
d< de
d< d&dd	d	d
d fddZdd Zdd Zed	dddZed	dddZd'ddZd(ddZejdd Zddd d!Zd
dd"d#Zed$d% Z  ZS ))TCPzO
    An established communication based on an underlying Tornado IOStream.
    zdistributed.comm.shardzClassVar[int]max_shard_sizezIOStream | NonestreamTr
   strbool)rb   
local_addr	peer_addrdeserializec                   s   d| _ t j|d || _|| _|| _t| |  | _	d| j	_
i | _t| }|tt| |d t| |   d S )NF)rg   T)r]   super__init___local_addr
_peer_addrrb   weakreffinalize_get_finalizer
_finalizeratexit_extrar^   Zset_close_callback	functoolspartialr_   Zset_nodelayrO   _read_extra)selfrb   re   rf   rg   r^   rW   rM   rN   ri      s    

zTCP.__init__c                 C  s   d S NrM   ru   rM   rM   rN   rt      s    zTCP._read_extrac                 C  s   t | }| j|fdd}|S )Nc                 S  s,   | d k	r(|   s(td|  |   d S )NzClosing dangling stream in )r1   r<   warningclose)rb   rrM   rM   rN   rm      s    z$TCP._get_finalizer.<locals>.finalize)reprrb   )ru   r{   rm   rM   rM   rN   rn      s    zTCP._get_finalizer)returnc                 C  s   | j S rw   )rj   rx   rM   rM   rN   local_address   s    zTCP.local_addressc                 C  s   | j S rw   )rk   rx   rM   rM   rN   peer_address   s    zTCP.peer_addressNc              
     sV  | j }|d krt d}t|}z||I d H }t||\}t|}tdtd|t	 t	D ]>\}}||| }	|	j
}
||	I d H }||
ks`t||
fq`W n\ tk
r } z"d | _ d| _t st| | W 5 d }~X Y np tk
r   |    Y nTX z&t|}t|| j|| jdI d H }W n& tk
rL   |   tdY nX |S d S )NQr'   r   T)rg   deserializersallow_offloadz aborted stream on truncated data)rb   r   structcalcsize
read_bytesunpackr   r   rangeOPENSSL_MAX_CHUNKSIZEr#   Z	read_intor7   r   r]   r9   is_finalizingr[   BaseExceptionabortr   r   rg   r   EOFError)ru   r   rb   fmtZfmt_sizeframes_nbytesframesijchunkchunk_nbytesnemsgrM   rM   rN   read   sJ    
zTCP.readmessagec              
     s  | j }|d krt t|| j||| j| jd| j| jdI d H }dd |D }t|}t	|}t
dt|| | }|f|}t|f|}||d 7 }|dk rd|g}|g}zt||D ]r\}	}
|	rt|
}
td	td|	t tD ]F\}}|
|| }|j}|jd krt |j| | j|7  _qq|d W nb tk
r } z$d | _ d
| _t svt| | W 5 d }~X Y n  tk
r   |    Y nX |S )N)ZsenderZ	recipient)r   serializerson_errorcontextZframe_split_sizec                 S  s   g | ]}t |qS rM   )r#   ).0frM   rM   rN   
<listcomp>  s     zTCP.write.<locals>.<listcomp>r   r   i       r'   T)rb   r   r   r   Z
local_infoZremote_infoZhandshake_optionsra   sumr   r   packr#   joinzipr    r   r   r   Z_write_bufferr   appendZ_total_write_indexwriter]   r9   r   r[   r   r   )ru   r   r   r   rb   r   r   Zframes_nbytes_totalheaderZeach_frame_nbytesZ
each_framer   r   r   r   r   rM   rM   rN   r   
  sj    

z	TCP.writec                 c  s~   | j d  }| _ d| _|d k	rz| szz@z&| r<|dV  |j	tj
 W n tk
r`   Y nX W 5 | j  |  X d S )NTr   )rb   r]   r1   ro   detachrz   writingr   r6   shutdown	SHUT_RDWRrF   ru   rb   rM   rM   rN   rz   V  s    

z	TCP.closeNonec                 C  s:   | j d  }| _ d| _|d k	r6| s6| j  |  d S )NT)rb   r]   r1   ro   r   rz   r   rM   rM   rN   r   i  s
    
z	TCP.abortc                 C  s   | j S rw   r\   rx   rM   rM   rN   r1   p  s    z
TCP.closedc                 C  s   | j S rw   )rq   rx   rM   rM   rN   
extra_infos  s    zTCP.extra_info)T)N)Nr   )rX   
__module____qualname____doc__r2   utilsZparse_bytesr3   r4   ra   __annotations__ri   rt   rn   propertyr~   r   r   r   r   	coroutinerz   r   r1   r   __classcell__rM   rM   rv   rN   r`      s*   

 
2
L
r`   c                   @  s$   e Zd ZdZeeejZdd ZdS )TLSz(
    A TLS-specific version of TCP.
    c                 C  sZ   t |  | jj}|d k	rV| jj| | d | jd \}}}t	d| j
||| d S )N)Zpeercertcipherr   z7TLS connection with %r: protocol=%s, cipher=%s, bits=%d)r`   rt   rb   r6   rq   updategetpeercertr   r<   r=   rk   )ru   rJ   r   protobitsrM   rM   rN   rt     s    
zTLS._read_extraN)	rX   r   r   r   minr   r`   ra   rt   rM   rM   rM   rN   r   x  s   r   c                 C  s(   |  d}t|tjs$td||S )Nssl_contextzpTLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got )r4   rT   rU   
SSLContext	TypeError)connection_argsctxrM   rM   rN   _expect_tls_context  s    
r   c                   @  s   e Zd Zdd ZdS )RequireEncryptionMixinc                 C  s(   | j s$|dr$td| j| f d S )NZrequire_encryptionzLencryption required by Dask configuration, refusing communication from/to %r)	encryptedr4   RuntimeErrorprefix)ru   addressr   rM   rM   rN   _check_encryption  s    
z(RequireEncryptionMixin._check_encryptionN)rX   r   r   r   rM   rM   rM   rN   r     s   r   )typec             
     sr   t jdkrTztj| |||tdW S  tjk
rR } z|jtjkrB W 5 d }~X Y nX t	 j| ||tj
dI d H S )N)r$   	   )familyr   flagsr   r   )r9   version_infor6   getaddrinfo_NUMERIC_ONLYgaierrorerrno
EAI_NONAMEasyncioZget_running_loopSOCK_STREAM)hostportr   r   r   rM   rM   rN   _getaddrinfo  s$    
   r   c                   @  s*   e Zd ZdZejfdddddddZd	S )
_DefaultLoopResolvera  
    Resolver implementation using `asyncio.loop.getaddrinfo`.
    backport from Tornado 6.2+
    https://github.com/tornadoweb/tornado/blob/3de78b7a15ba7134917a18b0755ea24d7f8fde94/tornado/netutil.py#L416-L432

    With an additional optimization based on
    https://github.com/python-trio/trio/blob/4edfd41bd5519a2e626e87f6c6ca9fb32b90a6f4/trio/_socket.py#L125-L192
    (Copyright Contributors to the Trio project.)

    And proposed to cpython in https://github.com/python/cpython/pull/31497/
    rc   r5   zsocket.AddressFamilyzlist[tuple[int, Any]])r   r   r   r}   c                   s"   dd t |||tjdI d H D S )Nc                 S  s   g | ]\}}}}}||fqS rM   rM   )r   Zfam_r   rM   rM   rN   r     s   z0_DefaultLoopResolver.resolve.<locals>.<listcomp>r   )r   r6   r   )ru   r   r   r   rM   rM   rN   resolve  s       
z_DefaultLoopResolver.resolveN)rX   r   r   r   r6   	AF_UNSPECr   rM   rM   rM   rN   r     s   r   c                   @  s,   e Zd ZU ee dZded< dddZdS )	BaseTCPConnector)resolverzClassVar[TCPClient]clientTc              
     s>  |  || t|\}}| jf |}zpd|krZ| jj||tdI d H }|jd|I d H }n | jj||fdti|I d H }| r|jrt	|jW n t	k
r } zt
| | W 5 d }~X Y nV tk
r }	 ztd|	W 5 d }	~	X Y n, tk
r }	 zt |	W 5 d }	~	X Y nX | jt| }
| ||
| j| |}|S )Nserver_hostname)max_buffer_sizeFr   zzTLS certificate does not match. Check your security settings. More info at https://distributed.dask.org/en/latest/tls.html)F)r   r   _get_connect_argsr   connectMAX_BUFFER_SIZEZ	start_tlsr1   errorr   r[   r   r   r   r   rQ   
comm_class)ru   r   rg   r   ipr   kwargsrb   r   errr~   rH   rM   rM   rN   r     sN          zBaseTCPConnector.connectN)T)rX   r   r   r   r   r   r   r   rM   rM   rM   rN   r     s   
r   c                   @  s    e Zd ZdZeZdZdd ZdS )TCPConnectortcp://Fc                 K  s   i S rw   rM   ru   r   rM   rM   rN   r     s    zTCPConnector._get_connect_argsN)rX   r   r   r   r`   r   r   r   rM   rM   rM   rN   r     s   r   c                   @  s    e Zd ZdZeZdZdd ZdS )TLSConnectortls://Tc                 K  s&   dt |i}|dr"|d |d< |S )Nssl_optionsr   )r   r4   )ru   r   Ztls_argsrM   rM   rN   r     s    
zTLSConnector._get_connect_argsN)rX   r   r   r   r   r   r   r   rM   rM   rM   rN   r     s   r   c                   @  sV   e Zd ZdddZdd Zdd	 Zd
d Zdd Zdd Ze	dd Z
e	dd ZdS )BaseTCPListenerTNr   c                 K  sT   |  || t||\| _| _|| _|| _|| _|| _| jf || _	d | _
d | _d S rw   )r   r   r   r   default_hostcomm_handlerrg   r   _get_server_argsserver_args
tcp_serverbound_address)ru   r   r   rg   r   r   default_portr   rM   rM   rN   ri   &  s    
zBaseTCPListener.__init__c                   s   t f dti| j| _| j| j_ttj	d}t
dD ]l}ztj| j| j|d}W n> tk
r } z | jdks||jtjkr~ |}W 5 d }~X Y q8X | j|  qq8||   d S )Nr   zdistributed.comm.socket-backlog   )r   backlogr   )r   r   r   r   _handle_streamZhandle_streamr5   r2   r3   r4   r   r	   Zbind_socketsr   r   rF   r   Z
EADDRINUSEZadd_socketsget_host_port)ru   r   r   Zsocketsr   rZ   rM   rM   rN   start:  s$    
  
zBaseTCPListener.startc                 C  s"   | j d  }| _ |d k	r|  d S rw   )r   stop)ru   r   rM   rM   rN   r   T  s    zBaseTCPListener.stopc                 C  s   | j d krtdd S )Nz,invalid operation on non-started TCPListener)r   
ValueErrorrx   rM   rM   rN   _check_startedY  s    
zBaseTCPListener._check_startedc                   s   | j t|d d   }| ||I d H }|d kr4d S td|| j | j t| }| |||| j}| j	|_	z| 
|I d H  W n" tk
r   td| Y d S X | |I d H  d S )Nr'   z!Incoming connection from %r to %rz4Connection from %s closed before handshake completed)r   r   _prepare_streamr<   r=   contact_addressrQ   r   rg   r   Zon_connectionr   infor   )ru   rb   r   r~   rH   rM   rM   rN   r   ]  s    zBaseTCPListener._handle_streamc                 C  s,   |    | jdkrt| j| _| jdd S )z@
        The listening address as a (host, port) tuple.
        Nr'   )r   r   r   r   rx   rM   rM   rN   r   p  s    
zBaseTCPListener.get_host_portc                 C  s   | j t|    S )z4
        The listening address as a string.
        )r   r   r   rx   rM   rM   rN   listen_address{  s    zBaseTCPListener.listen_addressc                 C  s*   |   \}}t|| jd}| jt|| S )z2
        The contact address as a string.
        )r   )r   r   r   r   r   )ru   r   r   rM   rM   rN   r    s    zBaseTCPListener.contact_address)TTNr   )rX   r   r   ri   r   r   r   r   r   r   r  r  rM   rM   rM   rN   r   %  s       

r   c                   @  s(   e Zd ZdZeZdZdd Zdd ZdS )TCPListenerr   Fc                 K  s   i S rw   rM   r   rM   rM   rN   r     s    zTCPListener._get_server_argsc                   s   |S rw   rM   )ru   rb   r   rM   rM   rN   r     s    zTCPListener._prepare_streamN)	rX   r   r   r   r`   r   r   r   r   rM   rM   rM   rN   r    s
   r  c                   @  s(   e Zd ZdZeZdZdd Zdd ZdS )TLSListenerr   Tc                 K  s   t |}d|iS )Nr   )r   )ru   r   r   rM   rM   rN   r     s    zTLSListener._get_server_argsc                   s\   z|  I d H  W n@ tk
rR } z"td| j|t|dd p>| W 5 d }~X Y nX |S d S )Nz7Listener on %r: TLS handshake failed with remote %r: %srS   )Zwait_for_handshakerF   r<   ry   r  getattr)ru   rb   r   r   rM   rM   rN   r     s    zTLSListener._prepare_streamN)	rX   r   r   r   r   r   r   r   r   rM   rM   rM   rN   r    s
   r  c                   @  s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )BaseTCPBackendc                 C  s   |   S rw   )_connector_classrx   rM   rM   rN   get_connector  s    zBaseTCPBackend.get_connectorc                 K  s   | j |||f|S rw   )_listener_class)ru   locZhandle_commrg   r   rM   rM   rN   get_listener  s    zBaseTCPBackend.get_listenerc                 C  s   t |d S )Nr   r   ru   r  rM   rM   rN   get_address_host  s    zBaseTCPBackend.get_address_hostc                 C  s   t |S rw   r  r  rM   rM   rN   get_address_host_port  s    z$BaseTCPBackend.get_address_host_portc                 C  s   t |\}}tt||S rw   )r   r   r   )ru   r  r   r   rM   rM   rN   resolve_address  s    zBaseTCPBackend.resolve_addressc                 C  s8   t |\}}t|}d|kr&t|}nt|}t|d S )N:)r   r   r"   r!   r   )ru   r  r   r   Z
local_hostrM   rM   rN   get_local_address_for  s    
z$BaseTCPBackend.get_local_address_forN)	rX   r   r   r	  r  r  r  r  r  rM   rM   rM   rN   r    s   r  c                   @  s   e Zd ZeZeZdS )
TCPBackendN)rX   r   r   r   r  r  r
  rM   rM   rM   rN   r    s   r  c                   @  s   e Zd ZeZeZdS )
TLSBackendN)rX   r   r   r   r  r  r
  rM   rM   rM   rN   r    s   r  )]
__future__r   r   ctypesr   rr   loggingr6   rU   r   r9   rl   r   r   typingr   r   Ztlzr   Ztornador   r	   Ztornado.iostreamr
   r   Ztornado.tcpclientr   Ztornado.tcpserverr   r2   Z
dask.utilsr   Zdistributed.comm.addressingr   r   Zdistributed.comm.corer   r   r   r   r   Zdistributed.comm.registryr   Zdistributed.comm.utilsr   r   r   r   r   Zdistributed.protocol.utilsr   r   Zdistributed.systemr   Zdistributed.utilsr   r    r!   r"   r#   	getLoggerrX   r<   r   sizeofc_intr   c_size_tr   rO   rQ   r[   r_   r`   r   r   r   AI_NUMERICHOSTAI_NUMERICSERVr   r   r   Resolverr   r   r   r   r   r  r  r  r  r  rM   rM   rM   rN   <module>   sh   
; Z
.	g 