import os
import uuid
from fnmatch import fnmatch
from glob import glob
from warnings import warn

import pandas as pd
from fsspec.utils import build_name_function, stringify_path
from tlz import merge

from dask import config
from dask.base import (
    compute_as_if_collection,
    get_scheduler,
    named_schedulers,
    tokenize,
)
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import DataFrame
from dask.dataframe.io.io import _link, from_map
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.delayed import Delayed, delayed
from dask.utils import get_scheduler_lock

MP_GET = named_schedulers.get("processes", object())


def _pd_to_hdf(pd_to_hdf, lock, args, kwargs=None):
    """A wrapper function around pd_to_hdf that enables locking"""
    if lock:
        lock.acquire()
    try:
        pd_to_hdf(*args, **kwargs)
    finally:
        if lock:
            lock.release()

    return None


def to_hdf(
    df,
    path,
    key,
    mode="a",
    append=False,
    scheduler=None,
    name_function=None,
    compute=True,
    lock=None,
    dask_kwargs=None,
    **kwargs,
):
    """Store Dask Dataframe to Hierarchical Data Format (HDF) files

    This is a parallel version of the Pandas function of the same name.  Please
    see the Pandas docstring for more detailed information about shared keyword
    arguments.

    This function differs from the Pandas version by saving the many partitions
    of a Dask DataFrame in parallel, either to many files, or to many datasets
    within the same file.  You may specify this parallelism with an asterisk
    ``*`` within the filename or datapath, and an optional ``name_function``.
    The asterisk will be replaced with an increasing sequence of integers
    starting from ``0`` or with the result of calling ``name_function`` on each
    of those integers.

    This function only supports the Pandas ``'table'`` format, not the more
    specialized ``'fixed'`` format.

    Parameters
    ----------
    path : string, pathlib.Path
        Path to a target filename. Supports strings, ``pathlib.Path``, or any
        object implementing the ``__fspath__`` protocol. May contain a ``*`` to
        denote many filenames.
    key : string
        Datapath within the files.  May contain a ``*`` to denote many locations.
    name_function : function
        A function to convert the ``*`` in the above options to a string.
        Should take in a number from 0 to the number of partitions and return a
        string. (see examples below)
    compute : bool
        Whether or not to execute immediately.  If False then this returns a
        ``dask.Delayed`` value.
    lock : bool, Lock, optional
        Lock to use to prevent concurrency issues.  By default a
        ``threading.Lock``, ``multiprocessing.Lock`` or ``SerializableLock``
        will be used depending on your scheduler if a lock is required. See
        dask.utils.get_scheduler_lock for more information about lock
        selection.
    scheduler : string
        The scheduler to use, like "threads" or "processes"
    **other:
        See pandas.to_hdf for more information

    Examples
    --------
    Save Data to a single file

    >>> df.to_hdf('output.hdf', '/data')            # doctest: +SKIP

    Save data to multiple datapaths within the same file:

    >>> df.to_hdf('output.hdf', '/data-*')          # doctest: +SKIP

    Save data to multiple files:

    >>> df.to_hdf('output-*.hdf', '/data')          # doctest: +SKIP

    Save data to multiple files, using the multiprocessing scheduler:

    >>> df.to_hdf('output-*.hdf', '/data', scheduler='processes') # doctest: +SKIP

    Specify custom naming scheme.  This writes files as
    '2000-01-01.hdf', '2000-01-02.hdf', '2000-01-03.hdf', etc.

    >>> from datetime import date, timedelta
    >>> base = date(year=2000, month=1, day=1)
    >>> def name_function(i):
    ...     ''' Convert integer 0 to n to a string '''
    ...     return str(base + timedelta(days=i))

    >>> df.to_hdf('*.hdf', '/data', name_function=name_function) # doctest: +SKIP
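
    Defer the write with ``compute=False`` and run it later (a sketch based on
    the ``compute`` parameter documented above):

    >>> pending = df.to_hdf('output.hdf', '/data', compute=False)  # doctest: +SKIP
    >>> pending.compute()                                          # doctest: +SKIP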

    Returns
    -------
    filenames : list
        Returned if ``compute`` is True. List of file names that each partition
        is saved to.
    delayed : dask.Delayed
        Returned if ``compute`` is False. Delayed object to execute ``to_hdf``
        when computed.

    See Also
    --------
    read_hdf:
    to_parquet:
    Nzto-hdf-T*   zBA maximum of one asterisk is accepted in file path and dataset keyc                 S   s   |  d|S )Nr    )replace)pathi_namer   r   r   <lambda>       zto_hdf.<locals>.<lambda>Fz4A maximum of one asterisk is accepted in dataset keyc                 S   s   | S Nr   )r#   _r   r   r   r%      r&   format)ttablez.Dask only support 'table' format in hdf files.)r   wzr+z$Mode must be one of 'a', 'w' or 'r+'c                    s   g | ]} |qS r   r   .0i)name_functionr   r   
<listcomp>   s     zto_hdf.<locals>.<listcomp>zWTo preserve order between partitions name_function must preserve the order of its input	schedulerzsingle-threaded)collectionsr2   )r2   r+   )r)   modeappendr   r   r4   r5   c                    s   g | ]} |fqS r   r   r-   )namer   r   r1     s     c                    s   g | ]}t | qS r   )r   r.   k)dskr   r   r1   
  s     )!uuidZuuid1hexZ_partition_typeto_hdfr   
isinstancestrcount
ValueErrorr   Znpartitionsrangesortedr   r	   getr   MP_GETr   updatedictr   _namer"   copyr5   r   r   daskr
   r   r   )Zdfr#   keyr4   r5   r2   r0   Zcomputer   Zdask_kwargsr   r   Zsingle_fileZsingle_nodeZfmt_objZformatted_namesZ_actual_getr$   Zkwargs2	filenamesr/   ZtaskZlink_depkeysr   )r9   r6   r0   r   r<   *   s    c



  r<   z
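

# A minimal usage sketch of ``to_hdf`` (the helper name and the daily naming
# scheme are illustrative assumptions, not part of the original module): write
# one file per partition, with the ``*`` expanded by ``name_function``.
def _example_to_hdf_daily(df):
    """Sketch: write each partition of ``df`` to its own dated HDF file."""
    from datetime import date, timedelta

    base = date(2000, 1, 1)
    return to_hdf(
        df,
        "output-*.hdf",
        "/data",
        name_function=lambda i: str(base + timedelta(days=i)),
    )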


dont_use_fixed_error_message = """
This HDFStore is not partitionable and can only be used monolithically with
pandas.  In the future when creating HDFStores use the ``format='table'``
option to ensure that your dataset can be parallelized"""
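

# An illustration of the distinction drawn above, assuming plain pandas (this
# helper is a sketch, not dask API): only stores written with
# ``format="table"`` can be read back in chunks; ``format="fixed"`` stores can
# only be read monolithically.
def _example_write_partitionable_store(pdf, path="myfile.hdf"):
    """Sketch: write a pandas DataFrame so dask can read it back in chunks."""
    pdf.to_hdf(path, key="/data", format="table")  # partitionable
    return path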


read_hdf_error_msg = """
The start and stop keywords are not supported when reading from more than
one file/dataset.

The combination is ambiguous because it could be interpreted as the starting
and stopping index per file, or starting and stopping index of the global
dataset."""


class HDFFunctionWrapper(DataFrameIOFunction):
    """
    HDF5 Function-Wrapper Class

    Reads HDF5 data from disk to produce a partition (given a key).
    """

    def __init__(self, columns, dim, lock, common_kwargs):
        self._columns = columns
        self.lock = lock
        self.common_kwargs = common_kwargs
        self.dim = dim
        if columns and dim > 1:
            self.common_kwargs = merge(common_kwargs, {"columns": columns})

    @property
    def columns(self):
        return self._columns

    def project_columns(self, columns):
        """Return a new HDFFunctionWrapper object with
        a sub-column projection.
        """
        if columns == self.columns:
            return self
        return HDFFunctionWrapper(columns, self.dim, self.lock, self.common_kwargs)

    def __call__(self, part):
        """Read from hdf5 file with a lock"""
        path, key, kwargs = part
        if self.lock:
            self.lock.acquire()
        try:
            result = pd.read_hdf(path, key, **merge(self.common_kwargs, kwargs))
        finally:
            if self.lock:
                self.lock.release()
        return result
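

# Hedged illustration (an assumed example, not original source): each partition
# input consumed by ``HDFFunctionWrapper.__call__`` is a tuple of a file path,
# a dataset key, and per-partition ``pandas.read_hdf`` keyword arguments, as
# built by ``_one_path_one_key`` below.
_EXAMPLE_HDF_PART = ("myfile.hdf", "/data", {"start": 0, "stop": 1_000_000})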


@dataframe_creation_dispatch.register_inplace("pandas")
def read_hdf(
    pattern,
    key,
    start=0,
    stop=None,
    columns=None,
    chunksize=1000000,
    sorted_index=False,
    lock=True,
    mode="r",
):
    """
    Read HDF files into a Dask DataFrame

    Read hdf files into a dask dataframe. This function is like
    ``pandas.read_hdf``, except it can read from a single large file, or from
    multiple files, or from multiple keys from the same file.

    Parameters
    ----------
    pattern : string, pathlib.Path, list
        File pattern (string), pathlib.Path, buffer to read from, or list of
        file paths. Can contain wildcards.
    key : group identifier in the store. Can contain wildcards.
    start : optional, integer (defaults to 0), row number to start at
    stop : optional, integer (defaults to None, the last row), row number to
        stop at
    columns : list of columns, optional
        A list of columns that if not None, will limit the return
        columns (default is None)
    chunksize : positive integer, optional
        Maximal number of rows per partition (default is 1000000).
    sorted_index : boolean, optional
        Option to specify whether or not the input hdf files have a sorted
        index (default is False).
    lock : boolean, optional
        Option to use a lock to prevent concurrency issues (default is True).
    mode : {'a', 'r', 'r+'}, default 'r'. Mode to use when opening file(s).
        'r'
            Read-only; no data can be modified.
        'a'
            Append; an existing file is opened for reading and writing,
            and if the file does not exist it is created.
        'r+'
            It is similar to 'a', but the file must already exist.

    Returns
    -------
    dask.DataFrame

    Examples
    --------
    Load single file

    >>> dd.read_hdf('myfile.1.hdf5', '/x')  # doctest: +SKIP

    Load multiple files

    >>> dd.read_hdf('myfile.*.hdf5', '/x')  # doctest: +SKIP

    >>> dd.read_hdf(['myfile.1.hdf5', 'myfile.2.hdf5'], '/x')  # doctest: +SKIP

    Load multiple datasets

    >>> dd.read_hdf('myfile.1.hdf5', '/*')  # doctest: +SKIP
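
    Control partition size with ``chunksize`` (a sketch based on the
    parameter documented above):

    >>> dd.read_hdf('myfile.1.hdf5', '/x', chunksize=100_000)  # doctest: +SKIP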
    T/r   zNo files providedzFile(s) not found: Fz,File not found or insufficient permissions: Nr!   z$Chunksize must be a positive integerz_When assuming pre-partitioned data, data must be read in its entirety using the same chunksizesr4   )r4   stop)r6   r4   r4   zread-hdf)meta	divisionslabeltokenZenforce_metadata)r   
startswithr   r=   r>   rB   r   lenr@   OSErrorosr#   exists	TypeErrorNotImplementedErrorread_hdf_error_msgrU   HDFStore_expand_keyrV   
IndexErrorndimr6   _build_partsr   rM   r   )patternrJ   startrd   rN   	chunksizesorted_indexr   r4   pathsr#   rm   hdfZmeta_keyre   rP   partsrf   r   r   r   rV   D  sn    C



      rV   c              
   C   s   g }g }| D ]l}	t |	|||||\}
}}t|
||D ]B\}}}|rX|rX|dd | }n|r`|}|t|	|||| q4q||pdgt|d  fS )zG
    Build the list of partition inputs and divisions for read_hdf
    Nr!   )_get_keys_stops_divisionszipextend_one_path_one_keyrj   )rz   rJ   rw   rd   rx   ry   r4   r|   Zglobal_divisionsr#   rL   stopsrf   r8   sdr   r   r   ru     s$         
ru   c                    s:   ||krt d|| fddtt|| D S )zo
    Get the DataFrame corresponding to one path and one key (which
    should not contain any wildcards).
    z?Start row number ({}) is above or equal to stop row number ({})c                    s$   g | ]\}}||  d fqS )rw   rd   r   )r.   r/   r   rx   rJ   r#   r   r   r1     s   z%_one_path_one_key.<locals>.<listcomp>)r@   r)   	enumeraterA   )r#   rJ   rw   rd   rx   r   r   r   r     s     r   c                    sZ   dd l }| s gn< fdd| D  fdd|jjdddD  S )	Nr   c                    s   g | ]}t | r|qS r   r   r7   )rJ   r   r   r1     s     
 z_expand_key.<locals>.<listcomp>c                 3   s4   | ],}t |j r|jd kr|jkr|jV  qdS )r+   N)r   Z_v_pathnameZ_v_name)r.   nrJ   rL   r   r   	<genexpr>  s
   

z_expand_key.<locals>.<genexpr>rb   ZTable)	classname)r   	has_magicrL   r   _handleZ
walk_nodes)rJ   r{   r   r   r   r   rr     s    
rr   c              	      s   t j| |d}g }g }t||}	|	D ]}
||
  jdkrFtt|dkr\| j n&| jkrxt	d
 jn
|| |rԇ fddtd j|D } jd jd	  jd
d }|| || q&|d q&W 5 Q R X |	||fS )z
    Get the "keys" or group identifiers which match the given key, which
    can contain wildcards (see _expand_path). This uses the hdf file
    identified by the given path. Also get the index of the last row of
    data for each matched key.
    rc   r+   Nz0Stop keyword exceeds dataset number of rows ({})c                    s$   g | ]} j d ||d dd qS )indexr!   r   r   )read_column)r.   rw   Zstorerr   r   r1     s   z-_get_keys_stops_divisions.<locals>.<listcomp>r   r   r!   r   )rU   rq   rr   Z
get_storerZformat_typern   dont_use_fixed_error_messager5   Znrowsr@   r)   rA   r   )r#   rJ   rd   ry   rx   r4   r{   r   rf   rL   r8   divisionZdivision_endr   r   r   r~     s@    





  
r~   )_Frame)N)r   FNNTNN)r   NNr`   FTra   )2rl   r:   r   r   warningsr   r_   rU   Zfsspec.utilsr   r   Ztlzr   rI   r	   Z	dask.baser
   r   r   r   Zdask.dataframe.backendsr   Zdask.dataframe.corer   Zdask.dataframe.io.ior   r   Zdask.dataframe.io.utilsr   Zdask.delayedr   r   Z
dask.utilsr   rC   objectrD   r   r<   r   rp   rM   Zregister_inplacerV   ru   r   rr   r~   r   r]   r   r   r   r   <module>   sZ   
       
 d	)        )
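

# An end-to-end sketch (illustrative assumptions: a local ``example-*.hdf``
# path and the key ``/data``; this helper is not part of dask's API):
# round-trip a small collection through ``to_hdf`` and ``read_hdf``.
def _example_hdf_round_trip():
    """Sketch: write a small Dask DataFrame to HDF files and read it back."""
    import pandas

    import dask.dataframe as dd

    ddf = dd.from_pandas(pandas.DataFrame({"x": range(10)}), npartitions=2)
    ddf.to_hdf("example-*.hdf", "/data")  # one file per partition
    return dd.read_hdf("example-*.hdf", "/data")  # lazy read of all files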