import io
from functools import partial

from fsspec.core import open_files
from tlz import concat

from dask.bag.core import from_delayed
from dask.bytes import read_bytes
from dask.delayed import delayed
from dask.utils import parse_bytes, system_encoding

delayed = delayed(pure=True)


def read_text(
    urlpath,
    blocksize=None,
    compression="infer",
    encoding=system_encoding,
    errors="strict",
    linedelimiter=None,
    collection=True,
    storage_options=None,
    files_per_partition=None,
    include_path=False,
):
    """Read lines from text files

    Parameters
    ----------
    urlpath : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    blocksize: None, int, or str
        Size (in bytes) to cut up larger files.  Streams by default.
        Can be ``None`` for streaming, an integer number of bytes, or a string
        like "128MiB"
    compression: string
        Compression format like 'gzip' or 'xz'.  Defaults to 'infer'
    encoding: string
        Text encoding to use when reading, defaulting to the system encoding.
    errors: string
        How decoding errors are handled (as for ``bytes.decode``), e.g. 'strict'.
    linedelimiter: string or None
        String on which to split lines. If None, newline handling is left to
        the underlying file (universal newlines).
    collection: bool, optional
        Return a ``dask.bag.Bag`` if True, or a list of delayed values if False.
    storage_options: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.
    files_per_partition: None or int
        If set, group input files into partitions of the requested size,
        instead of one partition per file. Mutually exclusive with blocksize.
    include_path: bool
        Whether or not to include the path in the bag.
        If true, elements are tuples of (line, path).
        Default is False.

    Examples
    --------
    >>> b = read_text('myfiles.1.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt.gz')  # doctest: +SKIP
    >>> b = read_text('s3://bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')  # doctest: +SKIP

    Parallelize a large file by providing the number of uncompressed bytes to
    load into each partition.

    >>> b = read_text('largefile.txt', blocksize='10MB')  # doctest: +SKIP

    Get file paths of the bag by setting include_path=True

    >>> b = read_text('myfiles.*.txt', include_path=True) # doctest: +SKIP
    >>> b.take(1) # doctest: +SKIP
    (('first line of the first file', '/home/dask/myfiles.0.txt'),)
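
    Group several small files into each partition with ``files_per_partition``
    (an illustrative call; the glob pattern is hypothetical):

    >>> b = read_text('myfiles.*.txt', files_per_partition=100)  # doctest: +SKIP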

    Returns
    -------
    dask.bag.Bag or list
        dask.bag.Bag if collection is True or list of Delayed lists otherwise.

    See Also
    --------
    from_sequence: Build bag from Python sequence
    """
    if blocksize is not None and files_per_partition is not None:
        raise ValueError("Only one of blocksize or files_per_partition can be set")
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)

    if blocksize is None:
        if linedelimiter in [None, "", "\n", "\r", "\r\n"]:
            # Standard line endings are handled by the text-mode file itself
            newline = linedelimiter
            linedelimiter = None
        else:
            newline = ""
        files = open_files(
            urlpath,
            mode="rt",
            encoding=encoding,
            errors=errors,
            compression=compression,
            newline=newline,
            **(storage_options or {}),
        )
        if files_per_partition is None:
            blocks = [
                delayed(list)(
                    delayed(
                        partial(file_to_blocks, include_path, delimiter=linedelimiter)
                    )(fil)
                )
                for fil in files
            ]
        else:
            blocks = []
            for start in range(0, len(files), files_per_partition):
                block_files = files[start : (start + files_per_partition)]
                block_lines = delayed(concat)(
                    delayed(map)(
                        partial(file_to_blocks, include_path, delimiter=linedelimiter),
                        block_files,
                    )
                )
                blocks.append(block_lines)
    else:
        o = read_bytes(
            urlpath,
            delimiter=linedelimiter.encode() if linedelimiter is not None else b"\n",
            blocksize=blocksize,
            sample=False,
            compression=compression,
            include_path=include_path,
            **(storage_options or {}),
        )
        raw_blocks = o[1]
        blocks = [
            delayed(decode)(b, encoding, errors, linedelimiter)
            for b in concat(raw_blocks)
        ]
        if include_path:
            paths = list(
                concat([[path] * len(raw_blocks[i]) for i, path in enumerate(o[2])])
            )
            blocks = [
                delayed(attach_path)(entry, path) for entry, path in zip(blocks, paths)
            ]

    if not blocks:
        raise ValueError("No files found", urlpath)

    if collection:
        blocks = from_delayed(blocks)

    return blocks


def file_to_blocks(include_path, lazy_file, delimiter=None):
    # Yield lines from a single (lazily opened) text file, optionally paired
    # with the file's path. A custom delimiter requires reading the whole file
    # and splitting manually; otherwise the file's own line iterator is used.
    with lazy_file as f:
        if delimiter is not None:
            text = f.read()
            if not text:
                return []
            parts = text.split(delimiter)
            yield from (
                (line, lazy_file.path) if include_path else line
                for line in [p + delimiter for p in parts[:-1]] + parts[-1:]
            )
        else:
            for line in f:
                yield (line, lazy_file.path) if include_path else line


def attach_path(block, path):
    # Pair every line in a decoded block with the path it came from.
    for p in block:
        yield (p, path)


def decode(block, encoding, errors, line_delimiter):
    # Turn a block of bytes (produced by read_bytes) into a list of lines,
    # keeping the delimiter attached to each line.
    text = block.decode(encoding, errors)
    if line_delimiter in [None, "", "\n", "\r", "\r\n"]:
        lines = io.StringIO(text, newline=line_delimiter)
        return list(lines)
    else:
        if not text:
            return []
        parts = text.split(line_delimiter)
        out = [p + line_delimiter for p in parts[:-1]] + (
            parts[-1:] if not text.endswith(line_delimiter) else []
        )
        return out
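

# Illustrative usage sketch: a minimal way to drive ``read_text`` through the
# public ``dask.bag`` namespace, assuming a hypothetical directory of
# newline-delimited files matching "data/*.txt". Run this file directly to
# try it.
if __name__ == "__main__":
    import dask.bag as db

    # One partition per ~16 MiB of input; each element of the bag is a line.
    lines = db.read_text("data/*.txt", blocksize="16MiB")

    # Count whitespace-separated tokens across all files.
    n_tokens = lines.map(str.split).map(len).sum().compute()
    print(n_tokens)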