U
    /e                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZmZ d dl	m
Z
mZmZmZmZmZmZ dd Zdd Zd"d
dZdd Zd#ddZd$ddZdd Zdd Zd%ddZG dd deZejZddeeeeeefddZdd ZG d d! d!ZdS )&    N)Enum)configcoreutils)flattenget_dependencies
ishashableistaskreverse_dictsubstoposortc           
      C   s   t |ttfs|g}t }t }i }ttt|}|rg }|D ]L}t| |dd}| | ||< |||< |D ] }	|	|krj||	 ||	 qjq@|}q4||fS )a<  Return new dask with only the tasks required to calculate keys.

    In other words, remove unnecessary tasks from dask.
    ``keys`` may be a single key or list of keys.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
    >>> dsk, dependencies = cull(d, 'out')
    >>> dsk                                                     # doctest: +ELLIPSIS
    {'out': (<function add at ...>, 'x', 10), 'x': 1}
    >>> dependencies                                            # doctest: +ELLIPSIS
    {'out': ['x'], 'x': []}

    Returns
    -------
    dsk: culled dask graph
    dependencies: Dict mapping {key: [deps]}.  Useful side effect to accelerate
        other optimizations, notably fuse.
    TZas_list)
isinstancelistsetdictr   r   addappend)
dskkeysseendependenciesoutworknew_workkZdependencies_kd r   5/tmp/pip-unpacked-wheel-dbjnr7gq/dask/optimization.pycull   s$    
r   c                 C   s   t | d }|tkrDdd | ddd D }|| d  d|S |tkrt| d dkrt| d d trdd | ddd D }|| d d  d|f| d dd  S dS dS )	zCreate new keys for fused tasksr   c                 S   s   g | ]}t |qS r   r   	key_split.0xr   r   r   
<listcomp>I   s     z5default_fused_linear_keys_renamer.<locals>.<listcomp>N-c                 S   s   g | ]}t |qS r   r    r"   r   r   r   r%   M   s        )typestrr   jointuplelenr   )r   typnamesr   r   r   !default_fused_linear_keys_renamerE   s    
*r0   Tc                    sB  |dk	r.t |ts.t |ts"|g}tt|}|dkrH fdd D }i }t } D ]x}|| }t|dk}|D ]Z}	|dk	r|	|kr||	 qr|	|kr||	= ||	 qr|r||	 qr|	|krr|||	< qrqVg }
ttt|	 }|rd|
 \}	}|	|g}||kr(||}||= || q |  |	|krX||	}	||	= ||	 q0|
| qdd |	 D }|dkrt}n|dkrd}n|}i }t }t }d}|
D ]}|dk	r||}|dk	o| ko||k}| }	 |	 }|rF| }|| ||	 || |	 t | |	|}||	 |}	q||	 |r|||< |||	< ||	 ||< |h||	< ||	 n|||	< q 	 D ]\}}||kr|||< q|r:|	 D ]J\}}||@ D ]6}|| }|| || t|| ||||< qؐq|dk	r:|| D ]}||= ||= q&||fS )a  Return new dask graph with linear sequence of tasks fused together.

    If specified, the keys in ``keys`` keyword argument are *not* fused.
    Supply ``dependencies`` from output of ``cull`` if available to avoid
    recomputing dependencies.

    **This function is mostly superseded by ``fuse``**

    Parameters
    ----------
    dsk: dict
    keys: list
    dependencies: dict, optional
        {key: [list-of-keys]}.  Must be a list to provide count of each key
        This optional input often comes from ``cull``
    rename_keys: bool or func, optional
        Whether to rename fused keys with ``default_fused_linear_keys_renamer``
        or not.  Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used.  For advanced usage, a
        func is also accepted, ``new_key = rename_keys(fused_key_list)``.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dsk, dependencies = fuse(d)
    >>> dsk # doctest: +SKIP
    {'a-b-c': (inc, (inc, 1)), 'c': 'a-b-c'}
    >>> dsk, dependencies = fuse(d, rename_keys=False)
    >>> dsk # doctest: +ELLIPSIS
    {'c': (<function inc at ...>, (<function inc at ...>, 1))}
    >>> dsk, dependencies = fuse(d, keys=['b'], rename_keys=False)
    >>> dsk  # doctest: +ELLIPSIS
    {'b': (<function inc at ...>, 1), 'c': (<function inc at ...>, 'b')}

    Returns
    -------
    dsk: output graph with keys fused
    dependencies: dict mapping dependencies after fusion.  Useful side effect
        to accelerate other downstream optimizations.
    Nc                    s   i | ]}|t  |d dqS Tr   r   r#   r   r   r   r   
