U
    /eT                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZ d dlmZ d dlmZmZmZmZ d dlZd dlmZ d dlmZ d d	lmZ d d
lmZmZmZmZm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ er&d dl,Z-d dl.Z/d dl0m1Z1 edZ2e3e4Z5G dd de6Z7G dd dZ8G dd dZ9dddddddZ:dddd d!d"Z;dS )#    )annotationsN)defaultdict)CallableIterator)ThreadPoolExecutor)TYPE_CHECKINGAnyTypeVaroverload)parse_bytes)PooledRPCCall)to_serialize)deserialize_schemadump_shardslist_of_buffers_to_tableload_partitionserialize_table)CommShardsBuffer)DiskShardsBuffer)ResourceLimiter)	ShuffleId)
log_errorssync)WorkerTc                   @  s   e Zd ZdS )ShuffleClosedErrorN)__name__
__module____qualname__ r   r   I/tmp/pip-unpacked-wheel-g426oqom/distributed/shuffle/_worker_extension.pyr   ,   s   r   c                   @  s|  e Zd ZdZdddddddddd	d
d
dddZddddZejdddddZddddZ	ddddddZ
ddddd d!Zd"dd#d$Zddd%d&d'Zddd%d(d)Zdd*d%d+d,Zd*dd%d-d.Zddd/d0Zd1dd%d2d3Zd4dd%d5d6Zdd1d7d8d9Zd:d;d<d=d>Zddd?d@ZdddAdBZdCddDdEZdddFdGZdddHdIZdJddKdLdMZdNS )OShuffleao  State for a single active shuffle

    This object is responsible for splitting, sending, receiving and combining
    data shards.

    It is entirely agnostic to the distributed system and can perform a shuffle
    with other `Shuffle` instances using `rpc` and `broadcast`.

    The user of this needs to guarantee that only `Shuffle`s of the same unique
    `ShuffleID` interact.

    Parameters
    ----------
    worker_for:
        A mapping partition_id -> worker_address.
    output_workers:
        A set of all participating worker (addresses).
    column:
        The data column we split the input partition by.
    schema:
        The schema of the payload data.
    id:
        A unique `ShuffleID` this belongs to.
    local_address:
        The local address this Shuffle can be contacted by using `rpc`.
    directory:
        The scratch directory to buffer data in.
    nthreads:
        How many background threads to use for compute.
    loop:
        The event loop.
    rpc:
        A callable returning a PooledRPCCall to contact other Shuffle instances.
        Typically a ConnectionPool.
    broadcast:
        A function that ensures a RPC is evaluated on all `Shuffle` instances of
        a given `ShuffleID`.
    memory_limiter_disk:
    memory_limiter_comm:
        A ``ResourceLimiter`` limiting the total amount of memory used in either
        buffer.
    zdict[int, str]setstrz	pa.Schemar   intzCallable[[str], PooledRPCCall]r   r   )
worker_foroutput_workerscolumnschemaidlocal_address	directorynthreadsrpc	broadcastmemory_limiter_diskmemory_limiter_commsc                 C  s   dd l }|
| _|	| _|| _|| _|| _|| _t|| _t	t
}|| _| D ]\}}|| | qLt|| _|j|ddd| _d| _ttt||d| _t| j|d| _t	t| _t| j|d| _d| _ d| _!t"" | _#d | _$t%& | _'d S )	Nr   Z_workers)namecategoryF)dumploadr+   memory_limiter)sendr5   r   )(pandasr.   r-   r'   r)   r(   r&   r   executorr   listr*   itemsappenddictpartitions_ofZSeriesZastyper%   closedr   r   r   _disk_bufferr   r6   _comm_bufferfloatdiagnosticslengetoutput_partitions_lefttransferredtotal_recvdtime
start_time
_exceptionasyncioEvent_closed_event)selfr%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   pdr=   partaddrr   r   r    __init__\   s@    

 

zShuffle.__init__returnc                 C  s   d| j  d| j dS )Nz<Shuffle id:  on >)r)   r*   rN   r   r   r    __repr__   s    zShuffle.__repr__zIterator[None])r1   rT   c                 c  s0   t   }d V  t   }| j|  || 7  < d S N)rH   rB   )rN   r1   startstopr   r   r    rH      s    zShuffle.timeNonec                   s&   |    | jd| jddI d H  d S )Nshuffle_inputs_done)op
shuffle_id)msg)raise_if_closedr.   r)   rW   r   r   r    barrier   s    zShuffle.barrierlist[bytes])addressshardsrT   c                   s(   |    | |jt|| jdI d H S )N)datar_   )ra   r-   shuffle_receiver   r)   )rN   rd   re   r   r   r    r6      s
    
