U
    /eh                     @  s  d Z ddlmZ ddlZddlZddlmZ ddlmZm	Z	m
Z
 ddlmZmZmZ ddlmZ ddlZddlZddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' dd Z(dd Z)dd Z*dd Z+dd Z,d?ddZ-dd  Z.d!d" Z/d#d$ Z0d%d& Z1d'd( Z2d)d* Z3d+d, Z4d@d-d.Z5d/d0 Z6G d1d2 d2Z7d3d4 Z8d5d6 Z9d7d8 Z:d9d: Z;d;d;d<d=d>Z<dS )Az
The rechunk module defines:
    intersect_chunks: a function for
        converting chunks to new dimensions
    rechunk: a function to convert the blocks
        of an existing dask array to new chunks or blockshape
    )annotationsN)reduce)chaincountproduct)add
itemgettermul)warn)
accumulate)config)getitem)Arrayconcatenate3normalize_chunks)validate_axis)empty)tokenize)HighLevelGraph)parse_bytesc                   s    fdd| D S )zInternal utility for cumulative sum with label.

    >>> cumdims_label(((5, 3, 3), (2, 2, 1)), 'n')  # doctest: +NORMALIZE_WHITESPACE
    [(('n', 0), ('n', 5), ('n', 8), ('n', 11)),
     (('n', 0), ('n', 2), ('n', 4), ('n', 5))]
    c              	     s2   g | ]*}t t fd t|  ttd| qS )   r   )tupleziplenr   r   ).0Zbdsconst 6/tmp/pip-unpacked-wheel-dbjnr7gq/dask/array/rechunk.py
<listcomp>&   s   z!cumdims_label.<locals>.<listcomp>r   )chunksr   r   r   r   cumdims_label   s    
r"   c                 C  s   t t| | tddS )aG  

    >>> new = cumdims_label(((2, 3), (2, 2, 1)), 'n')
    >>> old = cumdims_label(((2, 2, 1), (5,)), 'o')

    >>> _breakpoints(new[0], old[0])
    (('n', 0), ('o', 0), ('n', 2), ('o', 2), ('o', 4), ('n', 5), ('o', 5))
    >>> _breakpoints(new[1], old[1])
    (('n', 0), ('o', 0), ('n', 2), ('n', 4), ('n', 5), ('o', 5))
    r   key)r   sortedr   )ZcumoldZcumnewr   r   r   _breakpoints,   s    r&   c                 C  s4  dd | D }t |d }|d d }d}d}d}d}g }g }	tdt | D ]}
| |
 \}}| |
d  \}}|dkr|}|	r||	 g }	nd}|| | }|}||kr|dkr|d7 }|}|dkrL|dkrL||krt||}|	||f qLnqL|	|t||f |dkrL|d7 }d}|}qL|	r0||	 |S )	a#  
    Internal utility to intersect chunks for 1d after preprocessing.

    >>> new = cumdims_label(((2, 3), (2, 2, 1)), 'n')
    >>> old = cumdims_label(((2, 2, 1), (5,)), 'o')

    >>> _intersect_1d(_breakpoints(old[0], new[0]))  # doctest: +NORMALIZE_WHITESPACE
    [[(0, slice(0, 2, None))],
     [(1, slice(0, 2, None)), (2, slice(0, 1, None))]]
    >>> _intersect_1d(_breakpoints(old[1], new[1]))  # doctest: +NORMALIZE_WHITESPACE
    [[(0, slice(0, 2, None))],
     [(0, slice(2, 4, None))],
     [(0, slice(4, 5, None))]]

    Parameters
    ----------

    breaks: list of tuples
        Each tuple is ('o', 8) or ('n', 8)
        These are pairs of 'o' old or new 'n'
        indicator with a corresponding cumulative sum,
        or breakpoint (a position along the chunking axis).
        The list of pairs is already ordered by breakpoint.
        Note that an 'o' pair always occurs BEFORE
        an 'n' pair if both share the same breakpoint.
    Uses 'o' and 'n' to make new tuples of slices for
    the new block crosswalk to old blocks.
    c                 S  s   g | ]}|d  dkr|qS )r   or   )r   pairr   r   r   r    e   s      z!_intersect_1d.<locals>.<listcomp>   r   r   nr'   )r   rangeappendslice)ZbreaksZo_pairsZlast_old_chunk_idxZ	last_o_brstartZlast_endZold_idxZ
