U
    /e~                 
   @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
Zd dlZd dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZmZmZmZmZmZmZm Z m!Z! d dl"m#Z# d d	l$m%Z%m&Z& d d
l'm(Z( d dl)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7m8Z8 er:d dl9m:Z:m;Z; dZ<dd Z=dd Z>dd Z?dd Z@dd ZAdd ZBedddddZCeddddd ZDeddfd!d"ZEd#d$ ZFG d%d& d&ZGdd(d)ZHdd*d+ZIddd,d-d.ZJddd,d/d0ZKd1d2 ZLdd3d4ZMdd5d6ZNd7d8 ZOdd9d:ZPd;d< ZQd=d> ZRdd?d@ZSdAdB ZTdCdD ZUddEdFZVddGdHZWdIdJ ZXdKdL ZYdMdN ZZdOdP Z[dQdR Z\dSdT Z]dUdV Z^dWdX Z_dYdZ Z`d[d\ Zad]d^ Zbd_d` Zcdadb ZdddcddZededf Zfdgdh Zgdidj Zhdkdl Zidmdn Zjdodp Zkdqdr ZlddsdtZmdudv ZnddwdxZoG dydz dzZpG d{d| d|epZqG d}d~ d~epZrdddZsdd Ztdd Zudd Zvdd Zwdd Zxdd Zydd ZzdddZ{dS )    N)partial)Integral)config)is_dask_collectiontokenize)PANDAS_GT_140PANDAS_GT_150check_numeric_only_deprecation)
GROUP_KEYS_DEFAULT	DataFrameSeries_extract_meta_Frameacamap_partitionsnew_dd_object
no_defaultsplit_out_on_index)grouper_dispatch)concatdrop_columns)shuffle)PANDAS_GT_110insert_meta_param_descriptionis_dataframe_likeis_index_likeis_series_like	make_metaraise_on_meta_error)HighLevelGraph)M_deprecatedderived_fromfuncname
itemgetter)reconstruct_funcvalidate_func_kwargszIn the future, `sort` for groupby operations will default to `True` to match the behavior of pandas. However, `sort=True` does not work with `split_out>1`. To retain the current behavior for multiple output partitions, set `sort=False`.c                 C   s2   t | ttfr*t| dkr*ttt| S dS dS )z1Determine the correct levels argument to groupby.   r   N)
isinstancetuplelistlenrangeby r/   :/tmp/pip-unpacked-wheel-dbjnr7gq/dask/dataframe/groupby.py_determine_levelsY   s    r1   c                 C   s,   | dkr(|dkr$| p"t ddp"dS dS | S )z9Determine the default shuffle behavior based on split_outNr'   r   tasksF)r   get)r   	split_outr/   r/   r0   _determine_shufflea   s
    r5   c                    s   t  ts|S t |tr* fdd|D S t|rV|j jkrV|j |j jkrV|jS t |trt|j jr|j |j jkrt|jS |S dS )z3Replace series with column names wherever possible.c                    s   g | ]}t  |qS r/   )_normalize_by.0coldfr/   r0   
<listcomp>q   s     z!_normalize_by.<locals>.<listcomp>N)	r(   r   r*   r   namecolumns_namesetissubsetr;   r.   r/   r:   r0   r6   k   s    

&
r6   c                 C   sB   dt | j kr>|dk	r>t|ttttjfr6t|}| | S | S )z9
    Slice columns if grouped is pd.DataFrameGroupBy
    groupbyN)	type__name__lowerr(   r)   r*   r@   pdIndex)groupedr>   r/   r/   r0   _maybe_slice   s    rJ   c                    sJ   t |st|r j|jS t|ttfrBt fdd|D S dS dS )z/Check if ``df`` and ``by`` have aligned indicesc                 3   s   | ]}t  |V  qd S N)_is_alignedr8   ir:   r/   r0   	<genexpr>   s     z_is_aligned.<locals>.<genexpr>TN)r   r   indexequalsr(   r*   r)   allrB   r/   r:   r0   rL      s
    rL   c                 K   sh   | dd}|dk	r,t| |s,d}t|n0|dk	r\t|r\t|trL|g}|jt|d | jf |S )a  Groupby, but raise if df and `by` key are unaligned.

    Pandas supports grouping by a column that doesn't align with the input
    frame/series/index. However, the reindexing does not seem to be
    threadsafe, and can result in incorrect results. Since grouping by an
    unaligned key is generally a bad idea, we just error loudly in dask.

    For more information see pandas GH issue #15244 and Dask GH issue #1876.r.   Na  Grouping by an unaligned column is unsafe and unsupported.
This can be caused by filtering only one of the object or
grouping key. For example, the following works in pandas,
but not in dask:

df[df.foo < 0].groupby(df.bar)

This can be avoided by either filtering beforehand, or
passing in the name of the column instead:

df2 = df[df.foo < 0]
df2.groupby(df2.bar)
# or
df[df.foo < 0].groupby('bar')

For more information see dask GH issue #1876.r-   )	r3   rL   
ValueErrorr+   r(   strupdater*   rC   )r;   kwargsr.   msgr/   r/   r0   _groupby_raise_unaligned   s    	

rX   )
group_keysdropnaobservedc          
      O   s^   |d k	rd|ini }|d k	r$d|ini }| j |fd|i||}	|rL|	| }	|	j|f||S NrZ   r[   rY   )rC   apply
r;   grouperkeyfuncrY   rZ   r[   argsrV   gr/   r/   r0   _groupby_slice_apply   s    rd   c          
      O   s|   |d k	rd|ini }|d k	r$d|ini }| j |fd|i||}	|rL|	| }	t| dkrj|	j|f||S |	j|f||S )NrZ   r[   rY   r   )rC   r+   r]   	transformr^   r/   r/   r0   _groupby_slice_transform   s    rf   c           	      K   sd   |d k	rd|ini }|d k	r$d|ini }|r4|   } | j|fd|i||}|rX|| }|jf |S r\   )Z
sort_indexrC   shift)	r;   r_   r`   ZshuffledrY   rZ   r[   rV   rc   r/   r/   r0   _groupby_slice_shift   s    rh   c                 C   s`   t | |d}zt| r|| }||W S  tk
rZ   t| rH| | } | jdd  Y S X d S )Nr-   r   )rX   r   	get_groupKeyErroriloc)r;   by_keyget_keyr>   rI   r/   r/   r0   _groupby_get_group  s    rn   c                   @   s   e Zd ZdZdddZdS )Aggregationa  User defined groupby-aggregation.

    This class allows users to define their own custom aggregation in terms of
    operations on Pandas dataframes in a map-reduce style. You need to specify
    what operation to do on each chunk of data, how to combine those chunks of
    data together, and then how to finalize the result.

    See :ref:`dataframe.groupby.aggregate` for more.

    Parameters
    ----------
    name : str
        the name of the aggregation. It should be unique, since intermediate
        result will be identified by this name.
    chunk : callable
        a function that will be called with the grouped column of each
        partition. It can either return a single series or a tuple of series.
        The index has to be equal to the groups.
    agg : callable
        a function that will be called to aggregate the results of each chunk.
        Again the argument(s) will be grouped series. If ``chunk`` returned a
        tuple, ``agg`` will be called with all of them as individual positional
        arguments.
    finalize : callable
        an optional finalizer that will be called with the results from the
        aggregation.

    Examples
    --------
    We could implement ``sum`` as follows:

    >>> custom_sum = dd.Aggregation(
    ...     name='custom_sum',
    ...     chunk=lambda s: s.sum(),
    ...     agg=lambda s0: s0.sum()
    ... )  # doctest: +SKIP
    >>> df.groupby('g').agg(custom_sum)  # doctest: +SKIP

    We can implement ``mean`` as follows:

    >>> custom_mean = dd.Aggregation(
    ...     name='custom_mean',
    ...     chunk=lambda s: (s.count(), s.sum()),
    ...     agg=lambda count, sum: (count.sum(), sum.sum()),
    ...     finalize=lambda count, sum: sum / count,
    ... )  # doctest: +SKIP
    >>> df.groupby('g').agg(custom_mean)  # doctest: +SKIP

    Though of course, both of these are built-in and so you don't need to
    implement them yourself.
    Nc                 C   s   || _ || _|| _|| _d S rK   )chunkaggfinalizerE   )selfr=   rp   rq   rr   r/   r/   r0   __init__P  s    zAggregation.__init__)N)rE   
__module____qualname____doc__rt   r/   r/   r/   r0   ro     s   4ro   Fc                 K   sL   |d k	rd|ini }|d k	r$d|ini }| j f ||d||}||f|S )NrZ   r[   levelsort)rC   )r;   aggfunclevelsrZ   rz   r[   rV   rI   r/   r/   r0   _groupby_aggregateW  s    r}   c                 K   sJ   |dk	rd|ini }|dk	r$d|ini }| j f ||d||j|f|S )zq
    A simpler version of _groupby_aggregate that just calls ``aggregate`` using
    the user-provided spec.
    NrZ   r[   rx   )rC   	aggregate)r;   specr|   rZ   rz   r[   rV   r/   r/   r0   _groupby_aggregate_speca  s    r   )rZ   r[   c                O   s(  t | r6|  t|dkr$|d nt|| j }n2| t|}t|tttt	j
