Source code for stglib.met

import numpy as np
import pandas as pd
import xarray as xr
from scipy import stats

from .core import attrs, qaqc, utils


def read_mayfly(filnam, skiprows=7):
    """Read a csv file from an EnviroDIY Mayfly datalogger into an
    xarray Dataset indexed by time.

    Parameters
    ----------
    filnam : str
        Path to the Mayfly .csv file.
    skiprows : int, optional
        How many header rows to skip. Default 7.
    """

    frame = pd.read_csv(
        filnam,
        skiprows=skiprows,
        header=0,
        na_values=[-9999],  # Mayfly fill value for missing samples
    )
    # Parse the timestamp column and promote it to the "time" coordinate.
    frame["Date and Time in UTC"] = pd.to_datetime(frame["Date and Time in UTC"])
    frame = frame.rename(columns={"Date and Time in UTC": "time"}).set_index("time")

    return frame.to_xarray()


def read_campbell(filnam):
    """Read a dat file from a Campbell Scientific datalogger into an
    xarray Dataset indexed by time.

    Parameters
    ----------
    filnam : str
        Path to the Campbell .dat file.
    """

    frame = pd.read_csv(
        filnam,
        header=1,          # column names are on the second line
        skiprows=[2, 3],   # units and aggregation-type rows
        na_values=["NAN"],
    )
    # Parse the timestamp column and promote it to the "time" coordinate.
    frame["TIMESTAMP"] = pd.to_datetime(frame["TIMESTAMP"])
    frame = frame.rename(columns={"TIMESTAMP": "time"}).set_index("time")

    return frame.to_xarray()


# Make raw CDF
# Make raw CDF
def csv_to_cdf(metadata):
    """Process raw datalogger output to a raw netCDF (.cdf) file.

    Parameters
    ----------
    metadata : dict
        Must contain "basefile" (path without extension) and
        "datalogger" ("mayfly" or "campbell"); a Mayfly file also
        requires "skiprows", which is consumed (popped) here.

    Returns
    -------
    xarray.Dataset
        The raw Dataset, also written to "<filename>-raw.cdf".

    Raises
    ------
    ValueError
        If metadata["datalogger"] is not a recognized type.
    """
    basefile = metadata["basefile"]

    datalogger = metadata["datalogger"].lower()
    if datalogger == "mayfly":
        ds = read_mayfly(basefile + ".csv", skiprows=metadata["skiprows"])
        # skiprows is a read option, not dataset metadata; don't write it out
        metadata.pop("skiprows")
    elif datalogger == "campbell":
        ds = read_campbell(basefile + ".dat")
    else:
        # Previously an unrecognized datalogger fell through and raised a
        # confusing NameError on ds; fail fast with a clear message instead.
        raise ValueError(
            f"unsupported datalogger type {metadata['datalogger']!r}; "
            "expected 'mayfly' or 'campbell'"
        )

    ds = utils.write_metadata(ds, metadata)
    ds = utils.ensure_cf(ds)

    # configure file
    cdf_filename = ds.attrs["filename"] + "-raw.cdf"

    ds.to_netcdf(cdf_filename, unlimited_dims=["time"])

    print("Finished writing data to %s" % cdf_filename)

    return ds
# Process data and write to .nc file
def cdf_to_nc(cdf_filename):
    """
    Load a raw .cdf file and generate a processed .nc file
    """

    ds = xr.open_dataset(cdf_filename)

    # remove units in case we change and we can use larger time steps
    ds.time.encoding.pop("units")

    # Drop unneeded variables
    ds = met_drop_vars(ds)

    # Rename variables to CF compliant names
    ds = ds_rename_vars(ds)

    # Convert vars from int64 to float (doesn't touch coords like time)
    ds = ds.astype(float)

    # Fill time gaps: derive the sampling interval as the modal time step
    ds.attrs["sample_interval"] = stats.mode(
        np.diff(ds.time) / np.timedelta64(1, "s")
    ).mode
    ds.attrs["sample_rate"] = 1 / ds.attrs["sample_interval"]  # Hz
    ds = fill_time_gaps(ds)

    # Remove bad rows. Needs to happen before direction corrections.
    ds = qaqc.call_qaqc(ds)

    # Apply direction offset and magnetic declination correction
    wind_vars = [
        "WD_min",
        "WD_410",
        "WD_gust",
        "wind_dir",
    ]
    for var in wind_vars:
        if var not in ds:
            continue
        # If sensor wasn't pointing to magnetic north, apply offset to direction
        if "dir_offset" in ds.attrs:
            ds[var] = ds[var] + ds.attrs["dir_offset"].astype(float)
            ds = utils.insert_history(
                ds, f"Applied dir_offset of {ds.attrs['dir_offset']}"
            )
        # Convert direction from magnetic to true with magnetic declination,
        # then wrap back into [0, 360)
        ds[var] = ds[var] + ds.attrs["magnetic_variation"].astype(float)
        ds[var] = ds[var].round(0)
        ds[var] = ds[var] % 360
        ds = utils.insert_history(
            ds,
            f"Rotated directions from magnetic north to true north by applying magnetic_variation of {ds.attrs['magnetic_variation']}",
        )

    # Run utilities
    ds = utils.create_z(ds)
    ds = utils.clip_ds(ds)
    ds = utils.add_start_stop_time(ds)
    ds = utils.ds_add_lat_lon(ds)
    ds = utils.add_min_max(ds)
    ds = utils.add_delta_t(ds)

    # Add attributes
    ds = attrs.ds_add_attrs(ds)
    ds = ds_add_var_attrs(ds)

    # Write to .nc file
    print("Writing cleaned/trimmed data to .nc file")
    nc_filename = ds.attrs["filename"] + "-a.nc"

    ds.to_netcdf(
        nc_filename, unlimited_dims=["time"], encoding={"time": {"dtype": "i4"}}
    )
    utils.check_compliance(nc_filename, conventions=ds.attrs["Conventions"])

    print("Done writing netCDF file", nc_filename)
