import io
import os
from itertools import zip_longest

import pandas as pd
from fsspec.core import open_files

from dask.base import compute as dask_compute
from dask.bytes import read_bytes
from dask.core import flatten
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.io.io import from_delayed
from dask.dataframe.utils import insert_meta_param_description, make_meta
from dask.delayed import delayed


def to_json(
    df,
    url_path,
    orient="records",
    lines=None,
    storage_options=None,
    compute=True,
    encoding="utf-8",
    errors="strict",
    compression=None,
    compute_kwargs=None,
    name_function=None,
    **kwargs,
):
    """Write dataframe into JSON text files

    This utilises ``pandas.DataFrame.to_json()``, and most parameters are
    passed through - see its docstring.

    Differences: orient is 'records' by default, with lines=True; this
    produces the kind of JSON output that is most common in big-data
    applications, and which can be chunked when reading (see ``read_json()``).
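
    For example, with the default ``orient='records'`` and ``lines=True``,
    each output file contains one JSON record per line::

        {"a": 1, "b": "x"}
        {"a": 2, "b": "y"}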

    Parameters
    ----------
    df: dask.DataFrame
        Data to save
    url_path: str, list of str
        Location to write to. If a string and df has more than one
        partition, it should include a glob character to expand into a set
        of file names, or you should provide a ``name_function=`` parameter.
        Supports protocol specifications such as ``"s3://"``.
    encoding, errors:
        The text encoding to implement, e.g., "utf-8", and how to respond
        to errors in the conversion (see ``str.encode()``).
    orient, lines, kwargs
        passed to pandas; if not specified, lines=True when orient='records',
        False otherwise.
    storage_options: dict
        Passed to backend file-system implementation
    compute: bool
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    compression : string or None
        String like 'gzip' or 'xz'.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions.
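
    Examples
    --------
    Write one file per partition, expanding ``*`` into partition numbers
    (paths are illustrative):

    >>> df.to_json('data/output.*.json')  # doctest: +SKIP

    Control the file names with a ``name_function`` that preserves the
    lexicographic order of partitions, e.g. by zero-padding the index:

    >>> df.to_json('data/output.*.json',
    ...            name_function=lambda i: f"part-{i:05d}")  # doctest: +SKIP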
    """
    if lines is None:
        lines = orient == "records"
    if orient != "records" and lines:
        raise ValueError(
            "Line-delimited JSON is only available with orient='records'."
        )
    kwargs["orient"] = orient
    kwargs["lines"] = lines and orient == "records"
    outfiles = open_files(
        url_path,
        "wt",
        encoding=encoding,
        errors=errors,
        name_function=name_function,
        num=df.npartitions,
        compression=compression,
        **(storage_options or {}),
    )
    # One delayed write task per (output file, partition) pair
    parts = [
        delayed(write_json_partition)(d, outfile, kwargs)
        for outfile, d in zip(outfiles, df.to_delayed())
    ]
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()
        return list(dask_compute(*parts, **compute_kwargs))
    else:
        return parts


def write_json_partition(df, openfile, kwargs):
    with openfile as f:
        df.to_json(f, **kwargs)
    return os.path.normpath(openfile.path)


@dataframe_creation_dispatch.register_inplace("pandas")
@insert_meta_param_description
def read_json(
    url_path,
    orient="records",
    lines=None,
    storage_options=None,
    blocksize=None,
    sample=2**20,
    encoding="utf-8",
    errors="strict",
    compression="infer",
    meta=None,
    engine=pd.read_json,
    include_path_column=False,
    path_converter=None,
    **kwargs,
):
    """Create a dataframe from a set of JSON files

    This utilises ``pandas.read_json()``, and most parameters are
    passed through - see its docstring.

    Differences: orient is 'records' by default, with lines=True; this
    is appropriate for line-delimited "JSON-lines" data, the kind of JSON output
    that is most common in big-data scenarios, and which can be chunked when
    reading (see the ``blocksize`` parameter). All other options require
    blocksize=None, i.e., one partition per input file.

    Parameters
    ----------
    url_path: str, list of str
        Location to read from. If a string, can include a glob character to
        find a set of file names.
        Supports protocol specifications such as ``"s3://"``.
    encoding, errors:
        The text encoding to implement, e.g., "utf-8", and how to respond
        to errors in the conversion (see ``str.encode()`` and
        ``bytes.decode()``).
    orient, lines, kwargs
        passed to pandas; if not specified, lines=True when orient='records',
        False otherwise.
    storage_options: dict
        Passed to backend file-system implementation
    blocksize: None or int
        If None, files are not blocked, and you get one partition per input
        file. If an int (only allowed for line-delimited JSON files), each
        partition will be approximately this size in bytes, split at the
        nearest newline character.
    sample: int
        Number of bytes to pre-load, to provide an empty dataframe structure
        to any blocks without data. Only relevant when using blocksize.
    compression : string or None
        String like 'gzip' or 'xz'.
    engine : function object, default ``pd.read_json``
        The underlying function that dask will use to read JSON files. By
        default, this will be the pandas JSON reader (``pd.read_json``).
    include_path_column : bool or str, optional
        Include a column with the file path where each row in the dataframe
        originated. If ``True``, a new column is added to the dataframe called
        ``path``. If ``str``, sets new column name. Default is ``False``.
    path_converter : function or None, optional
        A function that takes one argument and returns a string. Used to convert
        paths in the ``path`` column, for instance, to strip a common prefix from
        all the paths.
    $META

    Returns
    -------
    dask.DataFrame

    Examples
    --------
    Load single file

    >>> dd.read_json('myfile.1.json')  # doctest: +SKIP

    Load multiple files

    >>> dd.read_json('myfile.*.json')  # doctest: +SKIP

    >>> dd.read_json(['myfile.1.json', 'myfile.2.json'])  # doctest: +SKIP

    Load large line-delimited JSON files using partitions of approximately
    256 MB:

    >>> dd.read_json('data/file*.json', blocksize=2**28)  # doctest: +SKIP
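
    Keep track of the originating file in a column, stripping directories
    from the recorded paths (the column name ``'file'`` and the converter
    are illustrative):

    >>> dd.read_json('data/file*.json', blocksize=2**28,
    ...              include_path_column='file',
    ...              path_converter=lambda p: p.split('/')[-1])  # doctest: +SKIP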
    """
    if lines is None:
        lines = orient == "records"
    if orient != "records" and lines:
        raise ValueError(
            "Line-delimited JSON is only available with orient='records'."
        )
    if blocksize and (orient != "records" or not lines):
        raise ValueError(
            "JSON file chunking only allowed for JSON-lines "
            "input (orient='records', lines=True)."
        )
    storage_options = storage_options or {}
    if include_path_column is True:
        include_path_column = "path"
    if path_converter is None:
        path_converter = lambda x: x

    if blocksize:
        # Chunked reading: split files on newline boundaries into blocks of
        # approximately ``blocksize`` bytes, one partition per block
        b_out = read_bytes(
            url_path,
            b"\n",
            blocksize=blocksize,
            sample=sample,
            compression=compression,
            include_path=include_path_column,
            **storage_options,
        )
        if include_path_column:
            first, chunks, paths = b_out
            first_path = path_converter(paths[0])
            path_dtype = pd.CategoricalDtype(path_converter(p) for p in paths)
            flat_paths = flatten(
                [path_converter(p)] * len(chunk) for p, chunk in zip(paths, chunks)
            )
        else:
            first, chunks = b_out
            first_path = None
            flat_paths = ()
            path_dtype = None
        flat_chunks = flatten(chunks)
        if meta is None:
            # Infer the output structure from the sampled first block
            meta = read_json_chunk(
                first,
                encoding,
                errors,
                engine,
                include_path_column,
                first_path,
                path_dtype,
                kwargs,
            )
        meta = make_meta(meta)
        parts = [
            delayed(read_json_chunk)(
                chunk,
                encoding,
                errors,
                engine,
                include_path_column,
                path,
                path_dtype,
                kwargs,
                meta=meta,
            )
            for chunk, path in zip_longest(flat_chunks, flat_paths)
        ]
    else:
        # Unchunked reading: one partition per input file
        files = open_files(
            url_path,
            "rt",
            encoding=encoding,
            errors=errors,
            compression=compression,
            **storage_options,
        )
        path_dtype = pd.CategoricalDtype(path_converter(f.path) for f in files)
        parts = [
            delayed(read_json_file)(
                f,
                orient,
                lines,
                engine,
                include_path_column,
                path_converter(f.path),
                path_dtype,
                kwargs,
            )
            for f in files
        ]

    return from_delayed(parts, meta=meta)


def read_json_chunk(
    chunk, encoding, errors, engine, column_name, path, path_dtype, kwargs, meta=None
):
    s = io.StringIO(chunk.decode(encoding, errors))
    s.seek(0)
    df = engine(s, orient="records", lines=True, **kwargs)
    if meta is not None and df.empty:
        return meta
    if column_name:
        df = add_path_column(df, column_name, path, path_dtype)
    return df


def read_json_file(f, orient, lines, engine, column_name, path, path_dtype, kwargs):
    with f as open_file:
        df = engine(open_file, orient=orient, lines=lines, **kwargs)
    if column_name:
        df = add_path_column(df, column_name, path, path_dtype)
    return df


def add_path_column(df, column_name, path, dtype):
    if column_name in df.columns:
        raise ValueError(
            f"Files already contain the column name: '{column_name}', so the "
            "path column cannot use this name. Please set "
            "`include_path_column` to a unique name."
        )
    return df.assign(**{column_name: pd.Series([path] * len(df), dtype=dtype)})