U
    /e                     @  s  d dl mZ d dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZ d dlmZmZ d dlZd dlmZ d dlmZmZmZ d dlmZmZ d dlmZmZmZmZmZ d d	l m!Z! d
d Z"G dd de	Z#G dd de#Z$G dd de	Z%dddZ&ddddZ'dS )    annotationsN)HashableIterableKeysViewMappingMutableMappingSet)Anycast)config)	clone_keyflattenis_dask_collection)keys_in_tasksreverse_dict)ensure_dict
ensure_setimport_required	key_split	stringify)get_templatec                   sp    fdd}dd    D }dd  D }  D ]6\}}t||  |  D ]}|| || qRq4|S )z'Returns the dependencies between layersc                   s8      D ]\}}| |kr|  S qtt|  dd S )Nz
 not found)itemsRuntimeErrorrepr)keykvlayers 7/tmp/pip-unpacked-wheel-dbjnr7gq/dask/highlevelgraph.py_find_layer_containing_key   s    
z>compute_layer_dependencies.<locals>._find_layer_containing_keyc                 S  s   h | ]}|D ]}|qqS r    r    ).0layerr   r    r    r!   	<setcomp>   s       z-compute_layer_dependencies.<locals>.<setcomp>c                 S  s   i | ]}|t  qS r    )setr#   r   r    r    r!   
<dictcomp>   s      z.compute_layer_dependencies.<locals>.<dictcomp>)valuesr   r   keysadd)r   r"   all_keysretr   r   r   r    r   r!   compute_layer_dependencies   s    r.   c                   @  s  e Zd ZU dZded< ded< d<dddddZejd	d
ddZejdd
ddZ	ddddddZ
ddddddZd=dddddZeddddd d!d"Zd>dddd#d$d%d&Zdd'dd(d)d*d+Zed(d,d-d.d/d0d1Zd2d3 Zd4d5 Zd?d8d9Zd:d; ZdS )@Layerax  High level graph layer

    This abstract class establish a protocol for high level graph layers.

    The main motivation of a layer is to represent a collection of tasks
    symbolically in order to speedup a series of operations significantly.
    Ideally, a layer should stay in this symbolic state until execution
    but in practice some operations will force the layer to generate all
    its internal tasks. We say that the layer has been materialized.

    Most of the default implementations in this class will materialize the
    layer. It is up to derived classes to implement non-materializing
    implementations.
    Mapping[str, Any] | Noner   collection_annotationsN)r   r1   c                 C  s4   |pt  tdd| _|p,t  tdd| _dS )a  Initialize Layer object.

        Parameters
        ----------
        annotations : Mapping[str, Any], optional
            By default, None.
            Annotations are metadata or soft constraints associated with tasks
            that dask schedulers may choose to respect:
            They signal intent without enforcing hard constraints.
            As such, they are primarily designed for use with the distributed
            scheduler. See the dask.annotate function for more information.
        collection_annotations : Mapping[str, Any], optional. By default, None.
            Experimental, intended to assist with visualizing the performance
            characteristics of Dask computations.
            These annotations are *not* passed to the distributed scheduler.
        r   Nr1   )copyr   getr   r1   )selfr   r1   r    r    r!   __init__6   s    
zLayer.__init__boolreturnc                 C  s   dS )z/Return whether the layer is materialized or notTr    r4   r    r    r!   is_materializedP   s    zLayer.is_materializedr	   c                 C  s   |   S )a?  Return a set of all output keys

        Output keys are all keys in the layer that might be referenced by
        other layers.

        Classes overriding this implementation should not cause the layer
        to be materialized.

        Returns
        -------
        keys: Set
            All output keys
        r*   r9   r    r    r!   get_output_keysU   s    zLayer.get_output_keysr&   r   z$tuple[Layer, Mapping[Hashable, set]])r*   all_hlg_keysr8   c           	        s   t |t kr, fdd D fS i }t }i }| }|r| }| ||< | ||< || D ](}||krr|krr|| || qrqBt|jd|fS )am  Remove unnecessary tasks from the layer

        In other words, return a new Layer with only the tasks required to
        calculate `keys` and a map of external key dependencies.

        Examples
        --------
        >>> inc = lambda x: x + 1
        >>> add = lambda x, y: x + y
        >>> d = MaterializedLayer({'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)})
        >>> _, deps = d.cull({'out'}, d.keys())
        >>> deps
        {'out': {'x'}, 'x': set()}

        Returns
        -------
        layer: Layer
            Culled layer
        deps: Map
            Map of external key dependencies
        c                   s   i | ]}| | qS r    )get_dependenciesr'   r=   r4   r    r!   r(      s      zLayer.cull.<locals>.<dictcomp>r   )	lenr*   r&   r2   popr>   r+   MaterializedLayerr   )	r4   r*   r=   Zret_depsseenoutZworkr   dr    r?   r!   cullf   s$    
z
Layer.cullr   )r   r=   r8   c                 C  s   t || | gS )a:  Get dependencies of `key` in the layer

        Parameters
        ----------
        key: Hashable
            The key to find dependencies of
        all_hlg_keys: Iterable
            All keys in the high level graph.

        Returns
        -------
        deps: set
            A set of dependencies
        r   )r4   r   r=   r    r    r!   r>      s    zLayer.get_dependencies)r   r8   c                   sj   t dt| jpi |pi }i }| D ]<\} t r\ fdd| D ||< d|| d< q( ||< q(|S )a  Packs Layer annotations for transmission to scheduler

        Callables annotations are fully expanded over Layer keys, while
        other values are simply transmitted as is

        Parameters
        ----------
        annotations : Mapping[str, Any], optional
            A top-level annotations.

        Returns
        -------
        packed_annotations : dict
            Packed annotations.
        zdict[str, Any]c                   s   i | ]}t | |qS r    r   r'   r   r    r!   r(      s      z?Layer.__dask_distributed_annotations_pack__.<locals>.<dictcomp>T__expanded_annotations__)r   toolzmerger   r   callable)r4   r   packedar    rI   r!   %__dask_distributed_annotations_pack__   s     
z+Layer.__dask_distributed_annotations_pack__zMutableMapping[str, Any]Iterable[Hashable]None)r   new_annotationsr*   r8   c                 C  s   |dkrdS i }d}|  D ]Z\}}t|tkrPd|krP| }|d= |||< q|sfdd |D }d}t||||< q|  D ]\}}|| |i  q| | dS )a  
        Unpack a set of layer annotations across a set of keys, then merge those
        expanded annotations for the layer into an existing annotations mapping.

        This is not a simple shallow merge because some annotations like retries,
        priority, workers, etc need to be able to retain keys from different layers.

        Parameters
        ----------
        annotations: MutableMapping[str, Any], input/output
            Already unpacked annotations, which are to be updated with the new
            unpacked annotations
        new_annotations: Mapping[str, Any], optional
            New annotations to be unpacked into `annotations`
        keys: Iterable
            All keys in the layer.
        NFrJ   c                 S  s   g | ]}t |qS r    rH   r'   r    r    r!   
<listcomp>   s     zALayer.__dask_distributed_annotations_unpack__.<locals>.<listcomp>T)r   typedictr2   fromkeysupdater3   )r   rS   r*   expandedZkeys_stringifiedrO   r   r   r    r    r!   '__dask_distributed_annotations_unpack__   s     
z-Layer.__dask_distributed_annotations_unpack__ztuple[Layer, bool])r*   seedbind_tor8   c           	        s   ddl m}  fdd i }d}|  D ]J\}}|krpt|}d |}|dk	rprp|j||f}d}|||< q.t||fS )aC  Clone selected keys in the layer, as well as references to keys in other
        layers

        Parameters
        ----------
        keys
            Keys to be replaced. This never includes keys not listed by
            :meth:`get_output_keys`. It must also include any keys that are outside
            of this layer that may be referenced by it.
        seed
            Common hashable used to alter the keys; see :func:`dask.base.clone_key`
        bind_to
            Optional key to bind the leaf nodes to. A leaf node here is one that does
            not reference any replaced keys; in other words it's a node where the
            replacement graph traversal stops; it may still have dependencies on
            non-replaced nodes.
            A bound node will not be computed until after ``bind_to`` has been computed.

        Returns
        -------
        - New layer
        - True if the ``bind_to`` key was injected anywhere; False otherwise

        Notes
        -----
        This method should be overridden by subclasses to avoid materializing the layer.
        r   )chunksc                   s   t | }|tkrH| rHt| d rH| d ft fdd| dd D  S |tkrb fdd| D S |tkr fdd	|  D S z| kr| W S W n tk
r   |  Y S X d
t| S dS )zhVariant of distributed.utils_comm.subs_multiple, which allows injecting
            bind_to
            r   c                 3  s   | ]} |V  qd S Nr    r#   iclone_valuer    r!   	<genexpr>"  s     z3Layer.clone.<locals>.clone_value.<locals>.<genexpr>   Nc                   s   g | ]} |qS r    r    r_   ra   r    r!   rT   $  s     z4Layer.clone.<locals>.clone_value.<locals>.<listcomp>c                   s   i | ]\}}| |qS r    r    r#   r   r   ra   r    r!   r(   &  s      z4Layer.clone.<locals>.clone_value.<locals>.<dictcomp>F)rU   tuplerM   listrV   r   	TypeErrorr   )otyprb   Zis_leafr*   r[   r    r!   rb     s    (

