U
    /e                     @   s^  d dl Z d dlZd dlmZ d dlmZ d dlZd dlZd dl	Z
d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZmZmZmZm Z m!Z! d d
l"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+m,Z, ee
j-Z.d dl/m0Z0m1Z1 d dl2m3Z3 d dl	m4Z5 d dl	m6Z7 e.edkZ8e.edkZ9e.edkZ:[.e
; e< e
= e> e
? e@ e
A eB e
C eD e
E eF e
G eH e
I eJ e
K eL e
M eN i
ZOereP eOe
Q < eR eOe
S < dd ZTdd ZUd,ddZVdd ZWG dd dZXd d! ZYd"d# ZZd$d% Z[d&d' Z\d(d) Z]G d*d+ d+eZ^dS )-    N)defaultdict)datetime)parse)tokenize)flatten)PANDAS_GT_120)pyarrow_schema_dispatch)Engine_get_aggregation_depth_normalize_index_columns_process_open_file_options_row_groups_to_parts_set_gather_statistics_set_metadata_task_size_sort_and_analyze_paths)_get_pyarrow_dtypes_is_local_fs_open_input_files)clear_known_categories)Delayed)
getargspecnatural_sort_key)expand_paths_if_neededstringify_path)ArrowFSWrapper)datasetfsz2.0.0z5.0.0c                 C   s   t | tr| jS | S )z5Return the wrapped filesystem if fs is ArrowFSWrapper)
isinstancer   r   r    r   C/tmp/pip-unpacked-wheel-dbjnr7gq/dask/dataframe/io/parquet/arrow.py_wrapped_fsA   s    r!   c              
   C   sR   z|  | W n> tk
rL } z dt|kr8td|n|W 5 d}~X Y nX dS )zmAppend row-group metadata and include a helpful
    error message if an inconsistent schema is detected.
    zrequires equal schemaszSchemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema. Such as `to_parquet(..., schema={"column1": pa.string()})`N)Zappend_row_groupsRuntimeErrorstr)metadataZmderrr   r   r    _append_row_groupsF   s    r&   r   Tc
              
      s  |j |dd |r jdd  | jj  |r6t|ng }d}|rT j|dd d} fdd|D } j|dd} j|}t|d	kr|st	d
| j}| jjD ]}||kr|
||}qg }t|dkr|d	 n|}||D ]\}}t|ts|f}|jdd t||D }||||d}|j||g}|j |dd |j||g}||d(}tj||fd|	r|ndi|
 W 5 Q R X |	r|d |j||g q|S )zWrite table to a partitioned dataset with pyarrow.

    Logic copied from pyarrow.parquet.
    (arrow/python/pyarrow/parquet.py::write_to_dataset)

    TODO: Remove this in favor of pyarrow's `write_to_dataset`
          once ARROW-8244 is addressed.
    Texist_okinplaceFc                    s   g | ]} | qS r   r   ).0coldfr   r    
