U
    /eJ                    @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZmZmZ d dlmZmZmZ d dlmZ d dlmZ d dlZd dlmZ d d	lmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d d
l/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7m8Z8m9Z9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@mAZAmBZBmCZCmDZD d dlEmFZFmGZG d dlHmIZI d dlJmKZKmLZLmMZM d dlNmOZO d dlPmQZQmRZRmSZSmTZTmUZUmVZVmWZWmXZXmYZYmZZZm[Z[m\Z\ e8]de8d Z^dZ_e`deafddd dZbddd Zcd!d" Zddd#d$Zedd%d&Zfd'd( Zgdd)e[ddd*fd+d,Zhd-d. Zid/d0 ZjG d1d2 d2Zkd3d4 ZlG d5d6 d6e6ZmG d7d8 d8e6Zndd9d:Zodd<d=Zpd>d? Zqdd@dAZrdBdC ZsdDdE ZtdFdG ZudHdI ZvdJdK ZwddLdMZxddNdOZydPdQ ZzdRdS Z{dTdU Z|ddVdWZ}G dXdY dYeZ~dZd[ Zd\d] Zd^d_ Zd`da Ze_fdbdcZddde ZedffdgdhZddidjZdkdl Zdmdn ZddodpZdqdr ZddsdtZdudv Zdwdx Zdydz Zd{d| Zd}d~ Zdd Zdd ZdS )    )annotationsN)defaultdict)HashableIterableIteratorMapping)partialreducewraps)Random)urlopen)
open_files)
accumulatecomposecountcurryfirstfrequenciesgroupbyjoinmerge
merge_withpartition_allpeekpluckreducebyremovesecondtaketopkuniquevalmap)config)chunkto_avro)DaskMethodsMixindont_optimizenamed_schedulersreplace_name_in_keytokenize)	blockwise)globalmethod)flattenget_dependenciesistaskquotereverse_dict)Delayedunpack_collections)HighLevelGraph)cullfuseinline)sizeof)applydigitensure_bytesensure_dictensure_unicodefuncnameinsertiter_chunks	key_splitparse_bytessystem_encodingtakes_multiple_argumentsZ	processessyncZ__no__default__	no_result c                 C  s   dS )NrF   rG   selfrG   rG   1/tmp/pip-unpacked-wheel-dbjnr7gq/dask/bag/core.py<lambda>L       rK   )	__slots__
__reduce__Tc                 C  s   t | tkr&t| dk r&dd | D S t| s2| S | d | dd  }}|sn|ttfkrn| d } t|ddiS |ftd	d
 |D  S dS )a  
    Given a task, remove unnecessary calls to ``list`` and ``reify``.

    This traverses tasks and small lists.  We choose not to traverse down lists
    of size >= 50 because it is unlikely that sequences this long contain other
    sequences in practice.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1
    >>> task = (sum, (list, (map, inc, [1, 2, 3])))
    >>> lazify_task(task)  # doctest: +ELLIPSIS
    (<built-in function sum>, (<class 'map'>, <function inc at ...>, [1, 2, 3]))
    2   c                 S  s   g | ]}t |d qS )Flazify_task.0argrG   rG   rJ   
<listcomp>a   s     zlazify_task.<locals>.<listcomp>r      NstartFc                 s  s   | ]}t |d V  qdS )FNrP   rR   rG   rG   rJ   	<genexpr>i   s     zlazify_task.<locals>.<genexpr>)typelistlenr/   reifyrQ   tuple)taskrW   headtailrG   rG   rJ   rQ   P   s    rQ   c                 C  s
   t t| S )zq
    Remove unnecessary calls to ``list`` in tasks.

    See Also
    --------
    dask.bag.core.lazify_task
    )r!   rQ   dskrG   rG   rJ   lazifyl   s    rc   c                   sl   |dkrfdd  D }t|  fdd  D }|t| t|dd|D ]
}|= q\S )zInline lists that are only used once.

    >>> d = {'b': (list, 'a'),
    ...      'c': (sum, 'b', 1)}
    >>> inline_singleton_lists(d, 'c')
    {'c': (<built-in function sum>, (<class 'list'>, 'a'), 1)}

    Pairs nicely with lazify afterwards.
    Nc                   s   i | ]\}}|t  |d qS ))r^   )r.   rS   kvra   rG   rJ   
<dictcomp>   s      z*inline_singleton_lists.<locals>.<dictcomp>c                   s<   h | ]4\}}t |r|r|d  tkrt | dkr|qS )r   rV   )r/   rZ   r[   rd   )
dependentsrG   rJ   	<setcomp>   s      z)inline_singleton_lists.<locals>.<setcomp>F)Zinline_constants)itemsr1   difference_updater-   r7   )rb   keysdependenciesZinline_keysre   rG   )rh   rb   rJ   inline_singleton_listsw   s    

rn   c           
      K  s^   t | } t| |\}}i }|dk	r*||d< t|||p6g  |f|\}}t|||}t|}	|	S )z Optimize a dask from a dask Bag.NZrename_keys)r<   r5   r6   rn   rc   )
rb   rl   Z	fuse_keysZrename_fused_keyskwargsdsk2rm   dsk3dsk4Zdsk5rG   rG   rJ   optimize   s    rs   c              	   C  sp   |b}t |tjrd}t}nd}t}d}| D ]&}|r@|| nd}||| q,|rb|| W 5 Q R X d S )N
   
FT)
isinstanceioTextIOWrapperr=   r;   write)dataZ	lazy_filelast_endlinefendlineZensurestarteddrG   rG   rJ   _to_textfiles_chunk   s    r   inferFc                   s   |dkrdnd}	t |f||	|| jd|p,i }
dt j  fddt|
D }tj| gd}t | j}|r|j	f | d	d
 |
D S |
 S dS )al  Write dask Bag to disk, one filename per partition, one line per element.

    **Paths**: This will create one file for each partition in your bag. You
    can specify the filenames in a variety of ways.

    Use a globstring

    >>> b.to_textfiles('/path/to/data/*.json.gz')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 1, 2, ...

    ::

        /path/to/data/0.json.gz
        /path/to/data/1.json.gz

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/2015-01-01.json.gz
        /path/to/data/2015-01-02.json.gz
        ...

    You can also provide an explicit list of paths.

    >>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  # doctest: +SKIP
    >>> b.to_textfiles(paths) # doctest: +SKIP

    **Compression**: Filenames with extensions corresponding to known
    compression algorithms (gz, bz2) will be compressed accordingly.

    **Bag Contents**: The bag calling ``to_textfiles`` must be a bag of
    text strings. For example, a bag of dictionaries could be written to
    JSON text files by mapping ``json.dumps`` on to the bag first, and
    then calling ``to_textfiles`` :

    >>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  # doctest: +SKIP

    **Last endline**: By default the last line does not end with a newline
    character. Pass ``last_endline=True`` to invert the default.
    Nwbwt)compressionmodeencodingname_functionnumzto-textfiles-c                   s(   i | ] \}}|ft  j|f|fqS rG   )r   name)rS   ir|   br{   r   rG   rJ   rg      s    z to_textfiles.<locals>.<dictcomp>rm   c                 S  s   g | ]
}|j qS rG   )path)rS   r|   rG   rG   rJ   rU   	  s     z to_textfiles.<locals>.<listcomp>)r   npartitionsuuiduuid4hex	enumerater4   from_collectionsrY   compute
to_delayed)r   r   r   r   r   r   storage_optionsr{   ro   r   filesrb   graphoutrG   r   rJ   to_textfiles   s,    C
r   c                 C  sV   | s| S t | trt| } t | d tr@t | d ts@t| } t | trRt| } | S Nr   )rv   r   rZ   r   strtoolzconcatresultsrG   rG   rJ   finalize  s    


r   c                 C  s   | d S r   rG   r   rG   rG   rJ   finalize_item  s    r   c                   @  s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )StringAccessora  String processing functions

    Examples
    --------

    >>> import dask.bag as db
    >>> b = db.from_sequence(['Alice Smith', 'Bob Jones', 'Charlie Smith'])
    >>> list(b.str.lower())
    ['alice smith', 'bob jones', 'charlie smith']

    >>> list(b.str.match('*Smith'))
    ['Alice Smith', 'Charlie Smith']

    >>> list(b.str.split(' '))
    [['Alice', 'Smith'], ['Bob', 'Jones'], ['Charlie', 'Smith']]
    c                 C  s
   || _ d S N)_bag)rI   bagrG   rG   rJ   __init__0  s    zStringAccessor.__init__c                 C  s   t ttt| tt S r   )sortedsetdirrY   r   rH   rG   rG   rJ   __dir__3  s    zStringAccessor.__dir__c                 O  s   | j tj|f||S r   )r   mapoperatormethodcaller)rI   keyargsro   rG   rG   rJ   _strmap6  s    zStringAccessor._strmapc                 C  sX   zt | |W S  tk
rR   |ttkrLtt|}t|t| j| Y S  Y nX d S r   )	object__getattribute__AttributeErrorr   r   getattrrobust_wrapsr   r   )rI   r   funcrG   rG   rJ   __getattr__9  s    
zStringAccessor.__getattr__c                 C  s    ddl m } | jt||dS )aX  Filter strings by those that match a pattern.

        Examples
        --------

        >>> import dask.bag as db
        >>> b = db.from_sequence(['Alice Smith', 'Bob Jones', 'Charlie Smith'])
        >>> list(b.str.match('*Smith'))
        ['Alice Smith', 'Charlie Smith']

        See Also
        --------
        fnmatch.fnmatch
        r   )fnmatch)pat)r   r   filterr   )rI   patternr   rG   rG   rJ   matchC  s    zStringAccessor.matchN)	__name__
