U
    /ey                     @   s   d dl Z d dlZd dlZd dlmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ G dd dZd	d
 Zdd Zd"ddZd#ddZdd Zdd Zdd Zdd Zd$ddZdd Zd d! ZdS )%    N)expand_paths_if_neededget_fs_token_pathsstringify_path)AbstractFileSystem)config)_is_local_fs)natural_sort_keyc                   @   sx   e Zd ZdZedd ZedddZeddd	Zedd
dZedd Z	edddZ
edd Zedd ZdS )Enginez8The API necessary to provide a new Parquet reader/writerc                    s   |dkr| dd nd|kr&td|  dkrRt|d|d\ }} |||fS t tsjtd  |r|td	| t|tttfr|std
dd |D }n
t|g}t	|dd d}  fdd|D ||fS dS )a  Extract filesystem object from urlpath or user arguments

        This classmethod should only be overridden for engines that need
        to handle filesystem implementations other than ``fsspec``
        (e.g. ``pyarrow.fs.S3FileSystem``).

        Parameters
        ----------
        urlpath: str or List[str]
            Source directory for data, or path(s) to individual parquet files.
        filesystem: "fsspec" or fsspec.AbstractFileSystem
            Filesystem backend to use. Default is "fsspec"
        dataset_options: dict
            Engine-specific dataset options.
        open_file_options: dict
            Options to be used for file-opening at read time.
        storage_options: dict
            Options to be passed on to the file-system backend.

        Returns
        -------
        fs: Any
            A global filesystem object to be used for metadata
            processing and file-opening by the engine.
        paths: List[str]
            List of data-source paths.
        dataset_options: dict
            Engine-specific dataset options.
        open_file_options: dict
            Options to be used for file-opening at read time.
        NfsfsspeczPCannot specify a filesystem argument if the 'fs' dataset option is also defined.)Nr   rb)modestorage_optionsz4Expected fsspec.AbstractFileSystem or 'fsspec'. Got zUCannot specify storage_options when an explicit filesystem object is specified. Got: zempty urlpath sequencec                 S   s   g | ]}t |qS  )r   .0ur   r   C/tmp/pip-unpacked-wheel-dbjnr7gq/dask/dataframe/io/parquet/utils.py
<listcomp>]   s     z-Engine.extract_filesystem.<locals>.<listcomp>   c                    s   g | ]}  |qS r   )Z_strip_protocolr   r
   r   r   r   d   s     )
pop
ValueErrorr   
isinstancer   listtuplesetr   r   )clsZurlpath
filesystemdataset_optionsopen_file_optionsr   _pathsr   r   r   extract_filesystem   sB    *  

zEngine.extract_filesystemNFc           	      K   s
   t  dS )a  Gather metadata about a Parquet Dataset to prepare for a read

        This function is called once in the user's Python session to gather
        important metadata about the parquet dataset.

        Parameters
        ----------
        fs: FileSystem
        paths: List[str]
            A list of paths to files (or their equivalents)
        categories: list, dict or None
            Column(s) containing categorical data.
        index: str, List[str], or False
            The column name(s) to be used as the index.
            If set to ``None``, pandas metadata (if available) can be used
            to reset the value in this function
        use_nullable_dtypes: boolean
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        gather_statistics: bool
            Whether or not to gather statistics to calculate divisions
            for the output DataFrame collection.
        filters: list
            List of filters to apply, like ``[('x', '>', 0), ...]``.
        **kwargs: dict (of dicts)
            User-specified arguments to pass on to backend.
            Top level key can be used by engine to select appropriate dict.

        Returns
        -------
        meta: pandas.DataFrame
            An empty DataFrame object to use for metadata.
            Should have appropriate column names and dtypes but need not have
            any actual data
        statistics: Optional[List[Dict]]
            Either None, if no statistics were found, or a list of dictionaries
            of statistics data, one dict for every partition (see the next
            return value).  The statistics should look like the following:

            [
                {'num-rows': 1000, 'columns': [
                    {'name': 'id', 'min': 0, 'max': 100},
                    {'name': 'x', 'min': 0.0, 'max': 1.0},
                    ]},
                ...
            ]
        parts: List[object]
            A list of objects to be passed to ``Engine.read_partition``.
            Each object should represent a piece of data (usually a row-group).
            The type of each object can be anything, as long as the
            engine's read_partition function knows how to interpret it.
        NNotImplementedError)	r   r
   r"   
