|
| 1 | +import numpy as np |
| 2 | +import scipy.signal as scisig |
| 3 | + |
| 4 | + |
| 5 | +class PanTompkins(object): |
| 6 | + """ |
| 7 | + Class for implementing the Pan-Tompkins |
| 8 | + qrs detection algorithm. |
| 9 | +
|
| 10 | + Works on static signals. In future update, |
| 11 | + will work on streaming ecg signal. |
| 12 | + """ |
| 13 | + def __init__(self, sig=None, fs=None, streamsig=None): |
| 14 | + self.sig = sig |
| 15 | + self.fs = fs |
| 16 | + |
| 17 | + self.livesig = livesig |
| 18 | + |
| 19 | + if sig is not None: |
| 20 | + self.siglen = len(sig) |
| 21 | + |
| 22 | + def detect_qrs_static(self): |
| 23 | + """ |
| 24 | + Detect all the qrs locations in the static |
| 25 | + signal |
| 26 | + """ |
| 27 | + |
| 28 | + # Resample the signal to 200Hz if necessary |
| 29 | + self.resample() |
| 30 | + |
| 31 | + # Bandpass filter the signal |
| 32 | + self.sig_F = self.bandpass() |
| 33 | + |
| 34 | + # Calculate the moving wave integration signal |
| 35 | + self.sig_I = self.mwi() |
| 36 | + |
| 37 | + # Align the filtered and integrated signal with the original |
| 38 | + self.alignsignals() |
| 39 | + |
| 40 | + |
| 41 | + # Initialize parameters via the two learning phases |
| 42 | + self.learnparams() |
| 43 | + |
| 44 | + # Loop through every index and detect qrs locations |
| 45 | + for i in range(self.siglen): |
| 46 | + pass |
| 47 | + |
| 48 | + # Go through each index and detect qrs complexes. |
| 49 | + for i in range(siglen): |
| 50 | + # Determine whether the current index is a peak |
| 51 | + # for each signal |
| 52 | + is_peak_F = ispeak(sig_F, siglen, i, 20) |
| 53 | + is_peak_I = ispeak(sig_I, siglen, i, 20) |
| 54 | + |
| 55 | + # Whether the current index is a signal peak or noise peak |
| 56 | + is_sigpeak_F = False |
| 57 | + is_noisepeak_F = False |
| 58 | + is_sigpeak_I = False |
| 59 | + is_noisepeak_I = False |
| 60 | + |
| 61 | + # If peaks are detected, classify them as signal or noise |
| 62 | + # for their respective channels |
| 63 | + |
| 64 | + if is_peak_F: |
| 65 | + # Satisfied signal peak criteria for the channel |
| 66 | + # but not necessarily overall |
| 67 | + if sig_F[i] > thresh_F: |
| 68 | + is_sigpeak_F = True |
| 69 | + # Did not satisfy signal peak criteria. |
| 70 | + # Label as noise peak |
| 71 | + else: |
| 72 | + is_noisepeak_F = True |
| 73 | + |
| 74 | + if is_peak_I: |
| 75 | + # The |
| 76 | + if sig_I[i] > thresh_I: |
| 77 | + is_peak_sig_I = True |
| 78 | + else: |
| 79 | + |
| 80 | + # Check for double signal peak coincidence and at least >200ms (40 samples samples) |
| 81 | + # since the previous r peak |
| 82 | + is_sigpeak = is_sigpeak_F and is_sigpeak_I and () |
| 83 | + |
| 84 | + # Found a signal peak! |
| 85 | + if is_sigpeak: |
| 86 | + |
| 87 | + # BUT WAIT, THERE'S MORE! It could be a T-Wave ... |
| 88 | + |
| 89 | + # When an rr interval < 360ms (72 samples), it is checked |
| 90 | + # to determine whether it is a T-Wave |
| 91 | + if i - prev_r_ind < 360: |
| 92 | + |
| 93 | + |
| 94 | + # Update running parameters |
| 95 | + |
| 96 | + sigpeak_I = 0.875*sigpeak_I + 0.125*sig_I[i] |
| 97 | + sigpeak_F = 0.875*sigpeak_I + 0.125*sig_I[i] |
| 98 | + |
| 99 | + last_r_ind = i |
| 100 | + rr_limitavg |
| 101 | + rr_limitavg |
| 102 | + |
| 103 | + # Not a signal peak. Update running parameters |
| 104 | + # if any other peaks are detected |
| 105 | + elif is_peak_F: |
| 106 | + |
| 107 | + |
| 108 | + last_r_distance = i - last_r_ind |
| 109 | + |
| 110 | + if last_r_distance > |
| 111 | + |
| 112 | + |
| 113 | + qrs = np.zeros(10) |
| 114 | + |
| 115 | + # Convert the peak indices back to the original fs if necessary |
| 116 | + if fs!=200: |
| 117 | + qrs = qrs*fs/200 |
| 118 | + qrs = qrs.astype('int64') |
| 119 | + |
| 120 | + |
| 121 | + return qrs |
| 122 | + |
| 123 | + |
| 124 | + |
| 125 | + |
| 126 | + def resample(self): |
| 127 | + if self.fs != 200: |
| 128 | + self.sig = scisig.resample(self.sig, int(self.siglen*200/fs)) |
| 129 | + |
| 130 | + # Bandpass filter the signal from 5-15Hz |
| 131 | + def bandpass(self, plotsteps): |
| 132 | + # 15Hz Low Pass Filter |
| 133 | + a_low = [1, -2, 1] |
| 134 | + b_low = np.concatenate(([1], np.zeros(4), [-2], np.zeros(5), [1])) |
| 135 | + sig_low = scisig.lfilter(b_low, a_low, self.sig) |
| 136 | + |
| 137 | + # 5Hz High Pass Filter - passband gain = 32, delay = 16 samples |
| 138 | + a_high = [1,-1] |
| 139 | + b_high = np.concatenate(([-1/32], np.zeros(15), [1, -1], np.zeros(14), [1/32])) |
| 140 | + self.sig_F = scisig.lfilter(b_high, a_high, sig_low) |
| 141 | + |
| 142 | + if plotsteps: |
| 143 | + plt.plot(sig_low) |
| 144 | + plt.plot(self.sig_F) |
| 145 | + plt.legend(['After LP', 'After LP+HP']) |
| 146 | + plt.show() |
| 147 | + |
| 148 | + # Compute the moving wave integration waveform from the filtered signal |
| 149 | + def mwi(sig, plotsteps): |
| 150 | + # Compute 5 point derivative |
| 151 | + a_deriv = [1] |
| 152 | + b_deriv = [1/4, 1/8, 0, -1/8, -1/4] |
| 153 | + sig_F_deriv = scisig.lfilter(b_deriv, a_deriv, self.sig_F) |
| 154 | + |
| 155 | + # Square the derivative |
| 156 | + sig_F_deriv = np.square(sig_F_deriv) |
| 157 | + |
| 158 | + # Perform moving window integration - 150ms (ie. 30 samples wide for 200Hz) |
| 159 | + a_mwi = [1] |
| 160 | + b_mwi = 30*[1/30] |
| 161 | + |
| 162 | + self.sig_I = scisig.lfilter(b_mwi, a_mwi, sig_F_deriv) |
| 163 | + |
| 164 | + if plotsteps: |
| 165 | + plt.plot(sig_deriv) |
| 166 | + plt.plot(self.sig_I) |
| 167 | + plt.legend(['deriv', 'mwi']) |
| 168 | + plt.show() |
| 169 | + |
| 170 | + # Align the filtered and integrated signal with the original |
| 171 | + def alignsignals(self): |
| 172 | + self.sig_F = self.sig_F |
| 173 | + |
| 174 | + self.sig_I = self.sig_I |
| 175 | + |
| 176 | + |
| 177 | + def learnparams(self): |
| 178 | + """ |
| 179 | + Initialize parameters using the start of the waveforms |
| 180 | + during the two learning phases described. |
| 181 | + |
| 182 | + "Learning phase 1 requires about 2s to initialize |
| 183 | + detection thresholds based upon signal and noise peaks |
| 184 | + detected during the learning process. |
| 185 | + |
| 186 | + Learning phase two requires two heartbeats to initialize |
| 187 | + RR-interval average and RR-interval limit values. |
| 188 | + |
| 189 | + The subsequent detection phase does the recognition process |
| 190 | + and produces a pulse for each QRS complex" |
| 191 | + |
| 192 | + This code is not detailed in the Pan-Tompkins |
| 193 | + paper. The PT algorithm requires a threshold to |
| 194 | + categorize peaks as signal or noise, but the |
| 195 | + threshold is calculated from noise and signal |
| 196 | + peaks. There is a circular dependency when |
| 197 | + none of the fields are initialized. Therefore this learning phase will detect signal peaks using a |
| 198 | + different method, and estimate the threshold using those peaks. |
| 199 | + |
| 200 | + This function works as follows: |
| 201 | + - Try to find at least 2 signal peaks (qrs complexes) in the |
| 202 | + first N seconds of both signals using simple low order |
| 203 | + moments. Signal peaks are only defined when the same index is |
| 204 | + determined to be a peak in both signals. Start with N==2. |
| 205 | + If fewer than 2 signal peaks are detected, increment N and |
| 206 | + try again. |
| 207 | + - Using the classified estimated peaks, threshold1 is estimated as |
| 208 | + based on the steady state estimate equation: thres = 0.75*noisepeak + 0.25*sigpeak |
| 209 | + using the mean of the noisepeaks and signalpeaks instead of the |
| 210 | + running value. |
| 211 | + - Using the estimated peak locations, the rr parameters are set. |
| 212 | + |
| 213 | + """ |
| 214 | + |
| 215 | + # The sample radius when looking for local maxima |
| 216 | + radius = 20 |
| 217 | + # The signal start duration to use for learning |
| 218 | + learntime = 2 |
| 219 | + |
| 220 | + while : |
| 221 | + wavelearn_F = filtsig[:200*learntime] |
| 222 | + wavelearn_I = mwi[:200*learntime] |
| 223 | + |
| 224 | + # Find peaks in the signals |
| 225 | + peakinds_F = findpeaks_radius(wavelearn_F, radius) |
| 226 | + peakinds_I = findpeaks_radius(wavelearn_I, radius) |
| 227 | + peaks_F = wavelearn_F[peakinds_F] |
| 228 | + peaks_I = wavelearn_I[peakinds_I] |
| 229 | + |
| 230 | + # Classify signal and noise peaks. |
| 231 | + # This is the main tricky part |
| 232 | + |
| 233 | + # Align peaks to minimum value and set to unit variance |
| 234 | + peaks_F = (peaks_F - min(peaks_F)) / np.std(peaks_F) |
| 235 | + peaks_I = (peaks_I - min(peaks_I)) / np.std(peaks_I) |
| 236 | + sigpeakinds_F = np.where(peaks_F) >= 1.4 |
| 237 | + sigpeakinds_I = np.where(peaks_I) >= 1.4 |
| 238 | + |
| 239 | + # Final signal peak when both signals agree |
| 240 | + sigpeakinds = np.intersect1d(sigpeaks_F, sigpeaks_I) |
| 241 | + # Noise peaks are the remainders |
| 242 | + noisepeakinds_F = np.setdiff1d(peakinds_F, sigpeakinds) |
| 243 | + noisepeakinds_I = np.setdiff1d(peakinds_I, sigpeakinds) |
| 244 | + |
| 245 | + if len(sigpeakinds)>1: |
| 246 | + break |
| 247 | + |
| 248 | + # Need to detect at least 2 peaks |
| 249 | + learntime = learntime + 1 |
| 250 | + |
| 251 | + # Found at least 2 peaks. Use them to set parameters. |
| 252 | + |
| 253 | + # Set running peak estimates to first values |
| 254 | + sigpeak_F = wavelearn_F[sigpeakinds[0]] |
| 255 | + sigpeak_I = wavelearn_I[sigpeakinds[0]] |
| 256 | + noisepeak_F = wavelearn_F[noisepeakinds_F[0]] |
| 257 | + noisepeak_I = wavelearn_I[noisepeakinds_I[0]] |
| 258 | + |
| 259 | + # Use all signal and noise peaks in learning window to estimate threshold |
| 260 | + # Based on steady state equation: thres = 0.75*noisepeak + 0.25*sigpeak |
| 261 | + thres_F = 0.75*np.mean(wavelearn_F[noisepeakinds_F]) + 0.25*np.mean(wavelearn_F[sigpeakinds_F]) |
| 262 | + thres_I = 0.75*np.mean(wavelearn_I[noisepeakinds_I]) + 0.25*np.mean(wavelearn_I[sigpeakinds_I]) |
| 263 | + # Alternatively, could skip all of that and do something very simple like thresh_F = max(filtsig[:400])/3 |
| 264 | + |
| 265 | + # Set the r-r history using the first r-r interval |
| 266 | + # The most recent 8 rr intervals |
| 267 | + rr_history_unbound = [wavelearn_F[sigpeakinds[1]]-wavelearn_F[sigpeakinds[0]]]*8 |
| 268 | + # The most recent 8 rr intervals that fall within the acceptable low and high rr interval limits |
| 269 | + rr_history_bound = [wavelearn_I[sigpeakinds[1]]-wavelearn_I[sigpeakinds[0]]]*8 |
| 270 | + |
| 271 | + rr_average_unbound = np.mean(rr_history_unbound) |
| 272 | + rr_average_bound = np.mean(rr_history_bound) |
| 273 | + |
| 274 | + # Wait... what is rr_average_unbound for then? |
| 275 | + rr_low_limit = 0.92*rr_average_bound |
| 276 | + rr_high_limit = 1.16*rr_average_bound |
| 277 | + rr_missed_limit = 1.66*rr_average_bound |
| 278 | + |
| 279 | + return thresh_F, thresh_I, sigpeak_F, sigpeak_I, |
| 280 | + noisepeak_F, noisepeak_I, rr_freeavg_F, |
| 281 | + r_freeavg_I, rr_limitavg_F, rr_limitavg_I |
| 282 | + |
| 283 | + |
| 284 | + |
| 285 | + |
| 286 | + |
| 287 | + |
| 288 | + |
| 289 | +# Determine whether the signal contains a peak at index ind. |
| 290 | +# Check if it is the max value amoung samples ind-radius to ind+radius |
| 291 | +def ispeak_radius(sig, siglen, ind, radius): |
| 292 | + if sig[ind] == max(sig[max(0,i-radius):min(siglen, i+radius)]): |
| 293 | + return True |
| 294 | + else: |
| 295 | + return False |
| 296 | + |
| 297 | +# Find all peaks in a signal. Simple algorithm which marks a |
| 298 | +# peak if the <radius> samples on its left and right are |
| 299 | +# all not bigger than it. |
| 300 | +def findpeaks_radius(sig, radius): |
| 301 | + |
| 302 | + siglen = len(sig) |
| 303 | + peaklocs = [] |
| 304 | + |
| 305 | + # Pad samples at start and end |
| 306 | + sig = np.concatenate((np.ones(radius)*sig[0],sig, np.ones(radius)*sig[-1])) |
| 307 | + |
| 308 | + i=radius |
| 309 | + while i<siglen+radius: |
| 310 | + if sig[i] == max(sig[i-radius:i+radius]): |
| 311 | + peaklocs.append(i) |
| 312 | + i=i+radius |
| 313 | + else: |
| 314 | + i=i+1 |
| 315 | + |
| 316 | + peaklocs = np.array(peaklocs)-radius |
| 317 | + return peaklocs |
| 318 | + |
0 commit comments