fr`t|}|| }|dkr$d}t|jt	jrd}|jj |jj}nJt|jt	jrtdd |jjD rd}t	jjdd |jjD |jjd}|r$|||j  }	t	jt	j|	|jd	}
t	||
g}|S )
z
    A non-aggregation agg function. This simuates the behavior of an initial
    partitionwise aggregation, but doesn't actually aggregate or throw away
    any data.
    r'   r   FTc                 s   s   | ]}t |tjV  qd S rK   )r(   rG   CategoricalIndexr8   ry   r/   r/   r0   rO     s    z!_non_agg_chunk.<locals>.<genexpr>c                 s   s$   | ]}t |tjr|jn|V  qd S rK   )r(   rG   r   
categoriesr   r/   r/   r0   rO     s   
names)rP   r>   )r   to_frame	set_indexr+   r*   r=   r(   r)   r@   rG   rH   rP   r   r   copyrename
MultiIndexanyr|   from_productr   isinr   ZNAr>   r   )r;   r`   rZ   r[   r.   rV   resultZhas_categoricalsZ
full_indexZnew_catsemptyr/   r/   r0   _non_agg_chunko  s4    .	
	r   c                O   s   | d}| d}|d k	r$d|ini }|d k	r8d|ini }t| fd|i||}t| sb|d krn||f|S t|ttttjfrt|}||| f|S d S )Nrp   r>   rZ   r[   r.   )	poprX   r   r(   r)   r*   r@   rG   rH   )r;   rZ   r[   r.   rV   ra   r>   rc   r/   r/   r0   _apply_chunk  s    

r   c              	   G   s   t | r|  } |  } t| |d}t  | }W 5 Q R X ||j  jdd d}|j}| | d | |< t| |d}t  | jdd d}W 5 Q R X t	|||gddS )	Nr-   c                 S   s   | dfS Nz-countr/   cr/   r/   r0   <lambda>      z_var_chunk.<locals>.<lambda>r>      c                 S   s   | dfS )Nz-x2r/   r   r/   r/   r0   r     r   r'   axis)
r   r   r   rX   r	   sumr>   countr   r   )r;   r.   rc   xncolsZg2x2r/   r/   r0   
_var_chunk  s    r   c                 C   s   | j ||d S Nrx   rC   r   )rc   r|   rz   r/   r/   r0   _var_combine  s    r   c           
      C   s   | j ||d } t| j}| | jd |d   }| | j|d d| d   jdd d}| | j| d d   jdd d}||d |  }|| }	d|	|	dk < ||	 }tj||| dk< t|std||dk < |S )	Nrx      r   c                 S   s   | d S Nr   r/   r   r/   r/   r0   r     r   z_var_agg.<locals>.<lambda>r   c                 S   s   | d S r   r/   r   r/   r/   r0   r     r   r   )	rC   r   r+   r>   r   npnanr   AssertionError)
rc   r|   ddofrz   Zncr   r   r   r   divr/   r/   r0   _var_agg  s    
*$r   c                 C   s   | S rK   r/   )rc   r|   r/   r/   r0   _cov_combine  s    r   c                 C   s  g }t ttj|dd}t |}tt|}tt|t|}t| | jdD ]:\}}|| }	|| }
|	||
  }| | }| d|  }| d|  }t	
|| }|d }d||dk < | | | | | |  |  jd |jd  }|rj| | }| | }| | | | d |  jd |jd  }| | | | d |  jd |jd  }|t	
||  }|||< ||krR||	 |
 }|||< qR|}tj||g}tj||dS )Nr   )repeatz%s-countr'   r   )rP   )r+   r*   itproductr,   dictzipcombinations_with_replacementr>   r   sqrtvaluesrG   r   r   r   )r;   r   stdvalsZnum_elementsZnum_colsZcol_idx_mappingrN   jr   yidxZmul_colniZnjr   r   valiiZjjZ	std_val_iZ	std_val_jZlevel_1rP   r/   r/   r0   _cov_finalizer  s:    ,((
r   c                 C   sV   |   }t|dD ](\}}| | }| | | |  ||< qtjt|td|_|S )zInternal function to be used with apply to multiply
    each column in a dataframe by every other column

    a b c -> a*a, a*b, b*b, b*c, c*c
    r   dtype)	__class__r   r   r   zerosr+   intrP   )r;   r   Z_dfrN   r   r9   r/   r/   r0   	_mul_cols
  s    r   c           
         s   t | r|  } |  } t  t| jD ]\}}t| |< q*| j d} | 	 j}t
dd |D }|s fdd|D }|t|}t| |d}| }|jt|djdd	d
}||j  jdd d}	|||	 fS )zCovariance Chunk Logic

    Parameters
    ----------
    df : Pandas.DataFrame
    std : bool, optional
        When std=True we are calculating with Correlation

    Returns
    -------
    tuple
        Processed X, Multiplied Cols,
    r   c                 s   s   | ]}t |V  qd S rK   r   r8   sr/   r/   r0   rO   9  s     z_cov_chunk.<locals>.<genexpr>c                    s   g | ]} | qS r/   r/   )r8   kcol_mappingr/   r0   r<   ;  s     z_cov_chunk.<locals>.<listcomp>r-   )r   Try   dropc                 S   s
   |  dS r   r/   r   r/   r/   r0   r   C  r   z_cov_chunk.<locals>.<lambda>)r   r   r   collectionsOrderedDict	enumerater>   rT   r   Z_get_numeric_datar   r   r   arrayrX   r   r]   r   reset_indexr   )
r;   r.   rN   r   r   is_maskrc   r   mulr   r/   r   r0   
_cov_chunk  s"    
r   c              	   C   s  g }g }g }t | }|d d j}	|D ].\}
}}}||
 || || |}q&t|j||d }t|j||d }t|j|d }t|||gddj|djt|	|d}dd | D }|j	j
}t  }t|dkrtd	d
 |D rt | t|j }|D ]\}|||}|| t|jjd t|k rz|| W n tk
rp   Y nX qt | }tt|jjD ]}|jj||d|_q|j	j|dd |jdd}t|st|S )Nr   rx   ry   r'   r   )r   r   c                 S   s   i | ]\}}||qS r/   r/   r8   r   vr/   r/   r0   
<dictcomp>_  s      z_cov_agg.<locals>.<dictcomp>c                 s   s   | ]}|d kV  qd S rK   r/   )r8   r   r/   r/   r0   rO   e  s     z_cov_agg.<locals>.<genexpr>T)ZinplaceF)rZ   )r*   r>   appendr   rC   r   r]   r   itemsrP   r   r+   rR   keysr@   r3   r|   r   rj   r,   Z
set_levelsZ	set_namesstackr   r   )Z_tr|   r   r   rz   ZsumsZmulscountstr   r   r   r   r   Z
total_sumsZ
total_mulsZtotal_countsr   Zinv_col_mappingZidx_valsZidx_mappingr   Zidx_namer   ry   Zs_resultr/   r/   r0   _cov_aggG  sR    


  "

r   c                 C   s    |   }tjt|td|_|S Nr   )Zdrop_duplicatesr   r   r+   r   rP   )r;   r   r/   r/   r0   _drop_duplicates_reindex  s    r   c                 O   sf   | d}t| |d}t| dkr>||g tjddd}n$||g  }|| j|j	 
 }|S )Nr=   r-   r   r   Tr   )r   rX   r+   r]   r   r   nuniqueZastypeZdtypesr>   Zto_dict)r;   r.   rV   r=   rc   rI   r/   r/   r0   _nunique_df_chunk  s    
