import numpy as np
from eqsig import exceptions
from eqsig.functions import get_section_average, calc_smooth_fa_spectrum, interp_array_to_approx_dt
import eqsig.sdof as dh
import eqsig.displacements as sd
from eqsig import im
from eqsig.exceptions import deprecation
[docs]class Signal(object):
"""
A time series object
Parameters
----------
values: array_like
values of the time series
dt: float
the time step
label: str, optional
A name for the signal
smooth_freq_range: tuple, optional
The frequency range for computing the smooth FAS
verbose: int, optional
Level of console output
ccbox: int, optional
colour id for plotting
"""
_npts = None
_smooth_freq_points = 61
_fa_spectrum = None
_fa_freqs = None
_cached_fa = False
_cached_smooth_fa = False
_smooth_fa_freqs = None
_smooth_freq_range = None
def __init__(self, values, dt, label='m1', smooth_freq_range=(0.1, 30), smooth_fa_freqs=None,
verbose=0, ccbox=0):
self.verbose = verbose
self._dt = dt
self._values = np.array(values)
self.label = label
if smooth_fa_freqs is not None:
self.smooth_fa_freqs = smooth_fa_freqs
else:
self.set_smooth_fa_frequecies_by_range(smooth_freq_range, 50)
self._smooth_fa_spectrum = np.zeros(len(self.smooth_fa_freqs))
self._npts = len(self.values)
# lf = np.log10(smooth_freq_range)
# self._smooth_fa_frequencies = np.logspace(lf[0], lf[1], 30, base=10)
self.ccbox = ccbox
@property
def values(self):
return self._values
@values.setter
def values(self, vals):
return ValueError('Cannot directly modify values, use self.reset_values()')
[docs] def reset_values(self, new_values):
self._values = new_values
self._npts = len(new_values)
self.clear_cache()
@property
def dt(self):
""" The time step """
return self._dt
@property
def npts(self): # Deliberately no public setter method for this
""" The number of points in the time series """
return self._npts
@property
def time(self):
""" An array of time of equal length to the time series """
return np.arange(0, self.npts) * self.dt
@property
def fa_spectrum(self):
"""The one-sided Fourier Amplitude spectrum"""
if not self._cached_fa:
self.generate_fa_spectrum()
return self._fa_spectrum
@property
def fa_spectrum_abs(self):
"""The one-sided Fourier Amplitude spectrum"""
if not self._cached_fa:
self.generate_fa_spectrum()
return abs(self._fa_spectrum)
[docs] def gen_fa_spectrum(self, p2_plus=0, n=None):
"""
Sets the one-sided Fourier Amplitude spectrum and frequencies
Parameters
----------
p2_plus: int (default=0)
Additional increments of `n` such that length of signal used is `n` and `n=ceil(log2(self.npts) + p2_plus))`.
n: int (optional)
If specified the signal length is padded with zeros before computing FAS
"""
if n is not None:
n_factor = n
else:
n_factor = 2 ** int(np.ceil(np.log2(self.npts)) + p2_plus)
fa = np.fft.fft(self.values, n=n_factor)
points = int(n_factor / 2)
self._fa_spectrum = fa[range(points)] * self.dt
self._fa_freqs = np.arange(points) / (2 * points * self.dt)
self._cached_fa = True
[docs] def generate_fa_spectrum(self):
self.gen_fa_spectrum()
@property
def fa_frequencies(self):
return self.fa_freqs
@property
def fa_freqs(self):
if not self._cached_fa:
self.gen_fa_spectrum()
return self._fa_freqs
@property
def smooth_freq_range(self):
deprecation('AccSignal.smooth_freq_range is deprecated. Use AccSignal.smooth_fa_freqs')
return self.smooth_fa_freqs[0], self.smooth_fa_freqs[-1]
@smooth_freq_range.setter
def smooth_freq_range(self, limits):
deprecation('AccSignal.smooth_freq_range is deprecated. Set AccSignal.smooth_fa_freqs directly')
lf = np.log10(np.array(limits))
self.smooth_fa_freqs = np.logspace(lf[0], lf[1], self.smooth_freq_points, base=10)
[docs] def set_smooth_fa_frequecies_by_range(self, limits, n_points):
lf = np.log10(limits)
self._smooth_fa_freqs = np.logspace(lf[0], lf[1], n_points, base=10)
self._smooth_freq_range = np.array(limits)
self._cached_smooth_fa = False
@property
def smooth_freq_points(self):
deprecation('AccSignal.smooth_freq_points is deprecated. Use len(AccSignal.smooth_fa_freqs)')
return len(self.smooth_fa_freqs)
@smooth_freq_points.setter
def smooth_freq_points(self, value):
deprecation('AccSignal.smooth_freq_points is deprecated. Set AccSignal.smooth_fa_freqs directly')
lf = np.log10(self.smooth_freq_range)
self.smooth_fa_freqs = np.logspace(lf[0], lf[1], int(value), base=10)
@property
def smooth_fa_freqs(self):
return self._smooth_fa_freqs
@property
def smooth_fa_frequencies(self):
return self._smooth_fa_freqs
@smooth_fa_frequencies.setter
def smooth_fa_frequencies(self, frequencies): # backward compatible using 'frequencies'
self._smooth_fa_freqs = np.array(frequencies, dtype=float)
self._cached_smooth_fa = False
@smooth_fa_freqs.setter
def smooth_fa_freqs(self, freqs):
self._smooth_fa_freqs = np.array(freqs, dtype=float)
self._cached_smooth_fa = False
@property
def smooth_fa_spectrum(self):
if not self._cached_smooth_fa:
self.generate_smooth_fa_spectrum()
return self._smooth_fa_spectrum
[docs] def clear_cache(self):
"""Resets the dynamically calculated properties."""
self._cached_smooth_fa = False
self._cached_fa = False
[docs] def generate_smooth_fa_spectrum(self, band=40):
self.gen_smooth_fa_spectrum(band=band)
[docs] def gen_smooth_fa_spectrum(self, smooth_fa_freqs=None, band=40):
"""
Calculates the smoothed Fourier Amplitude Spectrum
using the method by Konno and Ohmachi 1998
Parameters
----------
band: int
range to smooth over
"""
if smooth_fa_freqs is not None:
self._smooth_fa_freqs = smooth_fa_freqs
self._smooth_fa_spectrum = calc_smooth_fa_spectrum(self.fa_freqs,
self.fa_spectrum, self.smooth_fa_freqs, band=band)
self._cached_smooth_fa = True
[docs] def butter_pass(self, cut_off=(0.1, 15), **kwargs):
"""
Performs a Butterworth filter
Notes
-----
Wraps the scipy 'butter' filter
Parameters
----------
cut_off: tuple
Lower and upper cut off frequencies for the filter, if None then no filter.
e.g. (None, 15) applies a lowpass filter at 15Hz, whereas (0.1, 10) applies
a bandpass filter at 0.1Hz to 10Hz.
filter_order: int
Order of the Butterworth filter
remove_gibbs: str (default=None)
Pads with zeros to remove the Gibbs filter effect,
if = 'start' then pads at start,
if = 'end' then pads at end,
if = 'mid' then pads half at start and half at end
gibbs_extra: int
each increment of the value doubles the record length using zero padding.
:return:
"""
from scipy.signal import butter, filtfilt
if isinstance(cut_off, list) or isinstance(cut_off, tuple) or isinstance(cut_off, np.Array):
pass
else:
raise ValueError("cut_off must be list, tuple or array.")
if len(cut_off) != 2:
raise ValueError("cut_off must be length 2.")
if cut_off[0] is not None and cut_off[1] is not None:
filter_type = "band"
cut_off = np.array(cut_off)
elif cut_off[0] is None:
filter_type = 'low'
cut_off = cut_off[1]
else:
filter_type = 'high'
cut_off = cut_off[0]
filter_order = kwargs.get('filter_order', 4)
remove_gibbs = kwargs.get('remove_gibbs', None)
gibbs_extra = kwargs.get('gibbs_extra', 1)
gibbs_range = kwargs.get('gibbs_range', 50)
sampling_rate = 1.0 / self.dt
nyq = sampling_rate * 0.5
mote = self.values
org_len = len(mote)
if remove_gibbs is not None:
# Pad end of record with extra zeros then cut it off after filtering
nindex = int(np.ceil(np.log2(len(mote)))) + gibbs_extra
new_len = 2 ** nindex
diff_len = new_len - org_len
if remove_gibbs == 'start':
s_len = 0
f_len = s_len + org_len
elif remove_gibbs == 'end':
s_len = diff_len
f_len = s_len + org_len
else:
s_len = int(diff_len / 2)
f_len = s_len + org_len
end_value = np.mean(mote[-gibbs_range:])
start_value = np.mean(mote[:gibbs_range])
temp = start_value * np.ones(new_len)
temp[f_len:] = end_value
temp[s_len:f_len] = mote
mote = temp
else:
s_len = 0
f_len = org_len
wp = cut_off / nyq
b, a = butter(filter_order, wp, btype=filter_type)
mote = filtfilt(b, a, mote)
# removing extra zeros from gibbs effect
mote = mote[s_len:f_len] # TODO: don't use -1
self.reset_values(mote)
[docs] def remove_average(self, section=-1, verbose=-1):
"""
Calculates the average and removes it from the record
"""
if verbose == -1:
verbose = self.verbose
average = np.mean(self.values[:section])
self.reset_values(self.values - average)
if verbose:
print('removed av.: ', average)
[docs] def remove_poly(self, poly_fit=0):
"""
Calculates best fit polynomial and removes it from the record
"""
x = np.linspace(0, 1.0, self.npts)
cofs = np.polyfit(x, self.values, poly_fit)
y_cor = 0 * x
for co in range(len(cofs)):
mods = x ** (poly_fit - co)
y_cor += cofs[co] * mods
self.reset_values(self.values - y_cor)
[docs] def get_section_average(self, start=0, end=-1, index=False):
"""
Gets the average value of a part of series.
Common use is so that it can be patched
with another record.
Parameters
----------
start: int or float, optional
Section start point
end: int or float, optional
Section end point
index: bool, optional
if False then start and end are considered values in time.
Returns
-------
float
The mean value of the section.
"""
return get_section_average(self, start=start, end=end, index=index)
[docs] def add_constant(self, constant):
"""
Adds a single value to every value in the signal.
Parameters
----------
constant: float
Value to be added to time series
"""
self.reset_values(self.values + constant)
[docs] def add_series(self, series):
"""
Adds a series of values to the values in the signal.
Parameters
----------
series: array_like
A series of values
"""
if len(series) == self.npts:
self.reset_values(self.values + series)
else:
raise exceptions.SignalProcessingError("new series has different length to Signal")
[docs] def add_signal(self, new_signal):
"""
Combines a signal.
Parameters
----------
new_signal: Signal object
"""
if isinstance(new_signal, Signal):
if new_signal.dt == self.dt:
self.add_series(new_signal.values)
else:
raise exceptions.SignalProcessingError("New signal has different time step")
else:
raise exceptions.SignalProcessingError("New signal is not a Signal object")
[docs] def running_average(self, width=1):
"""
Averages the values over a width (chunk of numbers)
replaces the value in the centre of the chunk.
Parameters
----------
width: int
The number of points over which values are averaged
"""
mot = self.values
for i in range(len(mot)):
if i < width / 2:
cc = i + int(width / 2) + 1
self._values[i] = np.mean(mot[:cc])
elif i > len(mot) - width / 2:
cc = i - int(width / 2)
self._values[i] = np.mean(mot[cc:])
else:
cc1 = i - int(width / 2)
cc2 = i + int(width / 2) + 1
self._values[i] = np.mean(mot[cc1:cc2])
self.clear_cache()
[docs]class AccSignal(Signal):
"""
A time series object
Parameters
----------
values: array_like
Acceleration time series - should be in m/s2
dt: float
Time step
label: str (default='m1')
Used for plotting
smooth_freq_range: tuple [floats] (default=(0.1, 30))
lower and upper bound of frequency range to compute the smoothed Fourier amplitude spectrum
verbose: int (default=0)
Level of output verbosity
response_times: tuple of floats (default=(0.1, 5))
ccbox: int
colour index for plotting
"""
def __init__(self, values, dt, label='m1', smooth_freq_range=(0.1, 30), smooth_fa_freqs=None, verbose=0, response_period_range=(0.1, 5),
response_times=None, ccbox=0):
super(AccSignal, self).__init__(values, dt, label=label, smooth_freq_range=smooth_freq_range,
smooth_fa_freqs=smooth_fa_freqs, verbose=verbose, ccbox=ccbox)
if response_times is None:
self.response_times = np.linspace(response_period_range[0], response_period_range[1], 100)
else:
self.response_times = np.array(response_times)
self._velocity = np.zeros(self.npts)
self._displacement = np.zeros(self.npts)
self._cached_params = {}
self._cached_response_spectra = False
self._cached_disp_and_velo = False
self._cached_xi = 0.05
self._s_a = None
self._s_v = None
self._s_d = None
[docs] def clear_cache(self):
self._cached_smooth_fa = False
self._cached_fa = False
self._cached_response_spectra = False
self._cached_disp_and_velo = False
self.reset_all_motion_stats()
[docs] def gen_response_spectrum(self, response_times=None, xi=-1, min_dt_ratio=4):
"""
Generate the response spectrum for the response_times for a given
damping (xi).
Parameters
----------
response_times: array_like
Time periods of SDOFs that response should be computed for
xi: float (default=0.05 or previously set)
Ratio of critical damping
min_dt_ratio: float
Smallest ratio between response period and integration time step,
acceleration time series will be interpolated to reach appropriate integration time step if
not achieved using the acceleration time series time step.
"""
if self.verbose:
print('Generating response spectra')
if response_times is not None:
self.response_times = response_times
if self.response_times[0] != 0:
min_non_zero_period = self.response_times[0]
else:
min_non_zero_period = self.response_times[1]
target_dt = max(min_non_zero_period / 20, self.dt / min_dt_ratio) # limit to ratio of motion time step
if target_dt < self.dt:
values_interp, dt_interp = interp_array_to_approx_dt(self.values, self.dt, target_dt, even=False)
else:
values_interp = self.values
dt_interp = self.dt
if xi == -1:
xi = self._cached_xi
try:
self._s_d, self._s_v, self._s_a = dh.pseudo_response_spectra(values_interp, dt_interp, self.response_times, xi)
except MemoryError:
raise MemoryError('Out of memory. Length of acc (%i) and length of response_times (%i) too large, '
'set larger min_dt_ratio.' % (len(values_interp), len(self.response_times)))
self._cached_response_spectra = True
[docs] def generate_response_spectrum(self, response_times=None, xi=-1, min_dt_ratio=4):
self.gen_response_spectrum(response_times=response_times, xi=xi, min_dt_ratio=min_dt_ratio)
@property
def s_a(self):
"""Pseudo maximum response accelerations of linear SDOFs"""
if not self._cached_response_spectra:
self.generate_response_spectrum()
return self._s_a
@property
def s_v(self):
"""Pseudo maximum response velocities of linear SDOFs"""
if not self._cached_response_spectra:
self.generate_response_spectrum()
return self._s_v
@property
def s_d(self):
"""Maximum response displacements of linear SDOFs"""
if not self._cached_response_spectra:
self.generate_response_spectrum()
return self._s_d
[docs] def correct_me(self):
"""This provides a correction to an acceleration time series"""
from scipy.signal import detrend
disp = detrend(self.displacement)
vel = np.zeros(self.npts)
acc = np.zeros(self.npts)
for i in range(self.npts - 1): # MEANS THAT ACC has a pulse at the end.
vel[i + 1] = (disp[i + 1] - disp[i]) / self.dt
# vel[i + 1] = 2 * (disp[i + 1] - disp[i]) / self.dt - vel[i]
acc[i + 1] = (vel[i + 1] - vel[i]) / self.dt
# acc[i + 1] = 2 * (vel[i + 1] - vel[i]) / self.dt - acc[i]
# init_acc = np.mean(acc[:10])
acc[:10] = np.mean(acc[:10])
self.reset_values(acc)
[docs] def remove_rolling_average(self, mtype="velocity", freq_window=5):
"""
Removes the rolling average
Parameters
----------
mtype: str
motion type to apply method to
freq_window: int
window for applying the rolling average
"""
if mtype == "velocity":
mot = self.velocity
else:
mot = self.values
width = int(1. / (freq_window * self.dt))
if width < 1:
raise ValueError("freq_window to high")
roll = np.zeros_like(mot)
for i in range(len(mot)):
if i < width / 2:
cc = i + int(width / 2) + 1
roll[i] = np.mean(mot[:cc])
elif i > len(mot) - width / 2:
cc = i - int(width / 2)
roll[i] = np.mean(mot[cc:])
else:
cc1 = i - int(width / 2)
cc2 = i + int(width / 2) + 1
roll[i] = np.mean(mot[cc1:cc2])
if mtype == "velocity":
velocity = self.velocity - roll
acc = np.diff(velocity) / self.dt
acc = np.insert(acc, 0, velocity[0] / self.dt)
self._values = acc
else:
self._values -= roll
self.clear_cache()
[docs] def rebase_displacement(self):
"""Correction to make the displacement zero at the end of the record"""
end_disp = self.displacement[-1]
acceleration_correction = 2 * end_disp / (self.dt * self.npts)
self._values -= acceleration_correction
self.clear_cache()
[docs] def set_zero_residual_velocity(self, timezone=None):
post_vel = self.velocity[-1]
if timezone is None:
nsteps = int(abs(post_vel) / (self.pga * self.dt / 100)) + 1
si = -nsteps
ei = None
else:
si = int(timezone[0] / self.dt)
if timezone[1] is not None:
ei = int(timezone[1] / self.dt)
nsteps = ei - si
else:
ei = None
nsteps = len(self.values) - si
delta_acc = post_vel / self.dt / nsteps
vals = self.values
vals[si:ei] -= delta_acc
self.reset_values(vals)
[docs] def set_zero_residual_displacement(self, timezone=None):
if timezone is None:
si = None
ei = None
ttime = self.time[-1]
else:
raise ValueError('Not supported')
si = int(timezone[0] / self.dt)
ei = int(timezone[1] / self.dt)
ttime = timezone[1] - timezone[0]
post_disp = self.displacement[-1]
delta_acc = post_disp * 2 / ttime ** 2
vals = self.values
vals[si:ei] -= delta_acc
self.reset_values(vals)
[docs] def set_zero_residual_displacement_and_velocity(self, timezone=None):
pdisp = self.displacement[-1]
pvel = self.velocity[-1]
if timezone is None:
si = None
ei = None
ttime = self.time[-1]
else:
si = int(timezone[0] / self.dt)
if timezone[1] is not None:
ei = int(timezone[1] / self.dt)
pvel = 0 # if using a timezone the end velocity should be zero
ttime = timezone[1] - timezone[0]
else:
ei = None
ttime = self.time[-1] - timezone[0]
# acc = 2 * a + 6 * b * t
# vel = 2 * a * t + 3 * b * t ** 2
# disp = a * t ** 2 + b * t ** 3
# rearrange: a = (post_disp - b * ttime ** 3) / ttime ** 2
b = (-2*pdisp + pvel*ttime)/ttime**3
a = (pdisp - b * ttime ** 3) / ttime ** 2
# use a parabola (acc = a*t^2 + b*t + c)
# res_disp =
tincs = self.time[:ei]
if si is not None:
tincs = tincs[si:]
tincs -= tincs[0]
delta_acc = 2 * a + 6 * b * tincs
vals = self.values
vals[si:ei] -= delta_acc
self.reset_values(vals)
[docs] def generate_displacement_and_velocity_series(self, trap=True):
"""Calculates the displacement and velocity time series"""
self._velocity, self._displacement = sd.calc_velo_and_disp_from_accel_arr(self.values, self.dt, trap=trap)
self._cached_disp_and_velo = True
@property
def velocity(self):
"""Velocity time series"""
if not self._cached_disp_and_velo:
self.generate_displacement_and_velocity_series()
return self._velocity
# @property
# def zeroed_velocity(self):
# """Velocity time series with linear correction to have zero velocity at end"""
# return self.velocity - np.arange(self.npts) * self.velocity[-1] / self.npts
@property
def displacement(self):
"""Displacement time series"""
if not self._cached_disp_and_velo:
self.generate_displacement_and_velocity_series()
return self._displacement
# @property
# def zeroed_displacement(self):
# """Displacement time series with linear correction to have zero displacement at end"""
# return self.displacement - np.arange(self.npts) * self.displacement[-1] / self.npts
[docs] def generate_peak_values(self):
"""
DEPRECATED: all peak values are lazy loaded - this method does not need to be run.
"""
deprecation("generate_peak_values() is no longer in use, all peak values are lazy loaded.")
@property
def pga(self):
"""Absolute peak ground acceleration"""
if "pga" in self._cached_params:
return self._cached_params["pga"]
else:
pga = im.calc_peak(self.values)
self._cached_params["pga"] = pga
return pga
@property
def pgv(self):
"""Absolute peak ground velocity"""
if "pgv" in self._cached_params:
return self._cached_params["pgv"]
else:
pgv = im.calc_peak(self.velocity)
self._cached_params["pgv"] = pgv
return pgv
@property
def pgd(self):
"""Absolute peak ground displacement"""
if "pgd" in self._cached_params:
return self._cached_params["pgd"]
else:
pgd = im.calc_peak(self.displacement)
self._cached_params["pgd"] = pgd
return pgd
[docs] def generate_duration_stats(self):
"""
Deprecated: Use eqsig.im.calc_sig_dur or eqsig.im.calc_sig_dur, eqsig.im.calc_brac_dur or esig.im.calc_a_rms
"""
deprecation("Use eqsig.im.calc_sig_dur or eqsig.im.calc_sig_dur, eqsig.im.calc_brac_dur or esig.im.calc_a_rms")
abs_motion = abs(self.values)
time = np.arange(self.npts) * self.dt
# Bracketed duration
ind01 = np.where(abs_motion / 9.8 > 0.01) # 0.01g
time2 = time[ind01]
try:
self.t_b01 = time2[-1] - time2[0]
# rms acceleration in m/s/s
self.a_rms01 = np.sqrt(1 / self.t_b01 * np.trapz((self.values[ind01[0][0]:ind01[0][-1]]) ** 2, dx=self.dt))
except IndexError:
self.t_b01 = -1.
self.a_rms01 = -1.
ind05 = np.where(abs_motion / 9.8 > 0.05) # 0.05g
time05 = time[ind05]
try:
self.t_b05 = time05[-1] - time05[0]
self.a_rms05 = np.sqrt(1 / self.t_b05 * np.trapz((self.values[ind05[0][0]:ind05[0][-1]]) ** 2, dx=self.dt))
except IndexError:
self.t_b05 = -1.
self.a_rms05 = -1.
ind10 = np.where(abs_motion / 9.8 > 0.1) # 0.10g
time10 = time[ind10]
try:
self.t_b10 = time10[-1] - time10[0]
self.a_rms10 = np.sqrt(1 / self.t_b10 * np.trapz((self.values[ind10[0][0]:ind10[0][-1]]) ** 2, dx=self.dt))
except IndexError:
self.t_b10 = -1.
self.a_rms10 = -1.
# Trifunac and Brady
self.sd_start, self.sd_end = im.calc_sig_dur_vals(self.values, self.dt, se=True)
self.t_595 = self.sd_end - self.sd_start
[docs] def generate_cumulative_stats(self):
"""
Deprecated: Use eqsig.im.calc_arias_intensity, eqsig.im.calc_cav
"""
deprecation("Use eqsig.im.calc_arias_intensity, eqsig.im.calc_cav")
# Arias intensity in m/s
self.arias_intensity_series = im.calc_arias_intensity(self)
self.arias_intensity = self.arias_intensity_series[-1]
self.cav_series = im.calc_cav(self)
self.cav = self.cav_series[-1]
[docs] def generate_all_motion_stats(self):
"""
Deprecated: Use eqsig.im functions to calculate stats
"""
deprecation("Use eqsig.im functions to calculate stats")
self.generate_duration_stats()
self.generate_cumulative_stats()
[docs] def reset_all_motion_stats(self):
self.t_b01 = 0.0
self.t_b05 = 0.0
self.t_b10 = 0.0
self.a_rms01 = 0.0
self.a_rms05 = 0.0
self.a_rms10 = 0.0
self.t_595 = 0.0
self.sd_start = 0.0
self.sd_end = 0.0
self.arias_intensity = 0.0
self._cached_params = {}
[docs] def response_series(self, response_times=None, xi=-1):
"""
Generate the response time series for a set of elastic SDOFs for a given
damping (xi).
"""
if self.verbose:
print('Generating response series')
if response_times is not None:
self.response_times = response_times
if xi == -1:
xi = self._cached_xi
resp_u, resp_v, resp_a = dh.response_series(self.values, self.dt, self.response_times, xi)
return resp_u, resp_v, resp_a