U
    /e-1                     @  sP  d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlZd dlmZ d d	lmZmZmZ d d
lmZ d dlmZ eeZ d(ddZ!G dd dZ"d gZ#edfddZ$e%e&e'e(fZ)dddddddZ*d)ddddddZ+e,fddZ-d d! Z.d"e/e0fdfd#d$Z1dd%d&d'Z2dS )*    )annotationsN)defaultdict)partial)cycle)Any)concatdropgroupbymerge)SubgraphCallable)is_namedtuple_instanceparse_timedelta	stringifyrpc)AllTc                   s  ddl m  t }t }| dd |  D } t }t }t|t| t| k rtt}	t }
t }|  D ]^\}}||krqpz,t	t|| }|	| 
| ||
|< W qp tk
r   || Y qpX qp|r||O } fdd|	 D }i | D ]\}}z|I dH }W nX tk
r:   || Y nH tk
rr } ztd| || W 5 d}~X Y nX |d  q|fd	d
|
 D O }| q:fdd|D }||t|fS )a  Gather data directly from peers

    Parameters
    ----------
    who_has: dict
        Dict mapping keys to sets of workers that may have that key
    rpc: callable

    Returns dict mapping key to value

    See Also
    --------
    gather
    _gather
    r   )get_data_from_workerc                 S  s   i | ]\}}|t |qS  )set.0kvr   r   :/tmp/pip-unpacked-wheel-g426oqom/distributed/utils_comm.py
<dictcomp>,   s      z'gather_from_workers.<locals>.<dictcomp>c                   s6   i | ].\}}|t j ||d dd| dqS )F)whoserializersZmax_connectionszget-data-from-)name)asyncioZcreate_task)r   addresskeys)r   r   r   r   r   r   r   ?   s    Nz9Got an unexpected error while collecting from workers: %sdatac                   s   h | ]\}}| kr|qS r   r   r   )responser   r   	<setcomp>[   s      z&gather_from_workers.<locals>.<setcomp>c                   s   i | ]}|t  | qS r   )list)r   r   )original_who_hasr   r   r   ^   s      )Zdistributed.workerr   r   itemsdictlenr   r$   randomchoiceappend
IndexErroraddOSError
ValueErrorloggerinfoupdate)who_hasr   closer   r   Zbad_addressesZmissing_workersresultsZall_bad_keysdrevZbad_keyskey	addressesaddrZ
coroutinesworkercrer   )r   r%   r"   r   r   r   r   gather_from_workers   sT     r?   c                   @  s    e Zd ZdZdd Zdd ZdS )
WrappedKeya  Interface for a key in a dask graph.

    Subclasses must have .key attribute that refers to a key in a dask graph.

    Sometimes we want to associate metadata to keys in a dask graph.  For
    example we might know that that key lives on a particular machine or can
    only be accessed in a certain way.  Schedulers may have particular needs
    that can only be addressed by additional metadata.
    c                 C  s
   || _ d S Nr8   )selfr8   r   r   r   __init__m   s    zWrappedKey.__init__c                 C  s   t | j d| j dS )Nz('z'))type__name__r8   )rC   r   r   r   __repr__p   s    zWrappedKey.__repr__N)rF   
__module____qualname____doc__rD   rG   r   r   r   r   r@   b   s   
r@   c              	     s0  t | tstt |tstttdd |  D }tt|  \}}ttd t	| t
|}td  t	|7  < tt|||}td|}dd | D }fdd|D z&t fdd	| D I dH }
W 5  D ]}	|	 I dH  qX td
d |
D }dd td| D }|||fS )a  Scatter data directly to workers

    This distributes data in a round-robin fashion to a set of workers based on
    how many cores they have.  nthreads should be a dictionary mapping worker
    identities to numbers of cores.

    See scatter for parameter docstring
    c                 s  s   | ]\}}|g| V  qd S rA   r   )r   wZncr   r   r   	<genexpr>   s     z%scatter_to_workers.<locals>.<genexpr>r   c                 S  s    i | ]\}}|d d |D qS )c                 S  s   i | ]\}}}||qS r   r   )r   _r8   valuer   r   r   r      s      z1scatter_to_workers.<locals>.<dictcomp>.<dictcomp>r   )r   r;   r   r   r   r   r      s      z&scatter_to_workers.<locals>.<dictcomp>c                   s   i | ]}| |qS r   r   )r   r:   r   r   r   r      s      Nc                   s"   g | ]\}}| j | d qS ))r!   report)Zupdate_data)r   r   r   )rO   rpcsr   r   
