U
    /eX                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	m
Z
mZ ddlmZmZ ddlmZ ddlZddlmZ ddlmZmZ dd	lmZmZmZmZ dd
lmZmZ ddlm Z m!Z!m"Z"m#Z# ddl$m%Z%m&Z&m'Z' ddl(m)Z)m*Z*m+Z+m,Z,m-Z- e.e/Z0er2zddl1a1W n e2k
r.   Y nX nda1da3da4da5dZ6dddddZ7ddddddZ8dddddddZ9d.ddZ:dd  Z;d!d" Z<G d#d$ d$eZ=G d%d& d&eZ>G d'd( d(eZ?G d)d* d*eZ@e@ ed+< d,d- ZAdS )/z
:ref:`UCX`_ based communications for distributed.

See :ref:`communications` for more.

.. _UCX: https://github.com/openucx/ucx
    )annotationsN)	AwaitableCallable
Collection)TYPE_CHECKINGAny)patch)parse_bytes)parse_host_portunparse_host_port)CommCommClosedError	ConnectorListener)Backendbackends)ensure_concrete_hostfrom_frames
host_array	to_frames)CudaDeviceInfoget_device_index_and_uuidhas_cuda_context)	ensure_ipget_ipget_ipv6
log_errorsnbytesFzThis is often the result of a CUDA-enabled library calling a CUDA runtime function before Dask-CUDA can spawn worker processes. Please make sure any such function calls don't happen at import time or in the global scope of a program.r   str)device_inforeturnc                 C  s   | j  dt| j dS )Nz ())Zdevice_indexr   uuid)r    r#   8/tmp/pip-unpacked-wheel-g426oqom/distributed/comm/ucx.py_get_device_and_uuid_str@   s    r%   intNone)r   pidr    c                 C  s(   t | }td| d| dt  d S )NzA CUDA context for device z already exists on process ID . r%   loggerwarning_warning_suffix)r   r(   Zdevice_uuid_strr#   r#   r$   _warn_existing_cuda_contextD   s    r.   )device_info_expecteddevice_info_actualr(   r    c              
   C  s6   t | }t |}td| d| d| dt  d S )NzWorker with process ID z/ should have a CUDA context assigned to device z,, but instead the CUDA context is on device r)   r*   )r/   r0   r(   Zexpected_device_uuid_strZactual_device_uuid_strr#   r#   r$   _warn_cuda_context_wrong_deviceL   s
    r1   c                 C  s@   dd l }|j }|jjj| }|jj||d } |   d S Nr   )
numba.cudacudacurrent_contextZdriverZdrvapi	cu_streamZStreamZsynchronize)streamnumbactxr6   r#   r#   r$   synchronize_streamX   s
    
r:   c                    s  t d k	rd S t \} }tjd| d|dd}tjddksTd|krd|krzdd l W n tk
r|   td	Y nX t	tjd
d
dd }t atjrttjt   j  t atjrtjj|jkrt|tjt  dd l }|a ttj| t j| dd W 5 Q R X tjd}z:dd lfdda|d k	rdt|}jdd|d W nf tk
r   zdd l  fdd}|aW n tk
r   dd aY nX |d k	rtd Y nX d S )NZUCX_TLSTLS z(distributed.comm.ucx.create-cuda-contextTr4   z^cudar   z;CUDA support with UCX requires Numba for context managementZCUDA_VISIBLE_DEVICES0,)optionsZenv_takes_precedencezdistributed.rmm.pool-sizec                   s    j | dS )N)size)ZDeviceBuffern)rmmr#   r$   device_array   s    rD   F)Zpool_allocatorZmanaged_memoryZinitial_pool_sizec                   s&    j j| fdd}t| j j |S )Nu1)Zdtype)r4   rD   weakreffinalizer5   )rB   a)r8   r#   r$   numba_device_array   s    z%init_once.<locals>.numba_device_arrayc                 S  s   t dd S )Nz;In order to send/recv CUDA arrays, Numba or RMM is required)RuntimeErrorrA   r#   r#   r$   rD      s    zyInitial RMM pool size defined, but RMM is not available. Please consider installing RMM or removing the pool size option.)ucp_prepare_ucx_configosenvirongetdaskconfigr3   ImportErrorr   splitr   pre_existing_cuda_contextZhas_contextr.   r   getpidr4   r5   cuda_context_createdr"   r1   r   dictinitrC   rD   r	   Zreinitializer+   r,   )Z
ucx_configZucx_environmentZucx_tlsZcuda_visible_deviceZ_ucpZpool_size_strZ	pool_sizerI   r#   )r8   rC   r$   	init_oncea   s    
	
 
  
  