r   c                 C   s"   | j ||dtjddd}|S )Nrx   r   Tr   )rC   r]   r   r   )r;   r|   rz   r   r/   r/   r0   _nunique_df_combine  s     r   c                 C   s   | j ||d|  S r   )rC   r   )r;   r|   r=   rz   r/   r/   r0   _nunique_df_aggregate  s    r   c                 O   s:   t | st|  } t| jd t|d}t| f||S )Nr   )r=   r|   )r   r   r   r   r>   r1   r   )r;   r.   Z	_ignored_rV   r/   r/   r0   _nunique_series_chunk  s    r   c                 C   s   | d|dt | | S )N-)r   ra   columnr/   r/   r0   _make_agg_id  s    r   c                    s   t | ts tt|t| } g }t | tr|  D ]X\}t |trf|fdd| D  q6t |t	sv|g}|fdd|D  q6nt
dt|  t	ttf t fdd|  D  }|rdd |D }|S )a  
    Return a list of ``(result_column, func, input_column)`` tuples.

    Spec can be

    - a function
    - a list of functions
    - a dictionary that maps input-columns to functions
    - a dictionary that maps input-columns to a lists of functions
    - a dictionary that maps input-columns to a dictionaries that map
      output-columns to functions.

    The non-group columns are a list of all column names that are not used in
    the groupby operation.

    Usually, the result columns are mutli-level names, returned as tuples.
    If only a single function is supplied or dictionary mapping columns
    to single functions, simple names are returned as strings (see the first
    two examples below).

    Examples
    --------
    >>> _normalize_spec('mean', ['a', 'b', 'c'])
    [('a', 'mean', 'a'), ('b', 'mean', 'b'), ('c', 'mean', 'c')]

    >>> spec = collections.OrderedDict([('a', 'mean'), ('b', 'count')])
    >>> _normalize_spec(spec, ['a', 'b', 'c'])
    [('a', 'mean', 'a'), ('b', 'count', 'b')]

    >>> _normalize_spec(['var', 'mean'], ['a', 'b', 'c'])
    ... # doctest: +NORMALIZE_WHITESPACE
    [(('a', 'var'), 'var', 'a'), (('a', 'mean'), 'mean', 'a'),      (('b', 'var'), 'var', 'b'), (('b', 'mean'), 'mean', 'b'),      (('c', 'var'), 'var', 'c'), (('c', 'mean'), 'mean', 'c')]

    >>> spec = collections.OrderedDict([('a', 'mean'), ('b', ['sum', 'count'])])
    >>> _normalize_spec(spec, ['a', 'b', 'c'])
    ... # doctest: +NORMALIZE_WHITESPACE
    [(('a', 'mean'), 'mean', 'a'), (('b', 'sum'), 'sum', 'b'),       (('b', 'count'), 'count', 'b')]

    >>> spec = collections.OrderedDict()
    >>> spec['a'] = ['mean', 'size']
    >>> spec['b'] = collections.OrderedDict([('e', 'count'), ('f', 'var')])
    >>> _normalize_spec(spec, ['a', 'b', 'c'])
    ... # doctest: +NORMALIZE_WHITESPACE
    [(('a', 'mean'), 'mean', 'a'), (('a', 'size'), 'size', 'a'),      (('b', 'e'), 'count', 'b'), (('b', 'f'), 'var', 'b')]
    c                 3   s    | ]\}} |f| fV  qd S rK   r/   )r8   result_columnra   input_columnr/   r0   rO      s   z"_normalize_spec.<locals>.<genexpr>c                 3   s    | ]} t |f| fV  qd S rK   )r#   )r8   ra   r   r/   r0   rO   	  s   zunsupported agg spec of type c                 3   s   | ]}t | V  qd S rK   )r(   )r8   subspec)	compoundsr/   r0   rO     s    c                 S   s   g | ]\}}}|||fqS r/   r/   )r8   _ra   Z	input_colr/   r/   r0   r<     s     z#_normalize_spec.<locals>.<listcomp>)r(   r   r   r   r   r   r   r   extendr*   rS   rD   r)   r   r   )r   non_group_columnsresr   Zuse_flat_columnsr/   )r   r   r0   _normalize_spec  s,    2






r   c              
   C   sT  t jdt jdt jdt jdt jdi}i }| D ]4\}}}t||||f}||g 	||f q*|
 D ]}t|dkrhtd| qhi }i }	g }
| D ]\}}}d}i }t|tr|j|j }}t|tst|||}t|||||}|d	 D ]} | || d
 < q|d D ]} | |	| d
 < q|
	|d  qt|
 }t|	
 }	||	|
fS )av  
    Create transformation functions for a normalized aggregate spec.

    Parameters
    ----------
    spec: a list of (result-column, aggregation-function, input-column) triples.
        To work with all argument forms understood by pandas use
        ``_normalize_spec`` to normalize the argment before passing it on to
        ``_build_agg_args``.

    Returns
    -------
    chunk_funcs: a list of (intermediate-column, function, keyword) triples
        that are applied on grouped chunks of the initial dataframe.

    agg_funcs: a list of (intermediate-column, functions, keyword) triples that
        are applied on the grouped concatination of the preprocessed chunks.

    finalizers: a list of (result-column, function, keyword) triples that are
        applied after the ``agg_funcs``. They are used to create final results
        from intermediate representations.
    minmaxmedianr   varr'   z#conflicting aggregation functions: r/   chunk_funcsr   aggregate_funcs	finalizer)r   r   r   r   r   r   r#   r3   
