U
    /eL                     @  s   d dl mZ d dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
 d dlmZmZmZmZmZmZmZmZ d dlmZ d dlmZ eeZdd	d
ddZdddZdS )    )annotationsN)pickle)
decompressmaybe_compress)Pickled	Serialize
SerializedToPicklemerge_and_deserializemsgpack_decode_defaultmsgpack_encode_defaultserialize_and_split)msgpack_opts)ensure_memoryviewmessagelist)returnc                   s   zrdkrdd ini fdd ddd fddd	dd fd
ddgfdd}t j| |ddd< W S  tk
r   tjddd  Y nX dS )a  Transform Python message to bytestream suitable for communication

    Developer Notes
    ---------------
    The approach here is to use `msgpack.dumps()` to serialize `msg` and
    write the result to the first output frame. If `msgpack.dumps()`
    encounters an object it cannot serialize like a NumPy array, it is handled
    out-of-band by `_encode_default()` and appended to the output frame list.
    compressionc                   sd   t | dd gt| }tt|D ],}|| d kr&t|| f \||< ||< q&t|| d< d S )Nr   )r   getlenranger   tuple)headerframesr   i)compress_opts =/tmp/pip-unpacked-wheel-g426oqom/distributed/protocol/core.py_inplace_compress_frames-   s    z'dumps.<locals>._inplace_compress_frameszSerialized | Serializer   )objr   c                   s`   t | tr| j| j }}n t| d\}} || t||d< tj|tdd}|g| S )N)serializerson_errorcontextsizenum-sub-framesTdefaultuse_bin_type)	
isinstancer   r   r   r   r   msgpackdumpsr   )r   
sub_header
sub_frames)r   r"   frame_split_sizer!   r    r   r   create_serialized_sub_frames8   s"    


  z+dumps.<locals>.create_serialized_sub_frameszPickled | ToPicklec                   sd   t | tr| j| j } n*g  dtj| j fdddi}|  t |d< t|}|g  S )Npickled-objc                   s     t| S )N)appendr   )xr,   r   r   <lambda>T   s   z:dumps.<locals>.create_pickled_sub_frames.<locals>.<lambda>)buffer_callbackr$   )	r(   r   r   r   r   r*   datar   r)   )r   r+   )r   r2   r   create_pickled_sub_framesJ   s    
 



z(dumps.<locals>.create_pickled_sub_framesNc                   sd   t | ttfr,t}|  d|iS t | ttfrXt} |  d|iS t| S d S )N__Serialized____Pickled__)r(   r   r   r   extendr	   r   r   )r   offset)r6   r.   r   r   r   _encode_defaulta   s    zdumps.<locals>._encode_defaultTr%   r   zFailed to Serializeexc_info)r)   r*   	Exceptionloggercritical)msgr    r!   r"   r-   r;   r   )	r   r   r"   r6   r.   r-   r   r!   r    r   r*      s    r*   Tc                   sf   t jd z0 fdd}tjd f|ddtW S  tk
r`   tjddd	  Y nX d
S )z+Transform bytestream back into Python valuezdistributed.scheduler.picklec                   s   |  dd}|dkrztj| ftddt}|d7 }|||d   }rpd|krbt||}t||dS t||S |  d	d}|dkrt| }|d7 }|||d   } rtj|d
 |dS t	dt| S )Nr7   r   Fobject_hookuse_list   r$   r   )deserializersr8   r/   )bufferszPUnpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`)
r   r)   loadsr   r   r   r
   r   r   
ValueError)r   r:   r+   r,   Zallow_pickledeserializerF   r   r   r   _decode_default|   s@    
  
zloads.<locals>._decode_defaultr   FrB   zFailed to deserializeTr<   N)	daskconfigr   r)   rH   r   r>   r?   r@   )r   rK   rF   rL   r   rJ   r   rH   u   s    " rH   )Nr   NN)TN)
__future__r   loggingr)   Zdask.configrM   Zdistributed.protocolr   Z distributed.protocol.compressionr   r   Zdistributed.protocol.serializer   r   r   r	   r
   r   r   r   Zdistributed.protocol.utilsr   Zdistributed.utilsr   	getLogger__name__r?   r*   rH   r   r   r   r   <module>   s   (

       Z