U
    /eT                     @   st  d dl Z d dlZd dlmZ d dlZd dlmZ d dlm	Z
 d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZmZmZmZmZmZmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+m,Z,m-Z-m.Z.m/Z/ e0de1fi Z2dd Z3dd Z4e"eddddddZ5dd Z6dd Z7dd Z8dd  Z9G d!d" d"Z	G d#d$ d$e	Z:dS )%    N)Integral)is_datetime64_any_dtype)Rolling)normalize_arg)tokenize)BlockwiseDepDict)methods)	Scalar_Frame_get_divisions_map_partitions_get_meta_map_partitions_maybe_from_pandasapply_and_enforcenew_dd_object
no_defaultpartitionwise_graph)from_pandas)_maybe_align_partitions)insert_meta_param_descriptionis_dask_collectionis_dataframe_likeis_series_like)unpack_collections)HighLevelGraph)Mapplyderived_fromfuncnamehas_keywordCombinedOutputc                 C   s   d}| d k	r,t |tr,| jd |kr,t||d k	rTt |trT|jd |krTt|dd | ||fD }t|}t|| d k	rt| nd |d k	rt|nd fS )NzqPartition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.r   c                 S   s   g | ]}|d k	r|qS N ).0pr!   r!   :/tmp/pip-unpacked-wheel-dbjnr7gq/dask/dataframe/rolling.py
<listcomp>6   s      z#_combined_parts.<locals>.<listcomp>)
isinstancer   shapeNotImplementedErrorr   concatr   len)Z	prev_partZcurrent_partZ	next_partbeforeaftermsgpartscombinedr!   r!   r$   _combined_parts'   s     
r0   c                 O   s   dd |D }|d \}}}dd |D }| ||}	|d kr@d }t |tjrP|}d }
|jd dkrv|	jd |jd  }
|r|
r||
9 }|d kr|	j|d  S t |tjr|}|r|
r||
9 }|	j||  S )Nc                 S   s   g | ]}t |tr|qS r!   r&   r   r"   dfr!   r!   r$   r%   C   s     
 z!overlap_chunk.<locals>.<listcomp>r   c                 S   s"   g | ]}t |tr|d  n|qS r   r1   r"   argr!   r!   r$   r%   F   s     )r&   datetime	timedeltar'   iloc)funcr+   r,   argskwargsdfsr/   Zprev_part_lengthZnext_part_lengthoutZ	expansionr!   r!   r$   overlap_chunkB   s(    
r?   T)metaenforce_metadatatransform_divisionsalign_dataframesc             
      s  t |st|r"t|s"t|dn|}|f| }ttrDtt trXt  ttj	spt tj	rt
|jjjstdn,ttrdkrt tr dkstd|	dd}
|	dd}t| st|
dk	rt f||	}n"dt|  }
t|  f||	}|
 d	| }
