U
    n/eɆ                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z d dlmZ ddlmZmZ ddlmZ ddlmZ ddlmZmZ ddlmZ d	d
 ZG dd deZG dd deZdS )    N)
itemgetter   )bucket_nameBase)DatasetManager)check_http_status_code)checklistDatanode)find_list_datanode_from_datec                 C   s   | d dkr| d7 } | S )N/ )txtr   r   7/tmp/pip-unpacked-wheel-t2z2wkof/dsmlibrary/datanode.pyappend_slash   s    r   c                	   @   s   e Zd ZdddZdd Zddd	Zd d
dZd!ddZddi fddZd"ddZ	d#ddZ
d$ddZd%ddZd&ejdddZd'ejdddZdS )(DataNodeN Fc              
   K   s  t |tjtjfkr&tdt | |dks:t |tkrLtdt | |dks`t |tkrrtdt | |dkst |tkrd| }| d}|dkr| ||n|}tj	| j
 d| j|||d	|d
d}	|	jdkrtd|	  |	 }
d|ko|d  |d< d|kr<|d r<|dd dd t |tjkr^dd | D }|jdt d|
d  f| jdd| tj| j
|
d| jd}|r|rtj| j
 d|
d  d| jd tj| j
 d|
d  d| jd d|
d |
d  d!}|dkrt|d"kr|rtj|d#d$ tj	| j
 d|
d  d%| jd#|id}zt| W n8 tk
r } z|t|d	d& W 5 d}~X Y nX |d'dd( |S ))ab  
        _summary_

        Args:
            df (dask or pandas DataFrame, required): _description_. Defaults to None.
            directory (int, required): _description_. Defaults to None.
            name (str, required): _description_. Defaults to None.
            description (str, optional): _description_. Defaults to "".
            replace (bool, optional): _description_. Defaults to None.
            profiling (bool, optional): _description_. Defaults to False.
            lineage (list, optional): _description_. Defaults to None.
            
        Returns:
            dict: status
        z]Invalid type expect ddf(dask dataframe) or df(pandas dataframe), Please input `df=`, but got N(Please input data `name`=<str>, but got -Please input data `directory`=<int>, but got r   zdata .parquet/file/F)namedescription	directoryis_usereplaceheadersjson   z&can not create directory in discovery append	overwritec                 S   s   dt j    dS )Nzpart-r   )datetimenow	isoformat)xr   r   r   <lambda>E       z DataNode.write.<locals>.<lambda>T)name_functionignore_divisionsc                 S   s   i | ]\}}|d kr||qS ))r!   r    r(   r)   r   ).0kvr   r   r   
<dictcomp>J   s       z"DataNode.write.<locals>.<dictcomp>s3://r   keyZpyarrow)storage_optionsZengineidZdiscovery_apifile_idheaderz/createDatadict/r   z/createProfileling/path)sucessr3   r6   r   lineagedataZdataName/setLineage/Zlineage_msgr8   createdr8   r8   )typepd	DataFramedd	Exceptionstrint_check_fileExistsrequestspost_discovery_api_jwt_headerstatus_coder   updateitemsZ
to_parquetr   _storage_optionsr   check_file_alreadygetlencheck_lists_intr   )selfdfr   r   r   r   	profilingr8   kwargs_resmeta_file_already	_responserer   r   r   write   s~    

	
"" zDataNode.writec                 C   s  |}| d| }	|  ||	}
|dkr*|	}|
sftj| j d| j|	||d|
d|||diidd}n| j|	|d	}tj| j d| d
| jd}tj|d |	 }|di di }|
|||di tj| j d| d
| jdd|iid}tj|d tj|d |	 }tj| j|d| jd}|d krt|dkr|rtj|dd tj| j d|d  d| jd|id}zt| W n8 tk
r } z|
t|dd W 5 d }~X Y nX |
ddd d|d|d dS )N.r   r   Tversion_file_id)	timestampr3   )r   r   r   r   r   contextr   )r   directory_idr   r5   responsera   r1   r2   r   r8   r9   r;   Fr<   r=   r>   r7   r6   )statusr3   r6   )Z_check_fileExists_no_askrG   rH   rI   rJ   get_file_idrP   r   r   r   rL   patchrO   rQ   rR   rC   rD   )rS   r?   	file_namer3   rb   r8   r   r`   _file_idr   Z_is_file_existsrW   Zlist_datanode_idrX   r_   Z_DataNode__resrY   r[   r\   r   r   r   _write_listfilev   s      zDataNode._write_listfilec                 C   sx   t j|}|d\}}tj d}| d| d| }	| j|||	|dd}
|
dd}| j	d|||||d	}|S )
Nr^   %Y/%m/%d %H:%M:%S-T)rb   	file_pathrh   r   r   r1   r   ZlistFile)r?   rh   r3   rb   r8   r   )
osr6   basenamesplitr"   r#   strftimeZupload_filerP   rj   )rS   rb   rm   r   r8   r   Zoriginal_file_nameZfile_extensionr`   rh   rZ   ri   resr   r   r   writeListFile   s*    	zDataNode.writeListFilec
              	   K   s   |r0t j }t j |j|j|jd}|d}nt j  d}|	rdt j |	j|	j|	jd}|d}| j||| d| ||||d}|	dd}| j
