U
    /e=m                     @  s  U d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZ d dlZd dlZd dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZmZmZmZmZ d dl m!Z!m"Z" ej#$dZ%ej#$dZ&i Z'de(d< dhddZ)dd Z*diddZ+dd Z,dd Z-dd Z.dd Z/d d! Z0d"d# Z1d$d% Z2i Z3d&d' Z4e4d(e)e* e4d)e+e, e4d*e0e1 e4d+de2 d,d- Z5djd/d0d1d2d3d4d5Z6dkd6d7Z7dld8d9Z8dmd:d;Z9G d<d= d=Z:e:Z;G d>d? d?Z<G d@dA dAZ=G dBdC dCZ>dDdE Z?dFdG Z@dHdI ZAdJdK ZBdLdM ZCdNdO ZDe	ejEe<dPdQ ZFe%EeGdRdS ZHe%EeIdTdU ZJe&EeGdVdW ZKe&EeIdXdY ZLe%EedZd[ ZMe&Eed\d] ZNe%EeOd^d_ ZPe&EeOd`da ZQdbdc ZRG ddde deZSeSd(ZTe&EeUeTj7 d(e%e&fdfdgZVdS )n    )annotationsN)array)Enum)partial)
ModuleType)AnyLiteral)normalize_token)typename)pickle)
decompressmaybe_compress)frame_split_sizemerge_memoryviewsmsgpack_optspack_frames_preludeunpack_frames)ensure_memoryviewhas_keyworddask_serializedask_deserializezdict[str, ModuleType]_cached_allowed_modulesc                 C  s   t t| }ztt| }W n tk
r:   t|Y nX t|drX|| |d\}}n|| \}}||tt| dd}||fS )z/Serialize object using the class-based registrycontextr   dask)
sub-headertypetype-serialized
serializer)	r
   r   r   dispatch	TypeErrorNotImplementedErrorr   r   dumps)xr   	type_namer"   Z
sub_headerframesheader r'   B/tmp/pip-unpacked-wheel-g426oqom/distributed/protocol/serialize.py
dask_dumps#   s    
r)   c                 C  s&   t | d }t|}|| d |S )Nr   r   )r   loadsr   r   )r&   r%   typr*   r'   r'   r(   
dask_loads8   s    
r,   c                   sR   d g g  fdd}t j| ||r0|dd nd d d< dtd}| fS )Nc                   s$   t | }  |  | j  d S N)
memoryviewappendreadonly)fr%   	writeabler'   r(   buffer_callbackB   s    
z%pickle_dumps.<locals>.buffer_callbackzpickle-protocol)r4   protocolr   r   )r   r3   )r   r"   gettuple)r#   r   r4   r&   r'   r2   r(   pickle_dumps>   s    
r8   c                 C  sX   |d |dd   }}|  d}|s0t|d }dd t|tt|D }tj||dS )Nr      r3   r-   c                 S  s6   g | ].\}}||j kr.t|r$t|nt|n|qS r'   )r0   r.   	bytearraybytes).0wmvr'   r'   r(   
<listcomp>[   s   z pickle_loads.<locals>.<listcomp>)buffers)r6   lenzipmapr   r   r*   )r&   r%   r#   r@   r3   r'   r'   r(   pickle_loadsT   s    
rD   c                 C  sp   | t krt |  S | d } | ddd }|rX|tjdkrXt| t | < t |  S t	dt
|  dd S )Nascii.r9   r   z%distributed.scheduler.allowed-importsz
Importing z is not allowed, please add it to the list of allowed modules the scheduler can import via the distributed.scheduler.allowed-imports configuration setting.)r   encodedecodesplitr   configr6   	importlibimport_moduleRuntimeErrorrepr)namerootr'   r'   r(   import_allowed_modulec   s    rQ   c                 C  s\   d| kr0t | d }t|| d }t|| d S d| krDt| d S d| krXt| d  S | S )	,
    Custom packer/unpacker for msgpack
    __Enum__
