U
    /es#                     @   s   d dl Z d dlZd dlmZmZmZ d dlmZ d dlmZ	 d dl
mZ dZdZdd	 Zd
d Zdd Zdd Zd ddZdd Zdd Zd!ddZdd Zdd ZdS )"    N)OpenFileget_fs_token_paths
open_files)
read_block)tokenize)HighLevelGraphs   Obj   c                 C   sd   |  d}t|}|d@ }d}|d@ dkrRt|  d}||d@ |> O }|d7 }q|d? |d@  A S )z"variable-length, zig-zag encoding.            r   )readord)focbnshift r   1/tmp/pip-unpacked-wheel-dbjnr7gq/dask/bag/avro.py	read_long   s    

r   c                 C   s   t | }| |S )z+a long followed by that many bytes of data.)r   r   )r   sizer   r   r   
read_bytes   s    r   c                 C   s   |  tttkstdi }d|i}t| }|dkr8qXt|D ]}t|  t|  q@q&|  t|d< |  |d< | 	d |  |d |d< |S )zExtract an avro file's header

    fo: file-like
        This should be in bytes mode, e.g., io.BytesIO

    Returns dict representing the header

    Parameters
    ----------
    fo: file-like
    zMagic avro bytes missingmetar   syncheader_size
head_bytes)
r   lenMAGICAssertionErrorr   ranger   	SYNC_SIZEtellseek)r   r   outZn_keys_r   r   r   read_header!   s    
r&   c              	   C   s8   t | ||d}t|}W 5 Q R X | |d }||fS )z*Open a file just to read its head and sizecompressionr   )r   r&   info)fspathr(   fheadr   r   r   r   	open_head?   s    r.    c                    s  ddl m}m} ddlm} ddlm} |dd |p8i }|dk	rbt| d|d	\}}	|t| fd
d|	D  }
t	|
 \}}|t
g }g }|D ]6}ttd||}|gt| }|| || qg }
t	|	|||D ]t\}}}d }t| dt|||| |fdd|D }fddt	|||D }|
| q||
S t| fd i|}|tfdd|D }||S dS )a  Read set of avro files

    Use this with arbitrary nested avro schemas. Please refer to the
    fastavro documentation for its capabilities:
    https://github.com/fastavro/fastavro

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    blocksize: int or None
        Size of chunks in bytes. If None, there will be no chunking and each
        file will become one partition.
    storage_options: dict or None
        passed to backend file-system
    compression: str or None
        Compression format of the targe(s), like 'gzip'. Should only be used
        with blocksize=None.
    r   )computedelayed)from_delayedimport_requiredfastavroz<fastavro is a required dependency for using bag.read_avro().Nrb)modestorage_optionsc                    s   g | ]}| qS r   r   ).0r+   )r(   dheadr*   r   r   
<listcomp>j   s     zread_avro.<locals>.<listcomp>r   r'   c                    s   g | ]}d | d  qS )z
read-avro--r   )r9   o)tokenr   r   r;   }   s     c              	      s$   g | ]\}}} |||d qS ))Zdask_key_namer   )r9   r=   keyl)dreadr,   r-   r   r   r;   ~   s   r(   c                    s   g | ]} |qS r   r   )r9   r   )rA   r   r   r;      s     )Zdaskr0   r1   Zdask.bagr2   
dask.utilsr4   r   r.   zip
read_chunklistr    r   appendr   fs_tokenizeZukeyextendr   	read_file)Zurlpath	blocksizer8   r(   r0   r1   r2   r4   Zfs_tokenpathsr$   ZheadsZsizesoffsetslengthsr   offlengthr+   offset	delimiterkeysvaluesfileschunksr   )r(   r:   rA   r,   r*   r-   r>   r   	read_avroG   s^     
  
     
rV   c           
   	   C   sr   ddl }t|dr|j}n|j}| }t||||d }W 5 Q R X |d }|ts\|| }t|}	t	||	S )zGet rows from raw bytes blockr   N	iter_avror   r   )
r5   hasattrrW   readerr   
startswithr   ioBytesIOrE   )
ZfobjrN   r@   r-   r5   rY   r,   chunkr   ir   r   r   rD      s    


