U
    /eR$                     @   sr  d dl Z d dlZedZd dlZd dlmZ d dlmZmZ d dl	m
Z
mZ d dlmZ d dlmZmZmZ G dd	 d	eZd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zejjd d!ej d"ed#fed#fed#fed#fed#fed#fed$fed$fgej d%d&d'gd(d) Z!d*d+ Z"d,d- Z#ej d.e"e#gd/d0 Z$d1d2 Z%dS )3    Ndistributed)getitem)ClientSchedulerPlugin)clusterloop)HighLevelGraph)ArrayChunkShapeDepArraySliceDepfractional_slicec                   @   s$   e Zd ZdZdZdd Zdd ZdS )SchedulerImportCheckzAPlugin to help record which modules are imported on the schedulerzimport-checkc                 C   s
   || _ d S N)pattern)selfr    r   :/tmp/pip-unpacked-wheel-dbjnr7gq/dask/tests/test_layers.py__init__   s    zSchedulerImportCheck.__init__c                    sB   t  | _t tjD ]*}|| js0| j| qtj| qd S r   )setstart_modulessysmodules
startswithr   addpop)r   	schedulermodr   r   r   start   s
    zSchedulerImportCheck.startN)__name__
__module____qualname____doc__namer   r   r   r   r   r   r      s   r   c                     st   t d} ddtfddD }| |}t|  fdd}tD ]}tD ]}||| q^qRd S )Ndask.array.core   r#      c                 3   s   | ]} | V  qd S r   r   .0ndr   r   	<genexpr>(   s     z-test_array_chunk_shape_dep.<locals>.<genexpr>c                    s    | |f }|kst d S r   )AssertionError)ijZchunk_shape
array_depschunkr   r   check,   s    z)test_array_chunk_shape_dep.<locals>.check)pytestimportorskiptuplenormalize_chunksr	   rangeZdacshapechunksr2   r-   r.   r   r0   r1   r*   r   test_array_chunk_shape_dep$   s    
r<   c                     st   t d} ddtfddD }| |}t|  fdd}tD ]}tD ]}||| q^qRd S )Nr"   r#   r$   c                 3   s   | ]} | V  qd S r   r   r&   r)   r   r   r+   9   s     z(test_array_slice_deps.<locals>.<genexpr>c                    sX    | |f }|t d |  d | d  d t d | d |d  d fksTtd S )Nr      )slicer,   )r-   r.   Zslicesr/   r   r   r2   =   s
    z$test_array_slice_deps.<locals>.check)r3   r4   r5   r6   r
   r7   r8   r   r;   r   test_array_slice_deps5   s    
r?   c                 C   sF   t d}t d}|tdtddd}|j|ddjdd	d
S )Npandasdask.dataframe
      abr#   npartitionsrE   tasksshuffle)r3   r4   	DataFramer7   from_pandasrK   tmpdirpddddfr   r   r   _dataframe_shuffleI   s    

rS   c                 C   s@   t d}t d}|tdtddd}|j|dd S )Nr@   rA   rB   rC   rD   r#   rG   )r3   r4   rL   r7   rM   meanrN   r   r   r   _dataframe_tree_reductionR   s    

rU   c                 C   s\   t d}t d}|tdtddd}|j|dd}|j|dd}|j|d	d
ddS )Nr@   rA   rB   rC   rD      rG   r=   leftTrI   )how	broadcastrK   )r3   r4   rL   r7   rM   merge)rO   rP   rQ   rR   ddf1ddf2r   r   r   _dataframe_broadcast_join[   s    

r]   c                 C   s   t d}|d|d S )N
dask.arrayd   )r3   r4   oneszeros)rO   dar   r   r   _array_creationf   s    
rd   c                 C   s(   t d}|d}|jdd dddS )Nr^   r_   c                 S   s   | S r   r   xr   r   r   <lambda>p       z$_array_map_overlap.<locals>.<lambda>r=   none)depthboundary)r3   r4   ra   Zmap_overlap)rO   rc   arrayr   r   r   _array_map_overlapm   s    

rm   c                  C   s   t ddditdtddffks$tt ddddtdtd d d td	d ffksTtt d
dddtdtddtd	d ffkstt dddi} t| d d tstd S )N)rf   g@r   r#   )rf      )rf   r%   ffffff@r%   )r   r=   )rf   r%   rn   )rf   g333333@ro   r=   )r   r   r>   r,   
isinstanceint)fsr   r   r   test_fractional_slices   s    $