setdefaultr   r   r+   rS   r(   r   rb   keywordsro   _build_agg_args_singlesorted)r   Zknown_np_funcsZby_namer   ra   r   r`   funcschunksZaggs
finalizersr   	func_argsfunc_kwargsimplsr/   r/   r0   _build_agg_args  sV         	

    r  c              
   C   s   t jt jft jt jft jt jft jt jft jt jft jt jft jt jft jt jfd t j	fd	}||
 kr|t| |||| S |dkrt| ||||S |dkrt| ||||S |dkrt| ||S |dkrt| ||S t|trt| ||S td| d S )N)	r   r   r   r   sizefirstlastprodr   r   r   meanr*   zunknown aggregate )r    r   r   r   r   r	  r
  r  r  r   r   _build_agg_args_simple_build_agg_args_var_build_agg_args_std_build_agg_args_mean_build_agg_args_listr(   ro   _build_agg_args_customrS   )r   ra   r  r  r   Zsimple_implr/   r/   r0   r   f  sT    







           
r   c                 C   sL   t ||}|\}}t|tt||dfg|tt||dfg| t|t fdS )Nr   ra   r   r   r   r   r   _apply_func_to_columnr$   )r   ra   r   Z	impl_pairintermediateZ
chunk_implZagg_implr/   r/   r0   r    s    

	
r  c           
      C   s   t d|}t d|}t d|}|r6td| d| dh}| | }	|	rftd| d| d|	 t|tt|tjd	f|tt|tjd	f|tt|d
fgdd |||fD | t	tf |||d|fdS )Nr   Zsum2r   zaggregate function 'z0' doesn't support positional arguments, but got r   z' supports z keyword arguments, but got r  )r   c                 S   s    g | ]}|t t|tjd fqS r  r  r   r    r   r7   r/   r/   r0   r<     s   z'_build_agg_args_var.<locals>.<listcomp>)
sum_columncount_columnsum2_columnr  )
r   	TypeErrorr   r   r  r    r   r   _compute_sum_of_squares_finalize_var)
r   ra   r  r  r   int_sumZint_sum2	int_countZexpected_kwargsZunexpected_kwargsr/   r/   r0   r    s@    


r  c                 C   s0   t | ||||}|d \} }}| t|f|d< |S )Nr   )r  _finalize_std)r   ra   r  r  r   r  r   rV   r/   r/   r0   r    s        r  c              	   C   sb   t d|}t d|}t|tt|tjdf|tt|tjdfgdd ||fD | tt||dfdS )Nr   r   r  c                 S   s    g | ]}|t t|tjd fqS r  r  r7   r/   r/   r0   r<     s   z(_build_agg_args_mean.<locals>.<listcomp>)r  r  r  )r   r   r  r    r   r   _finalize_mean)r   ra   r   r!  r"  r/   r/   r0   r    s    


r  c                 C   sL   t d|}t|tt|dd dfg|tt|dd dfg| t|t fdS )Nr*   c                 S   s
   |  tS rK   )r]   r*   )r   r/   r/   r0   r     r   z&_build_agg_args_list.<locals>.<lambda>r  c                 S   s   |  dd S )Nc                 S   s   t tj| S rK   )r*   r   chainfrom_iterable)r  r/   r/   r0   r     r   z8_build_agg_args_list.<locals>.<lambda>.<locals>.<lambda>)r]   )s0r/   r/   r0   r     s   r  r  )r   ra   r   r  r/   r/   r0   r    s"    
	r  c                 C   sr   t t||}|jd kr,| t|t f}n| tt|j|df}t|tt|j|dfg|tt|j	|dfg|dS )N)ra   prefixr   r  )
r   r#   rr   operatorr$   r   _apply_func_to_columnsr  rp   rq   )r   ra   r   r9   r   r/   r/   r0   r    s    
r  c                 O   s   t |r|jt|d |d}t| f|}t }|D ]N\}}}||f|}	t|	trt	|	D ]\}
}||| d|
 < qbq:|	||< q:t
| r| |S | d |S dS )a  
    Group a dataframe and apply multiple aggregation functions.

    Parameters
    ----------
    df: pandas.DataFrame
        The dataframe to work on.
    by: list of groupers
        If given, they are added to the keyword arguments as the ``by``
        argument.
    funcs: list of result-colum, function, keywordargument triples
        The list of functions that are applied on the grouped data frame.
        Has to be passed as a keyword argument.
    kwargs:
        All keyword arguments, but ``funcs``, are passed verbatim to the groupby
        operation of the dataframe

    Returns
    -------
    aggregated:
        the aggregated dataframe.
    r-   r  r   r   N)r+   rU   r*   r   rX   r   r   r(   r)   r   r   r   headr   )r;   r.   rV   r  rI   r   r   ra   r  rr   r   r/   r/   r0   _groupby_apply_funcs&  s    



r-  c                 C   sH   t | dr| j}n| jj}|r.| j| dn
| jd}|| S )Nr_   r   )hasattrr_   groupingr   objpowrC   r   )rI   r   r   r;   r/   r/   r0   r  X  s
    
 r  c           
      K   sJ   t | f|||d|} t }|D ]\}}}	|| f|	||< q$| |S )Nr  ry   rz   )r-  r   r   r   )
r;   r   finalize_funcsry   rz   rV   r   r   ra   Zfinalize_kwargsr/   r/   r0   _agg_finalized  s      r4  c                 C   s   |d kr|| S || | S rK   r/   )df_liker   ra   r/   r/   r0   r  r  s    r  c                    sH   t  r j}n jj}tfdd|D } fdd|D }|| S )Nc                 3   s   | ]}|  r|V  qd S rK   )
startswithr7   )r(  r/   r0   rO     s     
 z)_apply_func_to_columns.<locals>.<genexpr>c                    s   g | ]} | qS r/   r/   r7   )r5  r/   r0   r<     s     z*_apply_func_to_columns.<locals>.<listcomp>)r   r>   r0  r  )r5  r(  ra   r>   r/   )r5  r(  r0   r*  y  s    r*  c                 C   s   | | | |  S rK   r/   )r;   r  r  r/   r/   r0   r$    s    r$  c                 K   sf   | dd}| | }| | }| | }||d |  }	|| }
d|
|
dk < |	|
 }	tj|	|| dk< |	S )Nr   r'   r   r   )r3   r   r   )r;   r  r  r  rV   r   r   r   r   r   r   r/   r/   r0   r     s    r   c                 K   s   t | |||f|}t|S rK   )r   r   r   )r;   r  r  r  rV   r   r/   r/   r0   r#    s    r#  c                 C   s,   |j | |j|d}| j|_|| | |S N
fill_value)reindexr   rP   )partZcum_lastrP   r>   ra   initialZalignr/   r/   r0   _cum_agg_aligned  s    r=  c                 C   s0   | j |j }|| j||d|j||d|dS r7  )rP   unionr:  )abra   r<  r>  r/   r/   r0   _cum_agg_filled  s    rA  c                 C   s   | j ||dd S )Nr8  r'   )add)r?  r@  r9  r/   r/   r0   _cumcount_aggregate  s    rC  c                 C   s   | j |dj||||dS )Nr   )valuemethodlimitr   )r   fillna)groupr.   rD  rE  rF  fillna_axisr/   r/   r0   _fillna_group  s       rJ  c                    s(   | d krdn
d|  d  fdd}|S )N
z

Based on c                    s   d  d| _ | S )Nz9Aggregate using one or more specified operations
        a  
        Parameters
        ----------
        arg : callable, str, list or dict, optional
            Aggregation spec. Accepted combinations are:

            - callable function
            - string function name
            - list of functions and/or function names, e.g. ``[np.sum, 'mean']``
            - dict of column names -> function, function name or list of such.
            - None only if named aggregation syntax is used
        split_every : int, optional
            Number of intermediate partitions that may be aggregated at once.
            This defaults to 8. If your intermediate partitions are likely to
            be small (either due to a small number of groups or a small initial
            partition size), consider increasing this number for better performance.
        split_out : int, optional
            Number of output partitions. Default is 1.
        shuffle : bool or str, optional
            Whether a shuffle-based algorithm should be used. A specific
            algorithm name may also be specified (e.g. ``"tasks"`` or ``"p2p"``).
            The shuffle-based algorithm is likely to be more efficient than
            ``shuffle=False`` when ``split_out>1`` and the number of unique
            groups is large (high cardinality). Default is ``False`` when
            ``split_out = 1``. When ``split_out > 1``, it chooses the algorithm
            set by the ``shuffle`` option in the dask config system, or ``"tasks"``
            if nothing is set.
        kwargs: tuple or pd.NamedAgg, optional
            Used for named aggregations where the keywords are the output column
            names and the values are tuples where the first element is the input
            column name and the second element is the aggregation function.
            ``pandas.NamedAgg`` can also be used as the value. To use the named
            aggregation syntax, arg must be set to None.
        )rw   )ra   Zbased_on_strr/   r0   wrapper  s    
#z%_aggregate_docstring.<locals>.wrapperr/   )based_onrM  r/   rL  r0   _aggregate_docstring  s    &rO  c                   @   s  e Zd ZdZddedddfddZee dd Zej	d	d Zed
d Z
dd Zedd ZdVddZdd Zdd Zdd ZeejjjdWddZeejjjdXddZeejjjefddZeejjjdYd d!ZeejjjdZd"d#Zeejjjd[d$d%Zeejjjd\d&d'Zeejd]d(d)Zeejd^d*d+Z eejjjd_d,d-Z!eejjjd`d.d/Z"eejjjdad0d1Z#eejjjdbd2d3Z$eejjjdcd4d5Z%eejjjddd6d7Z&eejded8d9Z'eejdfd:d;Z(eejjjdgd<d=Z)eejjjdhd>d?Z*eejjjd@dAdBdC Z+e, didDdEZ-e.dFdGdHdI Z/e.dFdGdJdK Z0e.dFdGddddefdLdMZ1djdNdOZ2dkdPdQZ3eejjjdldRdSZ4eejjjdmdTdUZ5dS )n_GroupBya  Superclass for DataFrameGroupBy and SeriesGroupBy

    Parameters
    ----------

    obj: DataFrame or Series
        DataFrame or Series to be grouped
    by: str, list or Series
        The key for grouping
    slice: str, list
        The slice keys applied to GroupBy result
    group_keys: bool | None
        Passed to pandas.DataFrame.groupby()
    dropna: bool
        Whether to drop null values from groupby index
    sort: bool
        Passed along to aggregation methods. If allowed,
        the output aggregation will have sorted keys.
    observed: bool, default False
        This only applies if any of the groupers are Categoricals.
        If True: only show observed values for categorical groupers.
        If False: show all values for categorical groupers.
    NTFc                    s  t |ttfr|n|g}tdd |D r2td|| _d t| jsxt | jtttfsxt	| jsnt
| jrt| jst|t| jst | jtr| jhn| jfdd jD t  ttfst|| _r  n | _t || _|| _t fddt | jttfr| jn| jgD }	|	s8tdt | jtrXdd | jD }
nt | jtrp| jj}
n| j}
i | _|d k	r|| jd	< i | _|d k	r|| jd
< | jjj|
fd|i| j| j| _d S )Nc                 s   s   | ]}t |tjV  qd S rK   )r(   rG   ZGrouper)r8   r`   r/   r/   r0   rO     s     z$_GroupBy.__init__.<locals>.<genexpr>z.pd.Grouper is currently not supported by Dask.c                    s   g | ]}| kr|qS r/   r/   r8   r   )
projectionr/   r0   r<   #  s      z%_GroupBy.__init__.<locals>.<listcomp>c                 3   s(   | ] }t |tr|j jknd V  qdS )TN)r(   r   npartitionsr8   itemr:   r/   r0   rO   ,  s   zHThe grouped object and 'by' of the groupby must have the same divisions.c                 S   s    g | ]}t |tr|jn|qS r/   )r(   r   _metarT  r/   r/   r0   r<   7  s    rZ   r[   rY   )r(   r)   r*   r   NotImplementedError_slicer   isscalarrT   r   r   r   r@   r>  r>   r   r   r   rY   r0  r6   r.   rz   rR   rV  rZ   r[   rC   )rs   r;   r.   slicerY   rZ   rz   r[   Zby_Zpartitions_alignedby_metar/   )r;   rR  r0   rt     sp    








 z_GroupBy.__init__c                 C   s   | j S rK   r-   rs   r/   r/   r0   rP   N  s    z_GroupBy.indexc                 C   s
   || _ d S rK   r-   )rs   rD  r/   r/   r0   rP   S  s    c                 C   s    | j | jd| jd| ji| jS )N)r.   rY   rz   )r.   rY   rZ   rz   r[   r\  r/   r/   r0   _groupby_kwargsW  s     z_GroupBy._groupby_kwargsc                 C   s   t dd S )Na  Iteration of DataFrameGroupBy objects requires computing the groups which may be slow. You probably want to use 'apply' to execute a function for all the columns. To access individual groups, use 'get_group'. To list all the group names, use 'df[<group column>].unique().compute()'.rW  r\  r/   r/   r0   __iter__a  s    z_GroupBy.__iter__c                 C   sl   | j j}t| jtr&dd | jD }nt| jtr<| jj}n| j}|j|fd| ji| j| j	}t
|| jS )z]
        Return a pd.DataFrameGroupBy / pd.SeriesGroupBy which contains sample data.
        c                 S   s    g | ]}t |tr|jn|qS r/   )r(   r   _meta_nonemptyrT  r/   r/   r0   r<   q  s   z+_GroupBy._meta_nonempty.<locals>.<listcomp>rY   )r0  r`  r(   r.   r*   r   rC   rY   r[   rZ   rJ   rX  )rs   sampler[  rI   r/   r/   r0   r`  i  s"    
z_GroupBy._meta_nonemptyr'   c
                 C   sR  t ||}| jdkr(|dkr(ttt |dkr4|}|dkrXt  || j}W 5 Q R X |dkrdi }|	dkrpi }	t|r~|j	n|j
}
| jgt| jtr| jn| jg }| j| }t| j}|rt|t||
d| j| j|t||d| j| j|	||||| jd
S t|ttf ||
d| j|| jt|||tf ||d| j|	| j|t| jdS )zz
        Aggregation with a single function/aggfunc rather than a compound spec
        like in GroupBy.aggregate
        Nr'   )rp   r>   r{   r|   	rp   chunk_kwargsr~   aggregate_kwargstokensplit_everyr4   r   rz   )
rp   rd  r~   metarf  rg  re  r4   split_out_setuprz   )r5   rz   warningswarnSORT_SPLIT_OUT_WARNINGFutureWarningr	   r`  r   r=   r>   r0  r(   r.   r*   _token_prefixr1   _shuffle_aggregater   r[   rZ   r}   r   r   r   )rs   rf  ra   r{   rh  rg  r4   r   rd  re  r>   rb   r|   r/   r/   r0   _single_agg  s    
"

z_GroupBy._single_aggc                    s  |j }t|r|jn|j}tjtr0jnjg}|dk	rt|rN|gn|}t|t|@ }	g }
|D ]P}||	krtt	
 }jjf || j| i_|