d||||||d}|S )	aQ  _summary_

        Args:
            df (_type_, optional): _description_. Defaults to None.
            directory_id (_type_, optional): _description_. Defaults to None.
            name (_type_, optional): _description_. Defaults to None.
            description (str, optional): _description_. Defaults to "".
            replace (_type_, optional): _description_. Defaults to None.
            profiling (bool, optional): _description_. Defaults to False.
            lineage (_type_, optional): _description_. Defaults to None.

        Returns:
            _type_: _description_
        )yearmonthdayrk   rl   )rT   r   r   r   r   rU   r8   r3   r   listDataNode)r?   rh   r3   rb   r8   r   r`   )r"   datetodayrt   ru   rv   rq   r#   r]   rP   rj   )rS   rT   rb   r   r   r   rU   r8   Zoverwrite_same_date	data_daterV   ry   Ztoday_with_timeZdatetime_strZdatetime_inputrZ   ri   rr   r   r   r   writeListDataNode   s8    

"
zDataNode.writeListDataNodec              
   C   s   t j| j d| d| jd}|jdkrL|jdk r:| nd}td| | }z| jjt	|d d	}W n& tk
r } z|W 5 d
}~X Y n:X |
d|d d i dd | D }|t|jfS d
S )z_summary_

        Args:
            file_id (_type_, optional): _description_. Defaults to None.

        Raises:
            Exception: _description_
            e: _description_

        Returns:
            _type_: _description_
        r   r   r5         Some thing wrong, s3_key)r   Zobject_nameNowneruserc                 S   s   i | ]\}}|d kr||qS ))r   r   r   r6   r   Z
human_sizer   )r*   r/   valuer   r   r   r-   -  s       z%DataNode.get_file.<locals>.<dictcomp>)rG   rP   rI   rJ   rK   r   rC   clientZ
get_objectr   rL   rM   ioBytesIOr:   )rS   r3   rW   r   rX   rd   r\   r   r   r   get_file  s    
 
zDataNode.get_filer   c                 C   s   t j| j d| d| jd}t| | }|dt|d i |d d }|dkrtj	d	t
 d|d  fd
| ji|S |dkrtjd	t
 d|d  fd
| ji|S |dkrt j| j d|d  d| jd}t| t j| j d| d  d| jd}t| | }t| j| j| j| jd}	|d }
|	j|
d\}}|dd}td| d|d |  dt  dd |d D }|d }|	||d t|}dd |D }|	j|
t|d |d S |d!krtj||| j| j| jt
d"S td#| d$S )%a8  _summary_

        Args:
            file_id (_type_, optional): _description_. Defaults to None.
            index (_type_, optional): _description_. Defaults to None.

        Raises:
            Exception: _description_
            e: _description_

        Returns:
            _type_: _description_
        r   r   r5   r/   r   r?   r   parquetr.   r0   csvz	sql-queryz/api/v2/file/r1   z/getSqlQuery//api/sql/query/)tokenapikeyZdataplatform_api_uriZobject_storage_uridatabasedatabase_idwhere_conditionr   

from sqlalchemy import sql
query
            c                 S   s   i | ]}|d  |d qS columnr?   r   r*   itemr   r   r   r-   c  s      z%DataNode.read_ddf.<locals>.<dictcomp>rX   sqlalchemy_uri	pk_columnc                 S   s   i | ]}|d  |d qS r   r   r   r   r   r   r-   f  s      )r   query_functionr   rX   rw   rX   indexr   Zbase_urir0   Z
bucketNameCan not read file extension , support [parquet, csv])rG   rP   rI   rJ   r   r   rL   r   rB   read_parquetr   rN   read_csv_base_discovery_apiDatabaseManagementr   r   Z_base_minio_urlget_database_schemaexecglobals_get_order_meta_datar   read_sql_queryr	   get_list_datanoderC   )rS   r3   r   Zextra_paramrW   rX   _f_typer[   r:   Z	_databaser   database_metaschemar   reordered_metacon_strZupdated_reordered_metar   r   r   read_ddf1  s\     
((
"&

 zDataNode.read_ddfc                 C   s   t j| j d| d| jd}|jdkrL|jdk r:| nd}td| | }|dt|d	 i |d
 d }|dkrt	j
dt d|d  | jdS |dkrt	jdt d|d  | jdS td| dS )z_summary_

        Args:
            file_id (_type_, optional): _description_. Defaults to None.

        Raises:
            Exception: _description_

        Returns:
            _type_: _description_
        r   r   r5   r|   r}   r~   r   r/   r   r?   r   r   r.   )r0   r   r   r   )rG   rP   rI   rJ   rK   r   rC   rL   r   r@   r   r   rN   r   )rS   r3   rW   r   rX   r   r   r   r   read_dfp  s    
 
  zDataNode.read_dfc                 C   sV   t j|dtd t j|dtd tj| j d| j||dd}t| |	 
dS )z_summary_

        Args:
            name (_type_, optional): _description_. Defaults to None.
            directory_id (_type_, optional): _description_. Defaults to None.

        Returns:
            _type_: _description_
        r   variableZvariableNameZdtyperb   z/file/get-file-id/)r   r   r   r1   )r   
check_typerD   rE   rG   rH   rI   rJ   r   r   rP   )rS   r   rb   r[   r   r   r   rf     s    
zDataNode.get_file_idc                 C   sZ   t j|dtd t j|dtd tj| j d| j||dd}t j|d |	 
dS )	z_summary_

        Args:
            parent_dir_id (_type_, optional): _description_. Defaults to None.
            name (_type_, optional): _description_. Defaults to None.

        Returns:
            _type_: _description_
        parent_dir_idr   r   z/directory/get_directory_id/)
parent_dirr   r   rc   r1   )r   r   rE   rD   rG   rH   rI   rJ   r   r   rP   )rS   r   r   rr   r   r   r   get_directory_id  s    
zDataNode.get_directory_idc                 C   st   t j|dtd tj| j d| d| jd}t j|d | }|di di }|	 }t
|td	d
d}|S )_summary_

        Args:
            file_id (_type_, optional): _description_. Defaults to None.

        Returns:
            _type_: _description_
        r3   r   r   r   r5   rc   ra   r_   r`   F)r/   reverse)r   r   rE   rG   rP   rI   rJ   r   r   valuessortedr   )rS   r3   rW   rX   version_file_idsr   r   r   get_file_version  s    	zDataNode.get_file_version)rz   c                 C   s   t j|dtd tj| j d| d| jd}t j|d | }|di di }|	 }t
