U
    /e                     @  s  d dl mZ d dlZd dlmZmZmZ d dlmZ d dl	m
Z
 d dlmZ edZerzd dlZd dlmZ d d	lmZ ed
eZddddZdd
ddddddZd
ddddddZd
dddddZd,ddddd d!d"ZG d#d$ d$eZd%Zd
dd&d'd(Zdd
d)d*d+ZdS )-    annotationsN)TYPE_CHECKINGAnyNewType)tokenize)HighLevelGraph)SimpleShuffleLayerzdistributed.shuffle	DataFrame)ShuffleWorkerExtension	ShuffleIdr   returnc               
   C  sn   ddl m}  z
|  }W n, tk
rB } ztd|W 5 d }~X Y nX |jd}|d krjtd|j d|S )Nr   )
get_workerz`shuffle='p2p'` requires Dask's distributed scheduler. This task is not running on a Worker; please confirm that you've created a distributed Client and are submitting this computation through it.shufflezThe worker zE does not have a ShuffleExtension. Is pandas installed on the worker?)Zdistributedr   
ValueErrorRuntimeError
extensionsgetaddress)r   Zworkere	extension r   @/tmp/pip-unpacked-wheel-g426oqom/distributed/shuffle/_shuffle.py_get_worker_extension   s    
r   pd.DataFrameintstrNone)inputidnpartitionscolumnr   c                 C  sR   zt  j| |||d W n4 tk
rL   d| }tj|dd t|Y nX d S )N)r"   r#   z'shuffle_transfer failed during shuffle Texc_info)r   Zadd_partition	Exceptionloggererrorr   )r    r!   r"   r#   msgr   r   r   shuffle_transfer)   s       

r*   object)r!   output_partitionbarrierr   c                 C  sJ   zt  | |W S  tk
rD   d|  }tj|dd t|Y nX d S )Nz%shuffle_unpack failed during shuffle Tr$   )r   Zget_output_partitionr&   r'   r(   r   )r!   r,   r-   r)   r   r   r   shuffle_unpack;   s    
r.   z
list[None])r!   	transfersr   c                 C  sH   zt  | W S  tk
rB   d|  }tj|dd t|Y nX d S )Nz&shuffle_barrier failed during shuffle Tr$   )r   r-   r&   r'   r(   r   )r!   r/   r)   r   r   r   shuffle_barrierH   s    
r0   r   z
int | None)dfr#   r"   r   c              	   C  s   ddl m} |p| j}t| ||}| j }tdd |jD r^dd |jD }td| |j	
 D ]"\}}|tkrh|| d||< qh|| d	||< d
| }	t|	||| jd| j|d}
|t|	|
| g|	|d g|d  S )Nr   r
   c                 s  s   | ]}t |t V  qd S N)
isinstancer   .0cr   r   r   	<genexpr>^   s     z*rearrange_by_column_p2p.<locals>.<genexpr>c                 S  s    i | ]}t |ts|t|qS r   )r3   r   typer4   r   r   r   
<dictcomp>_   s     
  z+rearrange_by_column_p2p.<locals>.<dictcomp>z0p2p requires all column names to be str, found: stringZint64zshuffle-p2p-T)npartitions_inputignore_index
name_input
meta_input   )dask.dataframer   r"   r   Z_metacopyanycolumns	TypeErrorZdtypesitemsr+   ZastypeP2PShuffleLayer_namer   Zfrom_collections)r1   r#   r"   r   tokenemptyunsupportedr6   dtnameZlayerr   r   r   rearrange_by_column_p2pS   s>    



	rM   c                      sr   e Zd Zddddddddddd	 fd	d
ZddddZddddZdd dddZddddddZ  ZS )rF   Nr   r   boolr   zlist | Nonezdict | None)	rL   r#   r"   r;   r<   r=   r>   	parts_outr   c
           
        s<   |	pi }	|	 ddd i t j|||||||||	d	 d S )Nr   c                 S  s   | d S )Nr?   r   keyr   r   r   <lambda>       z*P2PShuffleLayer.__init__.<locals>.<lambda>r   )updatesuper__init__)
selfrL   r#   r"   r;   r<   r=   r>   rO   r   	__class__r   r   rV   }   s    zP2PShuffleLayer.__init__listr   c                 C  s   g S r2   r   rW   r   r   r   get_split_keys   s    zP2PShuffleLayer.get_split_keysc                 C  s    t | j d| j d| j dS )Nz<name='z', npartitions=>)r8   __name__rL   r"   r[   r   r   r   __repr__   s    zP2PShuffleLayer.__repr__)rO   r   c              
   C  s&   t | j| j| j| j| j| j| j|dS )N)rO   )rF   rL   r#   r"   r;   r<   r=   r>   )rW   rO   r   r   r   _cull   s    zP2PShuffleLayer._cullr   zdict[tuple | str, tuple])deserializingr   c           	      C  s   t | j| j| j| j}i }tt|}d| }t }t| j	D ]2}|
||f t| j|f|| j| jf|||f< q>t||f||< | j}| jD ]}t|||f|||f< q|S )Nzshuffle-transfer-)r   r=   r#   r"   rO   barrier_keyr   rZ   ranger;   appendr*   r0   rL   r.   )	rW   ra   rH   ZdskZ_barrier_keyrL   Ztransfer_keysiZpart_outr   r   r   _construct_graph   s$    
z P2PShuffleLayer._construct_graph)NN)N)	r^   
__module____qualname__rV   r\   r_   r`   rf   __classcell__r   r   rX   r   rF   |   s   
  $rF   zshuffle-barrier-)
shuffle_idr   c                 C  s   t |  S r2   )_BARRIER_PREFIX)rj   r   r   r   rb      s    rb   )rQ   r   c                 C  s   |  tstt| tdS )N )
startswithrk   AssertionErrorr   replacerP   r   r   r   id_from_key   s    rp   )N) 
__future__r   loggingtypingr   r   r   Z	dask.baser   Zdask.highlevelgraphr   Zdask.layersr	   	getLoggerr'   Zpandaspdr@   r   Z%distributed.shuffle._worker_extensionr   r   r   r   r*   r.   r0   rM   rF   rk   rb   rp   r   r   r   r   <module>   s*   

 )H