||  qj|
| qjn|}
j| }|d }|d }|d }ttjf|
||||dj}t|r| n|}|jf fdd|
D }tj j  fdd	|
D }tt|f||dkrpd
n|tj||dj}t||||}|d| 7 }|d| 7 }i }|jd
f||d
f< tdjjD ]}|dkr|j|d f|||f< n(t||d f|j|d f||f|||f< t|j|f||f|
|dkrBd
n|||f|||f< q|g}jjdkrx|||g7 }tj|||d}t|||j jjS )z(Wrapper for cumulative groupby operationNz-mapz
-take-lastz	-cum-last)rp   r>   rf  rh  c                    s<   i | ]4}|t |r0|t jd g kr0 j| n jjqS r   )r   rY  getattrr0  rP   rM   r\  r/   r0   r     s   z%_GroupBy._cum_agg.<locals>.<dictcomp>c                    s   g | ]} |d qS ))r`   r/   )r8   ind)r_   r/   r0   r<     s     z%_GroupBy._cum_agg.<locals>.<listcomp>r   )r>   rp   rh  rf  r   r'   )dependencies) rV  r   r=   r>   r(   r.   r*   r@   rT   uuidZuuid4r0  assignr   rn  r   r   rZ   r   r   r    r  r   r?   r,   rS  rA  r=  r   Zfrom_collectionsr   Z	divisions)rs   rf  rp   r~   r<  rh  r>   Zby_colsZgrouping_columnsZ	to_renamer.   r9   suffixr=   Z	name_partZ	name_lastZname_cumZcumpart_rawZcumpart_raw_frameZcumpart_extZby_groupersZcumlast_hashdaskrN   rs  graphr/   )r_   rs   r0   _cum_agg  s    





z_GroupBy._cum_aggc                 K   s   t dd S )NzDataFrameGroupBy does not allow compute method.Please chain it with an aggregation method (like ``.mean()``) or get a specific group using ``.get_group()`` before calling ``compute()``r^  )rs   rV   r/   r/   r0   computeN  s    z_GroupBy.computec           
         s^   j }t j tr"|d}d}nd}t jtrX|jf  fdd jjD } j}n2t jtrz|j jd} j}n|}| j}t	||}t jtrdd  jjD }|| }t
|r|t||jj}	n|j|d	d
}	nXt jtr4|d } jj|_t
|r$|td|jj}	n|jdd	d
}	n
|}	 j}|rV|	d  j j}	|	|fS )N
__series__TFc                    s   i | ]}d |  j | qS Z_by_r-   rQ  r\  r/   r0   r   `  s      z%_GroupBy._shuffle.<locals>.<dictcomp>)_byc                 S   s   g | ]}d | qS r}  r/   rQ  r/   r/   r0   r<   m  s     z%_GroupBy._shuffle.<locals>.<listcomp>r'   r   r~  )r0  r(   r   r   r.   r   ru  r>   Z_select_columns_or_indexr   r   r   r   r   r   r=   r   )
rs   rh  r;   convert_back_to_seriesdf2r.   df3r   Zby2Zdf4r/   r\  r0   _shuffleU  s>    



z_GroupBy._shuffler   c                 C   sB   |r(t | trtdq>| jj|dS n| jdtjtjddS d S )N&No axis named 1 for object type Seriesr   cumsumr   rp   r~   r<  )r(   SeriesGroupByrS   r0  r  rz  r    rB  rs   r   r/   r/   r0   r    s
    

z_GroupBy.cumsumc                 C   sB   |r(t | trtdq>| jj|dS n| jdtjtjddS d S )Nr  r   cumprodr'   r  )r(   r  rS   r0  r  rz  r    r   r  r/   r/   r0   r    s
    

z_GroupBy.cumprodc                 C   s(   |t k	rtdt | jdtjtddS )Nz{The `axis` keyword argument is deprecated and will removed in a future release. Previously it was unused and had no effect.cumcountr   r  )r   rj  rk  rm  rz  r    r  rC  r  r/   r/   r0   r    s       z_GroupBy.cumcountc                 C   s:   | j tjd|||d}|r2|j|  |ktjdS |S d S )Nr   ra   rf  rg  r4   r   other)rp  r    r   wherer   r   NaNrs   rg  r4   r   Z	min_countr   r/   r/   r0   r     s    z_GroupBy.sumc                 C   s:   | j tjd|||d}|r2|j|  |ktjdS |S d S )Nr  r  r  )rp  r    r  r  r   r   r  r  r/   r/   r0   r    s    z_GroupBy.prodc                 C   s   | j tjd|||dS )Nr   r  )rp  r    r   rs   rg  r4   r   r/   r/   r0   r     s    z_GroupBy.minc                 C   s   | j tjd|||dS )Nr   r  )rp  r    r   r  r/   r/   r0   r     s    z_GroupBy.maxc              
   C   s"   | j tjdtj|||t|ddS )Nidxminskipnara   rf  r{   rg  r4   r   rd  )rp  r    r  r
  r   rs   rg  r4   r   r   r  r/   r/   r0   r    s    z_GroupBy.idxminc              
   C   s"   | j tjdtj|||t|ddS )Nidxmaxr  r  )rp  r    r  r
  r   r  r/   r/   r0   r    s    z_GroupBy.idxmaxc                 C   s   | j tjdtj|||dS )Nr   ra   rf  r{   rg  r4   r   )rp  r    r   r   r  r/   r/   r0   r     s    z_GroupBy.countc                 C   s:   | j |||d}| j|||d}t|r2||j }|| S )N)rg  r4   r   )r   r   r   r>   )rs   rg  r4   r   r   r   r/   r/   r0   r    s
    
z_GroupBy.meanc                 C   s   |dkrt d|p"tdd p"d}t  | j }W 5 Q R X t|rN|jn|j}t	| j
trf| j
n| j
g}t| jg| dtd|i| j| jttt| j
d| j| j|||| jd
S )	NFeIn order to aggregate with 'median', you must use shuffling-based aggregation (e.g., shuffle='tasks')r   r2   znon-aggr`   rb  )	rf  rp   rd  r~   re  rg  r4   r   rz   )rS   r   r3   r	   r`  r   r   r=   r>   r(   r.   r*   ro  r0  r   r[   rZ   r}   _median_aggregater1   rz   )rs   rg  r4   r   rh  r>   r.   r/   r/   r0   r     s>    
 z_GroupBy.medianc                 C   s   | j dtjtj|||dS )Nr	  )rf  ra   r{   rg  r4   r   )rp  r    r	  r   r  r/   r/   r0   r	  %  s    z_GroupBy.sizec                 C   s   | j d kr|dkrttt t| j}tt| jt	sB| j
| jgn| j
g| j ttt| jd ||dd|i||t| j d}t| j
tr||jd  }| jr|| j }|S )Nr'   r   )r   r|   r|   
rp   r~   combinerf  re  combine_kwargsrg  r4   ri  rz   r   )rz   rj  rk  rl  rm  r1   r.   r   r(   r*   r0  r   r   r   rn  r   r   r>   rX  )rs   r   rg  r4   r|   r   r/   r/   r0   r   0  s.    


z_GroupBy.varc                 C   s$   | j |||d}ttj||d}|S )N)rg  r4   )rh  )r   r   r   r   )rs   r   rg  r4   r   r   r/   r/   r0   r   M  s    z_GroupBy.stdc                 C   s   | j ||ddS )zNGroupby correlation:
        corr(X, Y) = cov(X, Y) / (std_x * std_y)
        T)rg  r4   r   )cov)rs   r   rg  r4   r/   r/   r0   corrS  s    z_GroupBy.corrc           	      C   s   | j dkr|dkrttt t| j}tdd | jD }| jrv|rV| j	| j | _	n t