<dictcomp>   s      zfuse_linear.<locals>.<dictcomp>r(   c                 S   s   i | ]\}}|t |qS r   r   r#   r   vr   r   r   r5      s      TF)r   r   r   r   r-   r   r   mapreverseditemspopitempopr   reverser0   updateremover   )r   r   r   rename_keyschild2parent	unfusibleparentdepshas_many_childrenchildchainsparent2childchainkey_renamerrvZfusedaliasesZ
is_renamedZnew_keyvalkeyZold_keyr   r4   r   fuse_linearT   s    0















rP   c                 C   s8   | d krt  S t| t r| S t| tt fs0| g} t | S N)r   r   r   )r$   r   r   r   	_flat_set   s    
rR   c                    s2   r,t tt  tr,dd   D  t|} dkrNfddD  |rp| fdd D  tfdd|D  d}i }|D ]L}| }| | @ D ]*}||kr|| }	n| }	t	|||	}q|||< q|
 }
 D ]<\}}||
kr| | @ D ]}t	|||| }q||
|< q|
S )	a  Return new dask with the given keys inlined with their values.

    Inlines all constants if ``inline_constants`` keyword is True. Note that
    the constant keys will remain in the graph, to remove them follow
    ``inline`` with ``cull``.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
    >>> inline(d)       # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}

    >>> inline(d, keys='y') # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}

    >>> inline(d, keys='y', inline_constants=False) # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
    c                 S   s   i | ]\}}|t |qS r   r6   r7   r   r   r   r5   	  s      zinline.<locals>.<dictcomp>Nc                    s   i | ]}|t  |qS r   r2   r3   r4   r   r   r5     s      c                 3   s6   | ].\}}t |r|ks* | st|s|V  qd S rQ   )r   r	   r7   r   r   r   r   	<genexpr>  s      zinline.<locals>.<genexpr>c                    s   i | ]}| kr| | qS r   r   r3   r4   r   r   r5     s       )r   )r   nextitervaluesr   r;   rR   r?   r   r   copy)r   r   inline_constantsr   ZreplaceorderZkeysubsrO   rN   depreplaceZdsk2itemr   rS   r   inline   s:     


r]   Fc                    s   sS t t |dkr2fddD }t| fdd fdd D }|rt|||d|D ]
}|= qxS )	a  Inline cheap functions into larger operations

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> double = lambda x: x * 2
    >>> dsk = {'out': (add, 'i', 'd'),  # doctest: +SKIP
    ...        'i': (inc, 'x'),
    ...        'd': (double, 'y'),
    ...        'x': 1, 'y': 1}
    >>> inline_functions(dsk, [], [inc])  # doctest: +SKIP
    {'out': (add, (inc, 'x'), 'd'),
     'd': (double, 'y'),
     'x': 1, 'y': 1}

    Protect output keys.  In the example below ``i`` is not inlined because it
    is marked as an output key.

    >>> inline_functions(dsk, ['i', 'out'], [inc, double])  # doctest: +SKIP
    {'out': (add, 'i', (double, 'y')),
     'i': (inc, 'x'),
     'x': 1, 'y': 1}
    Nc                    s   i | ]}|t  |qS r   r2   r3   r4   r   r   r5   T  s      z$inline_functions.<locals>.<dictcomp>c                    s,   zt |  W S  tk
r&   Y dS X d S )NF)functions_ofissubset	TypeError)r8   )fast_functionsr   r   	inlinableW  s    z#inline_functions.<locals>.inlinablec                    s4   g | ],\}}t |r | r|kr|r|qS r   )r	   r7   )
dependentsrb   outputr   r   r%   ]  s      z$inline_functions.<locals>.<listcomp>)rY   r   )r   r
   r;   r]   )r   rd   ra   rY   r   r   r   r   )rc   r   ra   rb   rd   r   inline_functions1  s*       re   c                 C   s   t | dr| j} q | S )Nfunc)hasattrrf   )rf   r   r   r   unwrap_partiall  s    