__module____qualname____doc__r   r   r   r   r   rG   rG   rG   rJ   r     s   
r   c                   s    fdd}|S )z-A weak version of wraps that only copies doc.c                   s    j | _ | S r   )r   )wrappedwrapperrG   rJ   _Z  s    zrobust_wraps.<locals>._rG   )r   r   rG   r   rJ   r   W  s    r   c                   @  s   e Zd Zd"ddZdd Zdd Zdd	 Zd
d Zee	de
dZeeZdd Zdd ZddddZedd Zedd Zdd Zdd Zdd Zej Z Z ZZd#d d!ZdS )$ItemNc                 C  sR   || _ || _|| _|p|| _t|trN| j|jkrNtd| j dt|j d S )NzLayer z% not in the HighLevelGraph's layers: )	daskr   r   _layerrv   r4   Zlayers
ValueErrorrZ   )rI   rb   r   layerrG   rG   rJ   r   b  s    
zItem.__init__c                 C  s   | j S r   r   rH   rG   rG   rJ   __dask_graph__p  s    zItem.__dask_graph__c                 C  s   | j gS r   r   rH   rG   rG   rJ   __dask_keys__s  s    zItem.__dask_keys__c                 C  s   | j fS r   )r   rH   rG   rG   rJ   __dask_layers__v  s    zItem.__dask_layers__c                 C  s   | j S r   r   rH   rG   rG   rJ   __dask_tokenize__y  s    zItem.__dask_tokenize__bag_optimizer   Zfalseyc                 C  s   t dfS NrG   )r   rH   rG   rG   rJ   __dask_postcompute__  s    zItem.__dask_postcompute__c                 C  s
   | j dfS r   _rebuildrH   rG   rG   rJ   __dask_postpersist__  s    zItem.__dask_postpersist__renamec                C  s    |rt | j|n| j}t||S r   )r)   r   r   )rI   rb   r   r   rG   rG   rJ   r     s    zItem._rebuildc                 C  sT   ddl m}m} t| |s,t| dr,|| } t| |s:tt| j| j| 	 d dS )zfCreate bag item from a dask.delayed value.

        See ``dask.bag.from_delayed`` for details
        r   r2   delayedr   r   )
dask.delayedr2   r   rv   hasattrAssertionErrorr   r   r   r   )valuer2   r   rG   rG   rJ   from_delayed  s
    zItem.from_delayedc                 C  s   | j | jfS r   r   r   rH   rG   rG   rJ   _args  s    z
Item._argsc                 C  s   | j S r   r   rH   rG   rG   rJ   __getstate__  s    zItem.__getstate__c                 C  s   |\| _ | _d S r   r   rI   staterG   rG   rJ   __setstate__  s    zItem.__setstate__c                 C  sB   d t|t| |d}||| jfi}tj||| gd}t||S )N{}-{}r9   r   )formatr>   r*   r   r4   r   r   )rI   r   r   rb   r   rG   rG   rJ   r9     s    z
Item.applyTc                 C  s:   ddl m} |  }|r(| ||  }|| j|| jdS )zConvert into a ``dask.delayed`` object.

        Parameters
        ----------
        optimize_graph : bool, optional
            If True [default], the graph is optimized before converting into
            ``dask.delayed`` objects.
        r   r2   r   )r   r2   r   __dask_optimize__r   r   r   )rI   optimize_graphr2   rb   rG   rG   rJ   r     s
    	zItem.to_delayed)N)T)r   r   r   r   r   r   r   r   r,   rs   r'   r   staticmethodDEFAULT_GET__dask_scheduler__r   r   r   r   propertyr   r   r   r9   r&   r   __int__	__float____complex____bool__r   rG   rG   rG   rJ   r   a  s&   


r   c                   @  s.  e Zd ZdZddddddZdd	 Zd
dddZdd Zdd Ze	e
dedZeeZdd Zdd ZddddZdd ZeZeedZdd  Zd!d" Zed#d$ Zd%d& Zd'd( Zd)d* Zdsd+d,Zd-d. Zd/d0 Z e!fd1d2Z"d3d4 Z#e$e%dd5e&d6dd7fd8d9Z%e$e'dtd<d=Z'de!de(fd>d?Z)dud@dAZ*dvdBdCZ+dwdDdEZ,de(dfdFdGZ-dxdHdIZ.dydJdKZ/dzdLdMZ0d{dNdOZ1d|dPdQZ2d}dRdSZ3dTdU Z4d~dWdXZ5ddYdZZ6dd[d\Z7d]d^ Z8e!de!dfd_d`Z9ddbdcZ:ddde Z;dfdg Z<ddidjZ=ddkdlZ>ddmdnZ?ddodpZ@e!fdqdrZAdS )Baga  Parallel collection of Python objects

    Examples
    --------
    Create Bag from sequence

    >>> import dask.bag as db
    >>> b = db.from_sequence(range(5))
    >>> list(b.filter(lambda x: x % 2 == 0).map(lambda x: x * 10))
    [0, 20, 40]

    Create Bag from filename or globstring of filenames

    >>> b = db.read_text('/path/to/mydata.*.json.gz').map(json.loads)  # doctest: +SKIP

    Create manually (expert use)

    >>> dsk = {('x', 0): (range, 5),
    ...        ('x', 1): (range, 5),
    ...        ('x', 2): (range, 5)}
    >>> b = db.Bag(dsk, 'x', npartitions=3)

    >>> sorted(b.map(lambda x: x * 10))
    [0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40]

    >>> int(b.fold(lambda x, y: x + y))
    30
    r   r   int)rb   r   r   c                 C  s0   t |tstj||g d}|| _|| _|| _d S )Nr   )rv   r4   r   r   r   r   )rI   rb   r   r   rG   rG   rJ   r     s
    
zBag.__init__c                 C  s   | j S r   r   rH   rG   rG   rJ   r     s    zBag.__dask_graph__zlist[Hashable])returnc                   s    fddt  jD S )Nc                   s   g | ]} j |fqS rG   r   rS   r   rH   rG   rJ   rU     s     z%Bag.__dask_keys__.<locals>.<listcomp>)ranger   rH   rG   rH   rJ   r     s    zBag.__dask_keys__c                 C  s   | j fS r   r   rH   rG   rG   rJ   r     s    zBag.__dask_layers__c                 C  s   | j S r   r   rH   rG   rG   rJ   r     s    zBag.__dask_tokenize__r   r   c                 C  s   t dfS r   )r   rH   rG   rG   rJ   r     s    zBag.__dask_postcompute__c                 C  s
   | j dfS r   r   rH   rG   rG   rJ   r     s    zBag.__dask_postpersist__Nr   c                C  s(   | j }|r|||}t| ||| jS r   )r   getrY   r   )rI   rb   r   r   rG   rG   rJ   r     s    zBag._rebuildc                 C  s   dt | j| jf S )Nzdask.bag<%s, npartitions=%d>)rA   r   r   rH   rG   rG   rJ   __str__  s    zBag.__str__)fgetc                 O  s   t || f||S )a  Apply a function elementwise across one or more bags.

        Note that all ``Bag`` arguments must be partitioned identically.

        Parameters
        ----------
        func : callable
        *args, **kwargs : Bag, Item, or object
            Extra arguments and keyword arguments to pass to ``func`` *after*
            the calling bag instance. Non-Bag args/kwargs are broadcasted
            across all calls to ``func``.

        Notes
        -----
        For calls with multiple `Bag` arguments, corresponding partitions
        should have the same length; if they do not, the call will error at
        compute time.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5), npartitions=2)
        >>> b2 = db.from_sequence(range(5, 10), npartitions=2)

        Apply a function to all elements in a bag:

        >>> b.map(lambda x: x + 1).compute()
        [1, 2, 3, 4, 5]

        Apply a function with arguments from multiple bags:

        >>> from operator import add
        >>> b.map(add, b2).compute()
        [5, 7, 9, 11, 13]

        Non-bag arguments are broadcast across all calls to the mapped
        function:

        >>> b.map(add, 1).compute()
        [1, 2, 3, 4, 5]

        Keyword arguments are also supported, and have the same semantics as
        regular arguments:

        >>> def myadd(x, y=0):
        ...     return x + y
        >>> b.map(myadd, y=b2).compute()
        [5, 7, 9, 11, 13]
        >>> b.map(myadd, y=1).compute()
        [1, 2, 3, 4, 5]

        Both arguments and keyword arguments can also be instances of
        ``dask.bag.Item``. Here we'll add the max value in the bag to each
        element:

        >>> b.map(myadd, b.max()).compute()
        [4, 5, 6, 7, 8]
        )bag_maprI   r   r   ro   rG   rG   rJ   r      s    ;zBag.mapc                   s|   d t t dfg}r<t\}||  fddtjD }tj||d}t	|jS )a  Apply a function using argument tuples from the given bag.

        This is similar to ``itertools.starmap``, except it also accepts
        keyword arguments. In pseudocode, this is could be written as:

        >>> def starmap(func, bag, **kwargs):
        ...     return (func(*args, **kwargs) for args in bag)

        Parameters
        ----------
        func : callable
        **kwargs : Item, Delayed, or object, optional
            Extra keyword arguments to pass to ``func``. These can either be
            normal objects, ``dask.bag.Item``, or ``dask.delayed.Delayed``.

        Examples
        --------
        >>> import dask.bag as db
        >>> data = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
        >>> b = db.from_sequence(data, npartitions=2)

        Apply a function to each argument tuple:

        >>> from operator import add
        >>> b.starmap(add).compute()
        [3, 7, 11, 15, 19]

        Apply a function to each argument tuple, with additional keyword
        arguments:

        >>> def myadd(x, y, z=0):
        ...     return x + y + z
        >>> b.starmap(myadd, z=10).compute()
        [13, 17, 21, 25, 29]

        Keyword arguments can also be instances of ``dask.bag.Item`` or
        ``dask.delayed.Delayed``:

        >>> max_second = b.pluck(1).max()
        >>> max_second.compute()
        10
        >>> b.starmap(myadd, z=max_second).compute()
        [13, 17, 21, 25, 29]
        r   starmapc                   s(   i | ] }|ft t j|fffqS rG   )r\   starmap_chunkr   r   r   ro   r   rI   rG   rJ   rg   p  s    zBag.starmap.<locals>.<dictcomp>r   )
