U
    /e                     @   sZ  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	Z
d dlZd dlZd dlmZ d dlmZ z@d dlZd dlmZ d dlmZmZmZmZ d dlmZmZ W n ek
r   Y nX d d	lm Z  d d
l!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* d dl+m,Z,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 e5 Z6dd Z7e7Z8G dd de"Z9dS )    N)OrderedDictdefaultdict)	ExitStack)parse)flatten)ParquetFile)ex_from_sepget_file_schemegroupby_types
val_to_num)make_part_filepartition_on_columns)tokenize)	Engine_get_aggregation_depth_normalize_index_columns_parse_pandas_metadata_process_open_file_options_row_groups_to_parts_set_gather_statistics_set_metadata_task_size_sort_and_analyze_paths)_is_local_fs_meta_from_dtypes_open_input_files)UNKNOWN_CATEGORIES)Delayed)natural_sort_keyc                    s  |dkri }|S t  }t  }td t| } |dkrt fdd| D }|D ]4\}}||t t| ||t | qRnVtdd | D }|D ]<\}}d| }||t t| ||t | q| D ]\}}	|| }
t	|	t	|
krVt  }|| D ]}|t|t | qdd	 |
 D }td
| t|	}t	|dkrdd	 |
 D }td|  qt dd	 | D }|S )a  
    Extract categorical fields and labels from hive- or drill-style paths.
    FixMe: This has been pasted from https://github.com/dask/fastparquet/pull/471
    Use fastparquet.api.paths_to_cats from fastparquet>0.3.2 instead.

    Parameters
    ----------
    paths (Iterable[str]): file paths relative to root
    file_scheme (str):

    Returns
    -------
    cats (OrderedDict[str, List[Any]]): a dict of field names and their values
    )simpleflatother/hivec                 3   s*   | ]"}  |D ]\}}||fV  qqd S N)findall).0pathkvs I/tmp/pip-unpacked-wheel-dbjnr7gq/dask/dataframe/io/parquet/fastparquet.py	<genexpr>H   s       z!_paths_to_cats.<locals>.<genexpr>c                 s   s6   | ].}t |d dd D ]\}}||fV  qqdS )r!   N)	enumeratesplit)r%   r&   ivalr+   r+   r,   r-   M   s      zdir%ic                 S   s&   g | ]}t |d kr|D ]}|qqS )   )len)r%   r'   cr+   r+   r,   
<listcomp>\   s
       z"_paths_to_cats.<locals>.<listcomp>z)Partition names map to the same value: %sr3   c                 S   s   g | ]}|d  qS r   r+   )r%   xr+   r+   r,   r6   d   s     z<Partition names coerce to values of different types, e.g. %sc                 S   s   g | ]\}}|t |fqS r+   )list)r%   keyr(   r+   r+   r,   r6   j   s     )r   r   toolzunique
setdefaultsetaddr   itemsr4   values
ValueErrorr
   warningswarn)pathsfile_schemecatsZraw_cats
partitionsr:   r2   Zi_valr1   r(   rawZconflicts_by_valueZraw_val	conflictsZvals_by_typeZexamplesr+   r)   r,   _paths_to_cats0   sN    

