Source code for wavespectra.core.swan

"""Read and write swan spectra files"""
from pathlib import Path
import re
import gzip
import datetime
import pandas as pd
import numpy as np

from wavespectra.core.attributes import attrs
from wavespectra.core.utils import to_nautical

E2V = 1025 * 9.81


[docs] class SwanSpecFile(object): """Read spectra in SWAN ASCII format."""
[docs] def __init__( self, filename, freqs=None, dirs=None, x=None, y=None, time=False, id="Swan Spectrum", dirorder=False, append=False, tabfile=None, ): self.times = False self.filename = Path(filename) self.tabfile = ( tabfile or Path(str(self.filename).replace(".gz", "")).with_suffix(".tab") ) self.is_tab = False self.buf = None if self.filename.suffix == ".gz": fopen = gzip.open suffix = "t" else: fopen = open suffix = "" if freqs is not None: # Writable file self.freqs = np.array(freqs) self.dirs = np.array(dirs) self.x = np.array(x) self.y = np.array(y) if time: self.times = [] self.fid = fopen(filename, f"w{suffix}") self.write_header(time, id) self.fmt = len(self.dirs) * "{:5.0f}" else: self.fid = fopen(filename, f"r{suffix}+" if append else f"r{suffix}") self._read_header("SWAN") while True: if not self._read_header("$"): break if self._read_header("TIME"): self._read_header("1") self.times = [] self.x = [] self.y = [] locs = self._read_header("LONLAT", True) or self._read_header( "LOCATIONS", True ) for ip in locs: xy = [float(val) for val in ip.split()] self.x.append(xy[0]) self.y.append(xy[1]) self.x = np.array(self.x) self.y = np.array(self.y) self.afreq = self._read_header("AFREQ", True) self.rfreq = self._read_header("RFREQ", True) self.ndir = self._read_header("NDIR", True) self.cdir = self._read_header("CDIR", True) if self.afreq: self.freqs = np.array([float(val) for val in self.afreq]) else: self.freqs = np.array([float(val) for val in self.rfreq]) if self.ndir: self.dirs = np.array([float(val) for val in self.ndir]) else: self.dirs = to_nautical(np.array([float(val) for val in self.cdir])) self._read_header("QUANT", True) # Figure units out, if Energy density factor needs to be applied units = self.fid.readline().strip().split()[0] if units.upper().startswith("J"): self.units_factor = E2V else: self.units_factor = 1.0 self.excval = int(float(self.fid.readline().split()[0])) if dirorder: self.dirmap = list(np.argsort(self.dirs % 360.0)) self.dirs = self.dirs[self.dirmap] % 360.0 else: self.dirmap = False lons = np.unique(self.x) lats = np.unique(self.y) self.is_grid = len(lons) * len(lats) == len(self.x) self.is_tab = self.tabfile.is_file() & (len(lons) * len(lats) == 1)
def _read_header(self, keyword, numspec=False): if not self.buf: self.buf = self.fid.readline() if self.buf.find(keyword) >= 0: if numspec: line = self.fid.readline() n = int(re.findall(r"\b(\d+)\b", line)[0]) self.buf = [self.fid.readline() for i in range(0, n)] rtn = self.buf self.buf = None else: rtn = False return rtn def read(self): """Read single timestep from current position in file.""" if not self.fid: return None if isinstance(self.times, list): line = self.fid.readline() if line: ttime = datetime.datetime.strptime(line[0:15], "%Y%m%d.%H%M%S") self.times.append(ttime) else: return None Sout = [] for ip, pp in enumerate(self.x): Snew = np.nan * np.zeros((len(self.freqs), len(self.dirs))) if self._read_header("NODATA"): pass else: if self._read_header("ZERO"): Snew = np.zeros((len(self.freqs), len(self.dirs))) elif self._read_header("FACTOR"): fac = float(self.fid.readline()) for i, f in enumerate(self.freqs): line = self.fid.readline() lsplit = line.split() try: Snew[i, :] = [float(val) for val in lsplit] except Exception: import warnings warnings.warn("Check what this is supposed to be doing.") pass Snew *= fac if self.dirmap: Snew = Snew[:, self.dirmap] else: # For files with no timestamp return None Sout.append(Snew / self.units_factor) return Sout def readall(self): """Read the entire file.""" while True: sset = self.read() if sset: yield sset else: break def write_header(self, time=False, str1="", str2="", timecode=1, excval=-99): """Write header to file.""" # Description strout = "{:40}{}\n".format("SWAN 1", "Swan standard spectral file") strout += "{:4}{}\n".format("$", str1) strout += "{:4}{}\n".format("$", str2) # Time if time: strout += "{:40}{}\n".format("TIME", "time-dependent data") strout += "{:>6d}{:34}{}\n".format(timecode, "", "time coding option") # Location strout += "{:40}{}\n".format("LONLAT", "locations in spherical coordinates") strout += "{:>6d}{:34}{}\n".format(len(self.x), "", "number of locations") for x, y in zip(self.x, self.y): strout += "{:2}{:<0.6f}{:2}{:<0.6f}\n".format("", x, "", y) # Frequency strout += "{:40}{}\n".format("AFREQ", "absolute frequencies in Hz") strout += "{:6d}{:34}{}\n".format(len(self.freqs), "", "number of frequencies") for freq in self.freqs: strout += "{:>11.5f}\n".format(freq) # Direction strout += "{:40}{}\n".format("NDIR", "spectral nautical directions in degr") strout += "{:6d}{:34}{}\n".format(len(self.dirs), "", "number of directions") for wdir in self.dirs: strout += "{:>11.4f}\n".format(wdir) # Data strout += "QUANT\n{:>6d}{:34}{}\n".format( 1, "", "number of quantities in table" ) strout += "{:40}{}\n".format("VaDens", "variance densities in m2/Hz/degr") strout += "{:40}{}\n".format("m2/Hz/degr", "unit") strout += "{:3}{:<37g}{}\n".format("", excval, "exception value") # Dumping self.fid.write(strout) def write_spectra(self, arr, time=None): """Write spectra from single timestamp. Args: arr (3D ndarray): spectra to write S(site, freq, dim). time (yyymmdd.HHMMSS): time of spectra to write. """ if time is not None: self.fid.write(f"{time:40}{'date and time'}\n") for spec in arr: fac = spec.max() / 9998.0 if np.isnan(fac): self.fid.write("NODATA\n") elif fac <= 0: self.fid.write("ZERO\n") else: self.fid.write(f"FACTOR\n{'':4}{fac:0.8E}\n") np.savetxt(self.fid, spec / fac, fmt="%5.0f", delimiter="") def close(self): """Close file handle.""" if self.fid: self.fid.close() self.fid = False
[docs] def read_tab(filename, toff=0): """Read swan table file. Args: filename (str): name of SWAN tab file to read toff (float): timezone offset in hours Returns: Pandas DataFrame object """ df = pd.read_csv( filename, delim_whitespace=True, skiprows=[0, 1, 2, 3, 5, 6], parse_dates=[0], date_format="%Y%m%d.%H%M%S", index_col=0, ) df.index.name = attrs.TIMENAME df.index = df.index.shift(toff, freq="1H") for col1, col2 in zip(df.columns[-1:0:-1], df.columns[-2::-1]): df = df.rename(columns={col2: col1}) return df.iloc[:, 0:-1]