last_o_endretZret_nextidxlabelbrZ
last_labelZlast_brendslcr   r   r   _intersect_1d:   sL    +


r6   c                 C  s   dd | D }dd |D }dd | D }dd |D }t |d}t |d}dd |D }d	d |D }	||	kstd
|d|	||kstddd t||D }
t|D ]*\}}|rdd t|D }|
|| q|
S )a  Helper to build old_chunks to new_chunks.

    Handles missing values, as long as the missing dimension
    is unchanged.

    Examples
    --------
    >>> old = ((10, 10, 10, 10, 10), )
    >>> new = ((25, 5, 20), )
    >>> _old_to_new(old, new)  # doctest: +NORMALIZE_WHITESPACE
    [[[(0, slice(0, 10, None)), (1, slice(0, 10, None)), (2, slice(0, 5, None))],
      [(2, slice(5, 10, None))],
      [(3, slice(0, 10, None)), (4, slice(0, 10, None))]]]
    c                 S  s"   g | ]}t d d |D s|qS )c                 s  s   | ]}t |V  qd S Nmathisnanr   yr   r   r   	<genexpr>   s     )_old_to_new.<locals>.<listcomp>.<genexpr>anyr   xr   r   r   r       s      z_old_to_new.<locals>.<listcomp>c                 S  s"   g | ]}t d d |D s|qS )c                 s  s   | ]}t |V  qd S r7   r8   r;   r   r   r   r=      s     r>   r?   rA   r   r   r   r       s      c                 S  s   g | ]}t d d |D qS )c                 s  s   | ]}t |V  qd S r7   r8   r;   r   r   r   r=      s     r>   sumrA   r   r   r   r       s     c                 S  s   g | ]}t d d |D qS )c                 s  s   | ]}t |V  qd S r7   r8   r;   r   r   r   r=      s     r>   rC   rA   r   r   r   r       s     r'   r+   c                 S  s   g | ]}t |qS r   rC   )r   r'   r   r   r   r       s     c                 S  s   g | ]}t |qS r   rC   )r   r+   r   r   r   r       s     zCannot change dimensions from z to zcChunks must be unchanging along unknown dimensions.

A possible solution:
  x.compute_chunk_sizes()c                 S  s"   g | ]}t t|d  |d qS )r   r   )r6   r&   )r   cmr   r   r   r       s     c                 S  s   g | ]}|t d dfgqS r   N)r.   r   ir   r   r   r       s     )r"   
ValueErrorr   	enumerater,   insert)
old_chunks
new_chunksZ	old_knownZ	new_knownZ	n_missingZ
n_missing2ZcmoZcmnZsumsZsums2
old_to_newr1   missingextrar   r   r   _old_to_new   s(    

rQ   c                 C  s(   t | |}t| }tdd |D }|S )a  
    Make dask.array slices as intersection of old and new chunks.

    >>> intersections = intersect_chunks(((4, 4), (2,)),
    ...                                  ((8,), (1, 1)))
    >>> list(intersections)  # doctest: +NORMALIZE_WHITESPACE
    [(((0, slice(0, 4, None)), (0, slice(0, 1, None))),
      ((1, slice(0, 4, None)), (0, slice(0, 1, None)))),
     (((0, slice(0, 4, None)), (0, slice(1, 2, None))),
      ((1, slice(0, 4, None)), (0, slice(1, 2, None))))]

    Parameters
    ----------

    old_chunks : iterable of tuples
        block sizes along each dimension (convert from old_chunks)
    new_chunks: iterable of tuples
        block sizes along each dimension (converts to new_chunks)
    c                 s  s   | ]}t t| V  qd S r7   )r   r   r   Zcrr   r   r   r=      s     z#intersect_chunks.<locals>.<genexpr>)rQ   r   r   )rL   rM   rN   cross1Zcrossr   r   r   intersect_chunks   s    