||\}}|d	krtd
| d| d||fS )r   r3   r   r   r   r5   rc   ra   r_   Nz
data_date z) is not exist in list data node (file_id=))r   r   rE   rG   rP   rI   rJ   r   r   r   r
   
ValueError)rS   r3   rz   rW   rX   r   r   result_listr   r   r   get_file_from_date  s    	zDataNode.get_file_from_datec              
   C   s(  t j| j d| d| jd}t| | }|dt|d i |d d }|dkrd	}d
}|r| j||d\}}	|dkr|}|d }t	j
||| j| j| jtd}
|dk r|
S z t	j
||| j| j| jtd}W n6 tk
r
 } z| jrt| d}W 5 d}~X Y nX t	|
|S td| dS )r   r   r   r5   r/   r   r?   r   rw   r   )r3   rz   r   r   r   Nr   z, support only [listDataNode])rG   rP   rI   rJ   r   r   rL   r   r   r	   r   r   rN   r   rC   Z_verboseprintZget_change_data)rS   r3   rz   rW   rX   r   Ztarget_indexZprevious_indexr   r   Zddf_currentZddf_previousr\   r   r   r   get_update_data  sR    	 

	
 zDataNode.get_update_data)NNNr   NFN)NNr   NN)	NNNr   NFNFN)N)N)NN)NN)N)NN)NN)__name__
