U
    /e!                     @  s  U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZ d dlmZmZmZ d d	lmZmZ d d
lmZ d dlm Z  dd Z!ee"e#j$e! eej%ej&dZ'ej(Z)dd Z*G dd de+Z,i Z-de.d< dddddZ/zd dl0Z1e1j23  dd Z4W n( e5k
rx   dd Z4d-ddZY nX dd Z6d Z7d!d" Z8d.d$d%d&d'd(Z9d)d* Z:d/d+d,Z;dS )0    )annotationsN)HashableMappingSequence)ProcessPoolExecutor)partial)warn)config)MultiprocessingPoolExecutor	get_asyncreraise)cullfuse)	CPU_COUNT)ensure_dictc                 C  s   t | j| jffS N)getattr__objclass____name__)m r   8/tmp/pip-unpacked-wheel-dbjnr7gq/dask/multiprocessing.py_reduce_method_descriptor   s    r   )protocolc                   C  s
   t  jS r   )multiprocessingcurrent_processidentr   r   r   r   _process_get_id#   s    r   c                   @  s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )RemoteExceptionzXRemote Exception

    Contains the exception and traceback from a remotely run task
    c                 C  s   || _ || _d S r   )	exception	traceback)selfr   r    r   r   r   __init__:   s    zRemoteException.__init__c                 C  s   t | jd | j S )Nz

Traceback
---------
)strr   r    r!   r   r   r   __str__>   s    zRemoteException.__str__c                 C  s(   t ttt| t| j t| j S r   )sortedsetdirtypelist__dict__r   r$   r   r   r   __dir__A   s    zRemoteException.__dir__c                 C  s4   zt | |W S  tk
r.   t| j| Y S X d S r   )object__getattribute__AttributeErrorr   r   )r!   keyr   r   r   __getattr__D   s    zRemoteException.__getattr__N)r   
__module____qualname____doc__r"   r%   r,   r1   r   r   r   r   r   4   s
   r   z&dict[type[Exception], type[Exception]]
exceptions	Exception)excreturnc                 C  sx   t | tkr"tt |  }|| |S z8t | jjtt | fdt | i}|tt | < || |W S  tk
rr   |  Y S X dS )z6Metaclass that wraps exception type in RemoteExceptionZexception_typeN)r)   r5   	__class__r   r   	TypeError)r7   tbtypr   r   r   remote_exceptionN   s    


r=   c                 C  s   | S r   r   r;   r   r   r   _pack_tracebacke   s    r?   c                 C  s   d t| S )N )joinr    	format_tbr>   r   r   r   r?   j   s    c                 C  s   t | |} | d S r   )r=   )r7   r;   r   r   r   r   m   s    
r   c              
   C  sp   t  \}}}t|}z|| |f}W nD tk
rj }  z&t  \}}}t|}|| |f}W 5 d } ~ X Y nX |S r   )sysexc_infor?   BaseException)edumpsexc_type	exc_valueexc_tracebackr;   resultr   r   r   pack_exceptionr   s    rL   zThe 'multiprocessing.context' configuration option will be ignored on Python 2
and on Windows, because they each only support a single context.
c                  C  s:   t dd} tjdkr,| dkr(ttt tS t| S dS )z+Return the current multiprocessing context.zmultiprocessing.contextspawnwin32N)	r	   getrC   platformr   _CONTEXT_UNSUPPORTEDUserWarningr   get_context)Zcontext_namer   r   r   rS      s    

rS   Tr   zSequence[Hashable] | Hashable)dskkeysc	              	   K  sT  |pt dd}|pt dd}|p.t dd}|pBt ddpBt}|dkrtjddkrfd	tjd< t }
tt|d
}t||
|d}d}n*|dk	rt	d t
|tjjrt|}d}t| } t| |\}}|rt|||\}}n|}|pt ddpt}|pt ddpt}z,t|j|j||ft||tt|d|	}W 5 |rN|  X |S )a   Multiprocessed get function appropriate for Bags

    Parameters
    ----------
    dsk : dict
        dask graph
    keys : object or list
        Desired results from graph
    num_workers : int
        Number of worker processes (defaults to number of cores)
    func_dumps : function
        Function to use for function serialization (defaults to cloudpickle.dumps)
    func_loads : function
        Function to use for function deserialization (defaults to cloudpickle.loads)
    optimize_graph : bool
        If True [default], `fuse` is applied to the graph before computation.
    pool : Executor or Pool
        Some sort of `Executor` or `Pool` to use
    initializer: function
        Ignored if ``pool`` has been set.
        Function to initialize a worker process before running any tasks in it.
    chunksize: int, optional
        Size of chunks to use when dispatching work.
        Defaults to 5 as some batching is helpful.
        If -1, will be computed to evenly divide ready work across workers.
    	chunksize   poolNzmultiprocessing.initializernum_workersZPYTHONHASHSEED)N0Z6640Zuser_initializer)Z
mp_contextinitializerTzThe ``initializer`` argument is ignored when ``pool`` is provided. The user should configure ``pool`` with the needed ``initializer`` on creation.F
func_loads
func_dumps)Zget_idrG   loadsrL   Zraise_exceptionrV   )r	   rO   r   osenvironrS   r   initialize_worker_processr   r   
isinstancer   rX   ZPoolr
   r   r   r   _loads_dumpsshutdownr   ZsubmitZ_max_workersr   rL   r   )rT   rU   rY   r]   r^   Zoptimize_graphrX   r\   rV   kwargscontextcleanupZdsk2ZdependenciesZdsk3r_   rG   rK   r   r   r   rO      s`    &
  

rO   c                  C  s"   t jd} | d k	r| j  d S )NZnumpy)rC   modulesrO   randomseed)npr   r   r   default_initializer   s    rn   c                 C  s   t   | dk	r|   dS )zE
    Initialize a worker process before running any tasks in it.
    N)rn   r[   r   r   r   rb     s    rb   )N)NNNTNNN)N)<
__future__r   copyregr   Zmultiprocessing.poolr`   picklerC   r    collections.abcr   r   r   concurrent.futuresr   	functoolsr   warningsr   ZcloudpickleZdaskr	   Z
dask.localr
   r   r   Zdask.optimizationr   r   Zdask.systemr   Z
dask.utilsr   r   r)   r'   unionrG   HIGHEST_PROTOCOLre   r_   rd   r   r6   r   r5   __annotations__r=   Ztblib.pickling_supportZtblibZpickling_supportinstallr?   ImportErrorrL   rQ   rS   rO   rn   rb   r   r   r   r   <module>   s\    
       j