<listcomp>y   s     z&_write_partitioned.<locals>.<listcomp>columns)Zaxisr   z.No data left to save outside partition columns   c                 S   s   g | ]\}}| d | qS )=r   )r+   namevalr   r   r    r/      s     preserve_indexschemawbmetadata_collectorN)mkdirsreset_indexr7   nameslist	set_indexdropr0   len
ValueErrorremoveget_field_indexgroupbyr   tuplesepjoinzipopenpqwrite_tableset_file_path)tabler.   	root_pathfilenamepartition_colsr   Zpandas_to_arrow_tabler6   
index_colsreturn_metadatakwargspartition_keysZdata_dfZ	data_colsZ	subschemar,   md_listkeysZsubgroupsubdirZsubtableprefix	full_pathfr   r-   r    _write_partitionedX   s\      r\   c                 C   s8   | r(|dk	r(t t| |jt | kS | r0dS dS dS )z[Simple utility to check if all `index` columns are included
    in the known `schema`.
    NTF)rA   setintersectionr=   )indexr7   r   r   r    _index_in_schema   s
    r`   c                   @   s   e Zd ZdZdd ZdS )PartitionObjag  Simple class providing a `name` and `keys` attribute
    for a single partition column.

    This class was originally designed as a mechanism to build a
    duck-typed version of pyarrow's deprecated `ParquetPartitions`
    class. Now that `ArrowLegacyEngine` is deprecated, this class
    can be modified/removed, but it is still used as a convenience.
    c                 C   s   || _ t|| _d S N)r3   sortedrW   )selfr3   rW   r   r   r    __init__   s    zPartitionObj.__init__N)__name__
__module____qualname____doc__re   r   r   r   r    ra      s   	ra   c                 C   s   | j j| j| j| j|dS )z*Create new fragment with row-group subset.)
row_groups)formatZmake_fragmentpath
filesystempartition_expression)Zold_fragrj   r   r   r    _frag_subset   s    ro   c                 C   s6   | j dk	od| j k}|r.t| j d dS i S dS )z)Get pandas-specific metadata from schema.N   pandasutf8)r$   jsonloadsdecode)r7   has_pandas_metadatar   r   r    _get_pandas_metadata   s    rv   c              
   K   s  | di  }t|di ft|r0dddn||dgkr@|n|gddd\}}	| d	ddk}
trxd
|d
|
ini }t| gf||d|	d n}|dgkrtj|f|j	f |ddd|W  5 Q R  S tj|f|j
|f|ddd|W  5 Q R  S W 5 Q R X dS )zRead arrow table from file path.

    Used by `ArrowDatasetEngine._read_table` when no filters
    are specified (otherwise fragments are converted directly
    into tables).
    readopen_file_optionsFnone)Zallow_precachedefault_cacheNpyarrow)r0   rj   Zdefault_enginerz   method
pre_buffer)r   precache_optionsr   T)r0   use_threadsZuse_pandas_metadata)getcopyr   popr   pre_buffer_supportedr   rK   ParquetFilerw   Zread_row_groups)rl   r   rj   r0   r7   filtersrT   Zread_kwargsr~   rx   Zpre_buffer_defaultr}   filr   r   r    _read_table_from_path   s`    

r   c                    s2   t r( fdd}dd t|| D S  jS dS )ap  Custom version of pyarrow's RowGroupInfo.statistics method
    (https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset.pyx)

    We use col_indices to specify the specific subset of columns
    that we need statistics for.  This is more optimal than the
    upstream `RowGroupInfo.statistics` method, which will return
    statistics for all columns.
    c                    sV    j | }|j}|d ks |js$dS |j} j|}|dk rBdS |j|j|jdfS )N)NNr   )minmax)	r$   column
statisticsZhas_min_maxZpath_in_schemar7   rD   r   r   )ir,   statsr3   Zfield_index	row_groupr   r    
name_stats  s    z&_get_rg_statistics.<locals>.name_statsc                 S   s   i | ]\}}|d k	r||qS rb   r   )r+   r3   r   r   r   r    
<dictcomp>0  s    z&_get_rg_statistics.<locals>.<dictcomp>N)subset_stats_supportedmapvaluesr   )r   Zcol_indicesr   r   r   r    _get_rg_statistics  s    
r   c                 C   sL   |rdd t |tdD nt }| r:dd t | tdD nt }t|| S )Nc                 S   s   h | ]}t |r|d  qS r   rA   r+   vr   r   r    	<setcomp>@  s      z"_need_fragments.<locals>.<setcomp>	containerc                 S   s   h | ]}t |r|d  qS r   r   r   r   r   r    r   E  s      )r   r>   r]   bool)r   rU   rQ   Zfiltered_colsr   r   r    _need_fragments:  s    r   c                   @   s  e Zd Zedd Zed*ddZedd	 Zed+ddZed,ddZed-e	j
ejdddZed.ddZed/ddZedd Zed0ddZedd Zedd Zed1dd Zed!d" Zed2eje	j
d#d$d%Zed&d' Zed(d) ZdS )3ArrowDatasetEnginec           
         s  |d kr| dd}nd|kr&td|}t|tjs@|dkr
