Rechunker Tutorial

This tutorial notebook explains how to use rechunker with real datasets. We will also use xarray to make some things easier and prettier, but note that xarray is not a dependency of rechunker.

Toy Example

Create Example Data

Here we load one of xarray’s tutorial datasets and write it to Zarr. This is not actually a big dataset, so rechunker is not really needed here, but it makes a convenient example.

[ ]:
import xarray as xr

xr.set_options(display_style="text")
import zarr
import dask.array as dsa

ds = xr.tutorial.open_dataset("air_temperature")
# create initial chunk structure
ds = ds.chunk({"time": 100})
ds.air.encoding = {}  # drop the encoding inherited from the source file; avoids conflicts when writing to Zarr
ds

We can examine the chunk structure of the data variable using Dask’s pretty Array repr.

[2]:
ds.air.data
[2]:
              Array                         Chunk
Bytes         14.76 MiB                     517.58 kiB
Shape         (2920, 25, 53)                (100, 25, 53)
Dask graph    30 chunks in 2 graph layers
Data type     float32 numpy.ndarray
[3]:
!rm -rf *.zarr  # clean up any existing stores
ds.to_zarr("air_temperature.zarr")
[3]:
<xarray.backends.zarr.ZarrStore at 0x7f98615f7740>

Now we open up a Zarr Group and Array that we will use as inputs to rechunker.

[4]:
source_group = zarr.open("air_temperature.zarr")
print(source_group.tree())
/
 ├── air (2920, 25, 53) float32
 ├── lat (25,) float32
 ├── lon (53,) float32
 └── time (2920,) float32
[5]:
source_array = source_group["air"]
source_array.info
[5]:
Name               : /air
Type               : zarr.core.Array
Data type          : float32
Shape              : (2920, 25, 53)
Chunk shape        : (100, 25, 53)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.DirectoryStore
No. bytes          : 15476000 (14.8M)
No. bytes stored   : 9005544 (8.6M)
Storage ratio      : 1.7
Chunks initialized : 30/30

Rechunk a single Array

The original array has chunks of (100, 25, 53). Let’s rechunk it to be contiguous in time, but chunked in space. We specify a small value of max_mem in order to force rechunker to create an intermediate dataset. We also have to specify a place to store the final and intermediate data.

We use the rechunk function, which returns a Rechunked object.

[6]:
from rechunker import rechunk

target_chunks = (2920, 25, 1)
max_mem = "1MB"

target_store = "air_rechunked.zarr"
temp_store = "air_rechunked-tmp.zarr"

array_plan = rechunk(
    source_array, target_chunks, max_mem, target_store, temp_store=temp_store
)
array_plan
[6]:

Rechunked

Source:       <zarr.core.Array (2920, 25, 53) float32>
Intermediate: <zarr.core.Array (2920, 25, 53) float32>
Target:       <zarr.core.Array (2920, 25, 53) float32>
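Notice that the plan includes an intermediate array. Here is a rough sketch of why a 1 MB max_mem forces one; the elementwise-max rule below is a simplification for illustration, not the exact check rechunker performs:

[ ]:
import numpy as np

itemsize = 4  # float32
source_chunks = (100, 25, 53)  # chunks of the source array
target_chunks = (2920, 25, 1)  # chunks we asked for

# a single copy task that aligns with both chunkings must hold roughly the
# elementwise maximum of the two chunk shapes in memory at once
combined = tuple(max(s, t) for s, t in zip(source_chunks, target_chunks))
print(combined, np.prod(combined) * itemsize / 2**20, "MiB")
# (2920, 25, 53) is ~14.8 MiB, far above max_mem, so rechunker stages the
# copy through an intermediate store instead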

Since this array has named dimensions (Xarray stores them in the _ARRAY_DIMENSIONS attribute), we can also specify the target chunks using dictionary syntax.

[7]:
target_chunks_dict = {"time": 2920, "lat": 25, "lon": 1}

# rechunker will not overwrite existing stores, so remove them first
!rm -rf air_rechunked.zarr air_rechunked-tmp.zarr
array_plan = rechunk(
    source_array, target_chunks_dict, max_mem, target_store, temp_store=temp_store
)
array_plan
[7]:

Rechunked

Source:       <zarr.core.Array (2920, 25, 53) float32>
Intermediate: <zarr.core.Array (2920, 25, 53) float32>
Target:       <zarr.core.Array (2920, 25, 53) float32>

The array_plan is a Rechunked object. It has not actually performed the rechunking yet. To do this, we need to call the execute method. This will use Dask to perform the rechunking.

[8]:
result = array_plan.execute()
result.chunks
[8]:
(2920, 25, 1)

By default, Dask will use the multi-threaded scheduler. Since rechunking can take a long time, we might want to use a progress bar.

[9]:
from dask.diagnostics import ProgressBar

with ProgressBar():
    array_plan.execute()
[########################################] | 100% Completed | 710.85 ms

If we create a distributed cluster, then rechunker will use that when it executes.

[10]:
from dask.distributed import Client, LocalCluster, progress

cluster = LocalCluster()
client = Client(cluster)
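
With the client active, the distributed scheduler becomes Dask’s default, so executing a plan now runs the copy tasks on the cluster workers. A minimal sketch (re-running the plan simply overwrites the chunks we already wrote):

[ ]:
# re-execute the same plan, this time on the LocalCluster
result = array_plan.execute()

# the scheduler dashboard is handy for watching the workers
print(client.dashboard_link)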

Now that it is written to disk, we can open the rechunked array however we please. Using Zarr…

[11]:
target_array = zarr.open("air_rechunked.zarr")
target_array
[11]:
<zarr.core.Array (2920, 25, 53) float32>

…or Dask

[12]:
target_array_dask = dsa.from_zarr("air_rechunked.zarr")
target_array_dask
[12]:
              Array                         Chunk
Bytes         14.76 MiB                     285.16 kiB
Shape         (2920, 25, 53)                (2920, 25, 1)
Dask graph    53 chunks in 2 graph layers
Data type     float32 numpy.ndarray

Rechunk a Group

In the example above, we only rechunked a single array. We can open it with Dask, but not with Xarray, because it doesn’t contain any coordinates or dimension metadata.

Rechunker also supports rechunking entire groups. In this case, target_chunks must be a dictionary.

[13]:
target_chunks = {
    "air": {"time": 2920, "lat": 25, "lon": 1},
    "time": None,  # don't rechunk this array
    "lon": None,
    "lat": None,
}
max_mem = "1MB"

target_store = "group_rechunked.zarr"
temp_store = "group_rechunked-tmp.zarr"

# rechunker will not overwrite existing stores, so remove them first
!rm -rf group_rechunked.zarr group_rechunked-tmp.zarr
array_plan = rechunk(
    source_group, target_chunks, max_mem, target_store, temp_store=temp_store
)
array_plan
[13]:

Rechunked

Source:       <zarr.hierarchy.Group '/'>
Intermediate: <zarr.hierarchy.Group '/'>
Target:       <zarr.hierarchy.Group '/'>

[14]:
array_plan.execute()
/home/filipe/micromamba/envs/TEST/lib/python3.8/site-packages/dask/base.py:1369: UserWarning: Running on a single-machine scheduler when a distributed client is active might lead to unexpected results.
  warnings.warn(
[14]:
<zarr.hierarchy.Group '/'>

Now that we have written a group, we can open it back up with Xarray.

[15]:
xr.open_zarr("group_rechunked.zarr")
/tmp/ipykernel_63292/4235005900.py:1: RuntimeWarning: Failed to open Zarr store with consolidated metadata, but successfully read with non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider:
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  xr.open_zarr('group_rechunked.zarr')
[15]:
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 25, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(2920, 25, 1), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
    title:        4x daily NMC reanalysis (1948)
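
The warning above also tells us how to avoid it. A minimal sketch following its first suggestion, consolidating the store’s metadata so later opens are faster and warning-free:

[ ]:
# merge the store's per-array metadata into a single .zmetadata key
zarr.consolidate_metadata("group_rechunked.zarr")

# the store now opens cleanly with consolidated metadata
xr.open_zarr("group_rechunked.zarr", consolidated=True)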

Often groups have many variables sharing all or a subset of dimensions. In the common case where a given dimension should have the same chunks in every variable that contains it, target_chunks can be provided as a simpler dictionary mapping dimension names to chunk size.

[16]:
# extend the dataset with some more variables
ds_complex = ds.copy()  # shallow copy so we don't modify ds in place
ds_complex["air_slice"] = ds.air.isel(lat=10)
ds_complex["air_timeseries"] = ds.air.isel(lat=10, lon=10)
ds_complex

target_chunks = {"time": 2920, "lat": 25, "lon": 1}
max_mem = "1MB"

target_store = "group_complex_rechunked.zarr"
temp_store = "group_complex_rechunked-tmp.zarr"

# rechunker will not overwrite existing stores, so remove them first
!rm -rf group_complex_rechunked.zarr group_complex_rechunked-tmp.zarr

# rechunk directly from dataset this time
array_plan = rechunk(
    ds_complex, target_chunks, max_mem, target_store, temp_store=temp_store
)
array_plan
[16]:

Rechunked

Source:
<xarray.Dataset>
Dimensions:         (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat             (lat) float32 75.0 72.5 70.0 67.5 ... 22.5 20.0 17.5 15.0
  * lon             (lon) float32 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0
  * time            (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air             (time, lat, lon) float32 dask.array<chunksize=(100, 25, 53), meta=np.ndarray>
    air_slice       (time, lon) float32 dask.array<chunksize=(100, 53), meta=np.ndarray>
    air_timeseries  (time) float32 dask.array<chunksize=(100,), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...

Intermediate: <zarr.hierarchy.Group '/'>

Target:       <zarr.hierarchy.Group '/'>

[17]:
array_plan.execute()
/home/filipe/micromamba/envs/TEST/lib/python3.8/site-packages/dask/base.py:1369: UserWarning: Running on a single-machine scheduler when a distributed client is active might lead to unexpected results.
  warnings.warn(
[17]:
<zarr.hierarchy.Group '/'>
[18]:
xr.open_zarr("group_complex_rechunked.zarr")
/tmp/ipykernel_63292/3867248564.py:1: RuntimeWarning: Failed to open Zarr store with consolidated metadata, but successfully read with non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider:
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  xr.open_zarr('group_complex_rechunked.zarr')
[18]:
<xarray.Dataset>
Dimensions:         (time: 2920, lat: 25, lon: 53)
Coordinates:
  * lat             (lat) float32 75.0 72.5 70.0 67.5 ... 22.5 20.0 17.5 15.0
  * lon             (lon) float32 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0
  * time            (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air             (time, lat, lon) float32 dask.array<chunksize=(2920, 25, 1), meta=np.ndarray>
    air_slice       (time, lon) float32 dask.array<chunksize=(2920, 1), meta=np.ndarray>
    air_timeseries  (time) float32 dask.array<chunksize=(2920,), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
    title:        4x daily NMC reanalysis (1948)

Note that all the variables now have the same time chunks, and any other shared dimensions are chunked consistently across variables as well.
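
As a quick sanity check, we can sketch a loop over the variables we just wrote and print their chunk structure:

[ ]:
ds_check = xr.open_zarr("group_complex_rechunked.zarr")
# every variable containing the time dimension stores it as one 2920-long chunk
for name, var in ds_check.data_vars.items():
    print(name, dict(zip(var.dims, var.chunks)))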

Cloud Example

In this example we use real data from Pangeo’s Cloud Data Catalog. This dataset is stored in Google Cloud Storage. We also use a Dask Gateway distributed cluster to scale up our processing. This part of the tutorial won’t work for you unless you are running in a Pangeo Cloud environment or on Binder.

[ ]:
from dask_gateway import GatewayCluster

cluster = GatewayCluster()
cluster.scale(20)
cluster
[ ]:
from dask.distributed import Client

client = Client(cluster)
client
[ ]:
import gcsfs

# a zarr group lives here
url = "gs://pangeo-cmems-duacs"
gcs = gcsfs.GCSFileSystem(requester_pays=True)
source_store = gcs.get_mapper(url)

Open Zarr Array

[ ]:
group = zarr.open_consolidated(source_store, mode="r")
source_array = group["sla"]
source_array
[ ]:
source_array.chunks

Make a Rechunking Plan

[ ]:
max_mem = "1GB"
target_chunks = (8901, 72, 72)
# you must have write access to this location
store_tmp = gcs.get_mapper("pangeo-scratch/rabernat/rechunker_demo/temp.zarr")
store_target = gcs.get_mapper("pangeo-scratch/rabernat/rechunker_demo/target.zarr")
r = rechunk(source_array, target_chunks, max_mem, store_target, temp_store=store_tmp)
r

Execute the Plan

[ ]:
result = r.execute()
result
[ ]:
dsa.from_zarr(result)