r   r>   r*   unpack_scalar_dask_kwargsextendr   r   r4   r   rY   )rI   r   ro   rm   collectionsrb   r   rG   r  rJ   r  =  s    -
zBag.starmapc                 C  s   | j | j| jfS r   r   r   r   rH   rG   rG   rJ   r   w  s    z	Bag._argsc                 C  s   | j S r   r   rH   rG   rG   rJ   r   {  s    zBag.__getstate__c                 C  s   |\| _ | _| _d S r   r  r   rG   rG   rJ   r   ~  s    zBag.__setstate__c                   sZ   dt  dt   fddtjD }tj |gd}t| jS )a   Filter elements in collection by a predicate function.

        >>> def iseven(x):
        ...     return x % 2 == 0

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5))
        >>> list(b.filter(iseven))
        [0, 2, 4]
        zfilter--c                   s&   i | ]} |ft tj|fffqS rG   )r\   r   r   r   r   	predicaterI   rG   rJ   rg     s    zBag.filter.<locals>.<dictcomp>r   r>   r*   r   r   r4   r   rY   rI   r  rb   r   rG   r
  rJ   r     s    z
Bag.filterc                   s   d  krdksn t dt|ts0t|}dt|   tj|} fddttj|D }t	j
 |gd}t| jS )a  Return elements from bag with probability of ``prob``.

        Parameters
        ----------
        prob : float
            A float between 0 and 1, representing the probability that each
            element will be returned.
        random_state : int or random.Random, optional
            If an integer, will be used to seed a new ``random.Random`` object.
            If provided, results in deterministic sampling.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5))
        >>> list(b.random_sample(0.5, 43))
        [0, 3, 4]
        >>> list(b.random_sample(0.5, 43))
        [0, 3, 4]
        r   rV   z,prob must be a number in the interval [0, 1]zrandom-sample-%sc                   s,   i | ]$\}} |ft tj|f|ffqS rG   )r\   random_sampler   )rS   r   r   r   probrI   rG   rJ   rg     s    z%Bag.random_sample.<locals>.<dictcomp>r   )r   rv   r   r*   getstaterandom_state_data_pythonr   zipr   r4   r   rY   )rI   r  random_state
state_datarb   r   rG   r  rJ   r    s    
zBag.random_samplec                   sZ   dt  dt   fddtjD }tj |gd}t| jS )zRemove elements in collection that match predicate.

        >>> def iseven(x):
        ...     return x % 2 == 0

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5))
        >>> list(b.remove(iseven))
        [1, 3]
        zremove-r	  c                   s&   i | ]} |ft tj|fffqS rG   )r\   r   r   r   r
  rG   rJ   rg     s    zBag.remove.<locals>.<dictcomp>r   r  r  rG   r
  rJ   r     s    z
Bag.removec                 O  s   t || f||S )ak  Apply a function to every partition across one or more bags.

        Note that all ``Bag`` arguments must be partitioned identically.

        Parameters
        ----------
        func : callable
            The function to be called on every partition.
            This function should expect an ``Iterator`` or ``Iterable`` for
            every partition and should return an ``Iterator`` or ``Iterable``
            in return.
        *args, **kwargs : Bag, Item, Delayed, or object
            Arguments and keyword arguments to pass to ``func``.
            Partitions from this bag will be the first argument, and these will
            be passed *after*.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(range(1, 101), npartitions=10)
        >>> def div(nums, den=1):
        ...     return [num / den for num in nums]

        Using a python object:

        >>> hi = b.max().compute()
        >>> hi
        100
        >>> b.map_partitions(div, den=hi).take(5)
        (0.01, 0.02, 0.03, 0.04, 0.05)

        Using an ``Item``:

        >>> b.map_partitions(div, den=b.max()).take(5)
        (0.01, 0.02, 0.03, 0.04, 0.05)

        Note that while both versions give the same output, the second forms a
        single graph, and then computes everything at once, and in some cases
        may be more efficient.
        )map_partitionsr  rG   rG   rJ   r    s    )zBag.map_partitionsc                   s   dt   t tkr>fddtjD }n fddtjD }tj|gd}t|jS )ak  Select item from all tuples/dicts in collection.

        >>> import dask.bag as db
        >>> b = db.from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
        ...                       {'name': 'Bob',   'credits': [10, 20]}])
        >>> list(b.pluck('name'))
        ['Alice', 'Bob']
        >>> list(b.pluck('credits').pluck(0))
        [1, 10]
        zpluck-c                   s&   i | ]}|ft t j|fffqS rG   rZ   r   r   r   )r   r   rI   rG   rJ   rg     s    zBag.pluck.<locals>.<dictcomp>c                   s(   i | ] }|ft tj|f ffqS rG   r  r   defaultr   r   rI   rG   rJ   rg     s    r   )r*   r0   
no_defaultr   r   r4   r   rY   )rI   r   r  rb   r   rG   r  rJ   r     s    z	Bag.pluckc                   s   t  fddt|D S )a  Transform a bag of tuples to ``n`` bags of their elements.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence([(i, i + 1, i + 2) for i in range(10)])
        >>> first, second, third = b.unzip(3)
        >>> isinstance(first, db.Bag)
        True
        >>> first.compute()
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

        Note that this is equivalent to:

        >>> first, second, third = (b.pluck(i) for i in range(3))
        c                 3  s   | ]}  |V  qd S r   )r   r   rH   rG   rJ   rX      s     zBag.unzip.<locals>.<genexpr>r]   r   )rI   nrG   rH   rJ   unzip  s    z	Bag.unzipr   TFc           	      K  s    t | |||||f||d|S )N)r   r{   )r   )	rI   r   r   r   r   r   r   r{   ro   rG   rG   rJ   r   "  s    	zBag.to_textfilesnull>  c	           
   
   K  s   t | ||||||||f	|	S r   r$   )
rI   filenameZschemar   r   codecZsync_intervalmetadatar   ro   rG   rG   rJ   r%   :  s    
zBag.to_avroc                 C  s\   |p|}|t k	r2| jtt||dtt|||dS ddlm} | j||||||dS dS )a  Parallelizable reduction

        Fold is like the builtin function ``reduce`` except that it works in
        parallel.  Fold takes two binary operator functions, one to reduce each
        partition of our dataset and another to combine results between
        partitions

        1.  ``binop``: Binary operator to reduce within each partition
        2.  ``combine``:  Binary operator to combine results from binop

        Sequentially this would look like the following:

        >>> intermediates = [reduce(binop, part) for part in partitions]  # doctest: +SKIP
        >>> final = reduce(combine, intermediates)  # doctest: +SKIP

        If only one function is given then it is used for both functions
        ``binop`` and ``combine`` as in the following example to compute the
        sum:

        >>> def add(x, y):
        ...     return x + y

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5))
        >>> b.fold(add).compute()
        10

        In full form we provide both binary operators as well as their default
        arguments

        >>> b.fold(binop=add, combine=add, initial=0).compute()
        10

        More complex binary operators are also doable

        >>> def add_to_set(acc, x):
        ...     ''' Add new element x to set acc '''
        ...     return acc | set([x])
        >>> b.fold(add_to_set, set.union, initial=set()).compute()
        {0, 1, 2, 3, 4}

        See Also
        --------

        Bag.foldby
        initial)split_everyout_typer   )r	   N)r  	reductionr   _reduceZtlz.curriedr	   )rI   binopcombiner$  r%  r&  r	   rG   rG   rJ   foldT  s    1zBag.foldc                 C  s2   | j ttt|ddt}|r.|jttdd}|S )zCount number of occurrences of each distinct element.

        >>> import dask.bag as db
        >>> b = db.from_sequence(['Alice', 'Bob', 'Alice'])
        >>> dict(b.frequencies())       # doctest: +SKIP
        {'Alice': 2, 'Bob', 1}
        r   r&  r%  r   T)r   reverse)r'  r   merge_frequenciesr   r  	dictitemsr   r   )rI   r%  sortresultrG   rG   rJ   r     s    zBag.frequenciesc                 C  sT   |r.t |rt|rtt|}tt||d}n
tt|}| j|t|tjt	|ddS )a  K largest elements in collection

        Optionally ordered by some key function

        >>> import dask.bag as db
        >>> b = db.from_sequence([10, 3, 5, 7, 11, 4])
        >>> list(b.topk(2))
        [11, 10]

        >>> list(b.topk(2, lambda x: -x))
        [3, 4]
        r   r   r,  )
callablerD   r   r9   r   r'  r   r   r   r   )rI   re   r   r%  r   rG   rG   rJ   r     s    


zBag.topkc                 C  sB   |dkrt n
tt |d}|dkr$tn
tt|d}| j||tddS )a	  Distinct elements of collection

        Unordered without repeats.

        Parameters
        ----------
        key: {callable,str}
            Defines uniqueness of items in bag by calling ``key`` on each item.
            If a string is passed ``key`` is considered to be ``lambda x: x[key]``.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(['Alice', 'Bob', 'Alice'])
        >>> sorted(b.distinct())
        ['Alice', 'Bob']
        >>> b = db.from_sequence([{'name': 'Alice'}, {'name': 'Bob'}, {'name': 'Alice'}])
        >>> b.distinct(key=lambda x: x['name']).compute()
        [{'name': 'Alice'}, {'name': 'Bob'}]
        >>> b.distinct(key='name').compute()
        [{'name': 'Alice'}, {'name': 'Bob'}]
        Nr   distinct)r&  r   )chunk_distinctr   merge_distinctr'  r   )rI   r   r   ZaggrG   rG   rJ   r3    s    zBag.distinctc                   s`  |dkrd}|dkrj }t||}|p2t d|  j dk fddtj D }j } |pzt| d| }	d	}