z Layer.clone.<locals>.clone_valueFTN)Zdask.graph_manipulationr]   r   r   bindrB   )	r4   r*   r[   r\   r]   Zdsk_newboundr   valuer    rk   r!   clone   s    !

zLayer.clonezMapping[Hashable, Set]r
   )r=   known_key_dependenciesclient_keysr8   c                   s  ddl m  ddlm} ddlm}m} ddlm} t	|  fdd
 D }	|	rb||	i }
i }
 D ](\}}||dd	\|
|< }|rr|||< qr|
|rtj|  nt }|D ]4}|j|k	rtd
t|j|jkr|t|jqt|dd}|	r"t|	fdd|
 D } |  }|fdd|D  |
 D ]<\}}|rTt|| dd}|dd |D  |||< qTdd |
 D } B fdd
 D t||dS )a  Pack the layer for scheduler communication in Distributed

        This method should pack its current state and is called by the Client when
        communicating with the Scheduler.
        The Scheduler will then use .__dask_distributed_unpack__(data, ...) to unpack
        the state, materialize the layer, and merge it into the global task graph.

        The returned state must be compatible with Distributed's scheduler, which
        means it must obey the following:
          - Serializable by msgpack (notice, msgpack converts lists to tuples)
          - All remote data must be unpacked (see unpack_remotedata())
          - All keys must be converted to strings now or when unpacking
          - All tasks must be serialized (see dumps_task())

        The default implementation materialize the layer thus layers such as Blockwise
        and ShuffleLayer should implement a specialized pack and unpack function in
        order to avoid materialization.

        Parameters
        ----------
        all_hlg_keys: Iterable[Hashable]
            All keys in the high level graph
        known_key_dependencies: Mapping[Hashable, Set]
            Already known dependencies
        client: distributed.Client
            The client calling this function.
        client_keys : Iterable[Hashable]
            List of keys requested by the client.

        Returns
        -------
        state: Object serializable by msgpack
            Scheduler compatible state of the layer
        r   )Future)CancelledError)subs_multipleunpack_remotedata)
dumps_taskc                   s(   i | ] \}}t | r|kr||qS r    )
isinstancere   )rr   rq   r    r!   r(   r  s
   
  z3Layer.__dask_distributed_pack__.<locals>.<dictcomp>T)Z	byte_keysz;Inputs contain futures that were created by another client.r2   c                   s   i | ]\}}||  qS r    r    re   )
alias_keysr    r!   r(     s      c                 3  s&   | ]}|t  | gd dfV  qdS )F)Zas_listNrG   r'   )r=   dskr    r!   rc     s   z2Layer.__dask_distributed_pack__.<locals>.<genexpr>c                 s  s   | ]}|j V  qd S r^   r   )r#   fr    r    r!   rc     s     c                 S  s$   i | ]\}}t |d d |D qS )c                 S  s   h | ]}t |qS r    rH   )r#   depr    r    r!   r%     s     z=Layer.__dask_distributed_pack__.<locals>.<dictcomp>.<setcomp>rH   )r#   r   depsr    r    r!   r(     s    c                   s"   i | ]\}}t |t | d qS ))Z	exclusiverH   re   )merged_hlg_keysr    r!   r(     s    )rz   dependencies)Zdistributed.clientrr   Zdistributed.utilsrs   Zdistributed.utils_commrt   ru   Zdistributed.workerrv   rV   r   r&   unionr)   client
ValueErrorr   r   futuresr   r*   rX   r   rK   Zvalmap)r4   r=   rp   r   rq   rs   rt   ru   rv   Zfuture_aliasesZdsk2Zfut_depsr   r   ZfutsZunpacked_futuresfuturer   missing_keysr   rE   r    )rr   ry   r=   rq   rz   r   r!   __dask_distributed_pack__@  s^    )



zLayer.__dask_distributed_pack__zMapping[str, Any]zMapping[str, set]rV   )staterz   r   r8   c                 C  s   |d |d dS )aa  Unpack the state of a layer previously packed by __dask_distributed_pack__()

        This method is called by the scheduler in Distributed in order to unpack
        the state of a layer and merge it into its global task graph. The method
        can use `dsk` and `dependencies`, which are the already materialized
        state of the preceding layers in the high level graph. The layers of the
        high level graph are unpacked in topological order.

        See Layer.__dask_distributed_pack__() for packing detail.

        Parameters
        ----------
        state: Any
            The state returned by Layer.__dask_distributed_pack__()
        dsk: Mapping, read-only
            The materialized low level graph of the already unpacked layers
        dependencies: Mapping, read-only
            The dependencies of each key in `dsk`

        Returns
        -------
        unpacked-layer: dict
            layer_dsk: Mapping[str, Any]
                Materialized (stringified) graph of the layer
            layer_deps: Mapping[str, set]
                Dependencies of each key in `layer_dsk`
        rz   r   )rz   r~   r    )clsr   rz   r   r    r    r!   __dask_distributed_unpack__  s    "z!Layer.__dask_distributed_unpack__c                 C  s   t t| ffS )zBDefault serialization implementation, which materializes the Layer)rB   rV   r9   r    r    r!   
