Python API reference

xcube_multistore.multistore.MultiSourceDataStore

Manages access to multiple data sources and their configurations for generating data cubes.

This class utilizes xcube data store plugins for data access, supports data harmonization, and enables visualization of data cube generation.

Parameters:

| Name   | Type                  | Description                                                                                           | Default  |
|--------|-----------------------|-------------------------------------------------------------------------------------------------------|----------|
| config | str \| dict[str, Any] | Configuration settings, provided as a dictionary or a string reference to a YAML configuration file. | required |

Notes

Detailed instructions on setting up the configuration can be found in the Configuration Guide.
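
Below is a minimal usage sketch, assuming a configuration file named "my_cubes.yaml" written according to the Configuration Guide (the file name is illustrative only). Note that preloading and cube generation run as part of the constructor.

```python
from xcube_multistore.multistore import MultiSourceDataStore

# Optionally inspect the expected configuration structure first.
schema = MultiSourceDataStore.get_config_schema()

# Instantiate from a YAML file path or an equivalent dict; this preloads
# the configured datasets (if any) and generates the data cubes.
msds = MultiSourceDataStore("my_cubes.yaml")
```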

Source code in xcube_multistore/multistore.py
class MultiSourceDataStore:
    """Manages access to multiple data sources and their configurations for generating
    data cubes.

    This class utilizes xcube data store plugins for data access, supports data
    harmonization, and enables visualization of data cube generation.

    Args:
        config: Configuration settings, provided as a dictionary or a string
            reference to a YAML configuration file.

    Notes:
        Detailed instructions on setting up the configuration can be found in the
        [Configuration Guide](https://xcube-dev.github.io/xcube-multistore/config/).
    """

    def __init__(self, config: str | dict[str, Any]):
        config = MultiSourceConfig(config)
        self.config = config
        self.stores = DataStores.setup_data_stores(config)
        if config.grid_mappings:
            self._grid_mappings = GridMappings.setup_grid_mappings(config)
        else:
            self._grid_mappings = None
        self._states = {
            identifier: GeneratorState(
                identifier=identifier, status=GeneratorStatus.waiting
            )
            for identifier, config_ds in config.datasets.items()
        }

        # preload data that is not preloaded by default
        if config.preload_datasets is not None:
            self._preload_datasets()

        # generate data cubes
        if self.config.general["visualize"]:
            self._display = GeneratorDisplay.create(list(self._states.values()))
            self._display.display_title("Cube Generation")
            self._display.show()
        self._generate_cubes()

    @classmethod
    def get_config_schema(cls) -> JsonObjectSchema:
        """Retrieves the configuration schema for the multi-source data store.

        Returns:
            A schema object defining the expected structure of the configuration.
        """
        return MultiSourceConfig.get_schema()

    def _notify(self, event: GeneratorState):
        state = self._states[event.identifier]
        state.update(event)
        if self.config.general["visualize"]:
            self._display.update()
        else:
            if event.status == GeneratorStatus.failed:
                LOG.error("An error occurred: %s", event.exception)
            else:
                LOG.info(event.message)

    def _notify_error(self, identifier: str, exception: Any):
        self._notify(
            GeneratorState(
                identifier,
                status=GeneratorStatus.failed,
                exception=exception,
            )
        )

    def _preload_datasets(self):
        for config_preload in self.config.preload_datasets:
            store = getattr(self.stores, config_preload["store"])

            if self.config.general["force_preload"]:
                # preload all datasets again
                data_ids_preloaded = []
                data_ids = config_preload["data_ids"]
            else:
                # filter preloaded data IDs
                data_ids = []
                data_ids_preloaded = []
                for data_id_preload in config_preload["data_ids"]:
                    if all(
                        store.cache_store.has_data(data_id)
                        for data_id in self.config.preload_map[data_id_preload]
                    ):
                        data_ids_preloaded.append(data_id_preload)
                    else:
                        data_ids.append(data_id_preload)

            # setup visualization
            if self.config.general["visualize"]:
                display_preloaded = GeneratorDisplay.create(
                    [
                        GeneratorState(
                            identifier=data_id,
                            status=GeneratorStatus.stopped,
                            message="Already preloaded.",
                        )
                        for data_id in data_ids_preloaded
                    ]
                )
                display_preloaded.display_title(
                    f"Preload Datasets from store {config_preload['store']!r}"
                )
                if data_ids_preloaded:
                    display_preloaded.show()
            else:
                LOG.info(f"Preload Datasets from store {config_preload['store']!r}")
                for data_id in data_ids_preloaded:
                    LOG.info(f"Data ID {data_id!r} already preloaded.")

            if data_ids:
                preload_params = config_preload.get("preload_params", {})
                if "silent" not in preload_params:
                    preload_params["silent"] = self.config.general["visualize"]
                _ = store.preload_data(*data_ids, **preload_params)

    def _generate_cubes(self):
        for identifier, config_ds in self.config.datasets.items():
            data_id = _get_data_id(config_ds)
            if getattr(self.stores, "storage").has_data(data_id):
                self._notify(
                    GeneratorState(
                        identifier,
                        status=GeneratorStatus.stopped,
                        message=f"Dataset {identifier!r} already generated.",
                    )
                )
                continue
            self._notify(
                GeneratorState(
                    identifier,
                    status=GeneratorStatus.started,
                    message=f"Open dataset {identifier!r}.",
                )
            )
            ds = self._open_dataset(config_ds)
            if isinstance(ds, xr.Dataset):
                self._notify(
                    GeneratorState(
                        identifier,
                        message=f"Processing dataset {identifier!r}.",
                    )
                )
            else:
                self._notify_error(identifier, ds)
                continue
            ds = self._process_dataset(ds, config_ds)
            if isinstance(ds, xr.Dataset):
                self._notify(
                    GeneratorState(
                        identifier,
                        message=f"Write dataset {identifier!r}.",
                    )
                )
            else:
                self._notify_error(identifier, ds)
                continue
            ds = self._write_dataset(ds, config_ds)
            if isinstance(ds, xr.Dataset):
                self._notify(
                    GeneratorState(
                        identifier,
                        status=GeneratorStatus.stopped,
                        message=f"Dataset {identifier!r} finished.",
                    )
                )
            else:
                store = getattr(self.stores, NAME_WRITE_STORE)
                format_id = config_ds.get("format_id", "zarr")
                data_id = (
                    f"{config_ds['identifier']}.{MAP_FORMAT_ID_FILE_EXT[format_id]}"
                )
                if store.has_data(data_id):
                    store.delete_data(data_id)
                self._notify_error(identifier, ds)

    @_safe_execute()
    def _open_dataset(self, config: dict) -> xr.Dataset | Exception:
        if "data_id" in config:
            return self._open_single_dataset(config)
        else:
            dss = []
            for config_var in config["variables"]:
                ds = self._open_single_dataset(config_var)
                if len(ds.data_vars) > 1:
                    name_dict = {
                        var: f"{config_var['identifier']}_{var}"
                        for var in ds.data_vars.keys()
                    }
                else:
                    name_dict = {
                        var: f"{config_var['identifier']}"
                        for var in ds.data_vars.keys()
                    }
                dss.append(ds.rename_vars(name_dict=name_dict))
            merge_params = config.get("xr_merge_params", {})
            if "join" not in merge_params:
                merge_params["join"] = "exact"
            if "combine_attrs" not in merge_params:
                merge_params["combine_attrs"] = "drop_conflicts"
            ds = xr.merge(dss, **merge_params)
        return clean_dataset(ds)

    def _open_single_dataset(self, config: dict) -> xr.Dataset | Exception:
        store = getattr(self.stores, config["store"])
        open_params = copy.deepcopy(config.get("open_params", {}))
        lat, lon = open_params.pop("point", [np.nan, np.nan])
        schema = store.get_open_data_params_schema(data_id=config["data_id"])
        if (
            ~np.isnan(lat)
            and ~np.isnan(lon)
            and "bbox" in schema.properties
            and "spatial_res" in open_params
            and "spatial_res" in schema.properties
        ):
            open_params["bbox"] = [
                lon - 2 * open_params["spatial_res"],
                lat - 2 * open_params["spatial_res"],
                lon + 2 * open_params["spatial_res"],
                lat + 2 * open_params["spatial_res"],
            ]

        if hasattr(store, "cache_store"):
            try:
                ds = store.cache_store.open_data(config["data_id"], **open_params)
            except Exception:
                ds = store.open_data(config["data_id"], **open_params)
        else:
            ds = store.open_data(config["data_id"], **open_params)

        # custom processing
        if "custom_processing" in config:
            module = importlib.import_module(config["custom_processing"]["module_path"])
            function = getattr(module, config["custom_processing"]["function_name"])
            ds = function(ds)

        return clean_dataset(ds)

    @_safe_execute()
    def _process_dataset(self, ds: xr.Dataset, config: dict) -> xr.Dataset | Exception:
        # if grid mapping is given, resample the dataset
        if "grid_mapping" in config:
            if hasattr(self._grid_mappings, config["grid_mapping"]):
                target_gm = getattr(self._grid_mappings, config["grid_mapping"])
            else:
                config_ref = self.config.datasets[config["grid_mapping"]]
                data_id = _get_data_id(config_ref)
                ds_ref = getattr(self.stores, "storage").open_data(data_id)
                target_gm = GridMapping.from_dataset(ds_ref)
                for var_name, data_array in ds.items():
                    if np.issubdtype(data_array.dtype, np.number):
                        ds[var_name] = data_array.astype(target_gm.x_coords.dtype)
            source_gm = GridMapping.from_dataset(ds)
            transformer = pyproj.Transformer.from_crs(
                target_gm.crs, source_gm.crs, always_xy=True
            )
            bbox = transformer.transform_bounds(*target_gm.xy_bbox, densify_pts=21)
            bbox = [
                bbox[0] - 2 * source_gm.x_res,
                bbox[1] - 2 * source_gm.y_res,
                bbox[2] + 2 * source_gm.x_res,
                bbox[3] + 2 * source_gm.y_res,
            ]

            ds = clip_dataset_by_geometry(ds, geometry=bbox)
            ds = resample_in_space(ds, target_gm=target_gm, encode_cf=True)
            # this is needed since resample in space returns one chunk along the time
            # axis; this part can be removed once https://github.com/xcube-dev/xcube/issues/1124
            # is resolved.
            if "time" in ds.coords:
                ds = chunk_dataset(
                    ds, dict(time=1), format_name=config.get("format_id", "zarr")
                )

        # if "point" in open_params, timeseries is requested
        open_params = config.get("open_params", {})
        if "point" in open_params:
            ds = ds.interp(
                lat=open_params["point"][0],
                lon=open_params["point"][1],
                method="linear",
            )

        return ds

    @_safe_execute()
    def _write_dataset(self, ds: xr.Dataset, config: dict) -> xr.Dataset | Exception:
        store = getattr(self.stores, NAME_WRITE_STORE)
        format_id = config.get("format_id", "zarr")
        if format_id == "netcdf":
            ds = prepare_dataset_for_netcdf(ds)
        data_id = f"{config['identifier']}.{MAP_FORMAT_ID_FILE_EXT[format_id]}"
        ds = clean_dataset(ds)
        store.write_data(ds, data_id, replace=True)
        return ds

get_config_schema() classmethod

Retrieves the configuration schema for the multi-source data store.

Returns:

| Type             | Description                                                            |
|------------------|------------------------------------------------------------------------|
| JsonObjectSchema | A schema object defining the expected structure of the configuration. |

Source code in xcube_multistore/multistore.py
@classmethod
def get_config_schema(cls) -> JsonObjectSchema:
    """Retrieves the configuration schema for the multi-source data store.

    Returns:
        A schema object defining the expected structure of the configuration.
    """
    return MultiSourceConfig.get_schema()
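
The schema can be retrieved without instantiating the store, which is useful for validating or documenting a configuration up front. In the sketch below, converting the schema to a plain dict via to_dict() is an assumption based on xcube's JSON schema objects.

```python
from xcube_multistore.multistore import MultiSourceDataStore

schema = MultiSourceDataStore.get_config_schema()
# Assuming JsonObjectSchema exposes to_dict(), this yields a
# JSON-serializable description of the expected configuration keys.
print(schema.to_dict())
```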

xcube_multistore.utils.prepare_dataset_for_netcdf(ds)

Prepares an xarray Dataset for NetCDF serialization.

Converts non-serializable attributes (lists, tuples, and dictionaries) into strings to ensure compatibility with NetCDF format.

Parameters:

| Name | Type    | Description               | Default  |
|------|---------|---------------------------|----------|
| ds   | Dataset | The input xarray Dataset. | required |

Returns:

| Type    | Description                                                             |
|---------|-------------------------------------------------------------------------|
| Dataset | A dataset with updated attributes, ensuring compatibility with NetCDF. |

Source code in xcube_multistore/utils.py
def prepare_dataset_for_netcdf(ds: xr.Dataset) -> xr.Dataset:
    """Prepares an xarray Dataset for NetCDF serialization.

    Converts non-serializable attributes (lists, tuples, and dictionaries) into strings
    to ensure compatibility with NetCDF format.

    Args:
        ds: The input xarray Dataset.

    Returns:
        A dataset with updated attributes, ensuring compatibility with NetCDF.
    """
    attrs = ds.attrs
    for key in attrs:
        if (
            isinstance(attrs[key], list)
            or isinstance(attrs[key], tuple)
            or isinstance(attrs[key], dict)
        ):
            attrs[key] = str(attrs[key])
    ds = ds.assign_attrs(attrs)
    return ds
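
A short sketch of the effect: list-, tuple-, or dict-valued attributes are stringified so that writing to NetCDF does not fail, while scalar attributes are left untouched. The example dataset is made up for illustration.

```python
import xarray as xr

from xcube_multistore.utils import prepare_dataset_for_netcdf

ds = xr.Dataset(attrs={"source_ids": ["S2A", "S2B"], "resolution_m": 10})
ds = prepare_dataset_for_netcdf(ds)
assert ds.attrs["source_ids"] == "['S2A', 'S2B']"  # list became a string
assert ds.attrs["resolution_m"] == 10  # scalar attribute unchanged
ds.to_netcdf("example.nc")  # now serializes without attribute errors
```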

xcube_multistore.utils.get_utm_zone(lat, lon)

Determines the UTM (Universal Transverse Mercator) zone for given coordinates.

Computes the UTM zone based on longitude and returns the corresponding EPSG code. Northern hemisphere zones use EPSG codes in the 32600 range, while southern hemisphere zones use EPSG codes in the 32700 range.

Parameters:

| Name | Type  | Description           | Default  |
|------|-------|-----------------------|----------|
| lat  | float | Latitude in degrees.  | required |
| lon  | float | Longitude in degrees. | required |

Returns:

| Type | Description                                                         |
|------|---------------------------------------------------------------------|
| str  | The EPSG code for the corresponding UTM zone (e.g., "epsg:32633"). |

Source code in xcube_multistore/utils.py
def get_utm_zone(lat: float, lon: float) -> str:
    """Determines the UTM (Universal Transverse Mercator) zone for given coordinates.

    Computes the UTM zone based on longitude and returns the corresponding EPSG code.
    Northern hemisphere zones use EPSG codes in the 32600 range, while southern
    hemisphere zones use EPSG codes in the 32700 range.

    Args:
        lat: Latitude in degrees.
        lon: Longitude in degrees.

    Returns:
        The EPSG code for the corresponding UTM zone (e.g., "epsg:32633").
    """
    zone_number = int((lon + 180) / 6) + 1
    if lat >= 0:
        epsg_code = 32600 + zone_number
    else:
        epsg_code = 32700 + zone_number
    return f"epsg:{epsg_code}"
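
As a quick worked example of the formula above: for lon = 15°E the zone number is int((15 + 180) / 6) + 1 = 33, so a point in the northern hemisphere maps to EPSG:32633 and one in the southern hemisphere to EPSG:32733. The coordinates below are illustrative.

```python
from xcube_multistore.utils import get_utm_zone

assert get_utm_zone(lat=52.5, lon=13.4) == "epsg:32633"   # Berlin, zone 33N
assert get_utm_zone(lat=-33.9, lon=18.4) == "epsg:32734"  # Cape Town, zone 34S
```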

xcube_multistore.utils.get_bbox(lat, lon, cube_width, crs_final='utm')

Generates a bounding box around a specified latitude and longitude.

Given a point (latitude, longitude) and the desired width of a cube, this function computes the bounding box in the specified coordinate reference system (CRS). The bounding box is returned as a list of coordinates, and the CRS is returned as well.

Parameters:

| Name       | Type       | Description                                                                                                                              | Default  |
|------------|------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------|
| lat        | float      | Latitude of the central point in degrees.                                                                                                | required |
| lon        | float      | Longitude of the central point in degrees.                                                                                               | required |
| cube_width | float      | The width of the cube in units of crs_final, used to define the extent of the bounding box.                                             | required |
| crs_final  | CRS or str | The target CRS for the bounding box. Defaults to "utm", which automatically determines the UTM zone based on the latitude and longitude. | 'utm'    |

Returns:

| Type      | Description                                                                                      |
|-----------|--------------------------------------------------------------------------------------------------|
| list[int] | A list of four integers representing the bounding box in the format [west, south, east, north]. |
| CRS       | The final CRS used for the bounding box, returned as a pyproj.CRS object.                       |

Source code in xcube_multistore/utils.py
def get_bbox(
    lat: float, lon: float, cube_width: float, crs_final: pyproj.CRS | str = "utm"
) -> tuple[list[int], pyproj.CRS]:
    """Generates a bounding box around a specified latitude and longitude.

    Given a point (latitude, longitude) and the desired width of a cube, this function
    computes the bounding box in the specified coordinate reference system (CRS).
    The bounding box is returned as a list of coordinates, and the CRS is returned
    as well.

    Args:
        lat: Latitude of the central point in degrees.
        lon: Longitude of the central point in degrees.
        cube_width: The width of the cube in units of crs_final, used to define the
            extent of the bounding box.
        crs_final (pyproj.CRS or str, optional): The target CRS for the bounding box.
            Defaults to "utm", which automatically determines the UTM zone based on the
            latitude and longitude.

    Returns:
        A list of four integers representing the bounding box in the format
            [west, south, east, north].
        The final CRS used for the bounding box, returned as a `pyproj.CRS` object.
    """
    if crs_final == "utm":
        crs_final = get_utm_zone(lat, lon)
    if isinstance(crs_final, str):
        crs_final = pyproj.CRS.from_user_input(crs_final)

    transformer = pyproj.Transformer.from_crs(CRS_WGS84, crs_final, always_xy=True)
    x, y = transformer.transform(lon, lat)

    half_size = cube_width / 2
    bbox_final = [x - half_size, y - half_size, x + half_size, y + half_size]
    if not crs_final.is_geographic:
        bbox_final = [round(item) for item in bbox_final]
    return bbox_final, crs_final
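
A minimal usage sketch with illustrative coordinates: a 10 km wide box around a point near Berlin. With the default crs_final="utm" the box is expressed in projected metres of the matching UTM zone and rounded to integers, since that CRS is not geographic.

```python
from xcube_multistore.utils import get_bbox

bbox, crs = get_bbox(lat=52.5, lon=13.4, cube_width=10_000)
# bbox is [west, south, east, north] in metres; crs is a pyproj.CRS
# for the automatically selected zone (EPSG:32633 here).
west, south, east, north = bbox
assert east - west == 10_000 and north - south == 10_000
```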

xcube_multistore.utils.clean_dataset(ds)

Cleans an xarray Dataset by removing boundary variables and normalizing the grid mapping.

This function removes specific variables related to bounds (e.g., "x_bnds", "y_bnds", "lat_bnds", "lon_bnds", "time_bnds") and normalizes the grid mapping by adding a spatial reference coordinate called "spatial_ref" and assigning it to the relevant data variables.

Parameters:

| Name | Type    | Description                             | Default  |
|------|---------|-----------------------------------------|----------|
| ds   | Dataset | The input xarray dataset to be cleaned. | required |

Returns:

| Type    | Description                                                                                    |
|---------|------------------------------------------------------------------------------------------------|
| Dataset | A cleaned version of the dataset with boundary variables removed and grid mapping normalized. |

Source code in xcube_multistore/utils.py
def clean_dataset(ds: xr.Dataset) -> xr.Dataset:
    """Cleans an xarray Dataset by removing boundary variables and normalizing the
    grid mapping.

    This function removes specific variables related to bounds (e.g., "x_bnds",
    "y_bnds", "lat_bnds", "lon_bnds", "time_bnds") and normalizes the grid mapping
    by adding a spatial reference coordinate called "spatial_ref" and assigning
    it to the relevant data variables.

    Args:
        ds: The input xarray dataset to be cleaned.

    Returns:
        A cleaned version of the dataset with boundary variables removed and grid
        mapping normalized.
    """
    check_vars = ["x_bnds", "y_bnds", "lat_bnds", "lon_bnds", "time_bnds"]
    sel_vars = []
    for var in check_vars:
        if var in ds:
            sel_vars.append(var)
    ds = ds.drop_vars(sel_vars)
    ds = _normalize_grid_mapping(ds)
    return ds
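
A small sketch of typical behaviour, assuming xcube's new_cube() demo utility is available to build a toy cube with bounds variables and a geographic grid mapping; real inputs come from the configured data stores.

```python
from xcube.core.new import new_cube

from xcube_multistore.utils import clean_dataset

# new_cube() creates a small demo cube including lat/lon/time bounds variables.
cube = new_cube(width=36, height=18, variables={"sst": 280.0})
cleaned = clean_dataset(cube)
# Bounds variables are removed and the grid mapping is normalized onto
# a "spatial_ref" coordinate.
print([v for v in ("lat_bnds", "lon_bnds", "time_bnds") if v in cleaned])  # []
```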