rt   c                 C   s   t d t d}t d}zdd lm} W n tk
rF   d }Y nX |j|dtdiddjt	| d	d
 dg}|j
t	| d	|d}|r|j
t	| d|d}||fS |S d S )Nzpyarrow.parquetr@   rA   r   rE   rB   r#   rG   Zpyarrowengine)rE   z<=r#   )rv   filterszpyarrow-dataset)r3   r4   Zpyarrow.datasetZdatasetImportErrorrM   rL   r7   
to_parquetstrread_parquet)rO   rP   rQ   Zpa_dsrw   r[   r\   r   r   r   _pq_pyarrow   s*    



r|   c                 C   sZ   t d t d}t d}|j|dtdiddjt| dd |jt| ddS )	NZfastparquetr@   rA   rE   rB   r#   rG   ru   )r3   r4   rM   rL   r7   ry   rz   r{   rO   rP   rQ   r   r   r   _pq_fastparquet   s    


r~   c                 C   sR   t d}t d}|j|dtdiddt|  |tj	
t| dS )Nr@   rA   rE   rB   r#   rG   *)r3   r4   rM   rL   r7   Zto_csvrz   Zread_csvospathjoinr}   r   r   r   	_read_csv   s    

r   z#8480)reasonzop,libzpandas.znumpy.optimize_graphTFc              
      s   t dt gid\}}t|d |dp}|j| ||d |dd }|dd }	||	 }
t fd	d
|	D r|tt fdd
|
D rtW 5 Q R X W 5 Q R X d S )Nplugins)Zscheduler_kwargsaddress)r   )r   c                   S   s
   t tjS r   )r   r   r   r   r   r   r   rg      rh   z>test_scheduler_highlevel_graph_unpack_import.<locals>.<lambda>c                 S   s   | j tj jS r   )r   r   r!   r   )Zdask_schedulerr   r   r   rg      s   c                 3   s   | ]}|  V  qd S r   r   r'   modulelibr   r   r+      s     z?test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr>c                 3   s   | ]}|  V  qd S r   r   r   r   r   r   r+      s     )r   r   r   ZcomputeZrun_on_scheduleranyr,   )opr   r   r   rO   r   workerscZend_modulesr   Znew_modulesr   r   r   ,test_scheduler_highlevel_graph_unpack_import   s    r   c                 C   s   | j dddS )Nrf   rI   rJ   rJ   ddfr   r   r   _shuffle_op   s    r   c                 C   s   |  dddiS )Nr!   rf   rT   )groupbyZaggr   r   r   r   _groupby_op   s    r   r   c                 C   sT   t d t d}| |jdd }|j}|| }| | ksPtd S )NrA   dask.datasets
2000-01-15end)	r3   r4   
timeseriescountdaskcull__dask_keys__get_all_dependenciesr,   )r   datasetsresultgraphculled_graphr   r   r   $test_dataframe_cull_key_dependencies   s    

r   c                     s   t d} t d}| jdd}d d}i }t|jD ]6}dd |j|ff|||f< d	d ||ff| |f< q6tj ||gd
}|j	| |j
|j}|j} fdd| D }||}	|	j }
|	 }|
|kst| }| df ||df ||jdf ||kstd S )Nr   rA   r   r   Zcustom_graph_testZcustom_graph_test_0c                 S   s   | S r   r   re   r   r   r   rg     rh   zCtest_dataframe_cull_key_dependencies_materialized.<locals>.<lambda>c                 S   s   | S r   r   re   r   r   r   rg     rh   )Zdependenciesc                    s   g | ]}| d fkr|qS )r   r   )r'   kr!   r   r   
<listcomp>  s      zEtest_dataframe_cull_key_dependencies_materialized.<locals>.<listcomp>r   )r3   r4   r   r7   rH   _namer   Zfrom_collectionscoreZnew_dd_objectZ_metaZ	divisionsr   r   r   Zkey_dependenciescopyr   r,   r   )r   rQ   r   Zname_0Zdskr-   r   r   Zculled_keysr   Zcached_depsdepsZdeps0r   r   r   1test_dataframe_cull_key_dependencies_materialized  s,    



r   )&r   r3   r4   r   r   operatorr   r   r   Zdistributed.utils_testr   r   Zdask.highlevelgraphr   Zdask.layersr	   r
   r   r   r<   r?   rS   rU   r]   rd   rm   rt   r|   r~   r   markZxfailZparametrizer   r   r   r   r   r   r   r   r   <module>   sP   
		