__module____name__rO   __Set__as-list__Serialized__data)rQ   getattrset
Serialized)objmodr+   r'   r'   r(   msgpack_decode_defaultx   s    r_   c                 C  sX   t | trdt| jdS t | tr<d| j| jt| jdS t | t	rTdt
| dS | S )rR   T)rX   rY   )rS   rO   rT   rU   )rV   rW   )
isinstance	Serialize	serializerY   r   rO   rT   r   rU   r[   list)r]   r'   r'   r(   msgpack_encode_default   s    


rd   c                 C  s@   zt j| dd}W n tk
r,   t Y nX ddi|gfS d S )NTuse_bin_typer   msgpack)rg   r"   	Exceptionr!   )r#   framer'   r'   r(   msgpack_dumps   s
    rj   c                 C  s   t jd|fdditS )N    use_listF)rg   r*   joinr   r&   r%   r'   r'   r(   msgpack_loads   s    ro   c                 C  s    d dd |D }t|d S )N
c                 S  s   g | ]}t |d qS )utf8)codecsrH   r<   ri   r'   r'   r(   r?      s     z-serialization_error_loads.<locals>.<listcomp>)rm   r    )r&   r%   msgr'   r'   r(   serialization_error_loads   s    ru   c                 C  s   |||ot |dft| < d S )Nr   )r   families)rO   r"   r*   r'   r'   r(   register_serialization_family   s    rw   r   r   rg   errorc                 C  s   t | tttfkr*t| r*ttt| S t | tkrVt| rVttt| 	 d S zt
t |  W dS  tk
r~   Y nX dS )Nr9   TF)r   rc   r[   r7   rA   check_dask_serializablenextiterdictitemsr   r   r    r#   r'   r'   r(   ry      s    ry   messageobjectzLiteral['message' | 'raise']zbool | Nonez/tuple[dict[str, Any], list[bytes | memoryview]])r#   on_erroriterate_collectionreturnc              	     s@  dkrdt | tr"| j| jfS t | tr@t| j ddS |dkrt| tt	t
tfkrt| tkrdkrdkpddk}|st| }t| tkr|rztt|   W n tk
r   d}Y nX d}t| tt	t
fkr|st| tkr$|r$|r$t | tr^g }|  D ]4\}}t| d\}	}
||	d	< ||	|
f q&n,t | tt	t
fstt fd
d| D }g }g }g }|D ]F\}	}
||
 t|
}|| ||	dpdgt|
  qdd |D d|t| jd}tdd |D r||d< ||fS d}D ]}t| \}}}z4|rR||  dn|| \}}||d< ||fW   S  tk
r   Y q,Y n& tk
r   t }Y  qY nX q,dt| j }dkr
|g}|r||dd  dd |D }ddi|fS dkr,t|t| dd nt dddS )a!  
    Convert object to a header and list of bytestrings

    This takes in an arbitrary Python object and returns a msgpack serializable
    header and a list of bytes or memoryview objects.

    The serialization protocols to use are configurable: a list of names
    define the set of serializers to use, in order. These names are keys in
    the ``serializer_registry`` dict (e.g., 'pickle', 'msgpack'), which maps
    to the de/serialize functions. The name 'dask' is special, and will use the
    per-class serialization methods. ``None`` gives the default list
    ``['dask', 'pickle']``.

    Notes on the ``iterate_collection`` argument (only relevant when
    ``x`` is a collection):
    - ``iterate_collection=True``: Serialize collection elements separately.
    - ``iterate_collection=False``: Serialize collection elements together.
    - ``iterate_collection=None`` (default): Infer the best setting.

    Examples
    --------
    >>> serialize(1)
    ({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])

    >>> serialize(b'123')  # some special types get custom treatment
    ({'type': 'builtins.bytes'}, [b'123'])

    >>> deserialize(*serialize(1))
    1

    Returns
    -------
    header: dictionary containing any msgpack-serializable metadata
    frames: list of bytes or memoryviews, commonly of length one

    See Also
    --------
    deserialize : Convert header and frames back to object
    to_serialize : Mark that data in a message should be serialized
    register_serialization : Register custom serialization functions
    N)r   r   T)serializersr   r   r   rg   r   Fr   r   r   keyc                   s   g | ]}t | d qS )r   )rb   r<   r]   r   r   r   r'   r(   r?   <  s      zserialize.<locals>.<listcomp>compressionc                 S  s   g | ]}|d  qS )r   r'   r   r'   r'   r(   r?   M  s     )sub-headersis-collectionframe-lengthsr   c                 s  s   | ]}|d k	V  qd S r-   r'   )r<   r   r'   r'   r(   	<genexpr>R  s     zserialize.<locals>.<genexpr> r   r   z#Could not serialize object of type r   i c                 S  s   g | ]}|  qS r'   )rG   rs   r'   r'   r(   r?   j  s     rx   raisei'  z	on_error=z; expected 'message' or 'raise')!r`   r\   r&   r%   ra   rb   rY   r   rc   r[   r7   r|   indexry   rg   r"   keysrh   r}   r/   AssertionErrorextendrA   r6   rU   anyrv   r!   	traceback
format_excr    str
ValueError)r#   r   r   r   r   Z	dict_safeZheaders_frameskv_headerZ_framesr%   lengthscompressionslengthheaderstbrO   r"   _wants_contextr&   rt   Z
txt_framesr'   r   r(   rb      s    0



   


