U
    /e                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZ d dlZd dlmZmZ d dlmZmZmZmZ d dlmZ d dlmZmZmZmZ d d	lm Z m!Z!m"Z"m#Z# e$e%Z&e' Z(d)dddddddZ)G dd dej*Z+G dd deZ,G dd de,Z-dd Z.dd Z/G dd deZ0G dd de0Z1G dd  d eZ2G d!d" d"e2Z3G d#d$ d$eZ4G d%d& d&e4Z5G d'd( d(Z6dS )*    )annotationsN)islice)Any)parse_host_portunparse_host_port)CommCommClosedError	ConnectorListener)Backend)ensure_concrete_hostfrom_frames
host_array	to_frames)	ensure_ipensure_memoryviewget_ipget_ipv6      list[bytes]int)bufferstarget_buffer_sizesmall_buffer_sizereturnc                   s   t | dkr| S g g  d fdd}| D ]F}t |}||krf | |7 |krv|  q0|  | q0|  S )a  Given a list of buffers, coalesce them into a new list of buffers that
    minimizes both copying and tiny writes.

    Parameters
    ----------
    buffers : list of bytes_like
    target_buffer_size : int, optional
        The target intermediate buffer size from concatenating small buffers
        together. Coalesced buffers will be no larger than approximately this size.
    small_buffer_size : int, optional
        Buffers <= this size are considered "small" and may be copied.
       r   c                     s@    r<t  dkr  d  nd     dd S )Nr   r       )lenappendjoinclear concatZcsizeZout_buffersr"   @/tmp/pip-unpacked-wheel-g426oqom/distributed/comm/asyncio_tcp.pyflush;   s    zcoalesce_buffers.<locals>.flush)r   r   )r   r   r   r&   bsizer"   r#   r%   coalesce_buffers"   s"    

r)   c                      s   e Zd ZdZd4 fdd	Zedd Zedd	 Zed
d Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd5d&d'Zd(d) Zd*d+ Zd,d-d.d/Zd,d0d1d2d3Z  ZS )6DaskCommProtocola:  Manages a state machine for parsing the message framing used by dask.

    Parameters
    ----------
    on_connection : callable, optional
        A callback to call on connection, used server side for handling
        incoming connections.
    min_read_size : int, optional
        The minimum buffer size to pass to ``socket.recv_into``. Larger sizes
        will result in fewer recv calls, at the cost of more copying. For
        request-response comms (where only one message may be in the queue at a
        time), a smaller value is likely more performant.
    N   c                   s   t    || _t | _t | _d | _d| _	d | _
| j | _d| _t|d| _t| j| _d| _d| _d | _d | _d | _d | _d| _d S )NFT   r   )super__init__on_connectionasyncioget_running_loop_loopQueue_queue
_transport_paused_drain_waitercreate_future_closed_waiter_using_default_buffermax_default_lenr   _default_buffer_default_start_default_end_nframes_frame_lengths_frames_frame_index_frame_nbytes_needed)selfr/   Zmin_read_size	__class__r"   r%   r.   c   s$    



zDaskCommProtocol.__init__c                 C  s2   | j r
dS | jd}|d k	r.t|d d  S dS )N<closed>sockname   	<unknown>	is_closedr5   get_extra_infor   )rE   rI   r"   r"   r%   
local_addr   s    zDaskCommProtocol.local_addrc                 C  s2   | j r
dS | jd}|d k	r.t|d d  S dS )NrH   peernamerJ   rK   rL   )rE   rP   r"   r"   r%   	peer_addr   s    zDaskCommProtocol.peer_addrc                 C  s
   | j d kS N)r5   rE   r"   r"   r%   rM      s    zDaskCommProtocol.is_closedc                 C  s    | j sd | j | _}|  d S rR   )rM   r5   abortrE   	transportr"   r"   r%   _abort   s    zDaskCommProtocol._abortc                 C  s>   | j s:td| d z|   W n tk
r8   Y nX d S )NzClosing dangling comm ``)rM   loggerwarningrW   RuntimeError)rE   Z	comm_reprr"   r"   r%   _close_from_finalizer   s    z&DaskCommProtocol._close_from_finalizerc                   s,   | j sd | j | _}|  | jI d H  d S rR   )rM   r5   closer9   rU   r"   r"   r%   _close   s    zDaskCommProtocol._closec                 C  sF   t |tjjkrt| |}|| _| jjdd | jd k	rB| |  d S )Ni   )high)typer0   selector_eventsZ_SelectorSocketTransport_ZeroCopyWriterr5   set_write_buffer_limitsr/   rU   r"   r"   r%   connection_made   s    

z DaskCommProtocol.connection_madec                 C  sR   | j dks| j| jk r,d| _| j| jd S d| _| j | j }|| j d S dS )z-Get a buffer to read into for this read eventNTF)rB   rD   r<   r:   r=   r?   rC   )rE   sizehintframer"   r"   r%   
get_buffer   s    zDaskCommProtocol.get_bufferc                 C  sL   |dkrd S | j r*|  j|7  _|   n|  j|8  _|  sH|   d S Nr   )r:   r?   _parse_default_bufferrD   _frames_check_remaining_message_completed)rE   nbytesr"   r"   r%   buffer_updated   s    
zDaskCommProtocol.buffer_updatedc                 C  sF   | j dkr|  sq:t| j| j k r.|  s.q:|  s q:q |   dS )z)Parse all messages in the default buffer.N)r@   _parse_nframesr   rA   _parse_frame_lengths_parse_frames_reset_default_bufferrS   r"   r"   r%   ri      s    
z&DaskCommProtocol._parse_default_bufferc                 C  sJ   | j | j dkrFtjd| j| jd dd | _|  jd7  _g | _dS dS )zlFill in `_nframes` from the default buffer. Returns True if
        successful, False if more data is neededr,   z<Q   offsetr   TF)r?   r>   structunpack_fromr=   r@   rA   rS   r"   r"   r%   rn      s      zDaskCommProtocol._parse_nframesc                 C  s   | j t| j }| j| j d }t||}| jtjd| d| j	| jd |  jd| 7  _||krdd | jD | _
d| _| jr| jd nd| _dS d	S )
zrFill in `_frame_lengths` from the default buffer. Returns True if
        successful, False if more data is neededrr   <Qrs   c                 S  s   g | ]}t |qS r"   )r   ).0nr"   r"   r%   
<listcomp>  s     z9DaskCommProtocol._parse_frame_lengths.<locals>.<listcomp>r   TF)r@   r   rA   r?   r>   minextendru   rv   r=   rB   rC   rD   )rE   needed	availablen_readr"   r"   r%   ro      s$    

  z%DaskCommProtocol._parse_frame_lengthsc                 C  sH   | j r
dS |  jd7  _| j| jk r>| j| j | _ | j rBdS q
dS q
d S )NTr   F)rD   rC   r@   rA   rS   r"   r"   r%   rj     s    z(DaskCommProtocol._frames_check_remainingc                 C  s   | j | j }|  s$|   t|S |s,dS | j| j }t| j|}| j	| j| j|  || j || j pld< |  j|7  _|  j|8  _q dS )zkFill in `_frames` from the default buffer. Returns True if
        successful, False if more data is neededFN)
r?   r>   rj   rk   boolrB   rC   r|   rD   r=   )rE   r   rf   r   r"   r"   r%   rp     s      
zDaskCommProtocol._parse_framesc                 C  sb   | j }| j}||k rJ|dkrJ| j|| | jd|| < d| _ || | _n||kr^d| _ d| _dS )z0Reset the default buffer for the next read eventr   N)r>   r?   r=   )rE   startendr"   r"   r%   rq   4  s    z&DaskCommProtocol._reset_default_bufferc                 C  s*   | j | j d| _d| _d| _d| _dS )zAPush a completed message to the queue and reset per-message stateNr   )r4   
put_nowaitrB   r@   rA   Z_frame_nbytes_remainingrS   r"   r"   r%   rk   C  s
    z#DaskCommProtocol._message_completedc                 C  sR   d | _ | jd  | jt | jrN| j}|d k	rNd | _| sN|	t
d d S )NConnection closed)r5   r9   
set_resultr4   r   _COMM_CLOSEDr6   r7   doneZset_exceptionr   )rE   excwaiterr"   r"   r%   connection_lostK  s    z DaskCommProtocol.connection_lostc                 C  s
   d| _ d S )NT)r6   rS   r"   r"   r%   pause_writingZ  s    zDaskCommProtocol.pause_writingc                 C  s0   d| _ | j}|d k	r,d | _| s,|d  d S )NF)r6   r7   r   r   )rE   r   r"   r"   r%   resume_writing]  s    zDaskCommProtocol.resume_writingr   r   c                   s8   | j dk	r,| j  I dH }|tk	r&|S d| _ tddS )z$Read a single message from the comm.Nr   )r4   getr   r   )rE   outr"   r"   r%   readf  s    
zDaskCommProtocol.readr   )framesr   c                   s   | j rtdn | jr0| j  }| _|I dH  dd |D }t|}dd |D }t||d d  }tj	|d  d	||f| }|d
k rd
|f|g}nt|f|}t|dkr| j| n| j|d  |S )zWrite a message to the comm.r   Nc                 S  s"   g | ]}t |trt|n|qS r"   )
isinstance
memoryviewr   ry   fr"   r"   r%   r{   {  s    z*DaskCommProtocol.write.<locals>.<listcomp>c                 S  s   g | ]}t |qS r"   )r   r   r"   r"   r%   r{     s     r   rr   rJ   rx   i   r   r   )rM   r   r6   r2   r8   r7   r   sumru   packr    r)   r5   
writelineswrite)rE   r   Zdrain_waiterZnframesZframes_nbytesZ
msg_nbytesheaderr   r"   r"   r%   r   q  s&    

zDaskCommProtocol.write)Nr+   )N)__name__
__module____qualname____doc__r.   propertyrO   rQ   rM   rW   r\   r^   rd   rg   rm   ri   rn   ro   rj   rp   rq   rk   r   r   r   r   r   __classcell__r"   r"   rF   r%   r*   T   s4   *


	
	r*   c                      s   e Zd ZejejdZdddddd fddZ	d	d
 Z
eddddZeddddZdddZd ddZdd Zdd Zdd Zedd Z  ZS )!TCPzdistributed.comm.shardTr*   strr   )protocolrO   rQ   deserializec                   sT   || _ || _|| _d| _t j|d t| | j jt	| | _
d| j
_|  | _d S )NFr   )	_protocol_local_addr
_peer_addr_closedr-   r.   weakreffinalizer\   repr
_finalizeratexit_get_extra_info_extra_info)rE   r   rO   rQ   r   rF   r"   r%   r.     s      zTCP.__init__c                 C  s   i S rR   r"   rS   r"   r"   r%   r     s    zTCP._get_extra_infor   c                 C  s   | j S rR   )r   rS   r"   r"   r%   local_address  s    zTCP.local_addressc                 C  s   | j S rR   )r   rS   r"   r"   r%   peer_address  s    zTCP.peer_addressNc                   sV   | j  I d H }zt|| j|| jdI d H W S  tk
rP   |   tdY nX d S )N)r   deserializersallow_offloadz aborted stream on truncated data)r   r   r   r   r   EOFErrorrT   r   )rE   r   r   r"   r"   r%   r     s    zTCP.readmessagec                   sD   t || j||| j| jd| j| jdI d H }| j|I d H }|S )N)ZsenderZ	recipient)r   serializerson_errorcontextZframe_split_size)r   r   Z
local_infoZremote_infoZhandshake_optionsmax_shard_sizer   r   )rE   msgr   r   r   rl   r"   r"   r%   r     s    z	TCP.writec                   s   | j  I dH  | j  dS )zFlush and close the commN)r   r^   r   detachrS   r"   r"   r%   r]     s    z	TCP.closec                 C  s   | j   | j  dS )zHard close the commN)r   rW   r   r   rS   r"   r"   r%   rT     s    
z	TCP.abortc                 C  s   | j jS rR   )r   rM   rS   r"   r"   r%   closed  s    z
TCP.closedc                 C  s   | j S rR   )r   rS   r"   r"   r%   
extra_info  s    zTCP.extra_info)T)N)Nr   )r   r   r   daskutilsZparse_bytesconfigr   r   r.   r   r   r   r   r   r   r]   rT   r   r   r   r"   r"   rF   r%   r     s    

r   c                   @  s   e Zd Zdd ZdS )TLSc                 C  s   | j jj}|d|ddS )Npeercertcipher)r   r   )r   r5   rN   )rE   r   r"   r"   r%   r     s    
zTLS._get_extra_infoN)r   r   r   r   r"   r"   r"   r%   r     s   r   c                 C  s(   |  d}t|tjs$td||S )Nssl_contextzpTLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got )r   r   ssl
SSLContext	TypeError)connection_argsctxr"   r"   r%   _expect_tls_context  s    
r   c                 K  s    | drtdd|  f d S )NZrequire_encryptionzLencryption required by Dask configuration, refusing communication from/to %rtcp://)r   r[   )addresskwargsr"   r"   r%   _error_if_require_encryption  s    
r   c                   @  s&   e Zd ZdZeZdddZdd ZdS )	TCPConnectorr   Tc                   sf   t  }t|\}}| j|f|}|jt||f|I d H \}}| j|j }	| j| }
| j||	|
|dS )Nr   )	r0   r1   r   _get_extra_kwargscreate_connectionr*   prefixrO   
comm_class)rE   r   r   r   loopipportrV   r   rO   rQ   r"   r"   r%   connect  s      
zTCPConnector.connectc                 K  s   t |f| i S rR   r   rE   r   r   r"   r"   r%   r     s    zTCPConnector._get_extra_kwargsN)T)r   r   r   r   r   r   r   r   r"   r"   r"   r%   r     s   
r   c                   @  s   e Zd ZdZeZdd ZdS )TLSConnectortls://c                 K  s   t |}d|iS Nr   r   rE   r   r   r   r"   r"   r%   r     s    zTLSConnector._get_extra_kwargsNr   r   r   r   r   r   r   r"   r"   r"   r%   r     s   r   c                   @  sn   e Zd ZdZeZdddZdd Zd	d
 Zdd Z	dd Z
dd Zdd Zdd Zedd Zedd ZdS )TCPListenerr   TNr   c                 K  sD   t ||\| _| _|| _|| _|| _|| _| j|f|| _d | _	d S rR   )
r   r   r   default_hostcomm_handlerr   r   r   _extra_kwargsbound_address)rE   r   r   r   r   r   default_portr   r"   r"   r%   r.   %  s    
zTCPListener.__init__c                 K  s   t |f| i S rR   r   r   r"   r"   r%   r   7  s    zTCPListener._get_extra_kwargsc                 C  s@   | j || j|j | j|j | jd}| j|_t| | d S )N)rO   rQ   r   )	r   r   rO   rQ   r   r   r0   Zensure_future_comm_handler)rE   r   commr"   r"   r%   _on_connection;  s    

zTCPListener._on_connectionc                   sJ   z|  |I d H  W n  tk
r4   td Y d S X | |I d H  d S )Nz,Connection closed before handshake completed)r/   r   rY   debugr   )rE   r   r"   r"   r%   r   E  s    
zTCPListener._comm_handlerc                   s  t  }|jddtjtjtjddI dH }t|dd d}g }d}z0|D ]$}|\}}}}	}
zt|||}W n tk
r   Y qJY nX |t	tddkrt
tdr|tjtjd	 |dk	r|
d |f|
d
d }
z||
 W n@ tk
r$ } z t|jd|
|j f dW 5 d}~X Y nX |dkr<| d }|j fddfd|i jI dH }|| d}qJW n> tk
r   |D ]}|  q|dk	r|   Y nX |S )a8  Due to a design decision in asyncio, listening on `("", 0)` will
        result in two different random ports being used (one for IPV4, one for
        IPV6), rather than both interfaces sharing the same random port. We
        work around this here. See https://bugs.python.org/issue45693 for more
        info.Nr   )familyr`   flagsprotoc                 S  s
   | d j S rh   )name)xr"   r"   r%   <lambda>^  r   zDTCPListener._start_all_interfaces_with_random_port.<locals>.<lambda>)keyAF_INET6IPPROTO_IPV6TrJ   z0error while attempting to bind on address %r: %sr   c                     s
   t  jS rR   r*   r   r"   rS   r"   r%   r     r   sock)r0   r1   getaddrinfosocket	AF_UNSPECSOCK_STREAM
AI_PASSIVEsortedOSErrorgetattrhasattr
setsockoptr   IPV6_V6ONLYbinderrnostrerrorlowergetsocknamecreate_serverr   r   BaseExceptionr]   )rE   r   Zinfosserversr   resafsocktyper   	canonnamesar   errserverr"   rS   r%   &_start_all_interfaces_with_random_portM  sn    	

 




z2TCPListener._start_all_interfaces_with_random_portc                   s\   t  } js$ js$  I d H }n.|j fddf j jd jI d H g}| _d S )Nc                     s
   t  jS rR   r   r"   rS   r"   r%   r     r   z#TCPListener.start.<locals>.<lambda>)hostr   )r0   r1   r   r   r  r  r   _servers)rE   r   r	  r"   rS   r%   r     s    

zTCPListener.startc                 C  s   | j D ]}|  qd S rR   )r  r]   )rE   r  r"   r"   r%   stop  s    
zTCPListener.stopc                 C  s8   | j dkr2dd }|| jd }| dd | _ | j S )z@
        The listening address as a (host, port) tuple.
        Nc                 S  s@   t jt jfD ]&}| jD ]}|j|kr|    S qqtdd S )NzNo active INET socket found?)r   AF_INETr   Zsocketsr   r[   )r  r   r   r"   r"   r%   
get_socket  s
    

z-TCPListener.get_host_port.<locals>.get_socketr   rJ   )r   r  r  )rE   r  r   r"   r"   r%   get_host_port  s
    
zTCPListener.get_host_portc                 C  s   | j t|    S )z4
        The listening address as a string.
        )r   r   r  rS   r"   r"   r%   listen_address  s    zTCPListener.listen_addressc                 C  s*   |   \}}t|| jd}| jt|| S )z2
        The contact address as a string.
        )r   )r  r   r   r   r   )rE   r  r   r"   r"   r%   contact_address  s    zTCPListener.contact_address)TTNr   )r   r   r   r   r   r   r.   r   r   r   r  r   r  r  r   r  r  r"   r"   r"   r%   r   !  s$       

L
r   c                   @  s   e Zd ZdZeZdd ZdS )TLSListenerr   c                 K  s   t |}d|iS r   r   r   r"   r"   r%   r     s    zTLSListener._get_extra_kwargsNr   r"   r"   r"   r%   r    s   r  c                   @  sD   e Zd ZeZeZdd Zdd Zdd Z	dd Z
d	d
 Zdd ZdS )
TCPBackendc                 C  s   |   S rR   )_connector_classrS   r"   r"   r%   get_connector  s    zTCPBackend.get_connectorc                 K  s   | j |||f|S rR   )_listener_class)rE   locZhandle_commr   r   r"   r"   r%   get_listener  s    zTCPBackend.get_listenerc                 C  s   t |d S rh   r   rE   r  r"   r"   r%   get_address_host  s    zTCPBackend.get_address_hostc                 C  s   t |S rR   r!  r"  r"   r"   r%   get_address_host_port  s    z TCPBackend.get_address_host_portc                 C  s   t |\}}tt||S rR   )r   r   r   )rE   r  r  r   r"   r"   r%   resolve_address  s    zTCPBackend.resolve_addressc                 C  s8   t |\}}t|}d|kr&t|}nt|}t|d S )N:)r   r   r   r   r   )rE   r  r  r   Z
local_hostr"   r"   r%   get_local_address_for  s    
z TCPBackend.get_local_address_forN)r   r   r   r   r  r   r  r  r   r#  r$  r%  r'  r"   r"   r"   r%   r    s   r  c                   @  s   e Zd ZeZeZdS )
TLSBackendN)r   r   r   r   r  r  r  r"   r"   r"   r%   r(    s   r(  c                   @  s$  e Zd ZdZeejdrRejdkr(dZqVze	
dZW qV ek
rN   dZY qVX ndZdd Zd4d
d
ddddZdd Zdd Zdd ZdddddZddddZddddd Zdddd!d"Zd#dd$d%d&Zddd'd(Zddd)d*Zd+d,d-d.d/Zddd0d1Zddd2d3Zd	S )5rb   zThe builtin socket transport in asyncio makes a bunch of copies, which
    can make sending large amounts of data much slower. This hacks around that.

    Note that this workaround isn't used with the windows ProactorEventLoop or
    uvloop.sendmsgwin32r,   
SC_IOV_MAXr   c                 C  sj   || _ || _t | _dD ]}t||stqdD ]}t| j|s2tq2t | _	d| _
d| _|   d S )N)_sock_sock_fd_fatal_error_eof_closing
_conn_lost_call_connection_lost)_add_writer_remove_writerr   F)r   rV   r0   r1   r2   r   AssertionErrorcollectionsdeque_buffers_size_protocol_pausedrc   )rE   r   rV   attrr"   r"   r%   r.     s    
	
z_ZeroCopyWriter.__init__Nz
int | NoneNone)r_   lowr   c                 C  sF   |dkr|dkrd}nd| }|dkr.|d }|| _ || _|   dS )zSet the write buffer limitsNr      )_high_water
_low_water_maybe_pause_protocol)rE   r_   r=  r"   r"   r%   rc   4  s    z'_ZeroCopyWriter.set_write_buffer_limitsc                 C  s&   | j s"| j| jkr"d| _ | j  dS )z;If the high water mark has been reached, pause the protocolTN)r:  r9  r?  r   r   rS   r"   r"   r%   rA  F  s    z%_ZeroCopyWriter._maybe_pause_protocolc                 C  s&   | j r"| j| jkr"d| _ | j  dS )z<If the low water mark has been reached, unpause the protocolFN)r:  r9  r@  r   r   rS   r"   r"   r%   _maybe_resume_protocolL  s    z&_ZeroCopyWriter._maybe_resume_protocolc                 C  s   | j   d| _dS )zClear the send bufferr   N)r8  r!   r9  rS   r"   r"   r%   _buffer_clearR  s    
z_ZeroCopyWriter._buffer_clearbytes)datar   c                 C  s*   t |}|  jt|7  _| j| dS )z"Append new data to the send bufferN)r   r9  r   r8  r   )rE   rE  mvr"   r"   r%   _buffer_appendW  s    z_ZeroCopyWriter._buffer_appendzlist[memoryview]r   c                 C  s   t t| j| jS )z.Get one or more buffers to write to the socket)listr   r8  SENDMSG_MAX_COUNTrS   r"   r"   r%   _buffer_peek]  s    z_ZeroCopyWriter._buffer_peekr   )r(   r   c                 C  sZ   |  j |8  _ | j}|rV|d }t|}||krB|  ||8 }q||d |d< qVqdS )z*Advance the buffer index forward by `size`r   N)r9  r8  r   popleft)rE   r(   r   r'   Zb_lenr"   r"   r%   _buffer_advancea  s    
z_ZeroCopyWriter._buffer_advancec              
   C  s   | j }|jrtd|sd S |jr&d S | jsz|j|}W nb ttfk
rT   Y n` t	t
fk
rl    Y nH tk
r } z||d W Y d S d }~X Y nX ||d  }|sd S | j|j| j | | |   d S )Nz%Cannot call write() after write_eof()%Fatal write error on socket transport)rV   r/  r[   r1  r8  r,  sendBlockingIOErrorInterruptedError
SystemExitKeyboardInterruptr  r.  r2   r3  r-  _on_write_readyrG  rA  )rE   rE  rV   rz   r   r"   r"   r%   r   p  s.    
z_ZeroCopyWriter.writer   )r   r   c              
   C  s   t | j}|D ]}| | q|sz|   W nd ttfk
rF   Y nN ttfk
r^    Y n6 tk
r } z| j	
|d W Y d S d }~X Y nX | jsd S | j| j	j| j |   d S NrM  )r   r8  rG  _do_bulk_writerO  rP  rQ  rR  r  rV   r.  r2   r3  r-  rS  rA  )rE   r   Zwaitingr'   r   r"   r"   r%   r     s(    
 z_ZeroCopyWriter.writelinesc                 C  s   |    | j S rR   )rC  rV   r]   rS   r"   r"   r%   r]     s    z_ZeroCopyWriter.closec                 C  s   |    | j S rR   )rC  rV   rT   rS   r"   r"   r%   rT     s    z_ZeroCopyWriter.abortr   r   )r   r   c                 C  s   | j |S rR   )rV   rN   )rE   r   r"   r"   r%   rN     s    z_ZeroCopyWriter.get_extra_infoc                 C  sD   |   }t|dkr(| jj|d }n| jj|}| | d S )Nr   r   )rJ  r   rV   r,  rN  r)  rL  )rE   r   rz   r"   r"   r%   rU    s
    z_ZeroCopyWriter._do_bulk_writec              
   C  s   | j }|jrd S z|   W nt ttfk
r4   Y n ttfk
rL    Y n tk
r } z(| j	|j
 | j  ||d W 5 d }~X Y nDX |   | js| j	|j
 |jr|d  n|jr|jtj d S rT  )rV   r1  rU  rO  rP  rQ  rR  r  r2   r4  r-  r8  r!   r.  rB  r0  r2  r/  r,  shutdownr   SHUT_WR)rE   rV   r   r"   r"   r%   rS    s(    
z_ZeroCopyWriter._on_write_ready)NN)r   r   r   r   r   r   sysplatformrI  ossysconf	Exceptionr.   rc   rA  rB  rC  rG  rJ  rL  r   r   r]   rT   rN   rU  rS  r"   r"   r"   r%   rb     s4   
!   rb   )r   r   )7
__future__r   r0   r6  loggingrZ  r   r   ru   rX  r   	itertoolsr   typingr   r   Zdistributed.comm.addressingr   r   Zdistributed.comm.corer   r   r	   r
   Zdistributed.comm.registryr   Zdistributed.comm.utilsr   r   r   r   Zdistributed.utilsr   r   r   r   	getLoggerr   rY   objectr   r)   ZBufferedProtocolr*   r   r   r   r   r   r   r   r  r  r(  rb   r"   r"   r"   r%   <module>   sJ   
  2  DT	 /	