U
    /e                     @   sx   d Z ddlmZmZ ddlmZ ddlmZmZm	Z	m
Z
 dddZdd	 Zd
d ZG dd dZeddZdddZdS )a   Static order of nodes in dask graph

Dask makes decisions on what tasks to prioritize both

*  Dynamically at runtime
*  Statically before runtime

Dynamically we prefer to run tasks that were just made available.  However when
several tasks become available at the same time we have an opportunity to break
ties in an intelligent way

        d
        |
    b   c
     \ /
      a

For example after we finish ``a`` we can choose to run either ``b`` or ``c``
next.  Making small decisions like this can greatly affect our performance,
especially because the order in which we run tasks affects the order in which
we can release memory, which operationally we find to have a large affect on
many computation.  We want to run tasks in such a way that we keep only a small
amount of data in memory at any given time.


Static Ordering
---------------

And so we create a total ordering over all nodes to serve as a tie breaker.  We
represent this ordering with a dictionary mapping keys to integer values.
Lower scores have higher priority.  These scores correspond to the order in
which a sequential scheduler would visit each node.

    {'a': 0,
     'c': 1,
     'd': 2,
     'b': 3}

There are several ways in which we might order our keys.  This is a nuanced
process that has to take into account many different kinds of workflows, and
operate efficiently in linear time.  We strongly recommend that readers look at
the docstrings of tests in dask/tests/test_order.py.  These tests usually have
graph types laid out very carefully to show the kinds of situations that often
arise, and the order we would like to be determined.


Policy
------

Work towards *small goals* with *big steps*.

1.  **Small goals**: prefer tasks that have few total dependents and whose final
    dependents have few total dependencies.

    We prefer to prioritize those tasks that help branches of computation that
    can terminate quickly.

    With more detail, we compute the total number of dependencies that each
    task depends on (both its own dependencies, and the dependencies of its
    dependencies, and so on), and then we choose those tasks that drive towards
    results with a low number of total dependencies.  We choose to prioritize
    tasks that work towards finishing shorter computations first.

2.  **Big steps**: prefer tasks with many dependents

    However, many tasks work towards the same final dependents.  Among those,
    we choose those tasks with the most work left to do.  We want to finish
    the larger portions of a sub-computation before we start on the smaller
    ones.

3.  **Name comparison**: break ties with key name

    Often graphs are made with regular keynames.  When no other structural
    difference exists between two keys, use the key name to break ties.
    This relies on the regularity of graph constructors like dask.array to be a
    good proxy for ordering.  This is usually a good idea and a sane default.
    )defaultdict