$


rb   c              	   C  s  d| kr| d }| d }t tttd| d  }d}|tkri }t||D ]8\}}	|d}
t|||||	  |d||
< ||	7 }qH|S g }t||D ]0\}}	|t|||||	  |d ||	7 }q||S | d	}|d
k	r||krt	d|t
t|f t| \}}}|| |S )a  
    Convert serialized header and list of bytestrings back to a Python object

    Parameters
    ----------
    header : dict
    frames : list of bytes
    deserializers : dict[str, tuple[Callable, Callable, bool]] | None
        An optional dict mapping a name to a (de)serializer.
        See `dask_serialize` and `dask_deserialize` for more.

    See Also
    --------
    serialize
    r   r   r   )r7   rc   r[   r|   r   r   r   deserializersr   NzAData serialized with %s but only able to deserialize data with %s)r7   rc   r[   r|   rB   popdeserializer/   r6   r    r   rv   )r&   r%   r   r   r   clsstartdr   _lengthr   lstrO   r"   r*   r   r'   r'   r(   r   s  sL    




r   c                 C  s  t | |||\}}g }g }g }	g }
t||dp<dgt| D ]\}}|dkrt||d}|t| |t|	 |	| |
dgt|  q@|d |t|	 |	| |
| q@t|
t|	kstt||d< t||d< t|
|d< ||	fS )a\  Serialize and split compressible frames

    This function is a drop-in replacement of `serialize()` that calls `serialize()`
    followed by `frame_split_size()` on frames that should be compressed.

    Use `merge_and_deserialize()` to merge and deserialize the frames back.

    See Also
    --------
    serialize
    merge_and_deserialize
    r   N)nr9   split-num-sub-framessplit-offsets)	rb   rB   r6   rA   r   r/   r   r   r7   )r#   r   r   r   sizer&   r%   Znum_sub_framesoffsetsZ
out_framesZout_compressionri   r   Z
sub_framesr'   r'   r(   serialize_and_split  s0     


r   c              
   C  s   d| kr|}nlg }t | d | d D ]T\}}||||  }zt|}W n$ ttfk