__reduce__  s    zLayer.__reduce__c                 C  s"   t | | j}|j| j |S )z#Default shallow copy implementation)rU   __new__	__class____dict__rX   )r4   objr    r    r!   __copy__  s    zLayer.__copy__ r    c              	   C  s   |dkrt |}nt| dr(t | j}n| jj}d}| jrn| jddkrn| jd}|rnddlm} ||}t	dj
|  ||||  ||d	S )
Nr   namerU   dask.array.core.Arrayr]   r   )svgzhighlevelgraph_layer.html.j2)Zmaterialized	shortnamelayer_indexhighlevelgraph_keyinfor   svg_repr)r   hasattrr   r   __name__r1   r3   Zdask.array.svgr   r   renderr:   layer_info_dict)r4   r   r   r   r   r   r]   r   r    r    r!   _repr_html_  s.    

zLayer._repr_html_c                 C  s   t | j|  t|   d}| jd k	rP| j D ]\}}tt	|||< q4| j
d k	r| j
 D ]"\}}|dkrdtt	|||< qd|S )N)
layer_typer:   znumber of outputsr]   )rU   r   r:   r@   r<   r   r   htmlescapestrr1   )r4   r   r   valr    r    r!   r     s    

zLayer.layer_info_dict)NN)N)N)r   r   r    )r   
__module____qualname____doc____annotations__r5   abcabstractmethodr:   r<   rF   r>   rP   staticmethodrZ   ro   r   classmethodr   r   r   r   r   r    r    r    r!   r/   #   s2   
  0 3 Ks#
r/   c                      sX   e Zd ZdZddd fddZdd Zd	d
 Zdd Zdd Zdd Z	dd Z
  ZS )rB   zFully materialized layer of `Layer`

    Parameters
    ----------
    mapping: Mapping
        The mapping between keys and tasks, typically a dask graph.
    Nr   mappingc                   s   t  j|d || _d S )Nr   )superr5   r   )r4   r   r   r   r    r!   r5     s    zMaterializedLayer.__init__c                 C  s
   || j kS r^   r   r4   r   r    r    r!   __contains__  s    zMaterializedLayer.__contains__c                 C  s
   | j | S r^   r   r   r    r    r!   __getitem__  s    zMaterializedLayer.__getitem__c                 C  s
   t | jS r^   )iterr   r9   r    r    r!   __iter__"  s    zMaterializedLayer.__iter__c                 C  s
   t | jS r^   )r@   r   r9   r    r    r!   __len__%  s    zMaterializedLayer.__len__c                 C  s   dS )NTr    r9   r    r    r!   r:   (  s    z!MaterializedLayer.is_materializedc                 C  s   |   S r^   r;   r9   r    r    r!   r<   +  s    z!MaterializedLayer.get_output_keys)N)r   r   r   r   r5   r   r   r   r   r:   r<   __classcell__r    r    r   r!   rB     s   rB   c                   @  sf  e Zd ZU dZded< ded< ded< ded	< d
