Source code for wavespectra.input.triaxys

import glob
import copy
import datetime
from pathlib import Path
from collections import OrderedDict
from dateutil import parser
import numpy as np
import xarray as xr

from wavespectra.specdataset import SpecDataset
from wavespectra.core.attributes import attrs, set_spec_attributes
from wavespectra.core.utils import interp_spec


[docs] def read_triaxys(filename_or_fileglob, toff=0, magnetic_variation=None, regrid_dir=True): """Read spectra from TRIAXYS wave buoy ASCII files. Args: - filename_or_fileglob (str): filename or fileglob specifying one or more files to read. - toff (float): time-zone offset from UTC in hours. - magnetic_variation (float): The angle between the magnetic and geographic meridians to correct from magnetic north to true north. Positive and negative values indicate East and West declination respectively. - regrid_dir (bool): Regrid directions after correcting for the magnetic variation so the direction coordinate remains the same. Returns: - dset (SpecDataset): spectra dataset object read from Triaxys file. Note: - frequencies and directions from first file are used as reference to interpolate spectra from other files in case they differ. - Duplicate dir coordinates (e.g., 0 and 360) are removed when correcting for the magnetic variation since indices must be unique to regrid. """ txys = Triaxys(filename_or_fileglob, toff) txys.run() dset = txys.dset if magnetic_variation is not None and dset.spec.dir is not None: # Rotate dirs = dset.dir dset = dset.assign_coords({"dir": dirs + magnetic_variation}) # Regrid if regrid_dir: dset = dset.spec.interp(dir=dirs) return dset
class Triaxys(object): def __init__(self, filename_or_fileglob, toff=0): """Read wave spectra file from TRIAXYS buoy. Args: - filename_or_fileglob (str, list): filename or fileglob specifying files to read. - toff (float): time offset in hours to account for time zone differences. Returns: - dset (SpecDataset) wavespectra SpecDataset instance. Remark: - frequencies and directions from first file are used as reference to interpolate spectra from other files in case they differ. """ self._filename_or_fileglob = filename_or_fileglob self.toff = toff self.stream = None self.is_dir = None self.time_list = [] self.spec_list = [] self.header_keys = [ "is_triaxys", "is_dir", "time", "nf", "f0", "df", "fmin", "fmax", "ddir", ] def run(self): for ind, self.filename in enumerate(self.filenames): self.open() self.read_header() if ind == 0: self.interp_freq = copy.deepcopy(self.freqs) self.interp_dir = copy.deepcopy(self.dirs) self.read_data() self.close() self.construct_dataset() def open(self): self.stream = open(self.filename, "r") def close(self): if self.stream and not self.stream.closed: self.stream.close() def read_header(self): self.header = {k: None for k in self.header_keys} while True: line = self.stream.readline() if "TRIAXYS BUOY DATA REPORT" in line or "TRIAXYS BUOY REPORT" in line: self.header.update(is_triaxys=True) if " DIRECTIONAL SPECTRUM" in line: self.header.update(is_dir=True) if "NON-DIRECTIONAL SPECTRUM" in line: self.header.update(is_dir=False) if "DATE" in line: time = parser.parse( line.split("=")[1].split("(")[0].strip() ) - datetime.timedelta(hours=self.toff) self.header.update(time=time) if "NUMBER OF FREQUENCIES" in line: self.header.update(nf=int(line.split("=")[1])) if "INITIAL FREQUENCY" in line: self.header.update(f0=float(line.split("=")[1])) if "FREQUENCY SPACING" in line: self.header.update(df=float(line.split("=")[1])) if "RESOLVABLE FREQUENCY RANGE" in line: fmin, fmax = list(map(float, line.split("=")[1].split("TO"))) self.header.update(fmin=fmin, fmax=fmax) if "DIRECTION SPACING" in line: self.header.update(ddir=float(line.split("=")[1])) if "ROWS" in line or "COLUMN 2" in line or not line: break if not self.header.get("is_triaxys"): raise OSError("Not a TRIAXYS Spectra file.") if not self.header.get("time"): raise OSError("Cannot parse time") if self.is_dir is not None and self.is_dir != self.header.get("is_dir"): raise OSError("Cannot merge spectra 2D and spectra 1D") self.is_dir = self.header.get("is_dir") def _append_spectrum(self): """Append spectra after ensuring same spectral basis.""" self.spec_list.append( interp_spec( inspec=self.spec_data, infreq=self.freqs, indir=self.dirs, outfreq=self.interp_freq, outdir=self.interp_dir, ) ) self.time_list.append(self.header.get("time")) def read_data(self): try: self.spec_data = np.zeros((len(self.freqs), len(self.dirs))) for i in range(self.header.get("nf")): row = list(map(float, self.stream.readline().replace(",", " ").split())) if self.header.get("is_dir"): self.spec_data[i, :] = row else: self.spec_data[i, :] = row[-1] self._append_spectrum() except ValueError as err: raise ValueError(f"Cannot read {self.filename}:\n{err}") def construct_dataset(self): self.dset = xr.DataArray( data=self.spec_list, coords=self.coords, dims=self.dims, name=attrs.SPECNAME ).to_dataset() set_spec_attributes(self.dset) if not self.is_dir: self.dset = self.dset.isel(drop=True, **{attrs.DIRNAME: 0}) self.dset[attrs.SPECNAME].attrs.update(units="m2 s") @property def dims(self): return (attrs.TIMENAME, attrs.FREQNAME, attrs.DIRNAME) @property def coords(self): _coords = OrderedDict( ( (attrs.TIMENAME, self.time_list), (attrs.FREQNAME, self.interp_freq), (attrs.DIRNAME, self.interp_dir), ) ) return _coords @property def dirs(self): ddir = self.header.get("ddir") if ddir: return list(np.arange(0.0, 360.0 + ddir, ddir)) else: return [0.0] @property def freqs(self): try: f0, df, nf = self.header["f0"], self.header["df"], self.header["nf"] return list(np.arange(f0, f0 + df * nf, df)) except Exception as exc: raise OSError(f"Not enough info to parse frequencies:\n{exc}") @property def filenames(self): if isinstance(self._filename_or_fileglob, Path): self._filename_or_fileglob = str(self._filename_or_fileglob) if isinstance(self._filename_or_fileglob, list): filenames = sorted(self._filename_or_fileglob) elif isinstance(self._filename_or_fileglob, str): filenames = sorted(glob.glob(self._filename_or_fileglob)) if not filenames: raise ValueError(f"No file located in {self._filename_or_fileglob}") return filenames if __name__ == "__main__": from pathlib import Path import matplotlib.pyplot as plt filename1 = "../../tests/sample_files/triaxys.DIRSPEC" filename2 = str(Path(filename1)) dset1 = read_triaxys(filename2) dset2 = read_triaxys(filename2, magnetic_variation=22) dset3 = read_triaxys(filename2, magnetic_variation=22, regrid_dir=False) fig = plt.figure() dset1.isel(time=0).spec.plot(rmin=0.05) plt.title("Original") fig = plt.figure() dset2.isel(time=0).spec.plot(rmin=0.05) plt.title("Rotated 22 degrees, regridded") fig = plt.figure() dset3.isel(time=0).spec.plot(rmin=0.05) plt.title("Rotated 22 degrees, not regridded") plt.show()