rY   c                 C  s   |  }|dk	rd|_ dS )zCallback to close Dask Comm when UCX Endpoint closes or errors

    Parameters
    ----------
        ref: weak reference to a Dask UCX comm
    NT)_closed)refZcommr#   r#   r$   _close_comm   s    r\   c                      s   e Zd ZdZd dddd fddZeddd	d
ZeddddZed!dddddddZ	ed"ddZ
dd Zdd Zedd Zdd Z  ZS )#UCXa  Comm object using UCP.

    Parameters
    ----------
    ep : ucp.Endpoint
        The UCP endpoint.
    address : str
        The address, prefixed with `ucx://` to use.
    deserialize : bool, default True
        Whether to deserialize data in :meth:`distributed.protocol.loads`

    Notes
    -----
    The read-write cycle uses the following pattern:

    Each msg is serialized into a number of "data" frames. We prepend these
    real frames with two additional frames

        1. is_gpus: Boolean indicator for whether the frame should be
           received into GPU memory. Packed in '?' format. Unpack with
           ``<n_frames>?`` format.
        2. frame_size : Unsigned int describing the size of frame (in bytes)
           to receive. Packed in 'Q' format, so a length-0 frame is equivalent
           to an unsized frame. Unpacked with ``<n_frames>Q``.

    The expected read cycle is

    1. Read the frame describing if connection is closing and number of frames
    2. Read the frame describing whether each data frame is gpu-bound
    3. Read the frame describing whether each data frame is sized
    4. Read all the data frames.
    Tr   bool