rD   c              
   C   sH   ddl }t|dr|j}n|j}| }t||W  5 Q R  S Q R X dS )zGet rows from file-liker   NrW   )r5   rX   rW   rY   rE   )r   r5   rY   r,   r   r   r   rI      s    
rI   null>  Tc	                    s   ddl m}
 |
dd t |p$i }t|df| jd|}dt j  fdd	t|D }t	j
| gd
}t | j}|r|jf |	 dd |D S | S dS )a	  Write bag to set of avro files

    The schema is a complex dictionary describing the data, see
    https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema
    and https://fastavro.readthedocs.io/en/latest/writer.html .
    It's structure is as follows::

        {'name': 'Test',
         'namespace': 'Test',
         'doc': 'Descriptive text',
         'type': 'record',
         'fields': [
            {'name': 'a', 'type': 'int'},
         ]}

    where the "name" field is required, but "namespace" and "doc" are optional
    descriptors; "type" must always be "record". The list of fields should
    have an entry for every key of the input records, and the types are
    like the primitive, complex or logical types of the Avro spec
    ( https://avro.apache.org/docs/1.8.2/spec.html ).

    Results in one avro file per input partition.

    Parameters
    ----------
    b: dask.bag.Bag
    filename: list of str or str
        Filenames to write to. If a list, number must match the number of
        partitions. If a string, must include a glob character "*", which will
        be expanded using name_function
    schema: dict
        Avro schema dictionary, see above
    name_function: None or callable
        Expands integers into strings, see
        ``dask.bytes.utils.build_name_function``
    storage_options: None or dict
        Extra key/value options to pass to the backend file-system
    codec: 'null', 'deflate', or 'snappy'
        Compression algorithm
    sync_interval: int
        Number of records to include in each block within a file
    metadata: None or dict
        Included in the file header
    compute: bool
        If True, files are written immediately, and function blocks. If False,
        returns delayed objects, which can be computed by the user where
        convenient.
    kwargs: passed to compute(), if compute=True

    Examples
    --------
    >>> import dask.bag as db
    >>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
    ...                       {'name': 'Bob', 'value': 200}])
    >>> schema = {'name': 'People', 'doc': "Set of people's scores",
    ...           'type': 'record',
    ...           'fields': [
    ...               {'name': 'name', 'type': 'string'},
    ...               {'name': 'value', 'type': 'int'}]}
    >>> b.to_avro('my-data.*.avro', schema)  # doctest: +SKIP
    ['my-data.0.avro', 'my-data.1.avro']
    r   r3   r5   z:fastavro is a required dependency for using bag.to_avro().wb)name_functionnumzto-avro-c              
      s.   i | ]&\}}|ft  j|f|fqS r   )_write_avro_partname)r9   r^   r,   r   codecmetadatare   schemasync_intervalr   r   
<dictcomp>  s   
zto_avro.<locals>.<dictcomp>)Zdependenciesc                 S   s   g | ]
}|j qS r   )r+   )r9   r,   r   r   r   r;     s     zto_avro.<locals>.<listcomp>N)rB   r4   _verify_schemar   ZnpartitionsuuidZuuid4hex	enumerater   Zfrom_collectionstyper0   Z
to_delayed)r   filenameri   rb   r8   rg   rj   rh   r0   kwargsr4   rT   Zdskgraphr$   r   rf   r   to_avro   s4    K 
rt   c                 C   s   t | tstddD ]}|| kstd| q| d dksDtdt | d tsZtd| d D ] }d	|krvd|ksbtd
| qbd S )NzSchema must be dictionary)re   rp   fieldszSchema missing '%s' fieldrp   recordzSchema must be of type 'record'ru   zFields entry must be a listre   zField spec incomplete: %s)
isinstancedictr   rE   )sfieldr,   r   r   r   rl     s    rl   c              	   C   s0   ddl }|}|||| ||| W 5 Q R X dS )z1Create single avro file from list of dictionariesr   N)r5   writer)partr,   ri   rg   rj   rh   r5   r   r   r   rd   %  s    rd   )r/   NN)NNr_   r`   NT)r[   rm   Zfsspec.corer   r   r   Zfsspec.utilsr   r   rG   Zdask.highlevelgraphr   r   r!   r   r   r&   r.   rV   rD   rI   rt   rl   rd   r   r   r   r   <module>   s.   
E      
p