ed< dNddddddZedd ZedOddZdd Z	ddddZ
dd Zdddd Zd!dd"d#Zd
dd$d%Zd&d' Zd(d) Zddd*d+Zed,d- Zd.d/ Zed0d1 ZdPd3d4Zd5d6 Zd7d d8d9d:Zd;d d<d=d>Zd?d@ ZdQdAdBddCdDdEZedddFdGdHZdIddJdKZdLdM ZdS )RHighLevelGrapha
  Task graph composed of layers of dependent subgraphs

    This object encodes a Dask task graph that is composed of layers of
    dependent subgraphs, such as commonly occurs when building task graphs
    using high level collections like Dask array, bag, or dataframe.

    Typically each high level array, bag, or dataframe operation takes the task
    graphs of the input collections, merges them, and then adds one or more new
    layers of tasks for the new operation.  These layers typically have at
    least as many tasks as there are partitions or chunks in the collection.
    The HighLevelGraph object stores the subgraphs for each operation
    separately in sub-graphs, and also stores the dependency structure between
    them.

    Parameters
    ----------
    layers : Mapping[str, Mapping]
        The subgraph layers, keyed by a unique name
    dependencies : Mapping[str, set[str]]
        The set of layers on which each layer depends
    key_dependencies : Mapping[Hashable, set], optional
        Mapping (some) keys in the high level graph to their dependencies. If
        a key is missing, its dependencies will be calculated on-the-fly.

    Examples
    --------
    Here is an idealized example that shows the internal state of a
    HighLevelGraph

    >>> import dask.dataframe as dd

    >>> df = dd.read_csv('myfile.*.csv')  # doctest: +SKIP
    >>> df = df + 100  # doctest: +SKIP
    >>> df = df[df.name == 'Alice']  # doctest: +SKIP

    >>> graph = df.__dask_graph__()  # doctest: +SKIP
    >>> graph.layers  # doctest: +SKIP
    {
     'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                  ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                  ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                  ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
     'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
             ('add', 1): (operator.add, ('read-csv', 1), 100),
             ('add', 2): (operator.add, ('read-csv', 2), 100),
             ('add', 3): (operator.add, ('read-csv', 3), 100)}
     'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
                ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
                ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
                ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
    }

    >>> graph.dependencies  # doctest: +SKIP
    {
     'read-csv': set(),
     'add': {'read-csv'},
     'filter': {'add'}
    }

    See Also
    --------
    HighLevelGraph.from_collections :
        typically used by developers to make new HighLevelGraphs
    zMapping[str, Layer]r   zMapping[str, Set]r   zdict[Hashable, Set]key_dependenciesrV   _to_dictr&   _all_external_keysNzMapping[str, Mapping]zdict[Hashable, Set] | None)r   r   r   c                 C  s(   || _ |pi | _dd | D | _d S )Nc                 S  s(   i | ] \}}|t |tr|nt|qS r    )rw   r/   rB   re   r    r    r!   r(     s    z+HighLevelGraph.__init__.<locals>.<dictcomp>)r   r   r   r   )r4   r   r   r   r    r    r!   r5   w  s
    
zHighLevelGraph.__init__c                 C  s   t |stt|| }t|tr\t|jdd}|||< t|jdd}t	|
 ||< n$t|}||||i}||h|t	 i}| ||S )z4`from_collections` optimized for a single collectionTrx   )r   rh   rU   __dask_graph__rw   r   r   r   r   r&   __dask_layers___get_some_layer_name)r   r   r$   
collectiongraphr   r~   r   r    r    r!   _from_collection  s    
zHighLevelGraph._from_collectionr    c           	      C  s   t |dkr| |||d S ||i}|t i}tj|tdD ]}t|r| }t|t	r|
|j |
|j ||  t| O  < qt|}|||< || | t ||< q>tt|q>| ||S )ac  Construct a HighLevelGraph from a new layer and a set of collections

        This constructs a HighLevelGraph in the common case where we have a single
        new layer and a set of old collections on which we want to depend.

        This pulls out the ``__dask_layers__()`` method of the collections if
        they exist, and adds them to the dependencies for this new layer.  It
        also merges all of the layers from all of the dependent collections
        together into the new layers for this graph.

        Parameters
        ----------
        name : str
            The name of the new layer
        layer : Mapping
            The graph layer itself
        dependencies : List of Dask collections
            A list of other dask collections (like arrays or dataframes) that
            have graphs themselves

        Examples
        --------

        In typical usage we make a new task layer, and then pass that layer
        along with all dependent collections to this method.

        >>> def add(self, other):
        ...     name = 'add-' + tokenize(self, other)
        ...     layer = {(name, i): (add, input_key, other)
        ...              for i, input_key in enumerate(self.__dask_keys__())}
        ...     graph = HighLevelGraph.from_collections(name, layer, dependencies=[self])
        ...     return new_collection(name, graph)
        rd   r   r{   )r@   r   r&   rK   uniqueidr   r   rw   r   rX   r   r   r   r   r+   rh   rU   )	r   r   r$   r   r   r~   r   r   r   r    r    r!   from_collections  s"    #

zHighLevelGraph.from_collectionsc              
   C  s   z| j | | W S  tk