local_addr	peer_addrdeserializec                   s   t  j|d || _|r&|ds&t|ds4t|| _|| _d | _t| jdr~t	
| }| jtt| d| _d| _nd| _td|  d S )N)rb   ucxset_close_callbackFTzUCX.__init__ %s)super__init___ep
startswithAssertionError_local_addr
_peer_addrZ	comm_flaghasattrrF   r[   rd   	functoolspartialr\   rZ   _has_close_callbackr+   debug)selfepr`   ra   rb   r[   	__class__r#   r$   rf      s    
zUCX.__init__)r    c                 C  s   | j S N)rj   rq   r#   r#   r$   local_address  s    zUCX.local_addressc                 C  s   | j S ru   )rk   rv   r#   r#   r$   peer_address  s    zUCX.peer_addressNmessagerW   zCollection[str] | Noner&   )msgserializerson_errorr    c                   s*  |   rtd|d krd}t|||| jdI d H }t|}tdd |D }tdd |D }tdd t||D  \}}	z~| jt	
dd	|I d H  | jt	j
|d
 |d  f|| I d H  t|rtd |	D ]}
| j|
I d H  qt|W S  tjjk
r$   |   tdY nX d S )Nz,Endpoint is closed -- unable to send messager4   rP   pickleerror)r{   r|   allow_offloadc                 s  s   | ]}t |d V  qdS )Z__cuda_array_interface__N)rl   .0fr#   r#   r$   	<genexpr>-  s     zUCX.write.<locals>.<genexpr>c                 s  s   | ]}t |V  qd S ru   r   r   r#   r#   r$   r   .  s     c                 s  s&   | ]\}}t |d kr||fV  qdS r   Nr   r   is_cuda
each_framer#   r#   r$   r   0  s   ?QF?Qr   z(While writing, the connection was closed)closedr   r   r   lentupleziprr   sendstructpackanyr:   sumrK   
exceptionsUCXBaseExceptionabort)rq   rz   r{   r|   framesnframescuda_framessizesZcuda_send_framesZsend_framesr   r#   r#   r$   write  s>    

z	UCX.writer}   c              
     s  |d krd}zt td}| j|I d H  td|\}}|rJtd|d |d  }t t|}| j|I d H  t||}|d | ||d   }}W n8 tk
r }	 z|   td|	W 5 d }	~	X Y nX dd t	||D }
t	d	d
 t	||
D  \}}t
|rtd z"|D ]}| j|I d H  q$W n: tk
rz }	 z|   td|	W 5 d }	~	X Y nX zt|
| j|| jdI d H }W n& tk
r   |   tdY nX |S d S )Nr}   r   zConnection closed by writerr   r   z.Connection closed by writer.
Inner exception: c                 S  s$   g | ]\}}|rt |nt|qS r#   )rD   r   )r   r   Z	each_sizer#   r#   r$   
<listcomp>t  s   zUCX.read.<locals>.<listcomp>c                 s  s&   | ]\}}t |d kr||fV  qdS r   r   r   r#   r#   r$   r   y  s   zUCX.read.<locals>.<genexpr>r   )rb   deserializersr   z Aborted stream on truncated data)r   r   calcsizerr   recvunpackr   BaseExceptionr   r   r   r:   r   rb   r   EOFError)rq   r   rz   shutdownr   Z
header_fmtheaderr   r   er   Zcuda_recv_framesZrecv_framesr   r#   r#   r$   readS  s^    

zUCX.readc                   sz   d| _ | jd k	rvz | jtdddI d H  W n6 tjjtjj	tjj
fttjddf k
rf   Y nX |   d | _d S )NTr   r   ZUCXConnectionResetr#   )rZ   rg   rr   r   r   r   rK   r   ZUCXErrorZUCXCloseErrorZUCXCanceledgetattrr   rv   r#   r#   r$   close  s    
 z	UCX.closec                 C  s$   d| _ | jd k	r | j  d | _d S NT)rZ   rg   r   rv   r#   r#   r$   r     s    

z	UCX.abortc                 C  s   | j d k	r| j S tdd S )NzUCX Endpoint is closed)rg   r   rv   r#   r#   r$   rr     s    
zUCX.epc                 C  s   | j dkr| jS | jd kS d S r   )ro   rZ   rg   rv   r#   r#   r$   r     s    
z
UCX.closed)T)Nry   )r}   )__name__
__module____qualname____doc__rf   propertyrw   rx   r   r   r   r   r   rr   r   __classcell__r#   r#   rs   r$   r]      s$   "   9J
r]   c                   @  s.   e Zd ZdZeZdZddddddd	d
ZdS )UCXConnectorucx://FTr   r^   r   r]   )addressrb   connection_argsr    c                   sn   t d| t|\}}t  zt||I d H }W n  tjjk
rT   tdY nX | j	|d| j
| |dS )NzUCXConnector.connect: %s,Connection closed before handshake completedr<   r_   )r+   rp   r
   rY   rK   Zcreate_endpointr   r   r   
comm_classprefix)rq   r   rb   r   ipportrr   r#   r#   r$   connect  s    zUCXConnector.connectN)T)r   r   r   r   r]   r   	encryptedr   r#   r#   r#   r$   r     s
    r   c                   @  s   e Zd ZejZejZejZdddddddd	d
Zedd Z	edd Z
dd Zdd Zdd Zedd Zedd Zedd ZdS )UCXListenerNFTr   z'Callable[[UCX], Awaitable[None]] | Noner^   r   )r   comm_handlerrb   r   r   c                 K  sN   | dsd| }t|dd\| _| _|| _|| _|| _d | _d | _|| _	d S )Nrc   r   r   )default_port)
rh   r
   r   _input_portr   rb   r   rg   
ucp_serverr   )rq   r   r   rb   r   r   r#   r#   r$   rf     s    
zUCXListener.__init__c                 C  s   | j jS ru   )r   r   rv   r#   r#   r$   r     s    zUCXListener.portc                 C  s   d| j  d t| j S )Nr   :)r   r   r   rv   r#   r#   r$   r     s    zUCXListener.addressc                   s(    fdd}t   tj| jd _d S )Nc                   sn   t |  j j jd} j|_z |I d H  W n  tk
rR   td Y d S X  jrj |I d H  d S )Nr_   r   )	r]   r   rb   r   Zon_connectionr   r+   rp   r   )Z	client_eprc   rv   r#   r$   serve_forever  s    
z(UCXListener.start.<locals>.serve_forever)r   )rY   rK   Zcreate_listenerr   r   )rq   r   r#   rv   r$   start  s    zUCXListener.startc                 C  s
   d | _ d S ru   )r   rv   r#   r#   r$   stop  s    zUCXListener.stopc                 C  s   | j | jfS ru   )r   r   rv   r#   r#   r$   get_host_port  s    zUCXListener.get_host_portc                 C  s   | j t|    S ru   )r   r   r   rv   r#   r#   r$   listen_address  s    zUCXListener.listen_addressc                 C  s$   |   \}}t|}| jt|| S ru   )r   r   r   r   )rq   hostr   r#   r#   r$   contact_address  s    zUCXListener.contact_addressc                 C  s   |   S ru   )r   rv   r#   r#   r$   bound_address!  s    zUCXListener.bound_address)NFT)r   r   r   r   r   r   r   rf   r   r   r   r   r   r   r   r   r   r#   r#   r#   r$   r     s(      



r   c                   @  s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )
UCXBackendc                 C  s   t  S ru   )r   rv   r#   r#   r$   get_connector+  s    zUCXBackend.get_connectorc                 K  s   t |||f|S ru   )r   )rq   locZhandle_commrb   r   r#   r#   r$   get_listener.  s    zUCXBackend.get_listenerc                 C  s   t |d S r2   r
   rq   r   r#   r#   r$   get_address_host4  s    zUCXBackend.get_address_hostc                 C  s   t |S ru   r   r   r#   r#   r$   get_address_host_port7  s    z UCXBackend.get_address_host_portc                 C  s   t |\}}tt||S ru   )r
   r   r   )rq   r   r   r   r#   r#   r$   resolve_address:  s    zUCXBackend.resolve_addressc                 C  s8   t |\}}t|}d|kr&t|}nt|}t|d S )Nr   )r
   r   r   r   r   )rq   r   r   r   Z
local_hostr#   r#   r$   get_local_address_for>  s    
z UCXBackend.get_local_address_forN)	r   r   r   r   r   r   r   r   r   r#   r#   r#   r$   r   (  s   r   rc   c                  C  sn  i } t tjdtjdtjdgrtjdr@d}d}nd}d}t tjdtjdgrl|d }tjdrd	| }tjdr|d
 }||d} i }tjdi  D ]\}}dttjd|	d}|dd  }| krt
d| d| d|d| d| |  d q|tjkr\t
d| d| d|d| dtj|  d q|||< q| |fS )a  Translate dask config options to appropriate UCX config options

    Returns
    -------
    tuple
        Options suitable for passing to ``ucp.init`` and additional
        UCX options that will be inserted directly into the environment
        while calling ``ucp.init``.
    zdistributed.comm.ucx.tcpzdistributed.comm.ucx.nvlinkzdistributed.comm.ucx.infinibandzdistributed.comm.ucx.rdmacmZtcpZrdmacmzdistributed.comm.ucx.cuda-copyz
,cuda_copyzrc,z	,cuda_ipc)r;   ZSOCKADDR_TLS_PRIORITYz distributed.comm.ucx.environment_r]   -   Nz	Ignoring =z (key=z!) in ucx.environment, preferring z from high level optionsz from external environment)r]   )r   rP   rQ   rO   itemsjoinmapr   upperrS   r+   r,   rM   rN   info)Zhigh_level_optionsZtlsZtls_priorityZenvironment_optionskvkeyZhl_keyr#   r#   r$   rL   K  sH    





&(
rL   )r   )Br   
__future__r   rm   loggingrM   r   rF   collections.abcr   r   r   typingr   r   Zunittest.mockr   rP   Z
dask.utilsr	   Zdistributed.comm.addressingr
   r   Zdistributed.comm.corer   r   r   r   Zdistributed.comm.registryr   r   Zdistributed.comm.utilsr   r   r   r   Zdistributed.diagnostics.nvmlr   r   r   Zdistributed.utilsr   r   r   r   r   	getLoggerr   r+   rK   rR   rD   rT   rV   r-   r%   r.   r1   r:   rY   r\   r]   r   r   r   rL   r#   r#   r#   r$   <module>   sT   

	j pK 
