Source code for climpred.preprocessing.mpi
import glob as _glob
import os as _os
[docs]def get_path(
dir_base_experiment: str = "/work/bm1124/m300086/CMIP6/experiments",
member: int = 1,
init: int = 1960,
model: str = "hamocc",
output_stream: str = "monitoring_ym",
timestr: str = "*1231",
ending: str = "nc",
) -> str:
"""Get the path of a file for MPI-ESM standard output file names and directory.
Args:
dir_base_experiment (str): Path of experiments folder. Defaults to
"/work/bm1124/m300086/CMIP6/experiments".
member (int): member label. Defaults to 1.
init (int): initialization label. Typically year. Defaults to 1960.
model (str): submodel name. Defaults to "hamocc".
Allowed: ['echam6', 'jsbach', 'mpiom', 'hamocc'].
output_stream (str): output_stream name. Defaults to "monitoring_ym".
Allowed: ['data_2d_mm', 'data_3d_ym', 'BOT_mm', ...]
timestr (str): timestr likely including *. Defaults to "*1231".
ending (str): ending indicating file format. Defaults to "nc".
Allowed: ['nc', 'grb'].
Returns:
str: path of requested file(s)
"""
# get experiment_id
dirs = _os.listdir(dir_base_experiment)
experiment_id = [
x for x in dirs if (f"{init}" in x and "r" + str(member) + "i" in x)
]
assert len(experiment_id) == 1
experiment_id = experiment_id[0] # type: ignore
dir_outdata = f"{dir_base_experiment}/{experiment_id}/outdata/{model}"
path = f"{dir_outdata}/{experiment_id}_{model}_{output_stream}_{timestr}.{ending}"
if _os.path.exists(_glob.glob(path)[0]):
return path
else:
raise ValueError(f"Path not found or no access: {path}")