import os
from collections.abc import Mapping
from io import BytesIO
from warnings import catch_warnings, simplefilter, warn

try:
    import psutil
except ImportError:
    psutil = None

import numpy as np
import pandas as pd
from fsspec.compression import compr
from fsspec.core import get_fs_token_paths
from fsspec.core import open as open_file
from fsspec.core import open_files
from fsspec.utils import infer_compression
from pandas.api.types import (
    CategoricalDtype,
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
    is_object_dtype,
)

from dask.base import tokenize
from dask.bytes import read_bytes
from dask.core import flatten
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.io.io import from_map
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import clear_known_categories
from dask.delayed import delayed
from dask.utils import asciitable, parse_bytes
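# Usage note (a hedged sketch, not from the original source): the public
# entry points below are ``read_csv``/``read_table``/``read_fwf`` and
# ``to_csv``, e.g.
#
#     import dask.dataframe as dd
#     ddf = dd.read_csv("myfiles.*.csv")   # hypothetical glob
#     ddf.to_csv("out-*.csv")              # one file per partition
#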
S )CSVFunctionWrapperzg
    CSV Function-Wrapper Class
    Reads CSV data from disk to produce a partition (given a key).
    c
           
      C   s:   || _ || _|| _|| _|| _|| _|| _|| _|	| _d S N)	full_columns_columnscolnameheadheaderreaderdtypesenforcekwargs)
selfr   columnsr   r    r!   r"   r#   r$   r%    r(   9/tmp/pip-unpacked-wheel-dbjnr7gq/dask/dataframe/io/csv.py__init__+   s    zCSVFunctionWrapper.__init__c                 C   s*   | j d kr| jS | jr$| j | jg S | j S r   )r   r   r   r&   r(   r(   r)   r'   A   s
    
zCSVFunctionWrapper.columnsc              
    def project_columns(self, columns):
        """Return a new CSVFunctionWrapper object with
        a sub-column projection.
        """
        # Make sure columns is ordered correctly
        columns = [c for c in self.head.columns if c in columns]
        if columns == self.columns:
            return self
        if self.colname and self.colname in columns:
            # Keep the path column out of the on-disk column selection;
            # the ``columns`` property adds it back to the output.
            columns = [c for c in columns if c != self.colname]
            head = self.head[columns + [self.colname]]
        else:
            head = self.head[columns]
        return CSVFunctionWrapper(
            self.full_columns,
            columns,
            self.colname,
            head,
            self.header,
            self.reader,
            {c: self.dtypes[c] for c in columns},
            self.enforce,
            self.kwargs,
        )
    def __call__(self, part):
        # Each part is a (block, path, is_first, is_last) tuple
        block, path, is_first, is_last = part

        # Construct `path_info`
        if path is not None:
            path_info = (
                self.colname,
                path,
                sorted(list(self.head[self.colname].cat.categories)),
            )
        else:
            path_info = None

        # Deal with arguments that are special for the
        # first and last blocks of each file
        write_header = False
        rest_kwargs = self.kwargs.copy()
        if not is_first:
            if rest_kwargs.get("names", None) is None:
                write_header = True
            rest_kwargs.pop("skiprows", None)
            if rest_kwargs.get("header", 0) is not None:
                rest_kwargs.pop("header", None)
        if not is_last:
            rest_kwargs.pop("skipfooter", None)

        # Deal with column projection
        columns = self.full_columns
        project_after_read = False
        if self._columns is not None:
            if self.kwargs:
                # To be safe, if any kwargs are defined, avoid changing
                # `usecols` here. Instead, select columns after the read.
                project_after_read = True
            else:
                columns = self._columns
                rest_kwargs["usecols"] = columns

        # Call `pandas_read_text`
        df = pandas_read_text(
            self.reader,
            block,
            self.header,
            rest_kwargs,
            self.dtypes,
            columns,
            write_header,
            self.enforce,
            path_info,
        )
        if project_after_read:
            return df[self.columns]
        return df
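# A minimal usage sketch, assuming the reconstructed wrapper behavior above;
# the ``_example_`` helper is hypothetical and is not executed on import.
def _example_csv_function_wrapper():  # pragma: no cover - illustrative only
    head = pd.DataFrame({"x": pd.array([], dtype="int64")})
    fn = CSVFunctionWrapper(
        ["x"], None, None, head, b"x\n", pd.read_csv, {"x": "int64"}, True, {}
    )
    # ``from_map`` calls the wrapper with (block, path, is_first, is_last)
    return fn((b"x\n1\n2\n", None, True, True))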
def pandas_read_text(
    reader,
    b,
    header,
    kwargs,
    dtypes=None,
    columns=None,
    write_header=True,
    enforce=False,
    path=None,
):
    """Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered list of paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = reader(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    if path:
        colname, path, paths = path
        code = paths.index(path)
        df = df.assign(
            **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)}
        )
    return df


def coerce_dtypes(df, dtypes):
    """Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    bad_dtypes = []
    bad_dates = []
    errors = []
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            actual = df.dtypes[c]
            desired = dtypes[c]
            if is_float_dtype(actual) and is_integer_dtype(desired):
                bad_dtypes.append((c, actual, desired))
            elif is_object_dtype(actual) and is_datetime64_any_dtype(desired):
                # This can only occur when parse_dates is specified, but an
                # invalid date is encountered. Pandas then silently falls
                # back to object dtype.
                bad_dates.append(c)
            else:
                try:
                    df[c] = df[c].astype(dtypes[c])
                except Exception as e:
                    bad_dtypes.append((c, actual, desired))
                    errors.append((c, e))

    if bad_dtypes:
        if errors:
            ex = "\n".join(
                f"- {c}\n  {e!r}"
                for c, e in sorted(errors, key=lambda x: str(x[0]))
            )
            exceptions = (
                "The following columns also raised exceptions on "
                "conversion:\n\n%s\n\n" % ex
            )
            extra = ""
        else:
            exceptions = ""
            # All mismatches are int->float, also suggest `assume_missing=True`
            extra = (
                "\n\nAlternatively, provide `assume_missing=True` "
                "to interpret\n"
                "all unspecified integer columns as floats."
            )

        bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0]))
        table = asciitable(["Column", "Found", "Expected"], bad_dtypes)
        dtype_kw = "dtype={%s}" % ",\n       ".join(
            f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes
        )

        dtype_msg = (
            "{table}\n\n"
            "{exceptions}"
            "Usually this is due to dask's dtype inference failing, and\n"
            "*may* be fixed by specifying dtypes manually by adding:\n\n"
            "{dtype_kw}\n\n"
            "to the call to `read_csv`/`read_table`."
            "{extra}"
        ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra)
    else:
        dtype_msg = None

    if bad_dates:
        also = " also " if bad_dtypes else " "
        cols = "\n".join("- %s" % c for c in bad_dates)
        date_msg = (
            "The following columns{also}failed to properly parse as dates:\n\n"
            "{cols}\n\n"
            "This is usually due to an invalid value in that column. To\n"
            "diagnose and fix it's recommended to drop these columns from the\n"
            "`parse_dates` keyword, and manually convert them to dates later\n"
            "using `dd.to_datetime`."
        ).format(also=also, cols=cols)
    else:
        date_msg = None

    if bad_dtypes or bad_dates:
        rule = "\n\n%s\n\n" % ("-" * 61)
        msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % (
            rule.join(filter(None, [dtype_msg, date_msg]))
        )
        raise ValueError(msg)
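# A minimal sketch of the failure mode ``coerce_dtypes`` reports, assuming
# the reconstruction above; the ``_example_`` helper is hypothetical.
def _example_coerce_dtypes_mismatch():  # pragma: no cover - illustrative only
    df = pd.DataFrame({"amount": [1.5, 2.5]})  # a later block parsed as float64
    try:
        coerce_dtypes(df, {"amount": np.dtype("int64")})  # sample inferred int64
    except ValueError as e:
        # The message suggests dtype={'amount': 'float64'} or assume_missing=True
        return str(e)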
def text_blocks_to_pandas(
    reader,
    block_lists,
    header,
    head,
    kwargs,
    enforce=False,
    specified_dtypes=None,
    path=None,
    blocksize=None,
    urlpath=None,
):
    """Convert blocks of bytes to a dask.dataframe

    This accepts a list of lists of values of bytes where each list corresponds
    to one file, and the value of bytes concatenate to comprise the entire
    file, in order.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    path : tuple, optional
        A tuple containing column name for path and the path_converter if provided

    Returns
    -------
    A dask.dataframe
    """
    dtypes = head.dtypes.to_dict()
    # A user-provided CategoricalDtype with known categories is kept;
    # everything else inferred as categorical is read as plain "category"
    # and the known categories are cleared from the metadata.
    categoricals = head.select_dtypes(include=["category"]).columns

    if isinstance(specified_dtypes, Mapping):
        known_categoricals = [
            k
            for k in categoricals
            if isinstance(specified_dtypes.get(k), CategoricalDtype)
            and specified_dtypes.get(k).categories is not None
        ]
        unknown_categoricals = categoricals.difference(known_categoricals)
    else:
        unknown_categoricals = categoricals

    # Fixup the dtypes
    for k in unknown_categoricals:
        dtypes[k] = "category"

    columns = list(head.columns)

    blocks = tuple(flatten(block_lists))
    # Create masks of first and last blocks from nested block_lists
    is_first = tuple(block_mask(block_lists))
    is_last = tuple(block_mask_last(block_lists))

    if path:
        colname, path_converter = path
        paths = [b[1].path for b in blocks]
        if path_converter:
            paths = [path_converter(p) for p in paths]
        head = head.assign(
            **{
                colname: pd.Categorical.from_codes(
                    np.zeros(len(head), dtype=int), set(paths)
                )
            }
        )
        path = (colname, paths)

    if len(unknown_categoricals):
        head = clear_known_categories(head, cols=unknown_categoricals)

    # Define parts
    parts = []
    colname, paths = path or (None, None)
    for i in range(len(blocks)):
        parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]])

    # Construct the output collection with from_map
    return from_map(
        CSVFunctionWrapper(
            columns,
            None,
            colname,
            head,
            header,
            reader,
            dtypes,
            enforce,
            kwargs,
        ),
        parts,
        meta=head,
        label="read-csv",
        token=tokenize(reader, urlpath, columns, enforce, head, blocksize),
        enforce_metadata=False,
        produces_tasks=True,
    )


def block_mask(block_lists):
    """
    Yields a flat iterable of booleans to mark the zeroth elements of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask([[1, 2], [3, 4], [5]]))
    [True, False, True, False, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield True
        yield from (False for _ in block[1:])


def block_mask_last(block_lists):
    """
    Yields a flat iterable of booleans to mark the last element of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask_last([[1, 2], [3, 4], [5]]))
    [False, True, False, True, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield from (False for _ in block[:-1])
        yield True


def auto_blocksize(total_memory, cpu_count):
    memory_factor = 10
    blocksize = int(total_memory // cpu_count / memory_factor)
    return min(blocksize, int(64e6))


def _infer_block_size():
    default = 2**25
    if psutil is not None:
        with catch_warnings():
            simplefilter("ignore", RuntimeWarning)
            mem = psutil.virtual_memory().total
            cpu = psutil.cpu_count()

        if mem and cpu:
            return auto_blocksize(mem, cpu)

    return default


# Guess blocksize if psutil is installed, or use an acceptable default if not
AUTO_BLOCKSIZE = _infer_block_size()


def read_pandas(
    reader,
    urlpath,
    blocksize="default",
    lineterminator=None,
    compression="infer",
    sample=256000,
    sample_rows=10,
    enforce=False,
    assume_missing=False,
    storage_options=None,
    include_path_column=False,
    **kwargs,
):
    reader_name = reader.__name__
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs["lineterminator"] = lineterminator
    else:
        lineterminator = "\n"
    if include_path_column and isinstance(include_path_column, bool):
        include_path_column = "path"
    if "index" in kwargs or "index_col" in kwargs:
        raise ValueError(
            "Keywords 'index' and 'index_col' not supported. "
            f"Use dd.{reader_name}(...).set_index('my-index') instead"
        )
    for kw in ["iterator", "chunksize"]:
        if kw in kwargs:
            raise ValueError(f"{kw} not supported for dd.{reader_name}")
    if kwargs.get("nrows", None):
        raise ValueError(
            "The 'nrows' keyword is not supported by `dd.{0}`. To achieve "
            "the same behavior, it's recommended to use "
            "`dd.{0}(...).head(n=nrows)`".format(reader_name)
        )
    if isinstance(kwargs.get("skiprows"), int):
        lastskiprow = firstrow = kwargs.get("skiprows")
    elif kwargs.get("skiprows") is None:
        lastskiprow = firstrow = 0
    else:
        # When skiprows is a list, we expect more than max(skiprows) to
        # be included in the sample. This means that [0,2] will work well,
        # but [0, 440] might not work.
        skiprows = set(kwargs.get("skiprows"))
        lastskiprow = max(skiprows)
        # Find the first row that is not skipped, for use as header
        firstrow = min(set(range(len(skiprows) + 1)) - skiprows)
    if isinstance(kwargs.get("header"), list):
        raise TypeError(f"List of header rows not supported for dd.{reader_name}")
    if isinstance(kwargs.get("converters"), dict) and include_path_column:
        path_converter = kwargs.get("converters").get(include_path_column, None)
    else:
        path_converter = None

    # If compression is "infer", inspect the (first) path suffix and
    # set the proper compression option if the suffix is recognized.
    if compression == "infer":
        # Translate the input urlpath to a simple path list
        paths = get_fs_token_paths(
            urlpath, mode="rb", storage_options=storage_options
        )[2]

        # Check for at least one valid path
        if len(paths) == 0:
            raise OSError(f"{urlpath} resolved to no files")

        # Infer compression from first path
        compression = infer_compression(paths[0])

    if blocksize == "default":
        blocksize = AUTO_BLOCKSIZE
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    if blocksize and compression:
        # NONE of the compressions should use chunking
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None to remove this message``\n"
            "Setting ``blocksize=None``" % compression
        )
        blocksize = None
    if compression not in compr:
        raise NotImplementedError("Compression format %s not installed" % compression)
    if blocksize and sample and blocksize < sample and lastskiprow != 0:
        warn(
            "Unexpected behavior can result from passing skiprows when\n"
            "blocksize is smaller than sample size.\n"
            "Setting ``sample=blocksize``"
        )
        sample = blocksize
    b_lineterminator = lineterminator.encode()
    b_out = read_bytes(
        urlpath,
        delimiter=b_lineterminator,
        blocksize=blocksize,
        sample=sample,
        compression=compression,
        include_path=include_path_column,
        **(storage_options or {}),
    )

    if include_path_column:
        b_sample, values, paths = b_out
        path = (include_path_column, path_converter)
    else:
        b_sample, values = b_out
        path = None

    if not isinstance(values[0], (tuple, list)):
        values = [values]
    # If we have not sampled, then use the first row of the first values
    # as a representative sample.
    if b_sample is False and len(values[0]):
        b_sample = values[0][0].compute()

    # Get header row, and check that sample is long enough. If the file
    # contains a header row, we need at least 2 nonempty rows + the number of
    # rows to skip.
    names = kwargs.get("names", None)
    header = kwargs.get("header", "infer" if names is None else None)
    need = 1 if header is None else 2

    if kwargs.get("comment"):
        # If comment is provided, step through lines of b_sample and strip
        # out the commented portions
        parts = []
        for part in b_sample.split(b_lineterminator):
            split_comment = part.decode().split(kwargs.get("comment"))
            if len(split_comment) > 1:
                # If the line starts with a comment, don't include it in parts
                if len(split_comment[0]) > 0:
                    parts.append(split_comment[0].strip().encode())
            else:
                parts.append(part)
            if len(parts) > need:
                break
    else:
        parts = b_sample.split(b_lineterminator, lastskiprow + need)

    # If the last partition is empty, don't count it
    nparts = 0 if not parts else len(parts) - int(not parts[-1])

    if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample:
        raise ValueError(
            "Sample is not large enough to include at least one "
            "row of data. Please increase the number of bytes "
            "in `sample` in the call to `read_csv`/`read_table`"
        )

    if isinstance(header, int):
        firstrow += header
    header = b"" if header is None else parts[firstrow] + b_lineterminator

    # Use sample to infer dtypes and check for presence of include_path_column
    head_kwargs = kwargs.copy()
    head_kwargs.pop("skipfooter", None)
    try:
        head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
    except pd.errors.ParserError as e:
        if "EOF" in str(e):
            raise ValueError(
                "EOF encountered while reading header. \n"
                "Pass argument `sample_rows` and make sure the value of "
                "`sample` is large enough to accommodate that many rows of data"
            ) from e
        raise
    if include_path_column and (include_path_column in head.columns):
        raise ValueError(
            "Files already contain the column name: %s, so the "
            "path column cannot use this name. Please set "
            "`include_path_column` to a unique name." % include_path_column
        )

    specified_dtypes = kwargs.get("dtype", {})
    if specified_dtypes is None:
        specified_dtypes = {}
    # If specified_dtypes is a single type, then all columns were specified
    if assume_missing and isinstance(specified_dtypes, dict):
        # Convert all non-specified integer columns to floats
        for c in head.columns:
            if is_integer_dtype(head[c].dtype) and c not in specified_dtypes:
                head[c] = head[c].astype(float)

    values = [[list(dsk.dask.values()) for dsk in block] for block in values]

    return text_blocks_to_pandas(
        reader,
        values,
        header,
        head,
        kwargs,
        enforce=enforce,
        specified_dtypes=specified_dtypes,
        path=path,
        blocksize=blocksize,
        urlpath=urlpath,
    )


READ_DOC_TEMPLATE = """
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the
same keyword arguments with the same performance guarantees. See the docstring
for :func:`pandas.{reader}` for more information on available keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is computed
    based on available physical memory and the number of cores, up to a maximum
    of 64MB. Can be a number like ``64000000`` or a string like ``"64MB"``. If
    ``None``, a single block is used for each file.
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are assumed
    to contain missing values, and are converted to floats. Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues. For example, if all the rows in the sample had
integer dtypes, but later on there was a ``NaN``, then this would error at
compute time. To fix this, you have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.
"""


def make_reader(reader, reader_name, file_type):
    def read(
        urlpath,
        blocksize="default",
        lineterminator=None,
        compression="infer",
        sample=256000,
        sample_rows=10,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        **kwargs,
    ):
        return read_pandas(
            reader,
            urlpath,
            blocksize=blocksize,
            lineterminator=lineterminator,
            compression=compression,
            sample=sample,
            sample_rows=sample_rows,
            enforce=enforce,
            assume_missing=assume_missing,
            storage_options=storage_options,
            include_path_column=include_path_column,
            **kwargs,
        )

    read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type)
    read.__name__ = reader_name
    return read


read_csv = dataframe_creation_dispatch.register_inplace(
    backend="pandas",
    name="read_csv",
)(make_reader(pd.read_csv, "read_csv", "CSV"))
read_table = make_reader(pd.read_table, "read_table", "delimited")
read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width")


def _write_csv(df, fil, *, depend_on=None, **kwargs):
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths) # doctest: +SKIP

    You can also provide a directory name:

    >>> df.to_csv('/path/to/data') # doctest: +SKIP

    The files will be numbered 0, 1, 2, (and so on) suffixed with '.part':

    ::

        /path/to/data/0.part
        /path/to/data/1.part

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to save to remote filesystems.
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file.
    encoding : string, default 'utf-8'
        A string representing the encoding to use in the output file.
    mode : str, default 'w'
        Python file mode. The default is 'w' (or 'wt'), for writing
        a new file or overwriting an existing file in text mode. 'a'
        (or 'at') will append to an existing file in text mode or
        create a new file if it does not already exist. See :py:func:`open`.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename.
    compute : bool, default True
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : bool, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must be True under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    The names of the file written if they were computed right away.
    If not, the delayed tasks associated with writing the files.

    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.

    See Also
    --------
    fsspec.open_files
    """
    if single_file and name_function is not None:
        raise ValueError("name_function is not supported under the single file mode")
    if header_first_partition_only is None:
        header_first_partition_only = single_file
    elif not header_first_partition_only and single_file:
        raise ValueError(
            "header_first_partition_only cannot be False in the single file mode."
        )
    file_options = dict(
        compression=compression,
        encoding=encoding,
        newline="",
        **(storage_options or {}),
    )
    to_csv_chunk = delayed(_write_csv, pure=False)
    dfs = df.to_delayed()
    if single_file:
        first_file = open_file(filename, mode=mode, **file_options)
        value = to_csv_chunk(dfs[0], first_file, **kwargs)
        append_mode = mode.replace("w", "") + "a"
        append_file = open_file(filename, mode=append_mode, **file_options)
        # Disable header for all but the first partition and chain the
        # writes so partitions are appended in order.
        kwargs["header"] = False
        for d in dfs[1:]:
            value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)
        values = [value]
        files = [first_file]
    else:
        files = open_files(
            filename,
            mode=mode,
            name_function=name_function,
            num=df.npartitions,
            **file_options,
        )
        values = [to_csv_chunk(dfs[0], files[0], **kwargs)]
        if header_first_partition_only:
            kwargs["header"] = False
        values.extend(
            [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])]
        )
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()

        if scheduler is not None:
            warn(
                "The 'scheduler' keyword argument for `to_csv()` is deprecated "
                "and will be removed in a future version. Please use the "
                "`compute_kwargs` argument instead. For example, "
                f"df.to_csv(..., compute_kwargs={{scheduler: {scheduler}}})",
                FutureWarning,
            )

            if (
                compute_kwargs.get("scheduler") is not None
                and compute_kwargs.get("scheduler") != scheduler
            ):
                raise ValueError(
                    f"Differing values for 'scheduler' have been passed in.\n"
                    f"scheduler argument: {scheduler}\n"
                    f"via compute_kwargs: {compute_kwargs.get('scheduler')}"
                )

            if compute_kwargs.get("scheduler") is None:
                compute_kwargs["scheduler"] = scheduler

        import dask

        return list(dask.compute(*values, **compute_kwargs))
    else:
        return values


from dask.dataframe.core import _Frame

_Frame.to_csv.__doc__ = to_csv.__doc__
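# A minimal round-trip sketch, assuming the reconstruction above; the target
# directory and the ``_example_`` helper are hypothetical.
def _example_round_trip(tmpdir="/tmp/dask-csv-example"):  # pragma: no cover
    import dask.dataframe as dd

    ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
    written = to_csv(ddf, os.path.join(tmpdir, "export-*.csv"), index=False)
    return dd.read_csv(os.path.join(tmpdir, "export-*.csv")).compute(), written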