| jt
| j }| j	| | _	tt| jt
s| j	| jgn| j	g| j ttt| jd |||dd|i||t| j d}t| j	tr||jd	  }| jr|| j }|S )
ai  Groupby covariance is accomplished by

        1. Computing intermediate values for sum, count, and the product of
           all columns: a b c -> a*a, a*b, b*b, b*c, c*c.

        2. The values are then aggregated and the final covariance value is calculated:
           cov(X, Y) = X*Y - Xbar * Ybar

        When `std` is True calculate Correlation
        Nr'   c                 s   s   | ]}t |V  qd S rK   r   r   r/   r/   r0   rO   k  s     z_GroupBy.cov.<locals>.<genexpr>r  )r   r|   r   r|   r  r   )rz   rj  rk  rl  rm  r1   r.   r   rX  r0  r*   r   r(   r   r   r   rn  r   r   r>   )	rs   r   rg  r4   r   r|   r   Zsliced_plusr   r/   r/   r0   r  Z  s:    



z_GroupBy.covc                 C   s   | j tjd|||dS )Nr
  r  )rp  r    r
  r  r/   r/   r0   r
    s    z_GroupBy.firstc                 C   s   | j dtj|||dS )Nr  )rf  ra   rg  r4   r   )rp  r    r  r  r/   r/   r0   r    s    z_GroupBy.lastzHIf the group is not present, Dask will return an empty Series/DataFrame.)Zinconsistenciesc              	   C   s\   | j d }| jj}t|r.| jd k	r.|| j }t|r<|jn|j}tt| j| j	||||dS )Nri   rh  rf  )
rn  rV  r0  r   rX  r>   r=   r   rn   r.   )rs   r`   rf  rh  r>   r/   r/   r0   ri     s    

z_GroupBy.get_groupc                    s  |d krt jdtd d}t||}d }d }d }d }	trttr`|d krt|f|\}}}}n"ttr|d k}|rt	|\}}tj
tr:tjtstjrjh n$tjtrdd jD  nt  jrj}
t|
ts|
g}
n fddj
jD }
t||
}t|tr | j
j}	nptj
trt|tttfrxtd |ig }dd |D }n td |ig }fd	d|D }ntd
j
 t|\}}}tjttfrtjdkrtttj}nd}|	rj
t|	 nj
}tjts&|jg}n|gj }tsPjrPt dt!j" t#dd |D }|rv|svtd|rD|rt$|t%d fdd|j& D ij'jt(|t)jdj'jd||t|t*r|ndj+d
}nXt$|t,|j+dj'jt-tf |||dj'jd||t|t*r6|ndj+d
}nj+d krf|dkrft t.t j+r|dkrt dt/|t,tf |ddj'jt,tf ||ddj'jt-tf |||dj'jd||t0j+d}|r|d k	r|d k	r|j1d d |f }||_|S )NzVsplit_out=None is deprecated, please use a positive integer, or allow the default of 1)categoryr'   c                 S   s$   h | ]}t |tst|r|qS r/   )r(   r)   r   rY  rM   r/   r/   r0   	<setcomp>  s    
 
 z%_GroupBy.aggregate.<locals>.<setcomp>c                    s   g | ]}| kr|qS r/   r/   r7   group_columnsr/   r0   r<     s     z&_GroupBy.aggregate.<locals>.<listcomp>c                 S   s    g | ]\\}}}}|||fqS r/   r/   )r8   r   r   ra   r   r/   r/   r0   r<     s   c                    s    g | ]\}}} j j||fqS r/   )r0  r=   )r8   r   ra   r   r\  r/   r0   r<     s   zaggregate on unknown object r   zXdropna is not a valid argument for dask.groupby.aggif pandas < 1.1.0. Pandas version is c                 s   s    | ]}|d  dt jfkV  qdS )r'   r   N)r   r   r   r/   r/   r0   rO     s     z%_GroupBy.aggregate.<locals>.<genexpr>r  r`   c                    s   g | ]}| kr|qS r/   r/   rQ  r  r/   r0   r<   5  s     )r   r|   r~   r2   rc  )r  rz   )r   r3  ry   zCannot guarantee sorted keys for `split_out>1` and `shuffle=False` Try using `shuffle=True` if you are grouping on a single column. Otherwise, try using split_out=1, or grouping with sort=False.Fr2  )rp   rd  r  r  r~   re  rf  rg  r4   ri  rz   )2rj  rk  rm  r5   r   r(   DataFrameGroupByr%   r  r&   r0  r   r.   r)   r   rY  r*   r@   rX  r>   r   r   r>  r   intersectionr   rS   r  r+   r,   r   rZ   rW  rG   __version__r   ro  r   tolistr[   r   r1   rT   rz   r-  r4  rl  r   r   rk   )rs   argrg  r4   r   rV   Z
relabelingr>   orderZcolumn_projectionr   r   r   r   r  r|   _objZ
chunk_argsZ
has_medianr   r/   )r  rs   r0   r~     sD   







"

 
 
z_GroupBy.aggregate   )padc              	   O   s   | dt}|tkrrtdt| ddd. t||fdd\}}| jj|f||}W 5 Q R X d}tj|dd	 t	|| j
jd
}t| jtrtdd | jD rtd| j}|jo|| j }	|	r| |\}
}n
|}
| j}||d< tt|
|| j|f|t|| jd| j| j|}|S )a  Parallel version of pandas GroupBy.apply

        This mimics the pandas version except for the following:

        1.  If the grouper does not align with the index then this causes a full
            shuffle.  The order of rows within each group may not be preserved.
        2.  Dask's GroupBy.apply is not appropriate for aggregations. For custom
            aggregations, use :class:`dask.dataframe.groupby.Aggregation`.

        .. warning::

           Pandas' groupby-apply can be used to to apply arbitrary functions,
           including aggregations that result in one row per group. Dask's
           groupby-apply will apply ``func`` once on each group, doing a shuffle
           if needed, such that each group is contained in one partition.
           When ``func`` is a reduction, e.g., you'll end up with one row
           per group. To apply a custom aggregation with Dask,
           use :class:`dask.dataframe.groupby.Aggregation`.

        Parameters
        ----------
        func: function
            Function to apply
        args, kwargs : Scalar, Delayed or object
            Arguments and keywords to pass to the function.
        $META

        Returns
        -------
        applied : Series or DataFrame depending on columns keyword
        rh  zgroupby.apply()TZudfZnonemptya  `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series resultr   
stacklevelZparent_metac                 s   s   | ]}t |tV  qd S rK   r(   r   rT  r/   r/   r0   rO     s    z!_GroupBy.apply.<locals>.<genexpr>z?groupby-apply with a multiple Series is currently not supportedrf  rY   )r3   r   r   r#   r   r`  r]   rj  rk  r   rV  r0  r(   r.   r*   r   rW  known_divisions_contains_index_namer  r   rd   rX  rY   r[   rZ   rs   ra   rb   rV   rh  Z	meta_argsmeta_kwargsrW   r;   should_shuffler  r.   r  r/   r/   r0   r]     sN    !	z_GroupBy.applyc              	   O   s   | dt}|tkrrtdt| ddd. t||fdd\}}| jj|f||}W 5 Q R X d}tj|dd	 t	|| j
jd
}t| jtrtdd | jD rtd| j}|jo|| j }	|	r| |\}
}n
|}
| j}||d< tt|
|| j|f|t|| jd| j| j|}|S )a  Parallel version of pandas GroupBy.transform

        This mimics the pandas version except for the following:

        1.  If the grouper does not align with the index then this causes a full
            shuffle.  The order of rows within each group may not be preserved.
        2.  Dask's GroupBy.transform is not appropriate for aggregations. For custom
            aggregations, use :class:`dask.dataframe.groupby.Aggregation`.

        .. warning::

           Pandas' groupby-transform can be used to to apply arbitrary functions,
           including aggregations that result in one row per group. Dask's
           groupby-transform will apply ``func`` once on each group, doing a shuffle
           if needed, such that each group is contained in one partition.
           When ``func`` is a reduction, e.g., you'll end up with one row
           per group. To apply a custom aggregation with Dask,
           use :class:`dask.dataframe.groupby.Aggregation`.

        Parameters
        ----------
        func: function
            Function to apply
        args, kwargs : Scalar, Delayed or object
            Arguments and keywords to pass to the function.
        $META

        Returns
        -------
        applied : Series or DataFrame depending on columns keyword
        rh  zgroupby.transform(r  Tr  r  a  `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .transform(func)
  After:  .transform(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .transform(func, meta=('x', 'f8'))            for series resultr   r  r  c                 s   s   | ]}t |tV  qd S rK   r  rT  r/   r/   r0   rO   	  s    z%_GroupBy.transform.<locals>.<genexpr>zCgroupby-transform with a multiple Series is currently not supportedr  )r3   r   r   r#   r   r`  re   rj  rk  r   rV  r0  r(   r.   r*   r   rW  r  r  r  r   rf   rX  rY   r[   rZ   r  r/   r/   r0   re     sN    !	z_GroupBy.transformc              
   C   s   |t krVtddd* t||||ddd}| jjf |}W 5 Q R X d}tj|dd	 t|| jj	d
}t
| jtrtdd | jD rtd| j	}|jo|| j }	|	r| |\}
}n
|}
| j}tt|
|| j|	f||||d| j|d| j| j}|S )a  Parallel version of pandas GroupBy.shift

        This mimics the pandas version except for the following:

        If the grouper does not align with the index then this causes a full
        shuffle.  The order of rows within each group may not be preserved.

        Parameters
        ----------
        periods : Delayed, Scalar or int, default 1
            Number of periods to shift.
        freq : Delayed, Scalar or str, optional
            Frequency string.
        axis : axis to shift, default 0
            Shift direction.
        fill_value : Scalar, Delayed or object, optional
            The scalar value to use for newly introduced missing values.
        $META

        Returns
        -------
        shifted : Series or DataFrame shifted within each group.

        Examples
        --------
        >>> import dask
        >>> ddf = dask.datasets.timeseries(freq="1H")
        >>> result = ddf.groupby("name").shift(1, meta={"id": int, "x": float, "y": float})
        zgroupby.shift()Fr  )periodsfreqr   r9  Tr  a  `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .shift(1)
  After:  .shift(1, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .shift(1, meta=('x', 'f8'))            for series resultr   r  r  c                 s   s   | ]}t |tV  qd S rK   r  rT  r/   r/   r0   rO   w	  s    z!_GroupBy.shift.<locals>.<genexpr>z?groupby-shift with a multiple Series is currently not supportedzgroupby-shift)r  r  r   r9  rf  rY   rh  )r   r   r   r`  rg   rj  rk  r   rV  r0  r(   r.   r*   r   rW  r  r  r  r   rh   rX  rY   r[   rZ   )rs   r  r  r   r9  rh  r  rW   r;   r  r  r.   r   r/   r/   r0   rg   ?	  s\    	z_GroupBy.shiftc                 C   s\   ddl m} t|trtd|dk	rHt|ts8td|dk rHtd|| |||||dS )a  Provides rolling transformations.

        .. note::

            Since MultiIndexes are not well supported in Dask, this method returns a
            dataframe with the same index as the original data. The groupby column is
            not added as the first level of the index like pandas does.

            This method works differently from other groupby methods. It does a groupby
            on each partition (plus some overlap). This means that the output has the
            same shape and number of partitions as the original.

        Parameters
        ----------
        window : str, offset
           Size of the moving window. This is the number of observations used
           for calculating the statistic. Data must have a ``DatetimeIndex``
        min_periods : int, default None
            Minimum number of observations in window required to have a value
            (otherwise result is NA).
        center : boolean, default False
            Set the labels at the center of the window.
        win_type : string, default None
            Provide a window type. The recognized window types are identical
            to pandas.
        axis : int, default 0

        Returns
        -------
        a Rolling object on which to call a method to compute a statistic

        Examples
        --------
        >>> import dask
        >>> ddf = dask.datasets.timeseries(freq="1H")
        >>> result = ddf.groupby("name").x.rolling('1D').max()
        r   )RollingGroupbyzrOnly time indexes are supported for rolling groupbys in dask dataframe. ``window`` must be a ``freq`` (e.g. '1H').Nzmin_periods must be an integerzmin_periods must be >= 0)windowmin_periodscenterwin_typer   )Zdask.dataframe.rollingr  r(   r   rS   )rs   r  r  r  r  r   r  r/   r/   r0   rolling	  s$    &

