import os
from collections.abc import Mapping
from io import BytesIO
from warnings import catch_warnings, simplefilter, warn

try:
    import psutil
except ImportError:
    psutil = None  # type: ignore

import numpy as np
import pandas as pd
from fsspec.compression import compr
from fsspec.core import get_fs_token_paths
from fsspec.core import open as open_file
from fsspec.core import open_files
from fsspec.utils import infer_compression
from pandas.api.types import (
    CategoricalDtype,
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
    is_object_dtype,
)

from dask.base import tokenize
from dask.bytes import read_bytes
from dask.core import flatten
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.io.io import from_map
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import clear_known_categories
from dask.delayed import delayed
from dask.utils import asciitable, parse_bytes


class CSVFunctionWrapper(DataFrameIOFunction):
    """
    CSV Function-Wrapper Class
    Reads CSV data from disk to produce a partition (given a key).
    """

    def __init__(
        self,
        full_columns,
        columns,
        colname,
        head,
        header,
        reader,
        dtypes,
        enforce,
        kwargs,
    ):
        self.full_columns = full_columns
        self._columns = columns
        self.colname = colname
        self.head = head
        self.header = header
        self.reader = reader
        self.dtypes = dtypes
        self.enforce = enforce
        self.kwargs = kwargs

    @property
    def columns(self):
        if self._columns is None:
            return self.full_columns
        if self.colname:
            return self._columns + [self.colname]
        return self._columns

    def project_columns(self, columns):
        """Return a new CSVFunctionWrapper object
        with a sub-column projection.
        """
        # Make sure columns is ordered correctly
        columns = [c for c in self.head.columns if c in columns]
        if columns == self.columns:
            return self
        if self.colname and self.colname not in columns:
            # when path-as-column is on, we must keep it at IO
            # whatever the selection
            head = self.head[columns + [self.colname]]
        else:
            head = self.head[columns]
        return CSVFunctionWrapper(
            self.full_columns,
            columns,
            self.colname,
            head,
            self.header,
            self.reader,
            {c: self.dtypes[c] for c in columns},
            self.enforce,
            self.kwargs,
        )

    def __call__(self, part):
        # Part will be a 4-element tuple:
        # (block, path, is_first, is_last)
        block, path, is_first, is_last = part

        # Construct `path_info`
        if path is not None:
            path_info = (
                self.colname,
                path,
                sorted(list(self.head[self.colname].cat.categories)),
            )
        else:
            path_info = None
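
        # Blocks other than the first/last block of a file need adjusted
        # keywords: ``skiprows`` and ``header`` only make sense at the start
        # of a file (later blocks get the header bytes re-attached in
        # ``pandas_read_text``), and ``skipfooter`` only at its end.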
        # Deal with arguments that are special
        # for the first block of each file
        write_header = False
        rest_kwargs = self.kwargs.copy()
        if not is_first:
            if rest_kwargs.get("names", None) is None:
                write_header = True
            rest_kwargs.pop("skiprows", None)
            if rest_kwargs.get("header", 0) is not None:
                rest_kwargs.pop("header", None)
        if not is_last:
            rest_kwargs.pop("skipfooter", None)

        # Deal with column projection
        columns = self.full_columns
        project_after_read = False
        if self._columns is not None:
            if self.kwargs:
                # To be safe, if any kwargs are defined, avoid
                # changing `usecols` here. Instead, we can just
                # select columns after the read
                project_after_read = True
            else:
                columns = self._columns
                rest_kwargs["usecols"] = columns

        # Call `pandas_read_text`
        df = pandas_read_text(
            self.reader,
            block,
            self.header,
            rest_kwargs,
            self.dtypes,
            columns,
            write_header,
            self.enforce,
            path_info,
        )
        if project_after_read:
            return df[self.columns]
        return df


def pandas_read_text(
    reader,
    b,
    header,
    kwargs,
    dtypes=None,
    columns=None,
    write_header=True,
    enforce=False,
    path=None,
):
    """Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered
        list of paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = reader(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    if path:
        colname, path, paths = path
        code = paths.index(path)
        df = df.assign(
            **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)}
        )
    return df


def coerce_dtypes(df, dtypes):
    """Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    bad_dtypes = []
    bad_dates = []
    errors = []
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            actual = df.dtypes[c]
            desired = dtypes[c]
            if is_float_dtype(actual) and is_integer_dtype(desired):
                bad_dtypes.append((c, actual, desired))
            elif is_object_dtype(actual) and is_datetime64_any_dtype(desired):
                # This can only occur when parse_dates is specified, but an
                # invalid date is encountered. Pandas then silently falls back
                # to object dtype. Since `object_array.astype(datetime)` will
                # silently overflow, error here and report.
                bad_dates.append(c)
            else:
                try:
                    df[c] = df[c].astype(dtypes[c])
                except Exception as e:
                    bad_dtypes.append((c, actual, desired))
                    errors.append((c, e))

    if bad_dtypes:
        if errors:
            ex = "\n".join(
                f"- {c}\n {e!r}" for c, e in sorted(errors, key=lambda x: str(x[0]))
            )
            exceptions = (
                "The following columns also raised exceptions on "
                "conversion:\n\n%s\n\n"
            ) % ex
            extra = ""
        else:
            exceptions = ""
            # All mismatches are int->float, also suggest `assume_missing=True`
            extra = (
                "\n\nAlternatively, provide `assume_missing=True` "
                "to interpret\n"
                "all unspecified integer columns as floats."
            )
        bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0]))
        table = asciitable(["Column", "Found", "Expected"], bad_dtypes)
        dtype_kw = "dtype={%s}" % ",\n ".join(
            f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes
        )
        dtype_msg = (
            "{table}\n\n"
            "{exceptions}"
            "Usually this is due to dask's dtype inference failing, and\n"
            "*may* be fixed by specifying dtypes manually by adding:\n\n"
            "{dtype_kw}\n\n"
            "to the call to `read_csv`/`read_table`."
            "{extra}"
        ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra)
    else:
        dtype_msg = None
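
    # Illustrative failure (column name hypothetical): if the sampled rows of
    # column "x" were all integers but a later partition holds a missing
    # value, that partition parses as float64 while int64 was expected, and
    # the message above suggests dtype={'x': 'float64'} or assume_missing=True.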

    if bad_dates:
        also = " also " if bad_dtypes else " "
        cols = "\n".join("- %s" % c for c in bad_dates)
        date_msg = (
            "The following columns{also}failed to properly parse as dates:\n\n"
            "{cols}\n\n"
            "This is usually due to an invalid value in that column. To\n"
            "diagnose and fix, it is recommended to drop these columns from the\n"
            "`parse_dates` keyword, and manually convert them to dates later\n"
            "using `dd.to_datetime`."
        ).format(also=also, cols=cols)
    else:
        date_msg = None

    if bad_dtypes or bad_dates:
        rule = "\n\n%s\n\n" % ("-" * 61)
        msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % (
            rule.join(filter(None, [dtype_msg, date_msg]))
        )
        raise ValueError(msg)


def text_blocks_to_pandas(
    reader,
    block_lists,
    header,
    head,
    kwargs,
    enforce=False,
    specified_dtypes=None,
    path=None,
    blocksize=None,
    urlpath=None,
):
    """Convert blocks of bytes to a dask.dataframe

    This accepts a list of lists of values of bytes where each list corresponds
    to one file, and the values of bytes concatenate to comprise the entire
    file, in order.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    path : tuple, optional
        A tuple containing column name for path and the path_converter if
        provided

    Returns
    -------
    A dask.dataframe
    """
    dtypes = head.dtypes.to_dict()
    # dtypes contains only instances of CategoricalDtype, which causes issues
    # in coerce_dtypes for non-uniform categories across partitions.
    # We will modify `dtype` (which is inferred) to
    # 1. contain instances of CategoricalDtypes for user-provided types
    # 2. contain 'category' for data inferred types
    categoricals = head.select_dtypes(include=["category"]).columns

    if isinstance(specified_dtypes, Mapping):
        known_categoricals = [
            k
            for k in categoricals
            if isinstance(specified_dtypes.get(k), CategoricalDtype)
            and specified_dtypes.get(k).categories is not None
        ]
        unknown_categoricals = categoricals.difference(known_categoricals)
    else:
        unknown_categoricals = categoricals

    # Fixup the dtypes
    for k in unknown_categoricals:
        dtypes[k] = "category"

    columns = list(head.columns)

    blocks = tuple(flatten(block_lists))
    # Create mask of first blocks from nested block_lists
    is_first = tuple(block_mask(block_lists))
    is_last = tuple(block_mask_last(block_lists))

    if path:
        colname, path_converter = path
        paths = [b[1].path for b in blocks]
        if path_converter:
            paths = [path_converter(p) for p in paths]
        head = head.assign(
            **{
                colname: pd.Categorical.from_codes(
                    np.zeros(len(head), dtype=int), set(paths)
                )
            }
        )
        path = (colname, paths)

    if len(unknown_categoricals):
        head = clear_known_categories(head, cols=unknown_categoricals)

    # Define parts
    parts = []
    colname, paths = path or (None, None)
    for i in range(len(blocks)):
        parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]])

    # Construct the output collection with from_map
    return from_map(
        CSVFunctionWrapper(
            columns,
            None,
            colname,
            head,
            header,
            reader,
            dtypes,
            enforce,
            kwargs,
        ),
        parts,
        meta=head,
        label="read-csv",
        token=tokenize(reader, urlpath, columns, enforce, head, blocksize),
        enforce_metadata=False,
        produces_tasks=True,
    )
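

# Illustrative shape of ``block_lists`` (names hypothetical): two files split
# into delayed byte blocks, e.g. [[block_a0, block_a1], [block_b0]], flatten
# into three partitions; ``block_mask``/``block_mask_last`` below mark the
# first and last block of each file.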
def block_mask(block_lists):
    """
    Yields a flat iterable of booleans to mark the zeroth elements of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask([[1, 2], [3, 4], [5]]))
    [True, False, True, False, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield True
        yield from (False for _ in block[1:])


def block_mask_last(block_lists):
    """
    Yields a flat iterable of booleans to mark the last element of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask_last([[1, 2], [3, 4], [5]]))
    [False, True, False, True, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield from (False for _ in block[:-1])
        yield True


def auto_blocksize(total_memory, cpu_count):
    memory_factor = 10
    blocksize = int(total_memory // cpu_count / memory_factor)
    return min(blocksize, int(64e6))


def _infer_block_size():
    default = 2**25
    if psutil is not None:
        with catch_warnings():
            simplefilter("ignore", RuntimeWarning)
            mem = psutil.virtual_memory().total
            cpu = psutil.cpu_count()

        if mem and cpu:
            return auto_blocksize(mem, cpu)

    return default


# Guess blocksize if psutil is installed, otherwise use an acceptable default
AUTO_BLOCKSIZE = _infer_block_size()
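
# Worked example of the heuristic above (machine specs hypothetical): with
# 16 GB of memory and 8 cores, auto_blocksize(16e9, 8) computes
# int(16e9 // 8 / 10) == 200_000_000 bytes, which the cap reduces to 64e6.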


def read_pandas(
    reader,
    urlpath,
    blocksize="default",
    lineterminator=None,
    compression="infer",
    sample=256000,
    sample_rows=10,
    enforce=False,
    assume_missing=False,
    storage_options=None,
    include_path_column=False,
    **kwargs,
):
    reader_name = reader.__name__
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs["lineterminator"] = lineterminator
    else:
        lineterminator = "\n"
    if include_path_column and isinstance(include_path_column, bool):
        include_path_column = "path"
    if "index" in kwargs or "index_col" in kwargs:
        raise ValueError(
            "Keywords 'index' and 'index_col' not supported. "
            f"Use dd.{reader_name}(...).set_index('my-index') instead"
        )
    for kw in ["iterator", "chunksize"]:
        if kw in kwargs:
            raise ValueError(f"{kw} not supported for dd.{reader_name}")
    if kwargs.get("nrows", None):
        raise ValueError(
            "The 'nrows' keyword is not supported by "
            "`dd.{0}`. To achieve the same behavior, it's "
            "recommended to use `dd.{0}(...)."
            "head(n=nrows)`".format(reader_name)
        )
    if isinstance(kwargs.get("skiprows"), int):
        skiprows = lastskiprow = firstrow = kwargs.get("skiprows")
    elif kwargs.get("skiprows") is None:
        skiprows = lastskiprow = firstrow = 0
    else:
        # When skiprows is a list, we expect more than max(skiprows) to
        # be included in the sample. This means that [0,2] will work well,
        # but [0, 440] might not work.
        skiprows = set(kwargs.get("skiprows"))
        lastskiprow = max(skiprows)
        # find the firstrow that is not skipped, for use as header
        firstrow = min(set(range(len(skiprows) + 1)) - set(skiprows))
    if isinstance(kwargs.get("header"), list):
        raise TypeError(f"List of header rows not supported for dd.{reader_name}")
    if isinstance(kwargs.get("converters"), dict) and include_path_column:
        path_converter = kwargs.get("converters").get(include_path_column, None)
    else:
        path_converter = None

    # If compression is "infer", inspect the (first) path suffix and
    # set the proper compression option if the suffix is recognized.
    if compression == "infer":
        # Translate the input urlpath to a simple path list
        paths = get_fs_token_paths(urlpath, mode="rb", storage_options=storage_options)[
            2
        ]

        # Check for at least one valid path
        if len(paths) == 0:
            raise OSError(f"{urlpath} resolved to no files")

        # Infer compression from first path
        compression = infer_compression(paths[0])

    if blocksize == "default":
        blocksize = AUTO_BLOCKSIZE
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    if blocksize and compression:
        # NONE of the compressions should use chunking
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None`` to remove this message\n"
            "Setting ``blocksize=None``" % compression
        )
        blocksize = None
    if compression not in compr:
        raise NotImplementedError("Compression format %s not installed" % compression)
    if blocksize and sample and blocksize < sample and lastskiprow != 0:
        warn(
            "Unexpected behavior can result from passing skiprows when\n"
            "blocksize is smaller than sample size.\n"
            "Setting ``sample=blocksize``"
        )
        sample = blocksize
    b_lineterminator = lineterminator.encode()
    b_out = read_bytes(
        urlpath,
        delimiter=b_lineterminator,
        blocksize=blocksize,
        sample=sample,
        compression=compression,
        include_path=include_path_column,
        **(storage_options or {}),
    )

    if include_path_column:
        b_sample, values, paths = b_out
        path = (include_path_column, path_converter)
    else:
        b_sample, values = b_out
        path = None

    if not isinstance(values[0], (tuple, list)):
        values = [values]
    # If we have not sampled, then use the first row of the first values
    # as a representative sample.
    if b_sample is False and len(values[0]):
        b_sample = values[0][0].compute()

    # Get header row, and check that sample is long enough. If the file
    # contains a header row, we need at least 2 nonempty rows + the number of
    # rows to skip.
    names = kwargs.get("names", None)
    header = kwargs.get("header", "infer" if names is None else None)
    need = 1 if header is None else 2
    if kwargs.get("comment"):
        # if comment is provided, step through lines of b_sample and strip out comments
        parts = []
        for part in b_sample.split(b_lineterminator):
            split_comment = part.decode().split(kwargs.get("comment"))
            if len(split_comment) > 1:
                # if line starts with comment, don't include that line in parts.
                if len(split_comment[0]) > 0:
                    parts.append(split_comment[0].strip().encode())
            else:
                parts.append(part)
            if len(parts) > need:
                break
    else:
        parts = b_sample.split(b_lineterminator, lastskiprow + need)

    # If the last partition is empty, don't count it
    nparts = 0 if not parts else len(parts) - int(not parts[-1])

    if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample:
        raise ValueError(
            "Sample is not large enough to include at least one "
            "row of data. Please increase the number of bytes "
            "in `sample` in the call to `read_csv`/`read_table`"
        )

    if isinstance(header, int):
        firstrow += header
    header = b"" if header is None else parts[firstrow] + b_lineterminator
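
    # These raw ``header`` bytes are later re-attached to every block that is
    # not the first block of its file (``write_header`` in
    # ``pandas_read_text``), so each partition can be parsed independently.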

    # Use sample to infer dtypes and check for presence of include_path_column
    head_kwargs = kwargs.copy()
    head_kwargs.pop("skipfooter", None)
    try:
        head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
    except pd.errors.ParserError as e:
        if "EOF" in str(e):
            raise ValueError(
                "EOF encountered while reading header. \n"
                "Pass argument `sample_rows` and make sure the value of `sample` "
                "is large enough to accommodate that many rows of data"
            ) from e
        raise
    if include_path_column and (include_path_column in head.columns):
        raise ValueError(
            "Files already contain the column name: %s, so the "
            "path column cannot use this name. Please set "
            "`include_path_column` to a unique name." % include_path_column
        )

    specified_dtypes = kwargs.get("dtype", {})
    if specified_dtypes is None:
        specified_dtypes = {}
    # If specified_dtypes is a single type, then all columns were specified
    if assume_missing and isinstance(specified_dtypes, dict):
        # Convert all non-specified integer columns to floats
        for c in head.columns:
            if is_integer_dtype(head[c].dtype) and c not in specified_dtypes:
                head[c] = head[c].astype(float)
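
    # ``values`` holds, per file, the delayed byte blocks produced by
    # read_bytes; unwrap their underlying graph tasks so text_blocks_to_pandas
    # can hand them to from_map with ``produces_tasks=True``.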
    values = [[list(dsk.dask.values()) for dsk in block] for block in values]

    return text_blocks_to_pandas(
        reader,
        values,
        header,
        head,
        kwargs,
        enforce=enforce,
        specified_dtypes=specified_dtypes,
        path=path,
        blocksize=blocksize,
        urlpath=urlpath,
    )


READ_DOC_TEMPLATE = """
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of
the same keyword arguments with the same performance guarantees. See the
docstring for :func:`pandas.{reader}` for more information on available
keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is
    computed based on available physical memory and the number of cores,
    up to a maximum of 64MB. Can be a number like ``64000000`` or a string
    like ``"64MB"``. If ``None``, a single block is used for each file.
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are
    assumed to contain missing values, and are converted to floats.
    Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues. For example, if all the rows in the sample had
integer dtypes, but later on there was a ``NaN``, then this would error at
compute time. To fix this, you have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple
partitions, at the cost of reduced parallelism.
"""


def make_reader(reader, reader_name, file_type):
    def read(
        urlpath,
        blocksize="default",
        lineterminator=None,
        compression="infer",
        sample=256000,
        sample_rows=10,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        **kwargs,
    ):
        return read_pandas(
            reader,
            urlpath,
            blocksize=blocksize,
            lineterminator=lineterminator,
            compression=compression,
            sample=sample,
            sample_rows=sample_rows,
            enforce=enforce,
            assume_missing=assume_missing,
            storage_options=storage_options,
            include_path_column=include_path_column,
            **kwargs,
        )

    read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type)
    read.__name__ = reader_name
    return read


read_csv = dataframe_creation_dispatch.register_inplace(
    backend="pandas",
    name="read_csv",
)(make_reader(pd.read_csv, "read_csv", "CSV"))
read_table = make_reader(pd.read_table, "read_table", "delimited")
read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width")


def _write_csv(df, fil, *, depend_on=None, **kwargs):
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)
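

# Note: ``depend_on`` is unused inside ``_write_csv`` itself; it only threads
# a dependency through the task graph so that, in single-file mode below, each
# append task runs after the previous partition has finished writing.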
def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument. The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths)  # doctest: +SKIP

    You can also provide a directory name:

    >>> df.to_csv('/path/to/data')  # doctest: +SKIP

    The files will be numbered 0, 1, 2, (and so on) suffixed with '.part':

    ::

        /path/to/data/0.part
        /path/to/data/1.part

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to save to remote filesystems.
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file.
    encoding : string, default 'utf-8'
        A string representing the encoding to use in the output file.
    mode : str, default 'w'
        Python file mode. The default is 'w' (or 'wt'), for writing
        a new file or overwriting an existing file in text mode. 'a'
        (or 'at') will append to an existing file in text mode or
        create a new file if it does not already exist. See :py:func:`open`.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename.
    compute : bool, default True
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : bool, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must be True under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    The names of the files written if they were computed right away.
    If not, the delayed tasks associated with writing the files.

    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.

    See Also
    --------
    fsspec.open_files
    """
    if single_file and name_function is not None:
        raise ValueError("name_function is not supported under the single file mode")
    if header_first_partition_only is None:
        header_first_partition_only = single_file
    elif not header_first_partition_only and single_file:
        raise ValueError(
            "header_first_partition_only cannot be False in the single file mode."
        )
    file_options = dict(
        compression=compression,
        encoding=encoding,
        newline="",
        **(storage_options or {}),
    )
    to_csv_chunk = delayed(_write_csv, pure=False)
    dfs = df.to_delayed()
    if single_file:
        first_file = open_file(filename, mode=mode, **file_options)
        value = to_csv_chunk(dfs[0], first_file, **kwargs)
        append_mode = mode.replace("w", "") + "a"
        append_file = open_file(filename, mode=append_mode, **file_options)
        kwargs["header"] = False
        for d in dfs[1:]:
            value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)
        values = [value]
        files = [first_file]
    else:
        files = open_files(
            filename,
            mode=mode,
            name_function=name_function,
            num=df.npartitions,
            **file_options,
        )
        values = [to_csv_chunk(dfs[0], files[0], **kwargs)]
        if header_first_partition_only:
            kwargs["header"] = False
        values.extend(
            [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])]
        )
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()

        if scheduler is not None:
            warn(
                "The 'scheduler' keyword argument for `to_csv()` is deprecated and "
                "will be removed in a future version. "
                "Please use the `compute_kwargs` argument instead. "
                f"For example, df.to_csv(..., compute_kwargs={{scheduler: {scheduler}}})",
                FutureWarning,
            )

        if (
            scheduler is not None
            and compute_kwargs.get("scheduler") is not None
            and compute_kwargs.get("scheduler") != scheduler
        ):
            raise ValueError(
                f"Differing values for 'scheduler' have been passed in.\n"
                f"scheduler argument: {scheduler}\n"
                f"via compute_kwargs: {compute_kwargs.get('scheduler')}"
            )

        if scheduler is not None and compute_kwargs.get("scheduler") is None:
            compute_kwargs["scheduler"] = scheduler

        import dask

        return list(dask.compute(*values, **compute_kwargs))
    else:
        return values


from dask.dataframe.core import _Frame

_Frame.to_csv.__doc__ = to_csv.__doc__