r$   Y nX z| j |d  | W S  tttfk
rT   Y nX | j  D ](}z|| W   S  tk
r   Y q`X q`t|d S )Nr   )r   KeyError
IndexErrorrh   r)   )r4   r   rE   r    r    r!   r     s    zHighLevelGraph.__getitem__intr7   c                 C  s   t dd | j D S )Nc                 s  s   | ]}t |V  qd S r^   r@   )r#   r$   r    r    r!   rc     s     z)HighLevelGraph.__len__.<locals>.<genexpr>)sumr   r)   r9   r    r    r!   r     s    zHighLevelGraph.__len__c                 C  s   t |  S r^   )r   to_dictr9   r    r    r!   r     s    zHighLevelGraph.__iter__c                 C  s4   z| j W S  tk
r.   t|  }| _ | Y S X dS )zIEfficiently convert to plain dict. This method is faster than dict(self).N)r   AttributeErrorr   )r4   rD   r    r    r!   r     s
    zHighLevelGraph.to_dictr   c                 C  s   |    S )zGet all keys of all the layers.

        This will in many cases materialize layers, which makes it a relatively
        expensive operation. See :meth:`get_all_external_keys` for a faster alternative.
        )r   r*   r9   r    r    r!   r*     s    zHighLevelGraph.keysc                 C  sP   z| j W S  tk
rJ   t }| j D ]}||  q(|| _ | Y S X dS )zGet all output keys of all layers

        This will in most cases _not_ materialize any layers, which makes
        it a relative cheap operation.

        Returns
        -------
        keys: set
            A set of all external keys
        N)r   r   r&   r   r)   rX   r<   )r4   r*   r$   r    r    r!   get_all_external_keys  s    z$HighLevelGraph.get_all_external_keysc                 C  s   |    S r^   )r   r   r9   r    r    r!   r     s    zHighLevelGraph.itemsc                 C  s   |    S r^   )r   r)   r9   r    r    r!   r)     s    zHighLevelGraph.valuesc                 C  sT   |   }|| j   }|rN| j D ](}||  @ D ]}|||| j|< q4q$| jS )zGet dependencies of all keys

        This will in most cases materialize all layers, which makes
        it an expensive operation.

        Returns
        -------
        map: Mapping
            A map that maps each key to its dependencies
        )r*   r   r   r)   r>   )r4   r,   r   r$   r   r    r    r!   get_all_dependencies  s    z#HighLevelGraph.get_all_dependenciesc                 C  s
   t | jS r^   )r   r   r9   r    r    r!   
dependents1  s    zHighLevelGraph.dependentsc                 C  s&   t t| jddt| jdd| j S )NTrx   )r   r   r   r   r   r2   r9   r    r    r!   r2   5  s
    zHighLevelGraph.copyc                 G  sn   i }i }|D ]V}t |tr4||j ||j qt |trZ||t|< t |t|< qt|q| ||S r^   )	rw   r   rX   r   r   r   r   r&   rh   )r   Zgraphsr   r   gr    r    r!   rL   <  s    


zHighLevelGraph.mergedask-hlg.svgc                 K  s(   ddl m} t| f|}|||| |S )a>  
        Visualize this dask high level graph.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            The name of the file to write to disk. If the provided `filename`
            doesn't include an extension, '.png' will be used by default.
            If `filename` is None, no file will be written, and the graph is
            rendered in the Jupyter notebook only.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Format in which to write output file. Default is 'svg'.
        color : {None, 'layer_type'}, optional (default: None)
            Options to color nodes.
            - None, no colors.
            - layer_type, color nodes based on the layer type.
        **kwargs
           Additional keyword arguments to forward to ``to_graphviz``.

        Examples
        --------
        >>> x.dask.visualize(filename='dask.svg')  # doctest: +SKIP
        >>> x.dask.visualize(filename='dask.svg', color='layer_type')  # doctest: +SKIP

        Returns
        -------
        result : IPython.diplay.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.dot.dot_graph
        dask.base.visualize # low level variant
        r   )graphviz_to_file)dask.dotr   to_graphviz)r4   filenameformatkwargsr   r   r    r    r!   	visualizeK  s    &zHighLevelGraph.visualizec           
      C  s   dd | j  D }dd | j D }g }| j  D ].\}}|D ]}|| | q>|s2|| q2g }t|dkr| }|| || D ]*}	||	  d8  < ||	 dkr||	 qqf|S )a  Sort the layers in a high level graph topologically

        Parameters
        ----------
        hlg : HighLevelGraph
            The high level graph's layers to sort

        Returns
        -------
        sorted: list
            List of layer names sorted topologically
        c                 S  s   i | ]\}}|t |qS r    r   re   r    r    r!   r(     s      z3HighLevelGraph._toposort_layers.<locals>.<dictcomp>c                 S  s   i | ]
}|g qS r    r    r'   r    r    r!   r(     s      r   rd   )r   r   appendr@   rA   )
r4   ZdegreeZreverse_depsreadyr   r   r}   r-   r$   Zrdepr    r    r!   _toposort_layersw  s"    
zHighLevelGraph._toposort_layersr   )r*   r8   c                   s   ddl m} tt|} }i }i }t D ]}j| }||	 }	|	r4|
|	|\}
}t }| D ]}||O }qr||
	 8 }||O }|