rK   c                   @   s   e Zd Zedd Zedd Zed"ddZedd	 Zed
d Zedd Z	edd Z
ed#ddZedd Zed$ddZed%ddZed&ddZed'ddZed(d d!ZdS ))FastParquetEnginec                  C   s  t |j}|r>|
r>|jr>|jd jd jr>t|jdd d|_i }|jr~|jr~|jdg D ] }d|kr\|dd||d < q\t|d	k}t	t }t	t }t	t }i }t
|jD ]\}}|r|rtj||rq|jd j}t|tr| n|}|dkr|s|j}|pd
}ntd|| rL|| || d d d	 |f n|| d|f |r|r|||j|jg d}n|j|jd}g }| D ]\}}|j| }|jjr2d}d}|jd | d dk	r|jd | | }|jd | | }n|| dkrz|jjj}|jjj}|dkr0|jjj}|dkrD|jjj}t|ttfrz||ddkrz|d}|d}t|tjrt || dd}t!j"||d}t!j"||d}||d}|s|	s|
s|dks|r||k rd}i }i } q|r|d |||d n|||g7 }|||< nZ|sd|	sd|
sd|jj#dkrdd}i }i } q|r~|d d|i n|dddg7 }q|r|| | |s|| t$| q|||||fS )zOrganize row-groups by file.r   c                 S   s   t | jd jS )Nr   )r   columns	file_pathr8   r+   r+   r,   <lambda>       z8FastParquetEngine._organize_row_groups.<locals>.<lambda>)r:   rM   
field_namepandas_typeNr3    zGlobal metadata structure is missing a file_path string. If the dataset includes a _metadata file, that file may have one or more missing file_path fields.r.   )Zfile_path_0num-rowstotal_byte_sizerM   )rU   rV   minmaxobjectbyteszutf-8tz)r[   F)namerW   rX   r\   )%r9   rG   
row_groupsrM   rN   sortedpandas_metadatagetintr   r/   fastparquetapiZfilter_out_cats
isinstancerZ   decodefnrB   appendnum_rowsrV   r@   	meta_data
statistics	min_value	max_valuerW   rX   	bytearraynpZ
datetime64getattrpdZ	TimestampZ
num_valuestuple) clspfsplit_row_groupsgather_statisticsstat_col_indicesfiltersdtypes	base_pathhas_metadata_file	chunksizeaggregation_depthZpqpartitionsrS   r5   Zsingle_rg_partsfile_row_groupsfile_row_group_statsfile_row_group_column_statsZ	cmax_lastrg	row_groupfpZfpathr*   Zcstatsr\   r1   columnZcminZcmaxr[   lastr+   r+   r,   _organize_row_groupst   s    


$











z&FastParquetEngine._organize_row_groupsc                 C   s   g }|D ]\}}|j | }|j}t|D ]X\}	}
|	r:d|
_|
j}d|_|j}|rnd|_d|_d|_	d|_
d|_d|_d|_d|_q(||_|| q|S )z]Turn a set of row-groups into bytes-serialized form
        using thrift via pickle.
        N)r]   rM   r/   rN   ri   key_value_metadatarj   Zdistinct_countrX   rW   rl   rk   	encodingsZtotal_uncompressed_sizeZencoding_statsrg   )rr   rs   filenamer]   real_row_groups_Z	rg_globalr   rM   r5   colZmdstr+   r+   r,   _get_thrift_row_groups*  s,    