z_GroupBy.rollingc              	   C   sl   t |s|dk	rtd| jjt| j||||d}| jt| j|||||d}trh| jrh|	t
j| jS |S )a  Fill NA/NaN values using the specified method.

        Parameters
        ----------
        value : scalar, default None
            Value to use to fill holes (e.g. 0).
        method : {'bfill', 'ffill', None}, default None
            Method to use for filling holes in reindexed Series. ffill: propagate last
            valid observation forward to next valid. bfill: use next valid observation
            to fill gap.
        axis : {0 or 'index', 1 or 'columns'}
            Axis along which to fill missing values.
        limit : int, default None
            If method is specified, this is the maximum number of consecutive NaN values
            to forward/backward fill. In other words, if there is a gap with more than
            this number of consecutive NaNs, it will only be partially filled. If method
            is not specified, this is the maximum number of entries along the entire
            axis where NaNs will be filled. Must be greater than 0 if not None.

        Returns
        -------
        Series or DataFrame
            Object with missing values filled

        See also
        --------
        pandas.core.groupby.DataFrameGroupBy.fillna
        NzJgroupby-fillna with value=dict/Series/DataFrame is currently not supported)r.   rD  rE  rF  rI  )r.   rD  rE  rF  rI  rh  )r   rY  rW  r`  r]   rJ  r.   r   rY   r   r    	droplevel)rs   rD  rE  rF  r   rh  r   r/   r/   r0   rG  	  s0    	

z_GroupBy.fillnac                 C   s   | j d|dS )NffillrE  rF  rG  rs   rF  r/   r/   r0   r  
  s    z_GroupBy.ffillc                 C   s   | j d|dS )Nbfillr  r  r  r/   r/   r0   r  
  s    z_GroupBy.bfill)NNNr'   NNN)r   )r   )Nr'   NN)Nr'   NN)Nr'   N)Nr'   N)Nr'   NNT)Nr'   NNT)Nr'   N)Nr'   N)Nr'   N)Nr'   N)r'   Nr'   )r'   Nr'   )r'   Nr'   )r'   Nr'   F)Nr'   N)Nr'   N)NNr'   N)NFNr   )NNNN)N)N)6rE   ru   rv   rw   r
   rt   propertyr!   rP   setterr]  r_  r`  rp  rz  r{  r  r"   rG   corerC   ZGroupByr  r  r   r  r   r  r   r   r   r  r  r   r  r   r	  r   r   r  r  r
  r  ri   rO  r~   r   r]   re   rg   r  rG  r  r  r/   r/   r/   r0   rP    s   
L

	
       
\n.				                  
"
.		
        `
U
UY
=
9rP  c                       sX   e Zd ZdZdd Zdd Zdd Zedd	d fdd	Zedd	dddZ	  Z
S )r  zdataframe-groupby-c                 C   sn   t |tr,t| jf| j|| jd| j}n t| jf| j|| jd| j}t |tr^t|}|j	| |_	|S )N)r.   rZ  rz   )
r(   r*   r  r0  r.   rz   rZ   r  r)   rV  )rs   r`   rc   r/   r/   r0   __getitem__
  s.    
    
zDataFrameGroupBy.__getitem__c                 C   s2   t ttt| t| j tttj| j	j
 S rK   )r  r@   dirrD   r*   __dict__filterr    isidentifierr0  r>   r\  r/   r/   r0   __dir__+
  s    
zDataFrameGroupBy.__dir__c              
   C   s<   z
| | W S  t k
r6 } zt||W 5 d }~X Y nX d S rK   )rj   AttributeError)rs   r`   er/   r/   r0   __getattr__4
  s    
zDataFrameGroupBy.__getattr__z*pd.core.groupby.DataFrameGroupBy.aggregaterN  Nr'   c                    s,   |dkr|   S t jf ||||d|S )Nr	  r  rg  r4   r   )r	  superr~   rs   r  rg  r4   r   rV   r   r/   r0   r~   :
  s    zDataFrameGroupBy.aggregatez$pd.core.groupby.DataFrameGroupBy.aggc                 K   s   | j f ||||d|S Nr  r~   r  r/   r/   r0   rq   I
  s    zDataFrameGroupBy.agg)NNr'   N)NNr'   N)rE   ru   rv   rn  r  r  r  rO  r~   rq   __classcell__r/   r/   r  r0   r  
  s   	       r  c                       s   e Zd ZdZd fdd	Zeejjj	dddZ
edd	d fd
d	Zedd	dddZeejjj	dddZeejjj	dddZeejjj	dddZeejjj	dddZ  ZS ) r  zseries-groupby-Nc                    s   |d k	rd|ini }t |trxt |tr*nNt |trht|dkrHtddd |D }|jj|f| n|jj|f| t j|f||d|| d S )Nr[   r   zNo group keys passed!c                 S   s   g | ]}t |ts|qS r/   r  rT  r/   r/   r0   r<   c
  s     
 z*SeriesGroupBy.__init__.<locals>.<listcomp>)r.   rZ  )	r(   r   r*   r+   rS   rV  rC   r  rt   )rs   r;   r.   rZ  r[   rV   Znon_series_itemsr  r/   r0   rt   W
  s    


zSeriesGroupBy.__init__r'   c                 C   s   | j jj}t| j}t| jtr&t}nt}| j	dkrH|dkrHt
tt tt| jtsb| j| jgn| jg| j |ttd||d||dd|i||t| j	dS )a9  
        Examples
        --------
        >>> import pandas as pd
        >>> import dask.dataframe as dd
        >>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
        >>> df = pd.DataFrame(data=d)
        >>> ddf = dd.from_pandas(df, 2)
        >>> ddf.groupby(['col1']).col2.nunique().compute()
        Nr'   zseries-groupby-nunique)r|   r=   r|   )rp   r~   r  rf  rd  re  r  rg  r4   ri  rz   )rV  r0  r=   r1   r.   r(   r   r   r   rz   rj  rk  rl  rm  r   r*   r   r   r   )rs   rg  r4   r=   r|   rp   r/   r/   r0   r   m
  s.    


zSeriesGroupBy.nuniquez'pd.core.groupby.SeriesGroupBy.aggregater  c                    sx   t  jf ||||d|}| jrFz|| j }W n tk
rD   Y nX |d k	rtt|ttfstt|trt||jd  }|S )Nr  r   )	r  r~   rX  rj   r(   r*   r   r   r>   )rs   r  rg  r4   r   rV   r   r  r/   r0   r~   
  s*    zSeriesGroupBy.aggregatez!pd.core.groupby.SeriesGroupBy.aggc                 K   s   | j f ||||d|S r  r  r  r/   r/   r0   rq   
  s    zSeriesGroupBy.aggc                 C   s   | j tdt|||dS )Nvalue_countsr  )rp  _value_counts_value_counts_aggregater  r/   r/   r0   r  
  s    zSeriesGroupBy.value_countsc              	   C   s(   | j jj}| jtjdtd|i|||dS )Nuniquer=   )ra   rf  r{   re  rg  r4   r   )rV  r0  r=   rp  r    r  _unique_aggregate)rs   rg  r4   r   r=   r/   r/   r0   r  
  s    
zSeriesGroupBy.unique   c                 C   sH   t | jtrt| jnd}| jtdtt| j	d|i||d|||d	S )Nr'   tailr   r   index_levels	ra   rf  r{   rh  rd  re  rg  r4   r   )
r(   r.   r*   r+   rp  _tail_chunk_tail_aggregater    r  r`  rs   r   rg  r4   r   r  r/   r/   r0   r  
  s    