__module____qualname__r]   rj   rs   r{   r   r   r   rf   r   r   r"   rx   r   r   r   r   r   r   r      s,         
^M
         
3
?



r   c                   @   sf   e Zd ZdddZdddZdddZdd	 Zd
d Zdd Zdd Z	dddZ
dd ZdddZdS )r   Nc                 C   s   t |tkr tdt | dtj| j d| d| jd}|jdkrN|jS |jdkr`|	 S |	 }d|krx|d S d	| S )
zLGet sqlalchemy connection string
        
        Args:
        ...
        zExpect `con_id`=<int> but got z#, please input `con_id` eg con_id=0/api/sql/database/r   r5   r}     r   zSome thing wrong!, )
r?   rE   rC   rG   rP   r   rJ   rK   contentr   )rS   con_idr[   r:   r   r   r   _get_connection_str  s    

z&DatabaseManagement._get_connection_strc                 C   s   t |tkr tdt | dtj| j d| d| jd}|jdkrR|jdfS |jdkrh|	 dfS |	 }|
d	}||fS )
z_summary_

        Args:
            table_id (_type_, optional): _description_. Defaults to None.

        Raises:
            Exception: _description_

        Returns:
            _type_: _description_
        z Expect `table_id`=<int> but got z', please input `table_id` eg table_id=0z/api/sql/table/r   r5   i  r   r   schema_code)r?   rE   rC   rG   rP   r   rJ   rK   r   r   pop)rS   Ztable_idr[   rX   r   r   r   r   get_table_schema,  s    



z#DatabaseManagement.get_table_schemac           	      O   s   |dkr@t j| j d| jd|id}t|d | dd}tj|dt	d t j| j d	| d
| jd}tj|d | }|
d}||fS )z_summary_

        Args:
            database_id (_type_, optional): _description_. Defaults to None.

        Returns:
            _type_: _description_
        N"/api/sql/database/get_database_id/r   r   rc   r1   r   r   r   r   r5   r   )rG   rH   r   rJ   r   r   rP   r   r   rE   r   )	rS   r   database_nameargsrV   rr   r[   rX   r   r   r   r   r   D  s    	 

z&DatabaseManagement.get_database_schemac                 C   s   dS )zbCheck table and column accesing permission from con_id
        
        Args:
        ...
        Nr   )rS   r   tabler   r   r   r   check_permission\  s    z#DatabaseManagement.check_permissionc                 C   sH   |j D ]<}|d jj}| j|||d dstd| d|d  qdS )zoCheck table and column accesing permission from sqlalchemy query   
        
        Args:
        ...
        exprr   )r   r   r   z#You don't have permission in table=z	, column=N)Zcolumn_descriptionsr   r   r   rC   )rS   r   r   r   Z
table_namer   r   r   check_query_permissione  s    
z)DatabaseManagement.check_query_permissionc                 C   s@   t jt| d}|| dkr(d||< ||}||}|S )N)columnsZInt64Zint64)r@   rA   listkeysZastypeZ	set_index)rS   rX   r   Z	meta_daskr   r   r   _create_pandas_metar  s    

z&DatabaseManagement._create_pandas_metac              
   C   s   |   }|| dkr*tj| |||d}ntj| ||dd}|j}zdg }|D ]B}	|	|krp||	||	 d qNtd|	 d ||	dd qN|||| d W n0 tk
r }
 zt|
 d	W 5 d }
~
X Y nX |S )
NrD   string)sqlcon	index_col	divisionsr   )r   r   r   Znpartitionsr   zWarning! column "z" exist in data source but not exist in reflection schema, so it will be auto cast to "string". If you want to use data type same as data source, please regenreate reflection schemar   z. This column is exist in data source but not exist in reflection schema. Maybe this is a new column in data source. Please check your reflection schema file and regenreate it)_get_possible_str_divisionrB   r   r   r    r   KeyError)rS   rX   r   r   r   possible_divisionsddfZcolumns_orderr   r   Zker   r   r   r   z  s      z'DatabaseManagement._get_order_meta_datac	                 O   s  |dks"t |dkr6t|tkr6tdt| d|dksJt|tkr^tdt| d|dksrt|tkrtdt| d|dkst|tkrtd	t| d
|dkst|tkrtdt| d|dkst|tkrtdt| d|dkrd| }|dkr(| ||n|}| j|d\}}|d }t |r`t 	|}|}n0t|tkr|}t
d| d| dt  t}| j||t|d}tj| j d| j||||||||dd}t| |jdk r| S |jS )a  _summary_

        Args:
            query_function (_type_, optional): _description_. Defaults to None.
            directory_id (_type_, optional): _description_. Defaults to None.
            database_id (_type_, optional): _description_. Defaults to None.
            pk_column (_type_, optional): _description_. Defaults to None.
            meta (_type_, optional): _description_. Defaults to None.
            name (_type_, optional): _description_. Defaults to None.
            desciption (_type_, optional): _description_. Defaults to None.
            replace (_type_, optional): _description_. Defaults to None.

        Raises:
            Exception: _description_
            Exception: _description_
            Exception: _description_
            Exception: _description_
            Exception: _description_
            Exception: _description_

        Returns:
            _type_: _description_
        NFz(Expect `query`=<function>/<str> but got , please input `query`r   z eg directory_id=0z/Please input data `database_id`=<int>, but got z eg database_id=0z*Please input data `con_id`=<str>, but got z eg pk_column='id' z)Please input data `meta`=<dict>, but got z eg meta='{'id':'Int64'}' r   z eg name='myData' z	query of r   r   r   r   r   )rX   r   r   r   r   )r   r   r   r   r   r   rX   r   r   r}   )inspect