z(FastParquetEngine._get_thrift_row_groupsNc                 C   sV   |r|  |||}d|fi}n4|jdd ||fD }	dd |D }
d|	|
fi}|S )z1Generate a partition-specific element of `parts`.piecec                 S   s   g | ]}|d kr|qS )rT   r+   r%   pr+   r+   r,   r6   d  s      z0FastParquetEngine._make_part.<locals>.<listcomp>c                 S   s   g | ]}|d  qS r7   r+   r%   r   r+   r+   r,   r6   e  s     )r   sepjoin)rr   r   Zrg_listfsrs   ry   rH   r   part	full_pathr]   r+   r+   r,   
_make_partO  s    zFastParquetEngine._make_partc                    s  | di }g }d}t|dkr|d r|d  d}|
sXj dg}|
s`|st  d\} }d}dD ],}z|| d}W q t	k
r   Y qX q|rȇ fd	d
|D }d}|rt
j dgfdji|nrFt|}fdd
|D }fdd
|D }|rF|g krFt	d dt
|d d fj d|t|}|_t||_|sT fdd
|D }nt|\} }d|k}|r|
r|d d} fdd
|D }|rt
j dgfdji|nHt|}t
|d d fj d||_t||_|sT| }t|	tj}jrfdd
jD }|si _n,t|tjkrt	djj ||| |||||||	||d|i|dS )NdatasetFr3   r   T	_metadata)root)r   _common_metadatac                    s   g | ]}j  |gqS r+   r   r   r%   rf   baser   r+   r,   r6     s     z;FastParquetEngine._collect_dataset_info.<locals>.<listcomp>	open_withc                    s   g | ]}|  r|qS r+   endswith)r%   r&   parquet_file_extensionr+   r,   r6     s    
 c                    s   g | ]}|  r|qS r+   r   r   r   r+   r,   r6     s     
 zLNo files satisfy the `parquet_file_extension` criteria (files must end with z).r   r   c                    s   g | ]}j  |gqS r+   r   r   r   r+   r,   r6     s     c                    s   g | ]}j  |gqS r+   r   r   r   r+   r,   r6     s     c                    s   g | ]}| j kr|qS r+   rM   r   )rs   r+   r,   r6     s     
 zNo partition-columns should be written in the 
file unless they are ALL written in the file.
This restriction is removed as of fastparquet 0.8.4
columns: {} | partitions: {})rs   rE   rz   partsr   r   ru   
categoriesindexrw   rt   r{   aggregate_filesr|   metadata_task_sizekwargs)popr4   isdirisfiler   r   r   findremoverB   r   openr	   rF   paths_to_catsrG   copyr   r9   formatrM   keys)rr   rE   r   r   r   ru   rw   rt   r{   r   ignore_metadata_filer   r   r   Zdataset_kwargsr   Z_metadata_existsfnsZ_update_pathsrf   Zlen0schemer|   Z_partitionsr+   )r   r   r   rs   r,   _collect_dataset_infoj  s    
	


 

   z'FastParquetEngine._collect_dataset_infoc                    s   |d }|d }|d }d }|j }|rDt|\}} }	||j n(g }|jt|j }dd |D  d g}	|d krt|dkrt|dkr|d d k	r|d }n|}t||||\}}|| }
d }t|t	r|}|d kr|j
}nt|tr|g}nt|}|r&t||
s&td|t|
||} fd	d| D }|pNd
}t|trb|g}|D ](}t||dd rf|| j||< qf|D ]"}||
krtjtgd||< q|jD ]&}||
krtj|j| d||< qt|
|||	}||d< ||d< ||d< ||d< ||d< |S )Nrs   r   r   c                 S   s   i | ]
}||qS r+   r+   )r%   r'   r+   r+   r,   
<dictcomp>  s      z5FastParquetEngine._create_dd_meta.<locals>.<dictcomp>r   r3   zAcategories not in available columns.
categories: {} | columns: {}c                    s   i | ]\}}  |||qS r+   )r`   )r%   r'   r(   Zstorage_name_mappingr+   r,   r   >  s     
 r+   numpy_dtype)r   rx   