namedtuple)log)get_dependenciesget_depsgetcyclereverse_dictNc           :         s   si S  dkr"fddD  t  t \t ttkrztd}tdddd |D  dd	  D }t|d
kotd
k}dd fdd  D D }|j} fdd} fdd} fdd}	fdd D }
i d}t	||dg}|j
}g }|j}|j}|j
}tt}tt}g }|j}|j
}|rvt|}nt }|j}|j}i }| }|j}g }|j}|j}tj} d}!|r@| }"|"krΐq|" rX|r|"|krH||" |  |" }#d
t|#  k rdk r6n n|t|#|dd n
||# ||# |sRqd}$n||"< |d
7 }|" }#d}%|" d d
krfdd	|#D }&|&r|#|&8 }#t|&d
krt|&|	d}&|&D ]}'||'< |d
7 }qd}%n,|r|D ] }'|'  d
8  < |#|' q|#r|#D ]}'|'  d
8  < qd}$nqn|rZ| }|j
}qnj|rhd}$n\|r@t }#|D ] }(|(krqx|( })||(< |d
7 }|( d d
kr<fdd	|)D }&|&r|)|&8 })t|&d
krt|&|	d}&|&D ]}'||'< |d
7 }qn,|r<|D ] }'|'  d
8  < |)|' q|)r|)D ]}'|'  d
8  < qFt|)d
kr|)\}(|( s|( })q|#|)O }#qxqqx|  | |#}#|#sqd}%d}$nq@|$r|rt }#|s|rdnd}%|D ]\}(}*|(krq|%r0t| |* d
kr0||( q|( })||(< |d
7 }|( d d
krfdd	|)D }&|&r|)|&8 })t|&d
krt|&|	d}&|&D ]}'||'< |d
7 }qn,|r|D ] }'|'  d
8  < |)|' q|)r|)D ]}'|'  d
8  < q|%r`|)|@ }+|+rdt|)t|+krVt|+d
kr|+\}(|( s|( })q0q|)|+ })nd}+t|)d
kr|)\}(|( s|+s|( })q0||( q|#|)O }#qq0q| |#}#|  |#sΐqd}%|#|@ }+|+r(t|#t|+krt|+d
kr|+\}'|' s|"||'< qd}%|#|+ }#t|#d
kr|#\}'|sj|%r`|'g}|j
}||' q|
|' },n:|
|' },|,|
|d  k r|| |'g}|j
}||' q|' s|"||'< n,|,|
|" k r||, |# n||, |# qt|#dk
r|#\}'})|
|' },|
|) }-|-|,k s4|,|-krH||)||'k rH|)|' }'})|-|, },}-|	r|
|d  }.|-|.k r|| ||)g |'g}|j
}||# |) 	s|$r||) n|"||)< n|,|.k 	r*|| |'g}|j
}||' |) s|$r||) n|"||)< n0|-|
|" k 	r||- |)g n||- |)g n|
|" }/|-|/k 	r^||, |'g ||- |)g nL|,|/k 	r||, |'g ||- |)g n ||, |'g ||- |)g q<|%
rz|) 	s|| |'g}|j
}||' |"||)< n|,|-k
r,d|
|"  d |, k
r,|| ||)g |'g}|j
}||# nL|| |'g}|j
}||' |-|
|" k 
rh||- |)g n||- |)g n|
|" }/|-|/k 
r||, |'g ||- |)g nL|,|/k 
r||, |'g ||- |)g n ||, |'g ||- |)g qtt}0|#D ]}'|0|
|'  |' q
|
|" }/|r(|
|d  }.g }1|0 D ]H\},}2|,|.k rh|1|, n(|,|/k r||, |2 n||, |2 qJ|1r<|| d
t|1k r|1jdd! |1D ]V},|0|, }3d
t|3  k rd"k rn n|3j|dd |d#d |3D  ||3 q| }|j
}q|%r t	|0}4|0
|4}5t|5d
kr\|5}|| nd$|/ d%t|5 t|5 |4 krt|5d"k r|5j|dd |d&d |5D  | }||5 n>t|5d"k rt	|5|dg}n
|5
 g}||4 |5 || |j
}|0 D ]2\},}2|,|/k r*||, |2 n||, |2 qqt tkrVq|rt|dd!D ]},|t||,  qhtt}|rfd'd(| D }#|#rd
t|#  k rd"k rn n|#j|dd |d)d |#D  ||# qq|rq|r|| }}q|!st|}6t|tkr:t|}| |}t|}7|6|7 }8|8|7ks|7|7|8 t|7|8   |7t|7 k rt	||d}"qt|d*k rt||dd}nt|}|j
}9d}!|r|"|kr|9 }"|"kr|9 }"q||" qS )+a  Order nodes in dask graph

    This produces an ordering over our tasks that we use to break ties when
    executing.  We do this ahead of time to reduce a bit of stress on the
    scheduler and also to assist in static analysis.

    This currently traverses the graph as a single-threaded scheduler would
    traverse it.  It breaks ties in the following ways:

    1.  Begin at a leaf node that is a dependency of a root node that has the
        largest subgraph (start hard things first)
    2.  Prefer tall branches with few dependents (start hard things first and
        try to avoid memory usage)
    3.  Prefer dependents that are dependencies of root nodes that have
        the smallest subgraph (do small goals that can terminate quickly)

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'a': 1, 'b': 2, 'c': (inc, 'a'), 'd': (add, 'b', 'c')}
    >>> order(dsk)
    {'a': 0, 'c': 1, 'b': 2, 'd': 3}
    Nc                    s   i | ]}|t  |qS  )r   ).0k)dskr	   ./tmp/pip-unpacked-wheel-dbjnr7gq/dask/order.py
<dictcomp>q   s      zorder.<locals>.<dictcomp>z2Cycle detected between the following keys:
  -> %sz
  -> c                 s   s   | ]}t |V  qd S N)strr
   xr	   r	   r   	<genexpr>{   s     zorder.<locals>.<genexpr>c                 S   s   h | ]\}}|s|qS r	   r	   r
   r   vr	   r	   r   	<setcomp>   s      zorder.<locals>.<setcomp>   c                 S   s@   i | ]8\}}\}}}}}|| || ||| | |t |fqS r	   )StrComparable)r
   keynum_dependentstotal_dependentsmin_dependenciesmax_dependenciesmin_heightsmax_heightsr	   r	   r   r      s   c                 3   s,   | ]$\}}|s|t  | | fV  qd S r   lenr
   r   val)
dependentsmetricsr	   r   r      s   c                    s4   t |  t  |   |   |  d  t| fS )zChoose a path from our starting task to our tactical goal

        This path is connected to a large goal, but focuses on completing
        a small goal and being memory efficient.
           r!   r   r   )dependenciesr$   r%   