t|tttfrl|s\tddd |D }n
t|g}|dkrt	tj
|d d f |pi }t|}|d d	rt|tjrdd
lm} |  n| t|dd|d }	| fdd|	D |d|jifS t|||||S )Nrm   ZfsspeczXCannot specify a filesystem argument if the 'filesystem' dataset option is also defined.)Zarrowr{   zempty urlpath sequencec                 S   s   g | ]}t |qS r   )r   r+   ur   r   r    r/   p  s     z9ArrowDatasetEngine.extract_filesystem.<locals>.<listcomp>r   zC:)LocalFileSystemrbr1   c                    s   g | ]}  |qS r   )Z_strip_protocolr   Zfs_stripr   r    r/     s     Zopen_file_func)r   rB   r   pa_fsZ
FileSystemr>   rF   r]   r   typeZfrom_urir   
startswithr   Zfsspec.implementations.localr   Zopen_input_filer	   extract_filesystem)
clsZurlpathrm   dataset_optionsrx   Zstorage_optionsr   Z	fsspec_fsr   pathsr   r   r    r   V  sF    
z%ArrowDatasetEngine.extract_filesystemNFr   c                 K   sr   |  ||||||||	|
||||}| ||}| |\}}}t|rb||d d< |d |d d< ||||d fS )Nr   common_kwargsaggregation_depthr_   )_collect_dataset_info_create_dd_meta_construct_collection_planrA   )r   r   r   
categoriesr_   use_nullable_dtypesgather_statisticsr   split_row_groups	chunksizeaggregate_filesignore_metadata_filemetadata_task_sizeparquet_file_extensionrT   dataset_infometapartsr   r   r   r   r    read_metadata  s*    z ArrowDatasetEngine.read_metadatac                 C   s   | t kS rb   )r   )r   r   r   r    multi_support  s    z ArrowDatasetEngine.multi_supportr   c
              
   K   sB  t |tr&|D ]}||kr|| q| }t |ttfsv|rv|rv|jD ]"}||krb|| qJ|| qJ|ptd}t |ts|g}g }t|dk}|D ]r}t |tr|}d}d}n
|\}}}t |trt|}t |ts|g}| j	|||||	|||f|
}|r|| q|rt
|}| j||fd|i|
}|rt |tr|D ]D}||j jjdkrLtjtj|j||j jd|jd||j< qLt|jjt|}|s|r|jddd	 n|jddd	 nNt|jjt|kr |r |jddd	 n"|r"d}tt|t|jj }|t| }|r>||}|S )
z!Read in a single output partitionNr1   r   categoryr   r   r_   FT)r@   r*   )r   r>   appendr   rF   partition_namesrC   rA   r#   _read_tablepaZconcat_tables_arrow_table_to_pandasr3   dtypepdSeriesCategoricalrW   r   r_   r]   r=   issubsetr<   r?   )r   r   piecesr0   r_   r   r   
partitionsr   r7   rT   levelZcolumns_and_partsZ	part_nameZtablesZ
multi_readpiecepath_or_fragr   rU   arrow_tabler.   	partitionZindex_in_columns_and_partsr   r   r    read_partition  s    






	
 


z!ArrowDatasetEngine.read_partitioninferc
           !         s>  |dkst |trzt|	r$|j|	n|j }t |trvt|}|jD ]*}|	|}|	|}|
