# -*- coding: utf-8 -*-
"""
trace
~~~~~
Module containing a trace class to analyze, process, and visualize
time-discrete measured signals.
:copyright: (c) 2021-2023 by Jochen Gerhaeusser.
:license: BSD, see LICENSE for details
"""
from __future__ import annotations
import math
import operator
import statistics as stats
from collections import defaultdict
from dataclasses import (
asdict, astuple, dataclass, field, fields, Field, replace)
from itertools import (
accumulate, chain, islice, repeat, tee)
from typing import (
Any, Callable,
Iterable, Iterator,
Mapping, MutableMapping,
Optional,
Sequence, MutableSequence,
TypeAlias,
Union)
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.stats.mstats import winsorize
# module exports
# Names listed here form the public API of the module and are the ones a
# wildcard import of the module brings into scope.
__all__ = [
    'Number', 'Sample', 'Samples', 'Operand', 'DataFrame',
    'Trace', 'vectorize',
    'logical_and', 'logical_or', 'priority',
    'Traces', 'as_traces',
    'Point2D',
    'Point3D',
    'Vector',
    'VectorTraces', 'polar',
    'Statistics',
    'StatisticsTraces',
    'MovingAverageTraces',
    'SetTraces', 'combine',
    'SlewRateLimiter',
    'SlewRateLimiterTraces',
    'ExponentialSmoothing',
    'ExponentialSmoothingTraces',
    'LinearRegression',
    'LinearRegressionTraces',
    'AlphaBetaFilter',
    'AlphaBetaFilterTraces',
    'IIRFilter',
]
#: Numeric type: a boolean, an integer or a floating-point number
Number: TypeAlias = Union[bool, int, float]
#: Sample type: a number or a string
Sample: TypeAlias = Union[Number, str]
#: Samples type: a mutable sequence or any iterable of samples, or an iterable
#: of ``None`` values
Samples: TypeAlias = Union[MutableSequence[Sample], Iterable[Sample], Iterable[None]]
#: Operand type: a single number or an iterable of numbers
Operand: TypeAlias = Union[Iterable[Number], Number]
#: Data frame type: a mapping from a name to an iterable of samples
DataFrame: TypeAlias = Mapping[str, Iterable[Sample]]
# Signum function (module-level alias of numpy's ``sign``)
sign = np.sign
def _pairwise(trace: (Iterator[Number] |
Sequence[Number] |
Trace)) -> Iterator[tuple[Number, Number]]:
""" Returns a pairwise iterator for the signal :attr:`~Trace.samples` in a
:class:`Trace`.
:param trace: trace to iterate
"""
a, b = tee(trace)
next(b, None)
return zip(a, b)
def _clip(value: float | int,
          lower: float | int | None,
          upper: float | int | None) -> float | int:
    """ Limits the *value* to the range given by the optional *lower* and
    *upper* bound and returns the result.

    .. note:: When both bounds are given and contradict each other, the
      *upper* bound wins because it is applied last.

    :param float | int value: value to clip
    :param float | int lower: lower bound, or ``None`` for no lower bound
    :param float | int upper: upper bound, or ``None`` for no upper bound
    """
    has_lower = lower is not None
    has_upper = upper is not None
    if has_lower and has_upper:
        # bounded on both sides: lower bound first, upper bound last
        return min(upper, max(lower, value))
    if has_lower:
        # bounded from below only
        return max(value, lower)
    if has_upper:
        # bounded from above only
        return min(value, upper)
    # unbounded
    return value
def _clamp(value: float | int,
           bound: float | int) -> float | int:
    """ Limits the *value* to the symmetric range ``[-|bound|, +|bound|]`` and
    returns the result.

    :param float | int value: value to clamp
    :param float | int bound: magnitude of the symmetric limit; its sign is
      ignored
    """
    limit = abs(bound)
    return _clip(value, -limit, limit)
def vectorize(operand: Operand | None) -> Samples:
    """ Returns an endless iterator repeating the *operand* when it is a
    single value, otherwise the *operand* is returned unchanged.

    A ``None`` *operand* repeats ``None``, an :class:`int`, :class:`float` or
    :class:`bool` *operand* repeats itself, and a :class:`str` *operand* is
    parsed as an integer literal (``0b``, ``0o`` and ``0x`` prefixes are
    honoured) whose value is repeated.

    :param Operand operand: operand to vectorize
    :raises ValueError: if a :class:`str` *operand* is not an integer literal
    """
    if operand is None:
        # no operand -> endless stream of empty samples
        return repeat(None)
    if isinstance(operand, (int, float)):
        # bool is a subclass of int and therefore covered here as well
        return repeat(operand)
    if isinstance(operand, str):
        try:
            # base 0 lets int() derive the radix from the literal prefix
            number = int(operand, 0)
        except ValueError as error:
            # keep the parser error attached as the explicit cause
            raise ValueError(
                f"Sample literal '{operand}' is not a number.") from error
        return repeat(number)
    # anything else is taken as an iterable of samples already
    return operand
@dataclass
class IIRFilter:
    """ The :class:`IIRFilter` class is a factory for a second-order recursive
    linear infinite impulse response filter, also known as *SOS-IIR filter*.

    The transfer function :math:`H(z)` with normalized filter coefficients of a
    second-order IIR filter is defined as follows:

    :math:`H(z) = \\frac{b_0 + b_1 \\cdot z^{-1} + b_2 \\cdot z^{-2}}
    {1 + a_1 \\cdot z^{-1} + a_2 \\cdot z^{-2}}`

    The factory class methods share one derivation of the denominator and the
    normalization by :math:`a_0`; only the numerator differs per filter type.
    """
    #: Normalized feedforward filter coefficient
    b0: float
    #: Normalized feedforward filter coefficient (1st-order)
    b1: float
    #: Normalized feedforward filter coefficient (2nd-order)
    b2: float
    #: Normalized feedback filter coefficient (1st-order)
    a1: float
    #: Normalized feedback filter coefficient (2nd-order)
    a2: float
    #: Sum of the 1st-feedback stage of the SOS-IIR filter
    s1: float = 0.0
    #: Sum of the 2nd-feedback stage of the SOS-IIR filter
    s2: float = 0.0

    @staticmethod
    def _prototype(dt: float, f0: float, q: float) -> tuple[float, float]:
        """ Returns the pair ``(cos(omega), alpha)`` shared by all filter
        types, with ``omega = 2 * pi * f0 * dt`` and
        ``alpha = sin(omega) / (2 * q)``.

        :param float dt: sampling-time of the filter in seconds
        :param float f0: characteristic frequency of the filter in Hertz
        :param float q: quality factor of the filter
        :raises ZeroDivisionError: if *q* is zero
        """
        omega = 2 * math.pi * f0 * dt
        return math.cos(omega), math.sin(omega) / (2 * q)

    @classmethod
    def _normalized(cls,
                    b0: float,
                    b1: float,
                    b2: float,
                    cos_omega: float,
                    alpha: float) -> IIRFilter:
        """ Returns the filter for the given un-normalized numerator
        coefficients, normalized by the denominator coefficient ``a0``.

        :param float b0: un-normalized feedforward coefficient
        :param float b1: un-normalized feedforward coefficient (1st-order)
        :param float b2: un-normalized feedforward coefficient (2nd-order)
        :param float cos_omega: cosine of the normalized angular frequency
        :param float alpha: ``sin(omega) / (2 * q)`` of the filter
        """
        # filter coefficients denominator (common to all filter types)
        a0 = 1 + alpha
        a1 = -2 * cos_omega
        a2 = 1 - alpha
        # second-order IIR filter with normalized filter coefficients
        return cls(b0 / a0, b1 / a0, b2 / a0, a1 / a0, a2 / a0)

    @classmethod
    def band_pass(cls,
                  dt: float,
                  f0: float,
                  q: float = 1 / math.sqrt(2)) -> IIRFilter:
        """ Creates a digital second-order IIR band-pass filter with normalized
        filter coefficients for the given sampling-time *dt*, center frequency
        *f0* and quality factor *q* of the filter.

        :param float dt: sampling-time of the filter in seconds
        :param float f0: center frequency of the filter in Hertz
        :param float q: inverse of bandwidth factor of the filter
        """
        cos_omega, alpha = cls._prototype(dt, f0, q)
        # numerator: alpha, 0, -alpha
        return cls._normalized(alpha, 0, -alpha, cos_omega, alpha)

    @classmethod
    def low_pass(cls,
                 dt: float,
                 f0: float,
                 q: float = 1 / math.sqrt(2)) -> IIRFilter:
        """ Creates a digital second-order IIR low-pass filter with normalized
        filter coefficients for the given sampling-time *dt*, cutoff frequency
        *f0* and quality factor *q* of the filter.

        :param float dt: sampling-time of the filter in seconds
        :param float f0: cutoff frequency of the filter in Hertz
        :param float q: inverse of bandwidth factor of the filter
        """
        cos_omega, alpha = cls._prototype(dt, f0, q)
        # numerator: (1 - cos) / 2, 1 - cos, (1 - cos) / 2
        b0 = (1 - cos_omega) / 2
        return cls._normalized(b0, 1 - cos_omega, b0, cos_omega, alpha)

    @classmethod
    def high_pass(cls,
                  dt: float,
                  f0: float,
                  q: float = 1 / math.sqrt(2)) -> IIRFilter:
        """ Creates a digital second-order IIR high-pass filter with normalized
        filter coefficients for the given sampling-time *dt*, cutoff frequency
        *f0* and quality factor *q* of the filter.

        :param float dt: sampling-time of the filter in seconds
        :param float f0: cutoff frequency of the filter in Hertz
        :param float q: inverse of bandwidth factor of the filter
        """
        cos_omega, alpha = cls._prototype(dt, f0, q)
        # numerator: (1 + cos) / 2, -(1 + cos), (1 + cos) / 2
        b0 = (1 + cos_omega) / 2
        return cls._normalized(b0, -(1 + cos_omega), b0, cos_omega, alpha)

    @classmethod
    def notch(cls,
              dt: float,
              f0: float,
              q: float = 1 / math.sqrt(2)) -> IIRFilter:
        """ Creates a digital second-order IIR notch filter with normalized
        filter coefficients for the given sampling-time *dt*, center frequency
        *f0* and quality factor *q* of the filter.

        :param float dt: sampling-time of the filter in seconds
        :param float f0: center frequency of the filter in Hertz
        :param float q: inverse of bandwidth factor of the filter
        """
        cos_omega, alpha = cls._prototype(dt, f0, q)
        # numerator: 1, -2 * cos, 1
        return cls._normalized(1, -2 * cos_omega, 1, cos_omega, alpha)

    @classmethod
    def all_pass(cls,
                 dt: float,
                 f0: float,
                 q: float = 1 / math.sqrt(2)) -> IIRFilter:
        """ Creates a digital second-order IIR all-pass filter with normalized
        filter coefficients for the given sampling-time *dt*, center frequency
        *f0* and quality factor *q* of the filter.

        :param float dt: sampling-time of the filter in seconds
        :param float f0: center frequency of the filter in Hertz
        :param float q: inverse of bandwidth factor of the filter
        """
        cos_omega, alpha = cls._prototype(dt, f0, q)
        # numerator: 1 - alpha, -2 * cos, 1 + alpha
        return cls._normalized(
            1 - alpha, -2 * cos_omega, 1 + alpha, cos_omega, alpha)

    def clear(self) -> None:
        """ Clears the sum value of the feedback stages of the SOS-IIR filter.
        """
        self.s1 = 0.0
        self.s2 = 0.0

    def compute(self, x: float | int) -> float:
        """ Computes for the sample :math:`x` the SOS-IIR filter equation in
        canonical form with the configured normalized filter coefficients.

        The sums :attr:`s1` and :attr:`s2` of the feedback stages are updated
        as a side effect, so consecutive calls filter a sample stream.

        :param float | int x: sample value to compute
        """
        # normalized canonical form
        y = self.b0 * x + self.s1
        # s1 must be updated with the previous s2 before s2 is overwritten
        self.s1 = (self.b1 * x - self.a1 * y) + self.s2
        self.s2 = (self.b2 * x - self.a2 * y)
        return y
[docs]@dataclass(eq=False)
class Trace:
""" Trace data class for time-discrete, equidistant signal samples.
"""
#: Label of the trace displayed in its plot.
label: str = 'Trace'
#: List of the time-discrete, equidistant signal samples of the trace.
samples: Samples = field(default_factory=list)
def __post_init__(self) -> None:
    """ Normalizes the field types after initialization: the
    :attr:`samples` are stored as a :class:`list` and the :attr:`label` as a
    :class:`str`."""
    samples = self.samples
    if not isinstance(samples, list):
        # any other iterable is materialized into a list
        self.samples = list(samples)
    label = self.label
    if not isinstance(label, str):
        # any other label object is stored by its string representation
        self.label = str(label)
@classmethod
def from_dict(cls,
              key: str,
              data: DataFrame,
              **kwargs: Any) -> Trace:
    """ Creates a trace from the item of the *data* dictionary selected by
    the *key*. The trace is labeled with the *key* unless a ``label`` keyword
    is given.

    :param str key: key name of the signal :attr:`samples` to select from
        the data dictionary
    :param DataFrame data: dictionary with signal :attr:`samples`
    :keyword str label: optional :attr:`label` to set instead of the key name
    :raises KeyError: if the *key* is not in the *data* dictionary
    """
    # an explicitly passed label wins over the key name
    kwargs.setdefault('label', key)
    # the selected samples always replace a ``samples`` keyword
    kwargs['samples'] = data[key]
    return cls(**kwargs)
def relabel(self, label: str) -> Trace:
    """ Sets the :attr:`label` of this trace to the string representation of
    the new *label* and returns this very trace, not a copy.

    :param str label: new label to set
    """
    new_label = str(label)
    self.label = new_label
    return self
@property
def label_stem(self) -> str:
    """ Returns the part of the trace :attr:`label` in front of its first
    colon, or the whole label when it contains no colon."""
    stem, _, _ = self.label.partition(':')
    return stem
def __len__(self) -> int:
    """ Returns the number of signal :attr:`samples` in the trace, which makes
    ``len(trace)`` work."""
    return len(self.samples)
def __getitem__(self,
                index: int | slice) -> Sample | MutableSequence[Sample]:
    """ Returns the sample at the *index*, or the list of samples selected by
    a slice *index*, from the trace.

    :param int | slice index: index of the sample or the slice of the
        samples to get
    :raises IndexError: if an integer *index* is out of range
    """
    # the samples list handles integer and slice indices alike, so no
    # type dispatch is needed here
    return self.samples[index]
def __setitem__(self,
                index: int,
                value: Sample) -> None:
    """ Sets the sample at the *index* to the *value* in the trace.

    The assignment is forwarded to the :attr:`samples` list, so the trace is
    modified in place.

    :param int index: index of the sample to set
    :param Sample value: numerical value to set
    """
    self.samples[index] = value
def __contains__(self, item: Sample) -> bool:
    """ Checks if the *item* is in the signal :attr:`samples` of the trace,
    which makes ``item in trace`` work.

    :param Sample item: sample value to look for
    """
    return item in self.samples
def __iter__(self) -> Iterator[Sample]:
    """ Returns an iterator over the signal :attr:`samples` in the trace, in
    list order."""
    return iter(self.samples)
def __reversed__(self) -> Iterator[Sample]:
    """ Returns a reverse iterator over the signal :attr:`samples` in the
    trace, which makes ``reversed(trace)`` work."""
    return reversed(self.samples)
[docs] def repeat(self, times: int = 1) -> Iterator[Sample]:
""" Returns an iterator repeating each signal sample of the trace
n-*times*.
:param int times: number of times to repeat a sample.
Default is ``1``.
"""
return chain.from_iterable((map(lambda x: repeat(x, times), self)))
[docs] def fields(self) -> tuple[Field, ...]:
""" Returns a tuple describing the fields of the data class."""
return fields(self)
def as_dict(self) -> dict[str, Any]:
    """ Returns the fields of the trace data class as a dictionary keyed by
    field name, as produced by :func:`dataclasses.asdict`."""
    return asdict(self)
def as_tuple(self) -> tuple[Any, ...]:
    """ Returns the field values of the trace data class as a tuple in field
    order, as produced by :func:`dataclasses.astuple`.
    """
    return astuple(self)