num_neededr	   r   dependents_key   s    zorder.<locals>.dependents_keyc              
      s\   t |  }|  \}}}}}| || ||| |   |t  |   |   ||t| f	S )z}Choose which dependency to run as part of a reverse DFS

        This is very similar to both ``initial_stack_key``.
        r'   )r   r   r   r   r   r   r   )r)   r$   r%   r*   total_dependenciesr	   r   dependencies_key   s$    zorder.<locals>.dependencies_keyc                    s   t  |   t| fS )zGDetermine the order of dependents that are ready to run and be releasedr'   r(   r)   r	   r   finish_now_key   s    zorder.<locals>.finish_now_keyc                    s4   i | ],\}\}}}}}|| |  d  ||  qS r   r	   )r
   r   r   r   _r   )r,   r	   r   r      s   r   r   Fi  T)r   reverser&   c                    s$   h | ]} | s| d kr|qS r0   r	   r
   depr$   r*   r	   r   r   x  s    c                    s$   h | ]} | s| d kr|qS r0   r	   r4   r6   r	   r   r     s    c                    s$   h | ]} | s| d kr|qS r0   r	   r4   r6   r	   r   r     s             )r3   d   c                 s   s   | ]}|gV  qd S r   r	   r4   r	   r	   r   r     s     
      c                 s   s   | ]}|gV  qd S r   r	   r4   r	   r	   r   r     s     c                    s   g | ]}| kr|qS r	   r	   r   resultr	   r   
<listcomp>  s      zorder.<locals>.<listcomp>c                 s   s   | ]}|gV  qd S r   r	   r4   r	   r	   r   r     s     i'  )r   ndependenciesgraph_metricsr!   r   RuntimeErrorjoinitems__getitem__minpopappendextendr   listsetupdateaddclear
differencesortedremovesortreversedtypedictr   ):r   r)   cycleZ
root_nodesZskip_root_nodeZ
init_stackZinitial_stack_keyr+   r-   r/   Zpartition_keysiZinner_stackZinner_stack_popZinner_stacksZinner_stacks_appendZinner_stacks_extendZinner_stacks_popZ
next_nodesZlater_nodesZouter_stackZouter_stack_extendZouter_stack_popseenZseen_updateZseen_addZsinglesZsingles_itemsZsingles_clearZlater_singlesZlater_singles_appendZlater_singles_clearZset_differenceZis_init_sorteditemdepsZprocess_singlesZadd_to_inner_stackZ
finish_nowr5   ZsingleZdep2parentZalready_seenr   key2Zprev_keyZitem_keyZ	dep_poolsZnow_keysvalspoolZmin_keyZmin_poolZprev_lenNmZinit_stack_popr	   )r)   r$   r   r%   r*   r>   r,   r   orderT   s   

	
(


 





























 





 





 

0

ra   c                    sj  i  dd |  D }g }|j}|j}|  D ]T\}}|s.|| }	d|	|	ddf |< | | D ]$}
||
  d8  < ||
 s\||
 q\q.|rf| }|| }t|dkr|\} | \}}}}}d| ||d| d| f |< nVt fdd|| D  \}}}}}dt| t|t|dt| dt| f |< | | D ](}
||
  d8  < ||
 s:||
 q:q S )ae  Useful measures of a graph used by ``dask.order.order``

    Example DAG (a1 has no dependencies; b2 and c1 are root nodes):

    c1
    |
    b1  b2
     \  /
      a1

    For each key we return:

    1.  **total_dependents**: The number of keys that can only be run
        after this key is run.  The root nodes have value 1 while deep child
        nodes will have larger values.

        1
        |
        2   1
         \ /
          4

    2.  **min_dependencies**: The minimum value of the total number of
        dependencies of all final dependents (see module-level comment for more).
        In other words, the minimum of ``ndependencies`` of root
        nodes connected to the current node.

        3
        |
        3   2
         \ /
          2

    3.  **max_dependencies**: The maximum value of the total number of
        dependencies of all final dependents (see module-level comment for more).
        In other words, the maximum of ``ndependencies`` of root
        nodes connected to the current node.

        3
        |
        3   2
         \ /
          3

    4.  **min_height**: The minimum height from a root node

        0
        |
        1   0
         \ /
          1

    5.  **max_height**: The maximum height from a root node

        0
        |
        1   0
         \ /
          2

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a1': 1, 'b1': (inc, 'a1'), 'b2': (inc, 'a1'), 'c1': (inc, 'b1')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> _, total_dependencies = ndependencies(dependencies, dependents)
    >>> metrics = graph_metrics(dependencies, dependents, total_dependencies)
    >>> sorted(metrics.items())
    [('a1', (4, 2, 3, 1, 2)), ('b1', (2, 3, 3, 1, 1)), ('b2', (1, 2, 2, 0, 0)), ('c1', (1, 3, 3, 0, 0))]

    Returns
    -------
    metrics: Dict[key, Tuple[int, int, int, int, int]]
    c                 S   s   i | ]\}}|r|t |qS r	   r    r   r	   r	   r   r   r  s       z!graph_metrics.<locals>.<dictcomp>r   r   c                 3   s   | ]} | V  qd S r   r	   )r
   r[   r=   r	   r   r     s     z graph_metrics.<locals>.<genexpr>)rD   rG   rH   r!   zipsumrF   max)r)   r$   r,   r*   currentcurrent_popcurrent_appendr   rZ   r#   childparentsr[   r   r   r   r   r   r	   r=   r   rA   &  sb    K