rh   c                 C   st   t  }| g}tth}|rpg }|D ]H} t| |kr t| r^|t| d  || dd  q ||  q |}q|S )a1  Set of functions contained within nested task

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> mul = lambda x, y: x * y
    >>> task = (add, (mul, 1, 2), (inc, 3))  # doctest: +SKIP
    >>> functions_of(task)  # doctest: +SKIP
    set([add, mul, inc])
    r   r(   N)r   r   r,   r)   r	   r   rh   extend)Ztaskfuncsr   Zsequence_typesr   r   r   r   r^   r  s    r^   x   c           	         s   t | }t|}t|} r$ d8   fdd}|tkr~t|}dd |D }|| t|}|| d	|}||S |t
krt|dkrt|d trt|}dd |D }|| t|}||d  d	|}||f|d	d
  S d
S )zCreate new keys for ``fuse`` tasks.

    The optional parameter `max_fused_key_length` is used to limit the maximum string length for each renamed key.
    If this parameter is set to `None`, there is no limit.
       c                    s>    r:t |  kr:t| dd d }| d    d| } | S )Nr$      r'   )r-   hash)Zkey_nameZ	name_hashmax_fused_key_lengthr   r   _enforce_max_key_limit  s    z:default_fused_keys_renamer.<locals>._enforce_max_key_limitc                 S   s   h | ]}t |qS r   r    r3   r   r   r   	<setcomp>  s     z-default_fused_keys_renamer.<locals>.<setcomp>r'   r   c                 S   s   h | ]}t |qS r   r    r3   r   r   r   rr     s     r(   N)r:   rU   r)   r*   r   r!   discardsortedr   r+   r,   r-   r   )	r   rp   itZ	first_keyr.   rq   Z
first_namer/   Zconcatenated_namer   ro   r   default_fused_keys_renamer  s,    



"