rT   autoFc                   s   j dkr"tdd  jD r" S t|tr fdd| D }t j D ]6}||krh j| ||< qL|| dkrL j| ||< qLt|tt	frtdd t
| jD }t| j| j jd}|s| jkrֈ S  j }t||kstd	|rtd
d |D }ttt|}t
| jD ]4\}}	||	kr"t|	s"t|s"td	q"t j| jj||}
|
D ]}t | qr S )a  
    Convert blocks in dask array x for new chunks.

    Parameters
    ----------
    x: dask array
        Array to be rechunked.
    chunks:  int, tuple, dict or str, optional
        The new block dimensions to create. -1 indicates the full size of the
        corresponding dimension. Default is "auto" which automatically
        determines chunk sizes.
    threshold: int, optional
        The graph growth factor under which we don't bother introducing an
        intermediate step.
    block_size_limit: int, optional
        The maximum block size (in bytes) we want to produce
        Defaults to the configuration value ``array.chunk-size``
    balance : bool, default False
        If True, try to make each chunk to be the same size.

        This means ``balance=True`` will remove any small leftover chunks, so
        using ``x.rechunk(chunks=len(x) // N, balance=True)``
        will almost certainly result in ``N`` chunks.

    Examples
    --------
    >>> import dask.array as da
    >>> x = da.ones((1000, 1000), chunks=(100, 100))

    Specify uniform chunk sizes with a tuple

    >>> y = x.rechunk((1000, 10))

    Or chunk only specific dimensions with a dictionary

    >>> y = x.rechunk({0: 1000})

    Use the value ``-1`` to specify that you want a single chunk along a
    dimension or the value ``"auto"`` to specify that dask can freely rechunk a
    dimension to attain blocks of a uniform block size

    >>> y = x.rechunk({0: -1, 1: 'auto'}, block_size_limit=1e8)

    If a chunk size does not divide the dimension then rechunk will leave any
    unevenness to the last chunk.

    >>> x.rechunk(chunks=(400, -1)).chunks
    ((400, 400, 200), (1000,))

    However if you want more balanced chunks, and don't mind Dask choosing a
    different chunksize for you then you can use the ``balance=True`` option.

    >>> x.rechunk(chunks=(400, -1), balance=True).chunks
    ((500, 500), (1000,))
    r   c                 s  s   | ]}|d kV  qdS rF   r   )r   sr   r   r   r=     s     zrechunk.<locals>.<genexpr>c                   s   i | ]\}}t | j|qS r   )r   ndim)r   cvrB   r   r   
<dictcomp>!  s     
 zrechunk.<locals>.<dictcomp>Nc                 s  s"   | ]\}}|d k	r|n|V  qd S r7   r   )r   lcrcr   r   r   r=   (  s     )limitdtypeZprevious_chunksz-Provided chunks are not consistent with shapec                 s  s   | ]}t |V  qd S r7   )_balance_chunksizes)r   chunkr   r   r   r=   5  s     )rW   allshape
isinstancedictitemsr,   r!   r   listr   r   r_   r   rI   maprD   r9   r:   plan_rechunkitemsize_compute_rechunk)rB   r!   	thresholdblock_size_limitZbalancerH   rW   Z
new_shapesnewoldstepsrX   r   rZ   r   rechunk   sN    9
    "    rq   c                 C  s   t ttt| S r7   )r   r	   rh   r   r!   r   r   r   _number_of_blocksF  s    rs   c                 C  s   t ttt| S r7   )r   r	   rh   maxrr   r   r   r   _largest_block_sizeJ  s    ru   c                 C  s   t tdd t| |D }|S )z5Estimate the graph size during a rechunk computation.c                 s  s6   | ].\}}||kr&t |t | d  nt |V  qdS r   Nr   )r   ocncr   r   r   r=   T  s   z&estimate_graph_size.<locals>.<genexpr>)r   r	   r   )rL   rM   Zcrossed_sizer   r   r   estimate_graph_sizeN  s    rz   c                 C  s`   g }| D ]N}t t|| }t|D ]"}|||  }|| ||8 }q&|dkstqt|S )zpMinimally divide the given chunks so as to make the largest chunk
    width less or equal than *max_width*.
    r   )intnpceilr,   r-   AssertionErrorr   )desired_chunks	max_widthr!   rX   Z