rl   t |}Y nX || q$t| ||dS )zMerge and deserialize frames

    This function is a drop-in replacement of `deserialize()` that merges
    frames that were split by `serialize_and_split()`

    See Also
    --------
    deserialize
    serialize_and_split
    r   r   r   )rB   r   r   r    r:   rm   r/   r   )r&   r%   r   Zmerged_framesr   offsetZ	subframesmergedr'   r'   r(   merge_and_deserialize  s    r   c                   @  s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )ra   a	  Mark an object that should be serialized

    Examples
    --------
    >>> msg = {'op': 'update', 'data': to_serialize(123)}
    >>> msg  # doctest: +SKIP
    {'op': 'update', 'data': <Serialize: 123>}

    See also
    --------
    distributed.protocol.dumps
    c                 C  s
   || _ d S r-   rY   selfrY   r'   r'   r(   __init__  s    zSerialize.__init__c                 C  s   d| j  dS )Nz<Serialize: >r   r   r'   r'   r(   __repr__  s    zSerialize.__repr__c                 C  s   t |to|j| jkS r-   )r`   ra   rY   r   otherr'   r'   r(   __eq__  s    zSerialize.__eq__c                 C  s
   | |k S r-   r'   r   r'   r'   r(   __ne__  s    zSerialize.__ne__c                 C  s
   t | jS r-   hashrY   r   r'   r'   r(   __hash__  s    zSerialize.__hash__N	rU   rT   __qualname____doc__r   r   r   r   r   r'   r'   r'   r(   ra     s   ra   c                   @  s(   e Zd ZdZdd Zdd Zdd ZdS )	r\   a  An object that is already serialized into header and frames

    Normal serialization operations pass these objects through.  This is
    typically used within the scheduler which accepts messages that contain
    data without actually unpacking that data.
    c                 C  s   || _ || _d S r-   rn   r   r&   r%   r'   r'   r(   r     s    zSerialized.__init__c                 C  s"   t |to |j| jko |j| jkS r-   )r`   r\   r&   r%   r   r'   r'   r(   r   !  s
    


zSerialized.__eq__c                 C  s
   | |k S r-   r'   r   r'   r'   r(   r   (  s    zSerialized.__ne__NrU   rT   r   r   r   r   r   r'   r'   r'   r(   r\     s   r\   c                   @  s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )ToPickleaQ  Mark an object that should be pickled

    Both the scheduler and workers with automatically unpickle this
    object on arrival.

    Notice, this requires that the scheduler is allowed to use pickle.
    If the configuration option "distributed.scheduler.pickle" is set
    to False, the scheduler will raise an exception instead.
    c                 C  s
   || _ d S r-   r   r   r'   r'   r(   r   7  s    zToPickle.__init__c                 C  s   dt | j S )Nz<ToPickle: %s>)r   rY   r   r'   r'   r(   r   :  s    zToPickle.__repr__c                 C  s   t |t| o|j| jkS r-   )r`   r   rY   r   r'   r'   r(   r   =  s    zToPickle.__eq__c                 C  s
   | |k S r-   r'   r   r'   r'   r(   r   @  s    zToPickle.__ne__c                 C  s
   t | jS r-   r   r   r'   r'   r(   r   C  s    zToPickle.__hash__Nr   r'   r'   r'   r(   r   ,  s   
r   c                   @  s(   e Zd ZdZdd Zdd Zdd ZdS )	PickledzyAn object that is already pickled into header and frames

    Normal pickled objects are unpickled by the scheduler.
    c                 C  s   || _ || _d S r-   rn   r   r'   r'   r(   r   M  s    zPickled.__init__c                 C  s&   t |t| o$|j| jko$|j| jkS r-   )r`   r   r&   r%   r   r'   r'   r(   r   Q  s
    

zPickled.__eq__c                 C  s
   | |k S r-   r'   r   r'   r'   r(   r   X  s    zPickled.__ne__Nr   r'   r'   r'   r(   r   G  s   r   c                   s    fdd  | S )z
    Replace all Serialize and Serialized values nested in *x*
    with the original values.  Returns a copy of *x*.

    >>> msg = {'op': 'update', 'data': to_serialize(123)}
    >>> nested_deserialize(msg)
    {'op': 'update', 'data': 123}
    c                   s   t | tkr||  } |  D ]\\}}t |}|tks<|tkrJ || |< q|tkr^|j| |< q|tkrt|j	|j