||kr|	t|
 }tt|t|D ]*\}}t|fd
d|D df|||f< q|d }||
d7 }
qt|fddt|D df||	d	f< tj	|	|gd}|t
krP||	d	f||	< t
||	S t||	dS dS )a  Reduce collection with reduction operators.

        Parameters
        ----------
        perpartition: function
            reduction to apply to each partition
        aggregate: function
            reduction to apply to the results of all partitions
        split_every: int (optional)
            Group partitions into groups of this size while performing reduction
            Defaults to 8
        out_type: {Bag, Item}
            The out type of the result, Item if a single element, Bag if a list
            of elements.  Defaults to Item.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(range(10))
        >>> b.reduction(sum, sum).compute()
        45
        N   F-part-rV   c                   s$   i | ]} |ft j|ffqS rG   )empty_safe_applyr   r   )ais_lastperpartitionrI   rG   rJ   rg     s    z!Bag.reduction.<locals>.<dictcomp>z-aggregate-r   c                   s   g | ]} |fqS rG   rG   rS   jr   rG   rJ   rU     s     z!Bag.reduction.<locals>.<listcomp>c                   s   g | ]} |fqS rG   rG   r<  r>  rG   rJ   rU     s     Tr   )r   r*   r>   r   r   r   r   empty_safe_aggregater4   r   r   popr   )rI   r;  Z	aggregater%  r&  r   tokenrb   re   fmtdepthcr   indsr   rG   )r9  r   r:  r;  rI   rJ   r'    sF    



zBag.reductionc                 C  s   | j tt|dS )zSum all elementsr%  )r'  sumrI   r%  rG   rG   rJ   rG  &  s    zBag.sumc                 C  s   | j tt|dS )zMaximum elementrF  )r'  maxrH  rG   rG   rJ   rI  *  s    zBag.maxc                 C  s   | j tt|dS )zMinimum elementrF  )r'  minrH  rG   rG   rJ   rJ  .  s    zBag.minc                 C  s   | j tt|dS )zAre any of the elements truthy?

        Examples
        --------
        >>> import dask.bag as db
        >>> bool_bag = db.from_sequence([True, True, False])
        >>> bool_bag.any().compute()
        True
        rF  )r'  anyrH  rG   rG   rJ   rK  2  s    
zBag.anyc                 C  s   | j tt|dS )zAre all elements truthy?

        Examples
        --------
        >>> import dask.bag as db
        >>> bool_bag = db.from_sequence([True, True, False])
        >>> bool_bag.all().compute()
        False
        rF  )r'  allrH  rG   rG   rJ   rL  >  s    
zBag.allc                 C  s   | j tt|dS )zCount the number of elements.

        Examples
        --------
        >>> import dask.bag as db
        >>> numbers = db.from_sequence([1, 2, 3])
        >>> numbers.count().compute()
        3
        rF  )r'  r   rG  rH  rG   rG   rJ   r   J  s    
z	Bag.countc                 C  s    dd }dd }| j ||ddS )zArithmetic meanc                 S  s*   d\}}| D ]}||7 }|d7 }q||fS )N)g        r   rV   rG   )seqtotalr  xrG   rG   rJ   
mean_chunkY  s
    