nb_dividesrH   r+   r   r   r   divide_to_width\  s    

r   c                   s  t  |kr S t }t |dkr|| }t  }|| }|| }|||  }|||  | }|| f| |f||   S t | }t  | }	 fddtt  d D }
t|
 t }|	dkrt|
\}}}|| dkr.|d7 }|| dkr|d7 }qt	|
|| ||  ||f qn6|| ||  |krdt	|
|| ||  ||f q|| dksvt
d||< |||< |	d8 }	qttd|S )zMinimally merge the given chunks so as to drop the number of
    chunks below *max_number*, while minimizing the largest width.
    r   c                   s*   g | ]"} |  |d    ||d  fqS r   r   rG   r   r   r   r      s   z#merge_to_number.<locals>.<listcomp>r   N)r   setpoprD   r,   heapqheapifyrg   heappopheappushr~   r   filter)r   
max_numberZdistinctwr+   totalZdesired_widthwidthadjustZnmergesheapr!   rH   jr   r   r   merge_to_numberk  sD    




r   c                   sZ  t | }dd | D dd |D dd tt| |D fddt|D  fddt|D } fdd	}t||d
}tt}t| }d}	|D ]}
||
  |
 pd }||kr||
 ||
< |}q|
 }t|| | }t	||
 |}t |t | |
 kr(|||
< |t
| | }d}	q|t|ks@t||ksNtt||	fS )z
    Find an intermediate rechunk that would merge some adjacent blocks
    together in order to get us nearer the *new_chunks* target, without
    violating the *block_size_limit* (in number of elements).
    c                 S  s   g | ]}t |qS r   rt   r   rX   r   r   r   r      s     z&find_merge_rechunk.<locals>.<listcomp>c                 S  s   g | ]}t |qS r   r   r   r   r   r   r      s     c                 S  s&   i | ]\}\}}|t |t | qS r   rw   )r   dimrx   ry   r   r   r   r[     s   
 z&find_merge_rechunk.<locals>.<dictcomp>c                   s"   i | ]}| | | pd  qS r   r   r   r   )new_largest_widthold_largest_widthr   r   r[     s    c                   s   g | ]} | d kr|qS )g      ?r   r   )graph_size_effectr   r   r      s      c                   s<   |  } |  }|dkrd}|dkr8t |t | S dS )Nr   g0D   ?r   )r|   log)kZgseZbse)block_size_effectr   r   r   r$     s
    zfind_merge_rechunk.<locals>.keyr#   Fr   T)r   rJ   r   r,   r%   r   r	   rg   r{   r   rt   ru   r~   r   )rL   rM   rm   rW   Zmerge_candidatesr$   Zsorted_candidatesZlargest_block_sizer!   memory_limit_hitr   Znew_largest_block_sizeZlargest_widthZchunk_limitrX   r   )r   r   r   r   r   find_merge_rechunk  s>    
r   c           	      C  s   t | }t| }t|D ]}t||}||kr2 qt | | t || krLqtt | | | | }t|| |}t ||kstt |t | | krt|t| | kr|||< qt|S )z
    Find an intermediate rechunk that would split some chunks to
    get us nearer *new_chunks*, without violating the *graph_size_limit*.
    )	r   rg   r,   rz   r{   r   r~   rt   r   )	rL   rM   Zgraph_size_limitrW   r!   r   
