U
    /e                     @   sr  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ edZ	edZ
edZed d dlZd dlZd dlmZmZ d dlmZ ejjejd	kd
dejjeddgZedd Zejde_e Zejdddd Zej ddej dddd Z!ej dddd Z"ej ddej dddd Z#dd Z$ejje d dd!d" Z%dS )#    N)
timeserieszdask.dataframepysparkpyarrowfastparquet)PANDAS_GT_150PANDAS_GT_200)	assert_eqlinuxzAUnnecessary, and hard to get spark working on non-linux platforms)reasonz/pyspark doesn't yet have support for pandas 2.0Z1H)freqUTCmodule)Zscopec                  c   s^   t t j} tjjjdd	dd
 }|V  |  t t krZt  t j|  d S )NlocalzDask Testingzspark.sql.session.timeZoner   )signal	getsignalSIGINTr   sqlZSparkSessionZbuilderZmasterZappNameconfigZgetOrCreatestop	threadingcurrent_threadmain_thread)prevZspark r   @/tmp/pip-unpacked-wheel-dbjnr7gq/dask/tests/test_spark_compat.pyspark_session(   s     r   npartitions)      
   engine)r   r   c                 C   sl   t |}| t}||jj|dd tj||d}|j|j	j
dd}|j|ksZtt|tdd d S )N	overwritemoder    r   	timestampFZcheck_index)strcreateDataFramepdfrepartitionwriteparquetddread_parquetassignr&   dttz_localizer   AssertionErrorr   )r   r   tmpdirr    sdfddfr   r   r   $test_roundtrip_parquet_spark_to_dask>   s    
r7   c                 C   s   t |}| t}|jj|ddd tj||d}|j|jj	
dd}| jdd}|j|jd	d
}t|tjdddd d S )Nr!   name)r#   ZpartitionByr$   r   r%   r   )Zaxisr(   )r8   Fr'   )r(   r)   r*   r,   r-   r.   r/   r0   r&   r1   r2   computeZ
sort_indexr8   astyper   )r   r4   r    r5   r6   r   r   r   )test_roundtrip_hive_parquet_spark_to_daskP   s    
r;   c                 C   s~   t |}tjt|d}|dkr&ddini }|j|f|dd| | j|}| }|j|j	j
dd}t||dd	 d S )
N)r   r   timesZint96F)r    Zwrite_indexr   r%   r'   )r(   r.   Zfrom_pandasr*   Z
to_parquetreadr-   ZtoPandasr0   r&   r1   r2   r   )r   r   r4   r    r6   kwargsr5   r   r   r   $test_roundtrip_parquet_dask_to_sparkg   s    r?   c                 C   s   t |}d}d}tt|tjj|dddg|d  ddg|d  d	}|d
dddd	}tdd |jD svt	| 
|}||jj|dd tj|ddd}tdd |jD st	|jt||dd d S )Nr      )sizeTF   ZaliceZbob)abcdZInt64ZFloat64booleanstringc                 S   s   g | ]}t jj|qS r   pdapitypesZis_extension_array_dtype.0dtyper   r   r   
<listcomp>   s     zItest_roundtrip_parquet_spark_to_dask_extension_dtypes.<locals>.<listcomp>r!   r"   r   r    Zuse_nullable_dtypesc                 S   s   g | ]}t jj|qS r   rI   rM   r   r   r   rP      s     r'   )r(   rJ   	DataFramerangenprandomr:   allZdtypesr3   r)   r+   r,   r-   r.   r/   r   )r   r4   r   rA   r*   r5   r6   r   r   r   5test_roundtrip_parquet_spark_to_dask_extension_dtypes{   s6    	
rW   z'Requires pyarrow-backed nullable dtypesc           	   	   C   s.  t |}d}d}tdtdtdtdtdtdg}tt||d	}| |}|d
|d
 t	j
jdd}||jj|dd tjddi tj|ddd}W 5 Q R X |jjjtddkst|j jjtddkst|dttddd	}t||dd d S )N      z8093.234z8094.234z8095.234z8096.234z8097.234z8098.234)rC   rD   rD      r!   r"   zdataframe.dtype_backendr   TrQ   zint64[pyarrow]Fr'   )r(   decimalDecimalrJ   rR   rS   r)   Z
withColumncastr   r   rL   ZDecimalTyper+   r,   r-   daskr   setr.   r/   rD   rO   Zpyarrow_dtypepaZ
decimal128r3   r9   r:   Z
ArrowDtyper   )	r   r4   r   rA   Zdecimal_datar*   r5   r6   expectedr   r   r   test_read_decimal_dtype_pyarrow   s8    
"rb   )&r[   r   sysr   Zpytestr^   Zdask.datasetsr   Zimportorskipr.   r   r`   ZnumpyrT   ZpandasrJ   Zdask.dataframe._compatr   r   Zdask.dataframe.utilsr   markZskipifplatformZ
pytestmarkr9   r*   indexr2   Zreset_indexZfixturer   Zparametrizer7   r;   r?   rW   rb   r   r   r   r   <module>   sN   






(