isfunctionr?   rD   rC   rE   dictrF   r   	getsourcer   r   r   r   rG   rH   r   rJ   r   rK   r   r   )rS   r   rb   r   r   rX   r   Z
desciptionr   r   rV   _replacer   r   r   Zquery_function_strZquery_function_functionr   rr   r   r   r   write_sql_query  sb    "


z"DatabaseManagement.write_sql_queryc                 C   s    t dtj tj }|  |S )N
0123456789)r   r   ascii_lowercaseascii_uppercasesort)rS   r   r   r   r   r     s    z-DatabaseManagement._get_possible_str_divisionc                 O   s  |dkr@t j| j d| jd|id}t|d | dd}tj|dt	d tj|d	t
d tj|d
td |dkst|dkrtdt| d| j|d\}	}
|	d }| j||d}|| dkr|  }tj| ||||d}ntj| |||dd}|S )a%  _summary_

        Args:
            table_id (_type_, optional): _description_. Defaults to None.
            query_function (_type_, optional): _description_. Defaults to None.
            pk_column (_type_, optional): _description_. Defaults to None.
            meta (_type_, optional): _description_. Defaults to None.

        Raises:
            Exception: _description_
            Exception: _description_
            Exception: _description_
            Exception: _description_

        Returns:
            _type_: _description_
        Nr   r   r   rc   r1   r   r   r   rX   Fz"Expect `query`=<function> but got r   r   r   )rX   r   r   )r   r   r   rX   r   z300 MB)r   r   r   rX   Zbytes_per_chunk)rG   rH   r   rJ   r   r   rP   r   r   rE   rD   r   r   r   rC   r?   r   r   r   rB   r   )rS   r   r   r   rX   r   r   rV   rr   r   r   r   Zdf_metar   r   r   r   r   r     sB     
	z!DatabaseManagement.read_sql_query)N)N)NN)NNNNNNNN)NNNNN)r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     s   


	        
dr   )r   timer   r   rG   rn   r"   Zpandasr@   Zdask.dataframeZ	dataframerB   operatorr   baser   r   Zdatasetr   Zutils.requestsr   utilsr   r	   Zutils.listDatanoder
   r   r   r   r   r   r   r   <module>   s*       