zBag.mean.<locals>.mean_chunkc                 S  s$   t t|  \}}dt| t| S )Ng      ?)rZ   r  rG  )rO  ZtotalscountsrG   rG   rJ   mean_aggregate`  s    z Bag.mean.<locals>.mean_aggregateFrF  )r'  )rI   rP  rR  rG   rG   rJ   meanV  s    zBag.meanr   c                 C  s   | j tjttj|dddS )ZVarianceddofFrF  )r'  r#   Z	var_chunkr   Zvar_aggregaterI   rU  rG   rG   rJ   varf  s
      zBag.varc                 C  s   | j |dtjS )zStandard deviationrT  )rW  r9   mathsqrtrV  rG   rG   rJ   stdl  s    zBag.stdc           	      C  s  dt | ||| }i }t|trb|jdkrT||j | d }t|f|d| < qd}t|nDt|t	r||j |j
}n&t|tr|}ndt|j }t||dkr|}t| jD ]$}tt|||| j|fff|||f< qtj||| gd}t| ||| jS )	a<  Joins collection with another collection.

        Other collection must be one of the following:

        1.  An iterable.  We recommend tuples over lists for internal
            performance reasons.
        2.  A delayed object, pointing to a tuple.  This is recommended if the
            other collection is sizable and you're using the distributed
            scheduler.  Dask is able to pass around data wrapped in delayed
            objects with greater sophistication.
        3.  A Bag with a single partition

        You might also consider Dask Dataframe, whose join operations are much
        more heavily optimized.

        Parameters
        ----------
        other: Iterable, Delayed, Bag
            Other collection on which to join
        on_self: callable
            Function to call on elements in this collection to determine a
            match
        on_other: callable (defaults to on_self)
            Function to call on elements in the other collection to determine a
            match

        Examples
        --------
        >>> import dask.bag as db
        >>> people = db.from_sequence(['Alice', 'Bob', 'Charlie'])
        >>> fruit = ['Apple', 'Apricot', 'Banana']
        >>> list(people.join(fruit, lambda x: x[0]))
        [('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
        zjoin-rV   r   zjoin-%s-otherzOMulti-bag joins are not implemented. We recommend Dask dataframe if appropriatezRJoined argument must be single-partition Bag,  delayed object, or Iterable, got %sNr   )r*   rv   r   r   updater   r   rZ   NotImplementedErrorr2   _keyr   rY   Z
_Bag__name	TypeErrorr   r   r   r4   r   )	rI   otherZon_selfZon_otherr   rb   msgr   r   rG   rG   rJ   r   p  s4    #




"zBag.joinc                   sn   t tstdt jj }  fddt|D }tj|gd}t||  S )z#Cartesian product between two bags.zproduct-c              	     sD   i | ]<}t  D ].}|  | fttjj|fj|fffqqS rG   )r   rZ   	itertoolsproductr   rS   r   r=  mr   r_  rI   rG   rJ   rg     s   
 zBag.product.<locals>.<dictcomp>r   )	rv   r   r   r*   r   r   r4   r   rY   )rI   r_  r  rb   r   rG   rd  rJ   rb    s    zBag.productc              
     s  |dkrd}|dkrj }t||}d|  |dkr@}tk	rj fddtj D }n fddtj D }ttj|}	d}
j } ||krjt|
 }|tk	rtt	|t|D ]8\}}t
d|	tjttfd	d
|D ff|f|||f< qnDtt	|t|D ]0\}}ttt|ffdd
|D f|||f< q"|d }||
d7 }
qd| }|tk	rtt
d|	tjttfdd
t|D ff|ff||df< n.tttt|ffdd
t|D ff||df< tj||gd}t||dS )a  Combined reduction and groupby.

        Foldby provides a combined groupby and reduce for efficient parallel
        split-apply-combine tasks.

        The computation

        >>> b.foldby(key, binop, init)                        # doctest: +SKIP

        is equivalent to the following:

        >>> def reduction(group):                               # doctest: +SKIP
        ...     return reduce(binop, group, init)               # doctest: +SKIP

        >>> b.groupby(key).map(lambda (k, v): (k, reduction(v)))# doctest: +SKIP

        But uses minimal communication and so is *much* faster.

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(10))
        >>> iseven = lambda x: x % 2 == 0
        >>> add = lambda x, y: x + y
        >>> dict(b.foldby(iseven, add))
        {True: 20, False: 25}

        **Key Function**

        The key function determines how to group the elements in your bag.
        In the common case where your bag holds dictionaries then the key
        function often gets out one of those elements.

        >>> def key(x):
        ...     return x['name']

        This case is so common that it is special cased, and if you provide a
        key that is not a callable function then dask.bag will turn it into one
        automatically.  The following are equivalent:

        >>> b.foldby(lambda x: x['name'], ...)  # doctest: +SKIP
        >>> b.foldby('name', ...)  # doctest: +SKIP

        **Binops**

        It can be tricky to construct the right binary operators to perform
        analytic queries.  The ``foldby`` method accepts two binary operators,
        ``binop`` and ``combine``.  Binary operators two inputs and output must
        have the same type.

        Binop takes a running total and a new element and produces a new total:

        >>> def binop(total, x):
        ...     return total + x['amount']

        Combine takes two totals and combines them:

        >>> def combine(total1, total2):
        ...     return total1 + total2

        Each of these binary operators may have a default first value for
        total, before any other value is seen.  For addition binary operators
        like above this is often ``0`` or the identity element for your
        operation.

        **split_every**

        Group partitions into groups of this size while performing reduction.
        Defaults to 8.

        >>> b.foldby('name', binop, 0, combine, 0)  # doctest: +SKIP

        Examples
        --------

        We can compute the maximum of some ``(key, value)`` pairs, grouped
        by the ``key``. (You might be better off converting the ``Bag`` to
        a ``dask.dataframe`` and using its groupby).

        >>> import random
        >>> import dask.bag as db

        >>> tokens = list('abcdefg')
        >>> values = range(10000)
        >>> a = [(random.choice(tokens), random.choice(values))
        ...       for _ in range(100)]
        >>> a[:2]  # doctest: +SKIP
        [('g', 676), ('a', 871)]

        >>> a = db.from_sequence(a)

        >>> def binop(t, x):
        ...     return max((t, x), key=lambda x: x[1])

        >>> a.foldby(lambda x: x[0], binop).compute()  # doctest: +SKIP
        [('g', ('g', 984)),
         ('a', ('a', 871)),
         ('b', ('b', 999)),
         ('c', ('c', 765)),
         ('f', ('f', 955)),
         ('e', ('e', 991)),
         ('d', ('d', 854))]

        See Also
        --------

        toolz.reduceby
        pyspark.combineByKey
        Nr6  Fz	foldby-a-c                   s&   i | ]} |ft j|ffqS rG   r   r   r   )r9  r)  r$  r   rI   rG   rJ   rg   C  s    zBag.foldby.<locals>.<dictcomp>c                   s$   i | ]} |ft j|ffqS rG   rf  r   )r9  r)  r   rI   rG   rJ   rg   H  s    r   c                   s   g | ]} |fqS rG   rG   r<  r>  rG   rJ   rU   Y  s     zBag.foldby.<locals>.<listcomp>c                   s   g | ]} |fqS rG   rG   r<  r>  rG   rJ   rU   a  s     rV   z	foldby-b-c                   s   g | ]} |fqS rG   rG   r<  r>  rG   rJ   rU   p  s     c                   s   g | ]} |fqS rG   rG   r<  r>  rG   rJ   rU   w  s     r   )r   r*   r  r   r   r#   Zfoldby_combine2r   r   r   r   r   r   r   r/  r   r	   r4   r   rY   )rI   r   r)  r$  r*  Zcombine_initialr%  rA  rb   Zcombine2rC  re   rD  r   rE  er   rG   )r9  r   r)  r$  r   rI   rJ   foldby  sh    t



  z
Bag.foldbyrV   c                   s   |dkr| j }|| j kr*td| j |t| ||}d| }|dkrd|  i }t|D ] }tt|| j|fff| |f< qZtj	 fddt|D f}	t
||	|f||df< n|dft
|| jdf|fi}tj||| gd	}
t|
|d}|rt| S |S d
S )a&  Take the first k elements.

        Parameters
        ----------
        k : int
            The number of elements to return
        npartitions : int, optional
            Elements are only taken from the first ``npartitions``, with a
            default of 1. If there are fewer than ``k`` rows in the first
            ``npartitions`` a warning will be raised and any found rows
            returned. Pass -1 to use all partitions.
        compute : bool, optional
            Whether to compute the result, default is True.
        warn : bool, optional
            Whether to warn if the number of elements returned is less than
            requested, default is True.

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(1_000))
        >>> b.take(3)
        (0, 1, 2)
        z$only {} partitions, take received {}ztake-rV   ztake-partial-c                   s   g | ]} |fqS rG   rG   r   Zname_prG   rJ   rU     s     zBag.take.<locals>.<listcomp>r   r   N)r   r   r   r*   r   rZ   r   r   r   r   	safe_taker4   r   r   r]   r   )rI   re   r   r   warnrA  r   rb   r   r   r   r   rG   rj  rJ   r   }  s0    
 zBag.takec                   sJ   dt    fddtjD }tj |gd}t| jS )zConcatenate nested lists into one long list.

        >>> import dask.bag as db
        >>> b = db.from_sequence([[1], [2, 3]])
        >>> list(b)
        [[1], [2, 3]]

        >>> list(b.flatten())
        [1, 2, 3]
        zflatten-c                   s&   i | ]} |ft tjj|fffqS rG   )rZ   r   r   r   r   r   rI   rG   rJ   rg     s    zBag.flatten.<locals>.<dictcomp>r   )r*   r   r   r4   r   rY   )rI   rb   r   rG   rm  rJ   r-     s    zBag.flattenc                 C  s   t |  S r   )iterr   rH   rG   rG   rJ   __iter__  s    zBag.__iter__   c                 C  s   |dk	rt d|dkr$tdd}|dkrFtdddkrBd}nd}|dkr^t| |||dS |dkrtt| ||d	S d
}t|dS )a  Group collection by key function

        This requires a full dataset read, serialization and shuffle.
        This is expensive.  If possible you should use ``foldby``.

        Parameters
        ----------
        grouper: function
            Function on which to group elements
        shuffle: str
            Either 'disk' for an on-disk shuffle or 'tasks' to use the task
            scheduling framework.  Use 'disk' if you are on a single machine
            and 'tasks' if you are on a distributed cluster.
        npartitions: int
            If using the disk-based shuffle, the number of output partitions
        blocksize: int
            If using the disk-based shuffle, the size of shuffle blocks (bytes)
        max_branch: int
            If using the task-based shuffle, the amount of splitting each
            partition undergoes.  Increase this for fewer copies but more
            scheduler overhead.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(range(10))
        >>> iseven = lambda x: x % 2 == 0
        >>> dict(b.groupby(iseven))             # doctest: +SKIP
        {True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}

        See Also
        --------
        Bag.foldby
        Nz.The method= keyword has been moved to shuffle=shuffleZ	scheduler)zdask.distributedZdistributedZtasksZdisk)r   	blocksize)
max_branchz!Shuffle must be 'disk' or 'tasks')	Exceptionr"   r   groupby_diskgroupby_tasksr\  )rI   groupermethodr   rr  rs  rq  r`  rG   rG   rJ   r     s&    +   zBag.groupbyc                 C  s   ddl }ddlm} |dkrR| jddd}t|dkr>td|jt||d}n&|dk	rdtdn|jj	|| d	}t|j
}|j }| t||}	|r| |	j|	 }
n|	j}
dg| jd  }||
|	j||S )
a  Create Dask Dataframe from a Dask Bag.

        Bag should contain tuples, dict records, or scalars.

        Index will not be particularly meaningful.  Use ``reindex`` afterwards
        if necessary.

        Parameters
        ----------
        meta : pd.DataFrame, dict, iterable, optional
            An empty ``pd.DataFrame`` that matches the dtypes and column names
            of the output. This metadata is necessary for many algorithms in
            dask dataframe to work.  For ease of use, some alternative inputs
            are also available. Instead of a ``DataFrame``, a ``dict`` of
            ``{name: dtype}`` or iterable of ``(name, dtype)`` can be provided.
            If not provided or a list, a single element from the first
            partition will be computed, triggering a potentially expensive call
            to ``compute``. This may lead to unexpected results, so providing
            ``meta`` is recommended. For more information, see
            ``dask.dataframe.utils.make_meta``.
        columns : sequence, optional
            Column names to use. If the passed data do not have names
            associated with them, this argument provides names for the columns.
            Otherwise this argument indicates the order of the columns in the
            result (any names not found in the data will become all-NA
            columns).  Note that if ``meta`` is provided, column names will be
            taken from there and this parameter is invalid.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before converting into
            :class:`dask.dataframe.DataFrame`.


        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
        ...                       {'name': 'Bob',     'balance': 200},
        ...                       {'name': 'Charlie', 'balance': 300}],
        ...                      npartitions=2)
        >>> df = b.to_dataframe()

        >>> df.compute()
              name  balance
        0    Alice      100
        1      Bob      200
        0  Charlie      300
        r   NrV   F)rl  zm`dask.bag.Bag.to_dataframe` failed to properly infer metadata, please pass in metadata via the `meta` keywordcolumnsz'Can't specify both `meta` and `columns`)Zparent_meta)pandasZdask.dataframeZ	dataframer   r[   r   	DataFramerZ   utilsZ	make_metarz  dtypesZto_dictr  to_dataframer   r   r   r   r   )rI   metarz  r   pdddr_   colsr~  dfsrb   Z	divisionsrG   rG   rJ   r    s(    0


zBag.to_dataframec                   s`   ddl m  |  }|  | j|rJ| |d tjdd fdd|D S )aO  Convert into a list of ``dask.delayed`` objects, one per partition.

        Parameters
        ----------
        optimize_graph : bool, optional
            If True [default], the graph is optimized before converting into
            ``dask.delayed`` objects.

        See Also
        --------
        dask.bag.from_delayed
        r   r   zdelayed-rG   r   c                   s   g | ]} |d qS )r   rG   )rS   re   r2   rb   r   rG   rJ   rU   m  s     z"Bag.to_delayed.<locals>.<listcomp>)r   r2   r   r   r   r   r4   r   )rI   r   rl   rG   r  rJ   r   W  s    zBag.to_delayedc                 C  sH   t |dk	|dk	gdkr td|dk	r2t| |S |dk	rDt| |S dS )a  Repartition Bag across new divisions.

        Parameters
        ----------
        npartitions : int, optional
            Number of partitions of output.
        partition_size : int or string, optional
            Max number of bytes of memory for each partition. Use numbers or
            strings like 5MB.

            .. warning::

               This keyword argument triggers computation to determine
               the memory size of each partition, which may be expensive.

        Notes
        -----
        Exactly one of ``npartitions`` or ``partition_size`` should be specified.
        A ``ValueError`` will be raised when that is not the case.

        Examples
        --------
        >>> b.repartition(5)  # set to have 5 partitions  # doctest: +SKIP
        NrV   zRPlease provide exactly one ``npartitions`` or ``partition_size`` keyword arguments)rG  r   repartition_npartitionsrepartition_size)rI   r   partition_sizerG   rG   rJ   repartitiono  s    
zBag.repartitionc                 C  s   t | ||}t|}| d| }| d| }| d| }|dft|| jdf|df|dft|dff|dft|dffi}td| jD ]N}	t|| j|	f||	d ff|||	f< t||	ff|||	f< t||	ff|||	f< qtj	||| gd}
t
|
|| jS )a{  Repeatedly apply binary function to a sequence, accumulating results.

        This assumes that the bag is ordered.  While this is typically the case
        not all Dask.bag functions preserve this property.

        Examples
        --------
        >>> import dask.bag as db
        >>> from operator import add
        >>> b = db.from_sequence([1, 2, 3, 4, 5], npartitions=2)
        >>> b.accumulate(add).compute()
        [1, 3, 6, 10, 15]

        Accumulate also takes an optional argument that will be used as the
        first value.

        >>> b.accumulate(add, initial=-1).compute()
        [-1, 0, 2, 5, 9, 14]
        r7  z-first-z-second-r   TrV   r   )r*   r>   accumulate_partr   r   r   r   r   r4   r   r   )rI   r)  r$  rA  Z
binop_namer9  r   rD  rb   r   r   rG   rG   rJ   r     s$      
 
"zBag.accumulate)N)NNr  r  NT)NF)NN)N)N)N)N)N)N)N)r   )r   )N)rV   TT)NNrp  NN)NNT)T)NN)Br   r   r   r   r   r   r   r   r   r,   rs   r'   r   r   r   r   r   r   r   r   __repr__r   r   r   r   r  r   r   r   r   r  r   r  r  r   r  r
   r   rC   r%   r   r+  r   r   r3  r'  rG  rI  rJ  rK  rL  r   rS  rW  rZ  r   rb  rh  r   r-   ro  r   r  r   r  r   rG   rG   rG   rJ   r     s   
=:

#+         
C


  
F








E
 9
7     
>
O

"r   c                 C  sX   |t krtt| |}ntt| ||d}|rD||r<|d ng |fS |dd  |d fS )Nr#  ri  rV   )r  rZ   r   )r)  rM  r$  Zis_firstresrG   rG   rJ   r    s    r  rp  c           
      C  s`   t ||D ]P}t| |}tt}| D ]"\}}	|tt||  |	 q(|j|dd q