|rvt|}zt|}W n4 tk
rt } zt| d
|W 5 d}~X Y nX dd |D }t||| |	|tdd |D r|
dft| tdd |D f|	fi}tj|
||d}t||
S g }g }t|||| ||	 fdd}|D ]l}t|t rH||}|!| |!| qt"|}t#|\}}|rx|!| |$| n
|!| qi }d}|	% D ]:\}}t"|}t#|\}}|$| |||< |rd}qt&| drdd t'dd D }|(dt)| | fdd} |rDt*t+|
|  f||t,d|}n.|rN|	n|}t*t,|
|  f||d|i}tj|
||d}t-||
S )a]	  Apply a function to each partition, sharing rows with adjacent partitions.

    Parameters
    ----------
    func : function
        The function applied to each partition. If this function accepts
        the special ``partition_info`` keyword argument, it will recieve
        information on the partition's relative location within the
        dataframe.
    df: dd.DataFrame, dd.Series
    args, kwargs :
        Positional and keyword arguments to pass to the function.
        Positional arguments are computed on a per-partition basis, while
        keyword arguments are shared across all partitions. The partition
        itself will be the first positional argument, with all other
        arguments passed *after*. Arguments can be ``Scalar``, ``Delayed``,
        or regular Python objects. DataFrame-like args (both dask and
        pandas) will be repartitioned to align (if necessary) before
        applying the function; see ``align_dataframes`` to control this
        behavior.
    enforce_metadata : bool, default True
        Whether to enforce at runtime that the structure of the DataFrame
        produced by ``func`` actually matches the structure of ``meta``.
        This will rename and reorder columns for each partition,
        and will raise an error if this doesn't work,
        but it won't raise if dtypes don't match.
    before : int, timedelta or string timedelta
        The rows to prepend to partition ``i`` from the end of
        partition ``i - 1``.
    after : int, timedelta or string timedelta
        The rows to append to partition ``i`` from the beginning
        of partition ``i + 1``.
    transform_divisions : bool, default True
        Whether to apply the function onto the divisions and apply those
        transformed divisions to the output.
    align_dataframes : bool, default True
        Whether to repartition DataFrame- or Series-like args
        (both dask and pandas) so their divisions align before applying
        the function. This requires all inputs to have known divisions.
        Single-partition inputs will be split into multiple partitions.

        If False, all inputs must have either the same number of partitions
        or a single partition. Single-partition inputs will be broadcast to
        every partition of multi-partition inputs.
    $META

    See Also
    --------
    dd.DataFrame.map_overlap
       zMMust have a `DatetimeIndex` when using string offset for `before` and `after`r   z*before and after must be positive integerstokenNparent_metazoverlap--zx. If you don't want the partitions to be aligned, and are calling `map_overlap` directly, pass `align_dataframes=False`.c                 S   s   g | ]}t |tr|qS r!   )r&   r
   r2   r!   r!   r$   r%      s     
 zmap_overlap.<locals>.<listcomp>c                 s   s   | ]}t |tV  qd S r    )r&   r	   r5   r!   r!   r$   	<genexpr>   s     zmap_overlap.<locals>.<genexpr>c                 S   s   g | ]}|j d fqS r4   )_namer5   r!   r!   r$   r%      s     dependenciesc                    s   i }t | \}}|| t|  \}}|| dt|  }tt||  |D ]*\}\}}	}
||f}t||	|
 f||< qTtj	||| gd}t
||S )Nzoverlap-concat-rJ   )_get_previous_partitionsupdate_get_nexts_partitionsr   	enumeratezipZ__dask_keys__r0   r   from_collectionsr   )r6   dskZprevs_parts_dskprevsZnexts_parts_dsknextsname_aiprevcurrentnextkeygraph)r,   r+   	divisionsr@   r!   r$   _handle_frame_argument   s    

z+map_overlap.<locals>._handle_frame_argumentTFpartition_infoc                 S   s   i | ]\}}|f||d qS ))numberdivisionr!   )r"   rV   r`   r!   r!   r$   
<dictcomp>  s    zmap_overlap.<locals>.<dictcomp>c                    s    ||d| iS )Nr^   r!   )r^   r;   r<   )	orig_funcr!   r$   r:     s    zmap_overlap.<locals>.func)rK   Z_func_metarK   ).r   r   r   r   r&   strpdZto_timedeltar7   r8   r   index_meta_nonemptyZinferred_type	TypeErrorr   
ValueErrorpopcallableAssertionErrorr   r   r   r   r   allr   tupler   rQ   r	   r   r
   appendr   r   extenditemsr   rO   insertr   r   r   r?   r   )r:   r3   r+   r,   r@   rA   rB   rC   r;   r<   namerF   rE   er=   Zlayerr[   Zargs2rK   r]   r6   Zarg2collectionsZkwargs3simplekvr^   rR   Zkwargs4r!   )r,   r+   r\   r@   rc   r$   map_overlap]   s    B




     




rz   c           
      C   s  i }| j }d}dt| | }|rrt|trrg }td| jD ]*}||f}tj||f|f||< || q:|d nt|t	j
rt| j jdd }	||	k rt|g }td| jD ]2}||f}t||d f||f|f||< || q|d ndg| j }||fS )zE
    Helper to get the nexts partitions required for the overlap
    ziPartition size is less than specified window. Try using ``df.repartition`` to increase the partition sizezoverlap-append-rD   Nrb   r   )rI   r   r&   r   rangenpartitionsr   headrp   r7   r8   rf   Seriesr\   diffr9   anyrj   _head_timedelta)