graph_sizer   rX   r   r   r   find_split_rechunk  s    
(
r   c                 C  s:  |pt d}|pt d}t|tr.t|}t|}g }dd | D }|dks`t|r`t|rj||g S || }t| }t|}	t	|||	g}|t
| t
|  }
| }d}t||}||
k rq0|r|}nt|||| }t|||\}}||kr|r0||krq0||kr|| |}|s*q0d}q||g S )a3  Plan an iterative rechunking from *old_chunks* to *new_chunks*.
    The plan aims to minimize the rechunk graph size.

    Parameters
    ----------
    itemsize: int
        The item size of the array
    threshold: int
        The graph growth factor under which we don't bother
        introducing an intermediate step
    block_size_limit: int
        The maximum block size (in bytes) we want to produce during an
        intermediate step

    Notes
    -----
    No intermediate steps will be planned if any dimension of ``old_chunks``
    is unknown.
    zarray.rechunk-thresholdzarray.chunk-sizec                 S  s   g | ]}t d d |D qS )c                 s  s   | ]}t |V  qd S r7   r8   r;   r   r   r   r=   #  s     z*plan_rechunk.<locals>.<listcomp>.<genexpr>r?   rA   r   r   r   r    #  s     z plan_rechunk.<locals>.<listcomp>r   TF)r   getrd   strr   r   rb   r@   ru   rt   rs   rz   r   r   r-   )rL   rM   rj   rl   rm   rW   rp   Zhas_nansZlargest_old_blockZlargest_new_blockZgraph_size_thresholdZcurrent_chunksZ
first_passr   r!   r   r   r   r   ri     sT    


    

ri   c                   s
  j dkrtj|jdS j}tj|}t }t }t|}d| }d| }t	 }	t
jdd jD dd}
t
|
jD ]}jf| |
|< qtd	d
 |D  }t||D ]"\} |f| } fddt|D fddt|D }t
j|dd}|j}t D ]|\}}t| \}}|t|	f}|
| dd }tfdd
tt||D rn|
| ||< nt|
| |f||< |||< q||j d ksttdd
 |jD r|jd ||< qt| f||< q~
~t||}tj||gd}t|||dS )z1Compute the rechunk of *x* to the given *chunks*.r   )r!   r_   zrechunk-merge-zrechunk-split-c                 S  s   g | ]}t |qS r   rw   r   r   r   r   r    i  s     z$_compute_rechunk.<locals>.<listcomp>O)r_   c                 s  s   | ]}t t|V  qd S r7   )r,   r   r   r   r   r   r=   n  s     z#_compute_rechunk.<locals>.<genexpr>c                   s   g | ]  fd dD qS )c                   s   g | ]}|  d  qS r   r   rR   rH   r   r   r    r  s     z/_compute_rechunk.<locals>.<listcomp>.<listcomp>r   )r   )rS   r   r   r    r  s     c                   s   g | ]}t t | qS r   )r   r   rG   )old_block_indicesr   r   r    s  s     r   Nc                 3  s4   | ],\}\}}|j d ko*|j j| | kV  qdS rF   )r/   stopr!   )r   rH   r5   indrZ   r   r   r=   }  s   
c                 s  s   | ]}|d kV  qdS rv   r   )r   dr   r   r   r=     s     )Zdependencies)meta)sizer   rc   r_   rW   rT   r!   re   r   r   r|   Zndindexnamer   r   r,   ZflatrJ   nextrb   r   r~   r   tolisttoolzmerger   Zfrom_collectionsr   )rB   r!   rW   ZcrossedZx2ZintermediatestokenZ
merge_nameZ
split_nameZsplit_name_suffixesZ
old_blocksindexZ	new_indexZnew_idxr$   Zsubdims1Zrec_cat_argZrec_cat_arg_flatZrec_cat_indexZ
ind_slicesZold_block_indexZslicesr   Z	old_indexZlayergraphr   )rS   r   rB   r   rk   X  sL    



