Source code for wavespectra.core.utils

"""Miscellaneous functions."""
import copy
import datetime
import numpy as np
import pandas as pd
import xarray as xr

from scipy.interpolate import griddata

from wavespectra.core.attributes import attrs


GAMMA = (
    lambda x: np.sqrt(2.0 * np.pi / x)
    * ((x / np.exp(1)) * np.sqrt(x * np.sinh(1.0 / x))) ** x
)
D2R = np.pi / 180.0
R2D = 180.0 / np.pi


[docs] def wavelen(freq, depth=None): """Wavelength L. Args: - freq (ndarray): Frequencies (Hz) for calculating L. - depth (float): Water depth, use deep water approximation by default. Returns; - L: ndarray of same shape as freq with wavelength for each frequency. """ if depth is not None: ang_freq = 2 * np.pi * freq return 2 * np.pi / wavenuma(ang_freq, depth) else: return 1.56 / freq ** 2
[docs] def wavenuma(ang_freq, water_depth): """Chen and Thomson wavenumber approximation.""" k0h = 0.10194 * ang_freq * ang_freq * water_depth D = [0, 0.6522, 0.4622, 0, 0.0864, 0.0675] a = 1.0 for i in range(1, 6): a += D[i] * k0h ** i return (k0h * (1 + 1.0 / (k0h * a)) ** 0.5) / water_depth
[docs] def celerity(freq, depth=None): """Wave celerity C. Args: - freq (ndarray): Frequencies (Hz) for calculating C. - depth (float): Water depth, use deep water approximation by default. Returns; - C: ndarray of same shape as freq with wave celerity for each frequency. """ if depth is not None: ang_freq = 2 * np.pi * freq return ang_freq / wavenuma(ang_freq, depth) else: return 1.56 / freq
[docs] def dnum_to_datetime(dnum): """Convert from numeric date to datetime.""" return datetime.datetime.fromordinal(int(dnum) - 366) + datetime.timedelta( days=dnum % 1 )
[docs] def to_nautical(ang): """Convert from cartesian to nautical angle.""" return np.mod(270 - ang, 360)
[docs] def unique_indices(ds, dim="time"): """Remove duplicate indices from dataset. Args: - ds (Dataset, DataArray): Dataset to remove duplicate indices from. - dim (str): Dimension to remove duplicate indices from. Returns: dsout (Dataset, DataArray): Dataset with duplicate indices along dim removed. """ _, index = np.unique(ds[dim], return_index=True) return ds.isel(**{dim: index})
[docs] def unique_times(ds): """Remove duplicate times from dataset.""" return unique_indices(ds, "time")
[docs] def to_datetime(np64): """Convert Datetime64 date to datatime.""" if isinstance(np64, np.datetime64): dt = pd.to_datetime(str(np64)).to_pydatetime() elif isinstance(np64, xr.DataArray): dt = pd.to_datetime(str(np64.values)).to_pydatetime() else: OSError( "Cannot convert %s into datetime, expected np.datetime64 or xr.DataArray" % type(np64) ) return dt
[docs] def spddir_to_uv(spd, direc, coming_from=False): """Converts (spd, dir) to (u, v). Args: spd (array): magnitudes to convert. direc (array): directions to convert (degree). coming_from (bool): True if directions in coming-from convention, False if in going-to. Returns: u (array): eastward wind component. v (array): northward wind component. """ ang_rot = 180 if coming_from else 0 direcR = np.deg2rad(direc + ang_rot) u = spd * np.sin(direcR) v = spd * np.cos(direcR) return u, v
[docs] def uv_to_spddir(u, v, coming_from=False): """Converts (u, v) to (spd, dir). Args: u (array): eastward wind component. v (array): northward wind component. coming_from (bool): True for output directions in coming-from convention, False for going-to. Returns: mag (array): magnitudes. direc (array): directions (degree). """ to_nautical = 270 if coming_from else 90 mag = np.sqrt(u ** 2 + v ** 2) direc = np.rad2deg(np.arctan2(v, u)) direc = (to_nautical - direc) % 360 return mag, direc
[docs] def interp_spec(inspec, infreq, indir, outfreq=None, outdir=None, method="linear"): """Interpolate onto new spectral basis. Args: inspec (2D ndarray): input spectrum E(infreq,indir) to be interpolated. infreq (1D ndarray): frequencies of input spectrum. indir (1D ndarray): directions of input spectrum. outfreq (1D ndarray): frequencies of output interpolated spectrum, same as infreq by default. outdir (1D ndarray): directions of output interpolated spectrum, same as infreq by default. method: {'linear', 'nearest', 'cubic'}, method of interpolation to use with griddata. Returns: outspec (2D ndarray): interpolated ouput spectrum E(outfreq,outdir). Note: If either outfreq or outdir is None or False this coordinate is not interpolated Choose indir=None if spectrum is 1D. TODO: Deprecate in favour of new regrid_spec function. """ outfreq = infreq if outfreq is None or outfreq is False else outfreq outdir = indir if outdir is None or outdir is False else outdir if (np.array_equal(infreq, outfreq)) & (np.array_equal(indir, outdir)): outspec = copy.deepcopy(inspec) elif np.array_equal(indir, outdir): if indir is not None: outspec = np.zeros((len(outfreq), len(outdir))) for idir in range(len(indir)): outspec[:, idir] = np.interp( outfreq, infreq, inspec[:, idir], left=0.0, right=0.0 ) else: outspec = np.interp( outfreq, infreq, np.array(inspec).ravel(), left=0.0, right=0.0 ) else: dirs = D2R * (270 - np.expand_dims(outdir, 0)) dirs2 = D2R * (270 - np.expand_dims(indir, 0)) cosmat = np.dot(np.expand_dims(outfreq, -1), np.cos(dirs)) sinmat = np.dot(np.expand_dims(outfreq, -1), np.sin(dirs)) cosmat2 = np.dot(np.expand_dims(infreq, -1), np.cos(dirs2)) sinmat2 = np.dot(np.expand_dims(infreq, -1), np.sin(dirs2)) outspec = griddata( (cosmat2.flat, sinmat2.flat), inspec.flat, (cosmat, sinmat), method, 0.0 ) return outspec
[docs] def flatten_list(l, a): """Flatten list of lists""" for i in l: if isinstance(i, list): flatten_list(i, a) else: a.append(i) return a
[docs] def regrid_spec(dset, freq=None, dir=None, maintain_m0=True): """Regrid spectra onto new spectral basis. Args: - dset (Dataset, DataArray): Spectra to interpolate. - freq (DataArray, 1darray): Frequencies of interpolated spectra (Hz). - dir (DataArray, 1darray): Directions of interpolated spectra (deg). - maintain_m0 (bool): Ensure variance is conserved in interpolated spectra. Returns: - dsi (Dataset, DataArray): Regridded spectra. Note: - All freq below lowest freq are interpolated assuming :math:`E_d(f=0)=0`. - :math:`Ed(f)` is set to zero for new freq above the highest freq in dset. - Only the 'linear' method is currently supported. - Duplicate wrapped directions (e.g., 0 and 360) are removed when regridding directions because indices must be unique to intepolate. """ dsout = dset.copy() if dir is not None: dsout = dsout.assign_coords({attrs.DIRNAME: dsout[attrs.DIRNAME] % 360}) # Remove any duplicate direction index dsout = unique_indices(dsout, attrs.DIRNAME) # Interpolate heading dsout = dsout.sortby('dir') to_concat = [dsout] # Repeat the first and last direction with 360 deg offset when required if dir.min() < dsout.dir.min(): highest = dsout.isel(dir=-1) highest['dir'] = highest.dir - 360 to_concat = [highest, dsout] if dir.max() > dsout.dir.max(): lowest = dsout.isel(dir=0) lowest['dir'] = lowest.dir + 360 to_concat.append(lowest) if len(to_concat) > 1: dsout = xr.concat(to_concat, dim='dir') # Interpolate directions dsout = dsout.interp(dir=dir, assume_sorted=True) if freq is not None: # If needed, add a new frequency at f=0 with zero energy if freq.min() < dsout.freq.min(): fzero = 0 * dsout.isel(freq=0) fzero['freq'] = 0 dsout = xr.concat([fzero, dsout], dim='freq') # Interpolate frequencies dsout = dsout.interp(freq=freq, assume_sorted=False, kwargs={'fill_value': 0}) if maintain_m0: scale = dset.spec.hs()**2 / dsout.spec.hs()**2 dsout = dsout * scale return dsout