
High unmanaged memory using kwargs in apply_ufunc with Dask #9981

Open
abiasiol opened this issue Jan 23, 2025 · 5 comments

Comments

@abiasiol

abiasiol commented Jan 23, 2025

What is your issue?

Hi,
I have an embarrassingly parallel function that I am applying along the time dimension. It needs some extra constant data (in this case some_big_constant) that I pass through kwargs. I find that unmanaged memory keeps increasing due to the kwargs being associated with each task. The problem gets worse when I have more chunks along the time dimension.

My doubts are:

  • I would expect this computation to proceed in a streaming fashion: process one chunk, write the result to the corresponding zarr region, move on to the next chunk, and release the memory for the finished task.
  • I am okay with the extra memory for some_big_constant being allocated for each task, but I am surprised that the memory is not released.
  • Does Dask keep all the task information in memory until the end of the computation? I guess that some_big_constant gets baked into the partial for add_one (see the sketch below).
  • Any suggestion on how to remedy the issue?
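
(Rough illustration of the guess in the third bullet; an editorial sketch, not xarray's exact internals. Binding the kwarg into the chunk function means the array travels with the callable of every task.)

import functools
import pickle

import numpy as np


def add_one(x, b):
    return x + 1


some_big_constant = np.ones((10000, 5000))

# kwargs get bound into the chunk function before the graph is built
f = functools.partial(add_one, b=some_big_constant)

# any task that carries this callable also carries a serialized copy of
# the ~400 MB array
print(f"{len(pickle.dumps(f)) / 1e6:.0f} MB")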

EDIT: dask-2025.1.0 distributed-2025.1.0 xarray-2025.1.1 zarr-2.18.3

Full example:

import time
import xarray as xr
import numpy as np
import dask.array as da
from dask.distributed import Client
from dask.diagnostics import ResourceProfiler

client = Client(n_workers=1, threads_per_worker=1, memory_limit="20GB")
print(client.dashboard_link)

times = np.arange(250000)
other = np.arange(300)
some_big_constant = np.ones((10000, 5000))

data = da.random.random((len(times), len(other)), chunks=(25000, 300))
da = xr.DataArray(data, coords={"time": times, "y": other}, dims=["time", "y"])


def add_one(x, b):
    time.sleep(1)  # some op using b
    return x + 1


with ResourceProfiler() as rprof:
    result = xr.apply_ufunc(
        add_one,
        da,
        dask="parallelized",
        kwargs={"b": some_big_constant},
    )
    result.to_zarr("test_zarr.zarr", mode="w")

The dask graph looks good and parallel:
[screenshot: Dask task graph]

On the Dask dashboard, I see the unmanaged memory increasing as the computation proceeds. I see that store_map proceeds well, which is comforting.

[screenshot: Dask dashboard, unmanaged memory increasing]

With the profiler, I see the memory increasing too. It roughly looks like there is one step up for every chunk (some chunks are probably loaded in memory at the same time).

[screenshot: ResourceProfiler memory trace]

In the Dask dashboard profile, I see the zarr calls only at the very end of the computation (the tall column). I would have expected to see some calls during the computation too (like how store_map proceeds), but I am not overly concerned about this.

[screenshot: Dask dashboard profile]

@abiasiol added the needs triage label on Jan 23, 2025

welcome bot commented Jan 23, 2025

Thanks for opening your first issue here at xarray! Be sure to follow the issue template!
If you have an idea for a solution, we would really welcome a Pull Request with proposed changes.
See the Contributing Guide for more.
It may take us a while to respond here, but we really value your contribution. Contributors like you help make xarray better.
Thank you!

@dcherian
Contributor

cc @phofl this looks like a regression

@phofl
Contributor

phofl commented Jan 23, 2025

I am not quite sure what I am seeing here yet. I increased the underlying array and the memory fell back to the expected size. It's weird.

@dcherian generally we would like to wrap the large array into delayed to get this down, but the partial function application blocks that. Is there a way around this?
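
For reference, the kind of restructuring described above, as an editorial sketch with plain dask.delayed outside of apply_ufunc (names and shapes are illustrative): wrapping the constant in delayed makes it a single node in the graph that every block task references by key, instead of a copy embedded in each task's callable.

import dask
import dask.array as da
import numpy as np

some_big_constant = np.ones((10000, 5000))

# a single graph node holding the constant; block tasks refer to it by key
big = dask.delayed(some_big_constant, pure=True)


@dask.delayed
def add_one(x, b):
    return x + 1  # some op using b


data = da.random.random((250000, 300), chunks=(25000, 300))
blocks = [add_one(blk, big) for blk in data.to_delayed().ravel()]
results = dask.compute(*blocks)  # illustration only; this pulls all blocks into memory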

@dcherian
Contributor

oh I missed that some_big_constant is an array! Not sure there's an easy way around that; there are layers and layers of functions underneath the top-level apply_ufunc.

@abiasiol this is a bit of an anti-pattern. Can you just pass it as *args instead? I bet that triggers nicer behaviour.

@abiasiol
Author

@dcherian Apologies, I used an array for the MRE only. In my use case, b is a Python object (containing a bunch of splines to be evaluated).

Does that change the conclusions or suggested approaches? I am open to other approaches if anything comes to mind.

My use case is something along the lines of:

from typing import Callable

import numpy as np
import xarray as xr
import dask.array as da
from dask.diagnostics import ResourceProfiler

# times and other are the same as in the first example
times = np.arange(250000)
other = np.arange(300)


class MySplines:
    def __init__(self, splines: dict[int, Callable]):
        self.splines = splines


# bs is a large Python object (real splines instead of these lambdas)
bs = MySplines(splines={1: lambda x: x + 1, 2: lambda x: x + 2})

data = da.random.random((len(times), len(other)), chunks=(100, 300))
da = xr.DataArray(data, coords={"time": times, "y": other}, dims=["time", "y"])


def add_one(x, b):
    xm = x.mean(axis=0)  # some processing on the chunk data
    temp = b.splines[1](xm) + b.splines[2](xm)  # evaluate the splines
    return x + temp


with ResourceProfiler() as rprof:
    result = xr.apply_ufunc(
        add_one,
        da,
        dask="parallelized",
        kwargs={"b": bs},
    )
    result.to_zarr("test_zarr.zarr", mode="w")

The memory growth has the same pattern as the MRE, the graph has the same pure parallel structure.

[screenshot: memory profile]

[screenshot: task graph]
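
One pattern that can sidestep the problem when the heavy object can be rebuilt from a small descriptor (an editorial sketch, not something proposed in the thread; the pickle file and its path are hypothetical): keep the kwargs tiny and load the object once per worker process behind a module-level cache.

import functools
import pickle

import xarray as xr


@functools.lru_cache(maxsize=1)
def get_splines(spline_path):
    # hypothetical setup: the MySplines object has been pickled to a file the
    # workers can reach; it is loaded once per worker process and reused by
    # every later task on that worker
    with open(spline_path, "rb") as f:
        return pickle.load(f)


def add_one(x, spline_path):
    b = get_splines(spline_path)
    xm = x.mean(axis=0)
    return x + b.splines[1](xm) + b.splines[2](xm)


result = xr.apply_ufunc(
    add_one,
    da,  # the DataArray from the example above
    dask="parallelized",
    kwargs={"spline_path": "splines.pkl"},  # a short string instead of the big object
)
result.to_zarr("test_zarr.zarr", mode="w")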

@TomNicholas added the bug, topic-dask, and topic-performance labels and removed the needs triage label on Jan 25, 2025