def fill_time_gaps(ds):
    """Fill any gaps in time-series by reindexing onto an evenly spaced
    time base built from the dataset's sample_rate attribute."""

    sr = ds.attrs["sample_rate"]
    sims = 1 / sr * 1000  # sample interval in milliseconds

    # Number of samples spanning first to last timestamp, inclusive
    pds = (
        int(
            (ds["time"][-1].values - ds["time"][0].values)
            / (sims * np.timedelta64(1, "ms"))
        )
        + 1
    )

    idx = pd.date_range(str(ds["time"][0].values), periods=pds, freq=f"{sims}ms")

    # make sure time index is unique before reindexing
    ds = ds.drop_duplicates(dim="time")
    ds = ds.reindex(time=idx)

    return ds


def ds_rename_vars(ds):
    """Rename raw datalogger variable names to CF-style short names,
    based on the dataset's instrument_type attribute."""

    wxt_names = {
        # Mayfly + WXT
        "WXTDn": "WD_min",
        "WXTDm": "WD_410",
        "WXTDx": "WD_gust",
        "WXTSn": "WS_min",
        "WXTSm": "WS_401",
        "WXTSx": "WG_402",
        "WXTTa": "T_21",
        "WXTUa": "RH_910",
        "WXTPa": "BPR_915",
        "WXTRc": "Rn_963",
        # Campbell + WXT
        "WindDir_lull": "WD_min",
        "WindDir_avg": "WD_410",
        "WindDir_gust": "WD_gust",
        "WindSpeed_lull": "WS_min",
        "WindSpeed_avg": "WS_401",
        "WindSpeed_gust": "WG_402",
        "Temp": "T_21",
        "RH": "RH_910",
        "Baro": "BPR_915",
        "R_amt": "Rn_963",
        "R_dur": "rain_duration",
        "R_int": "rain_rate",
        "H_amt": "hail_amount",
        "H_dur": "hail_duration",
        "H_int": "hail_rate",
    }

    clivue_names = {
        # Campbell + ClimaVue
        "PTemp_C_Avg": "internal_temp",
        "SlrFD_W": "solar_flux_density",
        "Rain_mm_Tot": "rain_amount",
        "Strikes_Tot": "light_strikes",
        "Dist_km": "strike_distance",
        "WS_ms": "wind_speed",
        "WindDir": "wind_dir",
        "MaxWS_ms": "wind_gust",
        "AirT_C": "air_temp",
        "VP_mbar": "vapor_pressure",
        "BP_mbar": "baro_pressure",
        "RH": "relative_humidity",
        "RHT_C": "humidity_sensor_temp",
        "TiltNS_deg": "tilt_NS",
        "TiltWE_deg": "tilt_WE",
        "SlrTF_MJ_Tot": "solar_total_flux",
        "Invalid_Wind": "wind_error",
    }

    # Only rename variables that actually exist in the dataset
    instrument = ds.attrs["instrument_type"].lower()
    if instrument == "wxt":
        newvars = {k: v for k, v in wxt_names.items() if k in ds}
    elif instrument == "climavue":
        newvars = {k: v for k, v in clivue_names.items() if k in ds}
    else:
        newvars = {}

    return ds.rename(newvars)


def met_drop_vars(ds):
    """Drop housekeeping/diagnostic variables that are not needed in the
    processed output."""

    var_list = [
        "SampNum",
        "Battery",
        "BoardTemp",
        "signalPercent",
        "RECORD",
        "panel_temp",
        "power_in",
        "lithium_battery",
        "memory_free",
        "BattV_Max",
        "CVMeta",
    ]

    # Will ignore errors if variable is not in dataset
    return ds.drop_vars(var_list, errors="ignore")


def ds_add_var_attrs(ds):
    """Copy the instrument-height global attributes onto each data
    variable (coordinates and time-like variables are skipped)."""

    for name in ds.variables:
        # don't include for coordinates that are also variables
        if name in ds.coords or "time" in name:
            continue
        var = ds[name]
        var.attrs["initial_instrument_height"] = ds.attrs[
            "initial_instrument_height"
        ]
        var.attrs["height_depth_units"] = "m"
        if "initial_instrument_height_note" in ds.attrs:
            var.attrs["initial_instrument_height_note"] = ds.attrs[
                "initial_instrument_height_note"
            ]

    return ds