zShuffle.sendzCallable[..., T]r   r   )funcargsrT   c              
     sH   |    | d, t j| j|f| I d H W  5 Q R  S Q R X d S )Ncpu)ra   rH   rK   Zget_running_loopZrun_in_executorr8   )rN   rh   ri   r   r   r    offload   s    zShuffle.offloadzdict[str, Any]c                 C  s,   | j  }| j|d< | j || j| jdS )Nread)ZdiskZcommrB   rZ   )r@   	heartbeatrG   r?   rB   rI   )rN   Zcomm_heartbeatr   r   r    rm      s    

zShuffle.heartbeat)rf   rT   c                   s   |  |I d H  d S rY   )_receiverN   rf   r   r   r    receive   s    zShuffle.receivec              
     sx   |    z@|  jttt|7  _| | j|I d H }| |I d H  W n* tk
rr } z|| _	 W 5 d }~X Y nX d S rY   )
ra   rG   summaprC   rk   _repartition_buffers_write_to_disk	ExceptionrJ   )rN   rf   groupser   r   r    rn      s    zShuffle._receivezdict[str, list[pa.Table]]c                 C  sF   t |}t|| j}t|ttt| ks2t~dd | D S )Nc                 S  s   i | ]\}}||gqS r   r   ).0kvr   r   r    
<dictcomp>   s      z0Shuffle._repartition_buffers.<locals>.<dictcomp>)	r   split_by_partitionr'   rC   rq   rr   valuesAssertionErrorr:   )rN   rf   tablerv   r   r   r    rs      s
    zShuffle._repartition_buffersc                   s   |    | j|I d H  d S rY   )ra   r?   writero   r   r   r    rt      s    zShuffle._write_to_diskc                 C  s.   | j r*| jr| jtd| j d| j d S )NzShuffle z has been closed on )r>   rJ   r   r)   r*   rW   r   r   r    ra      s    zShuffle.raise_if_closedpd.DataFramec                   sT      jrtd dd fdd}|I d H }|I d H  d S )Nz&Cannot add more partitions to shuffle dict[str, list[bytes]]rS   c                    s&   t  jj} dd |  D } | S )Nc                 S  s   i | ]\}}|t |gqS r   )r   )rx   ry   tr   r   r    r{      s      z4Shuffle.add_partition.<locals>._.<locals>.<dictcomp>)split_by_workerr'   r%   r:   )outrf   rN   r   r    _   s    z Shuffle.add_partition.<locals>._)ra   rF   RuntimeErrorrk   _write_to_comm)rN   rf   r   r   r   r   r    add_partition   s    	zShuffle.add_partitionr   c                   s   |    | j|I d H  d S rY   )ra   r@   r   ro   r   r   r    r      s    zShuffle._write_to_comm)irT   c              	     s   |    | jstd| j| | jksJtd| d| j|  d| j d| jdksltd| d| j d	|  I d H  z,| |}| d
 |	 }W 5 Q R X W n" t
k
r   | j 	 }Y nX |  jd8  _|S )Nz1`get_output_partition` called before barrier taskzOutput partition z belongs on z, not z. r   z5No outputs remaining, but requested output partition rU   .rj      )ra   rF   r~   r%   r*   rE   flush_receive_read_from_diskrH   	to_pandasKeyErrorr(   Zempty_table)rN   r   dfr   r   r   r    get_output_partition   s$    
zShuffle.get_output_partitionz	int | strpa.Table)r)   rT   c                 C  s   |    | j|S rY   )ra   r?   rl   )rN   r)   r   r   r    r   
  s    zShuffle._read_from_diskc              
     sh   |    | jrtdd| _|  I d H  z| j  W n* tk
rb } z|| _ W 5 d }~X Y nX d S )Nz#`inputs_done` called multiple timesT)ra   rF   r~   _flush_commr@   Zraise_on_exceptionru   rJ   )rN   rw   r   r   r    inputs_done  s    zShuffle.inputs_donec                   s   |    | j I d H  d S rY   )ra   r@   flushrW   r   r   r    r     s    zShuffle._flush_commboolc                 C  s   | j o| jdkS )Nr   )rF   rE   rW   r   r   r    done  s    zShuffle.donec                   s   |    | j I d H  d S rY   )ra   r?   r   rW   r   r   r    r      s    zShuffle.flush_receivec                   s   | j r| j I d H  d S d| _ | j I d H  | j I d H  z| jjdd W n tk
rp   | j  Y nX | j	  d S )NT)Zcancel_futures)
r>   rM   waitr@   closer?   r8   shutdownru   r"   rW   r   r   r    r   $  s    zShuffle.closeru   )	exceptionrT   c                 C  s   | j s|| _d S rY   )r>   rJ   )rN   r   r   r   r    fail2  s    zShuffle.failN)r   r   r   __doc__rR   rX   
contextlibcontextmanagerrH   rb   r6   rk   rm   rp   rn   rs   rt   ra   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   0   s0   +$6	
r!   c                   @  s  e Zd ZU dZded< ded< ded< ded< d	ed
< dddddZddddZddddddZdddddZddddddZ	ddd ddd!d"d#Z
dddd$d%Zd&dd'd(d)Zedd&dd*d+Zedddd d&d,d-d+ZdAdd/d0d1d&d,d2d+Zdddd3d4d5Zddd6d7Zdddd8d9Zedddd d&d,d:d;Zedd&dd<d;ZdBdd/d0d1d&d,d=d;Zdd dd>d?d@Zd.S )CShuffleWorkerExtensiona  Interface between a Worker and a Shuffle.

    This extension is responsible for

    - Lifecycle of Shuffle instances
    - ensuring connectivity between remote shuffle instances
    - ensuring connectivity and integration with the scheduler
    - routing concurrent calls to the appropriate `Shuffle` based on its `ShuffleID`
    - collecting instrumentation of ongoing shuffles and route to scheduler/worker
    r   workerzdict[ShuffleId, Shuffle]shufflesr   r0   r/   r   r>   r\   )r   rT   c                 C  sl   | j |jd< | j|jd< | j|jd< | j|jd< | |jd< || _i | _tt	d| _