categoriesindexuse_nullable_dtypesgather_statisticsfilterskwargsr   r   r   read_metadatai   s    @zEngine.read_metadatac                 K   s
   t  dS )a  Read a single piece of a Parquet dataset into a Pandas DataFrame

        This function is called many times in individual tasks

        Parameters
        ----------
        fs: FileSystem
        piece: object
            This is some token that is returned by Engine.read_metadata.
            Typically it represents a row group in a Parquet dataset
        columns: List[str]
            List of column names to pull out of that row group
        index: str, List[str], or False
            The index name(s).
        use_nullable_dtypes: boolean
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        **kwargs:
            Includes `"kwargs"` values stored within the `parts` output
            of `engine.read_metadata`. May also include arguments to be
            passed to the backend (if stored under a top-level `"read"` key).

        Returns
        -------
        A Pandas DataFrame
        Nr$   )r   r
   Zpiececolumnsr'   r(   r+   r   r   r   read_partition   s    zEngine.read_partitionc           	      K   s   t dS )a#  Perform engine-specific initialization steps for this dataset

        Parameters
        ----------
        df: dask.dataframe.DataFrame
        fs: FileSystem
        path: str
            Destination directory for data.  Prepend with protocol like ``s3://``
            or ``hdfs://`` for remote data.
        append: bool
            If True, may use existing metadata (if any) and perform checks
            against the new data being stored.
        partition_on: List(str)
            Column(s) to use for dataset partitioning in parquet.
        ignore_divisions: bool
            Whether or not to ignore old divisions when appending.  Otherwise,
            overlapping divisions will lead to an error being raised.
        division_info: dict
            Dictionary containing the divisions and corresponding column name.
        **kwargs: dict
            Other keyword arguments (including `index_cols`)

        Returns
        -------
        tuple:
            engine-specific instance
            list of filenames, one per partition
        Nr$   )	r   dfr
   pathappendpartition_onZignore_divisionsZdivision_infor+   r   r   r   initialize_write   s    (zEngine.initialize_writec                 K   s   t dS )a
  
        Output a partition of a dask.DataFrame. This will correspond to
        one output file, unless partition_on is set, in which case, it will
        correspond to up to one file in each sub-directory.

        Parameters
        ----------
        df: dask.dataframe.DataFrame
        path: str
            Destination directory for data.  Prepend with protocol like ``s3://``
            or ``hdfs://`` for remote data.
        fs: FileSystem
        filename: str
        partition_on: List(str)
            Column(s) to use for dataset partitioning in parquet.
        return_metadata : bool
            Whether to return list of instances from this write, one for each
            output file. These will be passed to write_metadata if an output
            metadata file is requested.
        **kwargs: dict
            Other keyword arguments (including `fmd` and `index_cols`)

        Returns
        -------
        List of metadata-containing instances (if `return_metadata` is `True`)
        or empty list
        Nr$   )r   r/   r0   r
   filenamer2   Zreturn_metadatar+   r   r   r   write_partition   s    zEngine.write_partitionc                 K   s
   t  dS )a;  
        Write the shared metadata file for a parquet dataset.

        Parameters
        ----------
        parts: List
            Contains metadata objects to write, of the type undrestood by the
            specific implementation
        meta: non-chunk metadata
            Details that do not depend on the specifics of each chunk write,
            typically the schema and pandas metadata, in a format the writer
            can use.
        fs: FileSystem
        path: str
            Output file to write to, usually ``"_metadata"`` in the root of
            the output dataset
        append: boolean
            Whether or not to consolidate new metadata with existing (True)
            or start from scratch (False)
        **kwargs: dict
            Other keyword arguments (including `compression`)
        Nr$   )r   partsmetar
   r0   r1   r+   r   r   r   write_metadata  s    zEngine.write_metadatac                 C   s
   t  dS )a  
        Collect parquet metadata from a file and set the file_path.

        Parameters
        ----------
        path: str
            Parquet-file path to extract metadata from.
        fs: FileSystem
        file_path: str
            Relative path to set as `file_path` in the metadata.

        Returns
        -------
        A metadata object.  The specific type should be recognized
        by the aggregate_metadata method.
        Nr$   )r   r0   r
   	file_pathr   r   r   collect_file_metadata0  s    zEngine.collect_file_metadatac                 C   s
   t  dS )a  
        Aggregate a list of metadata objects and optionally
        write out the final result as a _metadata file.

        Parameters
        ----------
        meta_list: list
            List of metadata objects to be aggregated into a single
            metadata object, and optionally written to disk. The
            specific element type can be engine specific.
        fs: FileSystem
        out_path: str or None
            Directory to write the final _metadata file. If None
            is specified, the aggregated metadata will be returned,
            and nothing will be written to disk.

        Returns
        -------
        If out_path is None, an aggregate metadata object is returned.
        Otherwise, None is returned.
        Nr$   )r   Z	meta_listr
   Zout_pathr   r   r   aggregate_metadataD  s    zEngine.aggregate_metadata)NNFNN)F)FNFN)F)__name__