||< t||st|ts| r4t|t|kr4|| q4t|   fdd|D }t|||S )a  Return new HighLevelGraph with only the tasks required to calculate keys.

        In other words, remove unnecessary tasks from dask.

        Parameters
        ----------
        keys
            iterable of keys or nested list of keys such as the output of
            ``__dask_keys__()``

        Returns
        -------
        hlg: HighLevelGraph
            Culled high level graph
        r   )	Blockwisec                   s   i | ]}|j |  @ qS r    )r   )r#   
layer_nameZret_layers_keysr4   r    r!   r(     s    z'HighLevelGraph.cull.<locals>.<dictcomp>)Zdask.layersr   r&   r   r   reversedr   r   intersectionr<   rF   r)   rw   rB   r:   r@   rX   r*   r   )r4   r*   r   Zkeys_setZall_ext_keys
ret_layersZret_key_depsr   r$   Zoutput_keysZculled_layerZculled_depsZexternal_depsrE   ret_dependenciesr    r   r!   rF     s<    

zHighLevelGraph.cullzIterable[str])r   r8   c                 C  sX   t |}i }i }|rN| }| j| ||< | j| ||< ||| |  O }qt||S )a  Return a new HighLevelGraph with only the given layers and their
        dependencies. Internally, layers are not modified.

        This is a variant of :meth:`HighLevelGraph.cull` which is much faster and does
        not risk creating a collision between two layers with the same name and
        different content when two culled graphs are merged later on.

        Returns
        -------
        hlg: HighLevelGraph
            Culled high level graph
        )r&   rA   r   r   r*   r   )r4   r   Zto_visitr   r   r   r    r    r!   cull_layers  s    zHighLevelGraph.cull_layersc           	   
   C  s  | j  D ]L\}}|| jkr0tdt| d|D ] }|| j kr4tt| dq4q
| j D ]}t|dsbtqbt| j}| j 	 }|	 }||krtdt
|dt
||D ]F}| j | || krtdt| dt| j |  dt||  qd S )	Nzdependencies[z] not found in layersz not found in dependenciesr   zincorrect dependencies keys z
 expected zincorrect dependencies[z]: )r   r   r   r   r   r)   r   AssertionErrorr.   r*   r&   )	r4   r   r~   r}   r$   r   Zdep_key1Zdep_key2r   r    r    r!   validate  s,    



*zHighLevelGraph.validaterQ   r0   )rq   r   r8   c                   s\   g } fdd   D D ]8}||jt|j|   j||||d qd|iS )a  Pack the high level graph for Scheduler -> Worker communication

        The approach is to delegate the packaging to each layer in the high level graph
        by calling .__dask_distributed_pack__() and .__dask_distributed_annotations_pack__()
        on each layer.

        Parameters
        ----------
        client : distributed.Client
            The client calling this function.
        client_keys : Iterable[Hashable]
            List of keys requested by the client.
        annotations : Mapping[str, Any], optional
            A top-level annotations.

        Returns
        -------
        data: dict
            Packed high level graph layers
        c                 3  s   | ]} j | V  qd S r^   r   )r#   r   r9   r    r!   rc   1  s     z;HighLevelGraph.__dask_distributed_pack__.<locals>.<genexpr>)r   r   r   r   r   )	r   r   r   rU   r   r   r   r   rP   )r4   r   rq   r   r   r$   r    r9   r!   r     s"    z(HighLevelGraph.__dask_distributed_pack__)hlgr8   c                 C  s   ddl m} i }i }i }| d D ]}|d dkr>tj}tj}n&||d }t||d }	|	j}|	j}||d ||}
||
d  |
d	  D ]\}}||t	 |B ||< q|||d
 |
d 
  q |||dS )a  Unpack the high level graph for Scheduler -> Worker communication

        The approach is to delegate the unpackaging to each layer in the high level graph
        by calling ..._unpack__() and ..._annotations_unpack__()
        on each layer.

        Parameters
        ----------
        hlg: dict
            Packed high level graph layers

        Returns
        -------
        unpacked-graph: dict
            dsk: dict[str, Any]
                Materialized (stringified) graph of all nodes in the high level graph
            deps: dict[str, set]
                Dependencies of each key in `dsk`
            annotations: dict[str, Any]
                Annotations for `dsk`
        r   )import_allowed_moduler   r   Nr   r   rz   r~   r   )rz   r~   r   )Zdistributed.protocol.serializer   r/   r   rZ   getattrrX   r   r3   r&   r*   )r   r   rz   r~   annor$   Zunpack_stateZunpack_annomodr   Zunpacked_layerr   r   r    r    r!   r   C  s$    z*HighLevelGraph.__dask_distributed_unpack__r   c              	   C  sx   t | j dt| j d}|d| jj d| jj dtt|  d7 }t| 	 D ]\}}|d| d| d	7 }qT|S )