tt	d| _d| _d S )	Nrg   r]   shuffle_failzshuffle-failshufflez100 MiBz1 GiBF)rg   handlersr]   r   Zstream_handlers
extensionsr   r   r   r   r0   r/   r>   )rN   r   r   r   r    rR   I  s    
zShuffleWorkerExtension.__init__r<   rS   c                 C  s   dd | j  D S )Nc                 S  s   i | ]\}}||  qS r   )rm   )rx   r)   r   r   r   r    r{   ]  s      z4ShuffleWorkerExtension.heartbeat.<locals>.<dictcomp>)r   r:   rW   r   r   r    rm   \  s    z ShuffleWorkerExtension.heartbeatr   rc   )r_   rf   rT   c                   s$   |  |I dH }||I dH  dS )z
        Handler: Receive an incoming shard of data from a peer worker.
        Using an unknown ``shuffle_id`` is an error.
        N)_get_shufflerp   )rN   r_   rf   r   r   r   r    rg   _  s    	z&ShuffleWorkerExtension.shuffle_receive)r_   rT   c              	     sp   t  ` | |I dH }| I dH  | rb|jjs:ttd|  | 	|I dH  | j
|= W 5 Q R X dS )z
        Handler: Inform the extension that all input partitions have been handed off to extensions.
        Using an unknown ``shuffle_id`` is an error.
        NzShuffle inputs done )r   r   r   r   r?   emptyr~   loggerinfo_register_completer   rN   r_   r   r   r   r    r]   k  s    z*ShuffleWorkerExtension.shuffle_inputs_doner#   )r_   messagerT   c                   sR   z| j | }W n tk
r$   Y d S X t|}|| | I d H  | j |= d S rY   )r   r   r   r   r   )rN   r_   r   r   r   r   r   r    r   |  s    
z#ShuffleWorkerExtension.shuffle_failr   r$   )rf   r_   npartitionsr'   rT   c                 C  s*   | j ||||d}t| jj|j|d d S )N)r   r   r'   )rf   )get_shuffler   r   loopr   )rN   rf   r_   r   r'   r   r   r   r    r     s       z$ShuffleWorkerExtension.add_partitionc                   s"   |  |I dH }| I dH  dS )z
        Task: Note that the barrier task has been reached (`add_partition` called for all input partitions)

        Using an unknown ``shuffle_id`` is an error. Calling this before all partitions have been
        added is undefined.
        N)r   rb   r   r   r   r    _barrier  s    	zShuffleWorkerExtension._barrierr!   )r   rT   c                   s0   |  I d H  | jjj|j| jjdI d H  d S )N)r)   r   )r   r   	schedulerZshuffle_register_completer)   rd   )rN   r   r   r   r    r     s
    z)ShuffleWorkerExtension._register_completec                   s   d S rY   r   rN   r_   r   r   r    r     s    z#ShuffleWorkerExtension._get_shuffle)r_   r   r'   r   rT   c                   s   d S rY   r   rN   r_   r   r'   r   r   r   r    r     s    Nzpd.DataFrame | Nonez
str | Nonez
int | Nonec           	        s  ddl }z| j| }W nZ tk
rr   zj| jjj||dk	rR|j| 	 nd||| jj
dI dH }|d dkrt|d |d dkstW n0 tk
r   td dd	lm} | Y nX | jrt| jj d
| jj
 || jkr`t|d |d |d t|d |tj| jjd| | jjj| jj