__module____qualname____doc__classmethodr#   r,   r.   r3   r5   r8   r:   r;   r   r   r   r   r	      s6   
X     A     )
 
r	   c                    s   dd | d D }t d}dd | d D }g }|D ](\}}|rP||rPd}|||f q6dd |D }| d	d
dig}dd |D }|s|rt|d trg }t|}t|  fdd|D }	ndd |D }	t|}
||	|
|fS )aP  Get the set of names from the pandas metadata section

    Parameters
    ----------
    pandas_metadata : dict
        Should conform to the pandas parquet metadata spec

    Returns
    -------
    index_names : list
        List of strings indicating the actual index names
    column_names : list
        List of strings indicating the actual column names
    storage_name_mapping : dict
        Pairs of storage names (e.g. the field names for
        PyArrow) and actual names. The storage and field names will
        differ for index names for certain writers (pyarrow > 0.8).
    column_indexes_names : list
        The names for ``df.columns.name`` or ``df.columns.names`` for
        a MultiIndex in the columns

    Notes
    -----
    This should support metadata written by at least

    * fastparquet>=0.1.3
    * pyarrow>=0.7.0
    c                 S   s"   g | ]}t |tr|d  n|qS name)r   dict)r   nr   r   r   r   {  s   z*_parse_pandas_metadata.<locals>.<listcomp>Zindex_columnsz__index_level_\d+__c                 S   s$   g | ]}| d |d |d fqS )
field_namerB   )getr   xr   r   r   r     s    r-   Nc                 S   s   g | ]\}}||kr|qS r   r   r   storage_namerB   r   r   r   r     s      Zcolumn_indexesrB   c                 S   s   g | ]}|d  qS rA   r   rG   r   r   r   r     s     r   c                    s   g | ]\}}| kr|qS r   r   rI   Zindex_storage_names2r   r   r     s     c                 S   s   g | ]\}}||kr|qS r   r   rI   r   r   r   r     s      )	recompilematchr1   rF   r   rC   r   r   )Zpandas_metadataZindex_storage_namesZindex_name_xprpairsZpairs2rJ   Z	real_nameindex_namesZcolumn_index_namescolumn_namesZstorage_name_mappingr   rK   r   _parse_pandas_metadata^  s4    

rR   c                    s   | dk	}|dk	}| dkr"t |} nt| tr4| g} nt | } |dkrJ|}n0|dkr`g }|| }nt|trr|g}nt |}|r|s|fdd|D  nP|r|s|   fdd|D n0|r|r|  |t rtdn| | fS )aD  Normalize user and file-provided column and index names

    Parameters
    ----------
    user_columns : None, str or list of str
    data_columns : list of str
    user_index : None, str, or list of str
    data_index : list of str

    Returns
    -------
    column_names : list of str
    index_names : list of str
    NFc                    s   g | ]}| kr|qS r   r   rG   )rP   r   r   r     s      z,_normalize_index_columns.<locals>.<listcomp>c                    s   g | ]}| kr|qS r   r   rG   )rQ   r   r   r     s      z3Specified index and column names must not intersect)r   r   strr   intersectionr   )Zuser_columnsZdata_columnsZ
user_indexZ
data_indexZspecified_columnsZspecified_indexr   )rQ   rP   r   _normalize_index_columns  s:    




rU   Fc                 C   s(   t | td} t| ||d\}}| ||fS )N)key)root)sortedr   _analyze_paths)	file_listr
   rW   basefnsr   r   r   _sort_and_analyze_paths  s    r]   c           
         s   fdd  fdd| D }|dkr|d dd |D ]H}t |d	 }tt|D ]\}\}}||krX|} qvqXd| q:t n6 |d
t tfdd|D stdg }	|D ]}|	d
|d  qd
|	fS )zConsolidate list of file-paths into parquet relative paths

    Note: This function was mostly copied from dask/fastparquet to
    use in both `FastParquetEngine` and `ArrowEngine`.c            	         sb   fdd}d}| r| d r| d d dkrNd}t | } | d dd  | d< nL jdkr| d dd  dr| d dd	 }t | } | d d	d  | d< g }t| D ]\}}||||d qg }|D ]h}|d
