__author__ = 'maximmillen'
from collections import OrderedDict
import eqsig.im
import numpy as np
from eqsig.single import Signal, AccSignal
[docs]class Cluster(object):
"""
This object represents a group of Signals or AccSignals that have the same time step
Parameters
----------
values: 2d_array_like
An array containing arrays of values that represent time series
dt: float
Time step of time series
names: list
A list of names for each time series
master_index: int
The index of the master time series - used in some signal matching methods
stypes: str or list of str
The signal type for each time series, if 'acc' then AccSignal, else Signal.
kwargs
"""
def __init__(self, values, dt, names=None, master_index=0, stypes="custom", **kwargs):
if names is None:
names = []
self.freq_range = np.array(kwargs.get('freq_range', [0.1, 20]))
lvt = np.log10(1.0 / self.freq_range)
if stypes == "custom" or stypes == "acc":
stypes = [stypes] * len(values)
self.response_times = kwargs.get('resp_times', np.logspace(lvt[1], lvt[0], 31, base=10))
if master_index < 0:
raise ValueError("master_index must be positive")
if master_index > len(values) - 1:
raise ValueError("master_index: %i, out of bounds, maximum value: %i" % (master_index, len(values) - 1))
self.master_index = master_index
# if len(signals) != len(steps):
# raise ValueError("Length of signals: %i, must match length of steps: %i" % (len(signals), len(steps)))
# self.master = Record(master_motion, master_step, response_times=self.response_times)
shortage = len(values) - len(names)
self.names = list(names)
for i in range(shortage):
self.names.append("m%i" % (len(names) + i))
self.master = self.names[master_index]
self.dt = dt
self.signals = OrderedDict()
for s in range(len(values)):
if stypes[s] == "acc":
self.signals[self.names[s]] = AccSignal(values[s], dt, self.names[s], response_times=self.response_times)
else:
self.signals[self.names[s]] = Signal(values[s], dt, self.names[s])
[docs] def values_by_index(self, index):
key_value_pair = list(self.signals.items())[index]
return key_value_pair[1].values
[docs] def signal_by_index(self, index):
key_value_pair = list(self.signals.items())[index]
return key_value_pair[1]
[docs] def values(self, name):
return self.signals[name].values
[docs] def name_by_index(self, index):
key_value_pair = list(self.signals.items())[index]
return key_value_pair[0]
@property
def n_signals(self):
return len(self.signals)
@property
def time(self):
sig0 = self.signal_by_index(0)
return sig0.time
[docs] def same_start(self, **kwargs):
"""
Starts both time series at the same y-value
"""
base = kwargs.get('base', 0)
start = kwargs.get('start', 0)
end = kwargs.get('end', 1)
verbose = kwargs.get('verbose', 0)
master_average = self.signal_by_index(self.master_index).get_section_average(start=start, end=end)
for i in range(len(self.signals)):
if i != self.master_index:
slave_signal = self.signal_by_index(1)
slave_average = slave_signal.get_section_average(start=start, end=end)
diff = slave_average - master_average
if verbose:
print('Same start the records')
print('old difference in starts: ', diff)
slave_signal.reset_values(slave_signal.values - diff)
[docs] def combine_motions(self, f_ch, low_index=0, high_index=1, **kwargs):
"""
This method combined the two ground motions by taking
the high frequency of one values
and the low frequency of the other.
WARNING: records must have the same time step!!!
Parameters
----------
:param f_ch: is the frequency change point
:param low_index: --
:param high_index: --
:param order: refers to the order of the BW filter
combined values is returned as self.combo
"""
order = kwargs.get('order', 4)
remove_gibbs = kwargs.get('remove_gibbs', 0)
self.signal_by_index(high_index).butter_pass(cut_off=(f_ch, None), order=order, remove_gibbs=remove_gibbs)
self.signal_by_index(low_index).butter_pass(cut_off=(None, f_ch), order=order, remove_gibbs=remove_gibbs)
motion = self.signal_by_index(low_index).values + self.signal_by_index(high_index).values
return motion
[docs] def time_match(self, **kwargs):
"""
This method determines the time lag between two records and removes the lag
- the solution is based on a sum of the squared residuals.
- The motions are both set to the base values, default = 0
- must have same time step and same magnitude
"""
verbose = kwargs.get('verbose', 0)
steps = kwargs.get('steps', 10)
set_step = kwargs.get('set_step', False)
trim = kwargs.get('trim', True)
if set_step is False:
if verbose:
print('length m0: ', self.signal_by_index(0).npts)
print('length m1: ', self.signal_by_index(1).npts)
length_check = min(self.signal_by_index(0).npts, self.signal_by_index(1).npts)
bm = self.signal_by_index(self.master_index).values[:length_check]
for s in range(len(self.signals)):
if s != self.master_index:
slave_signal = self.signal_by_index(s)
om = slave_signal.values[:length_check]
squares = (bm[0:-steps] - om[0:-steps]) ** 2
min_diff = np.sum(squares)
min_ind = 0
if verbose:
print('mi: ', min_ind, ' min_diff: ', min_diff)
else:
continue
# Check other values lags base values
for i in range(steps):
squares = (om[i:-steps + i] - bm[0:-steps]) ** 2
diff = sum(squares)
if verbose:
print('ind: ', i, ' diff: ', diff)
if diff < min_diff:
min_diff = diff
min_ind = i + 0
# Check base values lags other values
for i in range(steps):
squares = (bm[i:-steps + i] - om[0:-steps]) ** 2
diff = sum(squares)
if verbose:
print('ind: ', i, ' diff: ', diff)
if diff < min_diff:
min_diff = diff
min_ind = -i - 0
if verbose:
print('lag index: ', min_ind)
if min_ind < 0: # pad with initial value
m_temp = [om[0]] * abs(min_ind) + list(om[:min_ind])
elif min_ind > 0: # pad with final value
m_temp = list(om[min_ind:]) + [om[-1]] * abs(min_ind)
else:
continue
slave_signal.reset_values(m_temp)
# else:
# min_ind = set_step
# if verbose:
# print('time step lag: ', min_ind)
# pad other record with constant
# if verbose:
# print('m_temp[0]: ', m_temp[0])
# if min_ind != 0:
# m = self.motions([np.mod(base + 1, 2)])
# m = np.ones(len(self.motions[np.mod(base + 1, 2)]))
# if min_ind < 0:
# self.motions[np.mod(base + 1, 2)] = m_temp[0] * self.motions[np.mod(base + 1, 2)]
# self.motions[np.mod(base + 1, 2)][-min_ind:] = m_temp
# elif min_ind > 0:
# self.motions[np.mod(base + 1, 2)] = m_temp[-1] * self.motions[np.mod(base + 1, 2)]
# self.motions[np.mod(base + 1, 2)][:-min_ind] = m_temp
# if trim:
# tmax = len(self.motions[base]) * self.dts[base]
# ender = int(tmax / self.dts[np.mod(base + 1, 2)])
# if verbose:
# print('original length: ', len(self.motions[np.mod(base + 1, 2)]))
# print('trim length: ', ender)
# self.motions[np.mod(base + 1, 2)] = self.motions[np.mod(base + 1, 2)][:ender]
return min_ind
[docs] def generate_response_spectrums(self):
for i in range(len(self.signals)):
self.signal_by_index(i).generate_response_spectrum()
[docs] def calculate_ratios(self):
self.generate_response_spectrums()
self.resp_log_ratio = np.log10(self.motions(1).s_a / self.motions(0).s_a)
self.fa_log_ratio = np.log10(self.motions(1).smooth_fa_spectrum / self.motions(0).smooth_fa_spectrum)
centre_freq = 1.0 / self.response_times[len(self.response_times) / 2]
self.response_ratio_moment = sum(self.resp_log_ratio * (np.log10((1.0 /
self.response_times) / centre_freq)))
[docs]def combine_at_angle(acc_sig_ns, acc_sig_we, angle):
off_rad = np.radians(angle)
combo = acc_sig_ns.values * np.cos(off_rad) + acc_sig_we.values * np.sin(off_rad)
new_sig = AccSignal(combo, acc_sig_ns.dt)
return new_sig
[docs]def compute_rotated(acc_sig_ns, acc_sig_we, angle_off_ns=0.0, parameter=None, func=None, points=100):
"""
Computes the rotated value of a parameter.
:param acc_sig_ns:
:param acc_sig_we:
:param angle_off_ns: Angle from North in degrees of the primary signal.
:return: tuple, (angle to rotate, rotated)
"""
assert isinstance(acc_sig_ns, AccSignal)
assert isinstance(acc_sig_we, AccSignal)
assert acc_sig_ns.dt == acc_sig_we.dt
assert acc_sig_ns.npts == acc_sig_we.npts, (acc_sig_ns.npts, acc_sig_we.npts)
degrees = np.linspace(0 - angle_off_ns, 180. - angle_off_ns, points)
degrees = np.mod(degrees, 360)
pvalues = []
for i in range(len(degrees)):
new_sig = combine_at_angle(acc_sig_ns, acc_sig_we, degrees[i])
if parameter == "arias_intensity":
pvalues.append(eqsig.im.calc_arias_intensity(new_sig)[-1])
# new_sig.generate_all_motion_stats()
elif parameter is not None:
assert func is None
pvalues.append(getattr(new_sig, parameter))
elif func is not None:
val = func(new_sig)
if hasattr(val, "__len__"):
pvalues.append(val[-1])
else:
pvalues.append(val)
else:
raise ValueError("parameter or func must be not None")
return degrees, np.array(pvalues)