r3   r,   rR   df_nameZtimedelta_partition_messageZname_brT   rV   rZ   deltasr!   r!   r$   rN   :  s0    rN   c                    s  i }| j  dt| | }|rjt|trjdg}t| jd D ]*}||f}tj |f|f||< || q:nLt|t	j
rt| j}| jdd }||k r\|d }	dg}t| jd D ]}||d  }
t|
| |	}|| | }}||kr|dkr|||  }|d }q||f}t fddt||d D  |d f|f||< || qnLdg}t| jd D ]6}||f}t |fg |d f|f||< || qpndg| j }||fS )zH
    Helper to get the previous partitions required for the overlap
    zoverlap-prepend-NrD   rb   r   c                    s   g | ]} |fqS r!   r!   )r"   rx   r   r!   r$   r%     s     z,_get_previous_partitions.<locals>.<listcomp>)rI   r   r&   r   r{   r|   r   tailrp   r7   r8   rf   r~   r\   r   r9   r   max_tail_timedelta)r3   r+   rR   rU   rS   rV   rZ   Zdivsr   Zpt_zZpt_iZlbfirstjr!   r   r$   rL   _  sP    


rL   c                 C   s   ||j | j  | k  S )zReturn rows of ``next_`` whose index is before the last
    observation in ``current`` + ``after``.

    Parameters
    ----------
    current : DataFrame
    next_ : DataFrame
    after : timedelta

    Returns
    -------
    overlapped : DataFrame
    )rg   r   )rX   Znext_r,   r!   r!   r$   r     s    r   c                    s   t  fdd| D }|S )a4  Return the concatenated rows of each dataframe in ``prevs`` whose
    index is after the first observation in ``current`` - ``before``.

    Parameters
    ----------
    current : DataFrame
    prevs : list of DataFrame objects
    before : timedelta

    Returns
    -------
    overlapped : DataFrame
    c                    s$   g | ]}||j j    k qS r!   )rg   min)r"   rW   r+   rX   r!   r$   r%     s     z#_tail_timedelta.<locals>.<listcomp>)r   r)   )rS   rX   r+   selectedr!   r   r$   r     s    r   c                   @   s6  e Zd ZdZd0ddZdd Zed	d
 Zedd Z	dd Z
eedd Zeedd Zeedd Zeedd Zeedd Zeedd Zeedd Zeed1ddZeed2d d!Zeed"d# Zeed$d% Zeed&d' Zeed3d)d*Zeed4d,d-ZeZd.d/ ZdS )5r   z%Provides rolling window calculations.NFr   c                 C   sP   || _ || _|| _|| _|| _|| _|jjf |   t	| jt
rFd nd| _d S )Nfreq)objwindowmin_periodscenteraxiswin_typerd   rolling_rolling_kwargsr&   int	_win_type)selfr   r   r   r   r   r   r!   r!   r$   __init__  s    zRolling.__init__c                 C   s   | j | j| j| j| jdS Nr   r   r   r   r   r   r   r!   r!   r$   r     s    zRolling._rolling_kwargsc                 C   s,   | j dkp*t| jtr | jdkp*| jjdkS )zm
        Indicator for whether the object has a single partition (True)
        or multiple (False).
        )rD   columnsrD   )r   r&   r   r   r   r|   r   r!   r!   r$   _has_single_partition  s
    

zRolling._has_single_partitionc                 O   s   | j f |}t||||S r    )r   getattr)r3   rolling_kwargsrt   r;   r<   r   r!   r!   r$   pandas_rolling_method  s    zRolling.pandas_rolling_methodc                 O   s   |   }| j| jj||f||}| jrL| jj| j||f|||d|S | jrl| jd }| j| d }n*| jdkrt	
| j}d}n| jd }d}t| j| j||||f|||d|S )N)rE   r@      rD   r   r   )r   r   r   rh   r   Zmap_partitionsr   r   r   rf   Z	Timedeltarz   )r   method_namer;   r<   r   r@   r+   r,   r!   r!   r$   _call_method  s\      




