# Source code for wavespectra.input.obscape

"""WaveSpectra reader for Obscape CSV files (as downloaded from portal).

Buoy data: https://obscape.com/site/
Note: for raw data, use the Python file that can be downloaded from the Obscape site.

Revision history:
----------------
2024-03-11 : First version



"""

import logging
from datetime import datetime, timezone
from pathlib import Path
import numpy as np
import xarray as xr

from wavespectra.core.attributes import set_spec_attributes


logger = logging.getLogger(__name__)


def _read_obscape_file(filename: str) -> dict:
    """Read a single Obscape spectrum CSV file.

    Header lines start with ``#``. Header lines of the form ``# key = value``
    are stored as metadata; the remaining header lines are collected verbatim
    under the ``info`` metadata entry. All other lines are parsed as CSV.

    Args:
        - filename (str): The filename to read (a Path is accepted as well).

    Returns:
        - R (dict): A dictionary with the entries:
            - ``freq``: list of floats parsed from the ``Rows [Hz]`` header.
            - ``dir``: array of directions spanning [0, 360) with the step
              taken from the first two entries of the ``Columns [deg]`` header.
            - ``data``: array parsed from the CSV body.
            - ``utc``: naive datetime (UTC) built from the ``Timestamp`` header.
            - ``metadata``: dict of header strings plus the ``info`` list.

    Raises:
        - FileNotFoundError: If the file does not exist.
        - KeyError: If a required header entry is missing.

    """
    filename = Path(filename)
    # Explicit check instead of assert: asserts are stripped under `python -O`.
    if not filename.exists():
        raise FileNotFoundError(f"Obscape file not found: {filename}")

    metadata = {}
    info = []

    with open(filename, "r") as f:
        for line in f:
            line = line.strip()
            if not line.startswith("#"):
                continue
            if "=" in line:
                # Split on the first "=" only so values may contain "=".
                key, _, value = line.partition("=")
                metadata[key[1:].strip()] = value.strip()
            else:
                info.append(line)

    # read the csv data using numpy, skipping lines that start with #
    data = np.genfromtxt(filename, delimiter=",", comments="#")

    # get frequencies
    freq = [float(token) for token in metadata["Rows [Hz]"].split(",")]

    # Directions may be separated by commas, spaces or both; splitting on
    # whitespace after replacing commas never yields empty tokens.
    dir_tokens = metadata["Columns [deg]"].replace(",", " ").split()

    # Only the step between the first two directions is used; the direction
    # axis itself is rebuilt as an even grid over [0, 360).
    dd = float(dir_tokens[1]) - float(dir_tokens[0])
    dirs = np.arange(0, 360, dd)

    # check dims
    if data.shape[0] != len(freq):
        logger.warning(
            "Frequency dimension mismatch: %s != %s", data.shape[0], len(freq)
        )

    if data.shape[1] != len(dirs):
        logger.warning(
            "Direction dimension mismatch: %s != %s", data.shape[1], len(dirs)
        )

    metadata["info"] = info

    timestamp = metadata["Timestamp"]  # unix timestamp

    # Convert via an aware UTC datetime, then drop tzinfo to return naive UTC.
    utc = datetime.fromtimestamp(int(timestamp), tz=timezone.utc).replace(
        tzinfo=None
    )

    return {
        "freq": freq,
        "dir": dirs,
        "data": data,
        "utc": utc,
        "metadata": metadata,
    }