|S )z:Partition a bag along a grouper, store partitions on disk.T)fsync)	r   r   r   rZ   rj   abshashr  append)
rw  sequencer   pZ	nelementsblockr   Zd2re   rf   rG   rG   rJ   	partition  s    
r  c                 C  s    t | |j|dd}t| S )z7Collect partitions from disk and yield k,v group pairs.F)lock)r   r   rZ   rj   )rw  groupr  barrier_tokenr   rG   rG   rJ   collect  s    r  c                   s   t | } |r&|s&ttt| | }|dkrX|dkrXt| dk rHd}ntt| d }t t|| }dt| |  t|dkr fddt|D }n dfg i}t| t|S )a"  Create a dask Bag from Python sequence.

    This sequence should be relatively small in memory.  Dask Bag works
    best when it handles loading your data itself.  Commonly we load a
    sequence of filenames into a Bag and then use ``.map`` to open them.

    Parameters
    ----------
    seq: Iterable
        A sequence of elements to put into the dask
    partition_size: int (optional)
        The length of each partition
    npartitions: int (optional)
        The number of desired partitions

    It is best to provide either ``partition_size`` or ``npartitions``
    (though not both.)

    Examples
    --------
    >>> import dask.bag as db
    >>> b = db.from_sequence(['Alice', 'Bob', 'Chuck'], partition_size=2)

    See Also
    --------
    read_text: Create bag from text files
    Nd   rV   zfrom_sequence-r   c                   s   i | ]\}} |ft |qS rG   )rZ   )rS   r   partr   rG   rJ   rg     s      z!from_sequence.<locals>.<dictcomp>)	rZ   r   rX  ceilr[   r   r*   r   r   )rM  r  r   partsr   rG   r   rJ   from_sequence  s    r  c                 C  sX   t | tr| g} dt j }i }t| D ]\}}tt|ff|||f< q*t||t	| S )aJ  Create a dask Bag from a url.

    Examples
    --------
    >>> a = from_url('http://raw.githubusercontent.com/dask/dask/main/README.rst')
    >>> a.npartitions
    1

    >>> a.take(8)  # doctest: +SKIP
    (b'Dask\n',
     b'====\n',
     b'\n',
     b'|Build Status| |Coverage| |Doc Status| |Discourse| |Version Status| |NumFOCUS|\n',
     b'\n',
     b'Dask is a flexible parallel computing library for analytics.  See\n',
     b'documentation_ for more information.\n',
     b'\n')

    >>> b = from_url(['http://github.com', 'http://google.com'])
    >>> b.npartitions
    2
    z	from_url-)
rv   r   r   r   r   r   rZ   r   r   r[   )urlsr   rb   r   urG   rG   rJ   from_url  s    
r  c                 C  s   t |  S )zSA pickleable version of dict.items

    >>> dictitems({'x': 1})
    [('x', 1)]
    )rZ   rj   )r   rG   rG   rJ   r/  !  s    r/  c                   sJ   dt |   td  fdd| D }tj|| d}t|t|S )zConcatenate many bags together, unioning all elements.

    >>> import dask.bag as db
    >>> a = db.from_sequence([1, 2, 3])
    >>> b = db.from_sequence([4, 5, 6])
    >>> c = db.concat([a, b])

    >>> list(c)
    [1, 2, 3, 4, 5, 6]
    zconcat-r   c                   s(   i | ] }|  D ]}t f|qqS rG   )r   next)rS   r   r   counterr   rG   rJ   rg   7  s
     
  
 zconcat.<locals>.<dictcomp>r   )r*   ra  r   r4   r   r   r[   bagsrb   r   rG   r  rJ   r   *  s
    
r   c                 C  s:   t | trt| } t| r6t | d tr6ttt| } | S r   )rv   r   rZ   r[   r   )rM  rG   rG   rJ   r\   <  s
    
r\   c                   s   ddl m m t|  r | g}  fdd| D } dt|   fddtt| D }dd | D }tt||}t	j
|| d}t|t| S )	aW  Create bag from many dask Delayed objects.

    These objects will become the partitions of the resulting Bag.  They should
    evaluate to a ``list`` or some other concrete sequence.

    Parameters
    ----------
    values: list of delayed values
        An iterable of dask Delayed objects.  Each evaluating to a list.

    Returns
    -------
    Bag

    Examples
    --------
    >>> x, y, z = [delayed(load_sequence_from_file)(fn)
    ...             for fn in filenames] # doctest: +SKIP
    >>> b = from_delayed([x, y, z])  # doctest: +SKIP

    See also
    --------
    dask.delayed
    r   r   c                   s,   g | ]$}t | s$t|d r$|n|qS r   )rv   r   rS   rf   r   rG   rJ   rU   a  s   z from_delayed.<locals>.<listcomp>zbag-from-delayed-c                   s   g | ]} |fqS rG   rG   r   r   rG   rJ   rU   g  s     c                 S  s   g | ]}t |jfqS rG   )r\   r   r  rG   rG   rJ   rU   h  s     r   )r   r2   r   rv   r*   r   r[   dictr  r4   r   r   )valuesnamesZvalues2rb   r   rG   )r2   r   r   rJ   r   D  s    
r   c                 C  s.   |d k	rt |sttj|d}tt| |dS Nr   )r2  r   r#   getitemrZ   r    )rM  r   rG   rG   rJ   r4  o  s    r4  c                 C  s   t t| |dS r  )r4  r   r   )seqsr   rG   rG   rJ   r5  u  s    r5  c                 C  sz   t | trt| } | si S | d | dd   }}|s8|S tt}|| |D ]&}| D ]\}}||  |7  < qZqN|S )Nr   rV   )rv   r   rZ   r   r   r[  rj   )r  r   restr   r   re   rf   rG   rG   rJ   r.  y  s    

r.  c                   s~    | d |f t tt|td } fdd|D } | dkrr|d \}}tt| ff||f< t||S )zNumbers from zero to n

    Examples
    --------

    >>> import dask.bag as db
    >>> b = db.range(5, npartitions=2)
    >>> list(b)
    [0, 1, 2, 3, 4]
    zrange-%d-npartitions-%dr   c              	     s.   i | ]&\}}|ft t|t|  ffqS rG   )r\   r   rJ  rc  r  r   sizerG   rJ   rg     s      zbag_range.<locals>.<dictcomp>ri  )rZ   r   r   r   r\   r   )r  r   Zijsrb   r   r=  rG   r  rJ   	bag_range  s    r  c                    sd    d j tfdd D s$tdt    fddtD }tj| d}t|S )a%  Partition-wise bag zip

    All passed bags must have the same number of partitions.

    NOTE: corresponding partitions should have the same length; if they do not,
    the "extra" elements from the longer partition(s) will be dropped.  If you
    have this case chances are that what you really need is a data alignment
    mechanism like pandas's, and not a missing value filler like zip_longest.

    Examples
    --------

    Correct usage:

    >>> import dask.bag as db
    >>> evens = db.from_sequence(range(0, 10, 2), partition_size=4)
    >>> odds = db.from_sequence(range(1, 10, 2), partition_size=4)
    >>> pairs = db.zip(evens, odds)
    >>> list(pairs)
    [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]

    Incorrect usage:

    >>> numbers = db.range(31, npartitions=1)
    >>> fizz = numbers.filter(lambda n: n % 3 == 0)
    >>> buzz = numbers.filter(lambda n: n % 5 == 0)
    >>> fizzbuzz = db.zip(fizz, buzz)
    >>> list(fizzbuzz)
    [(0, 0), (3, 5), (6, 10), (9, 15), (12, 20), (15, 25), (18, 30)]

    When what you really wanted was more along the lines of the following:

    >>> list(fizzbuzz) # doctest: +SKIP
    (0, 0), (3, None), (None, 5), (6, None), (9, None), (None, 10),
    (12, None), (15, 15), (18, None), (None, 20),
    (21, None), (24, None), (None, 25), (27, None), (30, 30)
    r   c                 3  s   | ]}|j  kV  qd S r   r   rS   r   r  rG   rJ   rX     s     zbag_zip.<locals>.<genexpr>zzip-c              	     s2   i | ]*  ft tft fd dD  fqS )c                 3  s   | ]}|j  fV  qd S r   r   r  r   rG   rJ   rX     s     z%bag_zip.<locals>.<dictcomp>.<genexpr>)r\   r  r]   rS   )r  r   r  rJ   rg     s    zbag_zip.<locals>.<dictcomp>r   )r   rL  r   r*   r   r4   r   r   r  rG   )r  r   r   rJ   bag_zip  s    &
