Source code for wavespectra.core.watershed

"""Watershed partitioning."""
import numpy as np
import xarray as xr

from wavespectra.core.attributes import attrs
from wavespectra.specpart import specpart
from wavespectra.core.utils import D2R, R2D, celerity
from wavespectra.core.npstats import hs


[docs] def nppart(spectrum, freq, dir, wspd, wdir, dpt, swells=3, agefac=1.7, wscut=0.3333): """Watershed partition on a numpy array. Args: - spectrum (2darray): Wave spectrum array with shape (nf, nd). - freq (1darray): Wave frequency array with shape (nf). - dir (1darray): Wave direction array with shape (nd). - wspd (float): Wind speed. - wdir (float): Wind direction. - dpt (float): Water depth. - swells (int): Number of swell partitions to compute. - agefac (float): Age factor. - wscut (float): Wind speed cutoff. Returns: - specpart (3darray): Wave spectrum partitions with shape (np, nf, nd). """ part_array = specpart.partition(spectrum) Up = agefac * wspd * np.cos(D2R * (dir - wdir)) windbool = np.tile(Up, (freq.size, 1)) > np.tile( celerity(freq, dpt)[:, np.newaxis], (1, dir.size) ) ipeak = 1 # values from specpart.partition start at 1 part_array_max = part_array.max() # partitions_hs_swell = np.zeros(part_array_max + 1) # zero is used for sea partitions_hs_swell = np.zeros(part_array_max + 1) # zero is used for sea while ipeak <= part_array_max: part_spec = np.where(part_array == ipeak, spectrum, 0.0) # Assign new partition if multiple valleys and satisfying conditions __, imin = inflection(part_spec, freq, dfres=0.01, fmin=0.05) if len(imin) > 0: part_spec_new = part_spec.copy() part_spec_new[imin[0].squeeze() :, :] = 0 newpart = part_spec_new > 0 if newpart.sum() > 20: part_spec[newpart] = 0 part_array_max += 1 part_array[newpart] = part_array_max partitions_hs_swell = np.append(partitions_hs_swell, 0) # Assign sea partition W = part_spec[windbool].sum() / part_spec.sum() if W > wscut: part_array[part_array == ipeak] = 0 else: partitions_hs_swell[ipeak] = hs(part_spec, freq, dir) ipeak += 1 sorted_swells = np.flipud(partitions_hs_swell[1:].argsort() + 1) parts = np.concatenate(([0], sorted_swells[:swells])) all_parts = [] for part in parts: all_parts.append(np.where(part_array == part, spectrum, 0.0)) # Extend partitions list if it is less than swells if len(all_parts) < swells + 1: nullspec = 0 * spectrum nmiss = (swells + 1) - len(all_parts) for i in range(nmiss): all_parts.append(nullspec) return np.array(all_parts)
[docs] def partition( dset, wspd="wspd", wdir="wdir", dpt="dpt", swells=3, agefac=1.7, wscut=0.3333, ): """Watershed partitioning. Args: - dset (xr.DataArray, xr.Dataset): Spectra array or dataset in wavespectra convention. - wspd (xr.DataArray, str): Wind speed DataArray or variable name in dset. - wdir (xr.DataArray, str): Wind direction DataArray or variable name in dset. - dpt (xr.DataArray, str): Depth DataArray or the variable name in dset. - swells (int): Number of swell partitions to compute. - agefac (float): Age factor. - wscut (float): Wind speed cutoff. Returns: - dspart (xr.Dataset): Partitioned spectra dataset with extra dimension. References: - Hanson, Jeffrey L., et al. "Pacific hindcast performance of three numerical wave models." JTECH 26.8 (2009): 1614-1633. """ # Sort out inputs if isinstance(wspd, str): wspd = dset[wspd] if isinstance(wdir, str): wdir = dset[wdir] if isinstance(dpt, str): dpt = dset[dpt] if isinstance(dset, xr.Dataset): dset = dset[attrs.SPECNAME] # Partitioning full spectra dsout = xr.apply_ufunc( nppart, dset, dset.freq, dset.dir, wspd, wdir, dpt, swells, agefac, wscut, input_core_dims=[["freq", "dir"], ["freq"], ["dir"], [], [], [], [], [], []], output_core_dims=[["part", "freq", "dir"]], vectorize=True, dask="parallelized", output_dtypes=["float32"], dask_gufunc_kwargs={ "output_sizes": { "part": swells + 1, }, }, ) # Finalise output dsout.name = "efth" dsout["part"] = np.arange(swells + 1) dsout.part.attrs = {"standard_name": "spectral_partition_number", "units": ""} return dsout.transpose("part", ...)
[docs] def frequency_resolution(freq): """Frequency resolution array. Args: - freq (1darray): Frequencies to calculate resolutions from with shape (nf). Returns: - df (1darray): Resolution for pairs of adjacent frequencies with shape (nf-1). """ if len(freq) > 1: return abs(freq[1:] - freq[:-1]) else: return np.array((1.0,))
[docs] def inflection(spectrum, freq, dfres=0.01, fmin=0.05): """Points of inflection in smoothed frequency spectra. Args: - fdspec (ndarray): freq-dir 2D specarray. - dfres (float): used to determine length of smoothing window. - fmin (float): minimum frequency for looking for minima/maxima. """ if len(freq) > 1: df = frequency_resolution(freq) sf = spectrum.sum(axis=1) nsmooth = int(dfres / df[0]) # Window size if nsmooth > 1: sf = np.convolve(sf, np.hamming(nsmooth), "same") # Smoothed f-spectrum sf[(sf < 0) | (freq < fmin)] = 0 diff = np.diff(sf) imax = np.argwhere(np.diff(np.sign(diff)) == -2) + 1 imin = np.argwhere(np.diff(np.sign(diff)) == 2) + 1 else: imax = 0 imin = 0 return imax, imin