zSeriesGroupBy.tailc                 C   sH   t | jtrt| jnd}| jtdtt| j	d|i||d|||d	S )Nr'   r+  r   r  r  )
r(   r.   r*   r+   rp  _head_chunk_head_aggregater    r+  r`  r  r/   r/   r0   r+  
  s    
zSeriesGroupBy.head)NNN)Nr'   )NNr'   N)NNr'   N)Nr'   N)Nr'   N)r  Nr'   N)r  Nr'   N)rE   ru   rv   rn  rt   r"   rG   r  rC   r  r   rO  r~   rq   r  r  r  r+  r  r/   r/   r  r0   r  T
  s(   (       	
r  c                 C   s.   t | jdd | D |d}| jjj|j_|S )Nc                 S   s   i | ]\}}||   qS r/   )Zexploder  r   r/   r/   r0   r   
  s      z%_unique_aggregate.<locals>.<dictcomp>)r=   )rD   r0  rP   r   )	series_gbr=   retr/   r/   r0   r  
  s     r  c                 K   s&   t | rtj| f|S tjtdS d S r   )r+   r    r  rG   r   r   )r   rV   r/   r/   r0   r  
  s    r  c                 C   s0   dd | D }t | jjj}ttj||dS )Nc                 S   s"   i | ]\}}||j d d qS )r'   r   r   r   r/   r/   r0   r     s      z+_value_counts_aggregate.<locals>.<dictcomp>r   )r*   r0  rP   r   rG   r   r   )r  Z	to_concatr   r/   r/   r0   r     s    r  c                    s:   t | rt|  nd| ff\}}tj fdd|D |dS )NTc                    s   g | ]}|j f  qS r/   )r  r8   rH  rV   r/   r0   r<     s     z_tail_chunk.<locals>.<listcomp>r   r+   r   rG   r   r  rV   r   groupsr/   r  r0   r    s    r  c                 K   s$   | d}| jf |tt|S Nr  )r   r  r  r*   r,   r  rV   r|   r/   r/   r0   r    s    
r  c                    s:   t | rt|  nd| ff\}}tj fdd|D |dS )Nr  c                    s   g | ]}|j f  qS r/   )r+  r  r  r/   r0   r<     s     z_head_chunk.<locals>.<listcomp>r  r  r  r/   r  r0   r    s    r  c                 K   s$   | d}| jf |tt|S r  )r   r+  r  r*   r,   r  r/   r/   r0   r    s    
r  c              
   K   s*   t   | jf |W  5 Q R  S Q R X d S rK   )r	   r   )r  rV   r/   r/   r0   r    s    r  r'   Tr2   c                 C   s*  |dkrt  }|dkrt  }t| ttfs0| g} dd | D }dd |D }t|dkr`td| }|dkrvd}n(|d	kr|}n|dk st|tstd
|pt| d}t	|f| |dd | D ||d|}t
|r|jj}|d}d}nd}d	}t|j| |}|dk	r.|p$i }||d< |dkrx|dkrxt|jjt|j j }t|dkrxtdt |r|dkrt|j}| }t|j| }t|dkrtd|jt|||
dj	|f|}n|j|j|	||
dj	|f|}|r|d |}||k r&|j|dS |S )a  Shuffle-based groupby aggregation

    This algorithm may be more efficient than ACA for large ``split_out``
    values (required for high-cardinality groupby indices), but it also
    requires the output of ``chunk`` to be a proper DataFrame object.

    Parameters
    ----------
    args :
        Positional arguments for the `chunk` function. All `dask.dataframe`
        objects should be partitioned and indexed equivalently.
    chunk : function [block-per-arg] -> block
        Function to operate on each block of data
    aggregate : function concatenated-block -> block
        Function to operate on the concatenated result of chunk
    token : str, optional
        The name to use for the output keys.
    chunk_kwargs : dict, optional
        Keywords for the chunk function only.
    aggregate_kwargs : dict, optional
        Keywords for the aggregate function only.
    split_every : int, optional
        Number of partitions to aggregate into a shuffle partition.
        Defaults to eight, meaning that the initial partitions are repartitioned
        into groups of eight before the shuffle. Shuffling scales with the number
        of partitions, so it may be helpful to increase this number as a performance
        optimization, but only when the aggregated partition can comfortably
        fit in worker memory.
    split_out : int, optional
        Number of output partitions.
    ignore_index : bool, default False
        Whether the index can be ignored during the shuffle.
    sort : bool
        If allowed, sort the keys of the output aggregation.
    shuffle : str, default "tasks"
        Shuffle option to be used by ``DataFrame.shuffle``.
    Nc                 S   s   g | ]}t |tr|qS r/   )r(   r   r8   r  r/   r/   r0   r<   Z  s     
 z&_shuffle_aggregate.<locals>.<listcomp>c                 S   s   h | ]
}|j qS r/   rS  r  r/   r/   r0   r  \  s     z%_shuffle_aggregate.<locals>.<setcomp>r'   z1All arguments must have same number of partitions   Fz#split_every must be an integer >= 1z-chunkc                 S   s    g | ]}t |tr|jn|qS r/   )r(   r   r`  r  r/   r/   r0   r<   p  s     r  r|  Trz   a  In the future, `sort` for groupby operations will default to `True` to match the behavior of pandas. However, `sort=True` does not work with `split_out>1` when grouping by multiple columns. To retain the current behavior for multiple output partitions, set `sort=False`.zCannot guarantee sorted keys for `split_out>1` when grouping on multiple columns. Try using split_out=1, or grouping with sort=False.)rS  r   )ignore_indexrS  r   r  )r   r(   r)   r*   r+   rS   r   r   r#   r   r   rV  r=   r   r   rS  r@   r>   r   rj  rk  rm  rW  r   r   rP   r   Zrepartition)rb   rp   r~   rf  rd  re  rg  r4   rz   r  r   dfsrS  Z
chunk_namechunkedZseries_namer  Zshuffle_npartitionsr   r   Z
index_colsr   r/   r/   r0   ro    s    3



	

ro  )NNNFN)NNFN)F)F)F)FF)F)F)F)N)N)N)
NNNNNNr'   TFr2   )|r   	itertoolsr   r)  rt  rj  	functoolsr   Znumbersr   Znumpyr   ZpandasrG   rx  r   Z	dask.baser   r   Zdask.dataframe._compatr   r   r	   Zdask.dataframe.corer
   r   r   r   r   r   r   r   r   r   Zdask.dataframe.dispatchr   Zdask.dataframe.methodsr   r   Zdask.dataframe.shuffler   Zdask.dataframe.utilsr   r   r   r   r   r   r   Zdask.highlevelgraphr   Z
dask.utilsr    r!   r"   r#   r$   Zpandas.core.applyr%   r&   rl  r1   r5   r6   rJ   rL   rX   rd   rf   rh   rn   ro   r}   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r  r  r  r  r  r  r-  r  r4  r  r*  r$  r   r#  r=  rA  rC  rJ  rO  rP  r  r  r  r  r  r  r  r  r  r  ro  r/   r/   r/   r0   <module>   s   0$	#

.
=         
       
8


#)
<
	
VJ,*2
	

/          9< 
          