import os import pathlib from time import sleep import numpy as np import pandas as pd import pytest from packaging.version import Version import dask import dask.dataframe as dd from dask.compatibility import _PY_VERSION from dask.dataframe._compat import tm from dask.dataframe.optimize import optimize_dataframe_getitem from dask.dataframe.utils import assert_eq from dask.layers import DataFrameIOLayer from dask.utils import dependency_depth, tmpdir, tmpfile def test_to_hdf(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) a = dd.from_pandas(df, 2) with tmpfile("h5") as fn: a.to_hdf(fn, "/data") out = pd.read_hdf(fn, "/data") tm.assert_frame_equal(df, out[:]) with tmpfile("h5") as fn: a.x.to_hdf(fn, "/data") out = pd.read_hdf(fn, "/data") tm.assert_series_equal(df.x, out[:]) a = dd.from_pandas(df, 1) with tmpfile("h5") as fn: a.to_hdf(fn, "/data") out = pd.read_hdf(fn, "/data") tm.assert_frame_equal(df, out[:]) # test compute = False with tmpfile("h5") as fn: r = a.to_hdf(fn, "/data", compute=False) r.compute() out = pd.read_hdf(fn, "/data") tm.assert_frame_equal(df, out[:]) @pytest.mark.skipif( _PY_VERSION >= Version("3.11"), reason="segfaults due to https://github.com/PyTables/PyTables/issues/977", ) def test_to_hdf_multiple_nodes(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) a = dd.from_pandas(df, 2) df16 = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) b = dd.from_pandas(df16, 16) # saving to multiple nodes with tmpfile("h5") as fn: a.to_hdf(fn, "/data*") out = dd.read_hdf(fn, "/data*") assert_eq(df, out) # saving to multiple nodes making sure order is kept with tmpfile("h5") as fn: b.to_hdf(fn, "/data*") out = dd.read_hdf(fn, "/data*") assert_eq(df16, out) # saving to multiple datasets with custom name_function with tmpfile("h5") as fn: a.to_hdf(fn, "/data_*", name_function=lambda i: "a" * (i + 1)) out = dd.read_hdf(fn, "/data_*") assert_eq(df, out) out = pd.read_hdf(fn, "/data_a") tm.assert_frame_equal(out, df.iloc[:2]) out = pd.read_hdf(fn, "/data_aa") tm.assert_frame_equal(out, df.iloc[2:]) # test multiple nodes with hdf object with tmpfile("h5") as fn: with pd.HDFStore(fn) as hdf: b.to_hdf(hdf, "/data*") out = dd.read_hdf(fn, "/data*") assert_eq(df16, out) # Test getitem optimization with tmpfile("h5") as fn: a.to_hdf(fn, "/data*") out = dd.read_hdf(fn, "/data*")[["x"]] dsk = optimize_dataframe_getitem(out.dask, keys=out.__dask_keys__()) read = [key for key in dsk.layers if key.startswith("read-hdf")][0] subgraph = dsk.layers[read] assert isinstance(subgraph, DataFrameIOLayer) assert subgraph.columns == ["x"] def test_to_hdf_multiple_files(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) a = dd.from_pandas(df, 2) df16 = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) b = dd.from_pandas(df16, 16) # saving to multiple files with tmpdir() as dn: fn = os.path.join(dn, "data_*.h5") a.to_hdf(fn, "/data") out = dd.read_hdf(fn, "/data") assert_eq(df, out) # saving to multiple files making sure order is kept with tmpdir() as dn: fn = os.path.join(dn, "data_*.h5") b.to_hdf(fn, "/data") out = dd.read_hdf(fn, "/data") assert_eq(df16, out) # saving to multiple files where first file is longer # https://github.com/dask/dask/issues/8023 with tmpdir() as dn: fn1 = os.path.join(dn, "data_1.h5") fn2 = os.path.join(dn, "data_2.h5") b.to_hdf(fn1, "/data") a.to_hdf(fn2, "/data") out = dd.read_hdf([fn1, fn2], "/data") assert_eq(pd.concat([df16, df]), out) # saving to multiple files with custom name_function with tmpdir() as dn: fn = os.path.join(dn, "data_*.h5") a.to_hdf(fn, "/data", name_function=lambda i: "a" * (i + 1)) out = dd.read_hdf(fn, "/data") assert_eq(df, out) out = pd.read_hdf(os.path.join(dn, "data_a.h5"), "/data") tm.assert_frame_equal(out, df.iloc[:2]) out = pd.read_hdf(os.path.join(dn, "data_aa.h5"), "/data") tm.assert_frame_equal(out, df.iloc[2:]) # test hdf object with tmpfile("h5") as fn: with pd.HDFStore(fn) as hdf: a.to_hdf(hdf, "/data*") out = dd.read_hdf(fn, "/data*") assert_eq(df, out) def test_to_hdf_modes_multiple_nodes(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) # appending a single partition to existing data a = dd.from_pandas(df, 1) with tmpfile("h5") as fn: a.to_hdf(fn, "/data2") a.to_hdf(fn, "/data*", mode="a") out = dd.read_hdf(fn, "/data*") assert_eq(dd.concat([df, df]), out) # overwriting a file with a single partition a = dd.from_pandas(df, 1) with tmpfile("h5") as fn: a.to_hdf(fn, "/data2") a.to_hdf(fn, "/data*", mode="w") out = dd.read_hdf(fn, "/data*") assert_eq(df, out) # appending two partitions to existing data a = dd.from_pandas(df, 2) with tmpfile("h5") as fn: a.to_hdf(fn, "/data2") a.to_hdf(fn, "/data*", mode="a") out = dd.read_hdf(fn, "/data*") assert_eq(dd.concat([df, df]), out) # overwriting a file with two partitions a = dd.from_pandas(df, 2) with tmpfile("h5") as fn: a.to_hdf(fn, "/data2") a.to_hdf(fn, "/data*", mode="w") out = dd.read_hdf(fn, "/data*") assert_eq(df, out) # overwriting a single partition, keeping other partitions a = dd.from_pandas(df, 2) with tmpfile("h5") as fn: a.to_hdf(fn, "/data1") a.to_hdf(fn, "/data2") a.to_hdf(fn, "/data*", mode="a", append=False) out = dd.read_hdf(fn, "/data*") assert_eq(dd.concat([df, df]), out) def test_to_hdf_modes_multiple_files(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) # appending a single partition to existing data a = dd.from_pandas(df, 1) with tmpdir() as dn: fn = os.path.join(dn, "data*") a.to_hdf(os.path.join(dn, "data2"), "/data") a.to_hdf(fn, "/data", mode="a") out = dd.read_hdf(fn, "/data*") assert_eq(dd.concat([df, df]), out) # appending two partitions to existing data a = dd.from_pandas(df, 2) with tmpdir() as dn: fn = os.path.join(dn, "data*") a.to_hdf(os.path.join(dn, "data2"), "/data") a.to_hdf(fn, "/data", mode="a") out = dd.read_hdf(fn, "/data") assert_eq(dd.concat([df, df]), out) # overwriting a file with two partitions a = dd.from_pandas(df, 2) with tmpdir() as dn: fn = os.path.join(dn, "data*") a.to_hdf(os.path.join(dn, "data1"), "/data") a.to_hdf(fn, "/data", mode="w") out = dd.read_hdf(fn, "/data") assert_eq(df, out) # overwriting a single partition, keeping other partitions a = dd.from_pandas(df, 2) with tmpdir() as dn: fn = os.path.join(dn, "data*") a.to_hdf(os.path.join(dn, "data1"), "/data") a.to_hdf(fn, "/data", mode="a", append=False) out = dd.read_hdf(fn, "/data") assert_eq(dd.concat([df, df]), out) def test_to_hdf_link_optimizations(): """testing dask link levels is correct by calculating the depth of the dask graph""" pytest.importorskip("tables") df16 = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) a = dd.from_pandas(df16, 16) # saving to multiple hdf files, no links are needed # expected layers: from_pandas, to_hdf, list = depth of 3 with tmpdir() as dn: fn = os.path.join(dn, "data*") d = a.to_hdf(fn, "/data", compute=False) assert dependency_depth(d.dask) == 3 # saving to a single hdf file with multiple nodes # all subsequent nodes depend on the first # expected layers: from_pandas, first to_hdf(creates file+node), subsequent to_hdfs, list = 4 with tmpfile() as fn: d = a.to_hdf(fn, "/data*", compute=False) assert dependency_depth(d.dask) == 4 # saving to a single hdf file with a single node # every node depends on the previous node # expected layers: from_pandas, to_hdf times npartitions(15), list = 2 + npartitions = 17 with tmpfile() as fn: d = a.to_hdf(fn, "/data", compute=False) assert dependency_depth(d.dask) == 2 + a.npartitions @pytest.mark.skipif( _PY_VERSION >= Version("3.11"), reason="segfaults due to https://github.com/PyTables/PyTables/issues/977", ) @pytest.mark.slow def test_to_hdf_lock_delays(): pytest.importorskip("tables") df16 = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) a = dd.from_pandas(df16, 16) # adding artificial delays to make sure last tasks finish first # that's a way to simulate last tasks finishing last def delayed_nop(i): if i[1] < 10: sleep(0.1 * (10 - i[1])) return i # saving to multiple hdf nodes with tmpfile() as fn: a = a.apply(delayed_nop, axis=1, meta=a) a.to_hdf(fn, "/data*") out = dd.read_hdf(fn, "/data*") assert_eq(df16, out) # saving to multiple hdf files # adding artificial delays to make sure last tasks finish first with tmpdir() as dn: fn = os.path.join(dn, "data*") a = a.apply(delayed_nop, axis=1, meta=a) a.to_hdf(fn, "/data") out = dd.read_hdf(fn, "/data") assert_eq(df16, out) def test_to_hdf_exceptions(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) a = dd.from_pandas(df, 1) # triggering too many asterisks error with tmpdir() as dn: with pytest.raises(ValueError): fn = os.path.join(dn, "data_*.h5") a.to_hdf(fn, "/data_*") # triggering too many asterisks error with tmpfile() as fn: with pd.HDFStore(fn) as hdf: with pytest.raises(ValueError): a.to_hdf(hdf, "/data_*_*") @pytest.mark.skipif( _PY_VERSION >= Version("3.11"), reason="segfaults due to https://github.com/PyTables/PyTables/issues/977", ) @pytest.mark.parametrize("scheduler", ["sync", "threads", "processes"]) @pytest.mark.parametrize("npartitions", [1, 4, 10]) def test_to_hdf_schedulers(scheduler, npartitions): pytest.importorskip("tables") df = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) a = dd.from_pandas(df, npartitions=npartitions) # test single file single node with tmpfile("h5") as fn: a.to_hdf(fn, "/data", scheduler=scheduler) out = pd.read_hdf(fn, "/data") assert_eq(df, out) # test multiple files single node with tmpdir() as dn: fn = os.path.join(dn, "data_*.h5") a.to_hdf(fn, "/data", scheduler=scheduler) out = dd.read_hdf(fn, "/data") assert_eq(df, out) # test single file multiple nodes with tmpfile("h5") as fn: a.to_hdf(fn, "/data*", scheduler=scheduler) out = dd.read_hdf(fn, "/data*") assert_eq(df, out) def test_to_hdf_kwargs(): pytest.importorskip("tables") df = pd.DataFrame({"A": ["a", "aaaa"]}) ddf = dd.from_pandas(df, npartitions=2) with tmpfile("h5") as fn: ddf.to_hdf(fn, "foo4", format="table", min_itemsize=4) df2 = pd.read_hdf(fn, "foo4") tm.assert_frame_equal(df, df2) # test shorthand 't' for table with tmpfile("h5") as fn: ddf.to_hdf(fn, "foo4", format="t", min_itemsize=4) df2 = pd.read_hdf(fn, "foo4") tm.assert_frame_equal(df, df2) def test_to_fmt_warns(): pytest.importorskip("tables") df16 = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) a = dd.from_pandas(df16, 16) # testing warning when breaking order with tmpfile("h5") as fn: with pytest.warns( UserWarning, match="To preserve order between partitions name_function" ): a.to_hdf(fn, "/data*", name_function=str) with tmpdir() as dn: fn = os.path.join(dn, "data_*.csv") a.to_csv(fn, name_function=str) @pytest.mark.parametrize( "data, compare", [ ( pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0], ), tm.assert_frame_equal, ), (pd.Series([1, 2, 3, 4], name="a"), tm.assert_series_equal), ], ) def test_read_hdf(data, compare): pytest.importorskip("tables") with tmpfile("h5") as fn: data.to_hdf(fn, "/data") try: dd.read_hdf(fn, "data", chunksize=2, mode="r") assert False except TypeError as e: assert "format='table'" in str(e) with tmpfile("h5") as fn: data.to_hdf(fn, "/data", format="table") a = dd.read_hdf(fn, "/data", chunksize=2, mode="r") assert a.npartitions == 2 compare(a.compute(), data) compare( dd.read_hdf(fn, "/data", chunksize=2, start=1, stop=3, mode="r").compute(), pd.read_hdf(fn, "/data", start=1, stop=3), ) assert sorted(dd.read_hdf(fn, "/data", mode="r").dask) == sorted( dd.read_hdf(fn, "/data", mode="r").dask ) with tmpfile("h5") as fn: sorted_data = data.sort_index() sorted_data.to_hdf(fn, "/data", format="table") a = dd.read_hdf(fn, "/data", chunksize=2, sorted_index=True, mode="r") assert a.npartitions == 2 compare(a.compute(), sorted_data) def test_read_hdf_multiply_open(): """Test that we can read from a file that's already opened elsewhere in read-only mode.""" pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) with tmpfile("h5") as fn: df.to_hdf(fn, "/data", format="table") with pd.HDFStore(fn, mode="r"): dd.read_hdf(fn, "/data", chunksize=2, mode="r") @pytest.mark.skipif( _PY_VERSION >= Version("3.11"), reason="segfaults due to https://github.com/PyTables/PyTables/issues/977", ) def test_read_hdf_multiple(): pytest.importorskip("tables") df = pd.DataFrame( { "x": [ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", ], "y": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], }, index=[ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, ], ) a = dd.from_pandas(df, 16) with tmpfile("h5") as fn: a.to_hdf(fn, "/data*") r = dd.read_hdf(fn, "/data*", sorted_index=True) assert a.npartitions == r.npartitions assert a.divisions == r.divisions assert_eq(a, r) def test_read_hdf_start_stop_values(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) with tmpfile("h5") as fn: df.to_hdf(fn, "/data", format="table") with pytest.raises(ValueError, match="number of rows"): dd.read_hdf(fn, "/data", stop=10) with pytest.raises(ValueError, match="is above or equal to"): dd.read_hdf(fn, "/data", start=10) with pytest.raises(ValueError, match="positive integer"): dd.read_hdf(fn, "/data", chunksize=-1) def test_hdf_globbing(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) with tmpdir() as tdir: df.to_hdf(os.path.join(tdir, "one.h5"), "/foo/data", format="table") df.to_hdf(os.path.join(tdir, "two.h5"), "/bar/data", format="table") df.to_hdf(os.path.join(tdir, "two.h5"), "/foo/data", format="table") with dask.config.set(scheduler="sync"): res = dd.read_hdf(os.path.join(tdir, "one.h5"), "/*/data", chunksize=2) assert res.npartitions == 2 tm.assert_frame_equal(res.compute(), df) res = dd.read_hdf( os.path.join(tdir, "one.h5"), "/*/data", chunksize=2, start=1, stop=3 ) expected = pd.read_hdf( os.path.join(tdir, "one.h5"), "/foo/data", start=1, stop=3 ) tm.assert_frame_equal(res.compute(), expected) res = dd.read_hdf(os.path.join(tdir, "two.h5"), "/*/data", chunksize=2) assert res.npartitions == 2 + 2 tm.assert_frame_equal(res.compute(), pd.concat([df] * 2)) res = dd.read_hdf(os.path.join(tdir, "*.h5"), "/foo/data", chunksize=2) assert res.npartitions == 2 + 2 tm.assert_frame_equal(res.compute(), pd.concat([df] * 2)) res = dd.read_hdf(os.path.join(tdir, "*.h5"), "/*/data", chunksize=2) assert res.npartitions == 2 + 2 + 2 tm.assert_frame_equal(res.compute(), pd.concat([df] * 3)) def test_hdf_file_list(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) with tmpdir() as tdir: df.iloc[:2].to_hdf(os.path.join(tdir, "one.h5"), "dataframe", format="table") df.iloc[2:].to_hdf(os.path.join(tdir, "two.h5"), "dataframe", format="table") with dask.config.set(scheduler="sync"): input_files = [os.path.join(tdir, "one.h5"), os.path.join(tdir, "two.h5")] res = dd.read_hdf(input_files, "dataframe") tm.assert_frame_equal(res.compute(), df) def test_read_hdf_pattern_pathlike(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) with tmpfile("h5") as fn: path = pathlib.Path(fn) df.to_hdf(path, "dataframe", format="table") res = dd.read_hdf(path, "dataframe") assert_eq(res, df) def test_to_hdf_path_pathlike(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) ddf = dd.from_pandas(df, npartitions=3) with tmpfile("h5") as fn: path = pathlib.Path(fn) ddf.to_hdf(path, "/data") res = pd.read_hdf(path, "/data") assert_eq(res, ddf) def test_read_hdf_doesnt_segfault(): pytest.importorskip("tables") with tmpfile("h5") as fn: N = 40 df = pd.DataFrame(np.random.randn(N, 3)) with pd.HDFStore(fn, mode="w") as store: store.append("/x", df) ddf = dd.read_hdf(fn, "/x", chunksize=2) assert len(ddf) == N def test_hdf_filenames(): pytest.importorskip("tables") df = pd.DataFrame( {"x": ["a", "b", "c", "d"], "y": [1, 2, 3, 4]}, index=[1.0, 2.0, 3.0, 4.0] ) ddf = dd.from_pandas(df, npartitions=2) assert ddf.to_hdf("foo*.hdf5", "key") == ["foo0.hdf5", "foo1.hdf5"] os.remove("foo0.hdf5") os.remove("foo1.hdf5") def test_hdf_path_exceptions(): # single file doesn't exist with pytest.raises(IOError): dd.read_hdf("nonexistant_store_X34HJK", "/tmp") # a file from a list of files doesn't exist with pytest.raises(IOError): dd.read_hdf(["nonexistant_store_X34HJK", "nonexistant_store_UY56YH"], "/tmp") # list of files is empty with pytest.raises(ValueError): dd.read_hdf([], "/tmp") def test_hdf_nonpandas_keys(): # https://github.com/dask/dask/issues/5934 # TODO: maybe remove this if/when pandas copes with all keys tables = pytest.importorskip("tables") import tables class Table1(tables.IsDescription): value1 = tables.Float32Col() class Table2(tables.IsDescription): value2 = tables.Float32Col() class Table3(tables.IsDescription): value3 = tables.Float32Col() with tmpfile("h5") as path: with tables.open_file(path, mode="a") as h5file: group = h5file.create_group("/", "group") t = h5file.create_table(group, "table1", Table1, "Table 1") row = t.row row["value1"] = 1 row.append() t = h5file.create_table(group, "table2", Table2, "Table 2") row = t.row row["value2"] = 1 row.append() t = h5file.create_table(group, "table3", Table3, "Table 3") row = t.row row["value3"] = 1 row.append() # pandas keys should still work bar = pd.DataFrame(np.random.randn(10, 4)) bar.to_hdf(path, "/bar", format="table", mode="a") dd.read_hdf(path, "/group/table1") dd.read_hdf(path, "/group/table2") dd.read_hdf(path, "/group/table3") dd.read_hdf(path, "/bar") def test_hdf_empty_dataframe(tmp_path): pytest.importorskip("tables") # https://github.com/dask/dask/issues/8707 from dask.dataframe.io.hdf import dont_use_fixed_error_message df = pd.DataFrame({"A": [], "B": []}, index=[]) df.to_hdf(tmp_path / "data.h5", format="fixed", key="df", mode="w") with pytest.raises(TypeError, match=dont_use_fixed_error_message): dd.read_hdf(tmp_path / "data.h5", "df")