index_colscategories_dict)r_   r   extendrG   rM   r9   r4   r   rd   dictr   strr>   intersectionrB   r   _dtypesr@   ro   r`   r   rp   ZCategoricalDtyper   r   )rr   dataset_infors   r   r   rM   Z	pandas_mdZindex_namesZcolumn_namesZcolumn_index_namesZall_columnsr   rx   r   indcatZcatcolmetar+   r   r,   _create_dd_meta  s    
   

 




z!FastParquetEngine._create_dd_metac           !      C   s  |d }|d }|d }|d }|d }|d }|d }|d }	|d	 }
|d
 }|d }|d }|d }|d }|d }|d }|d }t |d |}dd t|pg tdD }i }|	rt|dkr|ng }t|jD ] \}}||ks||kr|||< qt|	||||t||B }	|p||j|j	|
d|}|	dkrx|sxt
|trxt|rxt
|d trxdd |D g |fS |||	||||||j|j	|
d krdn|
|d}|s|dks|t|kr|r|n|}| ||\}}ng g  }}|ri }dt|| }g }ttdt||D ]8\}}|||f | j||||  |f||d < q dd  } | |f|d!| < td!| | \}}|||fS )"Nr   r   rE   rw   rs   rt   r{   ru   r   r|   r   r   rx   r   rz   r   r   c                 S   s   h | ]}|d  qS r7   r+   )r%   tr+   r+   r,   	<setcomp>y  s     z?FastParquetEngine._construct_collection_plan.<locals>.<setcomp>)	containerr3   )r   	root_catsroot_file_schemery   Fr   c                 S   s   g | ]}d |dfiqS )r   Nr+   )r%   r   r+   r+   r,   r6     s     z@FastParquetEngine._construct_collection_plan.<locals>.<listcomp>rT   )r   rt   ru   rw   rx   rv   r|   r{   r   r   ry   rz   zgather-pq-parts-r.   c                 S   s4   g g  }}| D ]\}}||7 }|r||7 }q||fS r#   r+   )Zparts_and_statsr   statsr   statr+   r+   r,   _combine_parts  s    

zDFastParquetEngine._construct_collection_plan.<locals>._combine_partszfinal-)r   r   r9   r4   r/   rM   r   r>   rG   rF   rd   r   _collect_file_partsr   rangerg   r   Zcompute)!rr   r   r   r   rE   rw   rs   rt   r{   ru   ry   r|   r   r   rx   r   rz   r   r   Zfilter_columnsrv   Z_index_colsr1   r\   common_kwargsdataset_info_kwargsZpf_or_pathsr   Zgather_parts_dskZfinalize_listZtask_iZfile_ir   r+   r+   r,   _construct_collection_planZ  s     


	
z,FastParquetEngine._construct_collection_planc                 C   s  |d }|d }|d }|d }|d }|d }|d }	|d }
| d	d }| d
d }| dd }|d }t|tjjst||j|d}|pi |_|r||_n|}| |||||||||	|

\}}}}}t	|||
||||| j
|||t|jdd	\}}||fS )Nr   rt   ru   rv   rw   rx   r{   r|   ry   r   r   rz   r   )r   rs   ry   rH   )Zmake_part_kwargs)r`   rd   rb   rc   r   r   rG   rF   r   r   r   r9   )rr   Zpf_or_filesr   r   rt   ru   rv   rw   rx   r{   r|   ry   r   r   rz   rs   r}   r~   r   r   r   r+   r+   r,   r     sp    
	
z%FastParquetEngine._collect_file_partsFc                 K   s   |rt d| ||||||||	|
||||}| |}| |\}}}|d }|dkrrd |jkrr|jd gdd t|r||d d< |d |d d< t|rt|d d	 d
kr|d }d |_d |j_d |_	||d d d< ||||fS )Nz@`use_nullable_dtypes` is not supported by the fastparquet enginer   FT)rM   Zinplacer   r   r|   r   r3   rs   parquet_file)
rB   r   r   r   rM   Zdropr4   r]   fmdZ_statistics)rr   r   rE   r   r   use_nullable_dtypesru   rw   rt   r{   r   r   r   r   r   r   r   r   r   r   rs   r+   r+   r,   read_metadata.  sD    
zFastParquetEngine.read_metadatac                 C   s   | t kS r#   )rL   )rr   r+   r+   r,   multi_supportt  s    zFastParquetEngine.multi_supportr+   c
              	      s  d}|sdn|	}	t |tr4|d gkr,g }d}||7 }|
dd  t |tsP|g}|d }t |trnt |d trL d kstg }dtdd |D f|j|	pdd|
di  |D ]|}t	|d	krΈ n&t|d f|j|	pdd|
di }t	|j
}|d	 ptt|}| fd
d|D 7 }|7 qt	|t	 j
k }nJ rg }|D ],}|d }t |tr|t|}||7 }qZd}ntd|rt^ |D ]6}|jD ](}|j}|rt |tr| |_qq| j_
 j}   | _W 5 Q R X |r$d jkr$dg}||7 }|p,i  _|r<| _ fdd _| j f||||d|