rk   c                   @  s    e Zd Zdd Zdd ZeZdS )_PrettyBlocksc                 C  s
   || _ d S r7   blocks)selfr   r   r   r   __init__  s    z_PrettyBlocks.__init__c                 C  s2  g }g }d}| j D ]}|rf|d |krf|dkr\t|dkr\|d |d d f |dd  }|d7 }q|dkrt|dks~t||d |d f g }d}|| q|r|dkr|d |f n&t|dkst||d |d f g }|D ]4\}}|d kr|t| q|d||f  qd|S )Nr   r*   r   z%d*[%s]z | )r   r   r-   r~   r   join)r   runsrunZrepeatsrX   partsr   r   r   __str__  s4    


z_PrettyBlocks.__str__N)__name__
__module____qualname__r   r   __repr__r   r   r   r   r     s    r   c                 C  s(   t | trtdd | D s tt| S )z
    Pretty-format *blocks*.

    >>> format_blocks((10, 10, 10))
    3*[10]
    >>> format_blocks((2, 3, 4))
    [2, 3, 4]
    >>> format_blocks((10, 10, 5, 6, 2, 2, 2, 7))
    2*[10] | [5, 6] | 3*[2] | [7]
    c                 s  s"   | ]}t |tpt|V  qd S r7   )rd   r{   r9   r:   rA   r   r   r   r=     s    z format_blocks.<locals>.<genexpr>)rd   r   rb   r~   r   r   r   r   r   format_blocks  s    r   c                 C  s    t | tsttdd | D S )zH
    >>> format_chunks((10 * (3,), 3 * (10,)))
    (10*[3], 3*[10])
    c                 s  s   | ]}t |V  qd S r7   )r   r   r   r   r   r=     s     z format_chunks.<locals>.<genexpr>)rd   r   r~   rr   r   r   r   format_chunks  s    r   c                 C  s   dd | D S )zs
    >>> format_plan([((10, 10, 10), (15, 15)), ((30,), (10, 10, 10))])
    [(3*[10], 2*[15]), ([30], 3*[10])]
    c                 S  s   g | ]}t |qS r   )r   r   r   r   r   r      s     zformat_plan.<locals>.<listcomp>r   )Zplanr   r   r   format_plan  s    r   c                 C  s0   | | }| | }|g| }|r(| | t|S r7   )r-   r   )r+   	chunksizeleftovern_chunksr!   r   r   r   _get_chunks  s    

r   ztuple[int, ...])r!   returnc                   s   t  t}t |d }t dt  kr<d8  fddt|| || d D }fdd|D }t|std  S dd |D }t 	|}|| S )	z
    Balance the chunk sizes

    Parameters
    ----------
    chunks : tuple[int, ...]
        Chunk sizes for Dask array.

    Returns
    -------
    new_chunks : tuple[int, ...]
        New chunks for Dask array with balanced sizes.
    r)   g      ?r   c                   s   g | ]}t t |qS r   )r   rD   )r   Z	chunk_lenrr   r   r   r      s   z'_balance_chunksizes.<locals>.<listcomp>c                   s   g | ]}t | kr|qS r   rw   r   )r   r   r   r       s      zSchunk size balancing not possible with given chunks. Try increasing the chunk size.c                 S  s   g | ]}t |t| qS r   )rt   minr   r   r   r   r      s     )
r|   ZmedianZastyper{   r   r   rt   r,   r
   Zargmin)r!   Z
median_lenZepsrM   Zpossible_chunksZdiffsZbest_chunk_sizer   )r!   r   r   r`     s"    

r`   )rU   NNF)NN)=__doc__
__future__r   r   r9   	functoolsr   	itertoolsr   r   r   operatorr   r   r	   warningsr
   Znumpyr|   Ztlzr   r   Zdaskr   Zdask.array.chunkr   Zdask.array.corer   r   r   Zdask.array.utilsr   Zdask.array.wrapr   Z	dask.baser   Zdask.highlevelgraphr   Z
dask.utilsr   r"   r&   r6   rQ   rT   rq   rs   ru   rz   r   r   r   r   ri   rk   r   r   r   r   r   r`   r   r   r   r   <module>   sR   c,
b6I   
R='	