zRolling._call_methodc                 C   s
   |  dS )Ncountr   r   r!   r!   r$   r     s    zRolling.countc                 C   s
   |  dS )Ncovr   r   r!   r!   r$   r   !  s    zRolling.covc                 C   s
   |  dS )Nsumr   r   r!   r!   r$   r   %  s    zRolling.sumc                 C   s
   |  dS )Nmeanr   r   r!   r!   r$   r   )  s    zRolling.meanc                 C   s
   |  dS )Nmedianr   r   r!   r!   r$   r   -  s    zRolling.medianc                 C   s
   |  dS )Nr   r   r   r!   r!   r$   r   1  s    zRolling.minc                 C   s
   |  dS )Nr   r   r   r!   r!   r$   r   5  s    zRolling.maxrD   c                 C   s   | j dddS )NstdrD   ddofr   r   r   r!   r!   r$   r   9  s    zRolling.stdc                 C   s   | j dddS )NvarrD   r   r   r   r!   r!   r$   r   =  s    zRolling.varc                 C   s
   |  dS )Nskewr   r   r!   r!   r$   r   A  s    zRolling.skewc                 C   s
   |  dS )Nkurtr   r   r!   r!   r$   r   E  s    zRolling.kurtc                 C   s   |  d|S )Nquantiler   )r   r   r!   r!   r$   r   I  s    zRolling.quantilecythonc           	      C   sp   i }|p
i }|pd}| j jd}t|jdr:t||d}|d krTt|jjd }| j	d|f|||d|S )Nr!   r   engine)r   engine_kwargsrawr   )r   r;   r<   )
r   rd   r   r   r   dictinspect	signature
parametersr   )	r   r:   r   r   r   r;   r<   Zcompat_kwargsr@   r!   r!   r$   r   M  s$    
   zRolling.applyr!   c                 K   s&   |d kri }| j d|f||d|S )Nagg)r;   r<   r   )r   r:   r;   r<   kwdsr!   r!   r$   	aggregatef  s    zRolling.aggregatec              	   C   sJ   dd }|   }| j|d< | j|d< dddd t| |d	D S )
Nc                 S   s    | \}}dddddd}|| S )Nr   rD   r         r   r!   )itemrx   ry   _orderr!   r!   r$   ordero  s    zRolling.__repr__.<locals>.orderr   r   zRolling [{}],c                 s   s(   | ] \}}|d k	r| d| V  qd S )N=r!   )r"   rx   ry   r!   r!   r$   rH   ~  s   z#Rolling.__repr__.<locals>.<genexpr>)rZ   )r   r   r   formatjoinsortedrr   )r   r   r   r!   r!   r$   __repr__n  s    


zRolling.__repr__)NNFNr   )rD   )rD   )Nr   NNN)r!   N)__name__
__module____qualname____doc__r   r   propertyr   staticmethodr   r   r   
pd_Rollingr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r!   r!   r!   r$   r     s`            
	

(









     r   c                       s>   e Zd Zd fdd	ZedddddZ fd	d
Z  ZS )RollingGroupbyNFr   c           	         s   |j | _ |j| _|j}| jd k	rnt| jtr6| jg}n
t| j}t|jtrZ||j n|	|j || }t
 j||||||d d S r   )_groupby_kwargsZ_slice_groupby_slicer   r&   re   listZbyrp   rq   superr   )	r   groupbyr   r   r   r   r   r   Zsliced_plus	__class__r!   r$   r     s&    	


zRollingGroupby.__init__groupby_kwargsgroupby_slicec          	      O   s<   | j f |}|r|| }|jf |}t||||jddS )Nrb   )level)r   r   r   Z
sort_index)	r3   r   rt   r   r   r;   r<   r   r   r!   r!   r$   r     s
    
z$RollingGroupby.pandas_rolling_methodc                    s"   t  j|f|| j| jd|S )Nr   )r   r   r   r   )r   r   r;   r<   r   r!   r$   r     s    zRollingGroupby._call_method)NNFNr   )r   r   r   r   r   r   r   __classcell__r!   r!   r   r$   r     s        !r   );r7   r   Znumbersr   Zpandasrf   Zpandas.api.typesr   Zpandas.core.windowr   r   Zdask.array.corer   Z	dask.baser   Zdask.blockwiser   Zdask.dataframer   Zdask.dataframe.corer	   r
   r   r   r   r   r   r   r   Zdask.dataframe.ior   Zdask.dataframe.multir   Zdask.dataframe.utilsr   r   r   r   Zdask.delayedr   Zdask.highlevelgraphr   Z
dask.utilsr   r   r   r   r   typero   r   r0   r?   rz   rN   rL   r   r   r   r!   r!   r!   r$   <module>   sB   , ]%B A