krq|dkr,|r|d dkr|| n|  n|r tdn
|| q|| q|sP|rJ|}nd
}n|d	| }|S )Nc                    sV   |  jd}|dkrdS |d dkr2|d d }| dkrR|d dkrR|dd  }|S )N/ .r   r   )replacesep)ipr   r   r   _scrub  s    z2_analyze_paths.<locals>._join_path.<locals>._scrubr_   r   r^   r   \z:/   r`   z..ra   zcan not get parent of root)
r   rc   
startswith	enumerateextendsplitr1   r   	Exceptionjoin)	r0   rf   Z
abs_prefixZ	_scrubbedrd   re   ZsimplersZjoinedr   r   r   
_join_path  sB     


z"_analyze_paths.<locals>._join_pathc                    s   g | ]} | d qS )r^   )rl   )r   fn)rp   r   r   r   2  s     z"_analyze_paths.<locals>.<listcomp>Fr   Nra   r   r^   c                 3   s   | ]}|d   kV  qd S )Nr   )r   re   )basepathlr   r   	<genexpr>@  s    z!_analyze_paths.<locals>.<genexpr>z(All paths must begin with the given root)lenrj   ziprl   allAssertionErrorr1   rn   )
rZ   r
   rW   Zpath_parts_list
path_partsjkZ	base_partZ	path_partZout_listr   )rp   rr   r
   rs   r   rY     s6    6

rY   c           
   	   C   sD  t |dk ri S t |dkr4t |dks,t|d S t |dkrtt|}| |d  |d  |d  g d}n | |d d d|d d g d}d}t |dkrt|}t|D ]\}}|d }	|dk r|d ||d |	 |d |	d  d	 q|d ||jdd|	f 	 |jdd|	d f 
 d	 q|S dS )
z~Utility to aggregate the statistics for N row-groups
    into a single dictionary.

    Used by `Engine._construct_parts`
    r   r   num-rowstotal_byte_size)Zfile_path_0r|   znum-row-groupsr}   r-   N   r-   )rB   minmax)ru   rx   pdZ	DataFramesumcountrj   r1   Zilocr   r   )
r9   file_row_group_statsfile_row_group_column_statsstat_col_indicesZdf_rgsro   Zdf_colsindrB   rd   r   r   r   _aggregate_statsO  sL    



	




r   c	                 C   sn  g }	g }
|rt |}d}| D ]\}}t|}|rNdgtt||| }ntt|||}|D ]}|| }|dkr|r|dkr|}d}|| }|dkr|}||| }|||f|}|d krqb|	| | rbt||| || || || |}|
| qbq"n`| D ]V\}}|||f|}|d kr2q|	| | rt||| || |}|
| q|	|
fS )Nr   T)intitemsru   r   ranger1   r   )r)   split_row_groupsaggregation_depthZfile_row_groupsr   r   r   Zmake_part_funcZmake_part_kwargsr6   statsZresidualr4   
row_groupsZrow_group_countZ_rgsrd   Zi_endZ	_residualZrg_listpartstatr   r   r   _row_groups_to_parts  sp    


r   c                 C   s<   | }t | tr8| |kr*t|||  }nt|  d|S )Nz) is not a recognized directory partition.)r   rS   ru   r'   r   )Zaggregate_filesZpartition_namesr   r   r   r   _get_aggregation_depth  s    

r   c                 C   s,   | d kr(dt |rdnd }t|dS | S )Nz%dataframe.parquet.metadata-task-size-localremoter   )r   r   rF   )Zmetadata_task_sizer
   Z
config_strr   r   r   _set_metadata_task_size  s    r   	readaheadTc              	   C   s   | pi   } | di   }|s$i }d| kr|dd dkrj| dd| d< |||||d|d n | d|| d< | d	d
| d	< || fS )Nprecache_optionsZopen_file_funcmethodZparquetZ
cache_typer6   engine)metadatar-   r   r   r   r   )copyr   rF   update)r    r   r-   r   Zdefault_engineZdefault_cacheZallow_precacher   r   r   r   _process_open_file_options  s.     
	 r   c                  K   sh   |   }d|krtdt |di   |di   }|di   }|di   }||||fS )Nfilez^Passing user options with the 'file' argument is now deprecated. Please use 'dataset' instead.Zdatasetreadr    )r   warningswarnFutureWarningr   )r+   Zuser_kwargsr   Zread_optionsr    r   r   r   _split_user_options5  s     r   c                 C   s4   |st |dkr|s||r$d} n|s,d} t| S )Nr   TF)r   rT   bool)r)   	chunksizer   r   Zfilter_columnsZstat_columnsr   r   r   _set_gather_statisticsP  s    
	r   )F)F)NNNNr   T)rL   r   Zpandasr   Zfsspec.corer   r   r   Zfsspec.specr   Zdaskr   Zdask.dataframe.io.utilsr   Z
dask.utilsr   r	   rR   rU   r]   rY   r   r   r   r   r   r   r   r   r   r   r   <module>   s6     ST>

Y@Q!      
&