rv   c                   @   s   e Zd ZdZedddZdS )Defaultr   )returnc                 C   s   dS )Nz	<default>r   selfr   r   r   __repr__  s    zDefault.__repr__N)__name__
__module____qualname__tokenr*   r{   r   r   r   r   rw     s   rw   c	           >   
      s  t ddkr |fS |dk	rDt|tsDt|ts8|g}tt|}|tkrbt d}|tk	sbt|tkrt d}|tk	st|tkrt d}|tk	st|dkr|d }|tkrt d}|tk	st|dkrd|t	|d	   }|tkrt d
}|tk	st|dkrd}|r&|s. |fS |tkrPt d}|tk	sPt|dkr`t
}	n(|dkrpd}	nt|stdn|}	|	dk	}|dkr fdd D }
nt|}
i }|
 D ]F\}}|D ]*}||kr|g||< n|| | qt||
|< qdd | D }|r*||8 }  D ]4\}}t|tk	r2t|tjtfs2|| q2|s|rtdd | D r |
fS   }i }g }g }|
j}|j}|j}|j}|j}|j}|j}|j}|j}|j}|r:| }|| ||kr|| d }q|| |||
| @  |d }||kr||
| @ } | rr||  |}|d }||
| @ } qH|  |||| |r|gndd	d	d	d|
| | f q*|  |
| }!|!| }"|!|" } t| }#|#d	krx| \}$}%}&}'}(})}*}+t|+},|*|,d	   krdkr"n n|,d	 }*|"|+O }"t|"|,k}-|-sD|*d	7 }*|)|* |' |kr|-sf|'|k rt | |$|%}.|!|$ |!||$O }!||$= ||$ |r|&| |&||< ||$d |r|-r|||.|&|'|(|)|*|"f n |||.|&|'d	 |(|)d	 |*|"f n|.||< qnb|%||$< ||$ |r6|*t|d	 krFt|d	 }*|||| |r\|gndd	|(d	|*|"f nqng }&d	}'d}(d}/d})d}*t }+d}0||# d }1||# d= |1D ]n\}2}2}2}3}4}5}6}7|3d	kr|/d	7 }/n|3|'kr|3}'|(|47 }(|)|57 })|*|67 }*t|7|0krt|7}0|+|7O }+qt|+},|*t |#d	 t!d|,|0 7 }*|*|,d	   krhdkrtn n|,d	 }*|"|+O }"t|"|,k}-|-s|*d	7 }*|)|* |' |kr|/|kr|(|kr|'|kr|-s|'|k r | }.t }8|1D ]V}9|9d }:t|.|:|9d	 }.||:= |8||:O }8||: |r||:d |&|9d  q|!| 8 }!|!|8O }!|rh|&| |&||< |r|||.|&|'d	 |(|)d	 |*|"f n|.||< qn|1D ]"}9|9d	 ||9d < ||9d  q|r6|(|kr|}(|*t|d	 krt|d	 }*|||| |r|gndd	|(d	|*|"f nq|| d }q*q|rPt"|||
|| |	r| D ]P\};}<|	|<}=|=dk	r^|=|kr^||; ||=< |=||;< |
|; |
|=< |=h|
|;< q^||
fS )am  Fuse tasks that form reductions; more advanced than ``fuse_linear``

    This trades parallelism opportunities for faster scheduling by making tasks
    less granular.  It can replace ``fuse_linear`` in optimization passes.

    This optimization applies to all reductions--tasks that have at most one
    dependent--so it may be viewed as fusing "multiple input, single output"
    groups of tasks into a single task.  There are many parameters to fine
    tune the behavior, which are described below.  ``ave_width`` is the
    natural parameter with which to compare parallelism to granularity, so
    it should always be specified.  Reasonable values for other parameters
    will be determined using ``ave_width`` if necessary.

    Parameters
    ----------
    dsk: dict
        dask graph
    keys: list or set, optional
        Keys that must remain in the returned dask graph
    dependencies: dict, optional
        {key: [list-of-keys]}.  Must be a list to provide count of each key
        This optional input often comes from ``cull``
    ave_width: float (default 1)
        Upper limit for ``width = num_nodes / height``, a good measure of
        parallelizability.
        dask.config key: ``optimization.fuse.ave-width``
    max_width: int (default infinite)
        Don't fuse if total width is greater than this.
        dask.config key: ``optimization.fuse.max-width``
    max_height: int or None (default None)
        Don't fuse more than this many levels. Set to None to dynamically
        adjust to ``1.5 + ave_width * log(ave_width + 1)``.
        dask.config key: ``optimization.fuse.max-height``
    max_depth_new_edges: int or None (default None)
        Don't fuse if new dependencies are added after this many levels.
        Set to None to dynamically adjust to ave_width * 1.5.
        dask.config key: ``optimization.fuse.max-depth-new-edges``
    rename_keys: bool or func, optional (default True)
        Whether to rename the fused keys with ``default_fused_keys_renamer``
        or not.  Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used.  For advanced usage, a
        function to create the new name is also accepted.
        dask.config key: ``optimization.fuse.rename-keys``
    fuse_subgraphs : bool or None, optional (default None)
        Whether to fuse multiple tasks into ``SubgraphCallable`` objects.
        Set to None to let the default optimizer of individual dask collections decide.
        If no collection-specific default exists, None defaults to False.
        dask.config key: ``optimization.fuse.subgraphs``

    Returns
    -------
    dsk
        output graph with keys fused
    dependencies
        dict mapping dependencies after fusion.  Useful side effect to accelerate other
        downstream optimizations.
    zoptimization.fuse.activeFNzoptimization.fuse.ave-widthzoptimization.fuse.max-heightz%optimization.fuse.max-depth-new-edgesg      ?zoptimization.fuse.max-widthr(   zoptimization.fuse.subgraphszoptimization.fuse.rename-keysTz)rename_keys must be a boolean or callablec                    s   i | ]}|t  |d dqS r1   r2   r3   r4   r   r   r5   :  s      zfuse.<locals>.<dictcomp>c                 S   s    h | ]\}}t |d kr|qS )r(   )r-   )r#   r   valsr   r   r   rr   G  s      zfuse.<locals>.<setcomp>c                 s   s   | ]}t t|d kV  qdS )r(   N)r-   r   )r#   r8   r   r   r   rT   P  s     zfuse.<locals>.<genexpr>r   r&      )#r   getr   r   r   r   _defaultAssertionErrormathlogrv   callabler`   r   r;   r   r)   r,   numbersNumberr*   rs   allrW   rX   r=   r   r@   ri   r-   r   intminmax_inplace_fuse_subgraphs)>r   r   r   Z	ave_width	max_widthZ
