"""Read SWAN spectra files.
Functions:
read_swan: Read Spectra from SWAN ASCII file.
read_swans: Read multiple swan files into single Dataset.
read_hotswan: Read multiple swan hotfiles into single gridded Dataset.
read_swanow: Read SWAN nowcast from fileglob, keep overlapping dates
from most recent files.
"""
import os
import glob
import datetime
import warnings
import pandas as pd
import xarray as xr
import numpy as np
from collections import OrderedDict
from sortedcontainers import SortedDict
from wavespectra.specdataset import SpecDataset
from wavespectra.core.swan import SwanSpecFile, read_tab
from wavespectra.core.attributes import attrs, set_spec_attributes
from wavespectra.core.utils import uv_to_spddir, interp_spec, flatten_list
[docs]
def read_swan(filename, dirorder=True, as_site=False):
"""Read Spectra from SWAN ASCII file.
Args:
- dirorder (bool): If True reorder spectra so that directions are
sorted.
- as_site (bool): If True locations are defined by 1D site dimension.
Returns:
- dset (SpecDataset): spectra dataset object read from file.
"""
swanfile = SwanSpecFile(filename, dirorder=dirorder)
times = swanfile.times
lons = swanfile.x
lats = swanfile.y
if len(lons) == 1:
sites = [os.path.splitext(os.path.basename(filename))[0]]
else:
sites = np.arange(len(lons)) + 1
freqs = swanfile.freqs
dirs = swanfile.dirs
tab = None
if as_site:
swanfile.is_grid = False
spec_list = [s for s in swanfile.readall()]
# Create fake time if no timestamp
times = times or [datetime.datetime.now().replace(second=0, microsecond=0)]
if swanfile.is_tab:
try:
tab = read_tab(swanfile.tabfile)
if len(swanfile.times) == tab.index.size:
if "X-wsp" in tab and "Y-wsp" in tab:
tab[attrs.WSPDNAME], tab[attrs.WDIRNAME] = uv_to_spddir(
tab["X-wsp"], tab["Y-wsp"], coming_from=True
)
else:
warnings.warn(
f"Times in {swanfile.filename} and {swanfile.tabfile} "
f"not consistent, not appending winds and depth"
)
tab = None
except Exception as exc:
warnings.warn(
f"Cannot parse depth and winds from {swanfile.tabfile}:\n{exc}"
)
if swanfile.is_grid:
lons = sorted(np.unique(lons))
lats = sorted(np.unique(lats))
arr = np.array(spec_list).reshape(
len(times), len(lons), len(lats), len(freqs), len(dirs)
)
dset = xr.DataArray(
data=np.swapaxes(arr, 1, 2),
coords=OrderedDict(
(
(attrs.TIMENAME, times),
(attrs.LATNAME, lats),
(attrs.LONNAME, lons),
(attrs.FREQNAME, freqs),
(attrs.DIRNAME, dirs),
)
),
dims=(
attrs.TIMENAME,
attrs.LATNAME,
attrs.LONNAME,
attrs.FREQNAME,
attrs.DIRNAME,
),
name=attrs.SPECNAME,
).to_dataset()
if tab is not None and attrs.WSPDNAME in tab:
dset[attrs.WSPDNAME] = xr.DataArray(
data=tab[attrs.WSPDNAME].values.reshape(-1, 1, 1),
dims=[attrs.TIMENAME, attrs.LATNAME, attrs.LONNAME],
)
dset[attrs.WDIRNAME] = xr.DataArray(
data=tab[attrs.WDIRNAME].values.reshape(-1, 1, 1),
dims=[attrs.TIMENAME, attrs.LATNAME, attrs.LONNAME],
)
if tab is not None and "dep" in tab:
dset[attrs.DEPNAME] = xr.DataArray(
data=tab["dep"].values.reshape(-1, 1, 1),
dims=[attrs.TIMENAME, attrs.LATNAME, attrs.LONNAME],
)
else:
arr = np.array(spec_list).reshape(len(times), len(sites), len(freqs), len(dirs))
dset = xr.DataArray(
data=arr,
coords=OrderedDict(
(
(attrs.TIMENAME, times),
(attrs.SITENAME, sites),
(attrs.FREQNAME, freqs),
(attrs.DIRNAME, dirs),
)
),
dims=(attrs.TIMENAME, attrs.SITENAME, attrs.FREQNAME, attrs.DIRNAME),
name=attrs.SPECNAME,
).to_dataset()
if tab is not None and attrs.WSPDNAME in tab:
dset[attrs.WSPDNAME] = xr.DataArray(
data=tab[attrs.WSPDNAME].values.reshape(-1, 1),
dims=[attrs.TIMENAME, attrs.SITENAME],
)
dset[attrs.WDIRNAME] = xr.DataArray(
data=tab[attrs.WDIRNAME].values.reshape(-1, 1),
dims=[attrs.TIMENAME, attrs.SITENAME],
)
if tab is not None and "dep" in tab:
dset[attrs.DEPNAME] = xr.DataArray(
data=tab["dep"].values.reshape(-1, 1),
dims=[attrs.TIMENAME, attrs.SITENAME],
)
dset[attrs.LATNAME] = xr.DataArray(
data=lats, coords={attrs.SITENAME: sites}, dims=[attrs.SITENAME]
)
dset[attrs.LONNAME] = xr.DataArray(
data=lons, coords={attrs.SITENAME: sites}, dims=[attrs.SITENAME]
)
set_spec_attributes(dset)
if "dir" in dset and len(dset.dir) > 1:
dset[attrs.SPECNAME].attrs.update(
{"_units": "m^{2}.s.degree^{-1}", "_variable_name": "VaDens"}
)
else:
dset[attrs.SPECNAME].attrs.update(
{"units": "m^{2}.s", "_units": "m^{2}.s", "_variable_name": "VaDens"}
)
return dset
[docs]
def read_swans(
fileglob, ndays=None, int_freq=True, int_dir=False, dirorder=True, ntimes=None
):
"""Read multiple SWAN ASCII files into single Dataset.
Args:
- fileglob (str, list): glob pattern specifying files to read.
- ndays (float): number of days to keep from each file, choose None to
keep entire period.
- int_freq (ndarray, bool): frequency array for interpolating onto:
- ndarray: 1d array specifying frequencies to interpolate onto.
- True: logarithm array is constructed such that fmin=0.0418 Hz,
fmax=0.71856 Hz, df=0.1f.
- False: No interpolation performed in frequency space.
- int_dir (ndarray, bool): direction array for interpolating onto:
- ndarray: 1d array specifying directions to interpolate onto.
- True: circular array is constructed such that dd=10 degrees.
- False: No interpolation performed in direction space.
- dirorder (bool): if True ensures directions are sorted.
- ntimes (int): use it to read only specific number of times, useful
for checking headers only.
Returns:
- dset (SpecDataset): spectra dataset object read from file with
different sites and cycles concatenated along the 'site' and 'time'
dimensions.
Note:
- If multiple cycles are provided, 'time' coordinate is replaced by
'cycletime' multi-index coordinate.
- If more than one cycle is prescribed from fileglob, each cycle must
have same number of sites.
- Either all or none of the spectra in fileglob must have tabfile
associated to provide wind/depth data.
- Concatenation is done with numpy arrays for efficiency.
"""
swans = (
sorted(fileglob) if isinstance(fileglob, list) else sorted(glob.glob(fileglob))
)
assert swans, "No SWAN file identified with fileglob %s" % (fileglob)
# Default spectral basis for interpolating
if int_freq is True:
int_freq = [0.04118 * 1.1 ** n for n in range(31)]
elif int_freq is False:
int_freq = None
if int_dir is True:
int_dir = np.arange(0, 360, 10)
elif int_dir is False:
int_dir = None
cycles = list()
dsets = SortedDict()
tabs = SortedDict()
all_times = list()
all_sites = SortedDict()
all_lons = SortedDict()
all_lats = SortedDict()
deps = SortedDict()
wspds = SortedDict()
wdirs = SortedDict()
for filename in swans:
swanfile = SwanSpecFile(filename, dirorder=dirorder)
times = swanfile.times
lons = list(swanfile.x)
lats = list(swanfile.y)
sites = (
[os.path.splitext(os.path.basename(filename))[0]]
if len(lons) == 1
else np.arange(len(lons)) + 1
)
freqs = swanfile.freqs
dirs = swanfile.dirs
if ntimes is None:
spec_list = [s for s in swanfile.readall()]
else:
spec_list = [swanfile.read() for itime in range(ntimes)]
# Read tab files for winds / depth
if swanfile.is_tab:
try:
tab = read_tab(swanfile.tabfile).rename(columns={"dep": attrs.DEPNAME})
if len(swanfile.times) == tab.index.size:
if "X-wsp" in tab and "Y-wsp" in tab:
tab[attrs.WSPDNAME], tab[attrs.WDIRNAME] = uv_to_spddir(
tab["X-wsp"], tab["Y-wsp"], coming_from=True
)
else:
warnings.warn(
f"Times in {swanfile.filename} and {swanfile.tabfile} "
f"not consistent, not appending winds and depth"
)
tab = pd.DataFrame()
tab = tab[
list(
set(tab.columns).intersection(
(attrs.DEPNAME, attrs.WSPDNAME, attrs.WDIRNAME)
)
)
]
except Exception as exc:
warnings.warn(
f"Cannot parse depth and winds from {swanfile.tabfile}:\n{exc}"
)
else:
tab = pd.DataFrame()
# Shrinking times
if ndays is not None:
tend = times[0] + datetime.timedelta(days=ndays)
if tend > times[-1]:
raise OSError(
"Times in %s does not extend for %0.2f days" % (filename, ndays)
)
iend = times.index(min(times, key=lambda d: abs(d - tend)))
times = times[0 : iend + 1]
spec_list = spec_list[0 : iend + 1]
tab = tab.loc[times[0] : tend] if tab is not None else tab
spec_list = flatten_list(spec_list, [])
# Interpolate spectra
if int_freq is not None or int_dir is not None:
spec_list = [
interp_spec(spec, freqs, dirs, int_freq, int_dir) for spec in spec_list
]
freqs = int_freq if int_freq is not None else freqs
dirs = int_dir if int_dir is not None else dirs
# Appending
try:
arr = np.array(spec_list).reshape(
len(times), len(sites), len(freqs), len(dirs)
)
cycle = times[0]
if cycle not in dsets:
dsets[cycle] = [arr]
tabs[cycle] = [tab]
all_sites[cycle] = sites
all_lons[cycle] = lons
all_lats[cycle] = lats
all_times.append(times)
nsites = 1
else:
dsets[cycle].append(arr)
tabs[cycle].append(tab)
all_sites[cycle].extend(sites)
all_lons[cycle].extend(lons)
all_lats[cycle].extend(lats)
nsites += 1
except Exception:
if len(spec_list) != arr.shape[0]:
raise OSError(
"Time length in %s (%i) does not match previous files (%i), "
"cannot concatenate",
(filename, len(spec_list), arr.shape[0]),
)
else:
raise
swanfile.close()
cycles = dsets.keys()
# Ensuring sites are consistent across cycles
sites = all_sites[cycle]
lons = all_lons[cycle]
lats = all_lats[cycle]
for site, lon, lat in zip(all_sites.values(), all_lons.values(), all_lats.values()):
if (
(list(site) != list(sites))
or (list(lon) != list(lons))
or (list(lat) != list(lats))
):
raise OSError("Inconsistent sites across cycles in glob pattern provided")
# Ensuring consistent tabs
cols = set(
[
frozenset(tabs[cycle][n].columns)
for cycle in cycles
for n in range(len(tabs[cycle]))
]
)
if len(cols) > 1:
raise OSError(
"Inconsistent tab files, ensure either all or none of the spectra have "
"associated tabfiles and columns are consistent"
)
# Concat sites
for cycle in cycles:
dsets[cycle] = np.concatenate(dsets[cycle], axis=1)
deps[cycle] = (
np.vstack([tab[attrs.DEPNAME].values for tab in tabs[cycle]]).T
if attrs.DEPNAME in tabs[cycle][0]
else None
)
wspds[cycle] = (
np.vstack([tab[attrs.WSPDNAME].values for tab in tabs[cycle]]).T
if attrs.WSPDNAME in tabs[cycle][0]
else None
)
wdirs[cycle] = (
np.vstack([tab[attrs.WDIRNAME].values for tab in tabs[cycle]]).T
if attrs.WDIRNAME in tabs[cycle][0]
else None
)
time_sizes = [dsets[cycle].shape[0] for cycle in cycles]
# Concat cycles
if len(dsets) > 1:
dsets = np.concatenate(dsets.values(), axis=0)
deps = (
np.concatenate(deps.values(), axis=0)
if attrs.DEPNAME in tabs[cycle][0]
else None
)
wspds = (
np.concatenate(wspds.values(), axis=0)
if attrs.WSPDNAME in tabs[cycle][0]
else None
)
wdirs = (
np.concatenate(wdirs.values(), axis=0)
if attrs.WDIRNAME in tabs[cycle][0]
else None
)
else:
dsets = dsets[cycle]
deps = deps[cycle] if attrs.DEPNAME in tabs[cycle][0] else None
wspds = wspds[cycle] if attrs.WSPDNAME in tabs[cycle][0] else None
wdirs = wdirs[cycle] if attrs.WDIRNAME in tabs[cycle][0] else None
# Creating dataset
times = flatten_list(all_times, [])
dsets = xr.DataArray(
data=dsets,
coords=OrderedDict(
(
(attrs.TIMENAME, times),
(attrs.SITENAME, sites),
(attrs.FREQNAME, freqs),
(attrs.DIRNAME, dirs),
)
),
dims=(attrs.TIMENAME, attrs.SITENAME, attrs.FREQNAME, attrs.DIRNAME),
name=attrs.SPECNAME,
).to_dataset()
dsets[attrs.LATNAME] = xr.DataArray(
data=lats, coords={attrs.SITENAME: sites}, dims=[attrs.SITENAME]
)
dsets[attrs.LONNAME] = xr.DataArray(
data=lons, coords={attrs.SITENAME: sites}, dims=[attrs.SITENAME]
)
if wspds is not None:
dsets[attrs.WSPDNAME] = xr.DataArray(
data=wspds,
dims=[attrs.TIMENAME, attrs.SITENAME],
coords=OrderedDict(((attrs.TIMENAME, times), (attrs.SITENAME, sites))),
)
dsets[attrs.WDIRNAME] = xr.DataArray(
data=wdirs,
dims=[attrs.TIMENAME, attrs.SITENAME],
coords=OrderedDict(((attrs.TIMENAME, times), (attrs.SITENAME, sites))),
)
if deps is not None:
dsets[attrs.DEPNAME] = xr.DataArray(
data=deps,
dims=[attrs.TIMENAME, attrs.SITENAME],
coords=OrderedDict(((attrs.TIMENAME, times), (attrs.SITENAME, sites))),
)
# Setting multi-index
if len(cycles) > 1:
dsets = dsets.rename({attrs.TIMENAME: "cycletime"})
cycletime = zip(
[
item
for sublist in [[c] * t for c, t in zip(cycles, time_sizes)]
for item in sublist
],
dsets.cycletime.values,
)
dsets["cycletime"] = pd.MultiIndex.from_tuples(
cycletime, names=[attrs.CYCLENAME, attrs.TIMENAME]
)
dsets["cycletime"].attrs = attrs.ATTRS[attrs.TIMENAME]
set_spec_attributes(dsets)
if "dir" in dsets and len(dsets.dir) > 1:
dsets[attrs.SPECNAME].attrs.update(
{"_units": "m^{2}.s.degree^{-1}", "_variable_name": "VaDens"}
)
else:
dsets[attrs.SPECNAME].attrs.update(
{"units": "m^{2}.s", "_units": "m^{2}.s", "_variable_name": "VaDens"}
)
return dsets
[docs]
def read_hotswan(fileglob, dirorder=True):
"""Read partial SWAN hotfiles into single gridded hotfile Dataset.
Args:
- fileglob (str, list): glob pattern specifying hotfiles to read and
merge.
- dirorder (bool): if True ensures directions are sorted.
Returns:
- dset (SpecDataset): spectra dataset object with different grid parts
merged.
Note:
- SWAN hotfiles from mpi runs are split by the number of cores over the
largest dim of (lat, lon) with overlapping rows or columns that are
computed in only one of the split hotfiles. Here overlappings are
merged so that those with higher values are kept which assumes
non-computed overlapping rows or columns are filled with zeros.
"""
hotfiles = (
sorted(fileglob) if isinstance(fileglob, list) else sorted(glob.glob(fileglob))
)
assert hotfiles, "No SWAN file identified with fileglob %s" % (fileglob)
dsets = [read_swan(hotfiles[0])]
for hotfile in hotfiles[1:]:
dset = read_swan(hotfile)
# Ensure we keep non-zeros in overlapping rows or columns
overlap = {
attrs.LONNAME: set(dsets[-1].lon.values).intersection(dset.lon.values),
attrs.LATNAME: set(dsets[-1].lat.values).intersection(dset.lat.values),
}
concat_dim = min(overlap, key=lambda x: len(overlap[x]))
for concat_val in overlap[concat_dim]:
slc = {concat_dim: [concat_val]}
if dsets[-1].efth.loc[slc].sum() > dset.efth.loc[slc].sum():
dset.efth.loc[slc] = dsets[-1].efth.loc[slc]
else:
dsets[-1].efth.loc[slc] = dset.efth.loc[slc]
dsets.append(dset)
dset = xr.combine_by_coords(dsets)
set_spec_attributes(dset)
if attrs.DIRNAME in dset and len(dset.dir) > 1:
dset[attrs.SPECNAME].attrs.update(
{"_units": "m^{2}.s.degree^{-1}", "_variable_name": "VaDens"}
)
else:
dset[attrs.SPECNAME].attrs.update(
{"units": "m^{2}.s", "_units": "m^{2}.s", "_variable_name": "VaDens"}
)
return dset
[docs]
def read_swanow(fileglob):
"""Read SWAN nowcast from fileglob, keep overlapping dates from most recent files.
Inefficient workaround. This should ideally be handled within read_swans by
manipulating multi-indexes
"""
swans = (
sorted(fileglob) if isinstance(fileglob, list) else sorted(glob.glob(fileglob))
)
ds = xr.Dataset()
for swan in swans:
ds = read_swan(swan).combine_first(ds)
return ds