def read_obscape(filename_or_fileglob):
    """Read spectra from Obscape wave buoy csv files.

    The CSV files are downloaded from the Obscape portal.

    Args:
        - filename_or_fileglob (str, list): A single filename or a list of
          filenames or a fileglob pattern.

    Returns:
        - dset (SpecDataset): A wavespectra SpecDataset object.

    Raises:
        - ValueError: If an empty list is given or the pattern matches no file.

    """
    # step 1: get the files
    if isinstance(filename_or_fileglob, (list, tuple)):
        files = list(filename_or_fileglob)
    else:
        path = Path(filename_or_fileglob)
        files = sorted(path.absolute().parent.glob(path.name))

    # Checked for both branches: an empty list would otherwise fail further
    # down with an unhelpful IndexError.
    if not files:
        raise ValueError(f"No files found from {filename_or_fileglob}")

    # step 2: get the data
    spectra = [_read_obscape_file(file) for file in files]

    # step 3: construct the data
    # Metadata and the freq/dir axes are taken from the first spectrum read.
    first = spectra[0]
    metadata = first["metadata"]

    ds = xr.DataArray(
        data=[spectrum["data"] for spectrum in spectra],
        coords={
            "time": [spectrum["utc"] for spectrum in spectra],
            "freq": first["freq"],
            "dir": first["dir"],
        },
        dims=("time", "freq", "dir"),
        name="efth",
    ).to_dataset()

    ds = ds.sortby("time", ascending=True)

    # scale
    ds["efth"] = ds["efth"] * np.pi / 180  # convert to m2/Hz/deg

    # Copy the descriptive header entries that are present in the file.
    for key in [
        "Station name",
        "Device type",
        "Device serial",
        "Latitude [deg]",
        "Longitude [deg]",
        "Magnetic declination (corrected) [deg]",
        "Directions",
        "info",
    ]:
        if key in metadata:
            ds.attrs[key] = metadata[key]

    # Set attributes
    set_spec_attributes(ds)

    # add site dimension
    ds["site"] = [0]

    return ds
def _get_timestamp(stem):
    """Parse the timestamp at the start of an Obscape filename stem.

    Args:
        - stem (str): Filename without extension, expected to start with
          `yyyymmdd_hhmmss`.

    Returns:
        - utc (datetime): Naive datetime built from the stem, or None (after
          logging a warning) when the stem does not hold a valid timestamp.

    """
    try:
        # Character 8 (the date/time separator) is not inspected. Building the
        # datetime inside the try also catches out-of-range fields (month 13).
        utc = datetime(
            int(stem[:4]),
            int(stem[4:6]),
            int(stem[6:8]),
            int(stem[9:11]),
            int(stem[11:13]),
            int(stem[13:15]),
        )
    except ValueError:
        logger.warning(
            "Filename does not contain a valid UTC timestamp, expect the filename "
            f"to start with yyyymmdd_hhmmss but got: {stem}"
        )
        return None

    return utc


def _get_obs_files(directory, start_date=None, end_date=None):
    """Get a list of csv files in a directory with timestamps in a given range.

    This function returns all the .csv files in the directory that have a
    timestamp greater than or equal to start_date and less than or equal to
    end_date. Timestamps are extracted from the filename which are expected to
    be in the format: `yyyymmdd_hhmmss.....csv`

    Files whose name holds no valid timestamp are skipped when a date filter is
    given (they cannot be placed in the range) and kept when no filter is given.

    Args:
        - directory (str): The directory containing the Obscape files.
        - start_date (datetime): The start date to filter the files.
        - end_date (datetime): The end date to filter the files.

    Returns:
        - R (list): A list of Path objects, sorted by path.

    Raises:
        - FileNotFoundError: If the directory does not exist.

    """
    directory = Path(directory)
    # Explicit check instead of assert: asserts are stripped under `python -O`.
    if not directory.exists():
        raise FileNotFoundError(f"Directory not found: {directory}")

    filtering = start_date is not None or end_date is not None

    selected = []
    # Sorted so the result does not depend on filesystem listing order.
    for file in sorted(directory.glob("*.csv")):
        timestamp = _get_timestamp(file.stem)
        if filtering:
            if timestamp is None:
                continue
            if start_date is not None and timestamp < start_date:
                continue
            if end_date is not None and timestamp > end_date:
                continue
        selected.append(file)

    return selected


def read_obscape_dir(directory, start_date=None, end_date=None):
    """Read obscape spectra files from directory.

    This function reads all the files in the directory that have a timestamp
    greater than or equal to `start_date` and less than or equal to `end_date`.
    Timestamps are extracted from the filename. The filename is expected to
    start with the timestamp which is expected to be in the format
    yyyymmdd_hhmmss, e.g., `20240214_000000_wavebuoy_xxx_spec2D.csv`.

    Args:
        - directory (str): The directory containing the Obscape files.
        - start_date (datetime): The start date to filter the files.
        - end_date (datetime): The end date to filter the files.

    Returns:
        - dset (SpecDataset): A wavespectra SpecDataset object.

    """
    files = _get_obs_files(directory, start_date, end_date)
    return read_obscape(files)