from __future__ import annotations

import functools
import math
import operator
from collections import defaultdict
from collections.abc import Callable
from itertools import product
from typing import Any

import tlz as toolz
from tlz.curried import map

from dask.base import tokenize
from dask.blockwise import Blockwise, BlockwiseDep, BlockwiseDepDict, blockwise_token
from dask.core import flatten, keys_in_tasks
from dask.highlevelgraph import Layer
from dask.utils import (
    apply,
    cached_cumsum,
    concrete,
    insert,
    stringify,
    stringify_collection_keys,
)


class CallableLazyImport:
    """Function Wrapper for Lazy Importing.

    This Class should only be used when materializing a graph
    on a distributed scheduler.
    """

    def __init__(self, function_path):
        self.function_path = function_path

    def __call__(self, *args, **kwargs):
        from distributed.utils import import_term

        return import_term(self.function_path)(*args, **kwargs)


class ArrayBlockwiseDep(BlockwiseDep):
    """
    Blockwise dep for array-likes, which only needs chunking
    information to compute its data.
    """

    chunks: tuple[tuple[int, ...], ...]
    numblocks: tuple[int, ...]
    produces_tasks: bool = False

    def __init__(self, chunks: tuple[tuple[int, ...], ...]):
        self.chunks = chunks
        self.numblocks = tuple(len(chunk) for chunk in chunks)
        self.produces_tasks = False

    def __getitem__(self, idx: tuple[int, ...]):
        raise NotImplementedError("Subclasses must implement __getitem__")

    def __dask_distributed_pack__(
        self, required_indices: list[tuple[int, ...]] | None = None
    ):
        return {"chunks": self.chunks}

    @classmethod
    def __dask_distributed_unpack__(cls, state):
        return cls(**state)


class ArrayChunkShapeDep(ArrayBlockwiseDep):
    """Produce chunk shapes given a chunk index"""

    def __getitem__(self, idx: tuple[int, ...]):
        return tuple(chunk[i] for i, chunk in zip(idx, self.chunks))


class ArraySliceDep(ArrayBlockwiseDep):
    """Produce slice(s) into the full-sized array given a chunk index"""

    starts: tuple[tuple[int, ...], ...]

    def __init__(self, chunks: tuple[tuple[int, ...], ...]):
        super().__init__(chunks)
        self.starts = tuple(cached_cumsum(c, initial_zero=True) for c in chunks)

    def __getitem__(self, idx: tuple):
        loc = tuple((start[i], start[i + 1]) for i, start in zip(idx, self.starts))
        return tuple(slice(*s, None) for s in loc)
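

# Illustrative sketch (plain arithmetic, no extra dependencies): for chunks
# ((2, 3), (4,)), block index (1, 0) has chunk shape (3, 4) and spans rows 2:5
# and columns 0:4 of the full array.
#
#     chunks = ((2, 3), (4,))
#     ArrayChunkShapeDep(chunks)[(1, 0)]  # -> (3, 4)
#     ArraySliceDep(chunks)[(1, 0)]       # -> (slice(2, 5, None), slice(0, 4, None))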


class ArrayOverlapLayer(Layer):
    """Simple HighLevelGraph array overlap layer.

    Lazily computed High-level graph layer for a array overlap operations.

    Parameters
    ----------
    name : str
        Name of new output overlap array.
    array : Dask array
    axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{'0': 1, '1': 1}``
    """

    def __init__(self, name, axes, chunks, numblocks, token):
        super().__init__()
        self.name = name
        self.axes = axes
        self.chunks = chunks
        self.numblocks = numblocks
        self.token = token
        self._cached_keys = None

    def __repr__(self):
        return f"ArrayOverlapLayer<name='{self.name}'"

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    def get_output_keys(self):
        return self.keys()  # FIXME! this implementation materializes the graph

    def _dask_keys(self):
        if self._cached_keys is not None:
            return self._cached_keys

        name, chunks, numblocks = self.name, self.chunks, self.numblocks

        def keys(*args):
            if not chunks:
                return [(name,)]
            ind = len(args)
            if ind + 1 == len(numblocks):
                result = [(name,) + args + (i,) for i in range(numblocks[ind])]
            else:
                result = [keys(*(args + (i,))) for i in range(numblocks[ind])]
            return result

        self._cached_keys = result = keys()
        return result

    def _construct_graph(self, deserializing=False):
        """Construct graph for a simple overlap operation."""
        axes = self.axes
        chunks = self.chunks
        name = self.name
        dask_keys = self._dask_keys()

        getitem_name = "getitem-" + self.token
        overlap_name = "overlap-" + self.token

        if deserializing:
            # Use CallableLazyImport objects to avoid importing the array
            # module on the scheduler
            concatenate3 = CallableLazyImport("dask.array.core.concatenate3")
        else:
            # Not running on distributed scheduler - use explicit functions
            from dask.array.core import concatenate3

        dims = list(map(len, chunks))
        expand_key2 = functools.partial(
            _expand_keys_around_center, dims=dims, axes=axes
        )

        # Make keys for each of the surrounding sub-arrays
        interior_keys = toolz.pipe(
            dask_keys, flatten, map(expand_key2), map(flatten), toolz.concat, list
        )
        interior_slices = {}
        overlap_blocks = {}
        for k in interior_keys:
            frac_slice = fractional_slice((name,) + k, axes)
            if (name,) + k != frac_slice:
                interior_slices[(getitem_name,) + k] = frac_slice
            else:
                interior_slices[(getitem_name,) + k] = (name,) + k
                overlap_blocks[(overlap_name,) + k] = (
                    concatenate3,
                    (concrete, expand_key2((None,) + k, name=getitem_name)),
                )

        dsk = toolz.merge(interior_slices, overlap_blocks)
        return dsk

    @classmethod
    def __dask_distributed_unpack__(cls, state):
        return cls(**state)._construct_graph(deserializing=True)


def _expand_keys_around_center(k, dims, name=None, axes=None):
    """Get all neighboring keys around center

    Parameters
    ----------
    k: tuple
        They key around which to generate new keys
    dims: Sequence[int]
        The number of chunks in each dimension
    name: Option[str]
        The name to include in the output keys, or none to include no name
    axes: Dict[int, int]
        The axes active in the expansion.  We don't expand on non-active axes

    Examples
    --------
    >>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
     [('y',   2, 2.1), ('y',   2, 3), ('y',   2, 3.9)],
     [('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]

    >>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y',   0, 3.1), ('y',   0,   4)],
     [('y', 0.9, 3.1), ('y', 0.9,   4)]]
    """

    def inds(i, ind):
        rv = []
        if ind - 0.9 > 0:
            rv.append(ind - 0.9)
        rv.append(ind)
        if ind + 0.9 < dims[i] - 1:
            rv.append(ind + 0.9)
        return rv

    shape = []
    for i, ind in enumerate(k[1:]):
        num = 1
        if ind > 0:
            num += 1
        if ind < dims[i] - 1:
            num += 1
        shape.append(num)

    args = [
        inds(i, ind) if any((axes.get(i, 0),)) else [ind]
        for i, ind in enumerate(k[1:])
    ]
    if name is not None:
        args = [[name]] + args
    seq = list(product(*args))
    shape2 = [d if any((axes.get(i, 0),)) else 1 for i, d in enumerate(shape)]
    result = reshapelist(shape2, seq)
    return result


def reshapelist(shape, seq):
    """Reshape iterator to nested shape

    >>> reshapelist((2, 3), range(6))
    [[0, 1, 2], [3, 4, 5]]
    """
    if len(shape) == 1:
        return list(seq)
    else:
        n = int(len(seq) / shape[0])
        return [reshapelist(shape[1:], part) for part in toolz.partition(n, seq)]


def fractional_slice(task, axes):
    """

    >>> fractional_slice(('x', 5.1), {0: 2})
    (<built-in function getitem>, ('x', 5), (slice(-2, None, None),))

    >>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))

    >>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
    """
    rounded = (task[0],) + tuple(int(round(i)) for i in task[1:])

    index = []
    for i, (t, r) in enumerate(zip(task[1:], rounded[1:])):
        depth = axes.get(i, 0)
        if isinstance(depth, tuple):
            left_depth = depth[0]
            right_depth = depth[1]
        else:
            left_depth = depth
            right_depth = depth

        if t == r:
            index.append(slice(None, None, None))
        elif t < r and right_depth:
            index.append(slice(0, right_depth))
        elif t > r and left_depth:
            index.append(slice(-left_depth, None))
        else:
            index.append(slice(0, 0))
    index = tuple(index)

    if all(ind == slice(None, None, None) for ind in index):
        return task
    else:
        return (operator.getitem, rounded, index)


class SimpleShuffleLayer(Layer):
    """Simple HighLevelGraph Shuffle layer

    High-level graph layer for a simple shuffle operation in which
    each output partition depends on all input partitions.

    Parameters
    ----------
    name : str
        Name of new shuffled output collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    npartitions : int
        Number of output partitions.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    """

    def __init__(
        self,
        name,
        column,
        npartitions,
        npartitions_input,
        ignore_index,
        name_input,
        meta_input,
        parts_out=None,
        annotations=None,
    ):
        super().__init__(annotations=annotations)
        self.name = name
        self.column = column
        self.npartitions = npartitions
        self.npartitions_input = npartitions_input
        self.ignore_index = ignore_index
        self.name_input = name_input
        self.meta_input = meta_input
        self.parts_out = parts_out or range(npartitions)
        self.split_name = "split-" + self.name

        # Dask's depth-first scheduling delays freeing the large
        # ``shuffle_group()`` outputs.  Give the "split" (getitem) tasks a
        # higher priority via annotations so that data can be released early.
        self.annotations = self.annotations or {}
        if "priority" not in self.annotations:
            self.annotations["priority"] = {}
            self.annotations["priority"]["__expanded_annotations__"] = None
            self.annotations["priority"].update(
                {_key: 1 for _key in self.get_split_keys()}
            )

    def get_split_keys(self):
        # Return SimpleShuffleLayer "split" keys
        return [
            stringify((self.split_name, part_out, part_in))
            for part_in in range(self.npartitions_input)
            for part_out in self.parts_out
        ]

    def get_output_keys(self):
        return {(self.name, part) for part in self.parts_out}

    def __repr__(self):
        return "SimpleShuffleLayer<name='{}', npartitions={}>".format(
            self.name, self.npartitions
        )

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def _keys_to_parts(self, keys):
        """Simple utility to convert keys to partition indices."""
        parts = set()
        for key in keys:
            try:
                _name, _part = key
            except ValueError:
                continue
            if _name != self.name:
                continue
            parts.add(_part)
        return parts

    def _cull_dependencies(self, keys, parts_out=None):
        """Determine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        """
        deps = defaultdict(set)
        parts_out = parts_out or self._keys_to_parts(keys)
        for part in parts_out:
            deps[(self.name, part)] |= {
                (self.name_input, i) for i in range(self.npartitions_input)
            }
        return deps

    def _cull(self, parts_out):
        return SimpleShuffleLayer(
            self.name,
            self.column,
            self.npartitions,
            self.npartitions_input,
            self.ignore_index,
            self.name_input,
            self.meta_input,
            parts_out=parts_out,
        )

    def cull(self, keys, all_keys):
        """Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        """
        parts_out = self._keys_to_parts(keys)
        culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
        if parts_out != set(self.parts_out):
            culled_layer = self._cull(parts_out)
            return culled_layer, culled_deps
        else:
            return self, culled_deps

    def __reduce__(self):
        attrs = [
            "name",
            "column",
            "npartitions",
            "npartitions_input",
            "ignore_index",
            "name_input",
            "meta_input",
            "parts_out",
            "annotations",
        ]
        return (SimpleShuffleLayer, tuple(getattr(self, attr) for attr in attrs))

    def __dask_distributed_pack__(
        self, all_hlg_keys, known_key_dependencies, client, client_keys
    ):
        from distributed.protocol.serialize import to_serialize

        return {
            "name": self.name,
            "column": self.column,
            "npartitions": self.npartitions,
            "npartitions_input": self.npartitions_input,
            "ignore_index": self.ignore_index,
            "name_input": self.name_input,
            "meta_input": to_serialize(self.meta_input),
            "parts_out": list(self.parts_out),
        }

    @classmethod
    def __dask_distributed_unpack__(cls, state, dsk, dependencies):
        from distributed.worker import dumps_task

        # msgpack converts lists into tuples; convert them back
        if isinstance(state["column"], tuple):
            state["column"] = list(state["column"])
        if "inputs" in state:
            state["inputs"] = list(state["inputs"])

        # Materialize the layer
        layer_dsk = cls(**state)._construct_graph(deserializing=True)

        # Convert all keys to strings and dump tasks
        layer_dsk = {
            stringify(k): stringify_collection_keys(v) for k, v in layer_dsk.items()
        }
        keys = layer_dsk.keys() | dsk.keys()
        deps = {k: keys_in_tasks(keys, [v]) for k, v in layer_dsk.items()}

        return {"dsk": toolz.valmap(dumps_task, layer_dsk), "deps": deps}

    def _construct_graph(self, deserializing=False):
        """Construct graph for a simple shuffle operation."""
        shuffle_group_name = "group-" + self.name

        if deserializing:
            # Use CallableLazyImport objects to avoid importing the dataframe
            # module on the scheduler
            concat_func = CallableLazyImport("dask.dataframe.core._concat")
            shuffle_group_func = CallableLazyImport(
                "dask.dataframe.shuffle.shuffle_group"
            )
        else:
            # Not running on distributed scheduler - use explicit functions
            from dask.dataframe.core import _concat as concat_func
            from dask.dataframe.shuffle import shuffle_group as shuffle_group_func

        dsk = {}
        for part_out in self.parts_out:
            _concat_list = [
                (self.split_name, part_out, part_in)
                for part_in in range(self.npartitions_input)
            ]
            dsk[(self.name, part_out)] = (
                concat_func,
                _concat_list,
                self.ignore_index,
            )
            for _, _part_out, _part_in in _concat_list:
                dsk[(self.split_name, _part_out, _part_in)] = (
                    operator.getitem,
                    (shuffle_group_name, _part_in),
                    _part_out,
                )
                if (shuffle_group_name, _part_in) not in dsk:
                    dsk[(shuffle_group_name, _part_in)] = (
                        shuffle_group_func,
                        (self.name_input, _part_in),
                        self.column,
                        0,
                        self.npartitions,
                        self.npartitions,
                        self.ignore_index,
                        self.npartitions,
                    )
        return dsk
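

# Illustrative sketch (assumes pandas and dask.dataframe are installed; the
# layer is normally built by dask.dataframe.shuffle, not by hand): a simple
# shuffle materializes one ``shuffle_group`` task per input partition, one
# "split" task per (output, input) pair, and one ``_concat`` task per output
# partition.
#
#     import pandas as pd
#     meta = pd.DataFrame({"x": pd.Series(dtype="int64")})
#     layer = SimpleShuffleLayer(
#         "shuffled", "x", npartitions=4, npartitions_input=2,
#         ignore_index=False, name_input="df", meta_input=meta,
#     )
#     dsk = layer._construct_graph()
#     assert len(dsk) == 2 + 2 * 4 + 4   # groups + splits + concats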


class ShuffleLayer(SimpleShuffleLayer):
    """Shuffle-stage HighLevelGraph layer

    High-level graph layer corresponding to a single stage of
    a multi-stage inter-partition shuffle operation.

    Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)

    Parameters
    ----------
    name : str
        Name of new (partially) shuffled collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    inputs : list of tuples
        Each tuple dictates the data movement for a specific partition.
    stage : int
        Index of the current shuffle stage.
    npartitions : int
        Number of output partitions for the full (multi-stage) shuffle.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    k : int
        A partition is split into this many groups during each stage.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    """

    def __init__(
        self,
        name,
        column,
        inputs,
        stage,
        npartitions,
        npartitions_input,
        nsplits,
        ignore_index,
        name_input,
        meta_input,
        parts_out=None,
        annotations=None,
    ):
        self.inputs = inputs
        self.stage = stage
        self.nsplits = nsplits
        super().__init__(
            name,
            column,
            npartitions,
            npartitions_input,
            ignore_index,
            name_input,
            meta_input,
            parts_out=parts_out or range(len(inputs)),
            annotations=annotations,
        )

    def get_split_keys(self):
        # Return ShuffleLayer "split" keys
        keys = []
        for part in self.parts_out:
            out = self.inputs[part]
            for i in range(self.nsplits):
                keys.append(
                    stringify(
                        (self.split_name, out[self.stage], insert(out, self.stage, i))
                    )
                )
        return keys

    def __repr__(self):
        return "ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>".format(
            self.name, self.stage, self.nsplits, self.npartitions
        )

    def __reduce__(self):
        attrs = [
            "name",
            "column",
            "inputs",
            "stage",
            "npartitions",
            "npartitions_input",
            "nsplits",
            "ignore_index",
            "name_input",
            "meta_input",
            "parts_out",
            "annotations",
        ]
        return (ShuffleLayer, tuple(getattr(self, attr) for attr in attrs))

    def __dask_distributed_pack__(self, *args, **kwargs):
        ret = super().__dask_distributed_pack__(*args, **kwargs)
        ret["inputs"] = self.inputs
        ret["stage"] = self.stage
        ret["nsplits"] = self.nsplits
        return ret

    def _cull_dependencies(self, keys, parts_out=None):
        """Determine the necessary dependencies to produce `keys`.

        Does not require graph materialization.
        """
        deps = defaultdict(set)
        parts_out = parts_out or self._keys_to_parts(keys)
        inp_part_map = {inp: i for i, inp in enumerate(self.inputs)}
        for part in parts_out:
            out = self.inputs[part]
            for k in range(self.nsplits):
                _inp = insert(out, self.stage, k)
                _part = inp_part_map[_inp]
                if self.stage == 0 and _part >= self.npartitions_input:
                    deps[(self.name, part)].add(
                        ("group-" + self.name, _inp, "empty")
                    )
                else:
                    deps[(self.name, part)].add((self.name_input, _part))
        return deps

    def _cull(self, parts_out):
        return ShuffleLayer(
            self.name,
            self.column,
            self.inputs,
            self.stage,
            self.npartitions,
            self.npartitions_input,
            self.nsplits,
            self.ignore_index,
            self.name_input,
            self.meta_input,
            parts_out=parts_out,
        )

    def _construct_graph(self, deserializing=False):
        """Construct graph for a "rearrange-by-column" stage."""
        shuffle_group_name = "group-" + self.name

        if deserializing:
            # Use CallableLazyImport objects to avoid importing the dataframe
            # module on the scheduler
            concat_func = CallableLazyImport("dask.dataframe.core._concat")
            shuffle_group_func = CallableLazyImport(
                "dask.dataframe.shuffle.shuffle_group"
            )
        else:
            # Not running on distributed scheduler - use explicit functions
            from dask.dataframe.core import _concat as concat_func
            from dask.dataframe.shuffle import shuffle_group as shuffle_group_func

        dsk = {}
        inp_part_map = {inp: i for i, inp in enumerate(self.inputs)}
        for part in self.parts_out:

            out = self.inputs[part]

            _concat_list = []  # getitem tasks to concat for this output partition
            for i in range(self.nsplits):
                # Get out each individual dataframe piece from the dicts
                _inp = insert(out, self.stage, i)
                _idx = out[self.stage]
                _concat_list.append((self.split_name, _idx, _inp))

            # Concatenate those pieces together, with their friends
            dsk[(self.name, part)] = (
                concat_func,
                _concat_list,
                self.ignore_index,
            )

            for _, _idx, _inp in _concat_list:
                dsk[(self.split_name, _idx, _inp)] = (
                    operator.getitem,
                    (shuffle_group_name, _inp),
                    _idx,
                )

                if (shuffle_group_name, _inp) not in dsk:

                    # Initial partitions (output of previous stage)
                    _part = inp_part_map[_inp]
                    if self.stage == 0:
                        if _part < self.npartitions_input:
                            input_key = (self.name_input, _part)
                        else:
                            # Add the empty metadata as a key so that
                            # serialization includes it
                            input_key = (shuffle_group_name, _inp, "empty")
                            dsk[input_key] = self.meta_input
                    else:
                        input_key = (self.name_input, _part)

                    # Convert partition into dict of dataframe pieces
                    dsk[(shuffle_group_name, _inp)] = (
                        shuffle_group_func,
                        input_key,
                        self.column,
                        self.stage,
                        self.nsplits,
                        self.npartitions_input,
                        self.ignore_index,
                        self.npartitions,
                    )

        return dsk
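

# Illustrative sketch (plain arithmetic): a multi-stage ("tasks") shuffle
# chains roughly ceil(log(npartitions) / log(nsplits)) ShuffleLayer stages,
# each splitting a partition into ``nsplits`` groups, so each stage stays
# O(npartitions * nsplits) tasks instead of O(npartitions ** 2).
#
#     import math
#     npartitions, nsplits = 1000, 32
#     stages = int(math.ceil(math.log(npartitions) / math.log(nsplits)))  # -> 2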


class BroadcastJoinLayer(Layer):
    """Broadcast-based Join Layer

    High-level graph layer for a join operation requiring the
    smaller collection to be broadcasted to every partition of
    the larger collection.

    Parameters
    ----------
    name : str
        Name of new (joined) output collection.
    lhs_name: string
        "Left" DataFrame collection to join.
    lhs_npartitions: int
        Number of partitions in "left" DataFrame collection.
    rhs_name: string
        "Right" DataFrame collection to join.
    rhs_npartitions: int
        Number of partitions in "right" DataFrame collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations.
    **merge_kwargs : **dict
        Keyword arguments to be passed to chunkwise merge func.
    """

    def __init__(
        self,
        name,
        npartitions,
        lhs_name,
        lhs_npartitions,
        rhs_name,
        rhs_npartitions,
        parts_out=None,
        annotations=None,
        **merge_kwargs,
    ):
        super().__init__(annotations=annotations)
        self.name = name
        self.npartitions = npartitions
        self.lhs_name = lhs_name
        self.lhs_npartitions = lhs_npartitions
        self.rhs_name = rhs_name
        self.rhs_npartitions = rhs_npartitions
        self.parts_out = parts_out or set(range(self.npartitions))
        self.merge_kwargs = merge_kwargs
        self.how = self.merge_kwargs.get("how")
        self.left_on = self.merge_kwargs.get("left_on")
        self.right_on = self.merge_kwargs.get("right_on")
        if isinstance(self.left_on, list):
            self.left_on = (list, tuple(self.left_on))
        if isinstance(self.right_on, list):
            self.right_on = (list, tuple(self.right_on))

    def get_output_keys(self):
        return {(self.name, part) for part in self.parts_out}

    def __repr__(self):
        return "BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>".format(
            self.name, self.how, self.lhs_name, self.rhs_name
        )

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def __dask_distributed_pack__(self, *args, **kwargs):
        import pickle

        # Pickle complex merge_kwargs elements. Also
        # tuples, which may be confused with keys.
        _merge_kwargs = {}
        for k, v in self.merge_kwargs.items():
            if not isinstance(v, (str, list, bool)):
                _merge_kwargs[k] = pickle.dumps(v)
            else:
                _merge_kwargs[k] = v

        return {
            "name": self.name,
            "npartitions": self.npartitions,
            "lhs_name": self.lhs_name,
            "lhs_npartitions": self.lhs_npartitions,
            "rhs_name": self.rhs_name,
            "rhs_npartitions": self.rhs_npartitions,
            "parts_out": self.parts_out,
            "merge_kwargs": _merge_kwargs,
        }

    @classmethod
    def __dask_distributed_unpack__(cls, state, dsk, dependencies):
        from distributed.worker import dumps_task

        # Expand merge_kwargs
        merge_kwargs = state.pop("merge_kwargs", {})
        state.update(merge_kwargs)

        # Materialize the layer
        raw = cls(**state)._construct_graph(deserializing=True)

        # Convert all keys to strings and dump tasks
        raw = {stringify(k): stringify_collection_keys(v) for k, v in raw.items()}
        keys = raw.keys() | dsk.keys()
        deps = {k: keys_in_tasks(keys, [v]) for k, v in raw.items()}

        return {"dsk": toolz.valmap(dumps_task, raw), "deps": deps}

    def _keys_to_parts(self, keys):
        """Simple utility to convert keys to partition indices."""
        parts = set()
        for key in keys:
            try:
                _name, _part = key
            except ValueError:
                continue
            if _name != self.name:
                continue
            parts.add(_part)
        return parts

    @property
    def _broadcast_plan(self):
        # Return structure (tuple):
        # (
        #     <broadcasted-collection-name>,
        #     <broadcasted-collection-npartitions>,
        #     <other-collection-name>,
        #     <other-collection-on>,
        # )
        if self.lhs_npartitions < self.rhs_npartitions:
            # Broadcasting the left
            return (
                self.lhs_name,
                self.lhs_npartitions,
                self.rhs_name,
                self.right_on,
            )
        else:
            # Broadcasting the right
            return (
                self.rhs_name,
                self.rhs_npartitions,
                self.lhs_name,
                self.left_on,
            )

    def _cull_dependencies(self, keys, parts_out=None):
        """Determine the necessary dependencies to produce `keys`.

        For a broadcast join, output partitions always depend on
        all partitions of the broadcasted collection, but only one
        partition of the "other" collection.
        """
        # Get broadcast info
        bcast_name, bcast_size, other_name = self._broadcast_plan[:3]

        deps = defaultdict(set)
        parts_out = parts_out or self._keys_to_parts(keys)
        for part in parts_out:
            deps[(self.name, part)] |= {(bcast_name, i) for i in range(bcast_size)}
            deps[(self.name, part)] |= {(other_name, part)}
        return deps

    def _cull(self, parts_out):
        return BroadcastJoinLayer(
            self.name,
            self.npartitions,
            self.lhs_name,
            self.lhs_npartitions,
            self.rhs_name,
            self.rhs_npartitions,
            annotations=self.annotations,
            parts_out=parts_out,
            **self.merge_kwargs,
        )

    def cull(self, keys, all_keys):
        """Cull a BroadcastJoinLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        """
        parts_out = self._keys_to_parts(keys)
        culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
        if parts_out != set(self.parts_out):
            culled_layer = self._cull(parts_out)
            return culled_layer, culled_deps
        else:
            return self, culled_deps

    def _construct_graph(self, deserializing=False):
        """Construct graph for a broadcast join operation."""
        inter_name = "inter-" + self.name
        split_name = "split-" + self.name

        if deserializing:
            # Use CallableLazyImport objects to avoid importing the dataframe
            # module on the scheduler
            split_partition_func = CallableLazyImport(
                "dask.dataframe.multi._split_partition"
            )
            concat_func = CallableLazyImport("dask.dataframe.multi._concat_wrapper")
            merge_chunk_func = CallableLazyImport(
                "dask.dataframe.multi._merge_chunk_wrapper"
            )
        else:
            # Not running on distributed scheduler - use explicit functions
            from dask.dataframe.multi import _concat_wrapper as concat_func
            from dask.dataframe.multi import _merge_chunk_wrapper as merge_chunk_func
            from dask.dataframe.multi import _split_partition as split_partition_func

        # Get broadcast "plan"
        bcast_name, bcast_size, other_name, other_on = self._broadcast_plan
        bcast_side = "left" if self.lhs_npartitions < self.rhs_npartitions else "right"

        # Loop over output partitions, which should be a 1:1
        # mapping with the input partitions of "other".
        # Culling allows us to avoid generating tasks for any
        # output partitions that are not requested (via `parts_out`).
        dsk = {}
        for i in self.parts_out:

            # Split each "other" partition by hash
            if self.how != "inner":
                dsk[(split_name, i)] = (
                    split_partition_func,
                    (other_name, i),
                    other_on,
                    bcast_size,
                )

            # For each partition of "other", we need to join
            # to each partition of the broadcasted collection
            _concat_list = []
            for j in range(bcast_size):
                # Specify arg list for `merge_chunk`
                _merge_args = [
                    (
                        operator.getitem,
                        (split_name, i),
                        j,
                    )
                    if self.how != "inner"
                    else (other_name, i),
                    (bcast_name, j),
                ]
                if bcast_side == "left":
                    # If the left is broadcasted, the
                    # arg list needs to be reversed
                    _merge_args.reverse()
                inter_key = (inter_name, i, j)
                dsk[inter_key] = (
                    apply,
                    merge_chunk_func,
                    _merge_args,
                    self.merge_kwargs,
                )
                _concat_list.append(inter_key)

            # Concatenate the merged results for each output partition
            dsk[(self.name, i)] = (concat_func, _concat_list)

        return dsk
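

# Illustrative sketch (assumes dask.dataframe is installed; names are only for
# illustration): the side with fewer partitions is broadcast.  With two left
# and ten right partitions, ``_broadcast_plan`` picks the left side, so every
# output partition depends on both left partitions plus exactly one right
# partition (see _cull_dependencies above).
#
#     layer = BroadcastJoinLayer(
#         "joined", 10, "lhs", 2, "rhs", 10,
#         how="inner", left_on="x", right_on="x",
#     )
#     layer._broadcast_plan              # -> ("lhs", 2, "rhs", "x")
#     len(layer._construct_graph())      # -> 10 * 2 merge tasks + 10 concats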


class DataFrameIOLayer(Blockwise):
    """DataFrame-based Blockwise Layer with IO

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    columns : str, list or None
        Field name(s) to read in as columns in the output.
    inputs : list or BlockwiseDep
        List of arguments to be passed to ``io_func`` so
        that the materialized task to produce partition ``i``
        will be: ``(<io_func>, inputs[i])``.  Note that each
        element of ``inputs`` is typically a tuple of arguments.
    io_func : callable
        A callable function that takes in a single tuple
        of arguments, and outputs a DataFrame partition.
        Column projection will be supported for functions
        that satisfy the ``DataFrameIOFunction`` protocol.
    label : str (optional)
        String to use as a prefix in the place-holder collection
        name. If nothing is specified (default), "subset-" will
        be used.
    produces_tasks : bool (optional)
        Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument in only used for
        serialization purposes, and will be deprecated in the
        future. Default is False.
    creation_info: dict (optional)
        Dictionary containing the callable function ('func'),
        positional arguments ('args'), and key-word arguments
        ('kwargs') used to produce the dask collection with
        this underlying ``DataFrameIOLayer``.
    annotations: dict (optional)
        Layer annotations to pass through to Blockwise.
    """

    def __init__(
        self,
        name,
        columns,
        inputs,
        io_func,
        label=None,
        produces_tasks=False,
        creation_info=None,
        annotations=None,
    ):
        self.name = name
        self._columns = columns
        self.inputs = inputs
        self.io_func = io_func
        self.label = label
        self.produces_tasks = produces_tasks
        self.annotations = annotations
        self.creation_info = creation_info

        if not isinstance(inputs, BlockwiseDep):
            # Define mapping between key index and "part"
            io_arg_map = BlockwiseDepDict(
                {(i,): inp for i, inp in enumerate(self.inputs)},
                produces_tasks=self.produces_tasks,
            )
        else:
            io_arg_map = inputs

        # Use Blockwise initializer
        dsk = {self.name: (io_func, blockwise_token(0))}
        super().__init__(
            output=self.name,
            output_indices="i",
            dsk=dsk,
            indices=[(io_arg_map, "i")],
            numblocks={},
            annotations=annotations,
        )

    @property
    def columns(self):
        """Current column projection for this layer"""
        return self._columns

    def project_columns(self, columns):
        """Produce a column projection for this IO layer.
        Given a list of required output columns, this method
        returns the projected layer.
        """
        from dask.dataframe.io.utils import DataFrameIOFunction

        columns = list(columns)

        if self.columns is None or set(self.columns).issuperset(columns):

            # Apply column projection in IO function.
            # Must satisfy `DataFrameIOFunction` protocol
            if isinstance(self.io_func, DataFrameIOFunction):
                io_func = self.io_func.project_columns(columns)
            else:
                io_func = self.io_func

            layer = DataFrameIOLayer(
                (self.label or "subset-") + tokenize(self.name, columns),
                columns,
                self.inputs,
                io_func,
                label=self.label,
                produces_tasks=self.produces_tasks,
                annotations=self.annotations,
            )
            return layer
        else:
            # Default behavior
            return self

    def __repr__(self):
        return "DataFrameIOLayer<name='{}', n_parts={}, columns={}>".format(
            self.name, len(self.inputs), self.columns
        )
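

# Illustrative sketch (``MyReader`` is hypothetical, not part of dask): column
# projection only rewrites the IO function when it satisfies the
# ``DataFrameIOFunction`` protocol, i.e. exposes ``columns`` and
# ``project_columns()``; otherwise the original function is reused unchanged.
#
#     class MyReader:
#         def __init__(self, columns=None):
#             self._columns = columns
#         @property
#         def columns(self):
#             return self._columns
#         def project_columns(self, columns):
#             return MyReader(columns)
#         def __call__(self, part):
#             ...  # read `part`, returning only `self.columns`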


class DataFrameTreeReduction(Layer):
    """DataFrame Tree-Reduction Layer

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    name_input : str
        Name of the input layer that is being reduced.
    npartitions_input : str
        Number of partitions in the input layer.
    concat_func : callable
        Function used by each tree node to reduce a list of inputs
        into a single output value. This function must accept only
        a list as its first positional argument.
    tree_node_func : callable
        Function used on the output of ``concat_func`` in each tree
        node. This function must accept the output of ``concat_func``
        as its first positional argument.
    finalize_func : callable, optional
        Function used in place of ``tree_node_func`` on the final tree
        node(s) to produce the final output for each split. By default,
        ``tree_node_func`` will be used.
    split_every : int, optional
        This argument specifies the maximum number of input nodes
        to be handled by any one task in the tree. Defaults to 32.
    split_out : int, optional
        This argument specifies the number of output nodes in the
        reduction tree. If ``split_out`` is set to an integer >=1, the
        input tasks must contain data that can be indexed by a ``getitem``
        operation with a key in the range ``[0, split_out)``.
    output_partitions : list, optional
        List of required output partitions. This parameter is used
        internally by Dask for high-level culling.
    tree_node_name : str, optional
        Name to use for intermediate tree-node tasks.
    """

    name: str
    name_input: str
    npartitions_input: int
    concat_func: Callable
    tree_node_func: Callable
    finalize_func: Callable | None
    split_every: int
    split_out: int
    output_partitions: list[int]
    tree_node_name: str
    widths: list[int]
    height: int

    def __init__(
        self,
        name: str,
        name_input: str,
        npartitions_input: int,
        concat_func: Callable,
        tree_node_func: Callable,
        finalize_func: Callable | None = None,
        split_every: int = 32,
        split_out: int | None = None,
        output_partitions: list[int] | None = None,
        tree_node_name: str | None = None,
        annotations: dict[str, Any] | None = None,
    ):
        super().__init__(annotations=annotations)
        self.name = name
        self.name_input = name_input
        self.npartitions_input = npartitions_input
        self.concat_func = concat_func
        self.tree_node_func = tree_node_func
        self.finalize_func = finalize_func
        self.split_every = split_every
        self.split_out = split_out
        self.output_partitions = (
            list(range(self.split_out or 1))
            if output_partitions is None
            else output_partitions
        )
        self.tree_node_name = tree_node_name or "tree_node-" + self.name

        # Calculate tree widths and height
        # (Used to get output keys without materializing)
        parts = self.npartitions_input
        self.widths = [parts]
        while parts > 1:
            parts = math.ceil(parts / self.split_every)
            self.widths.append(parts)
        self.height = len(self.widths)

    def _make_key(self, *name_parts, split=0):
        # Helper function to construct a key, adding a "split"
        # element when bool(split_out) is True
        return name_parts + (split,) if self.split_out else name_parts

    def _define_task(self, input_keys, final_task=False):
        # Define nested concatenation and func task
        if final_task and self.finalize_func:
            outer_func = self.finalize_func
        else:
            outer_func = self.tree_node_func
        return (toolz.pipe, input_keys, self.concat_func, outer_func)

    def _construct_graph(self):
        """Construct graph for a tree reduction."""

        dsk = {}
        if not self.output_partitions:
            return dsk

        # When bool(split_out) is True, the input tasks must return data that
        # supports a getitem operation with indices in [0, split_out), so we
        # add "getitem" tasks to select the appropriate element for each split
        name_input_use = self.name_input
        if self.split_out:
            name_input_use += "-split"
            for s in self.output_partitions:
                for p in range(self.npartitions_input):
                    dsk[self._make_key(name_input_use, p, split=s)] = (
                        operator.getitem,
                        (self.name_input, p),
                        s,
                    )

        if self.height >= 2:
            # Loop over output splits
            for s in self.output_partitions:
                # Loop over reduction levels
                for depth in range(1, self.height):
                    # Loop over reduction groups
                    for group in range(self.widths[depth]):
                        # Calculate inputs for the current group
                        p_max = self.widths[depth - 1]
                        lstart = self.split_every * group
                        lstop = min(lstart + self.split_every, p_max)
                        if depth == 1:
                            # Input nodes are from the input layer
                            input_keys = [
                                self._make_key(name_input_use, p, split=s)
                                for p in range(lstart, lstop)
                            ]
                        else:
                            # Input nodes are tree-reduction nodes
                            input_keys = [
                                self._make_key(
                                    self.tree_node_name, p, depth - 1, split=s
                                )
                                for p in range(lstart, lstop)
                            ]

                        # Define task
                        if depth == self.height - 1:
                            # Final node (use the finalize task)
                            assert (
                                group == 0
                            ), f"group = {group}, not 0 for final tree reduction task"
                            dsk[(self.name, s)] = self._define_task(
                                input_keys, final_task=True
                            )
                        else:
                            # Intermediate node
                            dsk[
                                self._make_key(
                                    self.tree_node_name, group, depth, split=s
                                )
                            ] = self._define_task(input_keys, final_task=False)
        else:
            # Deal with single-partition case
            for s in self.output_partitions:
                input_keys = [self._make_key(name_input_use, 0, split=s)]
                dsk[(self.name, s)] = self._define_task(input_keys, final_task=True)

        return dsk

    def __repr__(self):
        return "DataFrameTreeReduction<name='{}', input_name={}, split_out={}>".format(
            self.name, self.name_input, self.split_out
        )

    def _output_keys(self):
        return {(self.name, s) for s in self.output_partitions}

    def get_output_keys(self):
        if hasattr(self, "_cached_output_keys"):
            return self._cached_output_keys
        else:
            output_keys = self._output_keys()
            self._cached_output_keys = output_keys
        return self._cached_output_keys

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        # Start with "base" tree-reduction size
        tree_size = (sum(self.widths[1:]) or 1) * (self.split_out or 1)
        if self.split_out:
            # Add on "split-*" tasks used for `getitem` ops
            return tree_size + self.npartitions_input * len(self.output_partitions)
        return tree_size

    def _keys_to_output_partitions(self, keys):
        """Simple utility to convert keys to output partition indices."""
        splits = set()
        for key in keys:
            try:
                _name, _split = key
            except ValueError:
                continue
            if _name != self.name:
                continue
            splits.add(_split)
        return splits

    def _cull(self, output_partitions):
        return DataFrameTreeReduction(
            self.name,
            self.name_input,
            self.npartitions_input,
            self.concat_func,
            self.tree_node_func,
            finalize_func=self.finalize_func,
            split_every=self.split_every,
            split_out=self.split_out,
            output_partitions=output_partitions,
            tree_node_name=self.tree_node_name,
            annotations=self.annotations,
        )

    def cull(self, keys, all_keys):
        """Cull a DataFrameTreeReduction HighLevelGraph layer"""
        deps = {
            (self.name, 0): {
                (self.name_input, i) for i in range(self.npartitions_input)
            }
        }
        output_partitions = self._keys_to_output_partitions(keys)
        if output_partitions != set(self.output_partitions):
            culled_layer = self._cull(output_partitions)
            return culled_layer, deps
        else:
            return self, deps

    def __dask_distributed_pack__(self, *args, **kwargs):
        from distributed.protocol.serialize import to_serialize

        _concat_func = to_serialize(self.concat_func)
        _tree_node_func = to_serialize(self.tree_node_func)
        if self.finalize_func:
            _finalize_func = to_serialize(self.finalize_func)
        else:
            _finalize_func = None
        return {
            "name": self.name,
            "name_input": self.name_input,
            "npartitions_input": self.npartitions_input,
            "concat_func": _concat_func,
            "tree_node_func": _tree_node_func,
            "finalize_func": _finalize_func,
            "split_every": self.split_every,
            "split_out": self.split_out,
            "output_partitions": self.output_partitions,
            "tree_node_name": self.tree_node_name,
        }

    @classmethod
    def __dask_distributed_unpack__(cls, state, dsk, dependencies):
        from distributed.worker import dumps_task

        # Materialize the layer
        raw = cls(**state)._construct_graph()

        # Convert all keys to strings and dump tasks
        raw = {stringify(k): stringify_collection_keys(v) for k, v in raw.items()}
        keys = raw.keys() | dsk.keys()
        deps = {k: keys_in_tasks(keys, [v]) for k, v in raw.items()}

        return {"dsk": toolz.valmap(dumps_task, raw), "deps": deps}