r  c                 C  s,   |rt | f|} dd |D }t| ||dS )a  Map ``f`` across one or more iterables, maybe with keyword arguments.

    Low-level function used in ``bag_map``, not user facing.

    Arguments
    ---------
    f : callable
    iters : List[Iterable]
    iter_kwarg_keys : List[str] or None
        Keyword names to use for pair with the tail end of ``iters``, allowing
        keyword arguments to be passed in from iterators.
    kwargs : dict or None
        Additional constant keyword arguments to use on every call to ``f``.
    c                 S  s   g | ]}t |qS rG   )rn  rS   r9  rG   rG   rJ   rU     s     zmap_chunk.<locals>.<listcomp>)
kwarg_keys)r   	_MapChunk)r|   itersiter_kwarg_keysro   rG   rG   rJ   	map_chunk  s    r  c                   @  s&   e Zd ZdddZdd Zdd ZdS )	r  Nc                 C  s&   || _ || _|pd| _t| j| _d S r   )r|   r  r  r[   nkws)rI   r|   r  r  rG   rG   rJ   r     s    
z_MapChunk.__init__c                 C  s|   zdd | j D }W n tk
r2   |    Y nX | jrr|d | j  }tt| j|| j d  }| j||S | j| S )Nc                 S  s   g | ]}t |qS rG   )r  r   rG   rG   rJ   rU     s     z&_MapChunk.__next__.<locals>.<listcomp>)r  StopIterationcheck_all_iterators_consumedr  r  r  r  r|   )rI   valsr   ro   rG   rG   rJ   __next__  s    z_MapChunk.__next__c              	   C  sZ   t | jdkrV| jD ]@}t|tjr&qzt| W n tk
rF   Y qX d}t|qd S )NrV   zmap called with multiple bags that aren't identically partitioned. Please ensure that all bag arguments have the same partition lengths)r[   r  rv   ra  repeatr  r  r   )rI   r   r`  rG   rG   rJ   r    s    
z&_MapChunk.check_all_iterators_consumed)N)r   r   r   r   r  r  rG   rG   rG   rJ   r    s   
r  c                 C  s   |rt | f|} t| |S r   )r   ra  r  )r|   rO  ro   rG   rG   rJ   r    s    r  c                 C  sl   i }g }|   D ]4\}}t|\}}|s2|||< q|||< || q|rdttt|t| ff}||fS )zExtracts dask values from kwargs.

    Currently only ``dask.bag.Item`` and ``dask.delayed.Delayed`` are
    supported.  Returns a merged dask graph and a task resulting in a keyword
    dict.
    )rj   r3   r  r  r  rZ   r  )ro   Zkwargs2rm   re   rf   vvr  rG   rG   rJ   r    s    
r  c                   s  d ttdf||g }g }g  |D ]`}t|trR||  | q.t|ttfr~||  tj	|j
f q. tj	|f q.i i | D ].\}}t|tr||< || q||< qt\}|| |stddd |D }	t|	dkrtd|	 }	 fdd	r<tnd
fddt|	D }
ttt|}t|dkr| nt}tj|
|| d}|||	S )a]  Apply a function elementwise across one or more bags.

    Note that all ``Bag`` arguments must be partitioned identically.

    Parameters
    ----------
    func : callable
    *args, **kwargs : Bag, Item, Delayed, or object
        Arguments and keyword arguments to pass to ``func``. Non-Bag args/kwargs
        are broadcasted across all calls to ``func``.

    Notes
    -----
    For calls with multiple `Bag` arguments, corresponding partitions should
    have the same length; if they do not, the call will error at compute time.

    Examples
    --------
    >>> import dask.bag as db
    >>> b = db.from_sequence(range(5), npartitions=2)
    >>> b2 = db.from_sequence(range(5, 10), npartitions=2)

    Apply a function to all elements in a bag:

    >>> db.map(lambda x: x + 1, b).compute()
    [1, 2, 3, 4, 5]

    Apply a function with arguments from multiple bags:

    >>> from operator import add
    >>> db.map(add, b, b2).compute()
    [5, 7, 9, 11, 13]

    Non-bag arguments are broadcast across all calls to the mapped function:

    >>> db.map(add, b, 1).compute()
    [1, 2, 3, 4, 5]

    Keyword arguments are also supported, and have the same semantics as
    regular arguments:

    >>> def myadd(x, y=0):
    ...     return x + y
    >>> db.map(myadd, b, y=b2).compute()
    [5, 7, 9, 11, 13]
    >>> db.map(myadd, b, y=1).compute()
    [1, 2, 3, 4, 5]

    Both arguments and keyword arguments can also be instances of
    ``dask.bag.Item`` or ``dask.delayed.Delayed``. Here we'll add the max value
    in the bag to each element:

    >>> db.map(myadd, b, b.max()).compute()
    [4, 5, 6, 7, 8]
    r   r   $At least one argument must be a Bag.c                 S  s   h | ]
}|j qS rG   r  rS   r   rG   rG   rJ   ri     s     zbag_map.<locals>.<setcomp>rV   1All bags must have the same number of partitions.c                   s6    fddD }r2|  fdd D  |S )Nc                   s$   g | ]}t |tr|j fn|qS rG   rv   r   r   r  r  rG   rJ   rU     s     z0bag_map.<locals>.build_iters.<locals>.<listcomp>c                 3  s   | ]}|j  fV  qd S r   r   r  r  rG   rJ   rX     s     z/bag_map.<locals>.build_iters.<locals>.<genexpr>)r  r  )r  r   )args2
bag_kwargsr  rJ   build_iters  s    zbag_map.<locals>.build_itersNc              	     s(   i | ] }|ft t |ffqS rG   )r\   r  rS   r  )r  r   r  r   other_kwargsrG   rJ   rg     s
   zbag_map.<locals>.<dictcomp>r   )r   r>   r*   rv   r   r  r   r2   ra  r  r   rj   r  r  r   r[   r@  rZ   r   r   r   rY   r4   r   )r   r   ro   rm   r  r9  re   rf   r  r   rb   return_typer   rG   )r  r  r  r   r  r   r  rJ   r   )  sL    8






	r   c                   sL  | ddptdtdf||g }g  g }|D ]T}t|trb||  | q>t|ttfr |j	 || q> | q>i i |
 D ].\}}t|tr||< || q||< qt\}|| |stddd |D }	t|	dkrtd	|	  }	 fd
dfddr`fddt|	D }
ng }i } D ]@}t|tr||jdg |jf||j< n||dg qlrttrd d d d dd  tdf||d|d}
ttt|}t|dkr(|  nt}tj|
|| d}|||	S )a  Apply a function to every partition across one or more bags.

    Note that all ``Bag`` arguments must be partitioned identically.

    Parameters
    ----------
    func : callable
    *args, **kwargs : Bag, Item, Delayed, or object
        Arguments and keyword arguments to pass to ``func``.

    Examples
    --------
    >>> import dask.bag as db
    >>> b = db.from_sequence(range(1, 101), npartitions=10)
    >>> def div(nums, den=1):
    ...     return [num / den for num in nums]

    Using a python object:

    >>> hi = b.max().compute()
    >>> hi
    100
    >>> b.map_partitions(div, den=hi).take(5)
    (0.01, 0.02, 0.03, 0.04, 0.05)

    Using an ``Item``:

    >>> b.map_partitions(div, den=b.max()).take(5)
    (0.01, 0.02, 0.03, 0.04, 0.05)

    Note that while both versions give the same output, the second forms a
    single graph, and then computes everything at once, and in some cases
    may be more efficient.
    rA  Nr   zmap-partitionsr  c                 S  s   h | ]
}|j qS rG   r  r  rG   rG   rJ   ri     s     z!map_partitions.<locals>.<setcomp>rV   r  c                   s    fddD S )Nc                   s$   g | ]}t |tr|j fn|qS rG   r  r  r  rG   rJ   rU     s     z6map_partitions.<locals>.build_args.<locals>.<listcomp>rG   r  )r  r  rJ   
build_args  s    z"map_partitions.<locals>.build_argsc                   s,   si S t tt fdd D ffS )Nc                   s   g | ]}|j  fqS rG   r   r  r  rG   rJ   rU     s     z<map_partitions.<locals>.build_bag_kwargs.<locals>.<listcomp>)r  r  rZ   r  r  )r  r  rJ   build_bag_kwargs  s
    z(map_partitions.<locals>.build_bag_kwargsc              	     s,   i | ]$}|ft  |t|ffqS rG   )r9   r   r  )r  r  r   r   r  rG   rJ   rg     s   z"map_partitions.<locals>.<dictcomp>r   r   T)	numblocksZconcatenaterm   r   )r@  r>   r   r*   rv   r   r  r   r2   r   rj   r  r  r   r[   r   r   r   r]   r+   r   r   rY   r4   r   )r   r   ro   r  rm   r9  re   rf   r  r   rb   pairsr  rT   r  r   rG   )r  r  r  r  r   r   r  rJ   r    sx    #





$r  c                 C  s"   |t k	rt| ||S t| |S d S r   )r  r	   )r)  r  r$  rG   rG   rJ   r(  "	  s    r(  c                   s    fdd}|S )Nc                   s   | d      S r   rG   )rO  re   stagerG   rJ   h*	  s    zmake_group.<locals>.hrG   )re   r  r  rG   r  rJ   
make_group)	  s    r      c              	     s&  |pd}| j }ttt|t| p.ddkrPtt|d  n|g }g }g }fddt D }| ttj	| d}	t
|  ||}
d|
 d|
 }d|
 i }t|D ]\}}i }i }|| j k r|	j|f|d	|f< ng |d	|f< tdd D ]\||f}ttd fd |ff||< tD ] }tj||i f|||f< qHq|| || qtdd D ](fd
d|D }|| qd|
  fddt|D }|| || t||f| }tj||	gd}t| |t|S )Nr  rV   c                   s(   g | ]  t  fd dtD qS )c                 3  s   | ]}t  |V  qd S r   )r:   r<  )r   re   rG   rJ   rX   >	  s     z+groupby_tasks.<locals>.<listcomp>.<genexpr>r  r  )re   stagesr  rJ   rU   >	  s     z!groupby_tasks.<locals>.<listcomp>)r  rw  zshuffle-join-zshuffle-group-zshuffle-split-r   c                   s8   i | ]0  ft tj fd dtD ffqS )c              	     s,   g | ]$} d   t  d  |fqS rV   )r?   r<  )inpshuffle_split_namer  rG   rJ   rU   k	  s   