max_heightZmax_depth_new_edgesrA   Zfuse_subgraphsrK   rE   Zrdepsr   r   r8   Z	reduciblerL   fused_treesZ
info_stackZchildren_stackZdeps_popZreducible_addZreducible_popZreducible_removeZfused_trees_popZinfo_stack_appendZinfo_stack_popZchildren_stack_appendZchildren_stack_extendZchildren_stack_poprD   rG   childrenZdeps_parentedgesZnum_childrenZ	child_keyZ
child_taskZ
child_keysheightwidthZ	num_nodesZfudgeZchildren_edgesZnum_children_edgesZno_new_edgesrN   Znum_single_nodesZmax_num_edgesZchildren_info_Z
cur_heightZ	cur_widthZcur_num_nodesZ	cur_fudgeZ	cur_edgesZchildren_depsZ
child_infoZ	cur_childZroot_keyZ
fused_keysaliasr   r4   r   fuse  sV   H















 




 






  



r   c                    s  i }t  } D ]x}|| }t|dk}	|D ]Z}
|dk	rJ|
|krJ||
 q*|
|krd||
= ||
 q*|	rt||
 q*|
|kr*|||
< q*qg }dd | D }|rD| \}
}|
|g}||kr||}||= || q|  |
|kr
||
}
||
= ||
 qd}|D ].}|t | 7 }|dkr||  qqq|D ]} fdd|D }|d }||d   }||< |dd D ]}||=  |= qt	|}t
|||f|  |< |rHg }|D ].}||d}|r|| n
|| q|||< qHdS )	zKSubroutine of fuse.

    Mutates dsk, dependencies, and fused_trees inplacer(   Nc                 S   s   i | ]\}}||qS r   r   r7   r   r   r   r5     s      z+_inplace_fuse_subgraphs.<locals>.<dictcomp>r   c                    s   i | ]}| | qS r   r   r3   r4   r   r   r5     s      r&   F)r   r-   r   r;   r<   r=   r   r>   r	   r,   SubgraphCallableri   )r   r   r   r   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   ZntasksrO   ZsubgraphoutkeyZ
inkeys_setr   inkeysZchain2Zsubchainr   r4   r   r   m  sh    






r   c                   @   sN   e Zd ZdZdZdddZdd Zdd	 Zd
d Zdd Z	dd Z
dd ZdS )r   aD  Create a callable object from a dask graph.

    Parameters
    ----------
    dsk : dict
        A dask graph
    outkey : hashable
        The output key from the graph
    inkeys : list
        A list of keys to be used as arguments to the callable.
    name : str, optional
        The name to use for the function.
    )r   r   r   nameNc                 C   s2   || _ || _|| _|d kr(dt  }|| _d S )Nzsubgraph_callable-)r   r   r   uuidZuuid4r   )rz   r   r   r   r   r   r   r   __init__  s    zSubgraphCallable.__init__c                 C   s   | j S rQ   )r   ry   r   r   r   r{     s    zSubgraphCallable.__repr__c                 C   s<   t | t |ko:| j|jko:| j|jko:t| jt|jkS rQ   )r)   r   r   r   r   rz   otherr   r   r   __eq__  s    

zSubgraphCallable.__eq__c                 C   s
   | |k S rQ   r   r   r   r   r   __ne__  s    zSubgraphCallable.__ne__c                 G   sJ   t |t | jks,tdt | jt |f t| j| jtt| j|S )NzExpected %d args, got %d)	r-   r   
ValueErrorr   r   r   r   r   zip)rz   argsr   r   r   __call__  s    zSubgraphCallable.__call__c                 C   s   t | j| j| j| jffS rQ   )r   r   r   r   r   ry   r   r   r   
__reduce__  s    zSubgraphCallable.__reduce__c                 C   s   t t| jt| j| jfS rQ   )rn   r,   r   	frozensetr   r   ry   r   r   r   __hash__  s    zSubgraphCallable.__hash__)N)r|   r}   r~   __doc__	__slots__r   r{   r   r   r   r   r   r   r   r   r   r     s   
r   )NNT)NTN)NFN)rk   ) r   r   r   enumr   Zdaskr   r   r   Z	dask.corer   r   r   r	   r
   r   r   r   r0   rP   rR   r]   re   rh   r^   rv   rw   r   r   r   r   r   r   r   r   r   <module>   sD   $3
 

C     
;
'
   .G