"""Interpolate stations."""
import logging
import numpy as np
import xarray as xr
from wavespectra.core.attributes import attrs, set_spec_attributes
logger = logging.getLogger(__name__)
class Coordinates:
"""Slicing of circular coordinates.
Args:
dset (xr.Dataset): Dataset object to slice from.
lons (array): Longitudes to slice.
lats (array): Latitudes to slice.
dset_lons (array): Dataset longitudes for optimising.
dset_lats (array): Dataset latitudes for optimising.
"""
def __init__(self, dset, lons, lats, dset_lons=None, dset_lats=None):
self.dset = dset
self._lons = np.array(lons)
self.lats = np.array(lats)
if dset_lons is None:
self.dset_lons = dset[attrs.LONNAME].values
else:
self.dset_lons = dset_lons
if dset_lats is None:
self.dset_lats = dset[attrs.LATNAME].values
else:
self.dset_lats = dset_lats
self._validate()
if self._is_360(self._lons) == self._is_360(self.dset_lons):
self.consistent = True
else:
self.consistent = False
def _validate(self):
"""Few input checks."""
assert len(self._lons) == len(self.lats), "lons and lats must have same size."
if (
attrs.LONNAME in self.dset.dims
or attrs.LATNAME in self.dset.dims
or attrs.SITENAME not in self.dset.dims
):
raise NotImplementedError("sel only supports stations not gridded data.")
def _is_180(self, array):
"""True if longitudes are in -180 -- 180 convention."""
if array.min() < 0 and array.max() <= 180:
return True
return False
def _is_360(self, array):
"""True if longitudes are in 0 -- 360 convention."""
if array.min() >= 0 and array.max() <= 360:
return True
return False
def _swap_longitude_convention(self, longitudes):
"""Swap longitudes between [0 <--> 360] and [-180 <--> 180] conventions."""
if self._is_180(longitudes):
return longitudes % 360
elif self._is_360(longitudes):
longitudes[longitudes > 180] = longitudes[longitudes > 180] - 360
return longitudes
@property
def lons(self):
"""Longitudes to query, always in same convention as dataset."""
if self._is_360(self._lons) == self._is_360(self.dset_lons):
return self._lons
else:
return self._swap_longitude_convention(self._lons)
[docs]
def distance(self, lon, lat):
"""Distance between each station in (dset_lons, dset_lats) and site (lon, lat).
Args:
lon (float): Longitude to locate from lons.
lat (float): Latitude to locate from lats.
Returns:
List of distances between each station and site.
"""
dist = np.sqrt((self.dset_lons % 360 - np.array(lon) % 360) ** 2 + (self.dset_lats - np.array(lat)) ** 2)
if isinstance(dist, xr.DataArray):
dist = dist.values
return np.abs(dist)
[docs]
def nearer(self, lon, lat, tolerance=np.inf, max_sites=None):
"""Nearer stations in (dset_lons, dset_lats) to site (lon, lat).
Args:
lon (float): Longitude of of station to locate from lons.
lat (float): Latitude of of station to locate from lats.
tolerance (float): Maximum distance for scanning neighbours.
max_sites (int): Maximum number of neighbours.
Returns:
Indices and distances of up to `max_sites` neighbour stations not farther from
`tolerance`, ordered from closer to farthest station.
"""
dist = self.distance(lon, lat)
closest_ids = np.argsort(dist)
closest_dist = dist[closest_ids]
keep_ids = closest_ids[closest_dist <= tolerance][:max_sites]
return keep_ids, dist[keep_ids]
[docs]
def nearest(self, lon, lat):
"""Nearest station in (dset_lons, dset_lats) to site (lon, lat).
Args:
lon (float): Longitude to locate from lons.
lat (float): Latitude to locate from lats.
Returns:
Index and distance of closest station.
"""
dist = self.distance(lon, lat)
closest_id = dist.argmin()
closest_dist = dist[closest_id]
return closest_id, closest_dist
[docs]
def sel_nearest(
dset,
lons,
lats,
tolerance=2.0,
unique=False,
exact=False,
dset_lons=None,
dset_lats=None,
missing="raise",
):
"""Select sites from nearest distance.
Args:
dset (Dataset): Stations SpecDataset to select from.
lons (array): Longitude of sites to interpolate spectra at.
lats (array): Latitude of sites to interpolate spectra at.
tolerance (float): Maximum distance to use site for interpolation.
unique (bool): Only returns unique sites in case of repeated inexact matches.
exact (bool): Require exact matches.
dset_lons (array): Longitude of stations in dset.
dset_lats (array): Latitude of stations in dset.
missing (str): Action to take if no site is found within tolerance:
- 'raise': raise an error
- 'ignore': skip site
Returns:
Selected SpecDataset at locations defined by (lons, lats).
Note:
Args `dset_lons`, `dset_lats` are not required but can improve performance when
`dset` is chunked with site=1 (expensive to access station coordinates) and
improve precision if projected coordinates are provided at high latitudes.
"""
coords = Coordinates(dset, lons=lons, lats=lats, dset_lons=dset_lons, dset_lats=dset_lats)
station_ids = []
for lon, lat in zip(coords.lons, coords.lats):
closest_id, closest_dist = coords.nearest(lon, lat)
if closest_dist > tolerance:
if missing == "raise":
raise AssertionError(
f"Nearest site in dataset from ({lon, lat}) is {closest_dist:g} "
f"deg away but tolerance is {tolerance:g} deg"
)
elif missing == "ignore":
logger.debug(
f"No site in dataset within tolerance={tolerance} deg "
f"from requested location ({lon, lat}), skipping"
)
continue
if exact and closest_dist > 0:
raise AssertionError(
f"Exact match required but no site in dataset at ({lon, lat}), "
f"nearest site is {closest_dist} deg away."
)
if unique and closest_id in station_ids:
logger.debug(
f"Nearest site at ({lon, lat}) is repeated, skipping "
f"because unique=True"
)
continue
station_ids.append(closest_id)
if not station_ids:
raise ValueError(
f"No site in dataset found within tolerance={tolerance} deg of any site "
f"{list(zip(coords.lons, coords.lats))}"
)
dsout = dset.isel(**{attrs.SITENAME: station_ids})
# Return longitudes in the convention provided
if coords.consistent is False:
dsout.lon.values = coords._swap_longitude_convention(dsout.lon.values)
dsout = dsout.assign_coords({attrs.SITENAME: np.arange(len(station_ids))})
return dsout
[docs]
def sel_idw(
dset, lons, lats, tolerance=2.0, max_sites=4, dset_lons=None, dset_lats=None
):
"""Select sites from inverse distance weighting.
Args:
dset (Dataset): Stations SpecDataset to interpolate from.
lons (array): Longitude of sites to interpolate spectra at.
lats (array): Latitude of sites to interpolate spectra at.
tolerance (float): Maximum distance to use site for interpolation.
max_sites (int): Maximum number of neighbour sites to use for interpolation.
dset_lons (array): Longitude of stations in dset.
dset_lats (array): Latitude of stations in dset.
Returns:
Selected SpecDataset at locations defined by (lons, lats).
Note:
Args `dset_lons`, `dset_lats` are not required but can improve performance when
`dset` is chunked with site=1 (expensive to access station coordinates) and
improve precision if projected coordinates are provided at high latitudes.
"""
coords = Coordinates(dset, lons=lons, lats=lats, dset_lons=dset_lons, dset_lats=dset_lats)
mask = dset.isel(site=0, drop=True) * np.nan
dsout = []
for lon, lat in zip(coords.lons, coords.lats):
closest_ids, closest_dist = coords.nearer(lon, lat, tolerance, max_sites)
if len(closest_ids) == 0:
logger.debug(
f"No stations within {tolerance} deg of site (lat={lat}, lon={lon}), "
"this site will be masked."
)
# Collect ids and factors of neighbours
indices = []
factors = []
for ind, dist in zip(closest_ids, closest_dist):
indices.append(ind)
if dist == 0:
factors.append(1.0)
break
factors.append(1.0 / dist)
# Mask it if not enough neighbours
if len(indices) == 0 or (len(indices) == 1 and dist > 0):
dsout.append(mask)
else:
# Inverse distance weighting
sumfac = float(1.0 / sum(factors))
ind = indices.pop(0)
fac = factors.pop(0)
weighted = float(fac) * dset.isel(site=ind, drop=True)
for ind, fac in zip(indices, factors):
weighted += float(fac) * dset.isel(site=ind, drop=True)
if len(indices) > 0:
weighted *= sumfac
dsout.append(weighted)
# Concat sites into dataset
dsout = xr.concat(dsout, dim=attrs.SITENAME)
for dvar in dsout.data_vars:
if set(dsout[dvar].dims) == set(dset[dvar].dims):
dsout[dvar] = dsout[dvar].transpose(*dset[dvar].dims)
# Redefining coordinates and variables
dsout[attrs.SITENAME] = np.arange(len(coords.lons))
dsout[attrs.LONNAME] = ((attrs.SITENAME), coords.lons)
dsout[attrs.LATNAME] = ((attrs.SITENAME), coords.lats)
# Return longitudes in the convention provided
if coords.consistent is False:
dsout.lon.values = coords._swap_longitude_convention(dsout.lon.values)
dsout.attrs = dset.attrs
set_spec_attributes(dsout)
return dsout
[docs]
def sel_bbox(dset, lons, lats, tolerance=0.0, dset_lons=None, dset_lats=None):
"""Select sites within bbox.
Args:
dset (Dataset): Stations SpecDataset to select from.
lons (array): Longitude of sites to interpolate spectra at.
lats (array): Latitude of sites to interpolate spectra at.
tolerance (float): Extend bbox extents by.
dset_lons (array): Longitude of stations in dset.
dset_lats (array): Latitude of stations in dset.
Returns:
Selected SpecDataset within bbox defined by:
lower-left=[min(lons), min(lats)], upper-right=[max(lons), max(lats)].
Note:
Args `dset_lons`, `dset_lats` are not required but can improve performance when
`dset` is chunked with site=1 (expensive to access station coordinates) and
improve precision if projected coordinates are provided at high latitudes.
"""
coords = Coordinates(dset, lons=lons, lats=lats, dset_lons=dset_lons, dset_lats=dset_lats)
minlon = min(coords.lons) - tolerance
minlat = min(coords.lats) - tolerance
maxlon = max(coords.lons) + tolerance
maxlat = max(coords.lats) + tolerance
if not (coords._is_360(coords.dset_lons) and not coords.consistent):
station_ids = np.where(
(coords.dset_lons >= minlon)
& (coords.dset_lats >= minlat)
& (coords.dset_lons <= maxlon)
& (coords.dset_lats <= maxlat)
)[0]
else:
station_ids = np.where(
(coords.dset_lons >= maxlon)
& (coords.dset_lats >= minlat)
& (coords.dset_lons <= 360)
& (coords.dset_lats <= maxlat)
)[0]
station_ids = np.append(
station_ids,
np.where(
(coords.dset_lons >= 0)
& (coords.dset_lats >= minlat)
& (coords.dset_lons <= minlon)
& (coords.dset_lats <= maxlat)
)[0]
)
if station_ids.size == 0:
raise ValueError(
"No site found within bbox defined by "
f"([{min(coords._lons) - tolerance}, {minlat}], "
f"[{max(coords._lons) + tolerance}, {maxlat}])"
)
dsout = dset.isel(**{attrs.SITENAME: station_ids})
# Return longitudes in the convention provided
if coords.consistent is False:
dsout.lon.values = coords._swap_longitude_convention(dsout.lon.values)
dsout = dsout.assign_coords({attrs.SITENAME: np.arange(len(station_ids))})
return dsout