z,groupby_tasks.<locals>.<dictcomp>.<listcomp>)rZ   r   r   r   r  )re   shuffle_join_namer  r  )r  rJ   rg   f	  s   z!groupby_tasks.<locals>.<dictcomp>zshuffle-c              
     s0   i | ](\}}|ft tjt td |ffffqS r  )rZ   r  rj   r   r   rc  )rw  r   rG   rJ   rg   }	  s    r   )r   r   rX  r  logr   r   r   r#   Zgroupby_tasks_group_hashr*   r   r   r   r  r  r   r  r  r   r4   r   rY   r[   )r   rw  r  rs  r  groupssplitsZjoinsinputsZb2rA  Zshuffle_group_namerW   idxr  r  splitZ
_key_tupler   r   endrb   r   rG   )rw  re   r   r  r  r  r  rJ   rv  0	  sd    "




rv  c                   s<  d kr j t }dd l}d| ftdd }|rRt|jdd|if}n|jf}z|j|j|ffi}W n" t	k
r   |j|fi}Y nX dt
 d|  fdd	t j D }	d
| tjft|	 i}
d| fdd	tD }t||	|
|}tj| gd}t |S )Nr   zpartd-Ztemporary_directoryrG   r   zgroupby-part-r	  c              	     s(   i | ] }|ft  j|ffqS rG   )r  r   r   )r   rr  rw  r   r   r  rG   rJ   rg   	  s    z groupby_disk.<locals>.<dictcomp>zgroupby-barrier-zgroupby-collect-c                   s    i | ]}|ft | fqS rG   )r  r   )r  rw  r   r  rG   rJ   rg   	  s     r   )r   r*   partdr"   r   r9   ZFilePythonZSnappyr   r>   r   r#   Zbarrierr]   r   r4   r   rY   )r   rw  r   rr  rA  r  dirnamefileZdsk1rp   rq   rr   rb   r   rG   )r   r  rr  rw  r   r   r  rJ   ru  	  s4    
ru  c                 C  sd   t |trDzt|\}}W n  tk
r:   |s6t Y S Y nX | |S |sXt|dkrXtS | |S d S r   )rv   r   r   r  rF   r[   )r   r  r:  r   rG   rG   rJ   r8  	  s    
r8  c                 C  s   dd |D }t | ||S )Nc                 s  s   | ]}|t k	r|V  qd S r   )rF   )rS   r  rG   rG   rJ   rX   	  s      z'empty_safe_aggregate.<locals>.<genexpr>)r8  )r   r  r:  parts2rG   rG   rJ   r?  	  s    r?  c                 C  s>   t t| |}t|| kr:|r:td|  dt| d |S )Nz"Insufficient elements for `take`. z elements requested, only z@ elements available. Try passing larger `npartitions` to `take`.)rZ   r   r[   warningsrl  )r  r   rl  rrG   rG   rJ   rk  	  s    rk  c                 c  s0   t  }|| | D ]}| |k r|V  qdS )a7  Filter elements of `x` by a probability `prob`.

    Parameters
    ----------
    x : iterable
    state_data : tuple
        A tuple that can be passed to ``random.Random.setstate``.
    prob : float
        A float between 0 and 1, representing the probability that each
        element will be yielded.
    N)r   setstaterandom)rO  r  r  r  r   rG   rG   rJ   r  	  s
    
r  c                   s.   t tstd  fddt| D S )a  Return a list of tuples that can be passed to
    ``random.Random.setstate``.

    Parameters
    ----------
    n : int
        Number of tuples to return.
    random_state : int or ``random.Random``, optional
        If an int, is used to seed a new ``random.Random``.
    l        c                   s2   g | ]*}d t  fddtdD d dfqS )   c                 3  s   | ]} d  V  qdS )r   N)randintr   Z	maxuint32r  rG   rJ   rX   	  s     z6random_state_data_python.<locals>.<listcomp>.<genexpr>p  )r  Nr  r   r  rG   rJ   rU   	  s
   z,random_state_data_python.<locals>.<listcomp>)rv   r   r   )r  r  rG   r  rJ   r  	  s    
r  c                   s`   t ttfstt|   fddt|d D }|t |d  d  |S )zvSplit apart a sequence into n equal pieces.

    >>> split(range(10), 3)
    [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
    c                   s,   g | ]$}t  | t  |d    qS r  r   r   r  rM  rG   rJ   rU   
  s     zsplit.<locals>.<listcomp>rV   N)rv   rZ   r]   r[   r   r  r   )rM  r  LrG   r  rJ   r  	  s    r  c                 C  sB   dd l }t| } t| ts"t| } |j| t|d}|j|ddS )Nr   ry  F)copy)r{  r\   rv   rZ   r|  Zastype)rM  rz  r~  r  r  rG   rG   rJ   r  
  s    
r  c                   s   || j kr| S d|t| |f }| j |krZ| j |   fddt|d D }t| ||S t|| j \}}|g| j  }|d  |7  < t| ||S dS )zChanges the number of partitions of the bag.

    This can be used to reduce or increase the number of partitions
    of the bag.
    zrepartition-%d-%sc                   s   g | ]}t |  qS rG   r  rS   Zold_partition_indexZratiorG   rJ   rU   "
  s   z+repartition_npartitions.<locals>.<listcomp>rV   ri  N)r   r*   r   _repartition_from_boundariesdivmod_split_partitions)r   r   new_namenew_partitions_boundariesdivmodnsplitsrG   r  rJ   r  
  s    




r  c                 C  s6   ddl m} ddlm} t| |r.t|| } t| S )Nr   )deepcopy)chain)r  r  ra  r  rv   r\   r8   )r  r  r  rG   rG   rJ   total_mem_usage.
  s
    
r   c                   s   t  trt  t  | t } fdd|D }tdd |D rdt|   }t	| ||} g }t
||D ]\}}||| g|  qv|}t fdd|D sttttt| }ttj|}	dt|   }
t| |	|
S )z\
    Repartition bag so that new partitions have approximately `size` memory usage each
    c                   s   g | ]}d |   qS r  rG   rS   Z	mem_usager  rG   rJ   rU   C
  s     z$repartition_size.<locals>.<listcomp>c                 s  s   | ]}|d kV  qdS )rV   NrG   )rS   ZnsplitrG   rG   rJ   rX   D
  s     z#repartition_size.<locals>.<genexpr>zrepartition-split-c                 3  s   | ]}| kV  qd S r   rG   r  r  rG   rJ   rX   N
  s     zrepartition-)rv   r   rB   r   r  r   r   rK  r*   r  r  r  rL  r   rZ   r   r[   r@   r   r   addr  )r   r  Z
mem_usagesr  
split_nameZsplit_mem_usagesr  usageZnew_npartitionsr  r  rG   r  rJ   r  9
  s"    
r  c           
      C  s   t || jkrtd| j i }dt| | }d}t|D ]r\}}|dkrj| j|f|||f< |d7 }q>t| j|f|f|||f< t|D ]$}tj	||f|f|||f< |d7 }qq>t
j||| gd}	t|	|t|dS )aD  Split a Dask bag into new partitions

    Parameters
    ----------
    bag: Dask bag
    nsplits: List[int]
        Number of target bags for each partition
        The length of nsplits should be the same as bag.npartitions
    new_name: str

    See Also
    --------
    repartition_npartitions
    repartition_size
    znsplits should have len=zsplit-r   rV   r   r   r   )r[   r   r   r*   r   r   r  r   r   r  r4   r   r   rG  )
r   r  r  rb   r  r=  r   re   Zjjr   rG   rG   rJ   r  U
  s    
r  c              	     s   t |tst|}|d dkr*|dd |d  jk rD| j t|d }i }t|D ]>}ttj fddt|| ||d  D ff}||||f< q\t	j
|| gd}t|||dS )Nr   ri  rV   c                   s   g | ]} j |fqS rG   r   r  r   rG   rJ   rU   
  s   z0_repartition_from_boundaries.<locals>.<listcomp>r   r  )rv   rZ   r?   r   r  r[   r   r   r   r4   r   r   )r   r  r  Znum_new_partitionsrb   Znew_partition_indexr   r   rG   r  rJ   r  x
  s,    


r  )T)N)NN)F)rp  )NN)N)N)NN)Nrp  )T)N)
__future__r   rw   ra  rX  r   r   r  r  r   collections.abcr   r   r   r   	functoolsr   r	   r
   r  r   urllib.requestr   Ztlzr   Zfsspec.corer   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r   r"   Zdask.bagr#   Zdask.bag.avror%   Z	dask.baser&   r'   r(   r)   r*   Zdask.blockwiser+   Zdask.contextr,   Z	dask.corer-   r.   r/   r0   r1   r   r2   r3   Zdask.highlevelgraphr4   Zdask.optimizationr5   r6   r7   Zdask.sizeofr8   Z
dask.utilsr9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   r   r   r  rY   r   rF   rQ   rc   rn   rs   r   r   r   r   r   r   r   r   r  r  r  r  r  r/  r   r\   r   r4  r5  r.  r  r  r  r  r  r  r   r  r(  r  r  rv  ru  r8  r?  rk  r  r  r  r  r  r   r  r  r  rG   rG   rG   rJ   <module>   s   X8  



]9
X          



/ 	+

3
&w Z
)


#