| |< qnzt | tkrt| } t| D ]\\}}t |}|tks|tkrƈ || |< q|tkr|j| |< q|tkrt|j	|j
| |< q| S r-   )r   r|   copyr}   rc   ra   rY   r\   r   r&   r%   	enumerate)r#   r   r   r+   replace_innerr'   r(   r   f  s*    z)nested_deserialize.<locals>.replace_innerr'   r~   r'   r   r(   nested_deserialize\  s    
r   c                 K  sl   t | f|\}}|r(ttt| \}}ng }||d< t||d< tj|dd}|f|}|dt| |S )Nr   countTre   r   )	r   rB   rC   r   rA   rg   r"   insertr   )r#   kwargsr&   r%   r   Zframes2r'   r'   r(   serialize_bytelist  s    
r   c                 K  s   t | f|}d|S )Nrk   )r   rm   )r#   r   Lr'   r'   r(   serialize_bytes  s    r   c                 C  sL   t | }|d |dd   }}|r4tj|ddd}ni }t||}t||S )Nr   r9   F)rawrl   )r   rg   r*   r   r   )br%   r&   r'   r'   r(   deserialize_bytes  s    
r   c                 C  s2   t | trtdt| | t| | dS )a  Register a new class for dask-custom serialization

    Parameters
    ----------
    cls : type
    serialize : callable(cls) -> Tuple[Dict, List[bytes]]
    deserialize : callable(header: Dict, frames: List[bytes]) -> cls

    Examples
    --------
    >>> class Human:
    ...     def __init__(self, name):
    ...         self.name = name

    >>> def serialize(human):
    ...     header = {}
    ...     frames = [human.name.encode()]
    ...     return header, frames

    >>> def deserialize(header, frames):
    ...     return Human(frames[0].decode())

    >>> register_serialization(Human, serialize, deserialize)
    >>> serialize(Human('Alice'))
    ({}, [b'Alice'])

    See Also
    --------
    serialize
    deserialize
    z^Strings are no longer accepted for type registration. Use dask_serialize.register_lazy insteadN)r`   r   r    r   registerr   )r   rb   r   r'   r'   r(   register_serialization  s     
r   c                 C  s   t ddS )z[Register a registration function to be called if *toplevel*
    module is ever loaded.
    z9Serialization registration has changed. See documentationN)rh   )toplevelfuncr'   r'   r(   register_serialization_lazy  s    r   c                 C  s   | j g| j S r-   rn   )or'   r'   r(   normalize_Serialized  s    r   c                 C  s   i }| g}||fS r-   r'   r]   r&   r%   r'   r'   r(   _serialize_bytes  s    r   c                 C  s   i }| g}||fS r-   r'   r   r'   r'   r(   _serialize_bytearray  s    r   c                 C  s0   t |dkr"t|d tr"|d S d|S d S )Nr9   r   rk   )rA   r`   r;   rm   rn   r'   r'   r(   _deserialize_bytes  s    r   c                 C  s2   t |dkr"t|d tr"|d S t |S d S )Nr9   r   )rA   r`   r:   rm   rn   r'   r'   r(   _deserialize_bytearray  s    r   c                 C  s   | j dd}t| g}||fS )Nr-   )typecoder3   )r   r.   r   r'   r'   r(   _serialize_array  s    
r   c                 C  sR   t | d }t|}|dkr0|t|d  n|dkrN|dtt| |S )Nr   r9   r   rk   )r   rA   Z	frombytesr   rm   rC   )r&   r%   aZnframesr'   r'   r(   _deserialize_array  s    r   c                 C  sD   | j dkrtd| s(| jdkr(td| j | jd}| g}||fS )NOz7Cannot serialize `memoryview` containing Python objectsr9   z+Cannot serialize empty non-1-D `memoryview`)formatshape)r   r   ndimr   r   r'   r'   r(   _serialize_memoryview  s    
r   c                 C  sf   t |dkrt|d }ntd|}|rB|| d | d }n || d }|j| d ksbt|S )Nr9   r   rk   r   r   )rA   r   r.   rm   castr   r   )r&   r%   outr'   r'   r(   _deserialize_memoryview  s    r   c                 C  s   t | }| d kp|tkp|tkp|tkp|tkp|tkpt| trjtt	t
|  rjtdd |  D pt| ttfott	t
| S )Nc                 s  s   | ]}t |tkV  qd S r-   )r   r   )r<   r#   r'   r'   r(   r   9  s     z+_is_msgpack_serializable.<locals>.<genexpr>)r   r   boolr;   intfloatr`   r|   allrC   _is_msgpack_serializablevaluesr   rc   r7   )r   r+   r'   r'   r(   r   .  s(    
	r   c                   @  s$   e Zd Zdd Zdd Zdd ZdS )ObjectDictSerializerc                 C  s
   || _ d S r-   )r   )r   r   r'   r'   r(   r   @  s    zObjectDictSerializer.__init__c           	      C  s   | j tt|i i d}g }t|tr.|}n|j}| D ]\}}t|rZ||d |< q<t|tr|| 	|\}}d|i}nt	|| j dfd\}}|t