<listcomp>   s
   z&scatter_to_workers.<locals>.<listcomp>c                 s  s   | ]}|d  V  qdS )nbytesNr   )r   or   r   r   rL      s     c                 S  s    i | ]\}}|d d |D qS )c                 S  s   g | ]\}}}|qS r   r   )r   rK   rM   r   r   r   rQ      s     z1scatter_to_workers.<locals>.<dictcomp>.<listcomp>r   r   r   r   r   r      s         )
isinstancer'   AssertionErrorr$   r   r&   zipr   _round_robin_counterr(   r   r	   valuesZ	close_rpcr   r
   )Znthreadsr!   r   rO   workersnamesZworker_iterLr6   r=   outrR   r3   r   )rO   r   rP   r   scatter_to_workersw   s*    	

r^   r   boolzset[WrappedKey])rS   	byte_keys
found_keysreturnc                   s  t | }|tkr| s| S t | d tkrt t fdd| dd D } | d }t  fdd|j D }d}r܈  rtd	d D ntd
d D }t|j| }t||j||j	}|f| | S t fdd| D S n"t
| r&| fdd| D  S |tkrV| s:| S  fdd| D }	||	S |tkr| r~ fdd|  D S | S n2t|tr| j}
 rt|
}
|  |
S | S dS )zXInner implementation of `unpack_remotedata` that adds found wrapped keys to `found_keys`r   c                 3  s   | ]}t | V  qd S rA   _unpack_remotedata_innerr   ir`   futuresr   r   rL      s     z+_unpack_remotedata_inner.<locals>.<genexpr>rT   Nc                   s   i | ]\}}|t | qS r   rc   r   rg   r   r   r      s    z,_unpack_remotedata_inner.<locals>.<dictcomp>r   c                 s  s   | ]}t |jV  qd S rA   )r   r8   r   fr   r   r   rL      s     c                 s  s   | ]}|j V  qd S rA   rB   ri   r   r   r   rL      s     c                 3  s   | ]}t | V  qd S rA   rc   r   itemr`   ra   r   r   rL      s    c                   s   g | ]}t | qS r   rc   rk   rm   r   r   rQ      s     z,_unpack_remotedata_inner.<locals>.<listcomp>c                   s   g | ]}t | qS r   rc   rk   rm   r   r   rQ      s     c                   s   i | ]\}}|t | qS r   rc   r   rm   r   r   r      s    )rE   tupler   r   r2   dskr&   inkeysZoutkeyr   r   collection_typesr'   
issubclassr@   r8   r   r-   )rS   r`   ra   typargsZscro   Zfuture_keysrp   Zoutsr   r   )r`   ra   rh   r   rd      s`    
 






rd   Fztuple[Any, set])rS   r`   rb   c                 C  s   t  }t| |||fS )a  Unpack WrappedKey objects from collection

    Returns original collection and set of all found WrappedKey objects

    Examples
    --------
    >>> rd = WrappedKey('mykey')
    >>> unpack_remotedata(1)
    (1, set())
    >>> unpack_remotedata(())
    ((), set())
    >>> unpack_remotedata(rd)
    ('mykey', {WrappedKey('mykey')})
    >>> unpack_remotedata([1, rd])
    ([1, 'mykey'], {WrappedKey('mykey')})
    >>> unpack_remotedata({1: rd})
    ({1: 'mykey'}, {WrappedKey('mykey')})
    >>> unpack_remotedata({1: [rd]})
    ({1: ['mykey']}, {WrappedKey('mykey')})

    Use the ``byte_keys=True`` keyword to force string keys

    >>> rd = WrappedKey(('x', 1))
    >>> unpack_remotedata(rd, byte_keys=True)
    ("('x', 1)", {WrappedKey('('x', 1)')})
    )r   rd   )rS   r`   ra   r   r   r   unpack_remotedata   s    ru   c                   s   t | }z t| r&|  kr& |  W S W n tk
r<   Y nX |tkr^| fdd| D S |tkr~ fdd|  D S t| r| fdd| D  S | S dS )a  Merge known data into tuple or dict

    Parameters
    ----------
    o
        core data structures containing literals and keys
    d : dict
        mapping of keys to data

    Examples
    --------
    >>> data = {'x': 1}
    >>> pack_data(('x', 'y'), data)
    (1, 'y')
    >>> pack_data({'a': 'x', 'b': 'y'}, data)  # doctest: +SKIP
    {'a': 1, 'b': 'y'}
    >>> pack_data({'a': ['x'], 'b': 'y'}, data)  # doctest: +SKIP
    {'a': [1], 'b': 'y'}
    c                   s   g | ]}t | d qS )	key_types	pack_datar   xr6   rw   r   r   rQ   #  s     zpack_data.<locals>.<listcomp>c                   s    i | ]\}}|t | d qS rv   rx   r   r|   r   r   r   %  s      zpack_data.<locals>.<dictcomp>c                   s   g | ]}t | d qS rv   rx   rz   r|   r   r   rQ   '  s     N)rE   rU   	TypeErrorrq   r'   r&   r   )rS   r6   rw   rs   r   r|   r   ry     s    ry   c                   s   t | }|tkrH| rHt| d rH| d ft fdd| dd D  S |tkrb fdd| D S |tkr fdd	|  D S z | | W S  tk
r   |  Y S X dS )
aX  Perform substitutions on a tasks

    Parameters
    ----------
    o
        Core data structures containing literals and keys
    d : dict
        Mapping of keys to values

    Examples
    --------
    >>> dsk = {"a": (sum, ["x", 2])}
    >>> data = {"x": 1}
    >>> subs_multiple(dsk, data)  # doctest: +SKIP
    {'a': (sum, [1, 2])}

    r   c                 3  s   | ]}t | V  qd S rA   subs_multiplere   r6   r   r   rL   @  s     z subs_multiple.<locals>.<genexpr>rT   Nc                   s   g | ]}t | qS r   r~   re   r   r   r   rQ   B  s     z!subs_multiple.<locals>.<listcomp>c                   s   i | ]\}}|t | qS r   r~   r   r   r   r   r   D  s      z!subs_multiple.<locals>.<dictcomp>)rE   rn   callabler$   r'   r&   getr}   )rS   r6   rs   r   r   r   r   ,  s    (r   g?c           
        s   t |D ]}z|  I dH W   S  |k
r } zt|p:t| }td| d| d| d|  t|d| d  |}	|dkr|	dt |  9 }	t|	I dH  W 5 d}~X Y qX q|  I dH S )	a  
    Return the result of ``await coro()``, re-trying in case of exceptions

    The delay between attempts is ``delay_min * (2 ** i - 1)`` where ``i`` enumerates the attempt that just failed
    (starting at 0), but never larger than ``delay_max``.
    This yields no delay between the first and second attempt, then ``delay_min``, ``3 * delay_min``, etc.
    (The reason to re-try with no delay is that in most cases this is sufficient and will thus recover faster
    from a communication failure).

    Parameters
    ----------
    coro
        The coroutine function to call and await
    count
        The maximum number of re-tries before giving up. 0 means no re-try; must be >= 0.
    delay_min
        The base factor for the delay (in seconds); this is the first non-zero delay between re-tries.
    delay_max
        The maximum delay (in seconds) between consecutive re-tries (without jitter)
    jitter_fraction
        The maximum jitter to add to the delay, as fraction of the total delay. No jitter is added if this
        value is <= 0.
        Using a non-zero value here avoids "herd effects" of many operations re-tried at the same time
    retry_on_exceptions
        A tuple of exception classes to retry. Other exceptions are not caught and re-tried, but propagate immediately.
    operation
        A human-readable description of the operation attempted; used only for logging failures

    Returns
    -------
    Any
        Whatever `await coro()` returned
    Nz	Retrying z after exception in attempt /z:    rT   r   )rangestrr0   r1   minr)   r   sleep)
corocount	delay_min	delay_maxZjitter_fractionZretry_on_exceptions	operationZi_tryexdelayr   r   r   retryL  s    +$r   )r   c                  sX   t jd}tt jddd}tt jddd}tt| f||||||dI dH S )zT
    Retry an operation using the configuration values for the retry parameters
    zdistributed.comm.retry.countz distributed.comm.retry.delay.mins)defaultz distributed.comm.retry.delay.max)r   r   r   r   N)daskconfigr   r   r   r   )r   r   rt   kwargsZretry_countZretry_delay_minZretry_delay_maxr   r   r   retry_operation  s     
 
 r   )TNN)F)3
__future__r   r   loggingr)   collectionsr   	functoolsr   	itertoolsr   typingr   Ztlzr   r   r	   r
   Zdask.configr   Zdask.optimizationr   Z
dask.utilsr   r   r   Zdistributed.corer   Zdistributed.utilsr   	getLoggerrF   r0   r?   r@   rX   r^   rn   r$   r   	frozensetrq   rd   ru   objectry   r   EnvironmentErrorIOErrorr   r   r   r   r   r   <module>   s8   

K,B%%
: