U
    d/e\                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 ddl
mZ ddl
mZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z* ddl+m,Z,m-Z- ddl.m/Z/ ddl0m1Z1m2Z2 ddl3m4Z4 e 5e6Z7G dd de8Z9G dd de8Z:G dd  d e8Z;dS )!    N)deque)contextmanager)time)urlparse   )defines)errors)RowOrientedBlock)BlockStreamProfileInfo)BufferedSocketReader)BufferedSocketWriter)
ClientInfo)get_compressor_cls)Context)	log_block)Progress)CompressionClientPacketTypesServerPacketTypes)QueryProcessingStage)read_binary_str)read_exception)write_settings)BlockInputStreamBlockOutputStream)	threading)write_varintread_varint)write_binary_strc                       s   e Zd Z fddZ  ZS )Packetc                    s6   d | _ d | _d | _d | _d | _d | _tt|   d S N)	typeblock	exceptionprogressprofile_infomultistring_messagesuperr   __init__self	__class__ @/tmp/pip-unpacked-wheel-mds_o03w/clickhouse_driver/connection.pyr(   "   s    zPacket.__init__)__name__
__module____qualname__r(   __classcell__r-   r-   r+   r.   r   !   s   r   c                       s,   e Zd Z fddZdd Zdd Z  ZS )
ServerInfoc                    s<   || _ || _|| _|| _|| _|| _|| _tt| 	  d S r    )
nameversion_majorversion_minorversion_patchrevisiontimezonedisplay_namer'   r3   r(   )r*   r4   r5   r6   r7   r8   r9   r:   r+   r-   r.   r(   .   s    zServerInfo.__init__c                 C   s   | j | j| jfS r    )r5   r6   r7   r)   r-   r-   r.   version_tuple:   s    zServerInfo.version_tuplec                 C   sZ   d| j | j| jf }d| jfd|fd| jfd| jfd| jfg}ddd	 |D }d
| S )Nz%s.%s.%sr4   versionr8   r9   r:   z, c                 s   s   | ]\}}d  ||V  qdS )z{}={}N)format).0keyvaluer-   r-   r.   	<genexpr>I   s     z&ServerInfo.__repr__.<locals>.<genexpr>z<ServerInfo(%s)>)r5   r6   r7   r4   r8   r9   r:   join)r*   r<   itemsparamsr-   r-   r.   __repr__=   s      zServerInfo.__repr__)r/   r0   r1   r(   r;   rE   r2   r-   r-   r+   r.   r3   -   s   r3   c                       s<  e Zd ZdZdejejejejej	ej
ejejddddddddddf fdd	Zdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd>d%d&Zd'd( Zd)d* Zd+d, Z d-d. Z!d?d0d1Z"d@d2d3Z#d4d5 Z$dAd6d7Z%e&d8d9 Z'd:d; Z(d<d= Z)  Z*S )B
Connectiona	  
    Represents connection between client and ClickHouse server.

    :param host: host with running ClickHouse server.
    :param port: port ClickHouse server is bound to.
                 Defaults to ``9000`` if connection is not secured and
                 to ``9440`` if connection is secured.
    :param database: database connect to. Defaults to ``'default'``.
    :param user: database user. Defaults to ``'default'``.
    :param password: user's password. Defaults to ``''`` (no password).
    :param client_name: this name will appear in server logs.
                        Defaults to ``'python-driver'``.
    :param connect_timeout: timeout for establishing connection.
                            Defaults to ``10`` seconds.
    :param send_receive_timeout: timeout for sending and receiving data.
                                 Defaults to ``300`` seconds.
    :param sync_request_timeout: timeout for server ping.
                                 Defaults to ``5`` seconds.
    :param compress_block_size: size of compressed block to send.
                                Defaults to ``1048576``.
    :param compression: specifies whether or not use compression.
                        Defaults to ``False``. Possible choices:

                            * ``True`` is equivalent to ``'lz4'``.
                            * ``'lz4'``.
                            * ``'lz4hc'`` high-compression variant of
                              ``'lz4'``.
                            * ``'zstd'``.

    :param secure: establish secure connection. Defaults to ``False``.
    :param verify: specifies whether a certificate is required and whether it
                   will be validated after connection.
                   Defaults to ``True``.
    :param ssl_version: see :func:`ssl.wrap_socket` docs.
    :param ca_certs: see :func:`ssl.wrap_socket` docs.
    :param ciphers: see :func:`ssl.wrap_socket` docs.
    :param keyfile: see :func:`ssl.wrap_socket` docs.
    :param certfile: see :func:`ssl.wrap_socket` docs.
    :param alt_hosts: list of alternative hosts for connection.
                      Example: alt_hosts=host1:port1,host2:port2.
    :param settings_is_important: ``False`` means unknown settings will be
                                  ignored, ``True`` means that the query will
                                  fail with UNKNOWN_SETTING error.
                                  Defaults to ``False``.
    NFTc                    s  |rt j}nt j}t||p|fg| _|r^|dD ](}td| }| j|j|j	pV|f q4|| _
|| _|| _t jd | | _|| _|| _|	| _|| _|| _|| _i }|d k	r||d< |d k	r||d< |d k	r||d< |d k	r||d< |d k	r||d< || _|d	krd
}|dkr,tj| _d | _d | _ntj| _t|| _|
| _d | _d | _d | _d| _ d | _!d | _"t# | _$d | _%d | _&d | _'t() | _*d| _+t,t-| .  d S )N,zclickhouse:// ssl_versionca_certscipherskeyfilecertfileTZlz4F)/r   ZDEFAULT_SECURE_PORTZDEFAULT_PORTr   hostssplitr   appendhostnameportdatabaseuserpasswordZ	DBMS_NAMEclient_nameconnect_timeoutsend_receive_timeoutsync_request_timeoutsettings_is_importantsecure_socketverify_certssl_optionsr   ZDISABLEDcompressioncompressor_clscompress_block_sizeZENABLEDr   socketfinfout	connectedclient_trace_contextserver_infor   contextblock_in	block_outblock_in_rawr   Lock_lockis_query_executingr'   rF   r(   )r*   hostrR   rS   rT   rU   rV   rW   rX   rY   r`   r^   secureverifyrI   rJ   rK   rL   rM   Z	alt_hostsrZ   default_porturlr]   r+   r-   r.   r(   |   sh    



zConnection.__init__c                 C   s   d | j| jS )Nz{}:{})r=   rn   rR   r)   r-   r-   r.   get_description   s    zConnection.get_descriptionc                 C   s6   |    | js|   n|  s2td |   d S )Nz$Connection was closed, reconnecting.)check_query_executionrd   connectpingloggerwarningr)   r-   r-   r.   force_connect   s    

zConnection.force_connectc                 C   s   i }| j r0| jrtj}ntj}| j }||d< d}t||dtj	D ]}|\}}}	}
}d}zLt|||	}|
| j | j r| |}|j||d}|| |W   S  tjk
r } z|}|dk	r|  W 5 d}~X Y qFX qF|dk	r|n
tddS )zp
        Acts like socket.create_connection, but wraps socket with SSL
        if connection is secure.
        	cert_reqsNr   )server_hostnamez!getaddrinfo returns an empty list)r[   r\   sslCERT_REQUIRED	CERT_NONEr]   copyra   getaddrinfoSOCK_STREAM
settimeoutrW   _create_ssl_contextwrap_socketru   errorclose)r*   rn   rR   r]   rz   errresafsocktypeproto	canonnamesasockssl_context_r-   r-   r.   _create_socket   s4    



zConnection._create_socketc                 C   s   t jj}|dt j}t |}d|kr8||d  n|dt jkrR|| d|krh|	|d  d|krz|d |_
d|kr|d}|j|d |d |S )NrI   rJ   rz   rK   rM   rL   )rL   )r|   PurposeSERVER_AUTHgetPROTOCOL_TLS
SSLContextload_verify_locationsr~   load_default_certsset_ciphersoptionsload_cert_chain)r*   r]   purposer<   rg   rL   r-   r-   r.   r   	  s    



zConnection._create_ssl_contextc                 C   s   |  ||| _d| _|| | _| _| j| j | jtjtj	d t
| jtj| _t| jtj| _|   |   |  | _t| j| j| _|  | _d S )NTr   )r   ra   rd   rn   rR   r   rX   
setsockoptIPPROTO_TCPTCP_NODELAYr   r   BUFFER_SIZErb   r   rc   
send_helloreceive_helloget_block_in_streamrh   r   rg   rj   get_block_out_streamri   )r*   rn   rR   r-   r-   r.   _init_connection   s    
zConnection._init_connectionc                 C   s$   |j r|j d nd}|d|| S )NrH    z({}:{}))strerrorr=   )r*   ern   rR   r   r-   r-   r.   _format_connection_error3  s    z#Connection._format_connection_errorc                 C   s2  | j r|   td| j| j d }tt| jD ]}| jd \}}td|| z| 	||W   S  t
jk
r } z6|   tjd||dd | |||}t|}W 5 d }~X Y nX t
jk
r } z6|   tjd||dd | |||}t|}W 5 d }~X Y nX | jd q2|d k	r.|d S )Nz"Connecting. Database: %s. User: %sr   zConnecting to %s:%szFailed to connect to %s:%sT)exc_info)rd   
disconnectrw   debugrS   rT   rangelenrN   r   ra   timeoutrx   r   r   ZSocketTimeoutErrorr   ZNetworkErrorrotate)r*   r   irn   rR   r   err_strr-   r-   r.   ru   7  sH            
zConnection.connectc                 C   sL   d | _ d | _d | _d | _d | _d| _d | _d | _d | _d | _	d | _
d| _d S )NF)rn   rR   ra   rb   rc   rd   re   rf   rh   rj   ri   rm   r)   r-   r-   r.   reset_state\  s    zConnection.reset_statec              
   C   sr   | j rVz| jtj W n0 tjk
rH } ztd| W 5 d}~X Y nX | j  n| jrf| j  |   dS )zk
        Closes connection between server and client.
        Frees resources: e.g. closes socket.
        zError on socket shutdown: %sN)	rd   ra   shutdown	SHUT_RDWRr   rw   rx   r   r   )r*   r   r-   r-   r.   r   n  s    
zConnection.disconnectc                 C   s~   t tj| j t| j| j t tj| j t tj| j t tj	| j t| j
| j t| j| j t| j| j | j  d S r    )r   r   HELLOrc   r   rV   r   ZCLIENT_VERSION_MAJORZCLIENT_VERSION_MINORZCLIENT_REVISIONrS   rT   rU   flushr)   r-   r-   r.   r     s    zConnection.send_helloc           
      C   s   t | j}|tjkrt| j}t | j}t | j}t | j}d }|tjkrTt| j}d}|tjkrlt| j}|}|tjkrt | j}t	|||||||| _
| j
| j_
td||||| n2|tjkr|  n| d|}	|   t|	d S )Nr   z5Connected to %s server version %s.%s.%s, revision: %szHello or Exception)r   rb   r   r   r   r   Z&DBMS_MIN_REVISION_WITH_SERVER_TIMEZONEZ*DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAMEZ$DBMS_MIN_REVISION_WITH_VERSION_PATCHr3   rf   rg   rw   r   	EXCEPTIONreceive_exceptionunexpected_packet_messager   r   UnexpectedPacketFromServerError)
r*   packet_typeZserver_nameZserver_version_majorZserver_version_minorZserver_revisionZserver_timezoneZserver_display_nameZserver_version_patchmessager-   r-   r.   r     s\    








    
   

zConnection.receive_helloc                 C   s   | j }| | zdttj| j | j  t| j}|t	j
krT|   t| j}q6|t	jkrt| d|}t|W n` tjk
r    Y nJ tjtfk
r } z&td|  | W Y W 5 Q R  dS d }~X Y nX W 5 Q R X dS )NZPongzError on %s ping: %sFT)rY   timeout_setterr   r   ZPINGrc   r   r   rb   r   PROGRESSreceive_progressZPONGr   r   r   Errorra   r   EOFErrorrw   rx   rs   )r*   r   r   msgr   r-   r-   r.   rv     s,    



  ,zConnection.pingc                 C   sb  t  }t| j |_}|tjkr2| jdd|_n,|tjkrJ| 	 |_
n|tjkr`|  |_n|tjkrv|  |_n|tjkr|  |_n|tjkr|  |_n|tjkr| jdd|_t|j n|tjkrd| _n|tjkr| ||_nn|tjkr|  |_nV|tjkr |  |_n>|tjkr<| jdd|_n"d||  }|   t !||S )NT)may_be_use_numpyF)may_be_compressedz Unknown packet {} from server {})"r   r   rb   r!   r   DATAreceive_datar"   r   r   r#   r   r   r$   ZPROFILE_INFOreceive_profile_infor%   ZTOTALSZEXTREMESZLOGr   ZEND_OF_STREAMrm   ZTABLE_COLUMNSreceive_multistring_messager&   Z
PART_UUIDSZREAD_TASK_REQUESTZPROFILE_EVENTSr=   rs   r   r   ZUnknownPacketFromServerError)r*   packetr   r   r-   r-   r.   receive_packet  sJ    








 
zConnection.receive_packetc                 C   s2   | j r ddlm} || j| jS t| j| jS d S )Nr   )CompressedBlockInputStream)r^   streams.compressedr   rb   rg   r   )r*   r   r-   r-   r.   r     s    zConnection.get_block_in_streamc                 C   s:   | j r(ddlm} || j| j| j| jS t| j| jS d S )Nr   )CompressedBlockOutputStream)r^   r   r   r_   r`   rc   rg   r   )r*   r   r-   r-   r.   r     s      zConnection.get_block_out_streamc                 C   sD   | j j}|tjkrt| j |r&| jn| j}|s4dnd }|j|dS )NF)	use_numpy)	rf   r8   r   'DBMS_MIN_REVISION_WITH_TEMPORARY_TABLESr   rb   rh   rj   read)r*   r   r   r8   readerr   r-   r-   r.   r   (  s    

zConnection.receive_datac                 C   s
   t | jS r    )r   rb   r)   r-   r-   r.   r   2  s    zConnection.receive_exceptionc                 C   s   t  }|| jj| j |S r    )r   r   rf   r8   rb   )r*   r$   r-   r-   r.   r   5  s    zConnection.receive_progressc                 C   s   t  }|| j |S r    )r
   r   rb   )r*   r%   r-   r-   r.   r   :  s    zConnection.receive_profile_infoc                    s    t |} fddt|D S )Nc                    s   g | ]}t  jqS r-   )r   rb   )r>   Z_ir)   r-   r.   
<listcomp>A  s     z:Connection.receive_multistring_message.<locals>.<listcomp>)r   Zstrings_in_messager   )r*   r   numr-   r)   r.   r   ?  s    
z&Connection.receive_multistring_messager   c                 C   sV   t  }ttj| j | jj}|tjkr2t	|| j | j
| td|t  |  d S )NzBlock "%s" send time: %f)r   r   r   r   rc   rf   r8   r   r   r   ri   writerw   r   )r*   r"   
table_namestartr8   r-   r-   r.   	send_dataC  s    
zConnection.send_datac                 C   s   | j s|   ttj| j t|p$d| j | jj}|t	j
krdt| j| j}tjj|_||| j |t	jk}t| jj| j|| j |t	jkrtd| j ttj| j t| j| j t|| j td| | j  d S )Nr   z	Query: %s)rd   ru   r   r   ZQUERYrc   r   rf   r8   r   Z"DBMS_MIN_REVISION_WITH_CLIENT_INFOr   rV   rg   Z	QueryKindZINITIAL_QUERYZ
query_kindr   Z5DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGSr   settingsrZ   Z)DBMS_MIN_REVISION_WITH_INTERSERVER_SECRETr   ZCOMPLETEr^   rw   r   r   )r*   queryZquery_idr8   Zclient_infoZsettings_as_stringsr-   r-   r.   
send_queryN  s*    


zConnection.send_queryc                 C   s   t tj| j | j  d S r    )r   r   ZCANCELrc   r   r)   r-   r-   r.   send_cancelp  s    zConnection.send_cancelc                    s   |pg D ]}|d s&t d|d |d  t}| jjd rrddlm} dd	 |d D } fd
d	|D  |}||d  |d}| j||d d q| t  d S )NZ	structurezEmpty table "{}" structurer4   datar   r   )NumpyColumnOrientedBlockc                 S   s   g | ]}|d  qS )r   r-   )r>   xr-   r-   r.   r     s     z3Connection.send_external_tables.<locals>.<listcomp>c                    s   g | ]} | j qS r-   )values)r>   columnr   r-   r.   r     s     )types_check)r   )
ValueErrorr=   r	   rg   Zclient_settingsZnumpy.blockr   r   )r*   Ztablesr   tableZ	block_clsr   columnsr"   r-   r   r.   send_external_tablesu  s"    
zConnection.send_external_tablesc                 c   s,   | j  }| j | d V  | j | d S r    )ra   
gettimeoutr   )r*   Znew_timeoutZold_timeoutr-   r-   r.   r     s    
zConnection.timeout_setterc                 C   s   t |}d|  ||S )Nz6Unexpected packet from server {} (expected {}, got {}))r   Zto_strr=   rs   )r*   expectedr   r-   r-   r.   r     s    
  z$Connection.unexpected_packet_messagec                 C   s0   | j jdd | jrt d| _| j   d S )NF)blockingT)rl   acquirerm   r   ZPartiallyConsumedQueryErrorreleaser)   r-   r-   r.   rt     s
    z Connection.check_query_execution)TF)r   )N)F)+r/   r0   r1   __doc__r   ZDEFAULT_DATABASEZDEFAULT_USERZDEFAULT_PASSWORDZCLIENT_NAMEZ DBMS_DEFAULT_CONNECT_TIMEOUT_SECZDBMS_DEFAULT_TIMEOUT_SECZ%DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SECZDEFAULT_COMPRESS_BLOCK_SIZEr(   rs   ry   r   r   r   r   ru   r   r   r   r   rv   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rt   r2   r-   r-   r+   r.   rF   M   sb   /     X
(%.6



"

rF   )<loggingra   r|   collectionsr   
contextlibr   r   urllib.parser   r   r   r   r"   r	   Zblockstreamprofileinfor
   Zbufferedreaderr   Zbufferedwriterr   Z
clientinfor   r^   r   rg   r   logr   r$   r   protocolr   r   r   Zqueryprocessingstager   r   r   Zreadhelpersr   Zsettings.writerr   Zstreams.nativer   r   Zutil.compatr   Zvarintr   r   writerr   	getLoggerr/   rw   objectr   r3   rF   r-   r-   r-   r.   <module>   s<   
 