import contextlib import glob import math import os import sys import warnings from decimal import Decimal from unittest.mock import MagicMock import numpy as np import pandas as pd import pytest from packaging.version import parse as parse_version import dask import dask.dataframe as dd import dask.multiprocessing from dask.array.numpy_compat import _numpy_124 from dask.blockwise import Blockwise, optimize_blockwise from dask.dataframe._compat import ( PANDAS_GT_110, PANDAS_GT_121, PANDAS_GT_130, PANDAS_GT_150, PANDAS_GT_200, ) from dask.dataframe.io.parquet.core import get_engine from dask.dataframe.io.parquet.utils import _parse_pandas_metadata from dask.dataframe.optimize import optimize_dataframe_getitem from dask.dataframe.utils import assert_eq from dask.layers import DataFrameIOLayer from dask.utils import natural_sort_key from dask.utils_test import hlg_layer try: import fastparquet except ImportError: fastparquet = False fastparquet_version = parse_version("0") else: fastparquet_version = parse_version(fastparquet.__version__) try: import pyarrow as pa except ImportError: pa = False pa_version = parse_version("0") else: pa_version = parse_version(pa.__version__) try: import pyarrow.parquet as pq except ImportError: pq = False SKIP_FASTPARQUET = not fastparquet FASTPARQUET_MARK = pytest.mark.skipif(SKIP_FASTPARQUET, reason="fastparquet not found") if sys.platform == "win32" and pa and pa_version == parse_version("2.0.0"): SKIP_PYARROW = True SKIP_PYARROW_REASON = ( "skipping pyarrow 2.0.0 on windows: " "https://github.com/dask/dask/issues/6093" "|https://github.com/dask/dask/issues/6754" ) else: SKIP_PYARROW = not pq SKIP_PYARROW_REASON = "pyarrow not found" PYARROW_MARK = pytest.mark.skipif(SKIP_PYARROW, reason=SKIP_PYARROW_REASON) nrows = 40 npartitions = 15 df = pd.DataFrame( { "x": [i * 7 % 5 for i in range(nrows)], # Not sorted "y": [i * 2.5 for i in range(nrows)], # Sorted }, index=pd.Index([10 * i for i in range(nrows)], name="myindex"), ) ddf = dd.from_pandas(df, npartitions=npartitions) @pytest.fixture( params=[ pytest.param("fastparquet", marks=FASTPARQUET_MARK), pytest.param("pyarrow", marks=PYARROW_MARK), ] ) def engine(request): return request.param def write_read_engines(**kwargs): """Product of both engines for write/read: To add custom marks, pass keyword of the form: `mark_writer_reader=reason`, or `mark_engine=reason` to apply to all parameters with that engine.""" backends = {"pyarrow", "fastparquet"} # Skip if uninstalled skip_marks = { "fastparquet": FASTPARQUET_MARK, "pyarrow": PYARROW_MARK, } marks = {(w, r): [skip_marks[w], skip_marks[r]] for w in backends for r in backends} # Custom marks for kw, val in kwargs.items(): kind, rest = kw.split("_", 1) key = tuple(rest.split("_")) if kind not in ("xfail", "skip") or len(key) > 2 or set(key) - backends: raise ValueError("unknown keyword %r" % kw) val = getattr(pytest.mark, kind)(reason=val) if len(key) == 2: marks[key].append(val) else: for k in marks: if key in k: marks[k].append(val) return pytest.mark.parametrize( ("write_engine", "read_engine"), [pytest.param(*k, marks=tuple(v)) for (k, v) in sorted(marks.items())], ) if ( fastparquet and fastparquet_version < parse_version("0.5") and PANDAS_GT_110 and not PANDAS_GT_121 ): # a regression in pandas 1.1.x / 1.2.0 caused a failure in writing partitioned # categorical columns when using fastparquet 0.4.x, but this was (accidentally) # fixed in fastparquet 0.5.0 fp_pandas_msg = "pandas with fastparquet engine does not preserve index" pyarrow_fastparquet_msg = 
"pyarrow schema and pandas metadata may disagree" fp_pandas_xfail = write_read_engines( **{ "xfail_pyarrow_fastparquet": pyarrow_fastparquet_msg, "xfail_fastparquet_fastparquet": fp_pandas_msg, "xfail_fastparquet_pyarrow": fp_pandas_msg, } ) else: fp_pandas_xfail = write_read_engines() @PYARROW_MARK def test_get_engine_pyarrow(): from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine assert get_engine("pyarrow") == ArrowDatasetEngine assert get_engine("arrow") == ArrowDatasetEngine @FASTPARQUET_MARK def test_get_engine_fastparquet(): from dask.dataframe.io.parquet.fastparquet import FastParquetEngine assert get_engine("fastparquet") == FastParquetEngine @write_read_engines() @pytest.mark.parametrize("has_metadata", [False, True]) def test_local(tmpdir, write_engine, read_engine, has_metadata): tmp = str(tmpdir) data = pd.DataFrame( { "i32": np.arange(1000, dtype=np.int32), "i64": np.arange(1000, dtype=np.int64), "f": np.arange(1000, dtype=np.float64), "bhello": np.random.choice(["hello", "yo", "people"], size=1000).astype( "O" ), } ) df = dd.from_pandas(data, chunksize=500) kwargs = {"write_metadata_file": True} if has_metadata else {} df.to_parquet(tmp, write_index=False, engine=write_engine, **kwargs) files = os.listdir(tmp) assert ("_common_metadata" in files) == has_metadata assert ("_metadata" in files) == has_metadata assert "part.0.parquet" in files df2 = dd.read_parquet(tmp, index=False, engine=read_engine) assert len(df2.divisions) > 1 out = df2.compute(scheduler="sync").reset_index() for column in df.columns: assert (data[column] == out[column]).all() @pytest.mark.parametrize("index", [False, True]) @write_read_engines() def test_empty(tmpdir, write_engine, read_engine, index): fn = str(tmpdir) df = pd.DataFrame({"a": ["a", "b", "b"], "b": [4, 5, 6]})[:0] if index: df = df.set_index("a", drop=True) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, write_index=index, engine=write_engine, write_metadata_file=True) read_df = dd.read_parquet(fn, engine=read_engine) assert_eq(ddf, read_df) @write_read_engines() def test_simple(tmpdir, write_engine, read_engine): fn = str(tmpdir) df = pd.DataFrame({"a": ["a", "b", "b"], "b": [4, 5, 6]}) df = df.set_index("a", drop=True) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, engine=write_engine) read_df = dd.read_parquet( fn, index=["a"], engine=read_engine, calculate_divisions=True ) assert_eq(ddf, read_df) @write_read_engines() def test_delayed_no_metadata(tmpdir, write_engine, read_engine): fn = str(tmpdir) df = pd.DataFrame({"a": ["a", "b", "b"], "b": [4, 5, 6]}) df = df.set_index("a", drop=True) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet( fn, engine=write_engine, compute=False, write_metadata_file=False ).compute() files = os.listdir(fn) assert "_metadata" not in files # Fastparquet doesn't currently handle a directory without "_metadata" read_df = dd.read_parquet( os.path.join(fn, "*.parquet"), index=["a"], engine=read_engine, calculate_divisions=True, ) assert_eq(ddf, read_df) @write_read_engines() def test_read_glob(tmpdir, write_engine, read_engine): tmp_path = str(tmpdir) ddf.to_parquet(tmp_path, engine=write_engine) if os.path.exists(os.path.join(tmp_path, "_metadata")): os.unlink(os.path.join(tmp_path, "_metadata")) files = os.listdir(tmp_path) assert "_metadata" not in files ddf2 = dd.read_parquet( os.path.join(tmp_path, "*.parquet"), engine=read_engine, index="myindex", # Must specify index without _metadata calculate_divisions=True, ) assert_eq(ddf, ddf2) @write_read_engines() def 
test_calculate_divisions_false(tmpdir, write_engine, read_engine): tmp_path = str(tmpdir) ddf.to_parquet(tmp_path, write_index=False, engine=write_engine) ddf2 = dd.read_parquet( tmp_path, engine=read_engine, index=False, calculate_divisions=False, ) assert_eq(ddf, ddf2, check_index=False, check_divisions=False) @write_read_engines() def test_read_list(tmpdir, write_engine, read_engine): if write_engine == read_engine == "fastparquet" and os.name == "nt": # fastparquet or dask is not normalizing filepaths correctly on # windows. pytest.skip("filepath bug.") tmpdir = str(tmpdir) ddf.to_parquet(tmpdir, engine=write_engine) files = sorted( ( os.path.join(tmpdir, f) for f in os.listdir(tmpdir) if not f.endswith("_metadata") ), key=natural_sort_key, ) ddf2 = dd.read_parquet( files, engine=read_engine, index="myindex", calculate_divisions=True ) assert_eq(ddf, ddf2) @write_read_engines() def test_columns_auto_index(tmpdir, write_engine, read_engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=write_engine) # ### Empty columns ### # With divisions if supported assert_eq( dd.read_parquet(fn, columns=[], engine=read_engine, calculate_divisions=True), ddf[[]], ) # No divisions assert_eq( dd.read_parquet(fn, columns=[], engine=read_engine, calculate_divisions=False), ddf[[]].clear_divisions(), check_divisions=True, ) # ### Single column, auto select index ### # With divisions if supported assert_eq( dd.read_parquet( fn, columns=["x"], engine=read_engine, calculate_divisions=True ), ddf[["x"]], ) # No divisions assert_eq( dd.read_parquet( fn, columns=["x"], engine=read_engine, calculate_divisions=False ), ddf[["x"]].clear_divisions(), check_divisions=True, ) @write_read_engines() def test_columns_index(tmpdir, write_engine, read_engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=write_engine) # With Index # ---------- # ### Empty columns, specify index ### # With divisions if supported assert_eq( dd.read_parquet( fn, columns=[], engine=read_engine, index="myindex", calculate_divisions=True, ), ddf[[]], ) # No divisions assert_eq( dd.read_parquet( fn, columns=[], engine=read_engine, index="myindex", calculate_divisions=False, ), ddf[[]].clear_divisions(), check_divisions=True, ) # ### Single column, specify index ### # With divisions if supported assert_eq( dd.read_parquet( fn, index="myindex", columns=["x"], engine=read_engine, calculate_divisions=True, ), ddf[["x"]], ) # No divisions assert_eq( dd.read_parquet( fn, index="myindex", columns=["x"], engine=read_engine, calculate_divisions=False, ), ddf[["x"]].clear_divisions(), check_divisions=True, ) # ### Two columns, specify index ### # With divisions if supported assert_eq( dd.read_parquet( fn, index="myindex", columns=["x", "y"], engine=read_engine, calculate_divisions=True, ), ddf, ) # No divisions assert_eq( dd.read_parquet( fn, index="myindex", columns=["x", "y"], engine=read_engine, calculate_divisions=False, ), ddf.clear_divisions(), check_divisions=True, ) def test_nonsense_column(tmpdir, engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=engine) with pytest.raises((ValueError, KeyError)): dd.read_parquet(fn, columns=["nonesense"], engine=engine) with pytest.raises((Exception, KeyError)): dd.read_parquet(fn, columns=["nonesense"] + list(ddf.columns), engine=engine) @write_read_engines() def test_columns_no_index(tmpdir, write_engine, read_engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=write_engine) ddf2 = ddf.reset_index() # No Index # -------- # All columns, none as index assert_eq( dd.read_parquet(fn, index=False, engine=read_engine, 
calculate_divisions=True), ddf2, check_index=False, check_divisions=True, ) # Two columns, none as index assert_eq( dd.read_parquet( fn, index=False, columns=["x", "y"], engine=read_engine, calculate_divisions=True, ), ddf2[["x", "y"]], check_index=False, check_divisions=True, ) # One column and one index, all as columns assert_eq( dd.read_parquet( fn, index=False, columns=["myindex", "x"], engine=read_engine, calculate_divisions=True, ), ddf2[["myindex", "x"]], check_index=False, check_divisions=True, ) @write_read_engines() def test_calculate_divisions_no_index(tmpdir, write_engine, read_engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=write_engine, write_index=False) df = dd.read_parquet(fn, engine=read_engine, index=False) assert df.index.name is None assert not df.known_divisions def test_columns_index_with_multi_index(tmpdir, engine): fn = os.path.join(str(tmpdir), "test.parquet") index = pd.MultiIndex.from_arrays( [np.arange(10), np.arange(10) + 1], names=["x0", "x1"] ) df = pd.DataFrame(np.random.randn(10, 2), columns=["a", "b"], index=index) df2 = df.reset_index(drop=False) if engine == "fastparquet": fastparquet.write(fn, df.reset_index(), write_index=False) else: pq.write_table(pa.Table.from_pandas(df.reset_index(), preserve_index=False), fn) ddf = dd.read_parquet(fn, engine=engine, index=index.names) assert_eq(ddf, df) d = dd.read_parquet(fn, columns="a", engine=engine, index=index.names) assert_eq(d, df["a"]) d = dd.read_parquet(fn, index=["a", "b"], columns=["x0", "x1"], engine=engine) assert_eq(d, df2.set_index(["a", "b"])[["x0", "x1"]]) # Just index d = dd.read_parquet(fn, index=False, engine=engine) assert_eq(d, df2) d = dd.read_parquet(fn, columns=["b"], index=["a"], engine=engine) assert_eq(d, df2.set_index("a")[["b"]]) d = dd.read_parquet(fn, columns=["a", "b"], index=["x0"], engine=engine) assert_eq(d, df2.set_index("x0")[["a", "b"]]) # Just columns d = dd.read_parquet(fn, columns=["x0", "a"], index=["x1"], engine=engine) assert_eq(d, df2.set_index("x1")[["x0", "a"]]) # Both index and columns d = dd.read_parquet(fn, index=False, columns=["x0", "b"], engine=engine) assert_eq(d, df2[["x0", "b"]]) for index in ["x1", "b"]: d = dd.read_parquet(fn, index=index, columns=["x0", "a"], engine=engine) assert_eq(d, df2.set_index(index)[["x0", "a"]]) # Columns and index intersect for index in ["a", "x0"]: with pytest.raises(ValueError): d = dd.read_parquet(fn, index=index, columns=["x0", "a"], engine=engine) # Series output for ind, col, sol_df in [ ("x1", "x0", df2.set_index("x1")), (False, "b", df2), (False, "x0", df2[["x0"]]), ("a", "x0", df2.set_index("a")[["x0"]]), ("a", "b", df2.set_index("a")), ]: d = dd.read_parquet(fn, index=ind, columns=col, engine=engine) assert_eq(d, sol_df[col]) @write_read_engines() def test_no_index(tmpdir, write_engine, read_engine): fn = str(tmpdir) df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, engine=write_engine) ddf2 = dd.read_parquet(fn, engine=read_engine) assert_eq(df, ddf2, check_index=False) def test_read_series(tmpdir, engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=engine) ddf2 = dd.read_parquet( fn, columns=["x"], index="myindex", engine=engine, calculate_divisions=True ) assert_eq(ddf[["x"]], ddf2) ddf2 = dd.read_parquet( fn, columns="x", index="myindex", engine=engine, calculate_divisions=True ) assert_eq(ddf.x, ddf2) def test_names(tmpdir, engine): fn = str(tmpdir) ddf.to_parquet(fn, engine=engine) def read(fn, **kwargs): return dd.read_parquet(fn, engine=engine, 
**kwargs) assert set(read(fn).dask) == set(read(fn).dask) assert set(read(fn).dask) != set(read(fn, columns=["x"]).dask) assert set(read(fn, columns=("x",)).dask) == set(read(fn, columns=["x"]).dask) @write_read_engines() def test_roundtrip_from_pandas(tmpdir, write_engine, read_engine): fn = str(tmpdir.join("test.parquet")) dfp = df.copy() dfp.index.name = "index" dfp.to_parquet( fn, engine="pyarrow" if write_engine.startswith("pyarrow") else "fastparquet" ) ddf = dd.read_parquet(fn, index="index", engine=read_engine) assert_eq(dfp, ddf) @write_read_engines() def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): """ Test round-tripping nullable extension dtypes. Parquet engines will typically add dtype metadata for this. """ if read_engine == "fastparquet" or write_engine == "fastparquet": pytest.xfail("https://github.com/dask/fastparquet/issues/465") df = pd.DataFrame( { "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string"), } ) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(tmp_path, engine=write_engine) ddf2 = dd.read_parquet(tmp_path, engine=read_engine) assert_eq(df, ddf2) @PYARROW_MARK @pytest.mark.parametrize( "dtype_backend", [ "pandas", pytest.param( "pyarrow", marks=pytest.mark.skipif( not PANDAS_GT_150, reason="Requires pyarrow-backed nullable dtypes" ), ), ], ) def test_use_nullable_dtypes(tmp_path, engine, dtype_backend): """ Test reading a parquet file without pandas metadata, but forcing use of nullable dtypes where appropriate """ if dtype_backend == "pandas": dtype_extra = "" else: # dtype_backend == "pyarrow" dtype_extra = "[pyarrow]" df = pd.DataFrame( { "a": pd.Series([1, 2, pd.NA, 3, 4], dtype=f"Int64{dtype_extra}"), "b": pd.Series( [True, pd.NA, False, True, False], dtype=f"boolean{dtype_extra}" ), "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype=f"Float64{dtype_extra}"), "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype=f"string{dtype_extra}"), } ) ddf = dd.from_pandas(df, npartitions=2) @dask.delayed def write_partition(df, i): """Write a parquet file without the pandas metadata""" table = pa.Table.from_pandas(df).replace_schema_metadata({}) pq.write_table(table, tmp_path / f"part.{i}.parquet") # Create a pandas-metadata-free partitioned parquet. By default it will # not read into nullable extension dtypes partitions = ddf.to_delayed() dask.compute([write_partition(p, i) for i, p in enumerate(partitions)]) with dask.config.set({"dataframe.dtype_backend": dtype_backend}): # Not supported by fastparquet if engine == "fastparquet": with pytest.raises( ValueError, match="`use_nullable_dtypes` is not supported" ): dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) # Works in pyarrow else: # Doesn't round-trip by default when we aren't using nullable dtypes with pytest.raises(AssertionError): ddf2 = dd.read_parquet(tmp_path, engine=engine) assert_eq(df, ddf2) # Round trip works when we use nullable dtypes ddf2 = dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) assert_eq(df, ddf2, check_index=False) @PYARROW_MARK @pytest.mark.xfail( not PANDAS_GT_130, reason=( "Known bug in pandas. " "See https://issues.apache.org/jira/browse/ARROW-13413 " "and https://github.com/pandas-dev/pandas/pull/41052." 
), ) def test_use_nullable_dtypes_with_types_mapper(tmp_path, engine): # Read in dataset with `use_nullable_dtypes=True` and a custom pyarrow `types_mapper`. # Ensure `types_mapper` takes priority. df = pd.DataFrame( { "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string"), } ) ddf = dd.from_pandas(df, npartitions=3) ddf.to_parquet(tmp_path, engine=engine) types_mapper = { pa.int64(): pd.Float32Dtype(), } result = dd.read_parquet( tmp_path, engine="pyarrow", use_nullable_dtypes=True, arrow_to_pandas={"types_mapper": types_mapper.get}, ) expected = df.astype({"a": pd.Float32Dtype()}) assert_eq(result, expected) @write_read_engines() def test_categorical(tmpdir, write_engine, read_engine): tmp = str(tmpdir) df = pd.DataFrame({"x": ["a", "b", "c"] * 100}, dtype="category") ddf = dd.from_pandas(df, npartitions=3) dd.to_parquet(ddf, tmp, engine=write_engine) ddf2 = dd.read_parquet(tmp, categories="x", engine=read_engine) assert ddf2.compute().x.cat.categories.tolist() == ["a", "b", "c"] ddf2 = dd.read_parquet(tmp, categories=["x"], engine=read_engine) assert ddf2.compute().x.cat.categories.tolist() == ["a", "b", "c"] # autocat if read_engine == "fastparquet": ddf2 = dd.read_parquet(tmp, engine=read_engine) assert ddf2.compute().x.cat.categories.tolist() == ["a", "b", "c"] ddf2.loc[:1000].compute() assert assert_eq(df, ddf2) # dereference cats ddf2 = dd.read_parquet(tmp, categories=[], engine=read_engine) ddf2.loc[:1000].compute() assert (df.x == ddf2.x.compute()).all() @pytest.mark.parametrize("metadata_file", [False, True]) def test_append(tmpdir, engine, metadata_file): """Test that appended parquet equal to the original one.""" tmp = str(tmpdir) df = pd.DataFrame( { "i32": np.arange(1000, dtype=np.int32), "i64": np.arange(1000, dtype=np.int64), "f": np.arange(1000, dtype=np.float64), "bhello": np.random.choice(["hello", "yo", "people"], size=1000).astype( "O" ), } ) df.index.name = "index" half = len(df) // 2 ddf1 = dd.from_pandas(df.iloc[:half], chunksize=100) ddf2 = dd.from_pandas(df.iloc[half:], chunksize=100) ddf1.to_parquet(tmp, engine=engine, write_metadata_file=metadata_file) if metadata_file: with open(str(tmpdir.join("_metadata")), "rb") as f: metadata1 = f.read() ddf2.to_parquet(tmp, append=True, engine=engine) if metadata_file: with open(str(tmpdir.join("_metadata")), "rb") as f: metadata2 = f.read() assert metadata2 != metadata1 # 2nd write updated the metadata file ddf3 = dd.read_parquet(tmp, engine=engine) assert_eq(df, ddf3) def test_append_create(tmpdir, engine): """Test that appended parquet equal to the original one.""" tmp_path = str(tmpdir) df = pd.DataFrame( { "i32": np.arange(1000, dtype=np.int32), "i64": np.arange(1000, dtype=np.int64), "f": np.arange(1000, dtype=np.float64), "bhello": np.random.choice(["hello", "yo", "people"], size=1000).astype( "O" ), } ) df.index.name = "index" half = len(df) // 2 ddf1 = dd.from_pandas(df.iloc[:half], chunksize=100) ddf2 = dd.from_pandas(df.iloc[half:], chunksize=100) ddf1.to_parquet(tmp_path, append=True, engine=engine) ddf2.to_parquet(tmp_path, append=True, engine=engine) ddf3 = dd.read_parquet(tmp_path, engine=engine) assert_eq(df, ddf3) def test_append_with_partition(tmpdir, engine): tmp = str(tmpdir) df0 = pd.DataFrame( { "lat": np.arange(0, 10, dtype="int64"), "lon": np.arange(10, 20, dtype="int64"), "value": np.arange(100, 110, 
dtype="int64"), } ) df0.index.name = "index" df1 = pd.DataFrame( { "lat": np.arange(10, 20, dtype="int64"), "lon": np.arange(10, 20, dtype="int64"), "value": np.arange(120, 130, dtype="int64"), } ) df1.index.name = "index" # Check that nullable dtypes work # (see: https://github.com/dask/dask/issues/8373) df0["lat"] = df0["lat"].astype("Int64") df1["lat"].iloc[0] = np.nan df1["lat"] = df1["lat"].astype("Int64") dd_df0 = dd.from_pandas(df0, npartitions=1) dd_df1 = dd.from_pandas(df1, npartitions=1) dd.to_parquet(dd_df0, tmp, partition_on=["lon"], engine=engine) dd.to_parquet( dd_df1, tmp, partition_on=["lon"], append=True, ignore_divisions=True, engine=engine, ) out = dd.read_parquet( tmp, engine=engine, index="index", calculate_divisions=True ).compute() # convert categorical to plain int just to pass assert out["lon"] = out.lon.astype("int64") # sort required since partitioning breaks index order assert_eq( out.sort_values("value"), pd.concat([df0, df1])[out.columns], check_index=False ) def test_partition_on_cats(tmpdir, engine): tmp = str(tmpdir) d = pd.DataFrame( { "a": np.random.rand(50), "b": np.random.choice(["x", "y", "z"], size=50), "c": np.random.choice(["x", "y", "z"], size=50), } ) d = dd.from_pandas(d, 2) d.to_parquet(tmp, partition_on=["b"], engine=engine) df = dd.read_parquet(tmp, engine=engine) assert set(df.b.cat.categories) == {"x", "y", "z"} @PYARROW_MARK @pytest.mark.parametrize("meta", [False, True]) @pytest.mark.parametrize("stats", [False, True]) def test_partition_on_cats_pyarrow(tmpdir, stats, meta): tmp = str(tmpdir) d = pd.DataFrame( { "a": np.random.rand(50), "b": np.random.choice(["x", "y", "z"], size=50), "c": np.random.choice(["x", "y", "z"], size=50), } ) d = dd.from_pandas(d, 2) d.to_parquet(tmp, partition_on=["b"], engine="pyarrow", write_metadata_file=meta) df = dd.read_parquet(tmp, engine="pyarrow", calculate_divisions=stats) assert set(df.b.cat.categories) == {"x", "y", "z"} def test_partition_parallel_metadata(tmpdir, engine): # Check that parallel metadata collection works # for hive-partitioned data tmp = str(tmpdir) d = pd.DataFrame( { "a": np.random.rand(50), "b": np.random.choice(["x", "y", "z"], size=50), "c": np.random.choice(["x", "y", "z"], size=50), } ) d = dd.from_pandas(d, 2) d.to_parquet(tmp, partition_on=["b"], engine=engine, write_metadata_file=False) df = dd.read_parquet( tmp, engine=engine, calculate_divisions=True, metadata_task_size=1 ) assert set(df.b.cat.categories) == {"x", "y", "z"} def test_partition_on_cats_2(tmpdir, engine): tmp = str(tmpdir) d = pd.DataFrame( { "a": np.random.rand(50), "b": np.random.choice(["x", "y", "z"], size=50), "c": np.random.choice(["x", "y", "z"], size=50), } ) d = dd.from_pandas(d, 2) d.to_parquet(tmp, partition_on=["b", "c"], engine=engine) df = dd.read_parquet(tmp, engine=engine) assert set(df.b.cat.categories) == {"x", "y", "z"} assert set(df.c.cat.categories) == {"x", "y", "z"} df = dd.read_parquet(tmp, columns=["a", "c"], engine=engine) assert set(df.c.cat.categories) == {"x", "y", "z"} assert "b" not in df.columns assert_eq(df, df.compute()) df = dd.read_parquet(tmp, index="c", engine=engine) assert set(df.index.categories) == {"x", "y", "z"} assert "c" not in df.columns # series df = dd.read_parquet(tmp, columns="b", engine=engine) assert set(df.cat.categories) == {"x", "y", "z"} @pytest.mark.parametrize("metadata_file", [False, True]) def test_append_wo_index(tmpdir, engine, metadata_file): """Test append with write_index=False.""" tmp = str(tmpdir.join("tmp1.parquet")) df = pd.DataFrame( { 
"i32": np.arange(1000, dtype=np.int32), "i64": np.arange(1000, dtype=np.int64), "f": np.arange(1000, dtype=np.float64), "bhello": np.random.choice(["hello", "yo", "people"], size=1000).astype( "O" ), } ) half = len(df) // 2 ddf1 = dd.from_pandas(df.iloc[:half], chunksize=100) ddf2 = dd.from_pandas(df.iloc[half:], chunksize=100) ddf1.to_parquet(tmp, engine=engine, write_metadata_file=metadata_file) with pytest.raises(ValueError) as excinfo: ddf2.to_parquet(tmp, write_index=False, append=True, engine=engine) assert "Appended columns" in str(excinfo.value) tmp = str(tmpdir.join("tmp2.parquet")) ddf1.to_parquet( tmp, write_index=False, engine=engine, write_metadata_file=metadata_file ) ddf2.to_parquet(tmp, write_index=False, append=True, engine=engine) ddf3 = dd.read_parquet(tmp, index="f", engine=engine) assert_eq(df.set_index("f"), ddf3) @pytest.mark.parametrize("metadata_file", [False, True]) @pytest.mark.parametrize( ("index", "offset"), [ ( # There is some odd behavior with date ranges and pyarrow in some cirucmstances! # https://github.com/pandas-dev/pandas/issues/48573 pd.date_range("2022-01-01", "2022-01-31", freq="D"), pd.Timedelta(days=1), ), (pd.RangeIndex(0, 500, 1), 499), ], ) def test_append_overlapping_divisions(tmpdir, engine, metadata_file, index, offset): """Test raising of error when divisions overlapping.""" tmp = str(tmpdir) df = pd.DataFrame( { "i32": np.arange(len(index), dtype=np.int32), "i64": np.arange(len(index), dtype=np.int64), "f": np.arange(len(index), dtype=np.float64), "bhello": np.random.choice( ["hello", "yo", "people"], size=len(index) ).astype("O"), }, index=index, ) ddf1 = dd.from_pandas(df, chunksize=100) ddf2 = dd.from_pandas(df.set_index(df.index + offset), chunksize=100) ddf1.to_parquet(tmp, engine=engine, write_metadata_file=metadata_file) with pytest.raises(ValueError, match="overlap with previously written divisions"): ddf2.to_parquet(tmp, engine=engine, append=True) ddf2.to_parquet(tmp, engine=engine, append=True, ignore_divisions=True) def test_append_known_divisions_to_unknown_divisions_works(tmpdir, engine): tmp = str(tmpdir) df1 = pd.DataFrame( {"x": np.arange(100), "y": np.arange(100, 200)}, index=np.arange(100, 0, -1) ) ddf1 = dd.from_pandas(df1, npartitions=3, sort=False) df2 = pd.DataFrame({"x": np.arange(100, 200), "y": np.arange(200, 300)}) ddf2 = dd.from_pandas(df2, npartitions=3) # fastparquet always loads all metadata when appending, pyarrow only does # if a `_metadata` file exists. If we know the existing divisions aren't # sorted, then we want to skip erroring for overlapping divisions. Setting # `write_metadata_file=True` ensures this test works the same across both # engines. 
    ddf1.to_parquet(tmp, engine=engine, write_metadata_file=True)
    ddf2.to_parquet(tmp, engine=engine, append=True)

    res = dd.read_parquet(tmp, engine=engine)
    sol = pd.concat([df1, df2])
    assert_eq(res, sol)


@pytest.mark.parametrize("metadata_file", [False, True])
def test_append_different_columns(tmpdir, engine, metadata_file):
    """Test that appending with mismatched columns or dtypes raises an error."""
    tmp = str(tmpdir)
    df1 = pd.DataFrame({"i32": np.arange(100, dtype=np.int32)})
    df2 = pd.DataFrame({"i64": np.arange(100, dtype=np.int64)})
    df3 = pd.DataFrame({"i32": np.arange(100, dtype=np.int64)})

    ddf1 = dd.from_pandas(df1, chunksize=2)
    ddf2 = dd.from_pandas(df2, chunksize=2)
    ddf3 = dd.from_pandas(df3, chunksize=2)

    ddf1.to_parquet(tmp, engine=engine, write_metadata_file=metadata_file)

    with pytest.raises(ValueError) as excinfo:
        ddf2.to_parquet(tmp, engine=engine, append=True)
    assert "Appended columns" in str(excinfo.value)

    with pytest.raises(ValueError) as excinfo:
        ddf3.to_parquet(tmp, engine=engine, append=True)
    assert "Appended dtypes" in str(excinfo.value)


def test_append_dict_column(tmpdir, engine):
    # See: https://github.com/dask/dask/issues/7492
    if engine == "fastparquet":
        pytest.xfail("Fastparquet engine is missing dict-column support")
    elif pa_version < parse_version("1.0.1"):
        pytest.skip("PyArrow 1.0.1+ required for dict-column support.")

    tmp = str(tmpdir)
    dts = pd.date_range("2020-01-01", "2021-01-01")
    df = pd.DataFrame(
        {"value": [{"x": x} for x in range(len(dts))]},
        index=dts,
    )
    ddf1 = dd.from_pandas(df, npartitions=1)

    schema = {"value": pa.struct([("x", pa.int32())])}

    # Write ddf1 to tmp, and then append it again
    ddf1.to_parquet(tmp, append=True, engine=engine, schema=schema)
    ddf1.to_parquet(
        tmp, append=True, engine=engine, schema=schema, ignore_divisions=True
    )

    # Read back all data (ddf1 + ddf1)
    ddf2 = dd.read_parquet(tmp, engine=engine)

    # Check computed result
    expect = pd.concat([df, df])
    result = ddf2.compute()
    assert_eq(expect, result)


@write_read_engines()
def test_ordering(tmpdir, write_engine, read_engine):
    tmp = str(tmpdir)
    df = pd.DataFrame(
        {"a": [1, 2, 3], "b": [10, 20, 30], "c": [100, 200, 300]},
        index=pd.Index([-1, -2, -3], name="myindex"),
        columns=["c", "a", "b"],
    )
    ddf = dd.from_pandas(df, npartitions=2)
    dd.to_parquet(ddf, tmp, engine=write_engine)

    ddf2 = dd.read_parquet(tmp, index="myindex", engine=read_engine)
    assert_eq(ddf, ddf2, check_divisions=False)


def test_read_parquet_custom_columns(tmpdir, engine):
    tmp = str(tmpdir)
    data = pd.DataFrame(
        {"i32": np.arange(1000, dtype=np.int32), "f": np.arange(1000, dtype=np.float64)}
    )
    df = dd.from_pandas(data, chunksize=50)
    df.to_parquet(tmp, engine=engine)

    df2 = dd.read_parquet(
        tmp, columns=["i32", "f"], engine=engine, calculate_divisions=True
    )
    assert_eq(df[["i32", "f"]], df2, check_index=False)

    fns = glob.glob(os.path.join(tmp, "*.parquet"))
    df2 = dd.read_parquet(fns, columns=["i32"], engine=engine).compute()
    df2.sort_values("i32", inplace=True)
    assert_eq(df[["i32"]], df2, check_index=False, check_divisions=False)

    df3 = dd.read_parquet(
        tmp, columns=["f", "i32"], engine=engine, calculate_divisions=True
    )
    assert_eq(df[["f", "i32"]], df3, check_index=False)


@pytest.mark.parametrize(
    "df,write_kwargs,read_kwargs",
    [
        (pd.DataFrame({"x": [3, 2, 1]}), {}, {}),
        (pd.DataFrame({"x": ["c", "a", "b"]}), {}, {}),
        (pd.DataFrame({"x": ["cc", "a", "bbb"]}), {}, {}),
        (
            pd.DataFrame({"x": [b"a", b"b", b"c"]}),
            {"object_encoding": "bytes", "schema": {"x": pa.binary()} if pa else None},
            {},
        ),
        (
            pd.DataFrame({"x": pd.Categorical(["a", "b", "a"])}),
            {},
            {"categories": ["x"]},
        ),
        (pd.DataFrame({"x": pd.Categorical([1, 2, 1])}), {}, {"categories": ["x"]}),
        (pd.DataFrame({"x": list(map(pd.Timestamp, [3000, 2000, 1000]))}), {}, {}),
        (pd.DataFrame({"x": [3000, 2000, 1000]}).astype("M8[ns]"), {}, {}),
        (pd.DataFrame({"x": [3, 2, 1]}).astype("M8[ns]"), {}, {}),
        pytest.param(
            pd.DataFrame({"x": [3, 2, 1]}).astype("M8[us]"),
            {},
            {},
            marks=[
                # fails on pyarrow
                pytest.mark.xfail(
                    PANDAS_GT_200, reason="https://github.com/apache/arrow/issues/15079"
                ),
                # fails on fastparquet
                pytest.mark.xfail(
                    PANDAS_GT_200,
                    reason="https://github.com/dask/fastparquet/issues/837",
                ),
            ],
        ),
        pytest.param(
            pd.DataFrame({"x": [3, 2, 1]}).astype("M8[ms]"),
            {},
            {},
            marks=[
                # fails on pyarrow
                pytest.mark.xfail(
                    PANDAS_GT_200, reason="https://github.com/apache/arrow/issues/15079"
                ),
                # fails on fastparquet
                pytest.mark.xfail(
                    PANDAS_GT_200,
                    reason="https://github.com/dask/fastparquet/issues/837",
                ),
            ],
        ),
        (pd.DataFrame({"x": [3000, 2000, 1000]}).astype("datetime64[ns]"), {}, {}),
        (pd.DataFrame({"x": [3000, 2000, 1000]}).astype("datetime64[ns, UTC]"), {}, {}),
        (pd.DataFrame({"x": [3000, 2000, 1000]}).astype("datetime64[ns, CET]"), {}, {}),
        (pd.DataFrame({"x": [3, 2, 1]}).astype("uint16"), {}, {}),
        (pd.DataFrame({"x": [3, 2, 1]}).astype("float32"), {}, {}),
        (pd.DataFrame({"x": [3, 1, 2]}, index=[3, 2, 1]), {}, {}),
        (pd.DataFrame({"x": [3, 1, 5]}, index=pd.Index([1, 2, 3], name="foo")), {}, {}),
        (pd.DataFrame({"x": [1, 2, 3], "y": [3, 2, 1]}), {}, {}),
        (pd.DataFrame({"x": [1, 2, 3], "y": [3, 2, 1]}, columns=["y", "x"]), {}, {}),
        (pd.DataFrame({"0": [3, 2, 1]}), {}, {}),
        (pd.DataFrame({"x": [3, 2, None]}), {}, {}),
        (pd.DataFrame({"-": [3.0, 2.0, None]}), {}, {}),
        (pd.DataFrame({".": [3.0, 2.0, None]}), {}, {}),
        (pd.DataFrame({" ": [3.0, 2.0, None]}), {}, {}),
    ],
)
def test_roundtrip(tmpdir, df, write_kwargs, read_kwargs, engine):
    if "x" in df and df.x.dtype == "M8[ns]" and "arrow" in engine:
        pytest.xfail(reason="Parquet pyarrow v1 doesn't support nanosecond precision")
    if (
        "x" in df
        and df.x.dtype == "M8[ns]"
        and engine == "fastparquet"
        and fastparquet_version <= parse_version("0.6.3")
    ):
        pytest.xfail(reason="fastparquet doesn't support nanosecond precision yet")
    if (
        PANDAS_GT_130
        and read_kwargs.get("categories", None)
        and engine == "fastparquet"
        and fastparquet_version <= parse_version("0.6.3")
    ):
        pytest.xfail("https://github.com/dask/fastparquet/issues/577")
    tmp = str(tmpdir)
    if df.index.name is None:
        df.index.name = "index"
    ddf = dd.from_pandas(df, npartitions=2)

    oe = write_kwargs.pop("object_encoding", None)
    if oe and engine == "fastparquet":
        dd.to_parquet(ddf, tmp, engine=engine, object_encoding=oe, **write_kwargs)
    else:
        dd.to_parquet(ddf, tmp, engine=engine, **write_kwargs)
    ddf2 = dd.read_parquet(
        tmp, index=df.index.name, engine=engine, calculate_divisions=True, **read_kwargs
    )
    if str(ddf2.dtypes.get("x")) == "UInt16" and engine == "fastparquet":
        # fastparquet chooses to use masked type to be able to get true repr of
        # 16-bit int
        assert_eq(ddf.astype("UInt16"), ddf2, check_divisions=False)
    else:
        assert_eq(ddf, ddf2, check_divisions=False)


def test_categories(tmpdir, engine):
    fn = str(tmpdir)
    df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": list("caaab")})
    ddf = dd.from_pandas(df, npartitions=2)
    ddf["y"] = ddf.y.astype("category")
    ddf.to_parquet(fn, engine=engine)
    ddf2 = dd.read_parquet(
        fn, categories=["y"], engine=engine, calculate_divisions=True
    )

    # Shouldn't need to specify categories explicitly
    ddf3 = dd.read_parquet(fn, engine=engine, calculate_divisions=True)
assert_eq(ddf3, ddf2) with pytest.raises(NotImplementedError): ddf2.y.cat.categories assert set(ddf2.y.compute().cat.categories) == {"a", "b", "c"} cats_set = ddf2.map_partitions(lambda x: x.y.cat.categories.sort_values()).compute() assert cats_set.tolist() == ["a", "c", "a", "b"] if engine == "fastparquet": assert_eq(ddf.y, ddf2.y, check_names=False) with pytest.raises(TypeError): # attempt to load as category that which is not so encoded ddf2 = dd.read_parquet(fn, categories=["x"], engine=engine).compute() with pytest.raises((ValueError, FutureWarning)): # attempt to load as category unknown column ddf2 = dd.read_parquet(fn, categories=["foo"], engine=engine) def test_categories_unnamed_index(tmpdir, engine): # Check that we can handle an unnamed categorical index # https://github.com/dask/dask/issues/6885 tmpdir = str(tmpdir) df = pd.DataFrame( data={"A": [1, 2, 3], "B": ["a", "a", "b"]}, index=["x", "y", "y"] ) ddf = dd.from_pandas(df, npartitions=1) ddf = ddf.categorize(columns=["B"]) ddf.to_parquet(tmpdir, engine=engine) ddf2 = dd.read_parquet(tmpdir, engine=engine) assert_eq(ddf.index, ddf2.index, check_divisions=False) def test_empty_partition(tmpdir, engine): fn = str(tmpdir) df = pd.DataFrame({"a": range(10), "b": range(10)}) ddf = dd.from_pandas(df, npartitions=5) ddf2 = ddf[ddf.a <= 5] ddf2.to_parquet(fn, engine=engine) # Pyarrow engine will not filter out emtpy # partitions unless calculate_divisions=True ddf3 = dd.read_parquet(fn, engine=engine, calculate_divisions=True) assert ddf3.npartitions < 5 sol = ddf2.compute() assert_eq(sol, ddf3, check_names=False, check_index=False) @pytest.mark.parametrize("write_metadata", [True, False]) def test_timestamp_index(tmpdir, engine, write_metadata): fn = str(tmpdir) df = dd._compat.makeTimeDataFrame() df.index.name = "foo" ddf = dd.from_pandas(df, npartitions=5) ddf.to_parquet(fn, engine=engine, write_metadata_file=write_metadata) ddf2 = dd.read_parquet(fn, engine=engine, calculate_divisions=True) assert_eq(ddf, ddf2) @PYARROW_MARK @FASTPARQUET_MARK def test_to_parquet_fastparquet_default_writes_nulls(tmpdir): fn = str(tmpdir.join("test.parquet")) df = pd.DataFrame({"c1": [1.0, np.nan, 2, np.nan, 3]}) ddf = dd.from_pandas(df, npartitions=1) ddf.to_parquet(fn, engine="fastparquet") table = pq.read_table(fn) assert table[1].null_count == 2 @PYARROW_MARK def test_to_parquet_pyarrow_w_inconsistent_schema_by_partition_succeeds_w_manual_schema( tmpdir, ): # Data types to test: strings, arrays, ints, timezone aware timestamps in_arrays = [[0, 1, 2], [3, 4], np.nan, np.nan] out_arrays = [[0, 1, 2], [3, 4], None, None] in_strings = ["a", "b", np.nan, np.nan] out_strings = ["a", "b", None, None] tstamp = pd.Timestamp(1513393355, unit="s") in_tstamps = [tstamp, tstamp, pd.NaT, pd.NaT] out_tstamps = [ # Timestamps come out in numpy.datetime64 format tstamp.to_datetime64(), tstamp.to_datetime64(), np.datetime64("NaT"), np.datetime64("NaT"), ] timezone = "US/Eastern" tz_tstamp = pd.Timestamp(1513393355, unit="s", tz=timezone) in_tz_tstamps = [tz_tstamp, tz_tstamp, pd.NaT, pd.NaT] out_tz_tstamps = [ # Timezones do not make it through a write-read cycle. 
tz_tstamp.tz_convert(None).to_datetime64(), tz_tstamp.tz_convert(None).to_datetime64(), np.datetime64("NaT"), np.datetime64("NaT"), ] df = pd.DataFrame( { "partition_column": [0, 0, 1, 1], "arrays": in_arrays, "strings": in_strings, "tstamps": in_tstamps, "tz_tstamps": in_tz_tstamps, } ) ddf = dd.from_pandas(df, npartitions=2) schema = pa.schema( [ ("arrays", pa.list_(pa.int64())), ("strings", pa.string()), ("tstamps", pa.timestamp("ns")), ("tz_tstamps", pa.timestamp("ns", timezone)), ("partition_column", pa.int64()), ] ) ddf.to_parquet( str(tmpdir), engine="pyarrow", partition_on="partition_column", schema=schema ) ddf_after_write = ( dd.read_parquet(str(tmpdir), engine="pyarrow", calculate_divisions=False) .compute() .reset_index(drop=True) ) # Check array support arrays_after_write = ddf_after_write.arrays.values for i in range(len(df)): assert np.array_equal(arrays_after_write[i], out_arrays[i]), type(out_arrays[i]) # Check datetime support tstamps_after_write = ddf_after_write.tstamps.values for i in range(len(df)): # Need to test NaT separately if np.isnat(tstamps_after_write[i]): assert np.isnat(out_tstamps[i]) else: assert tstamps_after_write[i] == out_tstamps[i] # Check timezone aware datetime support tz_tstamps_after_write = ddf_after_write.tz_tstamps.values for i in range(len(df)): # Need to test NaT separately if np.isnat(tz_tstamps_after_write[i]): assert np.isnat(out_tz_tstamps[i]) else: assert tz_tstamps_after_write[i] == out_tz_tstamps[i] # Check string support assert np.array_equal(ddf_after_write.strings.values, out_strings) # Check partition column assert np.array_equal(ddf_after_write.partition_column, df.partition_column) @PYARROW_MARK @pytest.mark.parametrize("index", [False, True]) @pytest.mark.parametrize("schema", ["infer", "complex"]) def test_pyarrow_schema_inference(tmpdir, index, schema): if schema == "complex": schema = {"index": pa.string(), "amount": pa.int64()} tmpdir = str(tmpdir) df = pd.DataFrame( { "index": ["1", "2", "3", "2", "3", "1", "4"], "date": pd.to_datetime( [ "2017-01-01", "2017-01-01", "2017-01-01", "2017-01-02", "2017-01-02", "2017-01-06", "2017-01-09", ] ), "amount": [100, 200, 300, 400, 500, 600, 700], }, index=range(7, 14), ) if index: df = dd.from_pandas(df, npartitions=2).set_index("index") else: df = dd.from_pandas(df, npartitions=2) df.to_parquet(tmpdir, engine="pyarrow", schema=schema) df_out = dd.read_parquet(tmpdir, engine="pyarrow", calculate_divisions=True) assert_eq(df, df_out) @PYARROW_MARK def test_pyarrow_schema_mismatch_error(tmpdir): df1 = pd.DataFrame({"x": [1, 2, 3], "y": [4.5, 6, 7]}) df2 = pd.DataFrame({"x": [4, 5, 6], "y": ["a", "b", "c"]}) ddf = dd.from_delayed( [dask.delayed(df1), dask.delayed(df2)], meta=df1, verify_meta=False ) with pytest.raises(ValueError) as rec: ddf.to_parquet(str(tmpdir), engine="pyarrow") msg = str(rec.value) assert "Failed to convert partition to expected pyarrow schema" in msg assert "y: double" in str(rec.value) assert "y: string" in str(rec.value) @PYARROW_MARK def test_pyarrow_schema_mismatch_explicit_schema_none(tmpdir): df1 = pd.DataFrame({"x": [1, 2, 3], "y": [4.5, 6, 7]}) df2 = pd.DataFrame({"x": [4, 5, 6], "y": ["a", "b", "c"]}) ddf = dd.from_delayed( [dask.delayed(df1), dask.delayed(df2)], meta=df1, verify_meta=False ) ddf.to_parquet(str(tmpdir), engine="pyarrow", schema=None) res = dd.read_parquet(tmpdir, engine="pyarrow") sol = pd.concat([df1, df2]) # Only checking that the data was written correctly, we don't care about # the incorrect _meta from read_parquet assert_eq(res, sol, 
check_dtype=False) def test_partition_on(tmpdir, engine): tmpdir = str(tmpdir) df = pd.DataFrame( { "a1": np.random.choice(["A", "B", "C"], size=100), "a2": np.random.choice(["X", "Y", "Z"], size=100), "b": np.random.random(size=100), "c": np.random.randint(1, 5, size=100), "d": np.arange(0, 100), } ) d = dd.from_pandas(df, npartitions=2) d.to_parquet(tmpdir, partition_on=["a1", "a2"], engine=engine) # Note #1: Cross-engine functionality is missing # Note #2: The index is not preserved in pyarrow when partition_on is used out = dd.read_parquet( tmpdir, engine=engine, index=False, calculate_divisions=False ).compute() for val in df.a1.unique(): assert set(df.d[df.a1 == val]) == set(out.d[out.a1 == val]) # Now specify the columns and allow auto-index detection out = dd.read_parquet(tmpdir, engine=engine, columns=["d", "a2"]).compute() for val in df.a2.unique(): assert set(df.d[df.a2 == val]) == set(out.d[out.a2 == val]) def test_partition_on_duplicates(tmpdir, engine): # https://github.com/dask/dask/issues/6445 tmpdir = str(tmpdir) df = pd.DataFrame( { "a1": np.random.choice(["A", "B", "C"], size=100), "a2": np.random.choice(["X", "Y", "Z"], size=100), "data": np.random.random(size=100), } ) d = dd.from_pandas(df, npartitions=2) for _ in range(2): d.to_parquet(tmpdir, partition_on=["a1", "a2"], engine=engine) out = dd.read_parquet(tmpdir, engine=engine).compute() assert len(df) == len(out) for _, _, files in os.walk(tmpdir): for file in files: assert file in ( "part.0.parquet", "part.1.parquet", "_common_metadata", "_metadata", ) @PYARROW_MARK @pytest.mark.parametrize("partition_on", ["aa", ["aa"]]) def test_partition_on_string(tmpdir, partition_on): tmpdir = str(tmpdir) with dask.config.set(scheduler="single-threaded"): tmpdir = str(tmpdir) df = pd.DataFrame( { "aa": np.random.choice(["A", "B", "C"], size=100), "bb": np.random.random(size=100), "cc": np.random.randint(1, 5, size=100), } ) d = dd.from_pandas(df, npartitions=2) d.to_parquet( tmpdir, partition_on=partition_on, write_index=False, engine="pyarrow" ) out = dd.read_parquet( tmpdir, index=False, calculate_divisions=False, engine="pyarrow" ) out = out.compute() for val in df.aa.unique(): assert set(df.bb[df.aa == val]) == set(out.bb[out.aa == val]) @write_read_engines() def test_filters_categorical(tmpdir, write_engine, read_engine): tmpdir = str(tmpdir) cats = ["2018-01-01", "2018-01-02", "2018-01-03", "2018-01-04"] dftest = pd.DataFrame( { "dummy": [1, 1, 1, 1], "DatePart": pd.Categorical(cats, categories=cats, ordered=True), } ) ddftest = dd.from_pandas(dftest, npartitions=4).set_index("dummy") ddftest.to_parquet(tmpdir, partition_on="DatePart", engine=write_engine) ddftest_read = dd.read_parquet( tmpdir, index="dummy", engine=read_engine, filters=[(("DatePart", "<=", "2018-01-02"))], calculate_divisions=True, ) assert len(ddftest_read) == 2 @write_read_engines() def test_filters(tmpdir, write_engine, read_engine): tmp_path = str(tmpdir) df = pd.DataFrame({"x": range(10), "y": list("aabbccddee")}) ddf = dd.from_pandas(df, npartitions=5) assert ddf.npartitions == 5 ddf.to_parquet(tmp_path, engine=write_engine, write_metadata_file=True) a = dd.read_parquet(tmp_path, engine=read_engine, filters=[("x", ">", 4)]) assert a.npartitions == 3 assert (a.x > 3).all().compute() b = dd.read_parquet(tmp_path, engine=read_engine, filters=[("y", "==", "c")]) assert b.npartitions == 1 assert (b.y == "c").all().compute() c = dd.read_parquet( tmp_path, engine=read_engine, filters=[("y", "==", "c"), ("x", ">", 6)] ) assert c.npartitions <= 1 assert 
not len(c) assert_eq(c, c) d = dd.read_parquet( tmp_path, engine=read_engine, filters=[ # Select two overlapping ranges [("x", ">", 1), ("x", "<", 6)], [("x", ">", 3), ("x", "<", 8)], ], ) assert d.npartitions == 3 assert ((d.x > 1) & (d.x < 8)).all().compute() e = dd.read_parquet(tmp_path, engine=read_engine, filters=[("x", "in", (0, 9))]) assert e.npartitions == 2 assert ((e.x < 2) | (e.x > 7)).all().compute() f = dd.read_parquet(tmp_path, engine=read_engine, filters=[("y", "=", "c")]) assert f.npartitions == 1 assert len(f) assert (f.y == "c").all().compute() g = dd.read_parquet(tmp_path, engine=read_engine, filters=[("x", "!=", 1)]) assert g.npartitions == 5 @write_read_engines() def test_filters_v0(tmpdir, write_engine, read_engine): if write_engine == "fastparquet" or read_engine == "fastparquet": pytest.importorskip("fastparquet", minversion="0.3.1") # Recent versions of pyarrow support full row-wise filtering # (fastparquet and older pyarrow versions do not) pyarrow_row_filtering = read_engine == "pyarrow" fn = str(tmpdir) df = pd.DataFrame({"at": ["ab", "aa", "ba", "da", "bb"]}) ddf = dd.from_pandas(df, npartitions=1) # Ok with 1 partition and filters ddf.repartition(npartitions=1, force=True).to_parquet( fn, write_index=False, engine=write_engine ) ddf2 = dd.read_parquet( fn, index=False, engine=read_engine, filters=[("at", "==", "aa")] ).compute() ddf3 = dd.read_parquet( fn, index=False, engine=read_engine, filters=[("at", "=", "aa")] ).compute() if pyarrow_row_filtering: assert_eq(ddf2, ddf[ddf["at"] == "aa"], check_index=False) assert_eq(ddf3, ddf[ddf["at"] == "aa"], check_index=False) else: assert_eq(ddf2, ddf) assert_eq(ddf3, ddf) # with >1 partition and no filters ddf.repartition(npartitions=2, force=True).to_parquet(fn, engine=write_engine) ddf2 = dd.read_parquet(fn, engine=read_engine).compute() assert_eq(ddf2, ddf) # with >1 partition and filters using base fastparquet if read_engine == "fastparquet": ddf.repartition(npartitions=2, force=True).to_parquet(fn, engine=write_engine) df2 = fastparquet.ParquetFile(fn).to_pandas(filters=[("at", "==", "aa")]) df3 = fastparquet.ParquetFile(fn).to_pandas(filters=[("at", "=", "aa")]) assert len(df2) > 0 assert len(df3) > 0 # with >1 partition and filters ddf.repartition(npartitions=2, force=True).to_parquet(fn, engine=write_engine) ddf2 = dd.read_parquet( fn, engine=read_engine, filters=[("at", "==", "aa")] ).compute() ddf3 = dd.read_parquet( fn, engine=read_engine, filters=[("at", "=", "aa")] ).compute() assert len(ddf2) > 0 assert len(ddf3) > 0 assert_eq(ddf2, ddf3) def test_filtering_pyarrow_dataset(tmpdir, engine): pytest.importorskip("pyarrow", minversion="1.0.0") fn = str(tmpdir) df = pd.DataFrame({"aa": range(100), "bb": ["cat", "dog"] * 50}) ddf = dd.from_pandas(df, npartitions=10) ddf.to_parquet(fn, write_index=False, engine=engine, write_metadata_file=True) # Filtered read aa_lim = 40 bb_val = "dog" filters = [[("aa", "<", aa_lim), ("bb", "==", bb_val)]] ddf2 = dd.read_parquet(fn, index=False, engine="pyarrow", filters=filters) # Check that partitions are filtered for "aa" filter nonempty = 0 for part in ddf[ddf["aa"] < aa_lim].partitions: nonempty += int(len(part.compute()) > 0) assert ddf2.npartitions == nonempty # Check that rows are filtered for "aa" and "bb" filters df = df[df["aa"] < aa_lim] df = df[df["bb"] == bb_val] assert_eq(df, ddf2.compute(), check_index=False) def test_fiters_file_list(tmpdir, engine): df = pd.DataFrame({"x": range(10), "y": list("aabbccddee")}) ddf = dd.from_pandas(df, npartitions=5) 
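    # Editorial note: with x = 0..9 split across 5 partitions, each file holds
    # two consecutive values of "x" ([0, 1], [2, 3], ..., [8, 9]). The
    # `("x", ">", 3)` filter applied below can therefore drop the first two
    # files entirely, leaving the 3 partitions asserted on.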
ddf.to_parquet(str(tmpdir), engine=engine) fils = str(tmpdir.join("*.parquet")) ddf_out = dd.read_parquet( fils, calculate_divisions=True, engine=engine, filters=[("x", ">", 3)] ) assert ddf_out.npartitions == 3 assert_eq(df[df["x"] > 3], ddf_out.compute(), check_index=False) # Check that first parition gets filtered for single-path input ddf2 = dd.read_parquet( str(tmpdir.join("part.0.parquet")), calculate_divisions=True, engine=engine, filters=[("x", ">", 3)], ) assert len(ddf2) == 0 def test_pyarrow_filter_divisions(tmpdir): pytest.importorskip("pyarrow") # Write simple dataset with an index that will only # have a sorted index if certain row-groups are filtered out. # In this case, we filter "a" <= 3 to get a sorted # index. Otherwise, "a" is NOT monotonically increasing. df = pd.DataFrame({"a": [0, 1, 10, 12, 2, 3, 8, 9], "b": range(8)}).set_index("a") df.iloc[:4].to_parquet( str(tmpdir.join("file.0.parquet")), engine="pyarrow", row_group_size=2 ) df.iloc[4:].to_parquet( str(tmpdir.join("file.1.parquet")), engine="pyarrow", row_group_size=2 ) # Only works for ArrowDatasetEngine. # Legacy code will not apply filters on individual row-groups # when `split_row_groups=False`. ddf = dd.read_parquet( str(tmpdir), engine="pyarrow", split_row_groups=False, calculate_divisions=True, filters=[("a", "<=", 3)], ) assert ddf.divisions == (0, 2, 3) ddf = dd.read_parquet( str(tmpdir), engine="pyarrow", split_row_groups=True, calculate_divisions=True, filters=[("a", "<=", 3)], ) assert ddf.divisions == (0, 2, 3) def test_divisions_read_with_filters(tmpdir): pytest.importorskip("fastparquet", minversion="0.3.1") tmpdir = str(tmpdir) # generate dataframe size = 100 categoricals = [] for value in ["a", "b", "c", "d"]: categoricals += [value] * int(size / 4) df = pd.DataFrame( { "a": categoricals, "b": np.random.random(size=size), "c": np.random.randint(1, 5, size=size), } ) d = dd.from_pandas(df, npartitions=4) # save it d.to_parquet(tmpdir, write_index=True, partition_on=["a"], engine="fastparquet") # read it out = dd.read_parquet( tmpdir, engine="fastparquet", filters=[("a", "==", "b")], calculate_divisions=True, ) # test it expected_divisions = (25, 49) assert out.divisions == expected_divisions def test_divisions_are_known_read_with_filters(tmpdir): pytest.importorskip("fastparquet", minversion="0.3.1") tmpdir = str(tmpdir) # generate dataframe df = pd.DataFrame( { "unique": [0, 0, 1, 1, 2, 2, 3, 3], "id": ["id1", "id2", "id1", "id2", "id1", "id2", "id1", "id2"], }, index=[0, 0, 1, 1, 2, 2, 3, 3], ) d = dd.from_pandas(df, npartitions=2) # save it d.to_parquet(tmpdir, partition_on=["id"], engine="fastparquet") # read it out = dd.read_parquet( tmpdir, engine="fastparquet", filters=[("id", "==", "id1")], calculate_divisions=True, ) # test it assert out.known_divisions expected_divisions = (0, 2, 3) assert out.divisions == expected_divisions @FASTPARQUET_MARK @pytest.mark.xfail(reason="No longer accept ParquetFile objects") def test_read_from_fastparquet_parquetfile(tmpdir): fn = str(tmpdir) df = pd.DataFrame( { "a": np.random.choice(["A", "B", "C"], size=100), "b": np.random.random(size=100), "c": np.random.randint(1, 5, size=100), } ) d = dd.from_pandas(df, npartitions=2) d.to_parquet(fn, partition_on=["a"], engine="fastparquet") pq_f = fastparquet.ParquetFile(fn) # OK with no filters out = dd.read_parquet(pq_f).compute() for val in df.a.unique(): assert set(df.b[df.a == val]) == set(out.b[out.a == val]) # OK with filters out = dd.read_parquet(pq_f, filters=[("a", "==", "B")]).compute() assert 
set(df.b[df.a == "B"]) == set(out.b) # Engine should not be set to 'pyarrow' with pytest.raises(AssertionError): out = dd.read_parquet(pq_f, engine="pyarrow") @pytest.mark.parametrize("scheduler", ["threads", "processes"]) def test_to_parquet_lazy(tmpdir, scheduler, engine): tmpdir = str(tmpdir) df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1.0, 2.0, 3.0, 4.0]}) df.index.name = "index" ddf = dd.from_pandas(df, npartitions=2) value = ddf.to_parquet(tmpdir, compute=False, engine=engine) assert hasattr(value, "dask") value.compute(scheduler=scheduler) assert os.path.exists(tmpdir) ddf2 = dd.read_parquet(tmpdir, engine=engine, calculate_divisions=True) assert_eq(ddf, ddf2, check_divisions=False) @PYARROW_MARK @pytest.mark.parametrize("compute", [False, True]) def test_to_parquet_calls_invalidate_cache(tmpdir, monkeypatch, compute): from fsspec.implementations.local import LocalFileSystem invalidate_cache = MagicMock() monkeypatch.setattr(LocalFileSystem, "invalidate_cache", invalidate_cache) ddf.to_parquet(tmpdir, compute=compute, engine="pyarrow") path = LocalFileSystem._strip_protocol(str(tmpdir)) assert invalidate_cache.called assert invalidate_cache.call_args.args[0] == path @FASTPARQUET_MARK def test_timestamp96(tmpdir): fn = str(tmpdir) df = pd.DataFrame({"a": [pd.to_datetime("now", utc=True)]}) ddf = dd.from_pandas(df, 1) ddf.to_parquet(fn, engine="fastparquet", write_index=False, times="int96") pf = fastparquet.ParquetFile(fn) assert pf._schema[1].type == fastparquet.parquet_thrift.Type.INT96 out = dd.read_parquet(fn, engine="fastparquet", index=False).compute() assert_eq(out, df) @FASTPARQUET_MARK def test_drill_scheme(tmpdir): fn = str(tmpdir) N = 5 df1 = pd.DataFrame({c: np.random.random(N) for i, c in enumerate(["a", "b", "c"])}) df2 = pd.DataFrame({c: np.random.random(N) for i, c in enumerate(["a", "b", "c"])}) files = [] for d in ["test_data1", "test_data2"]: dn = os.path.join(fn, d) if not os.path.exists(dn): os.mkdir(dn) files.append(os.path.join(dn, "data1.parq")) fastparquet.write(files[0], df1) fastparquet.write(files[1], df2) df = dd.read_parquet(files, engine="fastparquet") assert "dir0" in df.columns out = df.compute() assert "dir0" in out assert (np.unique(out.dir0) == ["test_data1", "test_data2"]).all() def test_parquet_select_cats(tmpdir, engine): fn = str(tmpdir) df = pd.DataFrame( { "categories": pd.Series( np.random.choice(["a", "b", "c", "d", "e", "f"], size=100), dtype="category", ), "ints": pd.Series(list(range(0, 100)), dtype="int"), "floats": pd.Series(list(range(0, 100)), dtype="float"), } ) ddf = dd.from_pandas(df, 1) ddf.to_parquet(fn, engine=engine) rddf = dd.read_parquet(fn, columns=["ints"], engine=engine) assert list(rddf.columns) == ["ints"] rddf = dd.read_parquet(fn, engine=engine) assert list(rddf.columns) == list(df) def test_columns_name(tmpdir, engine): if engine == "fastparquet" and fastparquet_version <= parse_version("0.3.1"): pytest.skip("Fastparquet does not write column_indexes up to 0.3.1") tmp_path = str(tmpdir) df = pd.DataFrame({"A": [1, 2]}, index=pd.Index(["a", "b"], name="idx")) df.columns.name = "cols" ddf = dd.from_pandas(df, 2) ddf.to_parquet(tmp_path, engine=engine) result = dd.read_parquet(tmp_path, engine=engine, index=["idx"]) assert_eq(result, df) def check_compression(engine, filename, compression): if engine == "fastparquet": pf = fastparquet.ParquetFile(filename) md = pf.fmd.row_groups[0].columns[0].meta_data if compression is None: assert md.total_compressed_size == md.total_uncompressed_size else: assert 
md.total_compressed_size != md.total_uncompressed_size else: metadata = pa.parquet.read_metadata(os.path.join(filename, "_metadata")) names = metadata.schema.names for i in range(metadata.num_row_groups): row_group = metadata.row_group(i) for j in range(len(names)): column = row_group.column(j) if compression is None: assert ( column.total_compressed_size == column.total_uncompressed_size ) else: compress_expect = compression if compression == "default": compress_expect = "snappy" assert compress_expect.lower() == column.compression.lower() assert ( column.total_compressed_size != column.total_uncompressed_size ) @pytest.mark.parametrize("compression,", [None, "gzip", "snappy"]) def test_writing_parquet_with_compression(tmpdir, compression, engine): fn = str(tmpdir) df = pd.DataFrame({"x": ["a", "b", "c"] * 10, "y": [1, 2, 3] * 10}) df.index.name = "index" ddf = dd.from_pandas(df, npartitions=3) ddf.to_parquet(fn, compression=compression, engine=engine, write_metadata_file=True) out = dd.read_parquet(fn, engine=engine, calculate_divisions=True) assert_eq(out, ddf) check_compression(engine, fn, compression) @pytest.mark.parametrize("compression,", [None, "gzip", "snappy"]) def test_writing_parquet_with_partition_on_and_compression(tmpdir, compression, engine): fn = str(tmpdir) df = pd.DataFrame({"x": ["a", "b", "c"] * 10, "y": [1, 2, 3] * 10}) df.index.name = "index" ddf = dd.from_pandas(df, npartitions=3) ddf.to_parquet( fn, compression=compression, engine=engine, partition_on=["x"], write_metadata_file=True, ) check_compression(engine, fn, compression) @pytest.fixture( params=[ # fastparquet 0.1.3 { "columns": [ { "metadata": None, "name": "idx", "numpy_type": "int64", "pandas_type": "int64", }, { "metadata": None, "name": "A", "numpy_type": "int64", "pandas_type": "int64", }, ], "index_columns": ["idx"], "pandas_version": "0.21.0", }, # pyarrow 0.7.1 { "columns": [ { "metadata": None, "name": "A", "numpy_type": "int64", "pandas_type": "int64", }, { "metadata": None, "name": "idx", "numpy_type": "int64", "pandas_type": "int64", }, ], "index_columns": ["idx"], "pandas_version": "0.21.0", }, # pyarrow 0.8.0 { "column_indexes": [ { "field_name": None, "metadata": {"encoding": "UTF-8"}, "name": None, "numpy_type": "object", "pandas_type": "unicode", } ], "columns": [ { "field_name": "A", "metadata": None, "name": "A", "numpy_type": "int64", "pandas_type": "int64", }, { "field_name": "__index_level_0__", "metadata": None, "name": "idx", "numpy_type": "int64", "pandas_type": "int64", }, ], "index_columns": ["__index_level_0__"], "pandas_version": "0.21.0", }, # TODO: fastparquet update ] ) def pandas_metadata(request): return request.param def test_parse_pandas_metadata(pandas_metadata): index_names, column_names, mapping, column_index_names = _parse_pandas_metadata( pandas_metadata ) assert index_names == ["idx"] assert column_names == ["A"] assert column_index_names == [None] # for new pyarrow if pandas_metadata["index_columns"] == ["__index_level_0__"]: assert mapping == {"__index_level_0__": "idx", "A": "A"} else: assert mapping == {"idx": "idx", "A": "A"} assert isinstance(mapping, dict) def test_parse_pandas_metadata_null_index(): # pyarrow 0.7.1 None for index e_index_names = [None] e_column_names = ["x"] e_mapping = {"__index_level_0__": None, "x": "x"} e_column_index_names = [None] md = { "columns": [ { "metadata": None, "name": "x", "numpy_type": "int64", "pandas_type": "int64", }, { "metadata": None, "name": "__index_level_0__", "numpy_type": "int64", "pandas_type": "int64", }, ], 
"index_columns": ["__index_level_0__"], "pandas_version": "0.21.0", } index_names, column_names, mapping, column_index_names = _parse_pandas_metadata(md) assert index_names == e_index_names assert column_names == e_column_names assert mapping == e_mapping assert column_index_names == e_column_index_names # pyarrow 0.8.0 None for index md = { "column_indexes": [ { "field_name": None, "metadata": {"encoding": "UTF-8"}, "name": None, "numpy_type": "object", "pandas_type": "unicode", } ], "columns": [ { "field_name": "x", "metadata": None, "name": "x", "numpy_type": "int64", "pandas_type": "int64", }, { "field_name": "__index_level_0__", "metadata": None, "name": None, "numpy_type": "int64", "pandas_type": "int64", }, ], "index_columns": ["__index_level_0__"], "pandas_version": "0.21.0", } index_names, column_names, mapping, column_index_names = _parse_pandas_metadata(md) assert index_names == e_index_names assert column_names == e_column_names assert mapping == e_mapping assert column_index_names == e_column_index_names @PYARROW_MARK def test_read_no_metadata(tmpdir, engine): # use pyarrow.parquet to create a parquet file without # pandas metadata tmp = str(tmpdir) + "table.parq" table = pa.Table.from_arrays( [pa.array([1, 2, 3]), pa.array([3, 4, 5])], names=["A", "B"] ) pq.write_table(table, tmp) result = dd.read_parquet(tmp, engine=engine) expected = pd.DataFrame({"A": [1, 2, 3], "B": [3, 4, 5]}) assert_eq(result, expected) def test_parse_pandas_metadata_duplicate_index_columns(): md = { "column_indexes": [ { "field_name": None, "metadata": {"encoding": "UTF-8"}, "name": None, "numpy_type": "object", "pandas_type": "unicode", } ], "columns": [ { "field_name": "A", "metadata": None, "name": "A", "numpy_type": "int64", "pandas_type": "int64", }, { "field_name": "__index_level_0__", "metadata": None, "name": "A", "numpy_type": "object", "pandas_type": "unicode", }, ], "index_columns": ["__index_level_0__"], "pandas_version": "0.21.0", } ( index_names, column_names, storage_name_mapping, column_index_names, ) = _parse_pandas_metadata(md) assert index_names == ["A"] assert column_names == ["A"] assert storage_name_mapping == {"__index_level_0__": "A", "A": "A"} assert column_index_names == [None] def test_parse_pandas_metadata_column_with_index_name(): md = { "column_indexes": [ { "field_name": None, "metadata": {"encoding": "UTF-8"}, "name": None, "numpy_type": "object", "pandas_type": "unicode", } ], "columns": [ { "field_name": "A", "metadata": None, "name": "A", "numpy_type": "int64", "pandas_type": "int64", }, { "field_name": "__index_level_0__", "metadata": None, "name": "A", "numpy_type": "object", "pandas_type": "unicode", }, ], "index_columns": ["__index_level_0__"], "pandas_version": "0.21.0", } ( index_names, column_names, storage_name_mapping, column_index_names, ) = _parse_pandas_metadata(md) assert index_names == ["A"] assert column_names == ["A"] assert storage_name_mapping == {"__index_level_0__": "A", "A": "A"} assert column_index_names == [None] def test_writing_parquet_with_kwargs(tmpdir, engine): fn = str(tmpdir) path1 = os.path.join(fn, "normal") path2 = os.path.join(fn, "partitioned") df = pd.DataFrame( { "a": np.random.choice(["A", "B", "C"], size=100), "b": np.random.random(size=100), "c": np.random.randint(1, 5, size=100), } ) df.index.name = "index" ddf = dd.from_pandas(df, npartitions=3) engine_kwargs = { "pyarrow": { "compression": "snappy", "coerce_timestamps": None, "use_dictionary": True, }, "fastparquet": {"compression": "snappy", "times": "int64", "fixed_text": 
None}, } ddf.to_parquet(path1, engine=engine, **engine_kwargs[engine]) out = dd.read_parquet(path1, engine=engine, calculate_divisions=True) assert_eq(out, ddf, check_index=(engine != "fastparquet")) # Avoid race condition in pyarrow 0.8.0 on writing partitioned datasets with dask.config.set(scheduler="sync"): ddf.to_parquet( path2, engine=engine, partition_on=["a"], **engine_kwargs[engine] ) out = dd.read_parquet(path2, engine=engine).compute() for val in df.a.unique(): assert set(df.b[df.a == val]) == set(out.b[out.a == val]) def test_writing_parquet_with_unknown_kwargs(tmpdir, engine): fn = str(tmpdir) with pytest.raises(TypeError): ddf.to_parquet(fn, engine=engine, unknown_key="unknown_value") def test_to_parquet_with_get(tmpdir, engine): from dask.multiprocessing import get as mp_get tmpdir = str(tmpdir) flag = [False] def my_get(*args, **kwargs): flag[0] = True return mp_get(*args, **kwargs) df = pd.DataFrame({"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(tmpdir, engine=engine, compute_kwargs={"scheduler": my_get}) assert flag[0] result = dd.read_parquet(os.path.join(tmpdir, "*"), engine=engine) assert_eq(result, df, check_index=False) def test_select_partitioned_column(tmpdir, engine): fn = str(tmpdir) size = 20 d = { "signal1": np.random.normal(0, 0.3, size=size).cumsum() + 50, "fake_categorical1": np.random.choice(["A", "B", "C"], size=size), "fake_categorical2": np.random.choice(["D", "E", "F"], size=size), } df = dd.from_pandas(pd.DataFrame(d), 2) df.to_parquet( fn, compression="snappy", write_index=False, engine=engine, partition_on=["fake_categorical1", "fake_categorical2"], ) df_partitioned = dd.read_parquet(fn, engine=engine) df_partitioned[df_partitioned.fake_categorical1 == "A"].compute() def test_with_tz(tmpdir, engine): if engine == "fastparquet" and fastparquet_version < parse_version("0.3.0"): pytest.skip("fastparquet<0.3.0 did not support this") with warnings.catch_warnings(): if engine == "fastparquet": # fastparquet-442 warnings.simplefilter("ignore", FutureWarning) # pandas 0.25 fn = str(tmpdir) df = pd.DataFrame([[0]], columns=["a"], dtype="datetime64[ns, UTC]") df = dd.from_pandas(df, 1) df.to_parquet(fn, engine=engine) df2 = dd.read_parquet(fn, engine=engine) assert_eq(df, df2, check_divisions=False, check_index=False) @PYARROW_MARK def test_arrow_partitioning(tmpdir): # Issue #3518 path = str(tmpdir) data = { "p": np.repeat(np.arange(3), 2).astype(np.int8), "b": np.repeat(-1, 6).astype(np.int16), "c": np.repeat(-2, 6).astype(np.float32), "d": np.repeat(-3, 6).astype(np.float64), } pdf = pd.DataFrame(data) ddf = dd.from_pandas(pdf, npartitions=2) ddf.to_parquet(path, engine="pyarrow", write_index=False, partition_on="p") ddf = dd.read_parquet(path, index=False, engine="pyarrow") ddf.astype({"b": np.float32}).compute() def test_informative_error_messages(): with pytest.raises(ValueError) as info: dd.read_parquet("foo", engine="foo") assert "foo" in str(info.value) assert "arrow" in str(info.value) assert "fastparquet" in str(info.value) def test_append_cat_fp(tmpdir, engine): path = str(tmpdir) # https://github.com/dask/dask/issues/4120 df = pd.DataFrame({"x": ["a", "a", "b", "a", "b"]}) df["x"] = df["x"].astype("category") ddf = dd.from_pandas(df, npartitions=1) dd.to_parquet(ddf, path, engine=engine) dd.to_parquet(ddf, path, append=True, ignore_divisions=True, engine=engine) d = dd.read_parquet(path, engine=engine).compute() assert d["x"].tolist() == ["a", "a", "b", "a", "b"] * 2 @PYARROW_MARK 
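# Round-trip a variety of frames (timestamps at several resolutions, categoricals, nullable values, unusual column names and indexes) through parquet with the pyarrow engine.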
@pytest.mark.parametrize( "df", [ pd.DataFrame({"x": [4, 5, 6, 1, 2, 3]}), pd.DataFrame({"x": ["c", "a", "b"]}), pd.DataFrame({"x": ["cc", "a", "bbb"]}), pytest.param(pd.DataFrame({"x": pd.Categorical(["a", "b", "a"])})), pytest.param(pd.DataFrame({"x": pd.Categorical([1, 2, 1])})), pd.DataFrame({"x": list(map(pd.Timestamp, [3000000, 2000000, 1000000]))}), # ms pd.DataFrame({"x": list(map(pd.Timestamp, [3000, 2000, 1000]))}), # us pd.DataFrame({"x": [3000, 2000, 1000]}).astype("M8[ns]"), # pd.DataFrame({'x': [3, 2, 1]}).astype('M8[ns]'), # Casting errors pytest.param( pd.DataFrame({"x": [3, 2, 1]}).astype("M8[us]"), marks=pytest.mark.xfail( PANDAS_GT_200, reason="https://github.com/apache/arrow/issues/15079" ), ), pytest.param( pd.DataFrame({"x": [3, 2, 1]}).astype("M8[ms]"), marks=pytest.mark.xfail( PANDAS_GT_200, reason="https://github.com/apache/arrow/issues/15079" ), ), pd.DataFrame({"x": [3, 2, 1]}).astype("uint16"), pd.DataFrame({"x": [3, 2, 1]}).astype("float32"), pd.DataFrame({"x": [3, 1, 2]}, index=[3, 2, 1]), pd.DataFrame( {"x": [4, 5, 6, 1, 2, 3]}, index=pd.Index([1, 2, 3, 4, 5, 6], name="foo") ), pd.DataFrame({"x": [1, 2, 3], "y": [3, 2, 1]}), pd.DataFrame({"x": [1, 2, 3], "y": [3, 2, 1]}, columns=["y", "x"]), pd.DataFrame({"0": [3, 2, 1]}), pd.DataFrame({"x": [3, 2, None]}), pd.DataFrame({"-": [3.0, 2.0, None]}), pd.DataFrame({".": [3.0, 2.0, None]}), pd.DataFrame({" ": [3.0, 2.0, None]}), ], ) def test_roundtrip_arrow(tmpdir, df): # Index will be given a name when preserved as index tmp_path = str(tmpdir) if not df.index.name: df.index.name = "index" ddf = dd.from_pandas(df, npartitions=2) dd.to_parquet(ddf, tmp_path, engine="pyarrow", write_index=True) ddf2 = dd.read_parquet(tmp_path, engine="pyarrow", calculate_divisions=True) assert_eq(ddf, ddf2) def test_datasets_timeseries(tmpdir, engine): tmp_path = str(tmpdir) df = dask.datasets.timeseries( start="2000-01-01", end="2000-01-10", freq="1d" ).persist() df.to_parquet(tmp_path, engine=engine) df2 = dd.read_parquet(tmp_path, engine=engine, calculate_divisions=True) assert_eq(df, df2) def test_pathlib_path(tmpdir, engine): import pathlib df = pd.DataFrame({"x": [4, 5, 6, 1, 2, 3]}) df.index.name = "index" ddf = dd.from_pandas(df, npartitions=2) path = pathlib.Path(str(tmpdir)) ddf.to_parquet(path, engine=engine) ddf2 = dd.read_parquet(path, engine=engine, calculate_divisions=True) assert_eq(ddf, ddf2) @FASTPARQUET_MARK def test_categories_large(tmpdir, engine): # Issue #5112 fn = str(tmpdir.join("parquet_int16.parq")) numbers = np.random.randint(0, 800000, size=1000000) df = pd.DataFrame(numbers.T, columns=["name"]) df.name = df.name.astype("category") df.to_parquet(fn, engine="fastparquet", compression="uncompressed") ddf = dd.read_parquet(fn, engine=engine, categories={"name": 80000}) assert_eq(sorted(df.name.cat.categories), sorted(ddf.compute().name.cat.categories)) @write_read_engines() def test_read_glob_no_meta(tmpdir, write_engine, read_engine): tmp_path = str(tmpdir) ddf.to_parquet(tmp_path, engine=write_engine) ddf2 = dd.read_parquet( os.path.join(tmp_path, "*.parquet"), engine=read_engine, calculate_divisions=False, ) assert_eq(ddf, ddf2, check_divisions=False) @write_read_engines() def test_read_glob_yes_meta(tmpdir, write_engine, read_engine): tmp_path = str(tmpdir) ddf.to_parquet(tmp_path, engine=write_engine, write_metadata_file=True) paths = glob.glob(os.path.join(tmp_path, "*.parquet")) paths.append(os.path.join(tmp_path, "_metadata")) ddf2 = dd.read_parquet(paths, engine=read_engine, 
calculate_divisions=False) assert_eq(ddf, ddf2, check_divisions=False) @pytest.mark.parametrize("divisions", [True, False]) @pytest.mark.parametrize("remove_common", [True, False]) @write_read_engines() def test_read_dir_nometa(tmpdir, write_engine, read_engine, divisions, remove_common): tmp_path = str(tmpdir) ddf.to_parquet(tmp_path, engine=write_engine, write_metadata_file=True) if os.path.exists(os.path.join(tmp_path, "_metadata")): os.unlink(os.path.join(tmp_path, "_metadata")) files = os.listdir(tmp_path) assert "_metadata" not in files if remove_common and os.path.exists(os.path.join(tmp_path, "_common_metadata")): os.unlink(os.path.join(tmp_path, "_common_metadata")) ddf2 = dd.read_parquet(tmp_path, engine=read_engine, calculate_divisions=divisions) assert_eq(ddf, ddf2, check_divisions=divisions) @write_read_engines() def test_statistics_nometa(tmpdir, write_engine, read_engine): tmp_path = str(tmpdir) ddf.to_parquet(tmp_path, engine=write_engine, write_metadata_file=False) ddf2 = dd.read_parquet(tmp_path, engine=read_engine, calculate_divisions=True) assert_eq(ddf, ddf2) @pytest.mark.parametrize("schema", ["infer", None]) def test_timeseries_nulls_in_schema(tmpdir, engine, schema): # GH#5608: relative path failing _metadata/_common_metadata detection. tmp_path = str(tmpdir.mkdir("files")) tmp_path = os.path.join(tmp_path, "../", "files") ddf2 = ( dask.datasets.timeseries(start="2000-01-01", end="2000-01-03", freq="1h") .reset_index() .map_partitions(lambda x: x.loc[:5]) ) ddf2 = ddf2.set_index("x").reset_index().persist() ddf2.name = ddf2.name.where(ddf2.timestamp == "2000-01-01", None) # Note: `append_row_groups` will fail with pyarrow>0.17.1 for _metadata write ddf2.to_parquet(tmp_path, engine=engine, write_metadata_file=False, schema=schema) ddf_read = dd.read_parquet(tmp_path, engine=engine) assert_eq(ddf_read, ddf2, check_divisions=False, check_index=False) def test_graph_size_pyarrow(tmpdir, engine): import pickle fn = str(tmpdir) ddf1 = dask.datasets.timeseries( start="2000-01-01", end="2000-01-02", freq="60S", partition_freq="1H" ) ddf1.to_parquet(fn, engine=engine) ddf2 = dd.read_parquet(fn, engine=engine) assert len(pickle.dumps(ddf2.__dask_graph__())) < 25000 @pytest.mark.parametrize("preserve_index", [True, False]) @pytest.mark.parametrize("index", [None, np.random.permutation(2000)]) def test_getitem_optimization(tmpdir, engine, preserve_index, index): tmp_path_rd = str(tmpdir.mkdir("read")) tmp_path_wt = str(tmpdir.mkdir("write")) df = pd.DataFrame( {"A": [1, 2] * 1000, "B": [3, 4] * 1000, "C": [5, 6] * 1000}, index=index ) df.index.name = "my_index" ddf = dd.from_pandas(df, 2, sort=False) ddf.to_parquet(tmp_path_rd, engine=engine, write_index=preserve_index) ddf = dd.read_parquet(tmp_path_rd, engine=engine)["B"] # Write ddf back to disk to check that the round trip # preserves the getitem optimization out = ddf.to_frame().to_parquet(tmp_path_wt, engine=engine, compute=False) dsk = optimize_dataframe_getitem(out.dask, keys=[out.key]) subgraph_rd = hlg_layer(dsk, "read-parquet") assert isinstance(subgraph_rd, DataFrameIOLayer) assert subgraph_rd.columns == ["B"] assert next(iter(subgraph_rd.dsk.values()))[0].columns == ["B"] subgraph_wt = hlg_layer(dsk, "to-parquet") assert isinstance(subgraph_wt, Blockwise) assert_eq(ddf.compute(optimize_graph=False), ddf.compute()) def test_getitem_optimization_empty(tmpdir, engine): df = pd.DataFrame({"A": [1] * 100, "B": [2] * 100, "C": [3] * 100, "D": [4] * 100}) ddf = dd.from_pandas(df, 2, sort=False) fn = 
os.path.join(str(tmpdir)) ddf.to_parquet(fn, engine=engine) ddf2 = dd.read_parquet(fn, engine=engine)[[]] dsk = optimize_dataframe_getitem(ddf2.dask, keys=[ddf2._name]) subgraph = next(l for l in dsk.layers.values() if isinstance(l, DataFrameIOLayer)) assert subgraph.columns == [] assert_eq(ddf2, ddf[[]]) def test_getitem_optimization_multi(tmpdir, engine): df = pd.DataFrame({"A": [1] * 100, "B": [2] * 100, "C": [3] * 100, "D": [4] * 100}) ddf = dd.from_pandas(df, 2) fn = os.path.join(str(tmpdir)) ddf.to_parquet(fn, engine=engine) a = dd.read_parquet(fn, engine=engine)["B"] b = dd.read_parquet(fn, engine=engine)[["C"]] c = dd.read_parquet(fn, engine=engine)[["C", "A"]] a1, a2, a3 = dask.compute(a, b, c) b1, b2, b3 = dask.compute(a, b, c, optimize_graph=False) assert_eq(a1, b1) assert_eq(a2, b2) assert_eq(a3, b3) def test_getitem_optimization_after_filter(tmpdir, engine): df = pd.DataFrame({"a": [1, 2, 3] * 5, "b": range(15), "c": range(15)}) dd.from_pandas(df, npartitions=3).to_parquet(tmpdir, engine=engine) ddf = dd.read_parquet(tmpdir, engine=engine) df2 = df[df["b"] > 10][["a"]] ddf2 = ddf[ddf["b"] > 10][["a"]] dsk = optimize_dataframe_getitem(ddf2.dask, keys=[ddf2._name]) subgraph_rd = hlg_layer(dsk, "read-parquet") assert isinstance(subgraph_rd, DataFrameIOLayer) assert set(subgraph_rd.columns) == {"a", "b"} assert_eq(df2, ddf2) def test_getitem_optimization_after_filter_complex(tmpdir, engine): df = pd.DataFrame({"a": [1, 2, 3] * 5, "b": range(15), "c": range(15)}) dd.from_pandas(df, npartitions=3).to_parquet(tmpdir, engine=engine) ddf = dd.read_parquet(tmpdir, engine=engine) df2 = df[["b"]] df2 = df2.assign(d=1) df2 = df[df2["d"] == 1][["b"]] ddf2 = ddf[["b"]] ddf2 = ddf2.assign(d=1) ddf2 = ddf[ddf2["d"] == 1][["b"]] dsk = optimize_dataframe_getitem(ddf2.dask, keys=[ddf2._name]) subgraph_rd = hlg_layer(dsk, "read-parquet") assert isinstance(subgraph_rd, DataFrameIOLayer) assert set(subgraph_rd.columns) == {"b"} assert_eq(df2, ddf2) def test_layer_creation_info(tmpdir, engine): df = pd.DataFrame({"a": range(10), "b": ["cat", "dog"] * 5}) dd.from_pandas(df, npartitions=1).to_parquet( tmpdir, engine=engine, partition_on=["b"] ) # Apply filters directly in dd.read_parquet filters = [("b", "==", "cat")] ddf1 = dd.read_parquet(tmpdir, engine=engine, filters=filters) assert "dog" not in ddf1["b"].compute() # Results will not match if we use dd.read_parquet # without filters ddf2 = dd.read_parquet(tmpdir, engine=engine) with pytest.raises(AssertionError): assert_eq(ddf1, ddf2) # However, we can use `creation_info` to regenerate # the same collection with `filters` defined info = ddf2.dask.layers[ddf2._name].creation_info kwargs = info.get("kwargs", {}) kwargs["filters"] = filters ddf3 = info["func"](*info.get("args", []), **kwargs) assert_eq(ddf1, ddf3) def test_blockwise_parquet_annotations(tmpdir, engine): df = pd.DataFrame({"a": np.arange(40, dtype=np.int32)}) expect = dd.from_pandas(df, npartitions=2) expect.to_parquet(str(tmpdir), engine=engine) with dask.annotate(foo="bar"): ddf = dd.read_parquet(str(tmpdir), engine=engine) # `ddf` should now have ONE Blockwise layer layers = ddf.__dask_graph__().layers assert len(layers) == 1 layer = next(iter(layers.values())) assert isinstance(layer, DataFrameIOLayer) assert layer.annotations == {"foo": "bar"} def test_optimize_blockwise_parquet(tmpdir, engine): size = 40 npartitions = 2 tmp = str(tmpdir) df = pd.DataFrame({"a": np.arange(size, dtype=np.int32)}) expect = dd.from_pandas(df, npartitions=npartitions) expect.to_parquet(tmp, 
engine=engine) ddf = dd.read_parquet(tmp, engine=engine, calculate_divisions=True) # `ddf` should now have ONE Blockwise layer layers = ddf.__dask_graph__().layers assert len(layers) == 1 assert isinstance(list(layers.values())[0], Blockwise) # Check single-layer result assert_eq(ddf, expect) # Increment by 1 ddf += 1 expect += 1 # Increment by 10 ddf += 10 expect += 10 # `ddf` should now have THREE Blockwise layers layers = ddf.__dask_graph__().layers assert len(layers) == 3 assert all(isinstance(layer, Blockwise) for layer in layers.values()) # Check that `optimize_blockwise` fuses all three # `Blockwise` layers together into a single `Blockwise` layer keys = [(ddf._name, i) for i in range(npartitions)] graph = optimize_blockwise(ddf.__dask_graph__(), keys) layers = graph.layers name = list(layers.keys())[0] assert len(layers) == 1 assert isinstance(layers[name], Blockwise) # Check final result assert_eq(ddf, expect) @PYARROW_MARK def test_split_row_groups(tmpdir, engine): """Test split_row_groups read_parquet kwarg""" tmp = str(tmpdir) df = pd.DataFrame( {"i32": np.arange(800, dtype=np.int32), "f": np.arange(800, dtype=np.float64)} ) df.index.name = "index" half = len(df) // 2 dd.from_pandas(df.iloc[:half], npartitions=2).to_parquet( tmp, engine="pyarrow", row_group_size=100 ) ddf3 = dd.read_parquet(tmp, engine=engine, split_row_groups=True) assert ddf3.npartitions == 4 ddf3 = dd.read_parquet( tmp, engine=engine, calculate_divisions=True, split_row_groups=False ) assert ddf3.npartitions == 2 dd.from_pandas(df.iloc[half:], npartitions=2).to_parquet( tmp, append=True, engine="pyarrow", row_group_size=50 ) ddf3 = dd.read_parquet( tmp, engine=engine, calculate_divisions=True, split_row_groups=True, ) assert ddf3.npartitions == 12 ddf3 = dd.read_parquet( tmp, engine=engine, calculate_divisions=True, split_row_groups=False ) assert ddf3.npartitions == 4 @PYARROW_MARK @pytest.mark.parametrize("split_row_groups", [1, 12]) @pytest.mark.parametrize("calculate_divisions", [True, False]) def test_split_row_groups_int(tmpdir, split_row_groups, calculate_divisions, engine): tmp = str(tmpdir) row_group_size = 10 npartitions = 4 half_size = 400 df = pd.DataFrame( { "i32": np.arange(2 * half_size, dtype=np.int32), "f": np.arange(2 * half_size, dtype=np.float64), } ) half = len(df) // 2 dd.from_pandas(df.iloc[:half], npartitions=npartitions).to_parquet( tmp, engine="pyarrow", row_group_size=row_group_size ) dd.from_pandas(df.iloc[half:], npartitions=npartitions).to_parquet( tmp, append=True, engine="pyarrow", row_group_size=row_group_size ) ddf2 = dd.read_parquet( tmp, engine=engine, split_row_groups=split_row_groups, calculate_divisions=calculate_divisions, ) expected_rg_count = int(half_size / row_group_size) assert ddf2.npartitions == 2 * math.ceil(expected_rg_count / split_row_groups) @PYARROW_MARK @pytest.mark.parametrize("split_row_groups", [8, 25]) def test_split_row_groups_int_aggregate_files(tmpdir, engine, split_row_groups): # Use pyarrow to write a multi-file dataset with # multiple row-groups per file row_group_size = 10 size = 800 df = pd.DataFrame( { "i32": np.arange(size, dtype=np.int32), "f": np.arange(size, dtype=np.float64), } ) dd.from_pandas(df, npartitions=4).to_parquet( str(tmpdir), engine="pyarrow", row_group_size=row_group_size, write_index=False ) # Read back with both `split_row_groups>1` and # `aggregate_files=True` with pytest.warns(FutureWarning, match="argument will be deprecated"): ddf2 = dd.read_parquet( str(tmpdir), engine=engine, split_row_groups=split_row_groups,
aggregate_files=True, ) # Check that we are aggregating files as expected npartitions_expected = math.ceil((size / row_group_size) / split_row_groups) assert ddf2.npartitions == npartitions_expected assert len(ddf2) == size assert_eq(df, ddf2, check_index=False) @PYARROW_MARK def test_split_row_groups_filter(tmpdir, engine): tmp = str(tmpdir) df = pd.DataFrame( {"i32": np.arange(800, dtype=np.int32), "f": np.arange(800, dtype=np.float64)} ) df.index.name = "index" search_val = 600 filters = [("f", "==", search_val)] dd.from_pandas(df, npartitions=4).to_parquet( tmp, append=True, engine="pyarrow", row_group_size=50 ) ddf2 = dd.read_parquet(tmp, engine=engine) ddf3 = dd.read_parquet( tmp, engine=engine, calculate_divisions=True, split_row_groups=True, filters=filters, ) assert (ddf3["i32"] == search_val).any().compute() assert_eq( ddf2[ddf2["i32"] == search_val].compute(), ddf3[ddf3["i32"] == search_val].compute(), ) def test_optimize_getitem_and_nonblockwise(tmpdir, engine): path = os.path.join(tmpdir, "path.parquet") df = pd.DataFrame( {"a": [3, 4, 2], "b": [1, 2, 4], "c": [5, 4, 2], "d": [1, 2, 3]}, index=["a", "b", "c"], ) df.to_parquet(path, engine=engine) df2 = dd.read_parquet(path, engine=engine) df2[["a", "b"]].rolling(3).max().compute() def test_optimize_and_not(tmpdir, engine): path = os.path.join(tmpdir, "path.parquet") df = pd.DataFrame( {"a": [3, 4, 2], "b": [1, 2, 4], "c": [5, 4, 2], "d": [1, 2, 3]}, index=["a", "b", "c"], ) df.to_parquet(path, engine=engine) df2 = dd.read_parquet(path, engine=engine) df2a = df2["a"].groupby(df2["c"]).first().to_delayed() df2b = df2["b"].groupby(df2["c"]).first().to_delayed() df2c = df2[["a", "b"]].rolling(2).max().to_delayed() df2d = df2.rolling(2).max().to_delayed() (result,) = dask.compute(df2a + df2b + df2c + df2d) expected = [ dask.compute(df2a)[0][0], dask.compute(df2b)[0][0], dask.compute(df2c)[0][0], dask.compute(df2d)[0][0], ] for a, b in zip(result, expected): assert_eq(a, b) @write_read_engines() def test_chunksize_empty(tmpdir, write_engine, read_engine): df = pd.DataFrame({"a": pd.Series(dtype="int"), "b": pd.Series(dtype="float")}) ddf1 = dd.from_pandas(df, npartitions=1) ddf1.to_parquet(tmpdir, engine=write_engine, write_metadata_file=True) with pytest.warns(FutureWarning, match="argument will be deprecated"): ddf2 = dd.read_parquet(tmpdir, engine=read_engine, chunksize="1MiB") assert_eq(ddf1, ddf2, check_index=False) @PYARROW_MARK @pytest.mark.parametrize("metadata", [True, False]) @pytest.mark.parametrize("partition_on", [None, "a"]) @pytest.mark.parametrize("chunksize", [4096, "1MiB"]) @write_read_engines() def test_chunksize_files( tmpdir, chunksize, partition_on, write_engine, read_engine, metadata ): if partition_on and read_engine == "fastparquet" and not metadata: pytest.skip("Fastparquet requires _metadata for partitioned data.") df_size = 100 df1 = pd.DataFrame( { "a": np.random.choice(["apple", "banana", "carrot"], size=df_size), "b": np.random.random(size=df_size), "c": np.random.randint(1, 5, size=df_size), } ) ddf1 = dd.from_pandas(df1, npartitions=9) ddf1.to_parquet( str(tmpdir), engine=write_engine, partition_on=partition_on, write_metadata_file=metadata, write_index=False, ) with pytest.warns(FutureWarning, match="argument will be deprecated"): ddf2 = dd.read_parquet( str(tmpdir), engine=read_engine, chunksize=chunksize, aggregate_files=partition_on if partition_on else True, ) # Check that files where aggregated as expected if chunksize == 4096: assert ddf2.npartitions < ddf1.npartitions elif chunksize == 
"1MiB": if partition_on: assert ddf2.npartitions == 3 else: assert ddf2.npartitions == 1 # Check that the final data is correct if partition_on: df2 = ddf2.compute().sort_values(["b", "c"]) df1 = df1.sort_values(["b", "c"]) assert_eq(df1[["b", "c"]], df2[["b", "c"]], check_index=False) else: assert_eq(ddf1, ddf2, check_divisions=False, check_index=False) @write_read_engines() @pytest.mark.parametrize("aggregate_files", ["a", "b"]) def test_chunksize_aggregate_files(tmpdir, write_engine, read_engine, aggregate_files): chunksize = "1MiB" partition_on = ["a", "b"] df_size = 100 df1 = pd.DataFrame( { "a": np.random.choice(["apple", "banana", "carrot"], size=df_size), "b": np.random.choice(["small", "large"], size=df_size), "c": np.random.random(size=df_size), "d": np.random.randint(1, 100, size=df_size), } ) ddf1 = dd.from_pandas(df1, npartitions=9) ddf1.to_parquet( str(tmpdir), engine=write_engine, partition_on=partition_on, write_index=False, ) with pytest.warns(FutureWarning, match="argument will be deprecated"): ddf2 = dd.read_parquet( str(tmpdir), engine=read_engine, chunksize=chunksize, aggregate_files=aggregate_files, ) # Check that files where aggregated as expected if aggregate_files == "a": assert ddf2.npartitions == 3 elif aggregate_files == "b": assert ddf2.npartitions == 6 # Check that the final data is correct df2 = ddf2.compute().sort_values(["c", "d"]) df1 = df1.sort_values(["c", "d"]) assert_eq(df1[["c", "d"]], df2[["c", "d"]], check_index=False) @PYARROW_MARK @pytest.mark.parametrize("metadata", [True, False]) @pytest.mark.parametrize("chunksize", [None, 1024, 4096, "1MiB"]) def test_chunksize(tmpdir, chunksize, engine, metadata): nparts = 2 df_size = 100 row_group_size = 5 df = pd.DataFrame( { "a": np.random.choice(["apple", "banana", "carrot"], size=df_size), "b": np.random.random(size=df_size), "c": np.random.randint(1, 5, size=df_size), "index": np.arange(0, df_size), } ).set_index("index") ddf1 = dd.from_pandas(df, npartitions=nparts) ddf1.to_parquet( str(tmpdir), engine="pyarrow", row_group_size=row_group_size, write_metadata_file=metadata, ) if metadata: path = str(tmpdir) else: dirname = str(tmpdir) files = os.listdir(dirname) assert "_metadata" not in files path = os.path.join(dirname, "*.parquet") with pytest.warns(FutureWarning, match="argument will be deprecated"): ddf2 = dd.read_parquet( path, engine=engine, chunksize=chunksize, split_row_groups=True, calculate_divisions=True, index="index", aggregate_files=True, ) assert_eq(ddf1, ddf2, check_divisions=False) num_row_groups = df_size // row_group_size if not chunksize: assert ddf2.npartitions == num_row_groups else: # Check that we are really aggregating assert ddf2.npartitions < num_row_groups if chunksize == "1MiB": # Largest chunksize will result in # a single output partition assert ddf2.npartitions == 1 @write_read_engines() def test_roundtrip_pandas_chunksize(tmpdir, write_engine, read_engine): path = str(tmpdir.join("test.parquet")) pdf = df.copy() pdf.index.name = "index" pdf.to_parquet( path, engine="pyarrow" if write_engine.startswith("pyarrow") else "fastparquet" ) with pytest.warns(FutureWarning, match="argument will be deprecated"): ddf_read = dd.read_parquet( path, engine=read_engine, chunksize="10 kiB", calculate_divisions=True, split_row_groups=True, index="index", ) assert_eq(pdf, ddf_read) @FASTPARQUET_MARK def test_read_pandas_fastparquet_partitioned(tmpdir, engine): pdf = pd.DataFrame( [{"str": str(i), "int": i, "group": "ABC"[i % 3]} for i in range(6)] ) path = str(tmpdir) 
pdf.to_parquet(path, partition_cols=["group"], engine="fastparquet") ddf_read = dd.read_parquet(path, engine=engine) assert len(ddf_read["group"].compute()) == 6 assert len(ddf_read.compute().group) == 6 def test_read_parquet_getitem_skip_when_getting_read_parquet(tmpdir, engine): # https://github.com/dask/dask/issues/5893 pdf = pd.DataFrame({"A": [1, 2, 3, 4, 5, 6], "B": ["a", "b", "c", "d", "e", "f"]}) path = os.path.join(str(tmpdir), "data.parquet") pd_engine = "pyarrow" if engine.startswith("pyarrow") else "fastparquet" pdf.to_parquet(path, engine=pd_engine) ddf = dd.read_parquet(path, engine=engine) a, b = dask.optimize(ddf["A"], ddf) # Make sure we are still allowing the getitem optimization ddf = ddf["A"] dsk = optimize_dataframe_getitem(ddf.dask, keys=[(ddf._name, 0)]) read = [key for key in dsk.layers if key.startswith("read-parquet")][0] subgraph = dsk.layers[read] assert isinstance(subgraph, DataFrameIOLayer) assert subgraph.columns == ["A"] @pytest.mark.parametrize("calculate_divisions", [None, True]) @write_read_engines() def test_filter_nonpartition_columns( tmpdir, write_engine, read_engine, calculate_divisions ): tmpdir = str(tmpdir) df_write = pd.DataFrame( { "id": [1, 2, 3, 4] * 4, "time": np.arange(16), "random": np.random.choice(["cat", "dog"], size=16), } ) ddf_write = dd.from_pandas(df_write, npartitions=4) ddf_write.to_parquet( tmpdir, write_index=False, partition_on=["id"], engine=write_engine ) ddf_read = dd.read_parquet( tmpdir, index=False, engine=read_engine, calculate_divisions=calculate_divisions, filters=[(("time", "<", 5))], ) df_read = ddf_read.compute() assert len(df_read) == len(df_read[df_read["time"] < 5]) assert df_read["time"].max() < 5 @PYARROW_MARK def test_pandas_metadata_nullable_pyarrow(tmpdir): tmpdir = str(tmpdir) ddf1 = dd.from_pandas( pd.DataFrame( { "A": pd.array([1, None, 2], dtype="Int64"), "B": pd.array(["dog", "cat", None], dtype="str"), } ), npartitions=1, ) ddf1.to_parquet(tmpdir, engine="pyarrow") ddf2 = dd.read_parquet(tmpdir, engine="pyarrow", calculate_divisions=True) assert_eq(ddf1, ddf2, check_index=False) @PYARROW_MARK def test_pandas_timestamp_overflow_pyarrow(tmpdir): info = np.iinfo(np.dtype("int64")) # In `numpy=1.24.0` NumPy warns when an overflow is encountered when casting from float to int # https://numpy.org/doc/stable/release/1.24.0-notes.html#numpy-now-gives-floating-point-errors-in-casts if _numpy_124: ctx = pytest.warns(RuntimeWarning, match="invalid value encountered in cast") else: ctx = contextlib.nullcontext() with ctx: arr_numeric = np.linspace( start=info.min + 2, stop=info.max, num=1024, dtype="int64" ) arr_dates = arr_numeric.astype("datetime64[ms]") table = pa.Table.from_arrays([pa.array(arr_dates)], names=["ts"]) pa.parquet.write_table( table, f"{tmpdir}/file.parquet", use_deprecated_int96_timestamps=False ) # This will raise by default due to overflow with pytest.raises(pa.lib.ArrowInvalid) as e: dd.read_parquet(str(tmpdir), engine="pyarrow").compute() assert "out of bounds" in str(e.value) from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine as ArrowEngine class ArrowEngineWithTimestampClamp(ArrowEngine): @classmethod def clamp_arrow_datetimes(cls, arrow_table: pa.Table) -> pa.Table: """Constrain datetimes to be valid for pandas Since pandas works in ns precision and arrow / parquet defaults to ms precision we need to clamp our datetimes to something reasonable""" new_columns = [] for col in arrow_table.columns: if pa.types.is_timestamp(col.type) and ( col.type.unit in ("s", "ms", "us") ): 
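# `multiplier` is the number of nanoseconds per unit of this column's resolution, used to scale the int64 nanosecond bounds down to that unit before clamping.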
multiplier = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000}[ col.type.unit ] original_type = col.type series: pd.Series = col.cast(pa.int64()).to_pandas() info = np.iinfo(np.dtype("int64")) # constrain data to be within valid ranges series.clip( lower=info.min // multiplier + 1, upper=info.max // multiplier, inplace=True, ) new_array = pa.array(series, pa.int64()) new_array = new_array.cast(original_type) new_columns.append(new_array) else: new_columns.append(col) return pa.Table.from_arrays(new_columns, names=arrow_table.column_names) @classmethod def _arrow_table_to_pandas( cls, arrow_table: pa.Table, categories, use_nullable_dtypes=False, **kwargs ) -> pd.DataFrame: fixed_arrow_table = cls.clamp_arrow_datetimes(arrow_table) return super()._arrow_table_to_pandas( fixed_arrow_table, categories, use_nullable_dtypes, **kwargs ) # this should not fail, but instead produce timestamps that are in the valid range dd.read_parquet(str(tmpdir), engine=ArrowEngineWithTimestampClamp).compute() @PYARROW_MARK def test_arrow_to_pandas(tmpdir, engine): # Test that dtypes are correct when arrow_to_pandas is used # (See: https://github.com/dask/dask/issues/9664) df = pd.DataFrame({"A": [pd.Timestamp("2000-01-01")]}) path = str(tmpdir.join("test.parquet")) df.to_parquet(path, engine=engine) arrow_to_pandas = {"timestamp_as_object": True} expect = pq.ParquetFile(path).read().to_pandas(**arrow_to_pandas) got = dd.read_parquet(path, engine="pyarrow", arrow_to_pandas=arrow_to_pandas) assert_eq(expect, got) assert got.A.dtype == got.compute().A.dtype @pytest.mark.parametrize( "write_cols", [["part", "col"], ["part", "kind", "col"]], ) def test_partitioned_column_overlap(tmpdir, engine, write_cols): tmpdir.mkdir("part=a") tmpdir.mkdir("part=b") path0 = str(tmpdir.mkdir("part=a/kind=x")) path1 = str(tmpdir.mkdir("part=b/kind=x")) path0 = os.path.join(path0, "data.parquet") path1 = os.path.join(path1, "data.parquet") _df1 = pd.DataFrame({"part": "a", "kind": "x", "col": range(5)}) _df2 = pd.DataFrame({"part": "b", "kind": "x", "col": range(5)}) df1 = _df1[write_cols] df2 = _df2[write_cols] df1.to_parquet(path0, index=False) df2.to_parquet(path1, index=False) if engine == "fastparquet": path = [path0, path1] else: path = str(tmpdir) expect = pd.concat([_df1, _df2], ignore_index=True) if engine == "fastparquet" and fastparquet_version > parse_version("0.8.3"): # columns will change order and partitions will be categorical result = dd.read_parquet(path, engine=engine) assert result.compute().reset_index(drop=True).to_dict() == expect.to_dict() elif write_cols == ["part", "kind", "col"]: result = dd.read_parquet(path, engine=engine) assert_eq(result, expect, check_index=False) else: # For now, partial overlap between partition columns and # real columns is not allowed for pyarrow or older fastparquet with pytest.raises(ValueError): dd.read_parquet(path, engine=engine) @PYARROW_MARK @pytest.mark.parametrize( "write_cols", [["col"], ["part", "col"]], ) def test_partitioned_no_pandas_metadata(tmpdir, engine, write_cols): # See: https://github.com/dask/dask/issues/8087 # Manually construct directory-partitioned dataset path1 = tmpdir.mkdir("part=a") path2 = tmpdir.mkdir("part=b") path1 = os.path.join(path1, "data.parquet") path2 = os.path.join(path2, "data.parquet") # Write partitions without parquet metadata.
# Note that we always use pyarrow to do this # (regardless of the `engine`) _df1 = pd.DataFrame({"part": "a", "col": range(5)}) _df2 = pd.DataFrame({"part": "b", "col": range(5)}) t1 = pa.Table.from_pandas( _df1[write_cols], preserve_index=False, ).replace_schema_metadata(metadata={}) pq.write_table(t1, path1) t2 = pa.Table.from_pandas( _df2[write_cols], preserve_index=False, ).replace_schema_metadata(metadata={}) pq.write_table(t2, path2) # Check results expect = pd.concat([_df1, _df2], ignore_index=True) result = dd.read_parquet(str(tmpdir), engine=engine) result["part"] = result["part"].astype("object") assert_eq(result[list(expect.columns)], expect, check_index=False) @PYARROW_MARK def test_pyarrow_directory_partitioning(tmpdir): # Manually construct directory-partitioned dataset path1 = tmpdir.mkdir("a") path2 = tmpdir.mkdir("b") path1 = os.path.join(path1, "data.parquet") path2 = os.path.join(path2, "data.parquet") _df1 = pd.DataFrame({"part": "a", "col": range(5)}) _df2 = pd.DataFrame({"part": "b", "col": range(5)}) _df1.to_parquet(path1, engine="pyarrow") _df2.to_parquet(path2, engine="pyarrow") # Check results expect = pd.concat([_df1, _df2], ignore_index=True) result = dd.read_parquet( str(tmpdir), engine="pyarrow", dataset={"partitioning": ["part"], "partition_base_dir": str(tmpdir)}, ) result["part"] = result["part"].astype("object") assert_eq(result[list(expect.columns)], expect, check_index=False) @fp_pandas_xfail def test_partitioned_preserve_index(tmpdir, write_engine, read_engine): tmp = str(tmpdir) size = 1_000 npartitions = 4 b = np.arange(npartitions).repeat(size // npartitions) data = pd.DataFrame( { "myindex": np.arange(size), "A": np.random.random(size=size), "B": pd.Categorical(b), } ).set_index("myindex") data.index.name = None df1 = dd.from_pandas(data, npartitions=npartitions) df1.to_parquet(tmp, partition_on="B", engine=write_engine) expect = data[data["B"] == 1] got = dd.read_parquet(tmp, engine=read_engine, filters=[("B", "==", 1)]) assert_eq(expect, got) def test_from_pandas_preserve_none_index(tmpdir, engine): if engine.startswith("pyarrow"): pytest.importorskip("pyarrow", minversion="0.15.0") fn = str(tmpdir.join("test.parquet")) df = pd.DataFrame({"a": [1, 2], "b": [4, 5], "c": [6, 7]}).set_index("c") df.index.name = None df.to_parquet( fn, engine="pyarrow" if engine.startswith("pyarrow") else "fastparquet", index=True, ) expect = pd.read_parquet(fn) got = dd.read_parquet(fn, engine=engine) assert_eq(expect, got) def test_multi_partition_none_index_false(tmpdir, engine): if engine.startswith("pyarrow"): pytest.importorskip("pyarrow", minversion="0.15.0") write_engine = "pyarrow" else: assert engine == "fastparquet" write_engine = "fastparquet" # Write dataset without dask.to_parquet ddf1 = ddf.reset_index(drop=True) for i, part in enumerate(ddf1.partitions): path = tmpdir.join(f"test.{i}.parquet") part.compute().to_parquet(str(path), engine=write_engine) # Read back with index=False ddf2 = dd.read_parquet(str(tmpdir), index=False, engine=engine) assert_eq(ddf1, ddf2) @write_read_engines() def test_from_pandas_preserve_none_rangeindex(tmpdir, write_engine, read_engine): # See GitHub Issue#6348 fn = str(tmpdir.join("test.parquet")) df0 = pd.DataFrame({"t": [1, 2, 3]}, index=pd.RangeIndex(start=1, stop=4)) df0.to_parquet( fn, engine="pyarrow" if write_engine.startswith("pyarrow") else "fastparquet" ) df1 = dd.read_parquet(fn, engine=read_engine) assert_eq(df0, df1.compute()) def test_illegal_column_name(tmpdir, engine): # Make sure user is prevented from 
preserving a "None" index # name if there is already a column using the special `null_name` null_name = "__null_dask_index__" fn = str(tmpdir.join("test.parquet")) df = pd.DataFrame({"x": [1, 2], null_name: [4, 5]}).set_index("x") df.index.name = None ddf = dd.from_pandas(df, npartitions=2) # If we don't want to preserve the None index name, the # write should work, but the user should be warned with pytest.warns(UserWarning, match=null_name): ddf.to_parquet(fn, engine=engine, write_index=False) # If we do want to preserve the None index name, should # get a ValueError for having an illegal column name with pytest.raises(ValueError) as e: ddf.to_parquet(fn, engine=engine) assert null_name in str(e.value) def test_divisions_with_null_partition(tmpdir, engine): df = pd.DataFrame({"a": [1, 2, None, None], "b": [1, 2, 3, 4]}) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(str(tmpdir), engine=engine, write_index=False) ddf_read = dd.read_parquet(str(tmpdir), engine=engine, index="a") assert ddf_read.divisions == (None, None, None) @PYARROW_MARK def test_pyarrow_dataset_simple(tmpdir, engine): fn = str(tmpdir) df = pd.DataFrame({"a": [4, 5, 6], "b": ["a", "b", "b"]}) df = df.set_index("a", drop=True) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, engine=engine) read_df = dd.read_parquet(fn, engine="pyarrow", calculate_divisions=True) read_df.compute() assert_eq(ddf, read_df) @PYARROW_MARK @pytest.mark.parametrize("test_filter", [True, False]) def test_pyarrow_dataset_partitioned(tmpdir, engine, test_filter): fn = str(tmpdir) df = pd.DataFrame({"a": [4, 5, 6], "b": ["a", "b", "b"]}) df["b"] = df["b"].astype("category") ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, engine=engine, partition_on="b", write_metadata_file=True) read_df = dd.read_parquet( fn, engine="pyarrow", filters=[("b", "==", "a")] if test_filter else None, calculate_divisions=True, ) if test_filter: assert_eq(ddf[ddf["b"] == "a"].compute(), read_df.compute()) else: assert_eq(ddf, read_df) @PYARROW_MARK def test_pyarrow_dataset_read_from_paths(tmpdir): fn = str(tmpdir) df = pd.DataFrame({"a": [4, 5, 6], "b": ["a", "b", "b"]}) df["b"] = df["b"].astype("category") ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, engine="pyarrow", partition_on="b") with pytest.warns(FutureWarning): read_df_1 = dd.read_parquet( fn, engine="pyarrow", filters=[("b", "==", "a")], read_from_paths=False, ) read_df_2 = dd.read_parquet( fn, engine="pyarrow", filters=[("b", "==", "a")], ) assert_eq(read_df_1, read_df_2) assert_eq(ddf[ddf["b"] == "a"].compute(), read_df_2.compute()) @PYARROW_MARK @pytest.mark.parametrize("split_row_groups", [True, False]) def test_pyarrow_dataset_filter_partitioned(tmpdir, split_row_groups): fn = str(tmpdir) df = pd.DataFrame( { "a": [4, 5, 6], "b": ["a", "b", "b"], "c": ["A", "B", "B"], } ) df["b"] = df["b"].astype("category") ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(fn, engine="pyarrow", partition_on=["b", "c"]) # Filter on a a non-partition column read_df = dd.read_parquet( fn, engine="pyarrow", split_row_groups=split_row_groups, filters=[("a", "==", 5)], ) assert_eq( read_df.compute()[["a"]], df[df["a"] == 5][["a"]], check_index=False, ) def test_pyarrow_dataset_filter_on_partitioned(tmpdir, engine): # See: https://github.com/dask/dask/issues/9246 df = pd.DataFrame({"val": range(7), "part": list("abcdefg")}) ddf = dd.from_map( lambda i: df.iloc[i : i + 1], range(7), ) ddf.to_parquet(tmpdir, engine=engine, partition_on=["part"]) # Check that List[Tuple] filters are 
applied read_ddf = dd.read_parquet( tmpdir, engine=engine, filters=[("part", "==", "c")], ) read_ddf["part"] = read_ddf["part"].astype("object") assert_eq(df.iloc[2:3], read_ddf) # Check that List[List[Tuple]] filters are applied. # (fastparquet doesn't support this format) if engine == "pyarrow": read_ddf = dd.read_parquet( tmpdir, engine=engine, filters=[[("part", "==", "c")]], ) read_ddf["part"] = read_ddf["part"].astype("object") assert_eq(df.iloc[2:3], read_ddf) @PYARROW_MARK def test_parquet_pyarrow_write_empty_metadata(tmpdir): # https://github.com/dask/dask/issues/6600 tmpdir = str(tmpdir) df_a = dask.delayed(pd.DataFrame.from_dict)( {"x": [], "y": []}, dtype=("int", "int") ) df_b = dask.delayed(pd.DataFrame.from_dict)( {"x": [1, 1, 2, 2], "y": [1, 0, 1, 0]}, dtype=("int64", "int64") ) df_c = dask.delayed(pd.DataFrame.from_dict)( {"x": [1, 2, 1, 2], "y": [1, 0, 1, 0]}, dtype=("int64", "int64") ) df = dd.from_delayed([df_a, df_b, df_c]) df.to_parquet( tmpdir, engine="pyarrow", partition_on=["x"], append=False, write_metadata_file=True, ) # Check that metadata files were written files = os.listdir(tmpdir) assert "_metadata" in files assert "_common_metadata" in files # Check that the schema includes pandas_metadata schema_common = pq.ParquetFile( os.path.join(tmpdir, "_common_metadata") ).schema.to_arrow_schema() pandas_metadata = schema_common.pandas_metadata assert pandas_metadata assert pandas_metadata.get("index_columns", False) @PYARROW_MARK def test_parquet_pyarrow_write_empty_metadata_append(tmpdir): # https://github.com/dask/dask/issues/6600 tmpdir = str(tmpdir) df_a = dask.delayed(pd.DataFrame.from_dict)( {"x": [1, 1, 2, 2], "y": [1, 0, 1, 0]}, dtype=("int64", "int64") ) df_b = dask.delayed(pd.DataFrame.from_dict)( {"x": [1, 2, 1, 2], "y": [2, 0, 2, 0]}, dtype=("int64", "int64") ) df1 = dd.from_delayed([df_a, df_b]) df1.to_parquet( tmpdir, engine="pyarrow", partition_on=["x"], append=False, write_metadata_file=True, ) df_c = dask.delayed(pd.DataFrame.from_dict)( {"x": [], "y": []}, dtype=("int64", "int64") ) df_d = dask.delayed(pd.DataFrame.from_dict)( {"x": [3, 3, 4, 4], "y": [1, 0, 1, 0]}, dtype=("int64", "int64") ) df2 = dd.from_delayed([df_c, df_d]) df2.to_parquet( tmpdir, engine="pyarrow", partition_on=["x"], append=True, ignore_divisions=True, write_metadata_file=True, ) @PYARROW_MARK @pytest.mark.parametrize("partition_on", [None, "a"]) @write_read_engines() def test_create_metadata_file(tmpdir, write_engine, read_engine, partition_on): tmpdir = str(tmpdir) # Write ddf without a _metadata file df1 = pd.DataFrame({"b": range(100), "a": ["A", "B", "C", "D"] * 25}) df1.index.name = "myindex" ddf1 = dd.from_pandas(df1, npartitions=10) ddf1.to_parquet( tmpdir, write_metadata_file=False, partition_on=partition_on, engine=write_engine, ) # Add global _metadata file if partition_on: fns = glob.glob(os.path.join(tmpdir, partition_on + "=*/*.parquet")) else: fns = glob.glob(os.path.join(tmpdir, "*.parquet")) dd.io.parquet.create_metadata_file( fns, engine="pyarrow", split_every=3, # Force
tree reduction out_dir=False, # Avoid writing file ) # Check that the in-memory metadata is the same as # the metadata in the file. fmd_file = pq.ParquetFile(os.path.join(tmpdir, "_metadata")).metadata assert fmd.num_rows == fmd_file.num_rows assert fmd.num_columns == fmd_file.num_columns assert fmd.num_row_groups == fmd_file.num_row_groups def test_read_write_overwrite_is_true(tmpdir, engine): # https://github.com/dask/dask/issues/6824 # Create a Dask DataFrame if size (100, 10) with 5 partitions and write to local ddf = dd.from_pandas( pd.DataFrame( np.random.randint(low=0, high=100, size=(100, 10)), columns=["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"], ), npartitions=5, ) ddf = ddf.reset_index(drop=True) dd.to_parquet(ddf, tmpdir, engine=engine, overwrite=True) # Keep the contents of the DataFrame constant but change the # of partitions ddf2 = ddf.repartition(npartitions=3) # Overwrite the existing Dataset with the new dataframe and evaluate # the number of files against the number of dask partitions dd.to_parquet(ddf2, tmpdir, engine=engine, overwrite=True) # Assert the # of files written are identical to the number of # Dask DataFrame partitions (we exclude _metadata and _common_metadata) files = os.listdir(tmpdir) files = [f for f in files if f not in ["_common_metadata", "_metadata"]] assert len(files) == ddf2.npartitions def test_read_write_partition_on_overwrite_is_true(tmpdir, engine): # https://github.com/dask/dask/issues/6824 from pathlib import Path # Create a Dask DataFrame with 5 partitions and write to local, partitioning on the column A and column B df = pd.DataFrame( np.vstack( ( np.full((50, 3), 0), np.full((50, 3), 1), np.full((20, 3), 2), ) ) ) df.columns = ["A", "B", "C"] ddf = dd.from_pandas(df, npartitions=5) dd.to_parquet(ddf, tmpdir, engine=engine, partition_on=["A", "B"], overwrite=True) # Get the total number of files and directories from the original write files_ = Path(tmpdir).rglob("*") files = [f.as_posix() for f in files_] # Keep the contents of the DataFrame constant but change the # of partitions ddf2 = ddf.repartition(npartitions=3) # Overwrite the existing Dataset with the new dataframe and evaluate # the number of files against the number of dask partitions # Get the total number of files and directories from the original write dd.to_parquet(ddf2, tmpdir, engine=engine, partition_on=["A", "B"], overwrite=True) files2_ = Path(tmpdir).rglob("*") files2 = [f.as_posix() for f in files2_] # After reducing the # of partitions and overwriting, we expect # there to be fewer total files than were originally written assert len(files2) < len(files) def test_to_parquet_overwrite_raises(tmpdir, engine): # https://github.com/dask/dask/issues/6824 # Check that overwrite=True will raise an error if the # specified path is the current working directory df = pd.DataFrame({"a": range(12)}) ddf = dd.from_pandas(df, npartitions=3) with pytest.raises(ValueError): dd.to_parquet(ddf, "./", engine=engine, overwrite=True) with pytest.raises(ValueError): dd.to_parquet(ddf, tmpdir, engine=engine, append=True, overwrite=True) def test_to_parquet_overwrite_files_from_read_parquet_in_same_call_raises( tmpdir, engine ): subdir = tmpdir.mkdir("subdir") dd.from_pandas(pd.DataFrame({"x": range(20)}), npartitions=2).to_parquet( subdir, engine=engine ) ddf = dd.read_parquet(subdir) # Test writing to the same files, as well as a parent directory for target in [subdir, tmpdir]: with pytest.raises(ValueError, match="same parquet file"): ddf.to_parquet(target, overwrite=True) ddf2 = 
ddf.assign(y=ddf.x + 1) with pytest.raises(ValueError, match="same parquet file"): ddf2.to_parquet(target, overwrite=True) def test_to_parquet_errors_non_string_column_names(tmpdir, engine): df = pd.DataFrame({"x": range(10), 1: range(10)}) ddf = dd.from_pandas(df, npartitions=2) with pytest.raises(ValueError, match="non-string column names"): ddf.to_parquet(str(tmpdir.join("temp")), engine=engine) def test_dir_filter(tmpdir, engine): # github #6898 df = pd.DataFrame.from_dict( { "A": { 0: 351.0, 1: 355.0, 2: 358.0, 3: 266.0, 4: 266.0, 5: 268.0, 6: np.nan, }, "B": { 0: 2063.0, 1: 2051.0, 2: 1749.0, 3: 4281.0, 4: 3526.0, 5: 3462.0, 6: np.nan, }, "year": {0: 2019, 1: 2019, 2: 2020, 3: 2020, 4: 2020, 5: 2020, 6: 2020}, } ) ddf = dask.dataframe.from_pandas(df, npartitions=1) ddf.to_parquet(tmpdir, partition_on="year", engine=engine) ddf2 = dd.read_parquet(tmpdir, filters=[("year", "==", 2020)], engine=engine) ddf2["year"] = ddf2.year.astype("int64") assert_eq(ddf2, df[df.year == 2020]) @PYARROW_MARK def test_roundtrip_decimal_dtype(tmpdir): # https://github.com/dask/dask/issues/6948 tmpdir = str(tmpdir) data = [ { "ts": pd.to_datetime("2021-01-01", utc="Europe/Berlin"), "col1": Decimal("123.00"), } for i in range(23) ] ddf1 = dd.from_pandas(pd.DataFrame(data), npartitions=1) ddf1.to_parquet(path=tmpdir, engine="pyarrow", schema={"col1": pa.decimal128(5, 2)}) ddf2 = dd.read_parquet(tmpdir, engine="pyarrow") assert ddf1["col1"].dtype == ddf2["col1"].dtype assert_eq(ddf1, ddf2, check_divisions=False) def test_roundtrip_rename_columns(tmpdir, engine): # https://github.com/dask/dask/issues/7017 path = os.path.join(str(tmpdir), "test.parquet") df1 = pd.DataFrame(columns=["a", "b", "c"], data=np.random.uniform(size=(10, 3))) df1.to_parquet(path) # read it with dask and rename columns ddf2 = dd.read_parquet(path, engine=engine) ddf2.columns = ["d", "e", "f"] df1.columns = ["d", "e", "f"] assert_eq(df1, ddf2.compute()) def test_custom_metadata(tmpdir, engine): # Write a parquet dataset with custom metadata # Define custom metadata custom_metadata = {b"my_key": b"my_data"} # Write parquet dataset path = str(tmpdir) df = pd.DataFrame({"a": range(10), "b": range(10)}) dd.from_pandas(df, npartitions=2).to_parquet( path, engine=engine, custom_metadata=custom_metadata, write_metadata_file=True, ) # Check that data is correct assert_eq(df, dd.read_parquet(path, engine=engine)) # Require pyarrow.parquet to check key/value metadata if pq: # Read footer metadata and _metadata. 
# Check that it contains keys/values from `custom_metadata` files = glob.glob(os.path.join(path, "*.parquet")) files += [os.path.join(path, "_metadata")] for fn in files: _md = pq.ParquetFile(fn).metadata.metadata for k in custom_metadata.keys(): assert _md[k] == custom_metadata[k] # Make sure we raise an error if the custom metadata # includes a b"pandas" key custom_metadata = {b"pandas": b"my_new_pandas_md"} with pytest.raises(ValueError) as e: dd.from_pandas(df, npartitions=2).to_parquet( path, engine=engine, custom_metadata=custom_metadata, ) assert "User-defined key/value" in str(e.value) @pytest.mark.parametrize("calculate_divisions", [True, False, None]) def test_ignore_metadata_file(tmpdir, engine, calculate_divisions): tmpdir = str(tmpdir) dataset_with_bad_metadata = os.path.join(tmpdir, "data1") dataset_without_metadata = os.path.join(tmpdir, "data2") # Write two identical datasets without any _metadata file df1 = pd.DataFrame({"a": range(100), "b": ["dog", "cat"] * 50}) ddf1 = dd.from_pandas(df1, npartitions=2) ddf1.to_parquet( path=dataset_with_bad_metadata, engine=engine, write_metadata_file=False ) ddf1.to_parquet( path=dataset_without_metadata, engine=engine, write_metadata_file=False ) # Copy "bad" metadata into `dataset_with_bad_metadata` assert "_metadata" not in os.listdir(dataset_with_bad_metadata) with open(os.path.join(dataset_with_bad_metadata, "_metadata"), "w") as f: f.write("INVALID METADATA") assert "_metadata" in os.listdir(dataset_with_bad_metadata) assert "_metadata" not in os.listdir(dataset_without_metadata) # Read back the datasets with `ignore_metadata_file=True`, and # test that the results are the same ddf2a = dd.read_parquet( dataset_with_bad_metadata, engine=engine, ignore_metadata_file=True, calculate_divisions=calculate_divisions, ) ddf2b = dd.read_parquet( dataset_without_metadata, engine=engine, ignore_metadata_file=True, calculate_divisions=calculate_divisions, ) assert_eq(ddf2a, ddf2b) @pytest.mark.parametrize("write_metadata_file", [True, False]) @pytest.mark.parametrize("metadata_task_size", [2, 0]) def test_metadata_task_size(tmpdir, engine, write_metadata_file, metadata_task_size): # Write simple dataset tmpdir = str(tmpdir) df1 = pd.DataFrame({"a": range(100), "b": ["dog", "cat"] * 50}) ddf1 = dd.from_pandas(df1, npartitions=10) ddf1.to_parquet( path=str(tmpdir), engine=engine, write_metadata_file=write_metadata_file ) # Read back ddf2a = dd.read_parquet( str(tmpdir), engine=engine, calculate_divisions=True, ) ddf2b = dd.read_parquet( str(tmpdir), engine=engine, calculate_divisions=True, metadata_task_size=metadata_task_size, ) assert_eq(ddf2a, ddf2b) with dask.config.set( {"dataframe.parquet.metadata-task-size-local": metadata_task_size} ): ddf2c = dd.read_parquet( str(tmpdir), engine=engine, calculate_divisions=True, ) assert_eq(ddf2b, ddf2c) @PYARROW_MARK @pytest.mark.parametrize("partition_on", ("b", None)) def test_extra_file(tmpdir, engine, partition_on): # Check that read_parquet can handle spark output # See: https://github.com/dask/dask/issues/8087 tmpdir = str(tmpdir) df = pd.DataFrame({"a": range(100), "b": ["dog", "cat"] * 50}) df = df.assign(b=df.b.astype("category")) ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet( tmpdir, engine=engine, write_metadata_file=True, partition_on=partition_on, ) open(os.path.join(tmpdir, "_SUCCESS"), "w").close() open(os.path.join(tmpdir, "part.0.parquet.crc"), "w").close() os.remove(os.path.join(tmpdir, "_metadata")) out = dd.read_parquet(tmpdir, engine=engine, calculate_divisions=True) # 
Weird two-step since we don't care if category ordering changes assert_eq(out, df, check_categorical=False) assert_eq(out.b, df.b, check_category_order=False) # For "fastparquet" and "pyarrow", we can pass the # expected file extension, or avoid checking file extensions # by passing False. Check here that this works: def _parquet_file_extension(val, legacy=False): # This function allows us to switch between "new" # and "legacy" parquet_file_extension behavior return ( {"dataset": {"require_extension": val}} if legacy else {"parquet_file_extension": val} ) # Should Work out = dd.read_parquet( tmpdir, engine=engine, **_parquet_file_extension(".parquet"), calculate_divisions=True, ) # Weird two-step since we don't care if category ordering changes assert_eq(out, df, check_categorical=False) assert_eq(out.b, df.b, check_category_order=False) # Should Work (with FutureWarning) with pytest.warns(FutureWarning, match="require_extension is deprecated"): out = dd.read_parquet( tmpdir, engine=engine, **_parquet_file_extension(".parquet", legacy=True), calculate_divisions=True, ) # Should Fail (for not capturing the _SUCCESS and crc files) with pytest.raises((OSError, pa.lib.ArrowInvalid)): dd.read_parquet( tmpdir, engine=engine, **_parquet_file_extension(None) ).compute() # Should Fail (for filtering out all files) # (Related to: https://github.com/dask/dask/issues/8349) with pytest.raises(ValueError): dd.read_parquet( tmpdir, engine=engine, **_parquet_file_extension(".foo") ).compute() def test_unsupported_extension_file(tmpdir, engine): # File extension shouldn't matter when we are only # reading a single file. # (See: https://github.com/dask/dask/issues/8349) fn = os.path.join(str(tmpdir), "multi.foo") df0 = pd.DataFrame({"a": range(10)}) df0.to_parquet(fn, engine=engine) assert_eq( df0, dd.read_parquet(fn, engine=engine, index=False, calculate_divisions=True) ) def test_unsupported_extension_dir(tmpdir, engine): # File extensions shouldn't matter when we have # a _metadata file # (Related to: https://github.com/dask/dask/issues/8349) path = str(tmpdir) ddf0 = dd.from_pandas(pd.DataFrame({"a": range(10)}), 1) ddf0.to_parquet( path, engine=engine, name_function=lambda i: f"part.{i}.foo", write_metadata_file=True, ) assert_eq(ddf0, dd.read_parquet(path, engine=engine, calculate_divisions=True)) def test_custom_filename(tmpdir, engine): fn = str(tmpdir) pdf = pd.DataFrame( {"num1": [1, 2, 3, 4], "num2": [7, 8, 9, 10]}, ) df = dd.from_pandas(pdf, npartitions=2) df.to_parquet( fn, write_metadata_file=True, name_function=lambda x: f"hi-{x}.parquet", engine=engine, ) files = os.listdir(fn) assert "_common_metadata" in files assert "_metadata" in files assert "hi-0.parquet" in files assert "hi-1.parquet" in files assert_eq(df, dd.read_parquet(fn, engine=engine, calculate_divisions=True)) def test_custom_filename_works_with_pyarrow_when_append_is_true(tmpdir, engine): fn = str(tmpdir) pdf = pd.DataFrame( {"num1": [1, 2, 3, 4], "num2": [7, 8, 9, 10]}, ) df = dd.from_pandas(pdf, npartitions=2) df.to_parquet( fn, write_metadata_file=True, name_function=lambda x: f"hi-{x * 2}.parquet", engine=engine, ) pdf = pd.DataFrame( {"num1": [33], "num2": [44]}, ) df = dd.from_pandas(pdf, npartitions=1) if engine == "fastparquet": pytest.xfail( "fastparquet errors out with IndexError when ``name_function`` is customized " "and append is set to True. We didn't do a detailed investigation for expediency.
" "See this comment for the conversation: https://github.com/dask/dask/pull/7682#issuecomment-845243623" ) df.to_parquet( fn, name_function=lambda x: f"hi-{x * 2}.parquet", engine=engine, append=True, ignore_divisions=True, ) files = os.listdir(fn) assert "_common_metadata" in files assert "_metadata" in files assert "hi-0.parquet" in files assert "hi-2.parquet" in files assert "hi-4.parquet" in files expected_pdf = pd.DataFrame( {"num1": [1, 2, 3, 4, 33], "num2": [7, 8, 9, 10, 44]}, ) actual = dd.read_parquet(fn, engine=engine, index=False) assert_eq(actual, expected_pdf, check_index=False) def test_throws_error_if_custom_filename_is_invalid(tmpdir, engine): fn = str(tmpdir) pdf = pd.DataFrame( {"num1": [1, 2, 3, 4], "num2": [7, 8, 9, 10]}, ) df = dd.from_pandas(pdf, npartitions=2) with pytest.raises( ValueError, match="``name_function`` must be a callable with one argument." ): df.to_parquet(fn, name_function="whatever.parquet", engine=engine) with pytest.raises( ValueError, match="``name_function`` must produce unique filenames." ): df.to_parquet(fn, name_function=lambda x: "whatever.parquet", engine=engine) def test_custom_filename_with_partition(tmpdir, engine): fn = str(tmpdir) pdf = pd.DataFrame( { "first_name": ["frank", "li", "marcela", "luis"], "country": ["canada", "china", "venezuela", "venezuela"], }, ) df = dd.from_pandas(pdf, npartitions=4) df.to_parquet( fn, engine=engine, partition_on=["country"], name_function=lambda x: f"{x}-cool.parquet", write_index=False, ) for _, dirs, files in os.walk(fn): for dir in dirs: assert dir in ( "country=canada", "country=china", "country=venezuela", ) for file in files: assert file in ( *[f"{i}-cool.parquet" for i in range(df.npartitions)], "_common_metadata", "_metadata", ) actual = dd.read_parquet(fn, engine=engine, index=False) assert_eq( pdf, actual, check_index=False, check_dtype=False, check_categorical=False ) @PYARROW_MARK @pytest.mark.skipif( pa_version < parse_version("5.0"), reason="pyarrow write_dataset was added in version 5.0", ) def test_roundtrip_partitioned_pyarrow_dataset(tmpdir, engine): # See: https://github.com/dask/dask/issues/8650 import pyarrow.parquet as pq from pyarrow.dataset import HivePartitioning, write_dataset # Sample data df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]}) # Write partitioned dataset with dask dask_path = tmpdir.mkdir("foo-dask") ddf = dd.from_pandas(df, npartitions=2) ddf.to_parquet(dask_path, engine=engine, partition_on=["col1"], write_index=False) # Write partitioned dataset with pyarrow pa_path = tmpdir.mkdir("foo-pyarrow") table = pa.Table.from_pandas(df) write_dataset( data=table, base_dir=pa_path, basename_template="part.{i}.parquet", format="parquet", partitioning=HivePartitioning(pa.schema([("col1", pa.int32())])), ) # Define simple function to ensure results should # be comparable (same column and row order) def _prep(x): return x.sort_values("col2")[["col1", "col2"]] # Check that reading dask-written data is the same for pyarrow and dask df_read_dask = dd.read_parquet(dask_path, engine=engine) df_read_pa = pq.read_table(dask_path).to_pandas() assert_eq(_prep(df_read_dask), _prep(df_read_pa), check_index=False) # Check that reading pyarrow-written data is the same for pyarrow and dask df_read_dask = dd.read_parquet(pa_path, engine=engine) df_read_pa = pq.read_table(pa_path).to_pandas() assert_eq(_prep(df_read_dask), _prep(df_read_pa), check_index=False) @pytest.mark.parametrize("filter_value", ({1}, [1], (1,)), ids=("set", "list", "tuple")) def 
def test_in_predicate_can_use_iterables(tmp_path, engine, filter_value):
    """Regression test for https://github.com/dask/dask/issues/8720"""
    path = tmp_path / "in_predicate_iterable_pandas.parquet"
    df = pd.DataFrame(
        {"A": [1, 2, 3, 4], "B": [1, 1, 2, 2]},
    )
    df.to_parquet(path, engine=engine)
    filters = [("B", "in", filter_value)]
    result = dd.read_parquet(path, engine=engine, filters=filters)
    expected = pd.read_parquet(path, engine=engine, filters=filters)
    assert_eq(result, expected)

    # pandas to_parquet outputs a single file, dask outputs a folder with global
    # metadata that changes the filtering code path
    ddf = dd.from_pandas(df, npartitions=2)
    path = tmp_path / "in_predicate_iterable_dask.parquet"
    ddf.to_parquet(path, engine=engine)
    result = dd.read_parquet(path, engine=engine, filters=filters)
    expected = pd.read_parquet(path, engine=engine, filters=filters)
    assert_eq(result, expected, check_index=False)


# Non-iterable filter value with `in` predicate.
# Test single-nested and double-nested lists of filters, as well as having multiple
# filters to test against.
@pytest.mark.parametrize(
    "filter_value",
    (
        [("B", "in", 10)],
        [[("B", "in", 10)]],
        [("B", "<", 10), ("B", "in", 10)],
        [[("B", "<", 10), ("B", "in", 10)]],
    ),
    ids=(
        "one-item-single-nest",
        "one-item-double-nest",
        "two-item-single-nest",
        "two-item-double-nest",
    ),
)
def test_in_predicate_requires_an_iterable(tmp_path, engine, filter_value):
    """Regression test for https://github.com/dask/dask/issues/8720"""
    path = tmp_path / "gh_8720_pandas.parquet"
    df = pd.DataFrame(
        {"A": [1, 2, 3, 4], "B": [1, 1, 2, 2]},
    )
    df.to_parquet(path, engine=engine)
    with pytest.raises(TypeError, match="Value of 'in' filter"):
        dd.read_parquet(path, engine=engine, filters=filter_value)

    # pandas to_parquet outputs a single file, dask outputs a folder with global
    # metadata that changes the filtering code path
    ddf = dd.from_pandas(df, npartitions=2)
    path = tmp_path / "gh_8720_dask.parquet"
    ddf.to_parquet(path, engine=engine)
    with pytest.raises(TypeError, match="Value of 'in' filter"):
        dd.read_parquet(path, engine=engine, filters=filter_value)


def test_deprecate_gather_statistics(tmp_path, engine):
    # The `gather_statistics` deprecation warning
    # (and this test) should be removed after a
    # "sufficient" deprecation period.
    # See: https://github.com/dask/dask/pull/8992
    df = pd.DataFrame({"a": range(10)})
    path = tmp_path / "test_deprecate_gather_statistics.parquet"
    df.to_parquet(path, engine=engine)
    with pytest.warns(FutureWarning, match="deprecated"):
        out = dd.read_parquet(
            path,
            engine=engine,
            gather_statistics=True,
        )
    assert_eq(out, df)


@pytest.mark.gpu
def test_gpu_write_parquet_simple(tmpdir):
    fn = str(tmpdir)
    cudf = pytest.importorskip("cudf")
    dask_cudf = pytest.importorskip("dask_cudf")
    from dask.dataframe.dispatch import pyarrow_schema_dispatch

    @pyarrow_schema_dispatch.register((cudf.DataFrame,))
    def get_pyarrow_schema_cudf(obj):
        return obj.to_arrow().schema

    df = cudf.DataFrame(
        {
            "a": ["abc", "def"],
            "b": ["a", "z"],
        }
    )
    ddf = dask_cudf.from_cudf(df, 3)
    ddf.to_parquet(fn)
    got = dask_cudf.read_parquet(fn)
    assert_eq(df, got)


@PYARROW_MARK
def test_retries_on_remote_filesystem(tmpdir):
    # Fake a remote filesystem with a cached one
    fn = str(tmpdir)
    remote_fn = f"simplecache://{tmpdir}"
    storage_options = {"target_protocol": "file"}

    df = pd.DataFrame({"a": range(10)})
    ddf = dd.from_pandas(df, npartitions=2)
    ddf.to_parquet(fn)

    # Check that we set retries for reading and writing to parquet
    # when not otherwise set
    scalar = ddf.to_parquet(remote_fn, compute=False, storage_options=storage_options)
    layer = hlg_layer(scalar.dask, "to-parquet")
    assert layer.annotations
    assert layer.annotations["retries"] == 5

    ddf2 = dd.read_parquet(remote_fn, storage_options=storage_options)
    layer = hlg_layer(ddf2.dask, "read-parquet")
    assert layer.annotations
    assert layer.annotations["retries"] == 5

    # But not for a local filesystem
    scalar = ddf.to_parquet(fn, compute=False, storage_options=storage_options)
    layer = hlg_layer(scalar.dask, "to-parquet")
    assert not layer.annotations

    ddf2 = dd.read_parquet(fn, storage_options=storage_options)
    layer = hlg_layer(ddf2.dask, "read-parquet")
    assert not layer.annotations

    # And we don't overwrite existing retries
    with dask.annotate(retries=2):
        scalar = ddf.to_parquet(
            remote_fn, compute=False, storage_options=storage_options
        )
        layer = hlg_layer(scalar.dask, "to-parquet")
        assert layer.annotations
        assert layer.annotations["retries"] == 2

        ddf2 = dd.read_parquet(remote_fn, storage_options=storage_options)
        layer = hlg_layer(ddf2.dask, "read-parquet")
        assert layer.annotations
        assert layer.annotations["retries"] == 2


@pytest.mark.parametrize("fs", ["fsspec", None])
def test_filesystem_option(tmp_path, engine, fs):
    from fsspec.implementations.local import LocalFileSystem

    df = pd.DataFrame({"a": range(10)})
    dd.from_pandas(df, npartitions=2).to_parquet(tmp_path, engine=engine)
    filesystem = fs or LocalFileSystem()
    ddf = dd.read_parquet(
        tmp_path,
        engine=engine,
        filesystem=filesystem,
    )
    if fs is None:
        layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs
        assert layer_fs is filesystem
    assert_eq(ddf, df)


@PYARROW_MARK
@pytest.mark.parametrize("fs", ["arrow", None])
def test_pyarrow_filesystem_option(tmp_path, fs):
    from fsspec.implementations.arrow import ArrowFSWrapper
    from pyarrow.fs import LocalFileSystem

    df = pd.DataFrame({"a": range(10)})
    dd.from_pandas(df, npartitions=2).to_parquet(tmp_path)
    fs = fs or LocalFileSystem()
    ddf = dd.read_parquet(
        tmp_path,
        engine="pyarrow",
        filesystem=fs,
    )
    layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs
    assert isinstance(layer_fs, ArrowFSWrapper)
    assert isinstance(layer_fs.fs, LocalFileSystem)
    assert_eq(ddf, df)


def test_select_filtered_column(tmp_path, engine):
    df = pd.DataFrame({"a": range(10), "b": ["cat"] * 10})
    path = tmp_path / "test_select_filtered_column.parquet"
    df.to_parquet(path, index=False)

    with pytest.warns(UserWarning, match="Sorted columns detected"):
        ddf = dd.read_parquet(path, engine=engine, filters=[("b", "==", "cat")])
    assert_eq(df, ddf)