di S tdt| d S )NFTr   r   c                 S   s   g | ]}|d  qS r7   r+   r   r+   r+   r,   r6     s     z4FastParquetEngine.read_partition.<locals>.<listcomp>r   r   r3   c                    s   g | ]} j |  qS r+   )r]   r   r   Z	rg_offsetr+   r,   r6     s   z&Neither path nor ParquetFile detected!Z__index_level_0__c                     s    j S r#   )rx   )args)r   r+   r,   rP     rQ   z2FastParquetEngine.read_partition.<locals>.<lambda>)r   rM   r   r   readzExpected tuple, got )rd   r9   r   rq   r   AssertionErrorr   r   r`   r4   r]   r   rZ   pickleloadsrB   _FP_FILE_LOCKrM   rN   re   r   rG   
_set_attrsrF   r   pf_to_pandastype)rr   r   piecesrM   r   r   r   r   r   ry   r   Znull_index_namesampler]   r   Z_pfZn_local_row_groupsZlocal_rg_indicesZupdate_parquet_filergsr   chunkr*   Z	save_catsr+   r   r,   read_partitionx  s    












z FastParquetEngine.read_partitionc              	      s   d k	r d d   n|j t|j  |r@  fdd|D 7  |j}tdd |D }	||	 ||\}
}dtt}|D ]}||}|| | q|t	|ft
|rdddn,|tt |j d	d | D d
dd\}}t }t| tt| f|||d|D ]b\}}|| D ]N}|jfdd| D }|j| ||f||j|d| 7 q(qW 5 Q R X |
S )Nc                    s   g | ]}| kr|qS r+   r+   )r%   r1   r   r+   r,   r6     s      z2FastParquetEngine.pf_to_pandas.<locals>.<listcomp>c                 s   s   | ]}|j V  qd S r#   )rh   r   r+   r+   r,   r-     s     z1FastParquetEngine.pf_to_pandas.<locals>.<genexpr>r   FZ	readahead)Zallow_precachedefault_cachec                 S   s   g | ]}|qS r+   r+   )r%   r   r+   r+   r,   r6   *  s     rb   )metadatarM   r]   Zdefault_enginer   )r   Zcontext_stackprecache_optionsc                    s0   i | ](\}}|| d r|n|    qS )z-catdefr   )r%   r\   r(   )startthislenr+   r,   r   ?  s
   z2FastParquetEngine.pf_to_pandas.<locals>.<dictcomp>)Zassignpartition_metainfile)rM   r9   rG   r]   sumZpre_allocater   Zrow_group_filenamerg   r   r   r>   r   rA   r   zipr   r   rh   r@   Zread_row_group_filer   )rr   rs   r   rM   r   r   Zopen_file_optionsr   r   sizedfZviewsZ	fn_rg_mapr   rf   r   stackr   r   r+   )rM   r   r   r,   r     sx    




