from __future__ import annotations

from collections.abc import Iterable
from functools import partial
from math import ceil
from operator import getitem
from threading import Lock
from typing import TYPE_CHECKING, Literal, overload

import numpy as np
import pandas as pd

import dask.array as da
from dask.base import is_dask_collection, tokenize
from dask.blockwise import BlockwiseDepDict, blockwise
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import (
    DataFrame,
    Index,
    Series,
    _concat,
    _emulate,
    apply_and_enforce,
    has_parallel_type,
    new_dd_object,
)
from dask.dataframe.dispatch import meta_lib_from_array
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import (
    check_meta,
    insert_meta_param_description,
    is_series_like,
    make_meta,
)
from dask.delayed import Delayed, delayed
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from dask.utils import M, funcname, is_arraylike

if TYPE_CHECKING:
    import distributed

lock = Lock()


def _meta_from_array(x, columns=None, index=None, meta=None):
    """Create empty DataFrame or Series which has correct dtype"""

    if x.ndim > 2:
        raise ValueError(
            "from_array does not input more than 2D array, got"
            " array with shape %r" % (x.shape,)
        )

    if index is not None:
        if not isinstance(index, Index):
            raise ValueError("'index' must be an instance of dask.dataframe.Index")
        index = index._meta

    if meta is None:
        meta = meta_lib_from_array(x).DataFrame()

    if getattr(x.dtype, "names", None) is not None:
        # record array has named columns
        if columns is None:
            columns = list(x.dtype.names)
        elif np.isscalar(columns):
            raise ValueError("For a struct dtype, columns must be a list.")
        elif not all(i in x.dtype.names for i in columns):
            extra = sorted(set(columns).difference(x.dtype.names))
            raise ValueError(f"dtype {x.dtype} doesn't have fields {extra}")
        fields = x.dtype.fields
        dtypes = [fields[n][0] if n in fields else "f8" for n in columns]
    elif x.ndim == 1:
        if np.isscalar(columns) or columns is None:
            return meta._constructor_sliced(
                [], name=columns, dtype=x.dtype, index=index
            )
        elif len(columns) == 1:
            return meta._constructor(
                np.array([], dtype=x.dtype), columns=columns, index=index
            )
        raise ValueError(
            "For a 1d array, columns must be a scalar or single element list"
        )
    else:
        if np.isnan(x.shape[1]):
            raise ValueError("Shape along axis 1 must be known")
        if columns is None:
            columns = list(range(x.shape[1]))
        elif len(columns) != x.shape[1]:
            raise ValueError(
                "Number of column names must match width of the array. "
                f"Got {len(columns)} names for {x.shape[1]} columns"
            )
        dtypes = [x.dtype] * len(columns)

    data = {c: np.array([], dtype=dt) for (c, dt) in zip(columns, dtypes)}
    return meta._constructor(data, columns=columns, index=index)
      C  s   t | tjrt| ||dS t| ||d}ttdt| |}|t| d f }t| ||}d| }i }tdt	t
t| | D ]b}t| t|| |d | f}	t|rt||	d|j|jf|||f< qt||	d|jf|||f< qt||||S )a|  Read any sliceable array into a Dask Dataframe

    Uses getitem syntax to pull slices out of the array.  The array need not be
    a NumPy array but must support slicing syntax

        x[50000:100000]

    and have 2 dimensions:

        x.ndim == 2

    or have a record dtype:

        x.dtype == [('name', 'O'), ('balance', 'i8')]

    Parameters
    ----------
    x : array_like
    chunksize : int, optional
        The number of rows per partition to use.
    columns : list or string, optional
        list of column names if DataFrame, single string if Series
    meta : object, optional
        An optional `meta` parameter can be passed for dask
        to specify the concrete dataframe type to use for partitions of
        the Dask dataframe. By default, pandas DataFrame is used.

    Returns
    -------
    dask.DataFrame or dask.Series
        A dask DataFrame/Series
    """
    if isinstance(x, da.Array):
        return from_dask_array(x, columns=columns, meta=meta)

    meta = _meta_from_array(x, columns, meta=meta)

    divisions = tuple(range(0, len(x), chunksize))
    divisions = divisions + (len(x) - 1,)
    token = tokenize(x, chunksize, columns)
    name = "from_array-" + token

    dsk = {}
    for i in range(0, int(ceil(len(x) / chunksize))):
        data = (getitem, x, slice(i * chunksize, (i + 1) * chunksize))
        if is_series_like(meta):
            dsk[name, i] = (type(meta), data, None, meta.dtype, meta.name)
        else:
            dsk[name, i] = (type(meta), data, None, meta.columns)
    return new_dd_object(dsk, name, meta, divisions)
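
# Hedged usage sketch (added for exposition): any object supporting `len`
# and numpy-style slicing works here, e.g. an h5py or zarr dataset. With a
# record dtype, the field names become columns:
#
#     >>> import numpy as np
#     >>> import dask.dataframe as dd
#     >>> x = np.array([(i, i * 2.0) for i in range(6)],
#     ...              dtype=[("id", "i8"), ("balance", "f8")])
#     >>> ddf = dd.from_array(x, chunksize=3)
#     >>> ddf.npartitions
#     2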


@overload
def from_pandas(
    data: pd.DataFrame,
    npartitions: int | None = None,
    chunksize: int | None = None,
    sort: bool = True,
    name: str | None = None,
) -> DataFrame:
    ...


@overload
def from_pandas(
    data: pd.Series,
    npartitions: int | None = None,
    chunksize: int | None = None,
    sort: bool = True,
    name: str | None = None,
) -> Series:
    ...


def from_pandas(
    data: pd.DataFrame | pd.Series,
    npartitions: int | None = None,
    chunksize: int | None = None,
    sort: bool = True,
    name: str | None = None,
) -> DataFrame | Series:
    """
    Construct a Dask DataFrame from a Pandas DataFrame

    This splits an in-memory Pandas dataframe into several parts and constructs
    a dask.dataframe from those parts on which Dask.dataframe can operate in
    parallel.  By default, the input dataframe will be sorted by the index to
    produce cleanly-divided partitions (with known divisions).  To preserve the
    input ordering, make sure the input index is monotonically-increasing. The
    ``sort=False`` option will also avoid reordering, but will not result in
    known divisions.

    Note that, despite parallelism, Dask.dataframe may not always be faster
    than Pandas.  We recommend that you stay with Pandas for as long as
    possible before switching to Dask.dataframe.

    Parameters
    ----------
    data : pandas.DataFrame or pandas.Series
        The DataFrame/Series with which to construct a Dask DataFrame/Series
    npartitions : int, optional
        The number of partitions of the index to create. Note that if there
        are duplicate values or insufficient elements in ``data.index``, the
        output may have fewer partitions than requested.
    chunksize : int, optional
        The desired number of rows per index partition to use. Note that
        depending on the size and index of the dataframe, actual partition
        sizes may vary.
    sort: bool
        Sort the input by index first to obtain cleanly divided partitions
        (with known divisions).  If False, the input will not be sorted, and
        all divisions will be set to None. Default is True.
    name: string, optional
        An optional keyname for the dataframe.  Defaults to hashing the input

    Returns
    -------
    dask.DataFrame or dask.Series
        A dask DataFrame/Series partitioned along the index

    Examples
    --------
    >>> from dask.dataframe import from_pandas
    >>> df = pd.DataFrame(dict(a=list('aabbcc'), b=list(range(6))),
    ...                   index=pd.date_range(start='20100101', periods=6))
    >>> ddf = from_pandas(df, npartitions=3)
    >>> ddf.divisions  # doctest: +NORMALIZE_WHITESPACE
    (Timestamp('2010-01-01 00:00:00', freq='D'),
     Timestamp('2010-01-03 00:00:00', freq='D'),
     Timestamp('2010-01-05 00:00:00', freq='D'),
     Timestamp('2010-01-06 00:00:00', freq='D'))
    >>> ddf = from_pandas(df.a, npartitions=3)  # Works with Series too!
    >>> ddf.divisions  # doctest: +NORMALIZE_WHITESPACE
    (Timestamp('2010-01-01 00:00:00', freq='D'),
     Timestamp('2010-01-03 00:00:00', freq='D'),
     Timestamp('2010-01-05 00:00:00', freq='D'),
     Timestamp('2010-01-06 00:00:00', freq='D'))
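
    ``chunksize`` may be given instead of ``npartitions`` to the same effect
    (illustrative addition):

    >>> ddf = from_pandas(df, chunksize=2)
    >>> ddf.npartitions
    3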

    Raises
    ------
    TypeError
        If something other than a ``pandas.DataFrame`` or ``pandas.Series`` is
        passed in.

    See Also
    --------
    from_array : Construct a dask.DataFrame from an array that has record dtype
    read_csv : Construct a dask.DataFrame from a CSV file
    """
    if isinstance(getattr(data, "index", None), pd.MultiIndex):
        raise NotImplementedError("Dask does not support MultiIndex Dataframes.")

    if not has_parallel_type(data):
        raise TypeError("Input must be a pandas DataFrame or Series.")

    if (npartitions is None) == (chunksize is None):
        raise ValueError(
            "Exactly one of npartitions and chunksize must be specified."
        )
    elif npartitions is not None and not isinstance(npartitions, int):
        raise TypeError(
            "Please provide npartitions as an int, or possibly as None if you"
            " specify chunksize."
        )
    elif chunksize is not None and not isinstance(chunksize, int):
        raise TypeError(
            "Please provide chunksize as an int, or possibly as None if you"
            " specify npartitions."
        )

    name = name or ("from_pandas-" + tokenize(data, chunksize, npartitions))

    nrows = len(data)
    if not nrows:
        return new_dd_object({(name, 0): data}, name, data, [None, None])

    if data.index.isna().any() and not data.index.is_numeric():
        raise NotImplementedError(
            "Index in passed data is non-numeric and contains nulls, which Dask "
            "does not entirely support.\n"
            "Consider passing `data.loc[~data.isna()]` instead."
        )

    if sort:
        if not data.index.is_monotonic_increasing:
            data = data.sort_index(ascending=True)
        divisions, locations = sorted_division_locations(
            data.index, npartitions=npartitions, chunksize=chunksize
        )
    else:
        if chunksize is None:
            assert isinstance(npartitions, int)
            chunksize = int(ceil(nrows / npartitions))
        locations = list(range(0, nrows, chunksize)) + [len(data)]
        divisions = [None] * len(locations)

    dsk = {
        (name, i): data.iloc[start:stop]
        for i, (start, stop) in enumerate(zip(locations[:-1], locations[1:]))
    }
    return new_dd_object(dsk, name, data, divisions)
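
# How partitioning plays out (an illustrative sketch added for exposition):
# with sort=True (the default) the divisions are concrete index values taken
# from `sorted_division_locations`; with sort=False they are all unknown:
#
#     >>> import pandas as pd
#     >>> df = pd.DataFrame({"x": range(6)}, index=list("abcdef"))
#     >>> from_pandas(df, npartitions=3).divisions
#     ('a', 'c', 'e', 'f')
#     >>> from_pandas(df, npartitions=3, sort=False).divisions
#     (None, None, None, None)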


@dataframe_creation_dispatch.register_inplace("pandas")
def from_dict(
    data,
    npartitions,
    orient="columns",
    dtype=None,
    columns=None,
    constructor=pd.DataFrame,
):
    """
    Construct a Dask DataFrame from a Python Dictionary

    Parameters
    ----------
    data : dict
        Of the form {field : array-like} or {field : dict}.
    npartitions : int
        The number of partitions of the index to create. Note that depending on
        the size and index of the dataframe, the output may have fewer
        partitions than requested.
    orient : {'columns', 'index', 'tight'}, default 'columns'
        The "orientation" of the data. If the keys of the passed dict
        should be the columns of the resulting DataFrame, pass 'columns'
        (default). Otherwise if the keys should be rows, pass 'index'.
        If 'tight', assume a dict with keys
        ['index', 'columns', 'data', 'index_names', 'column_names'].
    dtype: dtype, optional
        Data type to force, otherwise infer.
    columns: string, optional
        Column labels to use when ``orient='index'``. Raises a ValueError
        if used with ``orient='columns'`` or ``orient='tight'``.
    constructor: class, default pd.DataFrame
        The class whose ``from_dict`` method is used to build each partition.

    Examples
    --------
    >>> import dask.dataframe as dd
    >>> ddf = dd.from_dict({"num1": [1, 2, 3, 4], "num2": [7, 8, 9, 10]}, npartitions=2)
    """

    collection_types = {type(v) for v in data.values() if is_dask_collection(v)}
    if collection_types:
        raise NotImplementedError(
            "from_dict doesn't currently support Dask collections as inputs. "
            f"Objects of type {collection_types} were given in the input dict."
        )

    return from_pandas(
        constructor.from_dict(data, orient, dtype, columns),
        npartitions,
    )


def _partition_from_array(data, index=None, initializer=None, **kwargs):
    """Create a Dask partition for either a DataFrame or Series.

    Designed to be used with :func:`dask.blockwise.blockwise`. ``data`` is the array
    from which the partition will be created. ``index`` can be:

    1. ``None``, in which case each partition has an independent RangeIndex
    2. a `tuple` with two elements, the start and stop values for a RangeIndex for
       this partition, which gives a continuously varying RangeIndex over the
       whole Dask DataFrame
    3. an instance of a ``pandas.Index`` or a subclass thereof

    The ``kwargs`` _must_ contain an ``initializer`` key which is set by calling
    ``type(meta)``.
    """
    if isinstance(index, tuple):
        index = pd.RangeIndex(*index)
    return initializer(data, index=index, **kwargs)


def from_dask_array(x, columns=None, index=None, meta=None):
    """Create a Dask DataFrame from a Dask Array.

    Converts a 2d array into a DataFrame and a 1d array into a Series.

    Parameters
    ----------
    x : da.Array
    columns : list or string
        list of column names if DataFrame, single string if Series
    index : dask.dataframe.Index, optional
        An optional *dask* Index to use for the output Series or DataFrame.

        The default output index depends on whether `x` has any unknown
        chunks. If there are any unknown chunks, the output has ``None``
        for all the divisions (one per chunk). If all the chunks are known,
        a default index with known divisions is created.

        Specifying `index` can be useful if you're conforming a Dask Array
        to an existing dask Series or DataFrame, and you would like the
        indices to match.
    meta : object, optional
        An optional `meta` parameter can be passed for dask
        to specify the concrete dataframe type to be returned.
        By default, pandas DataFrame is used.

    Examples
    --------
    >>> import dask.array as da
    >>> import dask.dataframe as dd
    >>> x = da.ones((4, 2), chunks=(2, 2))
    >>> df = dd.io.from_dask_array(x, columns=['a', 'b'])
    >>> df.compute()
         a    b
    0  1.0  1.0
    1  1.0  1.0
    2  1.0  1.0
    3  1.0  1.0

    See Also
    --------
    dask.bag.to_dataframe: from dask.bag
    dask.dataframe._Frame.values: Reverse conversion
    dask.dataframe._Frame.to_records: Reverse conversion
    """
    meta = _meta_from_array(x, columns, index, meta=meta)

    name = "from-dask-array-" + tokenize(x, columns)
    graph_dependencies = [x]
    arrays_and_indices = [x.name, "ij" if x.ndim == 2 else "i"]
    numblocks = {x.name: x.numblocks}

    if index is not None:
        # An index is explicitly given: check that its partitioning matches
        # the array blocks and feed it to blockwise as a second input
        if index.npartitions != x.numblocks[0]:
            msg = (
                "The index and array have different numbers of blocks. "
                "({} != {})".format(index.npartitions, x.numblocks[0])
            )
            raise ValueError(msg)
        divisions = index.divisions
        graph_dependencies.append(index)
        arrays_and_indices.extend([index._name, "i"])
        numblocks[index._name] = (index.npartitions,)
        index_mapping = None
    elif np.isnan(sum(x.shape)):
        # Unknown chunk sizes: every partition gets an independent
        # RangeIndex and all divisions are unknown
        divisions = [None] * (len(x.chunks[0]) + 1)
        index_mapping = None
    else:
        # All chunk sizes known: build a continuously varying RangeIndex by
        # mapping each block to its (start, stop) positions
        n_elements = sum(x.chunks[0])
        divisions = [0]
        stop = 0
        index_mapping = {}
        for i, increment in enumerate(x.chunks[0]):
            stop += increment
            index_mapping[(i,)] = (divisions[i], stop)
            # last division is corrected, even if there are
            # empty chunk(s) at the end
            if stop == n_elements:
                stop -= 1
            divisions.append(stop)

    if is_series_like(meta):
        kwargs = {"dtype": x.dtype, "name": meta.name, "initializer": type(meta)}
    else:
        kwargs = {"columns": meta.columns, "initializer": type(meta)}
    if index_mapping is not None:
        # The per-block RangeIndex bounds travel through blockwise as a
        # BlockwiseDepDict keyword argument
        kwargs["index"] = BlockwiseDepDict(mapping=index_mapping)

    blk = blockwise(
        _partition_from_array,
        name,
        "i",
        *arrays_and_indices,
        numblocks=numblocks,
        concatenate=True,
        **kwargs,
    )

    graph = HighLevelGraph.from_collections(
        name, blk, dependencies=graph_dependencies
    )
    return new_dd_object(graph, name, meta, divisions)


def _link(token, result):
    """A dummy function to link results together in a graph

    We use this to enforce an artificial sequential ordering on tasks that
    don't explicitly pass around a shared resource
    """


def _df_to_bag(df, index=False, format="tuple"):
    if isinstance(df, pd.DataFrame):
        if format == "tuple":
            return list(map(tuple, df.itertuples(index)))
        elif format == "dict":
            if index:
                return [
                    {"index": idx, **values}
                    for values, idx in zip(df.to_dict("records"), df.index)
                ]
            else:
                return df.to_dict(orient="records")
    elif isinstance(df, pd.Series):
        if format == "tuple":
            return list(df.items()) if index else list(df)
        elif format == "dict":
            return df.to_frame().to_dict(orient="records")


def to_bag(df, index=False, format="tuple"):
    """Create Dask Bag from a Dask DataFrame

    Parameters
    ----------
    index : bool, optional
        If True, the elements are tuples of ``(index, value)``, otherwise
        they're just the ``value``.  Default is False.
    format : {"tuple", "dict", "frame"}, optional
        Whether to return a bag of tuples, dictionaries, or
        dataframe-like objects. Default is "tuple". If "frame",
        the original partitions of ``df`` will not be transformed
        in any way.
    Examples
    --------
    >>> bag = df.to_bag()  # doctest: +SKIP
    """
    from dask.bag.core import Bag

    if not isinstance(df, (DataFrame, Series)):
        raise TypeError("df must be either DataFrame or Series")
    name = "to_bag-" + tokenize(df, index, format)
    if format == "frame":
        dsk = df.dask
        name = df._name
    else:
        dsk = {
            (name, i): (_df_to_bag, block, index, format)
            for (i, block) in enumerate(df.__dask_keys__())
        }
        dsk.update(df.__dask_optimize__(df.__dask_graph__(), df.__dask_keys__()))
    return Bag(dsk, name, df.npartitions)


def to_records(df):
    """Create Dask Array from a Dask Dataframe

    Warning: This creates a dask.array without precise shape information.
    Operations that depend on shape information, like slicing or reshaping,
    will not work.

    Examples
    --------
    >>> df.to_records()  # doctest: +SKIP

    See Also
    --------
    dask.dataframe._Frame.values
    dask.dataframe.from_dask_array
    """
    return df.map_partitions(M.to_records)


@insert_meta_param_description
def from_delayed(
    dfs: Delayed | distributed.Future | Iterable[Delayed | distributed.Future],
    meta=None,
    divisions: tuple | Literal["sorted"] | None = None,
    prefix: str = "from-delayed",
    verify_meta: bool = True,
) -> DataFrame | Series:
    """Create Dask DataFrame from many Dask Delayed objects

    Parameters
    ----------
    dfs :
        A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either
        of these objects, e.g. returned by ``client.submit``. These comprise the
        individual partitions of the resulting dataframe.
        If a single object is provided (not an iterable), then the resulting dataframe
        will have only one partition.
    $META
    divisions :
        Partition boundaries along the index.
        For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For string 'sorted' will compute the delayed values to find index
        values.  Assumes that the indexes are mutually sorted.
        If None, then won't use index information
    prefix :
        Prefix to prepend to the keys.
    verify_meta :
        If True check that the partitions have consistent metadata, defaults to True.
    """
    from dask.delayed import Delayed

    if isinstance(dfs, Delayed) or hasattr(dfs, "key"):
        dfs = [dfs]

    dfs = [
        delayed(df) if not isinstance(df, Delayed) and hasattr(df, "key") else df
        for df in dfs
    ]

    for item in dfs:
        if not isinstance(item, Delayed):
            raise TypeError("Expected Delayed object, got %s" % type(item).__name__)

    if meta is None:
        meta = delayed(make_meta)(dfs[0]).compute()
    else:
        meta = make_meta(meta)

    if divisions == "sorted" or divisions is None:
        divs = [None] * (len(dfs) + 1)
    else:
        divs = list(divisions)
        if len(divs) != len(dfs) + 1:
            raise ValueError("divisions should be a tuple of len(dfs) + 1")

    name = prefix + "-" + tokenize(*dfs)
    layer = DataFrameIOLayer(
        name,
        columns=None,
        inputs=BlockwiseDepDict(
            {(i,): inp.key for i, inp in enumerate(dfs)},
            # The delayed objects are already keys in the graph
            produces_keys=True,
        ),
        io_func=(
            partial(check_meta, meta=meta, funcname="from_delayed")
            if verify_meta
            else (lambda x: x)
        ),
    )
    graph = HighLevelGraph.from_collections(name, layer, dependencies=dfs)
    df = new_dd_object(graph, name, meta, divs)

    if divisions == "sorted":
        from dask.dataframe.shuffle import compute_and_set_divisions

        df = compute_and_set_divisions(df)

    return df


def sorted_division_locations(seq, npartitions=None, chunksize=None):
    """Find division locations and values in sorted list

    Examples
    --------

    >>> L = ['A', 'B', 'C', 'D', 'E', 'F']
    >>> sorted_division_locations(L, chunksize=2)
    (['A', 'C', 'E', 'F'], [0, 2, 4, 6])

    >>> sorted_division_locations(L, chunksize=3)
    (['A', 'D', 'F'], [0, 3, 6])

    >>> L = ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C']
    >>> sorted_division_locations(L, chunksize=3)
    (['A', 'B', 'C', 'C'], [0, 4, 7, 8])

    >>> sorted_division_locations(L, chunksize=2)
    (['A', 'B', 'C', 'C'], [0, 4, 7, 8])

    >>> sorted_division_locations(['A'], chunksize=2)
    (['A', 'A'], [0, 1])
    """
    if (npartitions is None) == (chunksize is None):
        raise ValueError(
            "Exactly one of npartitions and chunksize must be specified."
        )

    # Collect unique values and their first-occurrence offsets; duplicates
    # need care because a division value may only be used once
    seq_unique = seq.unique() if hasattr(seq, "unique") else np.unique(seq)
    duplicates = len(seq_unique) < len(seq)
    enforce_exact = False
    if duplicates:
        offsets = (
            seq.searchsorted(seq_unique, side="left")
            if hasattr(seq, "searchsorted")
            else np.searchsorted(seq, seq_unique, side="left")
        )
        enforce_exact = npartitions and len(offsets) >= npartitions
    else:
        offsets = seq_unique = None

    # Define chunksize and residual so that
    # npartitions can be satisfied exactly when there are no duplicates
    residual = 0
    subtract_drift = False
    if npartitions:
        chunksize = len(seq) // npartitions
        residual = len(seq) % npartitions
        subtract_drift = True

    def chunksizes(ind):
        # The first `residual` partitions get one extra element
        return chunksize + int(ind < residual)

    # Always start with the 0th item
    divisions = [seq[0]]
    locations = [0]
    i = chunksizes(0)
    ind = None  # cached `offsets` index
    drift = 0  # accumulated drift away from the ideal chunksizes
    divs_remain = npartitions - len(divisions) if enforce_exact else None
    while i < len(seq):
        # Map the current position `i` onto a candidate division value.
        # With duplicates, a division must sit at the first occurrence
        # of its value.
        div = seq[i]
        if duplicates:
            if ind is None:
                ind = int(np.nonzero(seq_unique == div)[0][0])
            if enforce_exact:
                # Leave enough unique values for the remaining divisions
                offs_remain = len(offsets) - ind
                if divs_remain > offs_remain:
                    ind -= divs_remain - offs_remain
            pos = int(offsets[ind])
            div = seq[pos]
        else:
            pos = i
        if div <= divisions[-1]:
            # This value is already covered by the previous division;
            # advance to the next distinct value and try again
            if duplicates:
                ind += 1
                i = int(offsets[ind]) if ind < len(offsets) else len(seq)
            else:
                i += 1
            continue
        if subtract_drift:
            drift = drift + ((pos - locations[-1]) - chunksizes(len(divisions) - 1))
        if enforce_exact:
            divs_remain -= 1
        i = pos + max(1, chunksizes(len(divisions)) - drift)
        divisions.append(div)
        locations.append(pos)
        ind = None

    # Always include the last element
    divisions.append(seq[-1])
    locations.append(len(seq))
    return divisions, locations


class _PackedArgCallable(DataFrameIOFunction):
    """Packed-argument wrapper for DataFrameIOFunction

    This is a private helper class for ``from_map``. This class
    ensures that packed positional arguments will be expanded
    before the underlying function (``func``) is called. This class
    also handles optional metadata enforcement and column projection
    (when ``func`` satisfies the ``DataFrameIOFunction`` protocol).
    """

    def __init__(
        self,
        func,
        args=None,
        kwargs=None,
        meta=None,
        enforce_metadata=False,
        packed=False,
    ):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.meta = meta
        self.enforce_metadata = enforce_metadata
        self.packed = packed
        self.is_dataframe_io_func = isinstance(self.func, DataFrameIOFunction)

    @property
    def columns(self):
        if self.is_dataframe_io_func:
            return self.func.columns
        return None

    def project_columns(self, columns):
        if self.is_dataframe_io_func:
            return _PackedArgCallable(
                self.func.project_columns(columns),
                args=self.args,
                kwargs=self.kwargs,
                meta=self.meta[columns],
                enforce_metadata=self.enforce_metadata,
                packed=self.packed,
            )
        return self

    def __call__(self, packed_arg):
        if not self.packed:
            packed_arg = [packed_arg]
        if self.enforce_metadata:
            return apply_and_enforce(
                *packed_arg,
                *(self.args or []),
                _func=self.func,
                _meta=self.meta,
                **(self.kwargs or {}),
            )
        return self.func(
            *packed_arg,
            *(self.args or []),
            **(self.kwargs or {}),
        )


@insert_meta_param_description
def from_map(
    func,
    *iterables,
    args=None,
    meta=None,
    divisions=None,
    label=None,
    token=None,
    enforce_metadata=True,
    **kwargs,
):
    """Create a DataFrame collection from a custom function map

    WARNING: The ``from_map`` API is experimental, and stability is not
    yet guaranteed. Use at your own risk!

    Parameters
    ----------
    func : callable
        Function used to create each partition. If ``func`` satisfies the
        ``DataFrameIOFunction`` protocol, column projection will be enabled.
    *iterables : Iterable objects
        Iterable objects to map to each output partition. All iterables must
        be the same length. This length determines the number of partitions
        in the output collection (only one element of each iterable will
        be passed to ``func`` for each partition).
    args : list or tuple, optional
        Positional arguments to broadcast to each output partition. Note
        that these arguments will always be passed to ``func`` after the
        ``iterables`` positional arguments.
    $META
    divisions : tuple, str, optional
        Partition boundaries along the index.
        For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For string 'sorted' will compute the delayed values to find index
        values.  Assumes that the indexes are mutually sorted.
        If None, then won't use index information
    label : str, optional
        String to use as the function-name label in the output
        collection-key names.
    token : str, optional
        String to use as the "token" in the output collection-key names.
    enforce_metadata : bool, default True
        Whether to enforce at runtime that the structure of the DataFrame
        produced by ``func`` actually matches the structure of ``meta``.
        This will rename and reorder columns for each partition,
        and will raise an error if this doesn't work,
        but it won't raise if dtypes don't match.
    **kwargs:
        Key-word arguments to broadcast to each output partition. These
        same arguments will be passed to ``func`` for every output partition.

    Examples
    --------
    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> func = lambda x, size=0: pd.Series([x] * size)
    >>> inputs = ["A", "B"]
    >>> dd.from_map(func, inputs, size=2).compute()
    0    A
    1    A
    0    B
    1    B
    dtype: object

    This API can also be used as an alternative to other file-based
    IO functions, like ``read_parquet`` (which are already just
    ``from_map`` wrapper functions):

    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> paths = ["0.parquet", "1.parquet", "2.parquet"]
    >>> dd.from_map(pd.read_parquet, paths).head()  # doctest: +SKIP
                        name
    timestamp
    2000-01-01 00:00:00   Laura
    2000-01-01 00:00:01  Oliver
    2000-01-01 00:00:02   Alice
    2000-01-01 00:00:03  Victor
    2000-01-01 00:00:04     Bob

    Since ``from_map`` allows you to map an arbitrary function
    to any number of iterable objects, it can be a very convenient
    means of implementing functionality that may be missing
    from other DataFrame-creation methods. For example, if you
    happen to have a priori knowledge about the number of rows
    in each of the files in a dataset, you can generate a
    DataFrame collection with a global RangeIndex:

    >>> import pandas as pd
    >>> import numpy as np
    >>> import dask.dataframe as dd
    >>> paths = ["0.parquet", "1.parquet", "2.parquet"]
    >>> file_sizes = [86400, 86400, 86400]
    >>> def func(path, row_offset):
    ...     # Read parquet file and set RangeIndex offset
    ...     df = pd.read_parquet(path)
    ...     return df.set_index(
    ...         pd.RangeIndex(row_offset, row_offset+len(df))
    ...     )
    >>> def get_ddf(paths, file_sizes):
    ...     offsets = [0] + list(np.cumsum(file_sizes))
    ...     return dd.from_map(
    ...         func, paths, offsets[:-1], divisions=offsets
    ...     )
    >>> ddf = get_ddf(paths, file_sizes)  # doctest: +SKIP
    >>> ddf.index  # doctest: +SKIP
    Dask Index Structure:
    npartitions=3
    0         int64
    86400       ...
    172800      ...
    259200      ...
    dtype: int64
    Dask Name: myfunc, 6 tasks

    See Also
    --------
    dask.dataframe.from_delayed
    dask.layers.DataFrameIOLayer
    """
    if not callable(func):
        raise ValueError("`func` argument must be `callable`")
    lengths = set()
    iterables = list(iterables)
    for i, iterable in enumerate(iterables):
        if not isinstance(iterable, Iterable):
            raise ValueError(
                f"All elements of `iterables` must be Iterable, got {type(iterable)}"
            )
        try:
            lengths.add(len(iterable))
        except (AttributeError, TypeError):
            iterables[i] = list(iterable)
            lengths.add(len(iterables[i]))
    if len(lengths) == 0:
        raise ValueError("`from_map` requires at least one Iterable input")
    elif len(lengths) > 1:
        raise ValueError("All `iterables` must have the same length")
    if lengths == {0}:
        raise ValueError("All `iterables` must have a non-zero length")

    # Check for `produces_tasks` and `creation_info`; these are specific
    # to callables that generate graph tasks directly
    produces_tasks = kwargs.pop("produces_tasks", False)
    creation_info = kwargs.pop("creation_info", None)

    if produces_tasks or len(iterables) == 1:
        if len(iterables) > 1:
            # Tasks are not detected correctly when they are "packed"
            # within an outer list/tuple
            raise ValueError(
                "Multiple iterables not supported when produces_tasks=True"
            )
        inputs = iterables[0]
        packed = False
    else:
        inputs = list(zip(*iterables))
        packed = True

    # Define collection name
    label = label or funcname(func)
    token = token or tokenize(
        func, meta, iterables, args, divisions, enforce_metadata, **kwargs
    )
    name = f"{label}-{token}"

    # Get "projectable" column selection, if supported
    column_projection = (
        func.columns if isinstance(func, DataFrameIOFunction) else None
    )

    if meta is None:
        # Use `_emulate` to check that `func` produces a pandas-like
        # object, and capture its metadata
        meta = _emulate(
            func,
            *(inputs[0] if packed else inputs[:1]),
            *(args or []),
            udf=True,
            **kwargs,
        )
        meta_is_emulated = True
    else:
        meta = make_meta(meta)
        meta_is_emulated = False

    if not (has_parallel_type(meta) or is_arraylike(meta) and meta.shape):
        if not meta_is_emulated:
            raise TypeError(
                "Meta is not valid, `from_map` expects output to be a pandas object. "
                "Try passing a pandas object as meta or a dict or tuple representing "
                "the (name, dtype) of the columns."
            )
        meta = make_meta(_concat([meta]))

    # Wrap `func` so that packed arguments are expanded and metadata is
    # enforced, unless no wrapping is actually required
    if packed or args or kwargs or enforce_metadata:
        io_func = _PackedArgCallable(
            func,
            args=args,
            kwargs=kwargs,
            meta=meta if enforce_metadata else None,
            enforce_metadata=enforce_metadata,
            packed=packed,
        )
    else:
        io_func = func

    layer = DataFrameIOLayer(
        name,
        column_projection,
        inputs,
        io_func,
        label=label,
        produces_tasks=produces_tasks,
        creation_info=creation_info,
    )
    divisions = divisions or [None] * (len(inputs) + 1)
    graph = HighLevelGraph.from_collections(name, layer, dependencies=[])
    return new_dd_object(graph, name, meta, divisions)


# Share the standalone docstrings with the corresponding DataFrame methods
DataFrame.to_records.__doc__ = to_records.__doc__
DataFrame.to_bag.__doc__ = to_bag.__doc__