Use zarr-fixture to prevent thread leakage errors (#9967)
* Use zarr-fixture to prevent thread leakage errors

* Apply suggestions from code review

Co-authored-by: Joe Hamman <[email protected]>

* Add whats-new.rst entry

* Explicitly add pyarrow to windows builds, as importing dask.dataframe (dask>=2025.1.0) raises ImportError when missing (see the sketch after the commit message).

---------

Co-authored-by: Joe Hamman <[email protected]>
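
For the pyarrow bullet above, a minimal sketch (not part of the commit) of the failure mode and a module-level pytest guard; the skip message and guard placement are illustrative assumptions:

# Sketch: with dask>=2025.1.0, dask.dataframe imports pyarrow eagerly, so
# the import itself fails when pyarrow is missing (as it did on the Windows
# CI builds before this change).
import pytest

try:
    import dask.dataframe  # noqa: F401
except ImportError:
    # Illustrative guard only; the commit instead installs pyarrow on Windows CI.
    pytest.skip("dask.dataframe requires pyarrow", allow_module_level=True)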
kmuehlbauer and jhamman authored Jan 22, 2025
1 parent f2e9f86 commit 609412d
Showing 2 changed files with 49 additions and 18 deletions.
2 changes: 2 additions & 0 deletions doc/whats-new.rst
@@ -69,6 +69,8 @@ Bug fixes
   By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
 - Remove dask-expr from CI runs, add "pyarrow" dask dependency to windows CI runs, fix related tests (:issue:`9962`, :pull:`9971`).
   By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
+- Use zarr-fixture to prevent thread leakage errors (:pull:`9967`).
+  By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
 
 Documentation
 ~~~~~~~~~~~~~
65 changes: 47 additions & 18 deletions xarray/tests/test_distributed.py
@@ -21,7 +21,9 @@
 from distributed.client import futures_of
 from distributed.utils_test import (  # noqa: F401
     cleanup,
+    client,
     cluster,
+    cluster_fixture,
     gen_cluster,
     loop,
     loop_in_thread,
@@ -46,6 +48,7 @@
 from xarray.tests.test_dataset import create_test_data
 
 loop = loop  # loop is an imported fixture, which flake8 has issues ack-ing
+client = client  # client is an imported fixture, which flake8 has issues ack-ing
 
 
 @pytest.fixture
@@ -214,35 +217,61 @@ def test_dask_distributed_read_netcdf_integration_test(
     assert_allclose(original, computed)
 
 
+# fixture vendored from dask
+# heads-up, this is using quite private zarr API
+# https://github.com/dask/dask/blob/e04734b4d8959ba259801f2e2a490cb4ee8d891f/dask/tests/test_distributed.py#L338-L358
+@pytest.fixture(scope="function")
+def zarr(client):
+    zarr_lib = pytest.importorskip("zarr")
+    # Zarr-Python 3 lazily allocates a dedicated thread/IO loop
+    # to execute async tasks. To avoid having this thread be
+    # picked up as a "leaked thread", we manually trigger its
+    # creation before using zarr.
+    try:
+        _ = zarr_lib.core.sync._get_loop()
+        _ = zarr_lib.core.sync._get_executor()
+        yield zarr_lib
+    except AttributeError:
+        yield zarr_lib
+    finally:
+        # Zarr-Python 3 lazily allocates an IO thread, a thread pool executor,
+        # and an IO loop. Here we clean up these resources to avoid leaking
+        # threads. In normal operation, this is done by an atexit handler when
+        # Zarr shuts down.
+        try:
+            zarr_lib.core.sync.cleanup_resources()
+        except AttributeError:
+            pass
+
+
 @requires_zarr
 @pytest.mark.parametrize("consolidated", [True, False])
 @pytest.mark.parametrize("compute", [True, False])
 def test_dask_distributed_zarr_integration_test(
-    loop, consolidated: bool, compute: bool
+    client,
+    zarr,
+    consolidated: bool,
+    compute: bool,
 ) -> None:
     if consolidated:
         write_kwargs: dict[str, Any] = {"consolidated": True}
         read_kwargs: dict[str, Any] = {"backend_kwargs": {"consolidated": True}}
     else:
         write_kwargs = read_kwargs = {}
     chunks = {"dim1": 4, "dim2": 3, "dim3": 5}
-    with cluster() as (s, [a, b]):
-        with Client(s["address"], loop=loop):
-            original = create_test_data().chunk(chunks)
-            with create_tmp_file(
-                allow_cleanup_failure=ON_WINDOWS, suffix=".zarrc"
-            ) as filename:
-                maybe_futures = original.to_zarr(  # type: ignore[call-overload] #mypy bug?
-                    filename, compute=compute, **write_kwargs
-                )
-                if not compute:
-                    maybe_futures.compute()
-                with xr.open_dataset(
-                    filename, chunks="auto", engine="zarr", **read_kwargs
-                ) as restored:
-                    assert isinstance(restored.var1.data, da.Array)
-                    computed = restored.compute()
-                    assert_allclose(original, computed)
+    original = create_test_data().chunk(chunks)
+    with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, suffix=".zarrc") as filename:
+        maybe_futures = original.to_zarr(  # type: ignore[call-overload] #mypy bug?
+            filename, compute=compute, **write_kwargs
+        )
+        if not compute:
+            maybe_futures.compute()
+        with xr.open_dataset(
+            filename, chunks="auto", engine="zarr", **read_kwargs
+        ) as restored:
+            assert isinstance(restored.var1.data, da.Array)
+            computed = restored.compute()
+            assert_allclose(original, computed)
 
 
 @gen_cluster(client=True)
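
To see the vendored fixture's pre-warm/clean-up trick in isolation, here is a minimal standalone sketch (not from the commit). It assumes Zarr-Python 3 and pokes the same private zarr.core.sync API as the fixture, so it may break on other zarr versions:

import threading

import zarr  # assumes zarr-python >= 3


def demo_prewarm_and_cleanup() -> None:
    before = threading.active_count()
    try:
        # Force creation of zarr's lazily started IO loop thread (and, if
        # configured, its thread pool executor) before any leak-checker
        # snapshots the set of running threads.
        zarr.core.sync._get_loop()
        zarr.core.sync._get_executor()
    except AttributeError:
        print("no lazy IO loop on this zarr version; nothing to pre-warm")
        return
    during = threading.active_count()
    # Release the loop/executor explicitly; zarr normally does this in an
    # atexit handler, which runs too late for per-test leak checks.
    zarr.core.sync.cleanup_resources()
    after = threading.active_count()
    print(f"threads: before={before} during={during} after={after}")


if __name__ == "__main__":
    demo_prewarm_and_cleanup()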