|t
|t
| d|d |< ||7 }q<||fS )N)r   r   simplecomplexr   nested-dictr   )r   )r&   r   stopr   )r   r   r"   r   r`   r|   __dict__r}   r   rb   rA   )	r   Zestr&   r%   r   r   r   hr1   r'   r'   r(   rb   C  s,    



zObjectDictSerializer.serializec                 C  s   t |d }t|tr"i  }}nt|}|j}||d  |d  D ]R\}}|d }||d |d  }	|	d}
|
r| 
|
|	}n
t
||	}|||< qL|S )Nr   r   r   r&   r   r   r   )r   r*   
issubclassr|   r   __new__r   updater}   r6   r   )r   r&   r%   r   ddr]   r   r   r   r1   Znested_dictr   r'   r'   r(   r   b  s    





z ObjectDictSerializer.deserializeN)rU   rT   r   r   rb   r   r'   r'   r'   r(   r   ?  s   r   c                 C  s,   t |}|| |j || |j dS )a  Register (de)serialize to traverse through __dict__

    Normally when registering new classes for Dask's custom serialization you
    need to manage headers and frames, which can be tedious.  If all you want
    to do is traverse through your object and apply serialize to all of your
    object's attributes then this function may provide an easier path.

    This registers a class for the custom Dask serialization family.  It
    serializes it by traversing through its __dict__ of attributes and applying
    ``serialize`` and ``deserialize`` recursively.  It collects a set of frames
    and keeps small attributes in the header.  Deserialization reverses this
    process.

    This is a good idea if the following hold:

    1.  Most of the bytes of your object are composed of data types that Dask's
        custom serializtion already handles well, like Numpy arrays.
    2.  Your object doesn't require any special constructor logic, other than
        object.__new__(cls)

    Examples
    --------
    >>> import sklearn.base
    >>> from distributed.protocol import register_generic
    >>> register_generic(sklearn.base.BaseEstimator)

    See Also
    --------
    dask_serialize
    dask_deserialize
    N)r   r   rb   r   )r   Zserializer_nameZserialize_funcZdeserialize_funcZobject_with_dict_serializerr'   r'   r(   register_generic|  s    %r  )N)N)Nr   NN)N)Nr   NN)N)W
__future__r   rr   rK   r   r   enumr   	functoolsr   typesr   typingr   r   rg   r   Z	dask.baser	   Z
dask.utilsr
   Zdistributed.protocolr   Z distributed.protocol.compressionr   r   Zdistributed.protocol.utilsr   r   r   r   r   Zdistributed.utilsr   r   utilsZDispatchr   r   r   __annotations__r)   r,   r8   rD   rQ   r_   rd   rj   ro   ru   rv   rw   ry   rb   r   r   r   ra   Zto_serializer\   r   r   r   r   r   r   r   r   r   r   r;   r   r:   r   r   r   r   r   r.   r   r   r   r   Z dask_object_with_dict_serializerr|   r  r'   r'   r'   r(   <module>   s    

	     "
;       
,
&)











8