| jjt| j || j!| j"d}|| j|< | j|  Y S Y nX |j#r|j#|S dS )z=Get a shuffle by ID; raise ValueError if it's not registered.r   N)r)   r(   r   r'   r   statusERRORr   OKzEWorker Shuffle unable to get information from scheduler, rescheduling)
Reschedulez already closed on r'   r%   r&   r(   zshuffle-)r'   r%   r&   r(   r)   r+   r,   r*   r-   r.   r/   r0   )$pyarrowr   r   r   r   Zshuffle_getZSchemafrom_pandas	serializeZ
to_pybytesrd   r   r~   r   r   distributed.workerr   r>   r   	__class__r   r!   r   ospathjoinZlocal_directorystater,   r-   	functoolspartial_broadcast_to_participantsr/   r0   rJ   )	rN   r_   r   r'   r   par   resultr   r   r   r    r     sj    	
  
)r)   r`   rT   c                   s.   | j jj|dI d H }| j jj||dI d H S )N)r)   )r`   workers)r   r   Z!shuffle_get_participating_workersr.   )rN   r)   r`   Zparticipating_workersr   r   r    r     s     z1ShuffleWorkerExtension._broadcast_to_participantsc                   s8   | j r
td| _ | jr4| j \}}| I d H  qd S )NT)r>   r~   r   popitemr   )rN   r   r   r   r   r    r     s
    
zShuffleWorkerExtension.closec                 C  s   t | jj| j| d S rY   )r   r   r   r   r   r   r   r    rb     s    zShuffleWorkerExtension.barrierc                 C  s   d S rY   r   r   r   r   r    r     s    z"ShuffleWorkerExtension.get_shufflec                 C  s   d S rY   r   r   r   r   r    r     s    c                 C  s   t | jj| j||||S rY   )r   r   r   r   r   r   r   r    r   #  s    )r_   output_partitionrT   c                 C  sP   |  |}t| jj|j|}| rL|| jkrL| j|}t| jj| j| |S )z
        Task: Retrieve a shuffled output partition from the ShuffleExtension.

        Calling this for a ``shuffle_id`` which is unknown or incomplete is an error.
        )	r   r   r   r   r   r   r   popr   )rN   r_   r   r   outputr   r   r    r   3  s    
z+ShuffleWorkerExtension.get_output_partition)NNN)NNN)r   r   r   r   __annotations__rR   rm   rg   r]   r   r   r   r   r
   r   r   r   rb   r   r   r   r   r   r    r   7  sD   

	   C		   r   r   r#   z	pd.Serieszdict[Any, pa.Table])r   r'   r%   rT   c                   s"  ddl }ddl}| jjjd|ddd} t| }|s<i S |jj| dd  	d |
 dgd } dg ~ ||dd |dd	 kd d }|dg|g} fd
dtd|D }| j|d	 dd || }	fddt|	|D }
ttt|
 |kst|
S )zO
    Split data into many arrow batches, partitioned by destination worker
    r   NZ_workerTinner)rightZleft_onZright_indexhow)Zpreserve_indexr   c                   s"   g | ]\}} j ||| d qS offsetlengthslicerx   abr   r   r    
<listcomp>d  s    z#split_by_worker.<locals>.<listcomp>   r   c                   s   i | ]\}} j j| |qS r   )cat
categories)rx   codeZshard)r%   r   r    r{   j  s   
 z#split_by_worker.<locals>.<dictcomp>)numpyr   merger   codesrenamerC   ZTabler   sort_byasarrayselectZdropwhereconcatenatetoolzsliding_windowr;   r   ziprq   rr   r}   r~   )r   r'   r%   npr   Znrowsr   splitsre   Zunique_codesr   r   )r   r%   r    r   D  s8    
&


r   r   )r   r'   rT   c                   s   ddl } |g |  }|   | | |gd }||dd |dd kd d }|dg|g} fddt	
d|D }| j|d dd t ttt|kstt|t|ksttt||S )	zL
    Split data into many arrow batches, partitioned by final partition
    r   Nr   r   c                   s"   g | ]\}} j ||| d qS r   r   r   r   r   r    r     s    z&split_by_partition.<locals>.<listcomp>r   r   )r   r   r   uniquesortr   r   r   r   r   r   r;   r   rC   rq   rr   r~   r<   r   )r   r'   r   Z
partitions	partitionr   re   r   r   r    r|   s  s    
&

r|   )<
__future__r   rK   r   r   loggingr   rH   collectionsr   collections.abcr   r   concurrent.futuresr   typingr   r   r	   r
   r   Z
dask.utilsr   Zdistributed.corer   Zdistributed.protocolr   Zdistributed.shuffle._arrowr   r   r   r   r   Zdistributed.shuffle._commsr   Zdistributed.shuffle._diskr   Zdistributed.shuffle._limiterr   Zdistributed.shuffle._shuffler   Zdistributed.utilsr   r   r7   rO   r   r   r   r   r   	getLoggerr   r   r   r   r!   r   r   r|   r   r   r   r    <module>   sF   
  	  /