zFastParquetEngine.pf_to_pandasinferutf8c              	   K   sb  |
d krg }
|r|d krd}|j |dd |	dksHt|	trPd|	 krPtdd}|rz,tjj||jd}|	|j
|dg}W n ttfk
r   d}Y nX |r|jdkrtd	nt|jt|jt| kst|t|jkrtd
|jt|jn\t|jj|j ||j jk rTtdt|j t|j A n||j|  }|j}tj|j}|st|
|d gsd}|stj|}|
d |kr||
d  d d nd }|d }|d k	r|d |krtdn"tjj|j f|	|
|d|}d}|d k	rN|j!p.g }|"dd | D  ||_!d|i}||||fS )NTexist_okr   zM"infer" not allowed as object encoding, because this required data in memory.Fr   r   )r"   emptyr   z?Requested file scheme is hive, but existing file scheme is not.z5Appended columns not the same.
Previous: {} | New: {}zAppended dtypes differ.
{}r\   r   rX   r.   	divisionszThe divisions of the appended dataframe overlap with previously written divisions. If this is desired, set ``ignore_divisions=True`` to append anyway.
- End of last written partition: {old_end}
- Start of first new partition: {divisions[0]})object_encodingr   Zignore_columnsc                 S   s    g | ]\}}t jj||d qS )r:   valuerb   Zparquet_thriftZKeyValuer%   r:   r   r+   r+   r,   r6     s   z6FastParquetEngine.initialize_write.<locals>.<listcomp>r   )#mkdirsrd   r   rA   rB   rb   rc   r   r   existsr   r   OSErrorrF   r>   rM   rG   r   r9   rp   ZSeriesrx   locanyr@   r   writerZfind_max_partr]   r   Zsorted_partitioned_columnsZmake_metadata_metar   r   )rr   r   r   r&   rg   partition_onZignore_divisionsZdivision_infoschemar   r   custom_metadatar   Zmetadata_file_existsrs   r   Zi_offsetZminmaxZold_endr   ZkvmZextra_write_kwargsr+   r+   r,   initialize_writeV  s    


 &

z"FastParquetEngine.initialize_writec
              
      s8  t  |}|jD ]*}z|j |_W q tk
r8   Y qX q|	rb|d k	rb|jdd |	 D  |_t|spg }n|rʇ fdd}tt	j
tdkrt|||||| j|}nt||||| j| j|	}n\  j||gd$}t||_t|||j||d}W 5 Q R X |jD ]}||_q|g}|r0|S g S d S )Nc                 S   s    g | ]\}}t jj||d qS r   r  r  r+   r+   r,   r6     s   z5FastParquetEngine.write_partition.<locals>.<listcomp>c                    s    j | ddS )NTr   )r  rO   r   r+   r,   rP     rQ   z3FastParquetEngine.write_partition.<locals>.<lambda>z0.1.4wb)compressionr   )r   r  r\   re   AttributeErrorr   r@   r4   parse_versionrb   __version__r   r   r   r   rh   r   rM   rN   )rr   r   r&   r   r   r
  Zreturn_metadatar   r  r  r   r*   r   r  Zfilr   r   r+   r  r,   write_partition  sj    

       
    

z!FastParquetEngine.write_partitionc                 K   s   t  |}|j}|r||D ]6}	|	d k	rt|	trD|	D ]}
||
 q2q||	 q||_|j|dg}tjj	|||j
dd |j|dg}tjj	|||j
d d S )Nr   F)r   Zno_row_groupsr   r   )r   r]   rd   r9   rg   r   r   rb   r  Zwrite_common_metadatar   )rr   r   r   r   r&   rg   r   r	  r   r   rrf   r+   r+   r,   write_metadata   s&    

   z FastParquetEngine.write_metadata)NNNN)NNFNNFNNFNN)Fr+   NNN)NNNNN)FNFNr   r   NN)NNN)F)__name__
__module____qualname__classmethodr   r   r   r   r   r   r   r   r   r   r   r  r  r  r+   r+   r+   r,   rL   s   s   
 6
$    
 
T
 
L           E
           [        h	   @rL   ):r   r   	threadingrC   collectionsr   r   
contextlibr   Znumpyrn   Zpandasrp   Ztlzr;   Zpackaging.versionr   r  Z	dask.corer   rb   r   Zfastparquet.utilr   r	   r
   r   Zfastparquet.writerr   r   ImportErrorZ	dask.baser   Zdask.dataframe.io.parquet.utilsr   r   r   r   r   r   r   r   r   Zdask.dataframe.io.utilsr   r   r   Zdask.dataframe.utilsr   Zdask.delayedr   Z
dask.utilsr   RLockr   rK   r   rL   r+   r+   r+   r,   <module>   s8   ,?