rA   c                    s   i }i  |   D ] \}}t|||< |sd |< q| }g }|j}|j} D ]2}	||	 D ]$}
||
  d8  < ||
 sZ||
 qZqN|r| }	dt fdd| |	 D   |	< ||	 D ]$}
||
  d8  < ||
 s||
 qq| fS )aG  Number of total data elements on which this key depends

    For each key we return the number of tasks that must be run for us to run
    this task.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> num_dependencies, total_dependencies = ndependencies(dependencies, dependents)
    >>> sorted(total_dependencies.items())
    [('a', 1), ('b', 2), ('c', 3)]

    Returns
    -------
    num_dependencies: Dict[key, int]
    total_dependencies: Dict[key, int]
    r   c                 3   s   | ]} | V  qd S r   r	   )r
   rh   r=   r	   r   r     s     z ndependencies.<locals>.<genexpr>)rD   r!   copyrG   rH   rc   )r)   r$   r*   r   r   Znum_dependenciesre   rf   rg   r   r[   r	   r=   r   r@     s.    
"r@   c                   @   s$   e Zd ZdZdZdd Zdd ZdS )r   a  Wrap object so that it defaults to string comparison

    When comparing two objects of different types Python fails

    >>> 'a' < 1
    Traceback (most recent call last):
        ...
    TypeError: '<' not supported between instances of 'str' and 'int'

    This class wraps the object so that, when this would occur it instead
    compares the string representation

    >>> StrComparable('a') < StrComparable(1)
    False
    objc                 C   s
   || _ d S r   rk   )selfrl   r	   r	   r   __init__  s    zStrComparable.__init__c                 C   s<   z| j |j k W S  tk
r6   t| j t|j k  Y S X d S r   )rl   	Exceptionr   )rm   otherr	   r	   r   __lt__  s    zStrComparable.__lt__N)__name__
__module____qualname____doc__	__slots__rn   rq   r	   r	   r	   r   r     s   r   	OrderInfo)ra   ageZnum_data_when_runZnum_data_when_releasedZnum_dependencies_freedc                    s2  |dkrt | \}}nt|}|dkr2t| |d}g }d}i  i i i dd | D }tt| |jdD ]\}}|| ||< d}	|| D ]@}
||
  d8  < ||
 dkr|||
   |
< ||
< |	d7 }	q|	|< || r||	d 8 }qnd |< ||< ||	8 }qn fdd| D }||fS )	a  Simulate runtime metrics as though running tasks one at a time in order.

    These diagnostics can help reveal behaviors of and issues with ``order``.

    Returns a dict of `namedtuple("OrderInfo")` and a list of the number of outputs held over time.

    OrderInfo fields:
    - order : the order in which the node is run.
    - age : how long the output of a node is held.
    - num_data_when_run : the number of outputs held in memory when a node is run.
    - num_data_when_released : the number of outputs held in memory when the output is released.
    - num_dependencies_freed : the number of dependencies freed by running the node.
    Nr.   r   c                 S   s   i | ]\}}|t |qS r	   r    r"   r	   r	   r   r     s      zdiagnostics.<locals>.<dictcomp>r2   r   c              
      s2   i | ]*\}}|t | | | | | qS r	   )rw   r"   rx   ZfreedZreleasepressureZrunpressurer	   r   r   .  s        )r   r   ra   rD   	enumeraterP   rE   rH   )r   or)   r$   ZpressureZnum_in_memoryr*   rW   r   Zreleasedr5   rvr	   ry   r   diagnostics   s@    


r}   )N)NN)ru   collectionsr   r   mathr   Z	dask.corer   r   r   r   ra   rA   r@   r   rw   r}   r	   r	   r	   r   <module>   s$   M
     W /