[docs] def bool(self, **kwargs: Any) -> Trace:
""" Returns a new trace with the signal :attr:`samples` converted to
booleans.
:keyword str label: optional trace label to set
"""
return replace(self,
label=kwargs.get('label', f'{self.label}:bool'),
samples=map(bool, self))
[docs] def int(self,
base: int | None = None,
**kwargs: Any) -> Trace:
""" Returns a new trace with the signal :attr:`samples` converted to
integers.
:param int | None base: radix base of the integer literal to convert
:keyword str label: optional trace label to set
"""
if base is None:
return replace(self,
label=kwargs.get('label', f'{self.label}:int'),
samples=map(int, self))
return replace(self,
label=kwargs.get('label', f'{self.label}:int'),
samples=map(lambda item: int(item, base), self))
[docs] def float(self, **kwargs: Any) -> Trace:
""" Returns a new trace with the signal :attr:`samples` converted to
floating-points.
:keyword str label: optional trace label to set
"""
return replace(self,
label=kwargs.get('label', f'{self.label}:float'),
samples=map(float, self))
[docs] def bin(self, **kwargs: Any) -> Trace:
""" Returns a new trace with the signal :attr:`samples` converted to
binary literals.
:keyword str label: optional trace label to set
"""
return replace(self,
label=kwargs.get('label', f'{self.label}:bin'),
samples=map(bin, self))
[docs] def oct(self, **kwargs: Any) -> Trace:
""" Returns a new trace with the signal :attr:`samples` converted to
octal literals.
:keyword str label: optional trace label to set
"""
return replace(self,
label=kwargs.get('label', f'{self.label}:oct'),
samples=map(oct, self))
[docs] def hex(self, **kwargs: Any) -> Trace:
""" Returns a new trace with the signal :attr:`samples` converted to
hexadecimal literals.
:keyword str label: optional trace label to set
"""
return replace(self,
label=kwargs.get('label', f'{self.label}:hex'),
samples=map(hex, self))
def less(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:lt'`` holding ``1`` where a
    signal sample is less than the *operand* and ``0`` elsewhere.

    :param Operand operand: iterable or number to compare with

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:lt',
                   samples=(int(lhs < rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __lt__(self, operand: Operand) -> Trace:
    """ Implements ``trace < operand`` by delegating to :meth:`less`; the
    result is a trace, not a boolean."""
    return self.less(operand)
def less_equal(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:le'`` holding ``1`` where a
    signal sample is less than or equal to the *operand* and ``0`` elsewhere.

    :param Operand operand: iterable or number to compare with

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:le',
                   samples=(int(lhs <= rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __le__(self, operand: Operand) -> Trace:
    """ Implements ``trace <= operand`` by delegating to :meth:`less_equal`;
    the result is a trace, not a boolean."""
    return self.less_equal(operand)
def equal(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:eq'`` holding ``1`` where a
    signal sample is equal to the *operand* and ``0`` elsewhere.

    :param Operand operand: iterable or number to compare with

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:eq',
                   samples=(int(lhs == rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __eq__(self, operand: Operand) -> Trace:
    """ Implements ``trace == operand`` by delegating to :meth:`equal`; the
    result is a sample-wise trace, not a boolean."""
    return self.equal(operand)
def not_equal(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:ne'`` holding ``1`` where a
    signal sample differs from the *operand* and ``0`` elsewhere.

    :param Operand operand: iterable or number to compare with

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:ne',
                   samples=(int(lhs != rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __ne__(self, operand: Operand) -> Trace:
    """ Implements ``trace != operand`` by delegating to :meth:`not_equal`;
    the result is a sample-wise trace, not a boolean."""
    return self.not_equal(operand)
def greater_equal(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:ge'`` holding ``1`` where a
    signal sample is greater than or equal to the *operand* and ``0``
    elsewhere.

    :param Operand operand: iterable or number to compare with

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:ge',
                   samples=(int(lhs >= rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __ge__(self, operand: Operand) -> Trace:
    """ Implements ``trace >= operand`` by delegating to
    :meth:`greater_equal`; the result is a trace, not a boolean."""
    return self.greater_equal(operand)
def greater(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:gt'`` holding ``1`` where a
    signal sample is greater than the *operand* and ``0`` elsewhere.

    :param Operand operand: iterable or number to compare with

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:gt',
                   samples=(int(lhs > rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __gt__(self, operand: Operand) -> Trace:
    """ Implements ``trace > operand`` by delegating to :meth:`greater`; the
    result is a trace, not a boolean."""
    return self.greater(operand)
def add(self, summand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:add'`` with the sample-wise
    sums ``sample + summand``.

    :param Operand summand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *summand*.
    """
    return replace(self,
                   label=f'{self.label}:add',
                   samples=map(operator.add, self, vectorize(summand)))
def __add__(self, summand: Operand) -> Trace:
    """ Implements ``trace + summand`` by delegating to :meth:`add`."""
    return self.add(summand)
def __radd__(self, summand: Operand) -> Trace:
    """ Implements ``summand + trace``: returns a new trace labeled
    ``'<label>:radd'`` with the sample-wise sums ``summand + sample``.

    :param Operand summand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *summand*.
    """
    return replace(self,
                   label=f'{self.label}:radd',
                   samples=map(operator.add, vectorize(summand), self))
def sub(self, subtrahend: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:sub'`` with the sample-wise
    differences ``sample - subtrahend``.

    :param Operand subtrahend: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *subtrahend*.
    """
    return replace(self,
                   label=f'{self.label}:sub',
                   samples=map(operator.sub, self, vectorize(subtrahend)))
def __sub__(self, subtrahend: Operand) -> Trace:
    """ Implements ``trace - subtrahend`` by delegating to :meth:`sub`."""
    return self.sub(subtrahend)
def __rsub__(self, minuend: Operand) -> Trace:
    """ Implements ``minuend - trace``: returns a new trace labeled
    ``'<label>:rsub'`` with the sample-wise differences ``minuend - sample``.

    :param Operand minuend: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *minuend*.
    """
    return replace(self,
                   label=f'{self.label}:rsub',
                   samples=map(operator.sub, vectorize(minuend), self))
def mul(self, factor: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:mul'`` with the sample-wise
    products ``sample * factor``.

    :param Operand factor: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *factor*.
    """
    return replace(self,
                   label=f'{self.label}:mul',
                   samples=map(operator.mul, self, vectorize(factor)))
def __mul__(self, factor: Operand) -> Trace:
    """ Implements ``trace * factor`` by delegating to :meth:`mul`."""
    return self.mul(factor)
def __rmul__(self, factor: Operand) -> Trace:
    """ Implements ``factor * trace``: returns a new trace labeled
    ``'<label>:rmul'`` with the sample-wise products ``factor * sample``.

    :param Operand factor: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *factor*.
    """
    return replace(self,
                   label=f'{self.label}:rmul',
                   samples=map(operator.mul, vectorize(factor), self))
def div(self, divisor: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:div'`` with the sample-wise
    quotients ``sample / divisor``.

    :param Operand divisor: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *divisor*.

    .. note:: A sample whose divisor is zero results in ``0`` instead of a
      zero-division error.
    """
    return replace(self,
                   label=f'{self.label}:div',
                   samples=(dividend / by if by != 0 else 0
                            for dividend, by in zip(self, vectorize(divisor))))
def __truediv__(self, divisor: Operand) -> Trace:
    """ Implements ``trace / divisor`` by delegating to :meth:`div`."""
    return self.div(divisor)
def __rtruediv__(self, dividend: Operand) -> Trace:
    """ Implements ``dividend / trace``: returns a new trace labeled
    ``'<label>:rdiv'`` with the sample-wise quotients ``dividend / sample``.

    :param Operand dividend: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *dividend*.

    .. note:: A zero sample, which is the divisor here, results in ``0``
      instead of a zero-division error.
    """
    return replace(self,
                   label=f'{self.label}:rdiv',
                   samples=(value / by if by != 0 else 0
                            for value, by in zip(vectorize(dividend), self)))
def floordiv(self, divisor: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:floordiv'`` with the
    sample-wise floored quotients ``sample // divisor``.

    :param Operand divisor: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *divisor*.

    .. note:: A sample whose divisor is zero results in ``0`` instead of a
      zero-division error.
    """
    return replace(self,
                   label=f'{self.label}:floordiv',
                   samples=(dividend // by if by != 0 else 0
                            for dividend, by in zip(self, vectorize(divisor))))
def __floordiv__(self, divisor: Operand) -> Trace:
    """ Implements ``trace // divisor`` by delegating to :meth:`floordiv`."""
    return self.floordiv(divisor)
def __rfloordiv__(self, dividend: Operand) -> Trace:
    """ Implements ``dividend // trace``: returns a new trace labeled
    ``'<label>:rfloordiv'`` with the sample-wise floored quotients
    ``dividend // sample``.

    :param Operand dividend: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *dividend*.

    .. note:: A zero sample, which is the divisor here, results in ``0``
      instead of a zero-division error.
    """
    return replace(self,
                   label=f'{self.label}:rfloordiv',
                   samples=(value // by if by != 0 else 0
                            for value, by in zip(vectorize(dividend), self)))
def mod(self, divisor: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:mod'`` with the sample-wise
    remainders ``sample % divisor``.

    The ``%`` operator is used, so the sign of a remainder is determined by
    the *divisor*.

    :param Operand divisor: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *divisor*.

    .. note:: A sample whose divisor is zero results in ``0`` instead of a
      zero-division error.
    """
    return replace(self,
                   label=f'{self.label}:mod',
                   samples=(dividend % by if by != 0 else 0
                            for dividend, by in zip(self, vectorize(divisor))))
def __mod__(self, divisor: Operand) -> Trace:
    """ Implements ``trace % divisor`` by delegating to :meth:`mod`."""
    return self.mod(divisor)
def __rmod__(self, dividend: Operand) -> Trace:
    """ Implements ``dividend % trace``: returns a new trace labeled
    ``'<label>:rmod'`` with the sample-wise remainders ``dividend % sample``.

    :param Operand dividend: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *dividend*.

    .. note:: A zero sample, which is the divisor here, results in ``0``
      instead of a zero-division error.
    """
    return replace(self,
                   label=f'{self.label}:rmod',
                   samples=(value % by if by != 0 else 0
                            for value, by in zip(vectorize(dividend), self)))
def fmod(self, divisor: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:fmod'`` with the sample-wise
    remainders computed by :func:`math.fmod`.

    With :func:`math.fmod` the sign of a remainder is determined by the
    signal sample, not by the *divisor*.

    :param Operand divisor: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *divisor*.

    .. note:: A sample whose divisor is zero results in ``0`` instead of an
      error.
    """
    return replace(self,
                   label=f'{self.label}:fmod',
                   samples=(math.fmod(dividend, by) if by != 0 else 0
                            for dividend, by in zip(self, vectorize(divisor))))
def pow(self, exponent: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:pow'`` with the sample-wise
    powers ``sample ** exponent``.

    :param Operand exponent: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *exponent*.
    """
    return replace(self,
                   label=f'{self.label}:pow',
                   samples=map(operator.pow, self, vectorize(exponent)))
def __pow__(self, exponent: Operand) -> Trace:
    """ Implements ``trace ** exponent`` by delegating to :meth:`pow`."""
    return self.pow(exponent)
def __rpow__(self, base: Operand) -> Trace:
    """ Implements ``base ** trace``: returns a new trace labeled
    ``'<label>:rpow'`` with the sample-wise powers ``base ** sample``.

    :param Operand base: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *base*.
    """
    return replace(self,
                   label=f'{self.label}:rpow',
                   samples=map(operator.pow, vectorize(base), self))
def bitwise_and(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:and'`` with the sample-wise
    values ``int(sample) & int(operand)``.

    :param Operand operand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:and',
                   samples=(int(lhs) & int(rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __and__(self, operand: Operand) -> Trace:
    """ Implements ``trace & operand`` by delegating to :meth:`bitwise_and`."""
    return self.bitwise_and(operand)
def __rand__(self, operand: Operand) -> Trace:
    """ Implements ``operand & trace``: returns a new trace labeled
    ``'<label>:rand'`` with the sample-wise values
    ``int(operand) & int(sample)``.

    :param Operand operand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:rand',
                   samples=(int(lhs) & int(rhs)
                            for lhs, rhs in zip(vectorize(operand), self)))
def bitwise_or(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:or'`` with the sample-wise
    values ``int(sample) | int(operand)``.

    :param Operand operand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:or',
                   samples=(int(lhs) | int(rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __or__(self, operand: Operand) -> Trace:
    """ Implements ``trace | operand`` by delegating to :meth:`bitwise_or`."""
    return self.bitwise_or(operand)
def __ror__(self, operand: Operand) -> Trace:
    """ Implements ``operand | trace``: returns a new trace labeled
    ``'<label>:ror'`` with the sample-wise values
    ``int(operand) | int(sample)``.

    :param Operand operand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:ror',
                   samples=(int(lhs) | int(rhs)
                            for lhs, rhs in zip(vectorize(operand), self)))
def bitwise_xor(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:xor'`` with the sample-wise
    values ``int(sample) ^ int(operand)``.

    :param Operand operand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:xor',
                   samples=(int(lhs) ^ int(rhs)
                            for lhs, rhs in zip(self, vectorize(operand))))
def __xor__(self, operand: Operand) -> Trace:
    """ Implements ``trace ^ operand`` by delegating to :meth:`bitwise_xor`."""
    return self.bitwise_xor(operand)
def __rxor__(self, operand: Operand) -> Trace:
    """ Implements ``operand ^ trace``: returns a new trace labeled
    ``'<label>:rxor'`` with the sample-wise values
    ``int(operand) ^ int(sample)``.

    :param Operand operand: iterable or number

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:rxor',
                   samples=(int(lhs) ^ int(rhs)
                            for lhs, rhs in zip(vectorize(operand), self)))
def left_shift(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:shl'`` with the sample-wise
    values ``int(sample) << int(operand)``.

    :param Operand operand: iterable or number of bit positions to shift by

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:shl',
                   samples=(int(value) << int(shift)
                            for value, shift in zip(self, vectorize(operand))))
def __lshift__(self, operand: Operand) -> Trace:
    """ Implements ``trace << operand`` by delegating to :meth:`left_shift`."""
    return self.left_shift(operand)
def __rlshift__(self, operand: Operand) -> Trace:
    """ Implements ``operand << trace``: returns a new trace labeled
    ``'<label>:rshl'`` with the sample-wise values
    ``int(operand) << int(sample)``.

    :param Operand operand: iterable or number to shift

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:rshl',
                   samples=(int(value) << int(shift)
                            for value, shift in zip(vectorize(operand), self)))
def right_shift(self, operand: Operand) -> Trace:
    """ Returns a new trace labeled ``'<label>:shr'`` with the sample-wise
    values ``int(sample) >> int(operand)``.

    :param Operand operand: iterable or number of bit positions to shift by

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:shr',
                   samples=(int(value) >> int(shift)
                            for value, shift in zip(self, vectorize(operand))))
def __rshift__(self, operand: Operand) -> Trace:
    """ Implements ``trace >> operand`` by delegating to :meth:`right_shift`."""
    return self.right_shift(operand)
def __rrshift__(self, operand: Operand) -> Trace:
    """ Implements ``operand >> trace``: returns a new trace labeled
    ``'<label>:rshr'`` with the sample-wise values
    ``int(operand) >> int(sample)``.

    :param Operand operand: iterable or number to shift

    .. note:: The result ends with the shorter of the signal
      :attr:`samples` and an iterable *operand*.
    """
    return replace(self,
                   label=f'{self.label}:rshr',
                   samples=(int(value) >> int(shift)
                            for value, shift in zip(vectorize(operand), self)))
def bits(self,
         index: int,
         number: int = 1,
         **kwargs: Any) -> Trace:
    """ Returns a new trace with the bit field of *number* bits starting at
    the bit *index* extracted from every signal sample, i.e.
    ``(int(sample) >> index) & (2 ** number - 1)``.

    The new trace is labeled ``'<label>:bits'`` unless a ``label`` keyword
    is given.

    :param int index: start index of the bits to unpack [0..31]
    :param int number: number of bits to unpack [1..32-index].
        Default is ``1``. A *number* of ``0`` is accepted as well and
        yields zero for every sample.
    :keyword str label: optional trace label to set
    :raises ValueError: if the *index* or the *number* is out of range
    """
    if not 0 <= index <= 31:
        raise ValueError(f"Bit index '{index}' invalid.")
    if not 0 <= number <= 32 - index:
        raise ValueError(
            f"Number of bits '{number}' to unpack at index '{index}' invalid.")
    # bit mask with the lowest *number* bits set
    mask = 2 ** number - 1
    label = kwargs.get('label', f'{self.label}:bits')
    return replace(self,
                   label=label,
                   samples=((int(sample) >> index) & mask for sample in self))
def __iadd__(self, operand: Operand) -> Trace:
    """ Implements ``trace += operand``: adds the *operand* in place to the
    signal :attr:`samples`, appends ``':iadd'`` to the trace label and
    returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, addend) in enumerate(zip(self, vectorize(operand))):
        self[index] += addend
    self.label = self.label + ':iadd'
    return self
def __isub__(self, operand: Operand) -> Trace:
    """ Implements ``trace -= operand``: subtracts the *operand* in place
    from the signal :attr:`samples`, appends ``':isub'`` to the trace label
    and returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, subtrahend) in enumerate(zip(self, vectorize(operand))):
        self[index] -= subtrahend
    self.label = self.label + ':isub'
    return self
def __imul__(self, operand: Operand) -> Trace:
    """ Implements ``trace *= operand``: multiplies the signal
    :attr:`samples` in place by the *operand*, appends ``':imul'`` to the
    trace label and returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, factor) in enumerate(zip(self, vectorize(operand))):
        self[index] *= factor
    self.label = self.label + ':imul'
    return self
def __itruediv__(self, operand: Operand) -> Trace:
    """ Implements ``trace /= operand``: divides the signal :attr:`samples`
    in place by the *operand*, appends ``':idiv'`` to the trace label and
    returns the trace itself.

    A sample whose divisor is zero is set to ``0`` instead of raising a
    :class:`ZeroDivisionError`.

    :param Operand operand: iterable or number
    """
    for index, (sample, divisor) in enumerate(zip(self, vectorize(operand))):
        # The conditional must select the whole result; applied to the
        # divisor alone it would still divide by zero.
        self[index] = sample / divisor if divisor != 0 else 0
    self.label += ':idiv'
    return self
def __ifloordiv__(self, operand: Operand) -> Trace:
    """ Implements ``trace //= operand``: floor-divides the signal
    :attr:`samples` in place by the *operand*, appends ``':ifloordiv'`` to
    the trace label and returns the trace itself.

    A sample whose divisor is zero is set to ``0`` instead of raising a
    :class:`ZeroDivisionError`.

    :param Operand operand: iterable or number
    """
    for index, (sample, divisor) in enumerate(zip(self, vectorize(operand))):
        # The conditional must select the whole result; applied to the
        # divisor alone it would still divide by zero.
        self[index] = sample // divisor if divisor != 0 else 0
    self.label += ':ifloordiv'
    return self
def __imod__(self, operand: Operand) -> Trace:
    """ Implements ``trace %= operand``: replaces the signal :attr:`samples`
    in place by their remainder modulo the *operand*, appends ``':imod'`` to
    the trace label and returns the trace itself.

    A sample whose divisor is zero is set to ``0`` instead of raising a
    :class:`ZeroDivisionError`.

    :param Operand operand: iterable or number
    """
    for index, (sample, divisor) in enumerate(zip(self, vectorize(operand))):
        # The conditional must select the whole result; applied to the
        # divisor alone it would still divide by zero.
        self[index] = sample % divisor if divisor != 0 else 0
    self.label += ':imod'
    return self
def __ipow__(self, operand: Operand) -> Trace:
    """ Implements ``trace **= operand``: raises the signal :attr:`samples`
    in place to the power of the *operand*, appends ``':ipow'`` to the trace
    label and returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, exponent) in enumerate(zip(self, vectorize(operand))):
        self[index] **= exponent
    self.label = self.label + ':ipow'
    return self
def __iand__(self, operand: Operand) -> Trace:
    """ Implements ``trace &= operand``: combines the signal :attr:`samples`
    in place with the *operand* using ``&``, appends ``':iand'`` to the
    trace label and returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, other) in enumerate(zip(self, vectorize(operand))):
        self[index] &= other
    self.label = self.label + ':iand'
    return self
def __ior__(self, operand: Operand) -> Trace:
    """ Implements ``trace |= operand``: combines the signal :attr:`samples`
    in place with the *operand* using ``|``, appends ``':ior'`` to the trace
    label and returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, other) in enumerate(zip(self, vectorize(operand))):
        self[index] |= other
    self.label = self.label + ':ior'
    return self
def __ixor__(self, operand: Operand) -> Trace:
    """ Implements ``trace ^= operand``: combines the signal :attr:`samples`
    in place with the *operand* using ``^``, appends ``':ixor'`` to the
    trace label and returns the trace itself.

    :param Operand operand: iterable or number
    """
    for index, (_, other) in enumerate(zip(self, vectorize(operand))):
        self[index] ^= other
    self.label = self.label + ':ixor'
    return self
def __ilshift__(self, operand: Operand) -> Trace:
    """ Implements ``trace <<= operand``: shifts the signal :attr:`samples`
    in place to the left by the *operand*, appends ``':ishl'`` to the trace
    label and returns the trace itself.

    Both the sample and the shift amount are converted with :class:`int`
    before shifting, matching the non in-place shift methods of the trace.

    :param Operand operand: iterable or number
    """
    for index, (sample, amount) in enumerate(zip(self, vectorize(operand))):
        self[index] = int(sample) << int(amount)
    self.label += ':ishl'
    return self
def __irshift__(self, operand: Operand) -> Trace:
    """ Implements ``trace >>= operand``: shifts the signal :attr:`samples`
    in place to the right by the *operand*, appends ``':ishr'`` to the trace
    label and returns the trace itself.

    Both the sample and the shift amount are converted with :class:`int`
    before shifting, matching the non in-place shift methods of the trace.

    :param Operand operand: iterable or number
    """
    for index, (sample, amount) in enumerate(zip(self, vectorize(operand))):
        self[index] = int(sample) >> int(amount)
    self.label += ':ishr'
    return self
def neg(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the arithmetic negation of every signal
    sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':neg'``.
    """
    label = kwargs.get('label', f'{self.label}:neg')
    negated = map(operator.neg, self)
    return replace(self, label=label, samples=negated)
def __neg__(self) -> Trace:
    """ Implements ``-trace`` by delegating to :meth:`neg`."""
    negated = self.neg()
    return negated
def __pos__(self) -> Trace:
    """ Implements ``+trace``: returns the trace itself, not a copy."""
    unchanged = self
    return unchanged
def abs(self) -> Trace:
    """ Returns a new trace holding the absolute value of every signal
    sample in :attr:`samples`.

    The label of the new trace is the trace label suffixed with ``':abs'``.
    """
    magnitudes = map(operator.abs, self)
    return replace(self, label=f'{self.label}:abs', samples=magnitudes)
def __abs__(self) -> Trace:
    """ Implements ``abs(trace)`` by delegating to :meth:`abs`."""
    magnitudes = self.abs()
    return magnitudes
def invert(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the bitwise inversion (``~``) of every
    signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':not'``.
    """
    label = kwargs.get('label', f'{self.label}:not')
    inverted = map(operator.invert, self)
    return replace(self, label=label, samples=inverted)
def __invert__(self) -> Trace:
    """ Implements ``~trace`` by delegating to :meth:`invert`."""
    inverted = self.invert()
    return inverted
def round(self,
          ndigits: Optional[int] = None,
          **kwargs: Any) -> Trace:
    """ Returns a new trace holding every signal sample in :attr:`samples`
    rounded with the built-in :func:`round`.

    :param int | None ndigits: optional number of digits rounded after the
        decimal point
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':round'``.
    """
    def _round_sample(sample):
        return round(sample, ndigits)

    label = kwargs.get('label', f'{self.label}:round')
    return replace(self, label=label, samples=map(_round_sample, self))
def __round__(self, ndigits: Optional[int] = None) -> Trace:
    """ Implements ``round(trace, ndigits)`` by delegating to :meth:`round`.

    :param int | None ndigits: optional number of digits rounded after the
        decimal point
    """
    rounded = self.round(ndigits)
    return rounded
def trunc(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding every signal sample in :attr:`samples`
    truncated with :func:`math.trunc`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':trunc'``.
    """
    label = kwargs.get('label', f'{self.label}:trunc')
    truncated = map(math.trunc, self)
    return replace(self, label=label, samples=truncated)
def __trunc__(self) -> Trace:
    """ Implements ``math.trunc(trace)`` by delegating to :meth:`trunc`."""
    truncated = self.trunc()
    return truncated
def floor(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding every signal sample in :attr:`samples`
    rounded down with :func:`math.floor`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':floor'``.
    """
    label = kwargs.get('label', f'{self.label}:floor')
    floored = map(math.floor, self)
    return replace(self, label=label, samples=floored)
def __floor__(self) -> Trace:
    """ Implements ``math.floor(trace)`` by delegating to :meth:`floor`."""
    floored = self.floor()
    return floored
def ceil(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding every signal sample in :attr:`samples`
    rounded up with :func:`math.ceil`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':ceil'``.
    """
    label = kwargs.get('label', f'{self.label}:ceil')
    ceiled = map(math.ceil, self)
    return replace(self, label=label, samples=ceiled)
def __ceil__(self) -> Trace:
    """ Implements ``math.ceil(trace)`` by delegating to :meth:`ceil`."""
    ceiled = self.ceil()
    return ceiled
def sign(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the signum of every signal sample in
    :attr:`samples`, computed with the module-level ``sign`` function.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':sign'``.
    """
    label = kwargs.get('label', f'{self.label}:sign')
    signums = map(sign, self)
    return replace(self, label=label, samples=signums)
def zero(self, **kwargs: Any) -> Trace:
    """ Returns a new trace flagging with ``1`` every signal sample in
    :attr:`samples` whose signum is zero, and with ``0`` every other sample.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':zero'``.
    """
    def _is_zero(sample):
        return int(sign(sample) == 0)

    label = kwargs.get('label', f'{self.label}:zero')
    return replace(self, label=label, samples=map(_is_zero, self))
def positive(self, **kwargs: Any) -> Trace:
    """ Returns a new trace flagging with ``1`` every signal sample in
    :attr:`samples` whose signum is ``1``, and with ``0`` every other
    sample.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':positive'``.
    """
    def _is_positive(sample):
        return int(sign(sample) == 1)

    label = kwargs.get('label', f'{self.label}:positive')
    return replace(self, label=label, samples=map(_is_positive, self))
def negative(self, **kwargs: Any) -> Trace:
    """ Returns a new trace flagging with ``1`` every signal sample in
    :attr:`samples` whose signum is ``-1``, and with ``0`` every other
    sample.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':negative'``.
    """
    def _is_negative(sample):
        return int(sign(sample) == -1)

    label = kwargs.get('label', f'{self.label}:negative')
    return replace(self, label=label, samples=map(_is_negative, self))
def delta(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the differences between consecutive signal
    :attr:`samples`.

    The *preset* is placed in front of the samples, so the first delta is
    the first sample minus the *preset*.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':delta'``.
    :keyword float preset: optional preset value to compute the delta.
        Default is the first value in the signal samples.
    """
    def _difference(pair):
        previous, current = pair
        return current - previous

    first = self[0] if self else None
    preset = kwargs.get('preset', first)
    label = kwargs.get('label', f'{self.label}:delta')
    pairs = _pairwise(chain([preset], self))
    return replace(self, label=label, samples=map(_difference, pairs))
def enter_positive(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the test results as integers for checking
    the signal :attr:`samples` starts to be positive.

    A sample is flagged with ``1`` when its signum is ``1`` and the signum
    increased compared to the previous sample.

    :keyword str label: optional trace label to set
    :keyword float preset: optional preset value to compute the delta.
        It is forwarded unchanged to :meth:`delta` of the signum trace and
        is therefore compared against signum values.
        Default is the first value in the signum samples.
    """
    signs = self.sign()
    # Forward only the documented preset as a real keyword argument; the
    # label belongs to the returned trace, not to the intermediate deltas.
    delta_kwargs = {'preset': kwargs['preset']} if 'preset' in kwargs else {}
    deltas = signs.delta(**delta_kwargs)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:enter_positive'),
                   samples=map(
                       lambda signum, delta: int(signum == 1 and delta > 0),
                       signs, deltas))
def left_positive(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the test results as integers for checking
    the signal :attr:`samples` stops to be positive.

    A sample is flagged with ``1`` when the signum dropped from ``1`` to
    ``0`` (delta ``-1``) or from ``1`` to ``-1`` (delta ``-2``).

    :keyword str label: optional trace label to set
    :keyword float preset: optional preset value to compute the delta.
        It is forwarded unchanged to :meth:`delta` of the signum trace and
        is therefore compared against signum values.
        Default is the first value in the signum samples.
    """
    signs = self.sign()
    # Forward only the documented preset as a real keyword argument; the
    # label belongs to the returned trace, not to the intermediate deltas.
    delta_kwargs = {'preset': kwargs['preset']} if 'preset' in kwargs else {}
    deltas = signs.delta(**delta_kwargs)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:left_positive'),
                   samples=map(lambda signum, delta: int(
                       (signum == 0 and delta == -1) or (
                           signum == -1 and delta == -2)),
                               signs, deltas))
def enter_negative(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the test results as integers for checking
    the signal :attr:`samples` starts to be negative.

    A sample is flagged with ``1`` when its signum is ``-1`` and the signum
    decreased compared to the previous sample.

    :keyword str label: optional trace label to set
    :keyword float preset: optional preset value to compute the delta.
        It is forwarded unchanged to :meth:`delta` of the signum trace and
        is therefore compared against signum values.
        Default is the first value in the signum samples.
    """
    signs = self.sign()
    # Forward only the documented preset as a real keyword argument; the
    # label belongs to the returned trace, not to the intermediate deltas.
    delta_kwargs = {'preset': kwargs['preset']} if 'preset' in kwargs else {}
    deltas = signs.delta(**delta_kwargs)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:enter_negative'),
                   samples=map(
                       lambda signum, delta: int(signum == -1 and delta < 0),
                       signs, deltas))
def left_negative(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the test results as integers for checking
    the signal :attr:`samples` stops to be negative.

    A sample is flagged with ``1`` when the signum rose from ``-1`` to
    ``0`` (delta ``1``) or from ``-1`` to ``1`` (delta ``2``).

    :keyword str label: optional trace label to set
    :keyword float preset: optional preset value to compute the delta.
        It is forwarded unchanged to :meth:`delta` of the signum trace and
        is therefore compared against signum values.
        Default is the first value in the signum samples.
    """
    signs = self.sign()
    # Forward only the documented preset as a real keyword argument; the
    # label belongs to the returned trace, not to the intermediate deltas.
    delta_kwargs = {'preset': kwargs['preset']} if 'preset' in kwargs else {}
    deltas = signs.delta(**delta_kwargs)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:left_negative'),
                   samples=map(lambda signum, delta: int(
                       (signum == 0 and delta == 1) or (
                           signum == 1 and delta == 2)),
                               signs, deltas))
def accumulate(self,
               func: Callable[[Number, Number], Number] = operator.add,
               **kwargs: Any) -> Trace:
    """ Returns a new trace with the running accumulation of the signal
    :attr:`samples`, computed with :func:`itertools.accumulate`.

    :param Callable[[Number, Number], Number] func: function of two
        arguments used to combine the running result with the next sample.
        Default is the function :func:`operator.add`.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':accumulate'``.
    """
    label = kwargs.get('label', f'{self.label}:accumulate')
    running = accumulate(self, func=func)
    return replace(self, label=label, samples=running)
def sums_positive(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the integrated signal :attr:`samples`
    limited to positive sums.

    The running sum starts at ``0`` and is reset to ``0`` whenever adding a
    sample would make it zero or negative, so a negative first sample does
    not leak into the following sums.

    :keyword str label: optional trace label to set
    """
    def _add_clamped(total, sample):
        total += sample
        return total if total > 0 else 0

    # Seed the integrator with 0 so the first sample is clamped like every
    # other one; the seed itself is dropped from the result.
    samples = islice(accumulate(self, _add_clamped, initial=0), 1, None)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:sums_positive'),
                   samples=samples)
def sums_negative(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the integrated signal :attr:`samples`
    limited to negative sums.

    The running sum starts at ``0`` and is reset to ``0`` whenever adding a
    sample would make it zero or positive, so a positive first sample does
    not leak into the following sums.

    :keyword str label: optional trace label to set
    """
    def _add_clamped(total, sample):
        total += sample
        return total if total < 0 else 0

    # Seed the integrator with 0 so the first sample is clamped like every
    # other one; the seed itself is dropped from the result.
    samples = islice(accumulate(self, _add_clamped, initial=0), 1, None)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:sums_negative'),
                   samples=samples)
def sin(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the sine (:func:`math.sin`) of every
    signal sample, with the :attr:`samples` taken as angles in radians.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':sin'``.
    """
    label = kwargs.get('label', f'{self.label}:sin')
    sines = map(math.sin, self)
    return replace(self, label=label, samples=sines)
def cos(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the cosine (:func:`math.cos`) of every
    signal sample, with the :attr:`samples` taken as angles in radians.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':cos'``.
    """
    label = kwargs.get('label', f'{self.label}:cos')
    cosines = map(math.cos, self)
    return replace(self, label=label, samples=cosines)
def tan(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the tangent (:func:`math.tan`) of every
    signal sample, with the :attr:`samples` taken as angles in radians.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':tan'``.
    """
    label = kwargs.get('label', f'{self.label}:tan')
    tangents = map(math.tan, self)
    return replace(self, label=label, samples=tangents)
def asin(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the arc sine (:func:`math.asin`) of
    every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':asin'``.
    """
    label = kwargs.get('label', f'{self.label}:asin')
    arc_sines = map(math.asin, self)
    return replace(self, label=label, samples=arc_sines)
def acos(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the arc cosine (:func:`math.acos`) of
    every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':acos'``.
    """
    label = kwargs.get('label', f'{self.label}:acos')
    arc_cosines = map(math.acos, self)
    return replace(self, label=label, samples=arc_cosines)
def atan(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the arc tangent (:func:`math.atan`) of
    every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':atan'``.
    """
    label = kwargs.get('label', f'{self.label}:atan')
    arc_tangents = map(math.atan, self)
    return replace(self, label=label, samples=arc_tangents)
def sinh(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the hyperbolic sine (:func:`math.sinh`)
    of every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':sinh'``.
    """
    label = kwargs.get('label', f'{self.label}:sinh')
    values = map(math.sinh, self)
    return replace(self, label=label, samples=values)
def cosh(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the hyperbolic cosine
    (:func:`math.cosh`) of every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':cosh'``.
    """
    label = kwargs.get('label', f'{self.label}:cosh')
    values = map(math.cosh, self)
    return replace(self, label=label, samples=values)
def tanh(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the hyperbolic tangent
    (:func:`math.tanh`) of every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':tanh'``.
    """
    label = kwargs.get('label', f'{self.label}:tanh')
    values = map(math.tanh, self)
    return replace(self, label=label, samples=values)
def asinh(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the area hyperbolic sine
    (:func:`math.asinh`) of every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':asinh'``.
    """
    label = kwargs.get('label', f'{self.label}:asinh')
    values = map(math.asinh, self)
    return replace(self, label=label, samples=values)
def acosh(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the area hyperbolic cosine
    (:func:`math.acosh`) of every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':acosh'``.
    """
    label = kwargs.get('label', f'{self.label}:acosh')
    values = map(math.acosh, self)
    return replace(self, label=label, samples=values)
def atanh(self, **kwargs: Any) -> Trace:
    """ Returns a new trace holding the area hyperbolic tangent
    (:func:`math.atanh`) of every signal sample in :attr:`samples`.

    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':atanh'``.
    """
    label = kwargs.get('label', f'{self.label}:atanh')
    values = map(math.atanh, self)
    return replace(self, label=label, samples=values)
def sort(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` :math:`N` sorted
    in ascending order.

    All keyword arguments are passed on to the built-in :func:`sorted`.

    :keyword key: function of one argument that is used to extract a
        comparison key from each sample. Default is ``None``.
    :keyword bool reverse: if ``True`` the sorted signal :attr:`samples` are
        returned in descending order.
    """
    ordered = sorted(self, **kwargs)
    return replace(self, label=f'{self.label}:sort', samples=ordered)
def winsorize(self,
              limits: (float |
                       tuple[Optional[float], Optional[float]] |
                       None) = None,
              **kwargs: Any):
    """ Returns a new trace with the winsorized signal :attr:`samples`
    :math:`N`, or the trace itself when it is empty or no *limits* are
    provided.

    The nth-lowest sample values are set to the ``limits[0]``-th percentile
    sample value, and the nth-highest sample values are set to the
    ``(1.0 - limits[1])``-th percentile sample value.

    :param limits: either a pair of the percentages as floats between
        ``0.0`` and ``1.0`` to cut on each side of the sorted signal
        samples in ascending order, or the percentage for both sides.
        The value of one limit can be set to ``None`` to indicate an open
        interval for this side.
    :type limits: float | tuple[Optional[float], Optional[float]] | None
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':winsorize'``.
    """
    if not self:
        return self
    if limits is None:
        return self
    label = kwargs.get('label', f'{self.label}:winsorize')
    winsorized = winsorize(np.array(self), limits)
    return replace(self, label=label, samples=winsorized.data)
def index_of(self, sample: Sample) -> int:
    """ Returns the *index* of the first occurrence of the *sample* :math:`x`
    in the signal :attr:`samples` :math:`N`, or ``-1`` when the *sample* does
    not occur.

    :param sample: sample value to look for
    """
    try:
        position = operator.indexOf(self, sample)
    except ValueError:
        # operator.indexOf signals "not found" with ValueError
        position = -1
    return position
def count(self, sample: Sample) -> int:
    """ Returns the *number* of occurrences of the *sample* :math:`x` in the
    signal :attr:`samples` :math:`N`, counted with
    :func:`operator.countOf`.

    :param sample: sample value to count
    """
    occurrences = operator.countOf(self, sample)
    return occurrences
def sum(self,
        *operands: Operand,
        **kwargs: Any) -> Trace | float:
    """ Returns either a new trace with the sums of the signal
    :attr:`samples` and the provided *operands*, or the sum
    :math:`\\sum\\limits_{i=0}^{N}{x_i}` of the signal :attr:`samples`
    :math:`N` of the trace in case no *operands* are provided.

    An *operand* can be either a fix number or an array-like iterable.

    :param operands: operands to sum up with each signal sample
    :type operands: tuple[Operand, ...]
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':sum'``.

    .. note:: An iterable *operand* should have at least the same length
      as the signal :attr:`samples`, otherwise only a subset of the signal
      :attr:`samples` is returned!
    """
    if operands:
        label = kwargs.get('label', f'{self.label}:sum')
        rows = zip(self, *map(vectorize, operands))
        return replace(self, label=label, samples=map(sum, rows))
    return sum(self)
def min(self,
        *operands: Operand,
        **kwargs: Any) -> Trace | Sample:
    """ Returns either a new trace with the minimums of the signal
    :attr:`samples` and the provided *operands*, or the minimum
    :math:`x_{min}` in the signal :attr:`samples` :math:`N` of the trace in
    case no *operands* are provided.

    An *operand* can be either a fix number or an array-like iterable.

    :param operands: operands to compare with each signal sample
    :type operands: tuple[Operand, ...]
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':min'``.

    .. note:: An iterable *operand* should have at least the same length
      as the signal :attr:`samples`, otherwise only a subset of the signal
      :attr:`samples` is returned!
    """
    if operands:
        label = kwargs.get('label', f'{self.label}:min')
        rows = zip(self, *map(vectorize, operands))
        return replace(self, label=label, samples=map(min, rows))
    return min(self)
def max(self,
        *operands: Operand,
        **kwargs: Any) -> Trace | Sample:
    """ Returns either a new trace with the maximums of the signal
    :attr:`samples` and the provided *operands*, or the maximum
    :math:`x_{max}` in the signal :attr:`samples` :math:`N` of the trace in
    case no *operands* are provided.

    An *operand* can be either a fix number or an array-like iterable.

    :param operands: operands to compare with each signal sample
    :type operands: tuple[Operand, ...]
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':max'``.

    .. note:: An iterable *operand* should have at least the same length
      as the signal :attr:`samples`, otherwise only a subset of the signal
      :attr:`samples` is returned!
    """
    if operands:
        label = kwargs.get('label', f'{self.label}:max')
        rows = zip(self, *map(vectorize, operands))
        return replace(self, label=label, samples=map(max, rows))
    return max(self)
def range(self) -> Union[float, int]:
    """ Returns the *range* :math:`x_{max} - x_{min}` of the signal
    :attr:`samples` :math:`N`.

    An empty trace has the range ``0.0`` instead of raising a
    :class:`ValueError`.
    """
    if not self:
        return 0.0
    return max(self) - min(self)
def midrange(self) -> Union[float, int]:
    """ Returns the *midrange* :math:`\\frac{x_{max} + x_{min}}{2}` of the
    signal :attr:`samples` :math:`N`.

    An empty trace has the midrange ``0.0`` instead of raising a
    :class:`ValueError`.
    """
    if not self:
        return 0.0
    return (max(self) + min(self)) / 2
def average(self,
            *operands: Operand,
            **kwargs: Any) -> Trace | float:
    """ Returns either a new trace with the averages of the signal
    :attr:`samples` and the provided *operands*, or the average
    :math:`\\overline{x}` of the signal :attr:`samples` :math:`N` of the
    trace in case no *operands* are provided.

    Without *operands* an empty trace has the average ``0.0``.
    An *operand* can be either a fix number or an array-like iterable.

    :param operands: operands to compute with each signal sample
    :type operands: tuple[Operand, ...]
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':avg'``.

    .. note:: An iterable *operand* should have at least the same length
      as the signal :attr:`samples`, otherwise only a subset of the signal
      :attr:`samples` is returned!
    """
    if operands:
        def _row_average(row):
            return sum(row) / len(row)

        label = kwargs.get('label', f'{self.label}:avg')
        rows = zip(self, *map(vectorize, operands))
        return replace(self, label=label, samples=map(_row_average, rows))
    if not self:
        return 0.0
    return sum(self) / len(self)
def mean(self) -> Union[float, int]:
    """ Returns the arithmetic *mean* :math:`\\overline{x}` of the signal
    :attr:`samples` :math:`N`, or ``0.0`` for an empty trace."""
    if not self:
        return 0.0
    return stats.mean(self)
def weighted_mean(self) -> Union[float, int]:
    """ Returns the linear weighted arithmetic *mean*
    :math:`\\overline{x}_{w}` of the signal :attr:`samples` :math:`N`.

    The i-th sample (counted from ``1``) is weighted with ``i``. An empty
    trace yields ``0.0`` and a single sample is returned unchanged.
    """
    size = len(self)
    if size == 0:
        return 0.0
    if size == 1:
        return self[0]
    # 2 / (N * (N + 1)) is the reciprocal of the sum of the weights 1..N
    scale = 2 / (size * (size + 1))
    return scale * sum(weight * sample
                       for weight, sample in enumerate(self, 1))
def winsor_mean(self,
                limits: (float |
                         tuple[Optional[float], Optional[float]] |
                         None) = None) -> Union[float, int]:
    """ Returns the winsorized arithmetic *mean*
    :math:`\\overline{x}_{w\\alpha}` of the signal :attr:`samples`
    :math:`N`, or ``0.0`` when the winsorized trace is empty.

    :param limits: either a pair of the percentages as floats between
        ``0.0`` and ``1.0`` to cut on each side of the sorted signal
        samples in ascending order, or the percentage for both sides.
        The value of one limit can be set to ``None`` to indicate an open
        interval for this side.
    :type limits: float | tuple[Optional[float], Optional[float]] | None
    """
    winsorized = self.winsorize(limits)
    if not winsorized:
        return 0.0
    return stats.mean(winsorized)
def mode(self) -> Union[float, int]:
    """ Returns the mode :math:`\\overline{x}_{mod}` of the signal
    :attr:`samples` :math:`N`, or ``0.0`` for an empty trace."""
    if not self:
        return 0.0
    return stats.mode(self)
def rms(self) -> float:
    """ Returns the *root-mean-square* :math:`x_{rms}` of the signal
    :attr:`samples` :math:`N`, or ``0.0`` for an empty trace.

    The squares are obtained by raising the trace itself to the power of
    two.
    """
    if not self:
        return 0.0
    mean_square = sum(self ** 2) / len(self)
    return math.sqrt(mean_square)
def aad(self) -> float:
    """ Returns the *average absolute deviation* :math:`D_{mean}` of the
    signal :attr:`samples` :math:`N`, or ``0.0`` for an empty trace.

    The deviations are measured from the arithmetic :meth:`mean`.
    """
    if not self:
        return 0.0
    center = self.mean()
    deviations = map(lambda x: abs(x - center), self)
    return sum(deviations) / len(self)
def mad(self) -> float:
    """ Returns the *median absolute deviation* :math:`D_{median}` of the
    signal :attr:`samples` :math:`N`, or ``0.0`` for an empty trace.

    .. note:: The value is computed as the arithmetic mean of the absolute
      deviations of the samples from their :meth:`median`.
    """
    if not self:
        return 0.0
    center = self.median()
    deviations = map(lambda x: abs(x - center), self)
    return sum(deviations) / len(self)
def variance(self, **kwargs: Any) -> float:
    """ Returns the biased sample *variance* :math:`\\sigma^2` of the signal
    :attr:`samples` :math:`N`.

    A trace with fewer than two samples has the variance ``0.0``.

    :keyword float | int center: optional center value.
        Default is the arithmetic :attr:`mean` of the signal :attr:`samples`.
    :keyword bool unbiased: optional if ``True`` computes the unbiased
        *variance* :math:`\\sigma^2`. Default is ``False``.
    """
    count = len(self)
    if count < 2:
        return 0.0
    # Compute the mean only when no center is supplied; it costs a full
    # pass over the samples.
    center = kwargs['center'] if 'center' in kwargs else self.mean()
    if kwargs.get('unbiased', False):
        count -= 1
    return sum(map(lambda x: (x - center) ** 2, self)) / count
def std(self, **kwargs: Any) -> float:
    """ Returns the biased sample *standard deviation* :math:`\\sigma` of
    the signal :attr:`samples` :math:`N` as the square root of the
    :meth:`variance`.

    All keyword arguments are passed on to :meth:`variance`.

    :keyword float | int center: optional center value.
        Default is the arithmetic :attr:`mean` of the signal :attr:`samples`.
    :keyword bool unbiased: optional if ``True`` computes the unbiased
        *standard deviation* :math:`\\sigma`. Default is ``False``.
    """
    variance = self.variance(**kwargs)
    return math.sqrt(variance)
def coefficient(self, **kwargs: Any) -> float:
    """ Returns the *coefficient of variation* :math:`c_v` of the signal
    :attr:`samples` :math:`N`, or ``0.0`` when the center is zero.

    :keyword float | int center: optional center value.
        Default is the arithmetic :attr:`mean` of the signal :attr:`samples`.
    :keyword float | int std: optional standard deviation.
        Default is the biased standard deviation :attr:`std` of the signal
        :attr:`samples`.
    :keyword bool unbiased: optional if ``True`` uses the unbiased
        *standard deviation* :math:`\\sigma`. Default is ``False``.
    """
    # Compute the defaults only when they are actually needed; each one
    # costs at least one full pass over the samples.
    center = kwargs['center'] if 'center' in kwargs else self.mean()
    std = kwargs['std'] if 'std' in kwargs else self.std(**kwargs)
    return std / center if center else 0.0
def skew(self, **kwargs: Any) -> float:
    """ Returns the biased sample *skew* of the signal :attr:`samples`
    :math:`N`.

    A trace with fewer than two samples, or a zero standard deviation,
    yields ``0.0``.

    :keyword float | int center: optional center value.
        Default is the arithmetic :attr:`mean` of the signal :attr:`samples`.
    :keyword float | int std: optional standard deviation.
        Default is the biased standard deviation :attr:`std` of the signal
        :attr:`samples`.
    :keyword bool unbiased: optional if ``True`` uses the unbiased
        *standard deviation* :math:`\\sigma`. Default is ``False``.
    """
    count = len(self)
    if count < 2:
        return 0.0
    # Compute the defaults only when they are actually needed; each one
    # costs at least one full pass over the samples.
    std = kwargs['std'] if 'std' in kwargs else self.std(**kwargs)
    if not std:
        return 0.0
    center = kwargs['center'] if 'center' in kwargs else self.mean()
    return sum(map(lambda x: ((x - center) / std) ** 3, self)) / count
def kurtosis(self, **kwargs: Any) -> float:
    """ Returns the biased sample *kurtosis* of the signal :attr:`samples`
    :math:`N`.

    A trace with fewer than two samples, or a zero standard deviation,
    yields ``0.0``.

    :keyword float | int center: optional center value.
        Default is the arithmetic :attr:`mean` of the signal :attr:`samples`.
    :keyword float | int std: optional standard deviation.
        Default is the biased standard deviation :attr:`std` of the signal
        :attr:`samples`.
    :keyword bool unbiased: optional if ``True`` uses the unbiased
        *standard deviation* :math:`\\sigma`. Default is ``False``.
    """
    count = len(self)
    if count < 2:
        return 0.0
    # Compute the defaults only when they are actually needed; each one
    # costs at least one full pass over the samples.
    std = kwargs['std'] if 'std' in kwargs else self.std(**kwargs)
    if not std:
        return 0.0
    center = kwargs['center'] if 'center' in kwargs else self.mean()
    return sum(map(lambda x: ((x - center) / std) ** 4, self)) / count
def zscore(self, **kwargs: Any) -> Trace:
    """ Returns a new trace with the biased z-scored signal :attr:`samples`
    :math:`N`.

    Every sample becomes ``(sample - center) / std``; when the standard
    deviation is zero every sample becomes ``0.0``.

    :keyword float | int center: optional center value.
        Default is the arithmetic :attr:`mean` of the signal :attr:`samples`.
    :keyword float | int std: optional biased standard deviation.
        Default is the biased standard deviation :attr:`std` of the signal
        :attr:`samples`.
    :keyword str label: optional trace label to set
    """
    # Compute the defaults only when they are actually needed; each one
    # costs at least one full pass over the samples.
    center = kwargs['center'] if 'center' in kwargs else self.mean()
    std = kwargs['std'] if 'std' in kwargs else self.std(**kwargs)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:zscore'),
                   samples=map(lambda x: ((x - center) / std) if std else 0.0,
                               self))
def clip(self,
         lower: Optional[Operand] = None,
         upper: Optional[Operand] = None,
         **kwargs: Any) -> Trace:
    """ Returns either a new trace with the signal :attr:`samples` limited
    between the provided *lower* and *upper* bound, or the same trace in
    case no bounds are provided.

    A *bound* can be either a fix number or an array-like iterable.

    :param Operand | None lower: optional lower bound to clip at.
        Default is ``None``, this means the bound is not considered.
    :param Operand | None upper: optional upper bound to clip at.
        Default is ``None``, this means the bound is not considered.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':clip'``.

    .. note:: The *upper* bound is dominant over the *lower* bound.

    .. note:: An iterable *bound* should have at least the same length
      as the signal :attr:`samples`, otherwise only a subset of the signal
      :attr:`samples` is returned!
    """
    if (lower, upper) == (None, None) and lower is None and upper is None:
        return self
    label = kwargs.get('label', f'{self.label}:clip')
    bounds = map(vectorize, (lower, upper))
    return replace(self, label=label, samples=map(_clip, self, *bounds))
def clamp(self,
          bound: Operand | None = None,
          **kwargs: Any) -> Trace:
    """ Returns either a new trace with the signal :attr:`samples`
    symmetrically clamped at the provided *bound*, or the same trace in case
    no bound is provided.

    A *bound* can be either a fix number or an array-like iterable.

    :param Operand | None bound: optional bound to clamp at.
        Default is ``None``, this means the bound is not considered.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':clamp'``.

    .. note:: An iterable *bound* should have at least the same length
      as the signal :attr:`samples`, otherwise only a subset of the signal
      :attr:`samples` is returned!
    """
    if bound is not None:
        label = kwargs.get('label', f'{self.label}:clamp')
        clamped = map(_clamp, self, vectorize(bound))
        return replace(self, label=label, samples=clamped)
    return self
def interpolate(self,
                x: Sequence[Union[int, float]],
                y: Sequence[Union[int, float]],
                **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` of the
    trace piecewise linear interpolated between the data points, or the
    same trace when fewer than two data points are provided.

    The data points are internally sorted by their x-coordinates to be in
    ascending order.

    :param Sequence[int | float] x: x-coordinates of the data points.
    :param Sequence[int | float] y: y-coordinates of the data points.
    :keyword str label: optional trace label to set
    """
    # Sort the data points and check their number before unzipping them;
    # unpacking the transposition of an empty list would raise ValueError.
    points = sorted(zip(x, y))
    if len(points) < 2:
        return self
    x, y = zip(*points)
    return replace(self,
                   label=kwargs.get('label', f'{self.label}:interpolate'),
                   samples=np.interp(self, x, y) if self else list())
def delay(self,
          on: int = 0,
          off: int = 0,
          **kwargs: Any) -> Trace:
    """ Returns a new trace with the change-rate limited signal
    :attr:`samples` of the binary signal trace.

    The change-rate of the binary signal :attr:`samples` is limited by the
    numbers of samples to delay the *on*-state (``1``), and by the numbers
    of samples to delay the *off*-state (``0``) of the binary signal trace.

    :param int on: optional number of samples to delay the on-state.
        Negative values are treated as ``0``. Default is ``0``.
    :param int off: optional number of samples to delay the off-state.
        Negative values are treated as ``0``. Default is ``0``.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':delay'``.
    :keyword int preset: optional preset value (``0|1``) of the delay.
        Default is the first value in the signal samples.
    """
    level = kwargs.get('preset', self[0] if self else 0)
    on_rate = max(0, int(on)) if on else 0
    off_rate = max(0, int(off)) if off else 0
    # The counter belonging to the preset state starts out elapsed.
    on_count = on_rate if level else 0
    off_count = 0 if level else off_rate

    delayed = []
    for value in self:
        if value:
            # delay-on: follow the sample once the on-delay has elapsed
            if on_count >= on_rate:
                off_count = 0
                level = value
            else:
                on_count += 1
        elif off_count >= off_rate:
            # delay-off: follow the sample once the off-delay has elapsed
            on_count = 0
            level = value
        else:
            off_count += 1
        delayed.append(level)

    label = kwargs.get('label', f'{self.label}:delay')
    return replace(self, label=label, samples=delayed)
def ramp(self,
         limits: Optional[tuple[Optional[Operand], Optional[Operand]]] = None,
         **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` limited by the
    maximal allowed positive and/or negative delta *limits* between
    consecutive :attr:`samples`, or a copy of the trace with the same label
    in case no *limits* are provided.

    The maximal allowed *positive* delta limit can be either a fix number
    or an array-like iterable.
    The maximal allowed *negative* delta limit can be either a fix number
    or an array-like iterable.
    A positive limit below zero and a negative limit above zero are treated
    as ``0``.

    :param limits: optional a pair with the maximal allowed positive and
        negative deltas between consecutive samples.
        The value of one limit can be set to ``None`` to indicate an
        unlimited ramp for this side.
    :type limits: Optional[tuple[Optional[Operand], Optional[Operand]]]
    :keyword float preset: optional preset value of the ramp.
        Default is the first value in the signal samples.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':ramp'``.

    .. note:: An iterable maximal allowed *positive* or *negative* step
      should have at least the same length as the signal :attr:`samples`,
      otherwise only a subset of the signal :attr:`samples` is returned!
    """
    if not limits:
        return replace(self, label=f'{self.label}', samples=self[:])

    level = kwargs.get('preset', self[0] if self else 0)
    rising = vectorize(limits[0])
    falling = vectorize(limits[1])
    limited = []
    for sample, up, down in zip(self, rising, falling):
        up = None if up is None else max(up, 0)
        down = None if down is None else min(down, 0)
        step = sample - level
        if up is not None and step > up:
            level += up
        elif down is not None and step < down:
            level += down
        else:
            # within the limits: follow the sample directly
            level = sample
        limited.append(level)

    label = kwargs.get('label', f'{self.label}:ramp')
    return replace(self, label=label, samples=limited)
def slew(self,
         limits: tuple[Optional[Operand], Optional[Operand]],
         hold: Optional[Operand] = None,
         **kwargs: Any) -> SlewRateLimiterTraces:
    """ Slew-rate limiter over the signal :attr:`samples` of the trace.

    The maximal allowed *positive* delta limit can be either a fix number
    or an array-like iterable.
    The maximal allowed *negative* delta limit can be either a fix number
    or an array-like iterable.
    The number of samples to hold the previous sample value when one of
    the configured slew-rates becomes violated can be either a fix number
    or an array-like iterable.

    For every sample a :class:`SlewRateLimiter` record with the limited
    *level*, the *deviation* of the sample from that level and the *active*
    flag is produced; each record field becomes one trace of the result,
    labelled ``'<label>:<field>'``.

    :param limits: a pair with the maximal allowed positive and negative
        deltas between consecutive samples.
        The value of one limit can be set to ``None`` to indicate an
        unlimited ramp for this side.
    :type limits: tuple[Optional[Operand], Optional[Operand]]
    :param hold: optional number of samples to hold the
        previous sample value when one of the configured slew-rates becomes
        violated.
    :type hold: Operand | None
    :keyword float preset: optional preset value of the limiter.
        Default is the first value in the signal samples.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':slew'``.
    """
    level = kwargs.get('preset', self[0] if self else 0)
    counter = 0
    records = []
    operands = zip(self,
                   vectorize(limits[0]),
                   vectorize(limits[1]),
                   vectorize(hold))
    for sample, rise, fall, hold_count in operands:
        if rise is not None:
            rise = max(rise, 0)
        if fall is not None:
            fall = min(fall, 0)
        hold_count = 0 if hold_count is None else max(hold_count, 0)

        delta = sample - level
        # Pick the violated limit, if any; None means "within the limits".
        if rise is not None and delta > rise:
            step = rise
        elif fall is not None and delta < fall:
            step = fall
        else:
            step = None

        if step is None:
            level = sample
            active = 0
            counter = 0
        else:
            active = 1
            if counter >= hold_count:
                level += step
            else:
                # still holding the previous level
                counter += 1

        record = SlewRateLimiter(
            level=level,
            deviation=sample - level if active else 0,
            active=active)
        records.append(asdict(record))

    label = kwargs.get('label', f'{self.label}:slew')
    traces = {
        key: replace(self, label=f'{label}:{key}', samples=value)
        for key, value in as_traces(records).items()
    }
    return SlewRateLimiterTraces(**traces)
def band_pass(self, dt: float, f0: float, q: float = 1 / math.sqrt(2),
              **kwargs: Any) -> Trace:
    """ Returns a new trace whose signal :attr:`samples` are the outputs of
    ``IIRFilter.band_pass(dt, f0, q).compute`` applied to each sample in
    order (second-order IIR band-pass filter).

    :param float dt: sampling-time of the :attr:`~Trace.samples` (filter)
        in seconds
    :param float f0: center frequency of the filter in Hertz
    :param float q: inverse of bandwidth factor of the filter
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':band-pass'``.
    """
    biquad = IIRFilter.band_pass(dt, f0, q)
    label = kwargs.get('label', f'{self.label}:band-pass')
    # the samples are handed over as a lazy map, exactly one filter step
    # per input sample
    filtered = map(biquad.compute, self)
    return replace(self, label=label, samples=filtered)
def low_pass(self, dt: float, f0: float, q: float = 1 / math.sqrt(2),
             **kwargs: Any) -> Trace:
    """ Returns a new trace whose signal :attr:`samples` are the outputs of
    ``IIRFilter.low_pass(dt, f0, q).compute`` applied to each sample in
    order (second-order IIR low-pass filter).

    :param float dt: sampling-time of the :attr:`~Trace.samples` (filter)
        in seconds
    :param float f0: cutoff frequency of the filter in Hertz
    :param float q: inverse of bandwidth factor of the filter
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':low-pass'``.
    """
    biquad = IIRFilter.low_pass(dt, f0, q)
    label = kwargs.get('label', f'{self.label}:low-pass')
    # the samples are handed over as a lazy map, exactly one filter step
    # per input sample
    filtered = map(biquad.compute, self)
    return replace(self, label=label, samples=filtered)
def high_pass(self, dt: float, f0: float, q: float = 1 / math.sqrt(2),
              **kwargs: Any) -> Trace:
    """ Returns a new trace whose signal :attr:`samples` are the outputs of
    ``IIRFilter.high_pass(dt, f0, q).compute`` applied to each sample in
    order (second-order IIR high-pass filter).

    :param float dt: sampling-time of the :attr:`~Trace.samples` (filter)
        in seconds
    :param float f0: cutoff frequency of the filter in Hertz
    :param float q: inverse of bandwidth factor of the filter
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':high-pass'``.
    """
    biquad = IIRFilter.high_pass(dt, f0, q)
    label = kwargs.get('label', f'{self.label}:high-pass')
    # the samples are handed over as a lazy map, exactly one filter step
    # per input sample
    filtered = map(biquad.compute, self)
    return replace(self, label=label, samples=filtered)
def notch(self, dt: float, f0: float, q: float = 1 / math.sqrt(2),
          **kwargs: Any) -> Trace:
    """ Returns a new trace whose signal :attr:`samples` are the outputs of
    ``IIRFilter.notch(dt, f0, q).compute`` applied to each sample in order
    (second-order IIR notch filter).

    :param float dt: sampling-time of the :attr:`~Trace.samples` (filter)
        in seconds
    :param float f0: center frequency of the filter in Hertz
    :param float q: inverse of bandwidth factor of the filter
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':notch'``.
    """
    biquad = IIRFilter.notch(dt, f0, q)
    label = kwargs.get('label', f'{self.label}:notch')
    # the samples are handed over as a lazy map, exactly one filter step
    # per input sample
    filtered = map(biquad.compute, self)
    return replace(self, label=label, samples=filtered)
def all_pass(self, dt: float, f0: float, q: float = 1 / math.sqrt(2),
             **kwargs: Any) -> Trace:
    """ Returns a new trace whose signal :attr:`samples` are the outputs of
    ``IIRFilter.all_pass(dt, f0, q).compute`` applied to each sample in
    order (second-order IIR all-pass filter).

    :param float dt: sampling-time of the :attr:`~Trace.samples` (filter)
        in seconds
    :param float f0: center frequency of the filter in Hertz
    :param float q: inverse of bandwidth factor of the filter
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':all-pass'``.
    """
    biquad = IIRFilter.all_pass(dt, f0, q)
    label = kwargs.get('label', f'{self.label}:all-pass')
    # the samples are handed over as a lazy map, exactly one filter step
    # per input sample
    filtered = map(biquad.compute, self)
    return replace(self, label=label, samples=filtered)
@staticmethod
def _damping_factor(ratio: Union[int, float]) -> float:
    """ Converts a damping *ratio* into the feedback factor of the
    DT1-element.

    An integer *ratio* greater than ``1`` yields ``1.0 - 1 / ratio``; any
    other integer yields ``0.0``. A non-integer *ratio* yields
    ``1.0 - ratio`` passed through ``_clip`` with the bounds ``0.0`` and
    ``1.0``.

    :param ratio: integer or floating-point damping ratio
    """
    if not isinstance(ratio, int):
        return _clip(1.0 - ratio, 0.0, 1.0)
    if ratio > 1:
        return 1.0 - (1 / ratio)
    return 0.0
def dt1(self,
        ratio: Operand,
        gain: Operand = 1,
        **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` of the trace
    processed by the transfer function of a DT1-element.

    For every sample the state is updated as
    ``y = y * factor + (sample - previous sample)`` and the stored output
    is ``gain * y``. The factor is derived per sample from *ratio* by
    :meth:`_damping_factor`.

    :param Operand ratio: integer ratios between damping time and sampling
        time, or floating-point ratios between sampling time and damping
        time.
    :param Operand gain: proportional gain of the DT1-element.
    :keyword float preset: optional preset value of the DT1-element.
        Default is the first value in the signal samples, or ``0.0`` for an
        empty trace.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':dt1'``.
    """
    first = self[0] if self else 0.0
    state = kwargs.get('preset', first)
    last_value = first
    factors = map(self._damping_factor, vectorize(ratio))
    gains = vectorize(gain)
    outputs = list()
    for value, factor, p in zip(self.samples, factors, gains):
        # decay the state and add the change of the input signal
        state = state * factor + (value - last_value)
        last_value = value
        outputs.append(p * state)
    label = kwargs.get('label', f'{self.label}:dt1')
    return replace(self, label=label, samples=outputs)
@staticmethod
def _smoothing_factor(ratio: Union[int, float]) -> float:
    """ Converts a smoothing *ratio* into the update factor of the
    PT1-element.

    An integer *ratio* greater than ``1`` yields ``1.0 / ratio``; any other
    integer yields ``1.0``. A non-integer *ratio* is passed through
    ``_clip`` with the bounds ``0.0`` and ``1.0``.

    :param ratio: integer or floating-point smoothing ratio
    """
    if not isinstance(ratio, int):
        return _clip(ratio, 0.0, 1.0)
    if ratio > 1:
        return 1.0 / ratio
    return 1.0
def pt1(self,
        ratio: Operand,
        gain: Operand = 1,
        **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` of the
    trace processed by the transfer function of a PT1-element.

    For every sample the state is updated as
    ``y += (sample - y) * factor`` and the stored output is ``gain * y``.
    The factor is derived per sample from *ratio* by
    :meth:`_smoothing_factor`.

    :param Operand ratio: integer ratio between smoothing time and sampling
        time, or floating-point ratio between sampling time and smoothing
        time.
    :param Operand gain: proportional gain of the PT1-element.
    :keyword float preset: optional preset value of the PT1-element.
        Default is the first value in the signal samples, or ``0.0`` for an
        empty trace.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':pt1'``.
    """
    state = kwargs.get('preset', self[0] if self else 0.0)
    factors = map(self._smoothing_factor, vectorize(ratio))
    gains = vectorize(gain)
    outputs = list()
    for value, factor, p in zip(self.samples, factors, gains):
        # move the state towards the input by the smoothing factor
        state += (value - state) * factor
        outputs.append(p * state)
    label = kwargs.get('label', f'{self.label}:pt1')
    return replace(self, label=label, samples=outputs)
def alpha_beta_filter(self,
                      dt: float,
                      alpha: Operand,
                      beta: Operand,
                      **kwargs: Any) -> AlphaBetaFilterTraces:
    """ Alpha-beta filter over the signal :attr:`samples` of the trace.

    For every sample the filter predicts ``forecast = level + trend * dt``,
    computes the residual ``sample - forecast`` and corrects the level by
    ``alpha * residual`` and the trend by ``beta * residual / dt``.

    The three variance reduction factors share the denominator
    ``alpha * (4 - 2 * alpha - beta)``. When this denominator is zero
    (for example for ``alpha == 0.0``) the factors are undefined and are
    reported as ``0.0`` instead of raising :class:`ZeroDivisionError`.

    :param float dt: sampling-time of the :attr:`~Trace.samples` (filter)
        in seconds. Must not be zero.
    :param Operand alpha: adaption factor for the level prediction
        ``[0.0: Ignore signal sample, ..., 1.0[``.
    :param Operand beta: adaption factor for the trend prediction
        ``[0.0: Ignore signal sample, ..., 2.0]``.
    :keyword float level: optional initial level of the alpha-beta filter.
        Default is the first value in the signal samples, or ``0.0`` for an
        empty trace.
    :keyword float trend: optional initial trend of the alpha-beta filter.
        Default is ``0.0``.
    :keyword str label: optional base label of the result traces.
        Default is the trace label suffixed with ``':filter'``.
    :returns: one trace per :class:`AlphaBetaFilter` field, each labeled
        ``'<label>:<field>'``. For an empty trace the collection is built
        without any computed traces.
    """
    # initialize alpha-beta filter; an empty trace has no first sample, so
    # the default must not index into the samples unconditionally
    first = self.samples[0] if self.samples else 0.0
    level = kwargs.get('level', first)
    trend = kwargs.get('trend', 0.0)
    samples = list()
    for value, a, b in zip(self.samples,
                           vectorize(alpha),
                           vectorize(beta)):
        # Forecast value (prediction)
        forecast = level + trend * dt
        # Prediction residual
        residual = value - forecast
        # Level value (estimation)
        level = forecast + (a * residual)
        # Trend value (estimation)
        trend += (b * residual) / dt
        # common denominator of the variance reduction factors
        denominator = a * (4 - 2 * a - b)
        if denominator != 0:
            # Forecast variance reduction factor (prediction)
            variance_forecast = (2 * a ** 2 + 2 * b + a * b) / denominator
            # Level variance reduction factor (estimation)
            variance_level = (2 * a ** 2 + 2 * b - 3 * a * b) / denominator
            # Trend variance reduction factor (estimation)
            variance_trend = ((2 * b ** 2) / (dt ** 2)) / denominator
        else:
            # degenerate gains: the factors are undefined, report 0.0
            variance_forecast = 0.0
            variance_level = 0.0
            variance_trend = 0.0
        # results of the alpha-beta filter
        results = AlphaBetaFilter(
            forecast=forecast,
            forecast_sign=sign(forecast),
            level=level,
            level_sign=sign(level),
            trend=trend,
            trend_sign=sign(trend),
            trend_inflection=sign(level - forecast),
            error=residual,
            variance_forecast=variance_forecast,
            variance_level=variance_level,
            variance_trend=variance_trend)
        samples.append(asdict(results))
    label = kwargs.get('label', f'{self.label}:filter')
    traces = {
        key: replace(self, label=f'{label}:{key}', samples=values)
        for key, values in as_traces(samples).items()
    }
    return AlphaBetaFilterTraces(**traces)
def exponential(self,
                alpha: float,
                **kwargs: Any) -> ExponentialSmoothingTraces:
    """ Second-order exponential smoothing over the signal
    :attr:`samples` of the trace.

    *alpha* is passed through ``_clip`` with the bounds ``0.0`` and
    ``1.0`` before use. Besides the smoothed values, the method tracks
    exponentially weighted error statistics (absolute error, variance,
    deviation, skew and kurtosis) of the prognosis residual.

    :param float alpha: smoothing factor ``[0.0:freeze..1.0:transparent]``
    :keyword float level: optional initial level of the exponential
        smoothing. Default is the first value in the signal samples, or
        ``0.0`` for an empty trace.
    :keyword float trend: optional initial trend of the exponential
        smoothing. Default is ``0.0``.
    :keyword str label: optional base label of the result traces.
        Default is the trace label suffixed with ``':exponential'``.
    :returns: one trace per :class:`ExponentialSmoothing` field, each
        labeled ``'<label>:<field>'``. For an empty trace the collection is
        built without any computed traces.
    """
    # initialize exponential smoothing; an empty trace has no first sample,
    # so the default must not index into the samples unconditionally
    first = self.samples[0] if self.samples else 0.0
    level = kwargs.get('level', first)
    trend = kwargs.get('trend', 0.0)
    alpha = _clip(alpha, 0.0, 1.0)
    prognosis = [
        (level - ((1 - alpha) / alpha) * trend if alpha > 0 else 0),
        (level - 2 * ((1 - alpha) / alpha) * trend if alpha > 0 else 0)
    ]
    # initialize signal statistic
    absolute_error = 0.0
    variance = 0.0
    skew = 0.0
    kurtosis = 0.0
    samples = list()
    for value in self.samples:
        # prognosis residual
        residual = value - ((2 * prognosis[0] - prognosis[1]) + trend)
        # empirical absolute error of the distribution (1st central moment)
        absolute_error = (1 - alpha) * (
            absolute_error + alpha * abs(residual))
        # empirical variance of the distribution (2nd central moment)
        variance = (1 - alpha) * (variance + alpha * (residual ** 2))
        # empirical standard deviation of the distribution
        deviation = math.sqrt(variance)
        # empirical skew of the distribution (3rd central moment)
        skew = (1.0 - alpha) * (skew + alpha * (
            (residual / deviation) ** 3 if deviation != 0.0 else 0.0))
        # empirical kurtosis of the distribution (4th central moment)
        kurtosis = (1.0 - alpha) * (kurtosis + alpha * (
            (residual / deviation) ** 4 if deviation != 0.0 else 0.0))
        # first-order exponentially smoothed value
        smoothed1 = (1.0 - alpha) * prognosis[0] + alpha * value
        # second-order exponentially smoothed value
        smoothed2 = (1.0 - alpha) * prognosis[1] + alpha * smoothed1
        # prognosis level (exponentially smoothed signal value)
        level = 2 * smoothed1 - smoothed2
        # prognosis trend; alpha == 1 would divide by zero, so it maps to 0
        trend = (alpha / (1 - alpha)) * (
            smoothed1 - smoothed2) if alpha < 1 else 0
        # prognosis forecast
        forecast = level + trend
        # results of the 2nd-order exponential smoothing
        results = ExponentialSmoothing(
            forecast=forecast,
            forecast_sign=sign(forecast),
            level=level,
            level_sign=sign(level),
            prognosis1=prognosis[0],
            prognosis2=prognosis[1],
            prognosis=2 * prognosis[0] - prognosis[1],
            smoothed1=smoothed1,
            smoothed2=smoothed2,
            trend=trend,
            trend_sign=sign(trend),
            trend_inflection=sign(forecast - level),
            error=residual,
            correction=alpha * residual,
            absolute_error=absolute_error,
            variance=variance,
            deviation=deviation,
            skew=skew,
            kurtosis=kurtosis)
        # new prognosis values
        prognosis = [smoothed1, smoothed2]
        samples.append(asdict(results))
    label = kwargs.get('label', f'{self.label}:exponential')
    traces = {
        key: replace(self, label=f'{label}:{key}', samples=values)
        for key, values in as_traces(samples).items()
    }
    return ExponentialSmoothingTraces(**traces)
def window(self,
           size: int,
           **kwargs: Any) -> Iterator[tuple[Sample, ...]]:
    """ Moving window generator over the signal :attr:`samples` of the trace.

    Yields one tuple of *size* consecutive values per sample. The sequence
    is left-padded with ``size - 1`` preset values, so the last item of
    each tuple is the current sample.

    .. note:: For an empty trace or a *size* ``<= 0`` the plain samples are
        yielded instead of tuples.

    :param int size: moving window size (number of samples)
    :keyword float preset: optional preset value to fill the moving window.
        Default is the first value in the signal samples.
    """
    if not self or size <= 0:
        yield from self.samples
        return
    fill = kwargs.get('preset', self[0])
    padded = chain(repeat(fill, size - 1), iter(self))
    # one independent iterator per window position, each advanced by its
    # position so that zipping them produces the sliding windows
    lanes = [islice(lane, offset, None)
             for offset, lane in enumerate(tee(padded, size))]
    yield from zip(*lanes)
def moving_events(self,
                  size: int,
                  value: Sample = 1,
                  **kwargs: Any) -> Trace:
    """ Returns a new trace with the number of *value* occurrences (events)
    in the moving window over the signal :attr:`samples` of the trace.

    :param int size: moving window size
    :param value: value to check. Default is ``1``.
    :keyword int preset: optional preset value to fill the moving window.
        Default is ``None``.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':events'``.
    """
    # the preset is always passed explicitly, so the window is padded with
    # ``None`` unless the caller provides another value
    fill = kwargs.get('preset', None)
    counts = [list(values).count(value)
              for values in self.window(size, preset=fill)]
    label = kwargs.get('label', f'{self.label}:events')
    return replace(self, label=label, samples=counts)
def moving_average(self,
                   size: int,
                   **kwargs: Any) -> MovingAverageTraces:
    """ Moving average over the signal :attr:`samples` of the trace.

    For every moving window a :class:`Statistics` record is computed from
    a temporary :class:`Trace` holding the window values.

    :param int size: moving window size (number of samples)
    :keyword float preset: optional preset value to fill the moving window.
        Default is the first value in the signal samples.
    :keyword str label: optional base label of the result traces.
        Default is the trace label suffixed with ``':average'``.
    :returns: one trace per :class:`Statistics` field, each labeled
        ``'<label>:<field>'``.
    """
    samples = list()
    for values in self.window(size, **kwargs):
        trace = Trace(samples=values)
        # computed once and reused for the dependent statistics below
        mean = trace.mean()
        minimum = trace.min()
        maximum = trace.max()
        deviation = trace.std(center=mean)
        result = Statistics(
            mean=mean,
            weighted_mean=trace.weighted_mean(),
            median=trace.median(),
            mode=trace.mode(),
            rms=trace.rms(),
            minimum=minimum,
            maximum=maximum,
            range=maximum - minimum,
            midrange=trace.midrange(),
            # sum of the absolute deviations from the mean
            absolute_error=sum(abs(trace - mean)),
            variance=trace.variance(center=mean),
            deviation=deviation,
            coefficient=trace.coefficient(center=mean, std=deviation),
            skew=trace.skew(center=mean, std=deviation),
            kurtosis=trace.kurtosis(center=mean, std=deviation)
        )
        samples.append(asdict(result))
    label = kwargs.get('label', f'{self.label}:average')
    traces = {
        key: Trace(label=f'{label}:{key}', samples=values)
        for key, values in as_traces(samples).items()
    }
    return MovingAverageTraces(**traces)
def moving_differential(self,
                        size: int,
                        **kwargs: Any) -> Trace:
    """ Moving differential over the signal :attr:`samples` of the trace.

    For a *size* greater than ``1`` each result is the difference between
    the last and the first value of the moving window divided by
    ``size - 1``; otherwise each result is ``0``.

    :param int size: moving window size (number of samples)
    :keyword float preset: optional preset value to fill the moving window.
        Default is the first value in the signal samples.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':differential'``.
    """
    label = kwargs.get('label', f'{self.label}:differential')
    if size > 1:
        steps = size - 1
        slopes = [(values[-1] - values[0]) / steps
                  for values in self.window(size, **kwargs)]
    else:
        # a window without extent has no slope
        slopes = [0 for _ in self.window(size, **kwargs)]
    return replace(self, label=label, samples=slopes)
def moving_regression(self,
                      size: int,
                      **kwargs: Any) -> LinearRegressionTraces:
    """ Moving linear regression over the signal :attr:`samples` of the
    trace.

    For every moving window a straight line is fitted through the window
    values over the x-coordinates ``0 .. size - 1``, and a
    :class:`LinearRegression` record with the line parameters and the
    error statistics of the fit is stored.

    .. note:: ``absolute_error`` and ``variance`` are the plain sums of the
        absolute respectively squared errors of the window (not divided by
        the window size); ``deviation`` is the square root of that sum.

    :param int size: moving window size (> 1)
    :keyword float preset: optional preset value to fill the moving window.
        Default is the first value in the signal samples.
    :keyword str label: optional base label of the result traces.
        Default is the trace label suffixed with ``':regression'``.
    :returns: one trace per :class:`LinearRegression` field, each labeled
        ``'<label>:<field>'``. An empty trace returns an empty collection
        and a *size* less than ``2`` returns a collection with the trace
        itself as ``level``.
    """
    if not self.samples:
        return LinearRegressionTraces()
    if size < 2:
        return LinearRegressionTraces(level=self)
    # The x-coordinates are identical for every window, so everything that
    # depends only on them is computed once in front of the loop.
    x_values = list(range(size))
    # arithmetic mean of the x-coordinate values
    mean_x = stats.mean(x_values)
    # residuals of the x-coordinate values
    residuals_x = [x - mean_x for x in x_values]
    # sum of the squared x-coordinate residuals
    dxdx_sum = sum(dx ** 2 for dx in residuals_x)
    samples = list()
    for y_values in self.window(size, **kwargs):
        # arithmetic mean of the y-coordinate values (1st central moment)
        mean_y = stats.mean(y_values)
        # median of the y-coordinate values
        median_y = stats.median(y_values)
        # residuals of the y-coordinate values
        residuals_y = [y - mean_y for y in y_values]
        # sum of the xy-coordinate residual products
        dxdy_sum = sum(dx * dy for dx, dy in zip(residuals_x, residuals_y))
        # slope of the approximated line
        slope = dxdy_sum / dxdx_sum
        # y-intercept of the approximated line
        intercept = mean_y - slope * mean_x
        # current y-coordinate of the approximated line
        level = slope * x_values[-1] + intercept
        # minimum and maximum of the y-coordinate values
        minimum = min(y_values)
        maximum = max(y_values)
        # deviations of the y-coordinates from the approximated line
        errors = [y - (slope * x + intercept)
                  for x, y in enumerate(y_values)]
        # maximal positive y-coordinate deviation from the approximated line
        positive_error = max(errors)
        # maximal negative y-coordinate deviation from the approximated line
        negative_error = min(errors)
        # sum of the absolute errors
        absolute_error = sum(map(abs, errors))
        # sum of the squared errors
        variance = sum(e ** 2 for e in errors)
        # square root of the sum of the squared errors
        deviation = math.sqrt(variance)
        if deviation != 0.0:
            # sum of the standardized errors to the power of three
            skew = sum((e / deviation) ** 3 for e in errors)
            # sum of the standardized errors to the power of four
            kurtosis = sum((e / deviation) ** 4 for e in errors)
        else:
            # perfect fit: no standardized errors available
            skew = 0.0
            kurtosis = 0.0
        # results of the linear regression
        results = LinearRegression(
            level=level,
            slope=slope,
            intercept=intercept,
            mean=mean_y,
            median=median_y,
            error=errors[-1],
            minimum=minimum,
            maximum=maximum,
            range=maximum - minimum,
            negative_error=negative_error,
            positive_error=positive_error,
            absolute_error=absolute_error,
            variance=variance,
            deviation=deviation,
            skew=skew,
            kurtosis=kurtosis)
        samples.append(asdict(results))
    label = kwargs.get('label', f'{self.label}:regression')
    traces = {
        key: replace(self, label=f'{label}:{key}', samples=values)
        for key, values in as_traces(samples).items()
    }
    return LinearRegressionTraces(**traces)
def move(self,
         number: int,
         **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` moved by the
    *number* of samples either to the right or to the left.

    A positive *number* moves to the right: the *fill* value is inserted
    *number* times at the beginning and the same number of samples is
    dropped at the end.

    A negative *number* moves to the left: the first *number* samples are
    dropped and the *fill* value is appended the same number of times at
    the end.

    The move distance is limited to the number of samples, so the length
    of the signal never changes.

    :param int number: number of samples to move to the right (``> 0``) or
        to the left (``< 0``).
    :keyword float fill: optional fill value.
        Default is the first value (move right) or the last value (move
        left) in the signal samples.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':move'``.
    """
    # work on a copy of the samples
    moved = self[:]
    if moved and number:
        distance = min(abs(number), len(moved))
        if number > 0:
            # move to right
            fill = kwargs.get('fill', moved[0])
            moved = [fill] * distance + moved[:-distance]
        else:
            # move to left
            fill = kwargs.get('fill', moved[-1])
            moved = moved[distance:] + [fill] * distance
    label = kwargs.get('label', f'{self.label}:move')
    return replace(self, label=label, samples=moved)
def slice(self,
          *args: tuple[int, ...],
          **kwargs: Any) -> Trace:
    """ Returns a new trace with the signal :attr:`samples` sliced with the
    built-in function :func:`slice` in the range given by the arguments
    *args*.

    Without arguments a full copy of the samples is used. An empty result
    is replaced by a new empty list.

    :param tuple[int, ...] args: optional *start*, *stop* index and *step*
        for slicing the signal :attr:`samples`.
    :keyword str label: optional trace label to set.
        Default is the trace label suffixed with ``':slice'``.
    """
    selected = self.samples[slice(*args)] if args else self.samples[:]
    label = kwargs.get('label', f'{self.label}:slice')
    return replace(self, label=label, samples=selected or list())
def iter_x(self) -> Iterator[int]:
    """ Returns the x-coordinates of the trace, i.e. the indices
    ``0 .. len(self) - 1`` of the signal samples, as a :class:`range`.
    """
    count = len(self)
    return range(count)
def iter_point(self) -> Iterator[Point2D]:
    """ Generates one :class:`Point2D` per signal sample, built from the
    x-coordinate of :meth:`iter_x` and the sample value as y-coordinate.
    """
    yield from map(Point2D, self.iter_x(), self)
@property
def x_values(self) -> list[int]:
    """ Returns the x-coordinates produced by :meth:`iter_x` as a new list.
    """
    return [*self.iter_x()]
@property
def y_values(self) -> list[Sample]:
    """ Returns the values obtained by iterating the trace (the
    y-coordinates) as a new list.
    """
    return [*self]
@property
def digital(self) -> Digital:
    """ Returns a new :class:`Digital` trace created from the fields of
    this trace.

    .. note:: The fields are collected with :func:`dataclasses.asdict`,
        which builds new containers, so the samples handed over to the
        digital trace are a copy and not the list object of this trace.
    """
    attributes = asdict(self)
    return Digital(**attributes)
def plot(self,
         index: Optional[int] = None,
         n: Optional[int] = None,
         **kwargs: Any) -> go.Scatter:
    """ Returns the scatter plot of the trace.

    The plot is created with the selected x,y-values, the trace label as
    name and the mode ``'lines'``, and is afterwards updated with the
    given keyword arguments.

    :param int index: optional start index of the samples to plot.
        A negative index is treated as ``0``. Default is ``0``.
    :param int n: optional number of samples to plot.
        Default are all samples from the start index on; a number ``<= 0``
        selects a single sample.
    :keyword kwargs: plot settings passed to the ``update`` method of the
        created :class:`plotly.graph_objects.Scatter`.
    """
    # pack x,y-pairs
    pairs = list(zip(self.iter_x(), self))
    # slice pairs
    start = 0 if index is None else max(0, index)
    if n is None:
        stop = None
    elif n <= 0:
        stop = start + 1
    else:
        stop = start + n
    pairs = pairs[start:stop]
    # Unpack x,y-pairs. The emptiness check must follow the slicing: a
    # start index beyond the last sample selects nothing, and unpacking
    # ``zip(*[])`` into two names raises a ValueError.
    if pairs:
        x, y = zip(*pairs)
    else:
        x = list()
        y = list()
    # default settings
    settings = dict(
        x=x,
        y=y,
        name=self.label,
        mode='lines',
    )
    # create plot with default settings
    _plot = go.Scatter(**settings)
    # update plot
    return _plot.update(kwargs)
@dataclass
class Digital(Trace):
    """ Trace data class for time-discrete digital signal samples.

    Differs from :class:`Trace` only in the plot, which is drawn with the
    line shape ``'hv'``.
    """

    def plot(self,
             index: Optional[int] = None,
             n: Optional[int] = None,
             **kwargs: Any) -> go.Scatter:
        """ Returns the plot of the digital signal trace.

        The plot is created with the selected x,y-values, the trace label
        as name, the mode ``'lines'`` and the line shape ``'hv'``, and is
        afterwards updated with the given keyword arguments.

        :param int index: optional start index of the samples to plot.
            A negative index is treated as ``0``. Default is ``0``.
        :param int n: optional number of samples to plot.
            Default are all samples from the start index on; a number
            ``<= 0`` selects a single sample.
        :keyword kwargs: plot settings passed to the ``update`` method of
            the created :class:`plotly.graph_objects.Scatter`.
        """
        # pack x,y-pairs
        pairs = list(zip(self.iter_x(), self))
        # slice pairs
        start = 0 if index is None else max(0, index)
        if n is None:
            stop = None
        elif n <= 0:
            stop = start + 1
        else:
            stop = start + n
        pairs = pairs[start:stop]
        # Unpack x,y-pairs. The emptiness check must follow the slicing: a
        # start index beyond the last sample selects nothing, and unpacking
        # ``zip(*[])`` into two names raises a ValueError.
        if pairs:
            x, y = zip(*pairs)
        else:
            x = list()
            y = list()
        # default settings
        settings = dict(
            x=x,
            y=y,
            name=self.label,
            mode='lines',
            line=dict(shape='hv')
        )
        # create plot with default settings
        _plot = go.Scatter(**settings)
        # update plot
        return _plot.update(kwargs)
def logical_and(*operands: Trace | Iterable,
                **kwargs: Any) -> Trace:
    """ Returns a new trace with the logical ANDed values of the given iterable
    *operands*.

    The operands are iterated in lockstep; each result sample is ``1``
    when all values at that position are truthy, otherwise ``0``.
    Without operands a trace created with only the label is returned.

    :param Trace | Iterable operands: trace or iterable
    :keyword str label: optional trace label to set.
        Default is ``'And'``.

    .. note:: The *operands* should have the same length, otherwise only a
        subset of the signal :attr:`samples` is returned!
    """
    label = kwargs.get('label', 'And')
    if not operands:
        return Trace(label)
    conjunctions = [int(all(values)) for values in zip(*operands)]
    return Trace(label, conjunctions)
def logical_or(*operands: Trace | Iterable,
               **kwargs: Any) -> Trace:
    """ Returns a new trace with the logical ORed values of the given iterable
    *operands*.

    The operands are iterated in lockstep; each result sample is ``1``
    when at least one value at that position is truthy, otherwise ``0``.
    Without operands a trace created with only the label is returned.

    :param Trace | Iterable operands: trace or iterable
    :keyword str label: optional trace label to set.
        Default is ``'Or'``.

    .. note:: The *operands* should have the same length, otherwise only a
        subset of the signal :attr:`samples` is returned!
    """
    label = kwargs.get('label', 'Or')
    if not operands:
        return Trace(label)
    disjunctions = [int(any(values)) for values in zip(*operands)]
    return Trace(label, disjunctions)
def priority(*operands: Trace | Iterable,
             **kwargs: Any) -> Trace:
    """ Returns a new trace with the priority numbers defined by the order of
    the given *operands*, and determined from the ``Truth=1`` values of the
    given *operands*.

    The operands are iterated in lockstep. Each result sample is the
    position of the first operand whose value converts to the integer
    ``1``, offset by *highest*. When no operand value is ``1`` (or a value
    cannot be converted to an integer) the result is the lowest priority
    number ``len(operands) + highest``.

    The iterable *operands* must contain either boolean values or integer
    values ``1`` or ``0``.

    :param Trace | Iterable operands: trace or iterable to prioritize
    :keyword int highest: optional number for the highest priority.
        Default is ``0``.
    :keyword str label: optional trace label to set.
        Default is ``'Priority'``.

    .. note:: The *operands* should have the same length, otherwise only a
        subset of the signal :attr:`samples` is returned!
    """
    label = kwargs.get('label', 'Priority')
    if not operands:
        return Trace(label, list())
    highest = int(kwargs.get('highest', 0))
    lowest = len(operands) + highest
    ranks = list()
    for flags in zip(*operands):
        try:
            rank = tuple(map(int, flags)).index(1) + highest
        except ValueError:
            # no operand is set at this position
            rank = lowest
        ranks.append(rank)
    return Trace(label, ranks)
def as_traces(samples: Sequence[dict[str, Any]]) -> dict[str, list[Sample]]:
    """ Converts a sequence of sample records into the trace format.

    The keys of the first record define the names (and their order) of the
    result; every name is mapped to the list of the values stored under it
    in each record. An empty sequence returns an empty dictionary.

    :param samples: records to convert; every record must contain all keys
        of the first record, otherwise a :class:`KeyError` is raised.
    """
    if not samples:
        return dict()
    return {
        name: [record[name] for record in samples]
        for name in samples[0]
    }
@dataclass(eq=False)
class Traces(MutableMapping):
    """ Base data class for a collection of traces with mutable mapping support.

    The mapping interface operates on the instance dictionary: a key is an
    attribute name and the value is the trace stored under it. Traces
    cannot be deleted from the collection.
    """

    def labels(self) -> tuple[str, ...]:
        """ Returns a tuple of the trace labels in the collection."""
        return tuple(trace.label for trace in self.values())

    def relabel(self, label: str) -> Traces:
        """ Relabels all traces within the collection by keeping the last
        colon-separated part of the trace :attr:`~Trace.label` unchanged
        and replacing the rest by the base *label*.

        Only data class fields holding a :class:`Trace` are relabeled; the
        traces are modified in place.

        :param str label: base label to set for the traces in the collection
        :returns: the collection itself
        """
        for field_ in fields(self):
            trace = getattr(self, field_.name)
            if isinstance(trace, Trace):
                trace.label = f"{label}:{trace.label.split(':')[-1]}"
        return self

    def __setitem__(self, key: str, value: Trace):
        self.__dict__[key] = value

    def __getitem__(self, key: str) -> Trace:
        return self.__dict__[key]

    def __delitem__(self, key: str):
        # ``NotImplemented`` is a constant for binary operators and is not
        # callable; the exception type is ``NotImplementedError``.
        raise NotImplementedError()

    def __iter__(self) -> Iterator[str]:
        return iter(self.__dict__)

    def __len__(self) -> int:
        return len(self.__dict__)

    def fields(self) -> tuple[Field, ...]:
        """ Returns a tuple describing the fields of the data class."""
        return fields(self)

    def as_dict(self) -> dict[str, Any]:
        """ Returns the collection of traces as a dictionary created with
        :func:`dataclasses.asdict`.
        """
        return asdict(self)

    def as_tuple(self) -> tuple[Any, ...]:
        """ Returns the collection of traces as a tuple created with
        :func:`dataclasses.astuple`.
        """
        return astuple(self)
@dataclass(eq=False)
class StatisticsTraces(Traces):
    """ Collection of traces, one per field of a :class:`Statistics` record,
    for a set of signal samples statistically analyzed.

    Every field defaults to a new :class:`Trace` created without arguments.
    """
    #: Trace of the arithmetic means (1st common moment).
    mean: Trace = field(default_factory=Trace)
    #: Trace of the linear weighted means.
    weighted_mean: Trace = field(default_factory=Trace)
    #: Trace of the medians.
    median: Trace = field(default_factory=Trace)
    #: Trace of the modes.
    mode: Trace = field(default_factory=Trace)
    #: Trace of the root mean squares.
    rms: Trace = field(default_factory=Trace)
    #: Trace of the minima.
    minimum: Trace = field(default_factory=Trace)
    #: Trace of the maxima.
    maximum: Trace = field(default_factory=Trace)
    #: Trace of the ranges.
    range: Trace = field(default_factory=Trace)
    #: Trace of the mid-ranges.
    midrange: Trace = field(default_factory=Trace)
    #: Trace of the empirical absolute errors (1st central moment).
    absolute_error: Trace = field(default_factory=Trace)
    #: Trace of the empirical biased sample variances (2nd central moment).
    variance: Trace = field(default_factory=Trace)
    #: Trace of the empirical biased sample standard deviations.
    deviation: Trace = field(default_factory=Trace)
    #: Trace of the empirical biased sample coefficients of variation.
    coefficient: Trace = field(default_factory=Trace)
    #: Trace of the empirical biased sample skews (3rd central moment).
    skew: Trace = field(default_factory=Trace)
    #: Trace of the empirical biased sample kurtoses (4th central moment).
    kurtosis: Trace = field(default_factory=Trace)
@dataclass(eq=False)
class MovingAverageTraces(StatisticsTraces):
    """ Collection of the :class:`Statistics` traces computed over the
    moving windows of a simple moving average.

    Adds no fields or methods to :class:`StatisticsTraces`.
    """
@dataclass(eq=False)
class SetTraces(StatisticsTraces):
    """ Collection of the :class:`Statistics` traces computed over the
    combined set of multiple signal samples."""

    @classmethod
    def from_traces(cls,
                    *operands: Operand,
                    **kwargs: Any) -> SetTraces:
        """ Returns the collection of :class:`Statistics` traces computed
        by :func:`combine` over the combined sets of the *operands*.

        An operand can be either a number or an array-like iterable.

        :param operands: operands to compute the statistics with
        :type operands: tuple[Operand, ...]
        :keyword str label: optional traces base label to set. When given,
            the created collection is additionally relabeled with it.

        .. note:: All iterable *operands* must have the same length, otherwise
            only a subset of the operand values is returned!
        """
        collection = cls(**combine(*operands, **kwargs))
        base = kwargs.get('label')
        return collection if base is None else collection.relabel(base)
def combine(*operands: Operand,
            **kwargs: Any) -> dict[str, Trace]:
    """ Returns a dictionary with the traces for the statistics results computed
    over the combined sets of *operands*.

    The operands are passed through :func:`vectorize` and iterated in
    lockstep; for every position a :class:`Statistics` record is computed
    from a temporary :class:`Trace` holding the operand values at that
    position. Without operands an empty dictionary is returned.

    An operand can be either a number or an array-like iterable.

    :param Operand operands: operands to compute the statistics with
    :type operands: tuple[Operand, ...]
    :keyword str label: optional trace label stem to set.
        Default is ``'SetTrace'``.
    :returns: one trace per :class:`Statistics` field, keyed by the field
        name and labeled ``'<label>:<field>'``.

    .. note:: All iterable *operands* must have the same length, otherwise
        only a subset of the operand values is returned!
    """
    if not operands:
        return dict()
    samples = list()
    for values in zip(*map(vectorize, operands)):
        trace = Trace(samples=values)
        # computed once and reused for the dependent statistics below
        mean = trace.mean()
        minimum = trace.min()
        maximum = trace.max()
        deviation = trace.std(center=mean)
        result = Statistics(
            mean=mean,
            weighted_mean=trace.weighted_mean(),
            median=trace.median(),
            mode=trace.mode(),
            rms=trace.rms(),
            minimum=minimum,
            maximum=maximum,
            range=maximum - minimum,
            midrange=trace.midrange(),
            # sum of the absolute deviations from the mean
            absolute_error=sum(abs(trace - mean)),
            variance=trace.variance(center=mean),
            deviation=deviation,
            coefficient=trace.coefficient(center=mean, std=deviation),
            skew=trace.skew(center=mean, std=deviation),
            kurtosis=trace.kurtosis(center=mean, std=deviation)
        )
        samples.append(asdict(result))
    label = kwargs.get('label', 'SetTrace')
    return {
        key: Trace(label=f'{label}:{key}', samples=values)
        for key, values in as_traces(samples).items()
    }
@dataclass(eq=True, order=True, frozen=True)
class Statistics:
    """ Immutable, comparable record of the statistics results of a set of
    values. Every field defaults to ``0.0``."""
    #: Arithmetic mean (1st common moment).
    mean: float = 0.0
    #: Weighted mean.
    weighted_mean: float = 0.0
    #: Median.
    median: float = 0.0
    #: Mode.
    mode: float = 0.0
    #: Root mean square.
    rms: float = 0.0
    #: Smallest value.
    minimum: float = 0.0
    #: Largest value.
    maximum: float = 0.0
    #: Range.
    range: float = 0.0
    #: Mid-range.
    midrange: float = 0.0
    #: Empirical absolute error (1st central moment).
    absolute_error: float = 0.0
    #: Empirical biased sample variance (2nd central moment).
    variance: float = 0.0
    #: Empirical biased sample standard deviation.
    deviation: float = 0.0
    #: Empirical biased sample coefficient of variation.
    coefficient: float = 0.0
    #: Empirical biased sample skew (3rd central moment).
    skew: float = 0.0
    #: Empirical biased sample kurtosis (4th central moment).
    kurtosis: float = 0.0
@dataclass(eq=False)
class VectorTraces(Traces):
    """ Collection of traces for two signal samples converted into a vector
    represented in polar coordinates.

    The traces ``r``, ``phi`` and ``theta`` are taken from :func:`polar`
    when they are not provided. The traces ``delta_phi``, ``distance`` and
    ``dot`` are always computed after initialization and replace provided
    values.
    """
    #: Trace with cartesian x-coordinates of the vector
    x: Trace = field(default_factory=Trace)
    #: Trace with cartesian y-coordinates of the vector
    y: Trace = field(default_factory=Trace)
    #: Trace with radii of the vector
    r: Trace | None = field(default=None)
    #: Trace with angles of the vector in radians [-pi..pi]
    phi: Trace | None = field(default=None)
    #: Trace with angles of the vector in degree [0..360].
    theta: Trace | None = field(default=None)
    #: Trace with delta angles of the consecutive x,y-points in radians [-pi..pi]
    delta_phi: Trace | None = field(default=None)
    #: Trace with euclidean distances of the consecutive x,y-points.
    distance: Trace | None = field(default=None)
    #: Trace with dot products of the consecutive x,y-points.
    dot: Trace | None = field(default=None)

    def __post_init__(self) -> None:
        # dictionary with the vector results
        vector = polar(self.x, self.y)
        if self.r is None:
            self['r'] = vector.get('r', Trace(label='Vector:r'))
        if self.phi is None:
            self['phi'] = vector.get('phi', Trace(label='Vector:phi'))
        if self.theta is None:
            self['theta'] = vector.get('theta', Trace(label='Vector:theta'))
        self.delta_phi = Trace('Vector:delta_phi')
        self.distance = Trace('Vector:distance')
        self.dot = Trace('Vector:dot')
        for angles in self.phi.window(2):
            # angular velocity, wrapped into the range [-pi..pi]
            omega = angles[1] - angles[0]
            if omega > math.pi:
                omega -= 2 * math.pi
            elif omega < -math.pi:
                omega += 2 * math.pi
            self.delta_phi.samples.append(omega)
        for coordinates in zip(self.x.window(2), self.y.window(2)):
            # previous and current x,y-point
            points = [(x, y) for x, y in zip(coordinates[0], coordinates[1])]
            self.distance.samples.append(math.dist(*points))
            self.dot.samples.append(np.dot(*points))

    def plot(self,
             index: Optional[int] = None,
             n: Optional[int] = None,
             **kwargs: Any) -> go.Scatterpolar:
        """ Returns the scatter polar plot of the vector traces.

        The plot is created with the selected ``r`` and ``theta`` values,
        the name ``'Vector'`` and the mode ``'lines+markers'``, and is
        afterwards updated with the given keyword arguments.

        :param int index: optional start index of the samples to plot.
            A negative index is treated as ``0``. Default is ``0``.
        :param int n: optional number of samples to plot.
            Default are all samples from the start index on; a number
            ``<= 0`` selects a single sample.
        :keyword kwargs: plot settings passed to the ``update`` method of
            the created :class:`plotly.graph_objects.Scatterpolar`.
        """
        # pack r,theta-pairs
        pairs = list(zip(self.r, self.theta))
        # slice pairs
        start = 0 if index is None else max(0, index)
        if n is None:
            stop = None
        elif n <= 0:
            stop = start + 1
        else:
            stop = start + n
        pairs = pairs[start:stop]
        # Unpack r,theta-pairs. The emptiness check must follow the
        # slicing: a start index beyond the last sample selects nothing,
        # and unpacking ``zip(*[])`` into two names raises a ValueError.
        if pairs:
            r, theta = zip(*pairs)
        else:
            r = list()
            theta = list()
        # default settings
        settings = dict(
            r=r,
            theta=theta,
            name='Vector',
            mode='lines+markers'
        )
        # create plot with default settings
        _plot = go.Scatterpolar(**settings)
        # update plot
        return _plot.update(kwargs)
def polar(x: Trace, y: Trace, **kwargs: Any) -> dict[str, Trace]:
    """ Returns a dictionary with the traces for the two *x*, *y* signal samples
    converted into a vector represented in polar coordinates.

    Each pair of *x*, *y* samples is converted with :class:`Point2D` into a
    :class:`Vector`. When the conversion produces no traces, an empty trace
    is returned for every field of the :class:`Vector`.

    :param Trace x: cartesian x-coordinate signal trace of the vector
    :param Trace y: cartesian y-coordinate signal trace of the vector
    :keyword str label: optional traces base label to set.
        Default is ``Vector``.
    """
    label = kwargs.get('label', 'Vector')
    # one dictionary with the vector fields per x,y-point
    samples = [asdict(Point2D(*point).as_vector()) for point in zip(x, y)]
    # NOTE(review): as_traces presumably regroups the dictionaries by their
    # keys -- confirm against its definition
    traces = {
        key: Trace(label=f'{label}:{key}', samples=value)
        for key, value in as_traces(samples).items()
    }
    if traces:
        return traces
    # nothing converted: fall back to empty traces named after the fields
    return {
        item.name: Trace(label=f'{label}:{item.name}')
        for item in fields(Vector)
    }
@dataclass(eq=False)
class Vector:
    """ Vector representation by polar coordinates.

    The angle :attr:`theta` in degrees is derived from the angle :attr:`phi`
    in radians on initialisation unless it is provided.
    """
    #: Vector radius.
    r: float = 0.0
    #: Vector angle in radians [-pi..+pi].
    phi: float = 0.0
    #: Vector angle in degrees [0..360].
    theta: Optional[float] = None

    def __post_init__(self) -> None:
        # a provided angle in degrees is kept untouched
        if self.theta is not None:
            return
        degrees = float(np.degrees(self.phi))
        # shift negative angles by a full turn into the positive range
        if degrees < 0.0:
            degrees = degrees + 360.0
        self.theta = degrees
@dataclass(eq=True, order=True)
class Point2D:
    """ 2-dimensional point representation by cartesian coordinates.

    Points are compared and ordered by their coordinates in the order
    ``(x, y)``. Iterating a point yields its coordinates in that order.
    """
    #: X-coordinate of the 2-dimensional point.
    x: float = 0.0
    #: Y-coordinate of the 2-dimensional point.
    y: float = 0.0

    @property
    def r(self) -> float:
        """ Radius of the 2-dimensional point."""
        # hypot does not overflow on the squares of large coordinates
        return math.hypot(self.x, self.y)

    @property
    def phi(self) -> float:
        """ Angle of the 2-dimensional point in radians [-pi..+pi]."""
        return float(np.arctan2(self.y, self.x))

    @property
    def theta(self) -> float:
        """ Angle of the 2-dimensional point in degrees [0..360]."""
        angle = float(np.degrees(self.phi))
        # shift negative angles by a full turn into the positive range
        return angle + 360.0 if angle < 0 else angle

    def as_vector(self) -> Vector:
        """ Returns a :class:`Vector` for the 2-dimensional point."""
        return Vector(self.r, self.phi)

    def as_dict(self) -> dict[str, Any]:
        """ Returns the 2-dimensional point as a dictionary."""
        return asdict(self)

    def as_tuple(self) -> tuple[Any, ...]:
        """ Returns the 2-dimensional point as a tuple."""
        return astuple(self)

    def __iter__(self) -> Iterator[float]:
        """ Returns an iterator over the coordinates of the 2-dimensional point.
        """
        return iter(astuple(self))

    def __reversed__(self) -> Iterator[float]:
        """ Returns a reverse iterator over the coordinates of the 2-dimensional
        point.
        """
        return reversed(astuple(self))
@dataclass(eq=True, order=True)
class Point3D(Point2D):
    """ 3-dimensional point representation by cartesian coordinates.

    Points are compared and ordered by their coordinates in the order
    ``(x, y, z)``. Iterating a point yields its coordinates in that order.
    """
    #: X-coordinate of the 3-dimensional point.
    x: float = 0.0
    #: Y-coordinate of the 3-dimensional point.
    y: float = 0.0
    #: Z-coordinate of the 3-dimensional point.
    z: float = 0.0

    @property
    def r(self) -> float:
        """ Radius of the 3-dimensional point."""
        return math.hypot(self.x, self.y, self.z)

    def as_dict(self) -> dict[str, Any]:
        """ Returns the 3-dimensional point as a dictionary."""
        return asdict(self)

    def as_tuple(self) -> tuple[Any, ...]:
        """ Returns the 3-dimensional point as a tuple."""
        return astuple(self)

    def __iter__(self) -> Iterator[float]:
        """ Returns an iterator over the coordinates of the 3-dimensional point.
        """
        return iter(astuple(self))

    def __reversed__(self) -> Iterator[float]:
        """ Returns a reverse iterator over the coordinates of the 3-dimensional
        point.
        """
        return reversed(astuple(self))
@dataclass(eq=False)
class SlewRateLimiterTraces(Traces):
    """ Collection of :class:`SlewRateLimiter` traces for signal samples
    processed with a slew-rate limiter.

    Holds one trace per :class:`SlewRateLimiter` field, each trace is empty
    by default.
    """
    #: Trace of the slew-rate limited signal level values.
    level: Trace = field(default_factory=Trace)
    #: Trace of the signal deviations.
    deviation: Trace = field(default_factory=Trace)
    #: Trace of the slew-rate limiter activity
    #: ``(0: signal unlimited, 1: signal limited)``.
    active: Trace = field(default_factory=Trace)
@dataclass(frozen=True, eq=True)
class SlewRateLimiter:
    """ Immutable record of the slew-rate limiter results."""
    #: Signal level value after the slew-rate limitation.
    level: float = 0.0
    #: Deviation of the signal.
    deviation: float = 0.0
    #: Activity of the slew-rate limiter
    #: ``(0: signal unlimited, 1: signal limited)``.
    active: int = 0
@dataclass(eq=False)
class AlphaBetaFilterTraces(Traces):
    """ Collection of :class:`AlphaBetaFilter` traces for signal samples
    filtered with an alpha-beta filter.

    Holds one trace per :class:`AlphaBetaFilter` field, each trace is empty
    by default.
    """
    #: Trace of the signal forecast values.
    forecast: Trace = field(default_factory=Trace)
    #: Trace of the signal forecast signs
    #: ``(-1: negative, 0: zero, 1: positive)``.
    forecast_sign: Trace = field(default_factory=Trace)
    #: Trace of the signal level values.
    level: Trace = field(default_factory=Trace)
    #: Trace of the signal level signs
    #: ``(-1: negative, 0: zero, 1: positive)``.
    level_sign: Trace = field(default_factory=Trace)
    #: Trace of the signal trend values.
    trend: Trace = field(default_factory=Trace)
    #: Trace of the signal trend signs
    #: ``(-1: negative, 0: zero, 1: positive)``.
    trend_sign: Trace = field(default_factory=Trace)
    #: Trace of the inflections of the signal prognosis trend
    #: ``(1: increase, -1: decrease)``.
    trend_inflection: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis errors.
    error: Trace = field(default_factory=Trace)
    #: Trace of the variance reduction factors of the signal forecast.
    variance_forecast: Trace = field(default_factory=Trace)
    #: Trace of the variance reduction factors of the signal level.
    variance_level: Trace = field(default_factory=Trace)
    #: Trace of the variance reduction factors of the signal trend.
    variance_trend: Trace = field(default_factory=Trace)
@dataclass(eq=True, frozen=True)
class AlphaBetaFilter:
    """ Alpha-beta filter results.

    Immutable record, every field is a float which defaults to ``0.0``.
    """
    #: Signal forecast value.
    forecast: float = 0.0
    #: Sign of the signal forecast ``(-1: negative, 0: zero, 1: positive)``.
    forecast_sign: float = 0.0
    #: Signal level value.
    level: float = 0.0
    #: Sign of the signal level ``(-1: negative, 0: zero, 1: positive)``.
    level_sign: float = 0.0
    #: Signal trend value.
    trend: float = 0.0
    #: Sign of the signal trend ``(-1: negative, 0: zero, 1: positive)``.
    trend_sign: float = 0.0
    #: Inflection of the signal prognosis trend ``(1: increase, -1: decrease)``.
    trend_inflection: float = 0.0
    #: Signal prediction error value.
    error: float = 0.0
    #: Signal forecast variance reduction factor.
    variance_forecast: float = 0.0
    #: Signal level variance reduction factor.
    variance_level: float = 0.0
    #: Signal trend variance reduction factor.
    variance_trend: float = 0.0
@dataclass(eq=False)
class ExponentialSmoothingTraces(Traces):
    """ Collection of :class:`ExponentialSmoothing` traces for signal samples
    processed with a 2nd-order exponential smoothing.

    Holds one trace per :class:`ExponentialSmoothing` field, each trace is
    empty by default.
    """
    #: Trace of the signal forecast values.
    forecast: Trace = field(default_factory=Trace)
    #: Trace of the signal forecast signs
    #: ``(-1: negative, 0: zero, 1: positive)``.
    forecast_sign: Trace = field(default_factory=Trace)
    #: Trace of the signal level values.
    level: Trace = field(default_factory=Trace)
    #: Trace of the signal level signs
    #: ``(-1: negative, 0: zero, 1: positive)``.
    level_sign: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis values for the first-order exponential
    #: smoothing.
    prognosis1: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis values for the second-order exponential
    #: smoothing.
    prognosis2: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis values of the exponential smoothing.
    prognosis: Trace = field(default_factory=Trace)
    #: Trace of the first-order exponentially smoothed signal values.
    smoothed1: Trace = field(default_factory=Trace)
    #: Trace of the second-order exponentially smoothed signal values.
    smoothed2: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis trend values.
    trend: Trace = field(default_factory=Trace)
    #: Trace of the signal trend signs
    #: ``(-1: negative, 0: zero, 1: positive)``.
    trend_sign: Trace = field(default_factory=Trace)
    #: Trace of the inflections of the signal prognosis trend
    #: ``(1: increase, -1: decrease)``.
    trend_inflection: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis errors.
    error: Trace = field(default_factory=Trace)
    #: Trace of the signal prognosis corrections.
    correction: Trace = field(default_factory=Trace)
    #: 1st central moment: empirical absolute error of the distribution.
    absolute_error: Trace = field(default_factory=Trace)
    #: 2nd central moment: empirical variance of the distribution.
    variance: Trace = field(default_factory=Trace)
    #: Empirical standard deviation of the distribution.
    deviation: Trace = field(default_factory=Trace)
    #: 3rd central moment: empirical skew of the distribution.
    skew: Trace = field(default_factory=Trace)
    #: 4th central moment: empirical kurtosis of the distribution.
    kurtosis: Trace = field(default_factory=Trace)
@dataclass(eq=True, frozen=True)
class ExponentialSmoothing:
    """ 2nd-order exponential smoothing results.

    Immutable record, every field is a float which defaults to ``0.0``.
    """
    #: Signal forecast value.
    forecast: float = 0.0
    #: Sign of the signal forecast ``(-1: negative, 0: zero, 1: positive)``.
    forecast_sign: float = 0.0
    #: Signal level value.
    level: float = 0.0
    #: Sign of the signal level ``(-1: negative, 0: zero, 1: positive)``.
    level_sign: float = 0.0
    #: Signal prognosis value for the first-order exponential smoothing.
    prognosis1: float = 0.0
    #: Signal prognosis value for the second-order exponential smoothing.
    prognosis2: float = 0.0
    #: Signal prognosis value of the exponential smoothing.
    prognosis: float = 0.0
    #: First-order exponentially smoothed signal value.
    smoothed1: float = 0.0
    #: Second-order exponentially smoothed signal value.
    smoothed2: float = 0.0
    #: Signal prognosis trend value.
    trend: float = 0.0
    #: Sign of the signal trend ``(-1: negative, 0: zero, 1: positive)``.
    trend_sign: float = 0.0
    #: Inflection of the signal prognosis trend ``(1: increase, -1: decrease)``.
    trend_inflection: float = 0.0
    #: Signal prognosis error.
    error: float = 0.0
    #: Signal prognosis correction.
    correction: float = 0.0
    #: Empirical absolute error of the distribution (1st central moment).
    absolute_error: float = 0.0
    #: Empirical variance of the distribution (2nd central moment).
    variance: float = 0.0
    #: Empirical standard deviation of the distribution.
    deviation: float = 0.0
    #: Empirical skew of the distribution (3rd central moment).
    skew: float = 0.0
    #: Empirical kurtosis of the distribution (4th central moment).
    kurtosis: float = 0.0
@dataclass(eq=False)
class LinearRegressionTraces(Traces):
    """ Collection of :class:`LinearRegression` traces for signal samples
    processed with a moving linear regression.

    Holds one trace per :class:`LinearRegression` field, each trace is empty
    by default.
    """
    #: Trace of the current y-coordinates of the approximated line.
    level: Trace = field(default_factory=Trace)
    #: Trace of the slopes of the approximated line.
    slope: Trace = field(default_factory=Trace)
    #: Trace of the y-intercepts of the approximated line.
    intercept: Trace = field(default_factory=Trace)
    #: Trace of the arithmetic means of the y-coordinate values of the
    #: approximated line.
    mean: Trace = field(default_factory=Trace)
    #: Trace of the medians of the y-coordinate values of the approximated
    #: line.
    median: Trace = field(default_factory=Trace)
    #: Trace of the minima of the y-coordinate values of the approximated
    #: line.
    minimum: Trace = field(default_factory=Trace)
    #: Trace of the maxima of the y-coordinate values of the approximated
    #: line.
    maximum: Trace = field(default_factory=Trace)
    #: Trace of the ranges of the y-coordinate values of the approximated
    #: line.
    range: Trace = field(default_factory=Trace)
    #: Trace of the current errors from the approximated line.
    error: Trace = field(default_factory=Trace)
    #: Trace of the maximal negative y-coordinate errors from the
    #: approximated line.
    negative_error: Trace = field(default_factory=Trace)
    #: Trace of the maximal positive y-coordinate errors from the
    #: approximated line.
    positive_error: Trace = field(default_factory=Trace)
    #: 1st central moment: empirical absolute error of the distribution.
    absolute_error: Trace = field(default_factory=Trace)
    #: 2nd central moment: empirical variance of the distribution.
    variance: Trace = field(default_factory=Trace)
    #: Empirical standard deviation of the distribution.
    deviation: Trace = field(default_factory=Trace)
    #: 3rd central moment: empirical skew of the distribution.
    skew: Trace = field(default_factory=Trace)
    #: 4th central moment: empirical kurtosis of the distribution.
    kurtosis: Trace = field(default_factory=Trace)
@dataclass(frozen=True, eq=True)
class LinearRegression:
    """ Immutable record of the linear regression results.

    Every field is a float which defaults to ``0.0``.
    """
    #: Y-coordinate of the approximated line at the current sample.
    level: float = 0.0
    #: Approximated line slope.
    slope: float = 0.0
    #: Approximated line y-intercept.
    intercept: float = 0.0
    #: Arithmetic mean over the y-coordinate values of the approximated line.
    mean: float = 0.0
    #: Median over the y-coordinate values of the approximated line.
    median: float = 0.0
    #: Minimum over the y-coordinate values of the approximated line.
    minimum: float = 0.0
    #: Maximum over the y-coordinate values of the approximated line.
    maximum: float = 0.0
    #: Range over the y-coordinate values of the approximated line.
    range: float = 0.0
    #: Error of the current sample from the approximated line.
    error: float = 0.0
    #: Largest negative y-coordinate error from the approximated line.
    negative_error: float = 0.0
    #: Largest positive y-coordinate error from the approximated line.
    positive_error: float = 0.0
    #: 1st central moment: empirical absolute error of the distribution.
    absolute_error: float = 0.0
    #: 2nd central moment: empirical variance of the distribution.
    variance: float = 0.0
    #: Empirical standard deviation of the distribution.
    deviation: float = 0.0
    #: 3rd central moment: empirical skew of the distribution.
    skew: float = 0.0
    #: 4th central moment: empirical kurtosis of the distribution.
    kurtosis: float = 0.0