|||}qJ|}|j|dd |r|d krd}d }d  d}d}|r~tj|t|dd}t|j}|dkrzz<|j|j|dgd	d
}t|}W 5 Q R X | d}W nd tk
rv   z6|jt|jtdd d	d
}t| W 5 Q R X W n tk
rp   Y nX Y nX nd}|r( d k	r( j }|j}|jd k	od|jk}|rt|jd d}dd |d D }nd }t||}t
|t
|j t
| kr&t!d"|t#|j nDt$%|j&| || j'k( rjt!d"t
|) t
|j') A |d |kr|d}|s(d } fddt* j+D }|,|d }|D ]N}|-|}|j.r|d kr|j.j/}n"|j.j/|kr|j.j/}n
d } q q|d }|d k	r(|d |kr(t!d||	d} |||| fS )Nr   Tr'   r   Fparquet)rm   rk   	_metadatar   )modekeyr:   rp   rq   c                 S   s    g | ]}|d  dkr|d qS )pandas_typecategoricalr3   r   r+   cr   r   r    r/     s   z7ArrowDatasetEngine.initialize_write.<locals>.<listcomp>r0   z5Appended columns not the same.
Previous: {} | New: {}zAppended dtypes differ.
{}r3   c                 3   s   | ]}  |V  qd S rb   r   )r+   r   Ztail_metadatar   r    	<genexpr>  s   z6ArrowDatasetEngine.initialize_write.<locals>.<genexpr>	divisionszThe divisions of the appended dataframe overlap with previously written divisions. If this is desired, set ``ignore_divisions=True`` to append anyway.
- End of last written partition: {old_end}
- Start of first new partition: {divisions[0]})r7   rR   )0r   dictr   Z_meta_nonemptyr?   Zremove_metadatar   r7   r=   rD   r]   fieldr;   pa_dsr   r!   rA   filesrJ   rG   rH   rK   r   OSErrorrc   r   Zto_arrow_schemar$   rr   rs   rt   r   r0   rB   rk   r>   r   r   locdtypesanyitemsrangeZnum_row_groupsr_   r   r   r   )!r   r.   r   rl   r   partition_onZignore_divisionsZdivision_infor7   rR   rT   Zinferred_schemar3   r   jZfull_metadataZi_offsetZmetadata_file_existsdsr   Zarrow_schemar=   ru   pandas_metadatar   r   Zold_endrj   Zindex_col_ir   r   r   Zextra_write_kwargsr   r   r    initialize_writeI  s    






 

  






z#ArrowDatasetEngine.initialize_write)r.   returnc                 C   s   zt jj|d||dW S  t jk
r } z`|d kr4 t j|}t|jddd}t|jddd}td|d| d| d	d W 5 d }~X Y nX d S )
Nr1   )Znthreadsr6   r7   F)Zshow_schema_metadataz    z=Failed to convert partition to expected pyarrow schema:
    `z`

Expected partition schema:
z

Received partition schema:
z

This error *may* be resolved by passing in schema information for
the mismatched column(s) using the `schema` keyword in `to_parquet`.)	r   TableZfrom_pandasZArrowExceptionZSchematextwrapindentZ	to_stringrB   )r   r.   r6   r7   excZ	df_schemaexpectedactualr   r   r    _pandas_to_arrow_table  s0       
 
 z)ArrowDatasetEngine._pandas_to_arrow_tablec              	   K   sF  d }d}t |	|
r&|j|	dd d}ng }	| j|||
d}|r\|jj}|| |j|d}|rt||||||| j|f|	||d|}|r|d }tdt	|D ]}t
|||  qnbg }||j||gd	(}tj||f||r|nd d
| W 5 Q R X |r|d }|| |r>d|i}|r8|j|d< |gS g S d S )NFTr)   r5   )r$   )rR   compressionrS   r   r1   r8   )r   r9   r   r7   )r`   r?   r   r7   r$   updateZreplace_schema_metadatar\   r   rA   r&   rJ   rG   rH   rK   rL   rM   )r   r.   rl   r   rP   r   rS   Zfmdr   rR   r7   headZcustom_metadatarT   _metar6   tZ_mdrV   r   r   dr   r   r    write_partition  sh    

	


z"ArrowDatasetEngine.write_partitionc              	      s  |d d  dd }dd |D }|r|s|j|dg}ttjj  fdd| D }	||d}
tj||
f|	 W 5 Q R X |j|d	g}|r|d k	r|}d}n|d d d
 }d}t	|t
|D ]}t||| d d
  q||d}
