from datetime import datetime

import numpy as np
import pandas as pd
import pytest

import dask.array as da
import dask.dataframe as dd
from dask import config
from dask.blockwise import Blockwise
from dask.dataframe._compat import tm
from dask.dataframe.io.io import _meta_from_array
from dask.dataframe.optimize import optimize
from dask.dataframe.utils import assert_eq
from dask.delayed import Delayed, delayed
from dask.utils_test import hlg_layer_topological

##########
# Arrays #
##########


def test_meta_from_array():
    x = np.array([[1, 2], [3, 4]], dtype=np.int64)
    res = _meta_from_array(x)
    assert isinstance(res, pd.DataFrame)
    assert res[0].dtype == np.int64
    assert res[1].dtype == np.int64
    tm.assert_index_equal(res.columns, pd.Index([0, 1]))

    x = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float64)
    res = _meta_from_array(x, columns=["a", "b"])
    assert isinstance(res, pd.DataFrame)
    assert res["a"].dtype == np.float64
    assert res["b"].dtype == np.float64
    tm.assert_index_equal(res.columns, pd.Index(["a", "b"]))

    with pytest.raises(ValueError):
        _meta_from_array(x, columns=["a", "b", "c"])

    np.random.seed(42)
    x = np.random.rand(201, 2)
    x = dd.from_array(x, chunksize=50, columns=["a", "b"])
    assert len(x.divisions) == 6  # Should be 5 partitions and the end


def test_meta_from_1darray():
    x = np.array([1.0, 2.0, 3.0], dtype=np.float64)
    res = _meta_from_array(x)
    assert isinstance(res, pd.Series)
    assert res.dtype == np.float64

    x = np.array([1, 2, 3], dtype=np.object_)
    res = _meta_from_array(x, columns="x")
    assert isinstance(res, pd.Series)
    assert res.name == "x"
    assert res.dtype == np.object_

    x = np.array([1, 2, 3], dtype=np.object_)
    res = _meta_from_array(x, columns=["x"])
    assert isinstance(res, pd.DataFrame)
    assert res["x"].dtype == np.object_
    tm.assert_index_equal(res.columns, pd.Index(["x"]))

    with pytest.raises(ValueError):
        _meta_from_array(x, columns=["a", "b"])


def test_meta_from_recarray():
    x = np.array(
        [(i, i * 10) for i in range(10)], dtype=[("a", np.float64), ("b", np.int64)]
    )
    res = _meta_from_array(x)
    assert isinstance(res, pd.DataFrame)
    assert res["a"].dtype == np.float64
    assert res["b"].dtype == np.int64
    tm.assert_index_equal(res.columns, pd.Index(["a", "b"]))

    res = _meta_from_array(x, columns=["b", "a"])
    assert isinstance(res, pd.DataFrame)
    assert res["a"].dtype == np.float64
    assert res["b"].dtype == np.int64
    tm.assert_index_equal(res.columns, pd.Index(["b", "a"]))

    with pytest.raises(ValueError):
        _meta_from_array(x, columns=["a", "b", "c"])


def test_from_array():
    x = np.arange(10 * 3).reshape(10, 3)
    d = dd.from_array(x, chunksize=4)
    assert isinstance(d, dd.DataFrame)
    tm.assert_index_equal(d.columns, pd.Index([0, 1, 2]))
    assert d.divisions == (0, 4, 8, 9)
    assert (d.compute().values == x).all()

    d = dd.from_array(x, chunksize=4, columns=list("abc"))
    assert isinstance(d, dd.DataFrame)
    tm.assert_index_equal(d.columns, pd.Index(["a", "b", "c"]))
    assert d.divisions == (0, 4, 8, 9)
    assert (d.compute().values == x).all()

    with pytest.raises(ValueError):
        dd.from_array(np.ones(shape=(10, 10, 10)))


def test_from_array_with_record_dtype():
    x = np.array([(i, i * 10) for i in range(10)], dtype=[("a", "i4"), ("b", "i4")])
    d = dd.from_array(x, chunksize=4)
    assert isinstance(d, dd.DataFrame)
    assert list(d.columns) == ["a", "b"]
    assert d.divisions == (0, 4, 8, 9)

    assert (d.compute().to_records(index=False) == x).all()
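# Illustrative sketch (not part of the upstream test suite): `dd.from_array`
# slices the array into `chunksize`-row partitions and records the resulting
# row boundaries as `divisions`. The helper name below is hypothetical.
def _example_from_array_partitioning():
    x = np.arange(10 * 2).reshape(10, 2)
    d = dd.from_array(x, chunksize=4, columns=["a", "b"])
    # 10 rows at 4 rows per chunk -> 3 partitions; divisions record the first
    # index of each partition plus the last index overall
    assert d.npartitions == 3
    assert d.divisions == (0, 4, 8, 9)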
def test_from_pandas_dataframe():
    a = list("aaaaaaabbbbbbbbccccccc")
    df = pd.DataFrame(
        dict(a=a, b=np.random.randn(len(a))),
        index=pd.date_range(start="20120101", periods=len(a)),
    )
    ddf = dd.from_pandas(df, 3)
    assert len(ddf.dask) == 3
    assert len(ddf.divisions) == len(ddf.dask) + 1
    assert isinstance(ddf.divisions[0], type(df.index[0]))
    tm.assert_frame_equal(df, ddf.compute())

    ddf = dd.from_pandas(df, chunksize=8)
    msg = "Exactly one of npartitions and chunksize must be specified."
    with pytest.raises(ValueError) as err:
        dd.from_pandas(df, npartitions=2, chunksize=2)
    assert msg in str(err.value)
    with pytest.raises((ValueError, AssertionError)) as err:
        dd.from_pandas(df)
    assert msg in str(err.value)
    assert len(ddf.dask) == 3
    assert len(ddf.divisions) == len(ddf.dask) + 1
    assert isinstance(ddf.divisions[0], type(df.index[0]))
    tm.assert_frame_equal(df, ddf.compute())


def test_from_pandas_small():
    df = pd.DataFrame({"x": [1, 2, 3]})
    for i in [1, 2, 30]:
        a = dd.from_pandas(df, i)
        assert len(a.compute()) == 3
        assert a.divisions[0] == 0
        assert a.divisions[-1] == 2

        a = dd.from_pandas(df, chunksize=i)
        assert len(a.compute()) == 3
        assert a.divisions[0] == 0
        assert a.divisions[-1] == 2

    for sort in [True, False]:
        for i in [0, 2]:
            df = pd.DataFrame({"x": [0] * i})
            ddf = dd.from_pandas(df, npartitions=5, sort=sort)
            assert_eq(df, ddf)

            s = pd.Series([0] * i, name="x", dtype=int)
            ds = dd.from_pandas(s, npartitions=5, sort=sort)
            assert_eq(s, ds)


@pytest.mark.parametrize("n", [1, 2, 4, 5])
def test_from_pandas_npartitions_is_accurate(n):
    df = pd.DataFrame(
        {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60]
    )
    assert dd.from_pandas(df, npartitions=n).npartitions <= n


def test_from_pandas_series():
    n = 20
    s = pd.Series(np.random.randn(n), index=pd.date_range(start="20120101", periods=n))
    ds = dd.from_pandas(s, 3)
    assert len(ds.dask) == 3
    assert len(ds.divisions) == len(ds.dask) + 1
    assert isinstance(ds.divisions[0], type(s.index[0]))
    tm.assert_series_equal(s, ds.compute())

    ds = dd.from_pandas(s, chunksize=8)
    assert len(ds.dask) == 3
    assert len(ds.divisions) == len(ds.dask) + 1
    assert isinstance(ds.divisions[0], type(s.index[0]))
    tm.assert_series_equal(s, ds.compute())


def test_from_pandas_non_sorted():
    df = pd.DataFrame({"x": [1, 2, 3]}, index=[3, 1, 2])
    ddf = dd.from_pandas(df, npartitions=2, sort=False)
    assert not ddf.known_divisions
    assert_eq(df, ddf)

    ddf = dd.from_pandas(df, chunksize=2, sort=False)
    assert not ddf.known_divisions
    assert_eq(df, ddf)


def test_from_pandas_single_row():
    df = pd.DataFrame({"x": [1]}, index=[1])
    ddf = dd.from_pandas(df, npartitions=1)
    assert ddf.divisions == (1, 1)
    assert_eq(ddf, df)


def test_from_pandas_with_datetime_index():
    df = pd.DataFrame(
        {
            "Date": [
                "2015-08-28",
                "2015-08-27",
                "2015-08-26",
                "2015-08-25",
                "2015-08-24",
                "2015-08-21",
                "2015-08-20",
                "2015-08-19",
                "2015-08-18",
            ],
            "Val": list(range(9)),
        }
    )
    df.Date = df.Date.astype("datetime64[ns]")
    ddf = dd.from_pandas(df, 2)
    assert_eq(df, ddf)
    ddf = dd.from_pandas(df, chunksize=2)
    assert_eq(df, ddf)
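# Illustrative sketch (not part of the upstream test suite): `dd.from_pandas`
# accepts exactly one of `npartitions` or `chunksize`, and with a sorted index
# the resulting divisions are known. The helper name below is hypothetical.
def _example_from_pandas_partitioning():
    df = pd.DataFrame({"x": range(8)})
    by_count = dd.from_pandas(df, npartitions=4)  # target number of partitions
    by_size = dd.from_pandas(df, chunksize=2)  # target rows per partition
    assert by_count.npartitions == by_size.npartitions == 4
    assert by_count.known_divisions and by_size.known_divisions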
@pytest.mark.parametrize("null_value", [None, pd.NaT, pd.NA])
def test_from_pandas_with_index_nulls(null_value):
    df = pd.DataFrame({"x": [1, 2, 3]}, index=["C", null_value, "A"])
    with pytest.raises(NotImplementedError, match="is non-numeric and contains nulls"):
        dd.from_pandas(df, npartitions=2, sort=False)


def test_from_pandas_with_wrong_args():
    df = pd.DataFrame({"x": [1, 2, 3]}, index=[3, 2, 1])
    with pytest.raises(TypeError, match="must be a pandas DataFrame or Series"):
        dd.from_pandas("foo")
    with pytest.raises(
        ValueError, match="one of npartitions and chunksize must be specified"
    ):
        dd.from_pandas(df)
    with pytest.raises(TypeError, match="provide npartitions as an int"):
        dd.from_pandas(df, npartitions=5.2, sort=False)
    with pytest.raises(TypeError, match="provide chunksize as an int"):
        dd.from_pandas(df, chunksize=18.27)


def test_from_pandas_chunksize_one():
    # See: https://github.com/dask/dask/issues/9218
    df = pd.DataFrame(np.random.randint(0, 10, size=(10, 4)), columns=list("ABCD"))
    ddf = dd.from_pandas(df, chunksize=1)
    num_rows = list(ddf.map_partitions(len).compute())
    # chunksize=1 with range index should
    # always have unit-length partitions
    assert num_rows == [1] * 10


@pytest.mark.parametrize(
    "index",
    [
        ["A", "B", "C", "C", "C", "C", "C", "C"],
        ["A", "B", "B", "B", "B", "B", "B", "C"],
        ["A", "A", "A", "A", "A", "B", "B", "C"],
    ],
)
def test_from_pandas_npartitions_duplicates(index):
    df = pd.DataFrame({"a": range(8), "index": index}).set_index("index")
    ddf = dd.from_pandas(df, npartitions=3)
    assert ddf.divisions == ("A", "B", "C", "C")


@pytest.mark.gpu
def test_gpu_from_pandas_npartitions_duplicates():
    cudf = pytest.importorskip("cudf")

    index = ["A", "A", "A", "A", "A", "B", "B", "C"]
    df = cudf.DataFrame({"a": range(8), "index": index}).set_index("index")
    ddf = dd.from_pandas(df, npartitions=3)
    assert ddf.divisions == ("A", "B", "C", "C")


def test_DataFrame_from_dask_array():
    x = da.ones((10, 3), chunks=(4, 2))
    pdf = pd.DataFrame(np.ones((10, 3)), columns=["a", "b", "c"])
    df = dd.from_dask_array(x, ["a", "b", "c"])
    assert not hlg_layer_topological(df.dask, -1).is_materialized()
    assert_eq(df, pdf)

    # dd.from_array should re-route to from_dask_array
    df2 = dd.from_array(x, columns=["a", "b", "c"])
    assert not hlg_layer_topological(df2.dask, -1).is_materialized()
    assert_eq(df, df2)
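# Illustrative sketch (not part of the upstream test suite): when a dask array
# has known chunk sizes, `dd.from_dask_array` derives known divisions directly
# from the chunk boundaries along axis 0. The helper name below is
# hypothetical.
def _example_from_dask_array_divisions():
    x = da.ones((10, 3), chunks=(4, 3))
    df = dd.from_dask_array(x, columns=["a", "b", "c"])
    # Row chunks of (4, 4, 2) -> divisions at 0, 4, 8 plus the final index 9
    assert df.divisions == (0, 4, 8, 9)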
def test_DataFrame_from_dask_array_with_blockwise_ops():
    x = da.ones((10, 3), chunks=(4, 2))
    x *= 2
    pdf = pd.DataFrame(np.ones((10, 3)) * 2, columns=["a", "b", "c"])
    df = dd.from_dask_array(x, ["a", "b", "c"])
    # None of the layers in this graph should be materialized, everything should
    # be a HighLevelGraph still.
    assert all(
        not hlg_layer_topological(df.dask, i).is_materialized()
        for i in range(len(df.dask.layers))
    )
    assert_eq(df, pdf)


def test_Series_from_dask_array():
    x = da.ones(10, chunks=4)

    pser = pd.Series(np.ones(10), name="a")
    ser = dd.from_dask_array(x, "a")
    assert_eq(ser, pser)

    # Not passing a name should result in the name == None
    pser = pd.Series(np.ones(10))
    ser = dd.from_dask_array(x)
    assert_eq(ser, pser)

    # dd.from_array should re-route to from_dask_array
    ser2 = dd.from_array(x)
    assert_eq(ser, ser2)


@pytest.mark.parametrize("as_frame", [True, False])
def test_from_dask_array_index(as_frame):
    s = dd.from_pandas(pd.Series(range(10), index=list("abcdefghij")), npartitions=3)
    if as_frame:
        s = s.to_frame()
    result = dd.from_dask_array(s.values, index=s.index)
    assert_eq(s, result)


def test_from_dask_array_index_raises():
    x = da.random.uniform(size=(10,), chunks=(5,))
    with pytest.raises(ValueError, match="must be an instance"):
        dd.from_dask_array(x, index=pd.Index(np.arange(10)))

    a = dd.from_pandas(pd.Series(range(12)), npartitions=2)
    b = dd.from_pandas(pd.Series(range(12)), npartitions=4)
    with pytest.raises(ValueError, match=".*index.*numbers of blocks.*4 != 2"):
        dd.from_dask_array(a.values, index=b.index)


def test_from_array_raises_more_than_2D():
    x = da.ones((3, 3, 3), chunks=2)
    y = np.ones((3, 3, 3))

    with pytest.raises(ValueError, match="more than 2D array"):
        dd.from_dask_array(x)  # dask

    with pytest.raises(ValueError, match="more than 2D array"):
        dd.from_array(y)  # numpy


def test_from_dask_array_compat_numpy_array():
    x = da.ones((10, 3), chunks=(3, 3))
    y = np.ones((10, 3))

    d1 = dd.from_dask_array(x)  # dask
    p1 = pd.DataFrame(y)
    assert_eq(d1, p1)

    d2 = dd.from_array(y)  # numpy
    assert_eq(d2, d1)


def test_from_array_wrong_column_shape_error():
    x = da.ones((10, 3), chunks=(3, 3))
    with pytest.raises(ValueError, match="names must match width"):
        dd.from_dask_array(x, columns=["a"])  # dask

    y = np.ones((10, 3))
    with pytest.raises(ValueError, match="names must match width"):
        dd.from_array(y, columns=["a"])  # numpy


def test_from_array_with_column_names():
    x = da.ones((10, 3), chunks=(3, 3))
    y = np.ones((10, 3))
    d1 = dd.from_dask_array(x, columns=["a", "b", "c"])  # dask
    p1 = pd.DataFrame(y, columns=["a", "b", "c"])
    assert_eq(d1, p1)

    d2 = dd.from_array(y, columns=["a", "b", "c"])  # numpy
    assert_eq(d1, d2)


def test_from_dask_array_compat_numpy_array_1d():
    x = da.ones(10, chunks=3)
    y = np.ones(10)

    d1 = dd.from_dask_array(x)  # dask
    p1 = pd.Series(y)
    assert_eq(d1, p1)

    d2 = dd.from_array(y)  # numpy
    assert_eq(d2, d1)


def test_from_array_1d_with_column_names():
    x = da.ones(10, chunks=3)
    y = np.ones(10)
    d1 = dd.from_dask_array(x, columns="name")  # dask
    p1 = pd.Series(y, name="name")
    assert_eq(d1, p1)

    d2 = dd.from_array(x.compute(), columns="name")  # numpy
    assert_eq(d2, d1)


def test_from_array_1d_list_of_columns_gives_dataframe():
    x = da.ones(10, chunks=3)
    y = np.ones(10)

    # passing list via columns results in DataFrame
    d1 = dd.from_dask_array(x, columns=["name"])  # dask
    p1 = pd.DataFrame(y, columns=["name"])
    assert_eq(d1, p1)

    d2 = dd.from_array(y, columns=["name"])  # numpy
    assert_eq(d2, d1)


def test_from_dask_array_struct_dtype():
    x = np.array([(1, "a"), (2, "b")], dtype=[("a", "i4"), ("b", "object")])
    y = da.from_array(x, chunks=(1,))
    df = dd.from_dask_array(y)
    tm.assert_index_equal(df.columns, pd.Index(["a", "b"]))
    assert_eq(df, pd.DataFrame(x))

    assert_eq(
        dd.from_dask_array(y, columns=["b", "a"]), pd.DataFrame(x, columns=["b", "a"])
    )
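# Illustrative sketch (not part of the upstream test suite): arrays with
# unknown (NaN) chunk sizes along axis 0 still convert, but the resulting
# collection has unknown divisions, as the tests below verify. The helper
# name below is hypothetical.
def _example_unknown_chunks_unknown_divisions():
    x = da.ones(10, chunks=5)
    # Boolean masking produces chunks of unknown length
    masked = x[x > 0]
    ser = dd.from_dask_array(masked)
    assert not ser.known_divisions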
def test_from_dask_array_unknown_chunks():
    # Series
    dx = da.Array(
        {("x", 0): np.arange(5), ("x", 1): np.arange(5, 11)},
        "x",
        ((np.nan, np.nan),),
        np.arange(1).dtype,
    )
    df = dd.from_dask_array(dx)
    assert isinstance(df, dd.Series)
    assert not df.known_divisions
    assert_eq(df, pd.Series(np.arange(11)), check_index=False)

    # DataFrame
    dsk = {("x", 0, 0): np.random.random((2, 3)), ("x", 1, 0): np.random.random((5, 3))}
    dx = da.Array(dsk, "x", ((np.nan, np.nan), (3,)), np.float64)
    df = dd.from_dask_array(dx)
    assert isinstance(df, dd.DataFrame)
    assert not df.known_divisions
    assert_eq(df, pd.DataFrame(dx.compute()), check_index=False)


@pytest.mark.parametrize(
    "chunksizes, expected_divisions",
    [
        pytest.param((1, 2, 3, 0), (0, 1, 3, 5, 5)),
        pytest.param((0, 1, 2, 3), (0, 0, 1, 3, 5)),
        pytest.param((1, 0, 2, 3), (0, 1, 1, 3, 5)),
    ],
)
def test_from_dask_array_empty_chunks(chunksizes, expected_divisions):
    monotonic_index = da.from_array(np.arange(6), chunks=chunksizes)
    df = dd.from_dask_array(monotonic_index)
    assert df.divisions == expected_divisions


def test_from_dask_array_unknown_width_error():
    dsk = {("x", 0, 0): np.random.random((2, 3)), ("x", 1, 0): np.random.random((5, 3))}
    dx = da.Array(dsk, "x", ((np.nan, np.nan), (np.nan,)), np.float64)
    with pytest.raises(ValueError, match="Shape along axis 1 must be known"):
        dd.from_dask_array(dx)


@pytest.mark.gpu
@pytest.mark.parametrize(
    "array_backend, df_backend",
    [("cupy", "cudf"), ("numpy", "pandas")],
)
def test_from_array_dispatching(array_backend, df_backend):
    # Check array -> dataframe dispatching
    array_lib = pytest.importorskip(array_backend)
    df_lib = pytest.importorskip(df_backend)

    with config.set({"array.backend": array_backend}):
        darr = da.ones(10)
        assert isinstance(darr._meta, array_lib.ndarray)

        ddf1 = dd.from_array(darr)  # Invokes `from_dask_array`
        ddf2 = dd.from_array(darr.compute())
        assert isinstance(ddf1._meta, df_lib.Series)
        assert isinstance(ddf2._meta, df_lib.Series)
        assert_eq(ddf1, ddf2)


def test_to_bag():
    a = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(a, 2)

    assert ddf.to_bag().compute() == list(a.itertuples(False))
    assert ddf.to_bag(True).compute() == list(a.itertuples(True))
    assert ddf.to_bag(format="dict").compute() == [
        {"x": "a", "y": 2},
        {"x": "b", "y": 3},
        {"x": "c", "y": 4},
        {"x": "d", "y": 5},
    ]
    assert ddf.to_bag(True, format="dict").compute() == [
        {"index": 1.0, "x": "a", "y": 2},
        {"index": 2.0, "x": "b", "y": 3},
        {"index": 3.0, "x": "c", "y": 4},
        {"index": 4.0, "x": "d", "y": 5},
    ]
    assert ddf.x.to_bag(True).compute() == list(a.x.items())
    assert ddf.x.to_bag().compute() == list(a.x)
    assert ddf.x.to_bag(True, format="dict").compute() == [
        {"x": "a"},
        {"x": "b"},
        {"x": "c"},
        {"x": "d"},
    ]
    assert ddf.x.to_bag(format="dict").compute() == [
        {"x": "a"},
        {"x": "b"},
        {"x": "c"},
        {"x": "d"},
    ]


def test_to_bag_frame():
    from dask import get
    from dask.bag import Bag

    ddf = dd.from_pandas(
        pd.DataFrame(
            {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
            index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
        ),
        npartitions=2,
    )

    # Convert to bag, and check that
    # collection type has changed, but
    # partition data has not
    bagdf = ddf.to_bag(format="frame")
    assert isinstance(bagdf, Bag)
    assert_eq(get(bagdf.dask, (bagdf.name, 0)), ddf.partitions[0])
    assert_eq(get(bagdf.dask, (bagdf.name, 1)), ddf.partitions[1])
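# Illustrative sketch (not part of the upstream test suite): `to_bag` chooses
# the element type via `format` ("tuple", "dict", or "frame"), and
# `index=True` includes the index in each element. The helper name below is
# hypothetical.
def _example_to_bag_formats():
    ddf = dd.from_pandas(pd.DataFrame({"x": [1, 2]}), npartitions=1)
    assert ddf.to_bag(format="tuple").compute() == [(1,), (2,)]
    assert ddf.to_bag(format="dict").compute() == [{"x": 1}, {"x": 2}]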
def test_to_records():
    pytest.importorskip("dask.array")
    from dask.array.utils import assert_eq

    df = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(df, 2)

    assert_eq(
        df.to_records(), ddf.to_records(), check_type=False
    )  # TODO: make check_type pass


@pytest.mark.parametrize("lengths", [[2, 2], True])
def test_to_records_with_lengths(lengths):
    pytest.importorskip("dask.array")
    from dask.array.utils import assert_eq

    df = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(df, 2)

    result = ddf.to_records(lengths=lengths)
    assert_eq(df.to_records(), result, check_type=False)  # TODO: make check_type pass

    assert isinstance(result, da.Array)

    expected_chunks = ((2, 2),)
    assert result.chunks == expected_chunks


def test_to_records_raises():
    pytest.importorskip("dask.array")
    df = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(df, 2)

    with pytest.raises(ValueError):
        ddf.to_records(lengths=[2, 2, 2])
        pytest.fail("3 != 2")

    with pytest.raises(ValueError):
        ddf.to_records(lengths=5)
        pytest.fail("Unexpected value")


def test_from_delayed():
    df = pd.DataFrame(data=np.random.normal(size=(10, 4)), columns=list("abcd"))
    parts = [df.iloc[:1], df.iloc[1:3], df.iloc[3:6], df.iloc[6:10]]
    dfs = [delayed(parts.__getitem__)(i) for i in range(4)]
    meta = dfs[0].compute()

    my_len = lambda x: pd.Series([len(x)])

    for divisions in [None, [0, 1, 3, 6, 10]]:
        ddf = dd.from_delayed(dfs, meta=meta, divisions=divisions)
        assert_eq(ddf, df)
        assert list(ddf.map_partitions(my_len).compute()) == [1, 2, 3, 4]
        assert ddf.known_divisions == (divisions is not None)

        s = dd.from_delayed([d.a for d in dfs], meta=meta.a, divisions=divisions)
        assert_eq(s, df.a)
        assert list(s.map_partitions(my_len).compute()) == [1, 2, 3, 4]
        assert ddf.known_divisions == (divisions is not None)

    meta2 = [(c, "f8") for c in df.columns]

    # Make sure `from_delayed` is Blockwise
    check_ddf = dd.from_delayed(dfs, meta=meta2)
    assert isinstance(check_ddf.dask.layers[check_ddf._name], Blockwise)

    assert_eq(dd.from_delayed(dfs, meta=meta2), df)
    assert_eq(dd.from_delayed([d.a for d in dfs], meta=("a", "f8")), df.a)

    with pytest.raises(ValueError):
        dd.from_delayed(dfs, meta=meta, divisions=[0, 1, 3, 6])

    with pytest.raises(ValueError) as e:
        dd.from_delayed(dfs, meta=meta.a).compute()
    assert str(e.value).startswith("Metadata mismatch found in `from_delayed`")


def test_from_delayed_optimize_fusion():
    # Test that DataFrame optimization fuses a `from_delayed`
    # layer with other Blockwise layers and input Delayed tasks.
    # See: https://github.com/dask/dask/pull/8852
    ddf = (
        dd.from_delayed(
            map(delayed(lambda x: pd.DataFrame({"x": [x] * 10})), range(10)),
            meta=pd.DataFrame({"x": [0] * 10}),
        )
        + 1
    )
    # NOTE: Fusion requires `optimize_blockwise` and `fuse_roots`
    assert isinstance(ddf.dask.layers[ddf._name], Blockwise)
    assert len(optimize(ddf.dask, ddf.__dask_keys__()).layers) == 1
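# Illustrative sketch (not part of the upstream test suite): `dd.from_delayed`
# wraps one Delayed object per partition; passing `meta` avoids computing the
# first partition just to infer the schema. The helper name below is
# hypothetical.
def _example_from_delayed_with_meta():
    make_part = delayed(lambda n: pd.DataFrame({"x": range(n)}))
    parts = [make_part(2), make_part(3)]
    meta = pd.DataFrame({"x": pd.Series([], dtype="int64")})
    ddf = dd.from_delayed(parts, meta=meta)
    assert ddf.npartitions == 2
    assert len(ddf.compute()) == 5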
def test_from_delayed_to_dask_array():
    # Check that `from_delayed` can be followed
    # by `to_dask_array` without breaking
    # optimization behavior
    # See: https://github.com/dask-contrib/dask-sql/issues/497
    from dask.blockwise import optimize_blockwise

    dfs = [delayed(pd.DataFrame)(np.ones((3, 2))) for i in range(3)]
    ddf = dd.from_delayed(dfs)
    arr = ddf.to_dask_array()

    # If we optimize this graph without calling
    # `fuse_roots`, the underlying `BlockwiseDep`
    # `mapping` keys will be 1-D (e.g. `(4,)`),
    # while the collection keys will be 2-D
    # (e.g. `(4, 0)`)
    keys = [k[0] for k in arr.__dask_keys__()]
    dsk = optimize_blockwise(arr.dask, keys=keys)
    dsk.cull(keys)

    result = arr.compute()
    assert result.shape == (9, 2)


def test_from_delayed_preserves_hlgs():
    df = pd.DataFrame(data=np.random.normal(size=(10, 4)), columns=list("abcd"))
    parts = [df.iloc[:1], df.iloc[1:3], df.iloc[3:6], df.iloc[6:10]]
    dfs = [delayed(parts.__getitem__)(i) for i in range(4)]
    meta = dfs[0].compute()

    chained = [d.a for d in dfs]
    hlg = dd.from_delayed(chained, meta=meta).dask
    for d in chained:
        for layer_name, layer in d.dask.layers.items():
            assert hlg.layers[layer_name] == layer
            assert hlg.dependencies[layer_name] == d.dask.dependencies[layer_name]


def test_from_delayed_misordered_meta():
    df = pd.DataFrame(
        columns=["(1)", "(2)", "date", "ent", "val"],
        data=[range(i * 5, i * 5 + 5) for i in range(3)],
        index=range(3),
    )

    # meta with different order for columns
    misordered_meta = pd.DataFrame(
        columns=["date", "ent", "val", "(1)", "(2)"], data=[range(5)]
    )

    ddf = dd.from_delayed([delayed(lambda: df)()], meta=misordered_meta)

    with pytest.raises(ValueError) as info:
        # produces dataframe which does not match meta
        ddf.reset_index().compute(scheduler="sync")
    msg = (
        "The columns in the computed data do not match the columns in the"
        " provided metadata"
    )
    assert msg in str(info.value)


def test_from_delayed_sorted():
    a = pd.DataFrame({"x": [1, 2]}, index=[1, 10])
    b = pd.DataFrame({"x": [4, 1]}, index=[100, 200])

    A = dd.from_delayed([delayed(a), delayed(b)], divisions="sorted")
    assert A.known_divisions

    assert A.divisions == (1, 100, 200)


def test_to_delayed():
    df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
    ddf = dd.from_pandas(df, npartitions=2)

    # Frame
    a, b = ddf.to_delayed()
    assert isinstance(a, Delayed)
    assert isinstance(b, Delayed)
    assert_eq(a.compute(), df.iloc[:2])

    # Scalar
    x = ddf.x.sum()
    dx = x.to_delayed()
    assert isinstance(dx, Delayed)
    assert_eq(dx.compute(), x)


def test_to_delayed_optimize_graph():
    df = pd.DataFrame({"x": list(range(20))})
    ddf = dd.from_pandas(df, npartitions=20)
    ddf2 = (ddf + 1).loc[:2]

    # Frame
    d = ddf2.to_delayed()[0]
    assert len(d.dask) < 20
    d2 = ddf2.to_delayed(optimize_graph=False)[0]
    assert sorted(d2.dask) == sorted(ddf2.dask)
    assert_eq(ddf2.get_partition(0), d.compute())
    assert_eq(ddf2.get_partition(0), d2.compute())

    # Scalar
    x = ddf2.x.sum()
    dx = x.to_delayed()
    dx2 = x.to_delayed(optimize_graph=False)
    assert len(dx.dask) < len(dx2.dask)
    assert_eq(dx.compute(), dx2.compute())


def test_from_dask_array_index_dtype():
    x = da.ones((10,), chunks=(5,))

    df = pd.DataFrame(
        {
            "date": pd.date_range("2019-01-01", periods=10, freq="1T"),
            "val1": list(range(10)),
        }
    )
    ddf = dd.from_pandas(df, npartitions=2).set_index("date")

    ddf2 = dd.from_dask_array(x, index=ddf.index, columns="val2")

    assert ddf.index.dtype == ddf2.index.dtype
    assert ddf.index.name == ddf2.index.name

    df = pd.DataFrame({"idx": np.arange(0, 1, 0.1), "val1": list(range(10))})
    ddf = dd.from_pandas(df, npartitions=2).set_index("idx")

    ddf2 = dd.from_dask_array(x, index=ddf.index, columns="val2")

    assert ddf.index.dtype == ddf2.index.dtype
    assert ddf.index.name == ddf2.index.name
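# Illustrative sketch (not part of the upstream test suite): `to_delayed`
# returns one Delayed per partition, so a collection can round-trip through
# `from_delayed`. The helper name below is hypothetical.
def _example_to_delayed_round_trip():
    df = pd.DataFrame({"x": range(6)})
    ddf = dd.from_pandas(df, npartitions=3)
    parts = ddf.to_delayed()  # list of 3 Delayed objects
    ddf2 = dd.from_delayed(parts, meta=ddf._meta)
    assert_eq(ddf, ddf2)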
@pytest.mark.parametrize(
    "vals",
    [
        ("A", "B"),
        (3, 4),
        (datetime(2020, 10, 1), datetime(2022, 12, 31)),
    ],
)
def test_from_map_simple(vals):
    # Simple test to ensure required inputs (func & iterable)
    # and basic kwargs work as expected for `from_map`

    def func(input, size=0):
        # Simple function to create Series with a
        # repeated value and index
        value, index = input
        return pd.Series([value] * size, index=[index] * size)

    iterable = [(vals[0], 1), (vals[1], 2)]
    ser = dd.from_map(func, iterable, size=2)
    expect = pd.Series(
        [vals[0], vals[0], vals[1], vals[1]],
        index=[1, 1, 2, 2],
    )

    # Make sure `from_map` produces single `Blockwise` layer
    layers = ser.dask.layers
    assert len(layers) == 1
    assert isinstance(layers[ser._name], Blockwise)

    # Check that result and partition count make sense
    assert ser.npartitions == len(iterable)
    assert_eq(ser, expect)


def test_from_map_multi():
    # Test that `iterables` can contain multiple Iterables

    func = lambda x, y: pd.DataFrame({"add": x + y})
    iterables = (
        [np.arange(2, dtype="int64"), np.arange(2, dtype="int64")],
        [np.array([2, 2], dtype="int64"), np.array([2, 2], dtype="int64")],
    )
    index = np.array([0, 1, 0, 1], dtype="int64")
    expect = pd.DataFrame({"add": np.array([2, 3, 2, 3], dtype="int64")}, index=index)

    ddf = dd.from_map(func, *iterables)
    assert_eq(ddf, expect)


def test_from_map_args():
    # Test that the optional `args` argument works as expected

    func = lambda x, y, z: pd.DataFrame({"add": x + y + z})
    iterable = [np.arange(2, dtype="int64"), np.arange(2, dtype="int64")]
    index = np.array([0, 1, 0, 1], dtype="int64")
    expect = pd.DataFrame({"add": np.array([5, 6, 5, 6], dtype="int64")}, index=index)

    ddf = dd.from_map(func, iterable, args=[2, 3])
    assert_eq(ddf, expect)


def test_from_map_divisions():
    # Test that `divisions` argument works as expected for `from_map`

    func = lambda x: pd.Series([x[0]] * 2, index=range(x[1], x[1] + 2))
    iterable = [("B", 0), ("C", 2)]
    divisions = (0, 2, 4)
    ser = dd.from_map(func, iterable, divisions=divisions)
    expect = pd.Series(
        ["B", "B", "C", "C"],
        index=[0, 1, 2, 3],
    )

    assert ser.divisions == divisions
    assert_eq(ser, expect)


def test_from_map_meta():
    # Test that `meta` can be specified to `from_map`,
    # and that `enforce_metadata` works as expected

    func = lambda x, s=0: pd.DataFrame({"x": [x] * s})
    iterable = ["A", "B"]
    expect = pd.DataFrame({"x": ["A", "A", "B", "B"]}, index=[0, 1, 0, 1])

    # First Check - Pass in valid metadata
    meta = pd.DataFrame({"x": ["A"]}).iloc[:0]
    ddf = dd.from_map(func, iterable, meta=meta, s=2)
    assert_eq(ddf._meta, meta)
    assert_eq(ddf, expect)

    # Second Check - Pass in invalid metadata
    meta = pd.DataFrame({"a": ["A"]}).iloc[:0]
    ddf = dd.from_map(func, iterable, meta=meta, s=2)
    assert_eq(ddf._meta, meta)
    with pytest.raises(ValueError, match="The columns in the computed data"):
        assert_eq(ddf.compute(), expect)

    # Third Check - Pass in invalid metadata,
    # but use `enforce_metadata=False`
    ddf = dd.from_map(func, iterable, meta=meta, enforce_metadata=False, s=2)
    assert_eq(ddf._meta, meta)
    assert_eq(ddf.compute(), expect)


def test_from_map_custom_name():
    # Test that the `label` and `token` arguments to
    # `from_map` work as expected

    func = lambda x: pd.DataFrame({"x": [x] * 2})
    iterable = ["A", "B"]
    label = "my-label"
    token = "8675309"
    expect = pd.DataFrame({"x": ["A", "A", "B", "B"]}, index=[0, 1, 0, 1])

    ddf = dd.from_map(func, iterable, label=label, token=token)
    assert ddf._name == label + "-" + token
    assert_eq(ddf, expect)
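# Illustrative sketch (not part of the upstream test suite): `dd.from_map`
# maps `func` over an iterable, producing one partition per element, and
# forwards extra keyword arguments to every call. The helper name below is
# hypothetical.
def _example_from_map_basic():
    func = lambda part, scale=1: pd.DataFrame({"x": [scale] * 2})
    ddf = dd.from_map(func, ["part-0", "part-1"], scale=10)
    assert ddf.npartitions == 2
    assert ddf["x"].compute().tolist() == [10, 10, 10, 10]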
def _generator():
    # Simple generator for test_from_map_other_iterables
    yield from enumerate(["A", "B", "C"])


@pytest.mark.parametrize(
    "iterable",
    [
        enumerate(["A", "B", "C"]),
        ((0, "A"), (1, "B"), (2, "C")),
        _generator(),
    ],
)
def test_from_map_other_iterables(iterable):
    # Test that iterable arguments to `from_map`
    # can be enumerate and generator
    # See: https://github.com/dask/dask/issues/9064

    def func(t):
        size = t[0] + 1
        x = t[1]
        return pd.Series([x] * size)

    ddf = dd.from_map(func, iterable)
    expect = pd.Series(
        ["A", "B", "B", "C", "C", "C"],
        index=[0, 0, 1, 0, 1, 2],
    )
    assert_eq(ddf.compute(), expect)


def test_from_map_column_projection():
    # Test that column projection works
    # as expected with from_map when
    # enforce_metadata=True

    projected = []

    class MyFunc:
        def __init__(self, columns=None):
            self.columns = columns

        def project_columns(self, columns):
            return MyFunc(columns)

        def __call__(self, t):
            size = t[0] + 1
            x = t[1]
            df = pd.DataFrame({"A": [x] * size, "B": [10] * size})
            if self.columns is None:
                return df
            projected.extend(self.columns)
            return df[self.columns]

    ddf = dd.from_map(
        MyFunc(),
        enumerate([0, 1, 2]),
        label="myfunc",
        enforce_metadata=True,
    )
    expect = pd.DataFrame(
        {
            "A": [0, 1, 1, 2, 2, 2],
            "B": [10] * 6,
        },
        index=[0, 0, 1, 0, 1, 2],
    )

    assert_eq(ddf["A"], expect["A"])
    assert set(projected) == {"A"}
    assert_eq(ddf, expect)


@pytest.mark.gpu
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
def test_from_dict_backends(backend):
    _lib = pytest.importorskip(backend)
    with config.set({"dataframe.backend": backend}):
        data = {"a": [1, 2, 3, 4], "B": [10, 11, 12, 13]}
        expected = _lib.DataFrame(data)

        # Check dd.from_dict API
        got = dd.from_dict(data, npartitions=2)
        assert_eq(expected, got)

        # Check from_dict classmethod
        got_classmethod = got.from_dict(data, npartitions=2)
        assert_eq(expected, got_classmethod)
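# Illustrative sketch (not part of the upstream test suite): `dd.from_dict`
# is shorthand for constructing a backend DataFrame from the mapping and then
# partitioning it. The helper name below is hypothetical.
def _example_from_dict():
    ddf = dd.from_dict({"a": range(4), "b": range(4)}, npartitions=2)
    assert ddf.npartitions == 2
    assert_eq(ddf, pd.DataFrame({"a": range(4), "b": range(4)}))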