Nz with z	 layers.
<.z object at z>
 z. 
)
rU   r   r@   r   r   r   hexr   	enumerater   )r4   Zrepresentationr`   Zlayerkeyr    r    r!   __repr__v  s
    ,zHighLevelGraph.__repr__c                 C  s.   t djt| j| j|  | jt|  dS )Nzhighlevelgraph.html.j2)rU   r   ZtoposortZlayer_dependenciesZ	n_outputs)	r   r   rU   r   r   r   r   r@   r   r9   r    r    r!   r   }  s    
zHighLevelGraph._repr_html_)N)r    )r   N)N) r   r   r   r   r   r5   r   r   r   r   r   r   r   r*   r   r   r)   r   propertyr   r2   rL   r   r   rF   r   r   r   r   r   r   r   r    r    r    r!   r   /  sJ   
A 
7


,E& .2r   BTc           "      K  s  ddl m}m}	 tdd}
|p i }|p(i }|p0i }|p8i }|p@i }||d< d|d< d|d	< || |
j|||d
}i }| jD ]}t| j| ||< q~t	|
 }t|
 }i }|d}|dkrddgddgddgddgddgddgddgddgd}| jD ]}|	|}||i }|||d}||kr4dntd|| | ||  d  }tt| j| j}d|dd d||  d}| j| j}|rV|ddkr|d|d d|d  d!|d" d#|d$ d%	7 }|dd&krVd'd(i}|d)}|d*|d+ d,||d- d%t| d.tt|d/krJt|nd0 d%	7 }|d1t| |d2t| |d3t| |dkr||d }d4||d5< |d6t| |d7d8 |j|f| q | j D ]2\}}|	|}|D ]}|	|}||| qq|dkrd9} d:}!| D ].\}}|d5 r2|!d;|d  d<| d=7 }!q2|!d>7 }!|| i }|d1t|! |d2d? |d@dA |j| f| |S )BNr   )labelr   graphvizaJ  Drawing dask graphs with the graphviz visualization engine requires the `graphviz` python library and the `graphviz` system library.

Please either conda or pip install as follows:

  conda install python-graphviz     # either conda install
  python -m pip install graphviz    # or pip install and follow installation instructionsrankdirZboxshapeZ	helveticaZfontname)
graph_attr	node_attr	edge_attrcolorr   z#CCC7F9Fz#F9CCC7z#FFD9F2z#D9F2FFz#D9FFE6z#DBDEE5)ZDataFrameIOLayerZShuffleLayerZSimpleShuffleLayerZArrayOverlayLayerZBroadcastJoinLayerr   ZBlockwiseLayerrB   )cache   zA r/   r   z Layer with z Tasks.
rU   r   zArray Shape: z
Data Type: Zdtypez
Chunk Size: 	chunksizez
Chunk Type: Z
chunk_typer   zdask.dataframe.core.DataFramezpandas.core.frame.DataFrameZpandascolumnszNumber of Partitions: Znpartitionsz
DataFrame Type: Zdataframe_typez DataFrame Columns: (   z[...]r   ZfontsizeZtooltipTrd   Z	fillcolorstyleZfilledZKeyzn<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0" CELLPADDING="5"><TR><TD><B>Legend: Layer types</B></TD></TR>z<TR><TD BGCOLOR="z">z
</TD></TR>z	</TABLE>>Z20margin0)r   r   r   r   rX   ZDigraphr   r@   r   minr)   maxr3   r   r   rU   r   replacer1   
setdefaultnoder   Zedge)"hgZdata_attributesZfunction_attributesr   r   r   r   r   r   r   r   r   Zn_tasksr$   Z	min_tasksZ	max_tasksr   r   Zlayer_colorsr   attrsZ
node_labelZ	node_sizer   Znode_tooltipsZlayer_caZdftypecolsZ
node_colorr~   r}   dep_nameZlegend_titleZlegend_labelr    r    r!   r     s    
	
  

4
L


r   r   r7   c              	   C  s:   z|   \}|W S  ttfk
r4   tt|  Y S X dS )zLSomehow get a unique name for a Layer from a non-HighLevelGraph dask mappingN)r   r   r   r   r   )r   r   r    r    r!   r     s
    
r   )NNr   NNN)(
__future__r   r   r2   r   collections.abcr   r   r   r   r   r	   typingr
   r   ZtlzrK   Zdaskr   Z	dask.baser   r   r   Z	dask.corer   r   Z
dask.utilsr   r   r   r   r   Zdask.widgetsr   r.   r/   rB   r   r   r   r    r    r    r!   <module>   s>       o     ^      
 