||
 W 5 Q R X d S )Nr   r7   c                 S   s    g | ]}|d  d dk	r|qS )r   r   Nr   r+   pr   r   r    r/   6  s      z5ArrowDatasetEngine.write_metadata.<locals>.<listcomp>Z_common_metadatac                    s   i | ]\}}| kr||qS r   r   r+   kr   keywordsr   r    r   <  s       z5ArrowDatasetEngine.write_metadata.<locals>.<dictcomp>r8   r   r   r1   )r   rG   rH   r   rK   write_metadataargsr   rJ   r   rA   r&   write_metadata_file)r   r   r   r   rl   r   rT   r7   Zcommon_metadata_pathZkwargs_metar   metadata_pathr   Zi_startr   r   r  r    r  3  s&    z!ArrowDatasetEngine.write_metadatac           !         s"  d}d}| di }d|kr$d|d< d|kr8t |d< d}t|dkr||d rt||\}}}|j||d g}|j|d	g}|
s||rtj	|fd
t
|i|}d}n@rt|}fdd||D }|r|g krtd dnjt|dkrdt||\}}}|j|d	g}d	|krd|
sVtj	|fd
t
|i|}d}|d	 |}|dkrtj|fd
t
|i|}g }tt d}| D ]`}tr qt|j}|sΈ s q| D ]&\}}| | krֈ | | q֐q|j}|dk	rT|j}dd |j|jD }t t|krT fdd|D  tr|jjrtdd |jjD rt|jjj}t |D ]&\}}|t!||jj| "  qn(t }|D ]}|t!| |  qt#|	|} ||||j|||||||||	| |||d|i|dS )zpyarrow.dataset version of _collect_dataset_info
        Use pyarrow.dataset API to construct a dictionary of all
        general information needed to read the dataset.
        Nr   partitioningZhiverk   Fr1   r   r   rm   Tc                    s   g | ]}|  r|qS r   )endswith)r+   rl   )r   r   r    r/     s   
z<ArrowDatasetEngine._collect_dataset_info.<locals>.<listcomp>zLNo files satisfy the `parquet_file_extension` criteria (files must end with z).c                 S   s"   g | ]}d |kr| d d qS )r2   r   )split)r+   partr   r   r    r/     s   c                    s   i | ]}| kr| | qS r   r   )r+   r  )hive_categoriesr   r    r     s      z<ArrowDatasetEngine._collect_dataset_info.<locals>.<dictcomp>c                 s   s   | ]}|d k	V  qd S rb   r   )r+   arrr   r   r    r     s     z;ArrowDatasetEngine._collect_dataset_info.<locals>.<genexpr>)r   physical_schemahas_metadata_filer7   r   valid_pathsr   r   r_   r   r   r   r   r   r   r   r   rT   )$r   r   ZParquetFileFormatrA   isdirr   rG   rH   existsZparquet_datasetr!   findrB   rC   r   r   r>   get_fragmentspartitioning_supported_get_partition_keysrn   r   r   r7   r  rl   r  r]   r	  Zdictionariesallr=   	enumeratera   	to_pandasr
   )!r   r   r   r   r_   r   r   r   r   r   r   r   r   rT   r   r  Z_dataset_kwargsr  basefns	meta_pathZlen0partition_obj	file_fragrW   r  r   r  Zcat_keysr   r   r3   r   r   )r  r   r    r   Q  s    








 z(ArrowDatasetEngine._collect_dataset_infoc                    s  |d }|d }|d }|d }|d }| d|jd}t|pDi }	|	r|dkrg }|	d D ]*}
|
d	 d
kr^|
d |kr^||
d  q^|d  di  }| j| |||d t jj}t j	}|r|dgkr j
dd |dkr|r|dgks|	 ddr|}|pd}|r<|dgkr< j|dd |rfdd|D }|sxg }d|d< i |d< ||d< n"t|t|krtd|t||| ||\}}|| }|rt||std|t|t  fdd|D d |r|D ]}t|tr@|j|d kr@tjg |j|d d _nZ|j jjkrjtjg |j jjd _n0|j j	krtjtj|jg d jd |j< q||d< ||d< ||d<  S )z|Use parquet schema and hive-partition information
        (stored in dataset_info) to construct DataFrame metadata.
        r7   r_   r   r   r   r  Nr0   r   r   r3   rT   arrow_to_pandas)r   r   Tr)   Zindex_columnsr   c                    s   g | ]}| kr|qS r   r   r   )physical_column_namesr   r    r/   R  s      z6ArrowDatasetEngine._create_dd_meta.<locals>.<listcomp>rU   zNo partition-columns should be written in the 
file unless they are ALL written in the file.
physical columns: {} | partitions: {}zAcategories not in available columns.
categories: {} | columns: {}c                    s   g | ]}| j jkr|qS r   )r_   r=   r   )r   r   r    r/   r  s      )colsr   )r   r3   r   r   rR   )r   r=   rv   r   r   r   Zempty_tabler>   r_   r0   r<   r?   rA   rB   rk   r   r]   r^   r   r   r3   r   ZCategoricalIndexrW   r   r   )r   r   r   r7   r_   r   r  r   r0   r   r,   r   Zindex_namesZcolumn_namesrR   Z_partitionsZall_columnsr   r   )r   r!  r    r     s    


	

 	       
  
z"ArrowDatasetEngine._create_dd_metac           %         s  |d }|d  |d }|d }|d }|d }|d }|d }|d	 }	|d
 }
|d }|d }|d }|d |d }t |d  }t }|dk	rt|tdD ]8}|\}}}|dkrt|tttfstd|| qi }|rt|dkr|ng }t	|	j
D ]4\}}||ks||kr ||
kr*q |||< q t|||||t|}||||	d|}|dkr|s|sdd t|jtdD g |fS d}|dk	rt|} |||||	|||||d d}|s|dks|t|jkrtdd ||D d d! d}| ||\}}nt|jtd}rD fd"d|D }g g  }}|ri } d#t|| }g }!t	tdt||D ]8\}"}#|!||"f | j||#|#|  |f| |!d$ < q~d%d& }$|$|!f| d'| < td'| |  \}}|||fS )(aI  pyarrow.dataset version of _construct_collection_plan
        Use dataset_info to construct the general plan for
        generating the output DataFrame collection.

        The "plan" is essentially a list (called `parts`) of
        information that is needed to produce each output partition.
        After this function is returned, the information in each
        element of `parts` will be used to produce a single Dask-
        DataFrame partition (unless some elements of `parts`
        are aggregated together in a follow-up step).

        This method also returns ``stats`` (which is a list of
        parquet-metadata statistics for each element of parts),
        and ``common_metadata`` (which is a dictionary of kwargs
        that should be passed to the ``read_partition`` call for
        every output partition).
        r   r   r   r   r   r   r   rR   r7   r   r   r   r  r  rT   r   Nr   inz2Value of 'in' filter must be a list, set or tuple.r1   )r   r   r   r7   Fc                 S   s   g | ]}d |ddfiqS r   Nr   )r+   rZ   r   r   r    r/     s   zAArrowDatasetEngine._construct_collection_plan.<locals>.<listcomp>r   r   )r   r   r   r   
ds_filtersr7   stat_col_indicesr   r   r   r   r   c                 s   s   | ]
}|V  qd S rb   r   )r+   fragr   r   r    r     s     z@ArrowDatasetEngine._construct_collection_plan.<locals>.<genexpr>c                 S   s
   t | jS rb   )r   rl   )xr   r   r    <lambda>      z?ArrowDatasetEngine._construct_collection_plan.<locals>.<lambda>c                    s$   g | ]}|  jd  kr|qS )r:   )r  rG   )r+   Zfilefr   r  r   r    r/     s   zgather-pq-parts-r:   c                 S   s4   g g  }}| D ]\}}||7 }|r||7 }q||fS rb   r   )Zparts_and_statsr   r   r  statr   r   r    _combine_parts4  s    

zEArrowDatasetEngine._construct_collection_plan.<locals>._combine_partszfinal-)r   r]   r   r>   r   rF   	TypeErroraddrA   r  r=   r   rc   r   r   rK   _filters_to_expressionr  _collect_file_partsr   r   r   r   Zcompute)%r   r   r   r   r   r   r   r   rR   r7   r   r   r   r  rT   r   Zfilter_columnsfilterr,   opr4   r&  Z_index_colsr   r3   r   r%  dataset_info_kwargs
file_fragsr   r   	all_filesZgather_parts_dskZfinalize_listZtask_iZfile_ir-  r   r+  r    r     s     





	

z-ArrowDatasetEngine._construct_collection_planc           !         sV  |d }|d }|d }|d }|d }t |ts:|g}n|sFg g fS t |d tr|sv|sv|dkrvdd	 |D d fS ttj|fd
t|i| }n|}|d }	|d }
|d }|d }|d }|d }tt}tt}tt}t|dk}i }i }|D ] }|j	}t
|j  fdd	|D ||< |j|
|dD ]}|j}|sR|rz|d krj|  |j}t|sq:nd g||< q:|D ]}|| |j |rt||}|r||j|jg d}n|j|jd}g }| D ]}||kr|| d }|| d }t |tr$t|n|}t |tr>t|n|}||d } |	s|s|s|d ksz| r|| k rd}i }i } q|r|d |||d n|||g7 }|||< n(|r|d d|i n|d d d g7 }q|r|| | |s|| t| qq:q|s0g g fS t|||||||| j|||ddd	S )Nr   r   r   r   r   r   Fc                 S   s   g | ]}d |ddfiqS r$  r   )r+   Zfile_or_fragr   r   r    r/   \  s   z:ArrowDatasetEngine._collect_file_parts.<locals>.<listcomp>rm   r   r%  r7   r&  r   r   r1   c                    s   g | ]}|j  |j  fqS r   r3   r+   Z	hive_partZraw_keysr   r    r/     s    )r7   )Zfile_path_0num-rowstotal_byte_sizer0   )r:  r;  r   r   r0   )r3   r   r   r3    )r   rU   r  	data_path)Zmake_part_kwargs)r   r>   r#   r   r   r!   r  r   intrl   r  rn   Zsplit_by_row_grouprj   Zensure_complete_metadatarA   r   idr   Znum_rowsr;  rW   r   r   Z	Timestampr   rF   r   
_make_part)!r   Zfiles_or_fragsr4  r   r   r   r   r   r5  r   r%  r7   r&  r   r   Zfile_row_groupsZfile_row_group_statsZfile_row_group_column_statsZsingle_rg_partsZhive_partition_keysZ	cmax_lastr  Zfpathr'  Zrow_group_infor   r   sZcstatsr3   ZcminZcmaxlastr   r9  r    r1  A  s    










z&ArrowDatasetEngine._collect_file_partsc           	      C   sD   |j dd ||fD }||d}|r6|dkr6dS d|||fiS )z1Generate a partition-specific element of `parts`.c                 S   s   g | ]}|d kr|qS )r<  r   r   r   r   r    r/     s      z1ArrowDatasetEngine._make_part.<locals>.<listcomp>Nr   )rG   rH   r   )	r   rP   Zrg_listr   rU   r  r=  rZ   Zpkeysr   r   r    r@    s
    zArrowDatasetEngine._make_partc	                    s  t |tjr|}
nd}
|	di dd}|r6|dksD|rt||rtj|fdt|i|	di }t| }t	|dkst
|dgkrt|d |n|d }
t|
j  fdd|D }|
r g }|D ],}|dkrd	|jkr|d	 q|| q|
jd
|||rt|ndd}nt||||||f|	}|rt |trdd |D }|D ]d}|j|jjkrZ||jd}tjt	||j|dd}tj|t|j}||j|}qZ|S )zRead in a pyarrow tableNr   r	  rm   r1   r   c                    s   g | ]}|j  |j  fqS r   r7  r8  r9  r   r    r/   4  s   z2ArrowDatasetEngine._read_table.<locals>.<listcomp>Z__index_level_0__F)r   r7   r0   r2  c                 S   s   i | ]\}}||qS r   r   r  r   r   r    r   V  s      z2ArrowDatasetEngine._read_table.<locals>.<dictcomp>i4)r   )r   r   ZParquetFileFragmentr   r   r   r!   r>   r  rA   AssertionErrorro   r  rn   r=   r   Zto_tablerK   r0  r   r3   r7   npfullrW   r_   r   ZDictionaryArrayZfrom_arraysarrayZappend_column)r   r   r   rj   r0   r7   r   r   rU   rT   r'  r	  r   Zfragsr"  r3   r   Z	keys_dictr   catZcat_indr  r   r9  r    r     s    


 
  
zArrowDatasetEngine._read_table)r   r   c                    s|   | di }|ddd |rh|dkr0tj  ndd  d|kr`|d  fdd	}||d< n |d< |jf d
|i|S )Nr   F)r   Zignore_metadatapandasc                 S   s$   | t  krtdS t| S d S )Nr{   )r   stringr   StringDtypeZ
ArrowDtype)Zpyarrow_dtyper   r   r    default_types_mappert  s    
zGArrowDatasetEngine._arrow_table_to_pandas.<locals>.default_types_mappertypes_mapperc                    s   | p | S rb   r   )Zpa_typerL  rM  r   r    _types_mapper  s    z@ArrowDatasetEngine._arrow_table_to_pandas.<locals>._types_mapperr   )r   r   PYARROW_NULLABLE_DTYPE_MAPPINGr  )r   r   r   r   rT   _kwargsrO  r   rN  r    r   f  s    
z)ArrowDatasetEngine._arrow_table_to_pandasc              	   C   s6   | |d}t|j}W 5 Q R X |r2|| |S )Nr   )rJ   rK   r   r$   rM   )r   rl   r   	file_pathr[   r   r   r   r    collect_file_metadata  s
    
z(ArrowDatasetEngine.collect_file_metadatac              	   C   sp   d }|D ]}|rt || q|}q|rh|j|dg}||d}|sPtd|| W 5 Q R X d S |S d S )Nr   r8   zCannot write empty metdata!)r&   rG   rH   rJ   rB   r  )r   Z	meta_listr   Zout_pathr   r   r  r   r   r   r    aggregate_metadata  s    z%ArrowDatasetEngine.aggregate_metadata)NNFNNFNNFr   N)Fr   r   NN)FNFNr   N)FN)NNNNFN)F)F)NNNN)F)rf   rg   rh   classmethodr   r   r   r   r   r   Z	DataFramer   r   r   r   r  r   r   r   r1  r@  r   r   rS  rT  r   r   r   r    r   P  s   
<           2
     ~          	      H
 Dy
 2
 ,    
d #
r   )r   T)_rr   r   collectionsr   r   ZnumpyrE  rI  r   r{   r   Zpyarrow.parquetr   rK   Zpackaging.versionr   parse_versionZ	dask.baser   Z	dask.corer   Zdask.dataframe._compatr   Zdask.dataframe.backendsr   Zdask.dataframe.io.parquet.utilsr	   r
   r   r   r   r   r   r   Zdask.dataframe.io.utilsr   r   r   Zdask.dataframe.utilsr   Zdask.delayedr   Z
dask.utilsr   r   __version__Z_pa_versionZfsspec.corer   r   Zfsspec.implementations.arrowr   r   r   r   r   r   r   r  Zint8Z	Int8DtypeZint16Z
Int16DtypeZint32Z
Int32DtypeZint64Z
Int64DtypeZuint8Z
UInt8DtypeZuint16ZUInt16DtypeZuint32ZUInt32DtypeZuint64ZUInt64DtypeZbool_ZBooleanDtyperJ  rK  rP  ZFloat32DtypeZfloat32ZFloat64DtypeZfloat64r!   r&   r\   r`   ra   ro   rv   r   r   r   r   r   r   r   r    <module>   s~   (

            
G

D'