# channel.py
from dataclasses import dataclass, field, replace
from typing import Optional, Dict, Any
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import detrend, butter, filtfilt, welch
from scipy.integrate import cumulative_trapezoid
from .spectra import FourierSpectrum, WelchSpectrum
from .arias import AriasResult
from .response import ResponseSpectrum, sdof_newmark_response
[docs]
@dataclass(eq=False)
class Channel:
"""
Single time-history channel with metadata and processing parameters.
This class represents a single 1D time-history signal together with
timing information, physical metadata (quantity, units), and a set of
processing parameters (drift, filter, baseline, trim). Processing is
non-destructive: methods update parameters and return new ``Channel``
instances, while the raw data remain unchanged.
Parameters
----------
data : np.ndarray
One-dimensional array of raw signal values.
dt : float, optional
Sampling interval in seconds. If not provided, an explicit ``time``
array must be supplied.
t0 : float, optional
Start time in seconds. Used to build the time vector when ``dt`` is
provided but ``time`` is not. Default is ``0.0``.
time : np.ndarray, optional
Explicit time array matching ``data`` in shape. If provided, it is
used to infer ``dt`` and ``t0`` if they are not set.
name_input : str, optional
Original name as found in the input file or data source.
name_user : str, optional
User-defined short name (for example ``"Acc1"``).
label_axis : str, optional
Axis label for plotting, typically including units (for example
``"Acc1 [g]"``).
label_legend : str, optional
Legend label for plotting (for example ``"Acc1: Footing"``).
description_long : str, optional
Long human-readable description of the channel.
quantity : str, optional
Physical quantity type (for example ``"displacement"``, ``"force"``,
``"acceleration"``).
units : str, optional
Engineering units for the physical quantity (for example ``"m"``,
``"kN"``, ``"g"``).
raw_units : str, optional
Raw data acquisition units (for example ``"V"``).
calibration_factor : float, optional
Multiplicative factor used to convert raw data to physical units.
Default is ``1.0`` (no scaling).
drift_params : dict, optional
Parameters controlling drift correction. By default an empty dict;
when set, merged with ``DRIFT_DEFAULTS``.
filter_params : dict, optional
Parameters controlling Butterworth filtering. By default an empty
dict; when set, merged with ``FILTER_DEFAULTS``.
baseline_params : dict, optional
Parameters controlling baseline correction via ``scipy.signal.detrend``.
trim_params : dict, optional
Parameters defining a time window for trimming (typically containing
``"t_start"`` and ``"t_end"`` in seconds).
processing_notes : list of str, optional
Free-form notes describing processing steps applied to the channel.
tags : set of str, optional
Tags used for grouping, such as ``{"q:acceleration"}``.
meta : dict, optional
Free-form metadata dictionary (for example sensor type, location).
Notes
-----
Processed data are obtained through :meth:`processed` or :meth:`xy`,
which apply drift correction, filtering, baseline correction, trimming,
and calibration in a fixed order. Results can be cached for efficiency.
"""
# Default processing parameters
DRIFT_DEFAULTS = {"points": 50}
FILTER_DEFAULTS = {"btype": "lowpass", "fc": 50.0, "order": 2}
BASELINE_DEFAULTS = {"type": "linear"}
# Time-history data and time axis
data: np.ndarray
dt: Optional[float] = None
t0: float = 0.0
time: Optional[np.ndarray] = None
# Naming, labels and physical metadata
name_input: Optional[str] = None
name_user: Optional[str] = None
label_axis: Optional[str] = None
label_legend: Optional[str] = None
description_long: Optional[str] = None
quantity: Optional[str] = None
units: Optional[str] = None
raw_units: Optional[str] = None
calibration_factor: float = 1.0
# Processing configuration and notes
drift_params: Dict[str, Any] = field(default_factory=dict)
filter_params: Dict[str, Any] = field(default_factory=dict)
baseline_params: Dict[str, Any] = field(default_factory=dict)
trim_params: Dict[str, Any] = field(default_factory=dict)
processing_notes: list[str] = field(default_factory=list)
# Grouping and free-form metadata
tags: set[str] = field(default_factory=set)
meta: Dict[str, Any] = field(default_factory=dict)
# Internal cache for processed data (not part of the public API)
_cache_processed_time: Optional[np.ndarray] = field(
default=None, init=False, repr=False, compare=False
)
_cache_processed_data: Optional[np.ndarray] = field(
default=None, init=False, repr=False, compare=False
)
_cache_signature: Optional[tuple] = field(
default=None, init=False, repr=False, compare=False
)
# ------------------------------------------------------------------ #
# Initialisation
# ------------------------------------------------------------------ #
def __post_init__(self) -> None:
"""
Finalise initialisation after dataclass construction.
This method normalises arrays, infers missing timing information, and
sets sensible defaults for names, labels and tags. It is executed
automatically by the dataclass after ``__init__``.
Notes
-----
* ``data`` is converted to a 1D NumPy array and validated.
* If ``time`` is provided, it is checked against ``data`` and used to
infer ``dt`` and ``t0`` when missing.
* If only ``dt`` is provided, an equally-spaced time vector is
constructed starting at ``t0``.
* Tags and labels are updated based on ``quantity``, ``units`` and
names when available.
"""
# Ensure data is a 1D NumPy array
self.data = np.asarray(self.data)
if self.data.ndim != 1:
raise ValueError(
f"Channel.data must be a 1D array (single time history), got shape {self.data.shape!r}"
)
# Handle time / dt relationship
if self.time is not None:
self.time = np.asarray(self.time)
if self.time.shape != self.data.shape:
raise ValueError(
f"Time and data must have the same shape; got time.shape={self.time.shape!r}, data.shape={self.data.shape!r}"
)
if self.dt is None and len(self.time) > 1:
self.dt = float(self.time[1] - self.time[0])
if self.t0 == 0.0 and len(self.time) > 0:
self.t0 = float(self.time[0])
else:
if self.dt is None:
raise ValueError("Either 'time' or 'dt' must be provided")
if self.dt <= 0:
raise ValueError(f"dt must be positive, got {self.dt!r}")
n = self.data.shape[0]
self.time = self.t0 + self.dt * np.arange(n)
# Normalise and auto-add tags
self.tags = set(self.tags)
if self.quantity is not None:
self.tags.add(f"q:{self.quantity}")
# Sensible fallbacks for names and labels
if self.name_user is None and self.name_input is not None:
self.name_user = self.name_input
if self.label_legend is None and self.name_user is not None:
self.label_legend = self.name_user
if self.label_axis is None and self.units is not None:
base = self.label_legend or self.name_user or ""
units = f"[{self.units}]"
self.label_axis = f"{base} {units}".strip()
# ------------------------------------------------------------------ #
# Convenience properties
# ------------------------------------------------------------------ #
@property
def duration(self) -> float:
"""
Total duration of the channel in seconds.
Returns
-------
float
Duration computed from the first and last entries of the time
vector. Returns ``0.0`` for an empty channel.
"""
if self.data.size == 0:
return 0.0
t, _ = self.processed()
return float(t[-1] - t[0])
# ------------------------------------------------------------------ #
# Internal cache management
# ------------------------------------------------------------------ #
def _clear_cache(self) -> None:
"""
Clear cached processed data.
This method invalidates any cached processed time and data as well
as the stored processing signature. It is called whenever processing
parameters are updated on a new ``Channel`` instance.
"""
self._cache_processed_time = None
self._cache_processed_data = None
self._cache_signature = None
# ------------------------------------------------------------------ #
# Info
# ------------------------------------------------------------------ #
[docs]
def info(self) -> str:
"""
Return a human-readable summary of channel metadata and processing.
The summary includes basic signal characteristics, timing,
physical meaning and calibration, naming and labels, tags,
processing parameters and notes, and any additional metadata.
Returns
-------
str
Multi-line summary string suitable for printing.
"""
lines = []
# Header
title = self.name_user or self.name_input or "<unnamed>"
lines.append(f"Channel: {title}")
lines.append("-" * (len(title) + 9))
# Basic signal info
lines.append(f"Length: {len(self.data)} samples")
if self.dt is not None and self.dt > 0:
lines.append(f"Sampling frequency: fs = {1.0 / self.dt:g} Hz")
lines.append(f"Timestep: dt = {self.dt:g} s")
else:
lines.append("Sampling frequency: fs = (unknown)")
lines.append("Timestep: dt = (unknown)")
lines.append(f"Start time: t0 = {self.t0} s")
# Physical meaning & calibration
if any(
[self.quantity, self.units, self.raw_units, self.calibration_factor != 1.0]
):
lines.append("\nPhysical meaning & calibration:")
if self.quantity:
lines.append(f" Quantity: {self.quantity}")
if self.units:
lines.append(f" Units: {self.units}")
if self.raw_units:
lines.append(f" Raw units: {self.raw_units}")
if self.calibration_factor != 1.0:
lines.append(
f" Calibration factor: {self.calibration_factor}"
+ f" {self.units or 'units'}/{self.raw_units or 'raw'}"
)
# Naming / labels
if any(
[
self.name_input,
self.name_user,
self.label_legend,
self.label_axis,
self.description_long,
]
):
lines.append("\nNaming / labels:")
if self.name_input:
lines.append(f" Input name: {self.name_input}")
if self.name_user:
lines.append(f" User name: {self.name_user}")
if self.label_legend:
lines.append(f" Legend label: {self.label_legend}")
if self.label_axis:
lines.append(f" Axis label: {self.label_axis}")
if self.description_long:
lines.append(f" Description: {self.description_long}")
# Tags
if self.tags:
lines.append("\nTags:")
taglist = "\n ".join(sorted(self.tags))
lines.append(f" {taglist}")
# Processing
if (
self.drift_params
or self.filter_params
or self.baseline_params
or self.trim_params
or self.processing_notes
):
lines.append("\nProcessing:")
if self.drift_params:
lines.append(f" Drift params: {self.drift_params}")
if self.filter_params:
lines.append(f" Filter params: {self.filter_params}")
if self.baseline_params:
lines.append(f" Baseline params: {self.baseline_params}")
if self.trim_params:
lines.append(f" Trim params: {self.trim_params}")
if self.processing_notes:
lines.append(" Notes:")
for note in self.processing_notes:
lines.append(f" - {note}")
# Free-form metadata
if self.meta:
lines.append("\nMetadata:")
for k, v in self.meta.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
# ------------------------------------------------------------------ #
# Processing methods that create new Channels
# ------------------------------------------------------------------ #
[docs]
def drift_corrected(self, **override: Any) -> "Channel":
"""
Return a new channel with updated drift correction parameters.
Drift correction is applied lazily in :meth:`processed`. This method
only updates the stored parameters, clears the processing cache and
appends a processing note.
Parameters
----------
**override
Keyword arguments that override or extend the current drift
parameters. Merged with ``DRIFT_DEFAULTS`` and ``drift_params``.
Returns
-------
Channel
New channel instance with updated drift parameters and tags.
"""
params = {**self.DRIFT_DEFAULTS, **self.drift_params, **override}
new = replace(self, drift_params=params)
new._clear_cache()
new.tags = set(self.tags).union({"drift_corrected"})
new.processing_notes = [
*self.processing_notes,
f"Drift params set: {params}",
]
return new
[docs]
def filtered(self, **override: Any) -> "Channel":
"""
Return a new channel with updated Butterworth filter parameters.
Filtering is applied lazily in :meth:`processed`. This method records
the requested filter specification, clears the processing cache and
appends a processing note.
Parameters
----------
**override
Keyword arguments that override or extend the current filter
parameters. Merged with ``FILTER_DEFAULTS`` and ``filter_params``.
Returns
-------
Channel
New channel instance with updated filter parameters and tags.
"""
params = {**self.FILTER_DEFAULTS, **self.filter_params, **override}
new = replace(self, filter_params=params)
new._clear_cache()
new.tags = set(self.tags).union({"filtered"})
new.processing_notes = [
*self.processing_notes,
f"Filter params set: {params}",
]
return new
[docs]
def baseline_corrected(self, **override: Any) -> "Channel":
"""
Return a new channel with updated baseline correction parameters.
Baseline correction is applied lazily in :meth:`processed` using
:func:`scipy.signal.detrend`. This method records the detrend
parameters, clears the processing cache and appends a processing note.
Parameters
----------
**override
Keyword arguments forwarded to :func:`scipy.signal.detrend`
when baseline correction is applied.
Returns
-------
Channel
New channel instance with updated baseline parameters and tags.
"""
params = {**self.BASELINE_DEFAULTS, **self.baseline_params, **override}
new = replace(self, baseline_params=params)
new._clear_cache()
arg_str = ", ".join(f"{k}={v}" for k, v in params.items()) or ""
new.tags = set(self.tags).union({"baseline_corrected"})
new.processing_notes = [
*self.processing_notes,
f"Baseline params set: detrend({arg_str})",
]
return new
[docs]
def trimmed(self, **override: Any) -> "Channel":
"""
Return a new channel with updated trimming window parameters.
Trimming is applied in :meth:`processed` by restricting the time and
data arrays to a specified interval.
Parameters
----------
**override
Keyword arguments that override or extend the current trimming
parameters. Typical keys are ``"t_start"`` and ``"t_end"`` in
seconds.
Returns
-------
Channel
New channel instance with updated trimming parameters and tags.
"""
defaults = {
"t_start": float(self.time[0]),
"t_end": float(self.time[-1]),
}
params = {**defaults, **self.trim_params, **override}
t_start = params["t_start"]
t_end = params["t_end"]
new = replace(self, trim_params=params)
new._clear_cache()
new.tags = set(self.tags).union({"trimmed"})
new.processing_notes = [
*self.processing_notes,
f"Trim params set: {t_start}–{t_end} s",
]
return new
# ------------------------------------------------------------------ #
# Trimmers
# ------------------------------------------------------------------ #
[docs]
def trim_by_threshold(
self,
threshold: float = 0.01,
use_abs: bool = True,
buffer_before: float = 0.0,
buffer_after: float = 0.0,
processed: bool = True,
use_cache: bool = True,
) -> "Channel":
"""
Trim the channel to where the signal exceeds a given threshold.
The trimming window is defined from the first to the last sample where
the signal magnitude exceeds ``threshold``, optionally in absolute
value, with configurable buffers before and after this window.
Parameters
----------
threshold : float, optional
Amplitude threshold used to detect the start and end of the
significant portion of the record. Default is ``0.01``.
use_abs : bool, optional
If ``True`` (default), the threshold is applied to the absolute
value of the signal. If ``False``, it is applied to the raw
signal.
buffer_before : float, optional
Time (seconds) to extend the trimming window before the first
threshold-crossing sample. Default is ``0.0``.
buffer_after : float, optional
Time (seconds) to extend the trimming window after the last
threshold-crossing sample. Default is ``0.0``.
processed : bool, optional
If ``True`` (default), trimming is based on the processed signal
returned by :meth:`xy`. If ``False``, the raw data are used.
use_cache : bool, optional
If ``True`` (default), use the processing cache when obtaining
the signal.
Returns
-------
Channel
New channel instance with updated trimming parameters.
Raises
------
ValueError
If the channel is empty, no samples exceed the threshold, or
the resulting window is empty after buffering.
"""
t, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot trim empty signal: channel has zero samples")
if use_abs:
mask = np.abs(y) >= threshold
else:
mask = y >= threshold
if not np.any(mask):
raise ValueError(f"No samples exceed the specified threshold ({threshold})")
# Indices of first and last samples above the threshold
i_start = int(np.argmax(mask))
i_end = int(len(mask) - 1 - np.argmax(mask[::-1]))
t_start = float(t[i_start]) - buffer_before
t_end = float(t[i_end]) + buffer_after
# Clamp to original time range
t_start = max(t_start, float(t[0]))
t_end = min(t_end, float(t[-1]))
if t_end <= t_start:
raise ValueError(
f"Computed trim window is empty after buffering: t_start={t_start}, t_end={t_end}"
)
return self.trimmed(t_start=t_start, t_end=t_end)
[docs]
def trim_by_fraction_of_peak(
self,
fraction: float = 0.05,
*,
use_abs: bool = True,
buffer_before: float = 0.0,
buffer_after: float = 0.0,
processed: bool = True,
use_cache: bool = True,
) -> "Channel":
"""
Trim the channel to where the signal exceeds a fraction of its peak.
The trimming window is defined using a threshold equal to
``fraction`` times the signal peak amplitude, optionally in absolute
value, with configurable buffers before and after.
Parameters
----------
fraction : float, optional
Fraction of the peak amplitude used to define the threshold.
Must be in the range ``(0, 1]``. Default is ``0.05`` (5% of peak).
use_abs : bool, optional
If ``True`` (default), the peak and threshold are computed using
the absolute value of the signal. If ``False``, they are computed
from the raw signal.
buffer_before : float, optional
Time (seconds) to extend the trimming window before the first
threshold-crossing sample. Default is ``0.0``.
buffer_after : float, optional
Time (seconds) to extend the trimming window after the last
threshold-crossing sample. Default is ``0.0``.
processed : bool, optional
If ``True`` (default), use the processed signal; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default), use the processing cache when obtaining
the signal.
Returns
-------
Channel
New channel instance with updated trimming parameters.
Raises
------
ValueError
If ``fraction`` is out of range, the channel is empty, the peak
amplitude is non-positive, or the resulting window is empty.
"""
if not (0.0 < fraction <= 1.0):
raise ValueError(f"fraction must be in (0, 1], got {fraction!r}")
_, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot trim empty signal: channel has zero samples")
if use_abs:
peak = float(np.max(np.abs(y)))
else:
peak = float(np.max(y))
if peak <= 0.0:
raise ValueError(
f"Signal peak is non-positive ({peak}); cannot define threshold from peak"
)
threshold = fraction * peak
return self.trim_by_threshold(
threshold=threshold,
use_abs=use_abs,
buffer_before=buffer_before,
buffer_after=buffer_after,
processed=processed,
use_cache=use_cache,
)
[docs]
def trim_by_arias(
self,
lower: float = 0.05,
upper: float = 0.95,
g: float = 9.81,
buffer_before: float = 0.0,
buffer_after: float = 0.0,
processed: bool = True,
use_cache: bool = True,
) -> "Channel":
"""
Trim the channel using an Arias-intensity-based significant duration.
The trimming window is defined between the times when the cumulative
Arias intensity reaches fractions ``lower`` and ``upper`` of its final
value, optionally extended with time buffers.
Parameters
----------
lower : float, optional
Lower fraction of final Arias intensity (for example ``0.05``
for 5%). Must satisfy ``0 <= lower < upper <= 1``. Default is
``0.05``.
upper : float, optional
Upper fraction of final Arias intensity (for example ``0.95``
for 95%). Default is ``0.95``.
g : float, optional
Gravity acceleration used to convert acceleration in g to m/s².
Default is ``9.81``.
buffer_before : float, optional
Time (seconds) to extend the trimming window before the lower
Arias intensity crossing. Default is ``0.0``.
buffer_after : float, optional
Time (seconds) to extend the trimming window after the upper
Arias intensity crossing. Default is ``0.0``.
processed : bool, optional
If ``True`` (default), use the processed acceleration signal
when computing Arias intensity.
use_cache : bool, optional
If ``True`` (default), use the processing cache when obtaining
the signal.
Returns
-------
Channel
New channel instance with updated trimming parameters.
Raises
------
ValueError
If the channel is empty, the final Arias intensity is non-positive,
the ``lower``/``upper`` values are invalid, or the resulting
window is empty after buffering.
Notes
-----
The channel data are assumed to represent acceleration in units of g.
"""
if not (0.0 <= lower < upper <= 1.0):
raise ValueError(
f"Require 0 <= lower < upper <= 1, got lower={lower!r}, upper={upper!r}"
)
res = self.arias_intensity(g=g, processed=processed, use_cache=use_cache)
t = res.t
Ia = res.Ia
if Ia.size == 0:
raise ValueError("Cannot trim by Arias intensity for empty signal")
Ia_final = float(Ia[-1])
if Ia_final <= 0.0:
raise ValueError(f"Final Arias intensity is non-positive ({Ia_final})")
# Indices where cumulative Arias intensity crosses the requested fractions
idx_lower = int(np.argmax(Ia >= lower * Ia_final))
idx_upper = int(np.argmax(Ia >= upper * Ia_final))
t_lower = float(t[idx_lower]) - buffer_before
t_upper = float(t[idx_upper]) + buffer_after
# Clamp to original time range
t_lower = max(t_lower, float(t[0]))
t_upper = min(t_upper, float(t[-1]))
if t_upper <= t_lower:
raise ValueError(
f"Computed Arias trim window is empty after buffering: t_start={t_lower}, t_end={t_upper}"
)
return self.trimmed(t_start=t_lower, t_end=t_upper)
# ------------------------------------------------------------------ #
# Processed
# ------------------------------------------------------------------ #
[docs]
def processed(self, use_cache: bool = True) -> tuple[np.ndarray, np.ndarray]:
"""
Return time and data after applying all processing steps.
The following operations are applied in a fixed order:
1. Drift correction (if ``drift_params`` are set).
2. Butterworth filtering (if ``filter_params`` are set).
3. Baseline detrend (if ``baseline_params`` are set).
4. Trimming by time window (if ``trim_params`` are set).
5. Calibration-factor scaling.
Parameters
----------
use_cache : bool, optional
If ``True`` (default), cached processed results are returned
when available and the processing signature has not changed.
Returns
-------
t : np.ndarray
Time array after trimming (if applied).
y : np.ndarray
Processed data array after all enabled steps.
Raises
------
ValueError
If processing parameters are inconsistent (for example invalid
filter cutoff, trimming window outside the time range, or missing
``dt`` for filtering or response-related calculations).
"""
# Signature summarising the current processing configuration for caching
signature = (
tuple(sorted(self.drift_params.items())),
tuple(sorted(self.filter_params.items())),
tuple(sorted(self.baseline_params.items())),
tuple(sorted(self.trim_params.items())),
self.calibration_factor,
)
if (
use_cache
and self._cache_signature == signature
and self._cache_processed_data is not None
and self._cache_processed_time is not None
):
return self._cache_processed_time, self._cache_processed_data
# Start from raw data
t = self.time
y = self.data.astype(float, copy=False)
# 1. Drift correction
if self.drift_params:
params = {**self.DRIFT_DEFAULTS, **self.drift_params}
points = params["points"]
if points > len(y):
raise ValueError(
f"Number of points for drift correction ({points}) exceeds data length ({len(y)})"
)
drift_value = float(np.mean(y[:points]))
y = y - drift_value
# 2. Filtering
if self.filter_params:
if self.dt is None or self.dt <= 0:
raise ValueError(
f"Filtering requires a positive dt, got dt={self.dt!r}"
)
fs = 1.0 / self.dt
params = {**self.FILTER_DEFAULTS, **self.filter_params}
btype = params["btype"]
order = params["order"]
if btype in ("lowpass", "highpass"):
fc = params["fc"]
Wn = 2 * fc / fs
elif btype in ("bandpass", "bandstop"):
f1 = params.get("f1")
f2 = params.get("f2")
if f1 is None or f2 is None:
raise ValueError(
f"Band filters require f1 and f2, got f1={f1!r}, f2={f2!r}"
)
Wn = [2 * f1 / fs, 2 * f2 / fs]
else:
raise ValueError(f"Unsupported filter mode: {btype!r}")
if isinstance(Wn, (list, tuple)):
if not (0 < Wn[0] < Wn[1] < 1):
raise ValueError(
f"Normalized band edges must satisfy 0 < f1 < f2 < fs/2; got Wn={Wn} with fs={fs}"
)
else:
if not (0 < Wn < 1):
raise ValueError(
f"Normalized cutoff must satisfy 0 < fc < fs/2; got Wn={Wn} with fs={fs}"
)
b, a = butter(order, Wn, btype=btype)
y = filtfilt(b, a, y)
# 3. Baseline correction
if self.baseline_params:
params = {**self.BASELINE_DEFAULTS, **self.baseline_params}
y = detrend(y, **params)
# 4. Trimming
if self.trim_params:
defaults = {"t_start": float(t[0]), "t_end": float(t[-1])}
params = {**defaults, **self.trim_params}
t_start = params["t_start"]
t_end = params["t_end"]
if t_end <= t_start:
raise ValueError(
f"t_end must be greater than t_start: t_start={t_start}, t_end={t_end}"
)
mask = (t >= t_start) & (t <= t_end)
if not np.any(mask):
raise ValueError(
f"Trim window [{t_start}, {t_end}] does not overlap channel time range [{float(t[0])}, {float(t[-1])}]"
)
t = t[mask]
y = y[mask]
# 5. Calibration
if self.calibration_factor != 1.0:
y = y * self.calibration_factor
# Cache result
if use_cache:
self._cache_signature = signature
self._cache_processed_time = t
self._cache_processed_data = y
return t, y
[docs]
def xy(
self, processed: bool = True, use_cache: bool = True
) -> tuple[np.ndarray, np.ndarray]:
"""
Return coordinate arrays for plotting.
Parameters
----------
processed : bool, optional
If ``True`` (default), return the processed time and data arrays
from :meth:`processed`. If ``False``, return the raw ``time`` and
``data`` attributes.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache when computing processed data.
Returns
-------
x : np.ndarray
Time array.
y : np.ndarray
Data array (raw or processed depending on ``processed``).
"""
if processed:
return self.processed(use_cache=use_cache)
else:
return self.time, self.data
# ------------------------------------------------------------------ #
# Simple time-domain peaks and RMS
# ------------------------------------------------------------------ #
[docs]
def max_abs(
self,
processed: bool = True,
use_cache: bool = True,
) -> tuple[float, float]:
"""
Return the time and value of the maximum absolute amplitude.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
t_peak : float
Time at which the maximum absolute amplitude occurs.
y_peak : float
Data value at the maximum absolute amplitude.
Raises
------
ValueError
If the channel is empty.
"""
t, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot compute max_abs of empty signal")
idx = int(np.argmax(np.abs(y)))
return float(t[idx]), float(y[idx])
[docs]
def max_value(
self,
processed: bool = True,
use_cache: bool = True,
) -> tuple[float, float]:
"""
Return the time and value of the maximum (positive) data value.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
t_peak : float
Time at which the maximum value occurs.
y_peak : float
Maximum data value.
Raises
------
ValueError
If the channel is empty.
"""
t, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot compute max_value of empty signal")
idx = int(np.argmax(y))
return float(t[idx]), float(y[idx])
[docs]
def min_value(
self,
processed: bool = True,
use_cache: bool = True,
) -> tuple[float, float]:
"""
Return the time and value of the minimum (negative) data value.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
t_peak : float
Time at which the minimum value occurs.
y_peak : float
Minimum data value.
Raises
------
ValueError
If the channel is empty.
"""
t, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot compute min_value of empty signal")
idx = int(np.argmin(y))
return float(t[idx]), float(y[idx])
[docs]
def rms(
self,
processed: bool = True,
use_cache: bool = True,
) -> float:
"""
Compute the root-mean-square (RMS) value of the channel.
Parameters
----------
processed : bool, optional
If ``True`` (default), compute RMS of the processed data;
otherwise use the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
float
RMS value of the signal.
Raises
------
ValueError
If the channel is empty.
"""
_, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot compute RMS of empty signal")
return float(np.sqrt(np.mean(y**2)))
# ------------------------------------------------------------------ #
# Fourier amplitude spectrum + peak
# ------------------------------------------------------------------ #
[docs]
def fourier(
self,
processed: bool = True,
use_cache: bool = True,
) -> FourierSpectrum:
"""
Compute the (single-sided) Fourier amplitude spectrum of the channel.
The spectrum is computed via zero-padded FFT to the next power-of-two
length, and the single-sided amplitude spectrum is returned.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
FourierSpectrum
Object containing frequency array and amplitude spectrum.
Raises
------
ValueError
If the channel is empty or ``dt`` is missing/invalid.
"""
_, y = self.xy(processed=processed, use_cache=use_cache)
n = len(y)
if n == 0:
raise ValueError("Cannot compute Fourier spectrum of empty signal")
if self.dt is None or self.dt <= 0:
raise ValueError("Fourier transform requires a positive dt")
n_fft = 1 << (n - 1).bit_length()
f = np.fft.rfftfreq(n=n_fft, d=self.dt)
s = np.abs(np.fft.rfft(y, n=n_fft))
return FourierSpectrum(f=f, s=s)
[docs]
def fourier_peak(
self,
processed: bool = True,
use_cache: bool = True,
) -> tuple[float, float]:
"""
Return the dominant frequency and amplitude from the Fourier spectrum.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
f_peak : float
Frequency at which the spectrum amplitude is maximum.
s_peak : float
Maximum amplitude of the Fourier spectrum.
"""
spec = self.fourier(processed=processed, use_cache=use_cache)
return spec.peak()
# ------------------------------------------------------------------ #
# Welch PSD + peak (MATLAB-ish defaults)
# ------------------------------------------------------------------ #
[docs]
def welch_psd(
self,
processed: bool = True,
use_cache: bool = True,
**kwargs: Any,
) -> WelchSpectrum:
"""
Compute the power spectral density (PSD) using Welch's method.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
**kwargs
Additional keyword arguments forwarded to
:func:`scipy.signal.welch`. If ``nperseg`` is not provided, a
default of ``min(256, n)`` is used.
Returns
-------
WelchSpectrum
Object containing frequency array and PSD values.
Raises
------
ValueError
If the channel is empty or ``dt`` is missing/invalid.
"""
_, y = self.xy(processed=processed, use_cache=use_cache)
n = len(y)
if n == 0:
raise ValueError("Cannot compute Welch PSD of empty signal")
if self.dt is None or self.dt <= 0:
raise ValueError("Welch PSD requires a positive dt")
fs = 1.0 / self.dt
if "nperseg" not in kwargs:
kwargs["nperseg"] = min(256, n)
f, p = welch(x=y, fs=fs, **kwargs)
return WelchSpectrum(f=f, p=p)
[docs]
def welch_peak(
self,
processed: bool = True,
use_cache: bool = True,
**kwargs: Any,
) -> tuple[float, float]:
"""
Return the dominant frequency and PSD amplitude from the Welch spectrum.
Parameters
----------
processed : bool, optional
If ``True`` (default), use the processed data; otherwise use
the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
**kwargs
Additional keyword arguments forwarded to
:meth:`welch_psd` and then to :func:`scipy.signal.welch`.
Returns
-------
f_peak : float
Frequency at which the PSD is maximum.
p_peak : float
Maximum PSD value.
"""
psd = self.welch_psd(processed=processed, use_cache=use_cache, **kwargs)
return psd.peak()
# ------------------------------------------------------------------ #
# Arias intensity
# ------------------------------------------------------------------ #
[docs]
def arias_intensity(
self,
g: float = 9.81,
processed: bool = True,
use_cache: bool = True,
) -> AriasResult:
"""
Compute cumulative Arias intensity and significant-duration window.
The channel is assumed to contain acceleration in units of g. The
resulting Arias intensity is computed via numerical integration of
squared acceleration, and the 5–95% significant-duration window is
identified.
Parameters
----------
g : float, optional
Gravity acceleration used to convert from g to m/s². Default is
``9.81``.
processed : bool, optional
If ``True`` (default), use the processed acceleration signal;
otherwise use the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
AriasResult
Object containing time array, cumulative Arias intensity, and
start/end times of the 5–95% significant-duration window.
Raises
------
ValueError
If the channel is empty or the final Arias intensity is
non-positive.
"""
t, y = self.xy(processed=processed, use_cache=use_cache)
if y.size == 0:
raise ValueError("Cannot compute Arias intensity of empty signal")
a_mps2 = g * y
coef = np.pi / (2.0 * g)
Ia = cumulative_trapezoid(coef * a_mps2**2, t, initial=0.0)
Ia_final = float(Ia[-1])
if Ia_final <= 0:
raise ValueError("Final Arias intensity is non-positive")
idx_start = int(np.argmax(Ia >= 0.05 * Ia_final))
idx_end = int(np.argmax(Ia >= 0.95 * Ia_final))
t_start = float(t[idx_start])
t_end = float(t[idx_end])
return AriasResult(
t=t,
Ia=Ia,
t_start=t_start,
t_end=t_end,
)
# ------------------------------------------------------------------ #
# Response spectrum (SDOF, Newmark-beta average acceleration)
# ------------------------------------------------------------------ #
[docs]
def response_spectrum(
self,
periods: np.ndarray = np.linspace(0.05, 5.0, 100),
g: float = 9.81,
ksi: float = 0.05,
processed: bool = True,
use_cache: bool = True,
) -> ResponseSpectrum:
"""
Compute an elastic response spectrum for a grid of periods.
The response spectrum is computed by subjecting a set of linear
single-degree-of-freedom (SDOF) oscillators, with specified damping
ratio, to the acceleration record using the Newmark-beta average
acceleration method.
Parameters
----------
periods : np.ndarray, optional
One-dimensional array of natural periods (in seconds) at which
the spectrum is evaluated. Default is ``np.linspace(0.05, 5.0, 100)``.
g : float, optional
Gravity acceleration used to convert from g to m/s². Default is
``9.81``.
ksi : float, optional
Damping ratio of the SDOF systems (for example ``0.05`` for 5%
damping). Default is ``0.05``.
processed : bool, optional
If ``True`` (default), use the processed acceleration signal;
otherwise use the raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
Returns
-------
ResponseSpectrum
Object containing periods, spectral displacement (``Sd``),
velocity (``Sv``) and acceleration (``Sa`` in units of g), and
the damping ratio.
Raises
------
ValueError
If the channel is empty, ``dt`` is missing/invalid, ``periods``
is not 1D, or contains non-positive values.
"""
_, a = self.xy(processed=processed, use_cache=use_cache)
if a.size == 0:
raise ValueError("Cannot compute response spectrum of empty signal")
if self.dt is None or self.dt <= 0:
raise ValueError("Response spectrum requires a positive dt")
a_mps2 = g * a # convert g to m/s^2
periods = np.asarray(periods, dtype=float)
if periods.ndim != 1:
raise ValueError("periods must be a 1D array")
if np.any(periods <= 0.0):
raise ValueError("All periods must be positive")
# Preallocate spectral arrays and loop over SDOF periods
Sd = np.zeros_like(periods, dtype=float)
Sv = np.zeros_like(periods, dtype=float)
Sa = np.zeros_like(periods, dtype=float)
for i, T in enumerate(periods):
omega = 2.0 * np.pi / T
Sd[i], Sv[i], Sa[i] = sdof_newmark_response(a_mps2, self.dt, omega, ksi)
Sa_g = Sa / g # convert back to g
return ResponseSpectrum(T=periods, Sd=Sd, Sv=Sv, Sa=Sa_g, ksi=ksi)
# ------------------------------------------------------------------ #
# Plotting
# ------------------------------------------------------------------ #
[docs]
def plot(
self,
ax: Optional[plt.Axes] = None,
processed: bool = True,
use_cache: bool = True,
include_label: bool = True,
include_kind: bool = False,
include_legend: bool = False,
plot_type: str = "timehistory",
**plot_kwargs: Any,
) -> plt.Axes:
"""
Plot the channel in one of several supported representations.
The default behaviour (``plot_type='timehistory'``) is to plot the
time history. Other values of ``plot_type`` delegate to helper
methods to plot Fourier spectrum, power spectral density, Arias
intensity or response spectrum.
Supported ``plot_type`` values
-------------------------------
* ``"timehistory"``, ``"time"``, ``"time_history"``
* ``"fourier"``, ``"fft"``
* ``"psd"``, ``"welch"``, ``"power"``
* ``"arias"``, ``"husid"``
* ``"response"``, ``"response_spectrum"``, ``"rs"``
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If ``None``, a new figure and axes are created
with :func:`matplotlib.pyplot.subplots`.
processed : bool, optional
If ``True`` (default), use processed data for time-domain
plotting and for spectral quantities. If ``False``, use raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
include_label : bool, optional
For time-history plots, if ``True`` (default) use ``label_axis``
as the y-axis label when available.
include_kind : bool, optional
For time-history plots, if ``True``, build a generic y-label from
``quantity`` and ``units`` (e.g. ``"Acceleration [g]"``). This
overrides any existing y-label if set.
include_legend : bool, optional
For time-history plots, if ``True``, add a legend using
``label_legend`` or ``name_user``.
plot_type : str, optional
Representation to plot. See list above. Default is
``"timehistory"``.
**plot_kwargs
Additional keyword arguments forwarded to the underlying plotting
method (for example line style, color, ``fmax``, ``periods``).
Returns
-------
matplotlib.axes.Axes
Axes instance containing the requested plot.
Raises
------
ValueError
If an unknown ``plot_type`` is given.
"""
if ax is None:
_, ax = plt.subplots()
pt = plot_type.lower()
# Time history (original behaviour)
if pt in ("time", "timehistory", "time_history"):
t, y = self.xy(processed=processed, use_cache=use_cache)
line_label = self.label_legend or self.name_user or self.name_input
ax.plot(t, y, label=line_label, **plot_kwargs)
ax.set_xlabel("Time [s]")
ylabel = ax.get_ylabel() or ""
if include_label and self.label_axis:
ylabel = self.label_axis
if include_kind and self.quantity:
ylabel = self.quantity.capitalize() + (
f" [{self.units}]" if self.units else ""
)
ax.set_ylabel(ylabel)
if include_legend and line_label:
ax.legend()
ax.grid(True)
return ax
# Delegated spectral / intensity / response plots
if pt in ("fourier", "fft"):
return self.plot_fourier(
ax=ax,
processed=processed,
use_cache=use_cache,
**plot_kwargs,
)
if pt in ("psd", "welch", "power"):
return self.plot_psd(
ax=ax,
processed=processed,
use_cache=use_cache,
**plot_kwargs,
)
if pt in ("arias", "husid"):
return self.plot_arias(
ax=ax,
processed=processed,
use_cache=use_cache,
**plot_kwargs,
)
if pt in ("response", "response_spectrum", "rs"):
return self.plot_response_spectrum(
ax=ax,
processed=processed,
use_cache=use_cache,
**plot_kwargs,
)
raise ValueError(
f"Unknown plot_type {plot_type!r}. "
"Use 'timehistory', 'fourier', 'psd', 'arias', or 'response'."
)
[docs]
def plot_fourier(
self,
ax: Optional[plt.Axes] = None,
processed: bool = True,
use_cache: bool = True,
fmax: Optional[float] = 50.0,
**plot_kwargs: Any,
) -> plt.Axes:
"""
Plot the Fourier amplitude spectrum of the channel.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If ``None``, a new figure and axes are created.
processed : bool, optional
If ``True`` (default), compute the spectrum from processed data;
otherwise use raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
fmax : float or None, optional
Maximum frequency shown on the plot (in Hz). If ``None``, the
full frequency range is used. Default is ``50.0``.
**plot_kwargs
Additional keyword arguments forwarded to
:meth:`FourierSpectrum.plot`.
Returns
-------
matplotlib.axes.Axes
Axes instance containing the Fourier spectrum plot.
"""
spec = self.fourier(processed=processed, use_cache=use_cache)
return spec.plot(ax=ax, fmax=fmax, **plot_kwargs)
[docs]
def plot_psd(
self,
ax: Optional[plt.Axes] = None,
processed: bool = True,
use_cache: bool = True,
fmax: Optional[float] = 50.0,
**welch_kwargs: Any,
) -> plt.Axes:
"""
Plot the power spectral density (PSD) using Welch's method.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If ``None``, a new figure and axes are created.
processed : bool, optional
If ``True`` (default), compute the PSD from processed data;
otherwise use raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
fmax : float or None, optional
Maximum frequency shown on the plot (in Hz). If ``None``, the
full frequency range is used. Default is ``50.0``.
**welch_kwargs
Additional keyword arguments forwarded to :meth:`welch_psd` and
then to :func:`scipy.signal.welch`.
Returns
-------
matplotlib.axes.Axes
Axes instance containing the PSD plot.
"""
spec = self.welch_psd(processed=processed, use_cache=use_cache, **welch_kwargs)
return spec.plot(ax=ax, fmax=fmax)
[docs]
def plot_arias(
self,
ax: Optional[plt.Axes] = None,
g: float = 9.81,
processed: bool = True,
use_cache: bool = True,
show_window: bool = True,
**plot_kwargs: Any,
) -> plt.Axes:
"""
Plot the Arias intensity time history (Husid plot) for this channel.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If ``None``, a new figure and axes are created.
g : float, optional
Gravity acceleration used to convert acceleration from g to m/s².
Default is ``9.81``.
processed : bool, optional
If ``True`` (default), compute Arias intensity from processed
acceleration; otherwise use raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
show_window : bool, optional
If ``True`` (default), plot vertical lines marking the 5–95%
significant-duration window.
**plot_kwargs
Additional keyword arguments forwarded to
:meth:`AriasResult.plot`.
Returns
-------
matplotlib.axes.Axes
Axes instance containing the Arias intensity plot.
Notes
-----
The channel data are assumed to represent acceleration in units of g.
"""
res = self.arias_intensity(g=g, processed=processed, use_cache=use_cache)
return res.plot(ax=ax, show_window=show_window, **plot_kwargs)
[docs]
def plot_response_spectrum(
self,
ax: Optional[plt.Axes] = None,
periods: np.ndarray = np.linspace(0.05, 5.0, 100),
ksi: float = 0.05,
processed: bool = True,
use_cache: bool = True,
y: str = "Sa",
logx: bool = False,
logy: bool = False,
**plot_kwargs: Any,
) -> plt.Axes:
"""
Plot the elastic response spectrum for this channel.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If ``None``, a new figure and axes are created.
periods : np.ndarray, optional
One-dimensional array of natural periods (in seconds) at which
the spectrum is evaluated. Default is
``np.linspace(0.05, 5.0, 100)``.
ksi : float, optional
Damping ratio of the SDOF systems (for example ``0.05`` for 5%
damping). Default is ``0.05``.
processed : bool, optional
If ``True`` (default), compute the spectrum from processed data;
otherwise use raw data.
use_cache : bool, optional
If ``True`` (default) and ``processed`` is ``True``, use the
processing cache.
y : {"Sa", "Sv", "Sd"}, optional
Response quantity to plot: spectral acceleration ``"Sa"``, velocity
``"Sv"`` or displacement ``"Sd"``. Default is ``"Sa"``.
logx : bool, optional
If ``True``, use a logarithmic scale on the x-axis (period).
Default is ``False``.
logy : bool, optional
If ``True``, use a logarithmic scale on the y-axis (response).
Default is ``False``.
**plot_kwargs
Additional keyword arguments forwarded to
:meth:`ResponseSpectrum.plot`.
Returns
-------
matplotlib.axes.Axes
Axes instance containing the response spectrum plot.
"""
rs = self.response_spectrum(
periods=periods,
ksi=ksi,
processed=processed,
use_cache=use_cache,
)
return rs.plot(
ax=ax,
y=y,
logx=logx,
logy=logy,
**plot_kwargs,
)