|
| 1 | +import numpy |
| 2 | +from scipy import signal |
| 3 | + |
| 4 | +from wfdb import Annotation |
| 5 | + |
| 6 | + |
| 7 | +def resample_ann(tt, annsamp): |
| 8 | + # tt: numpy.array as returned by signal.resample |
| 9 | + # annsamp: numpy.array containing indexes of annotations (Annotation.annsamp) |
| 10 | + |
| 11 | + # Compute the new annotation indexes |
| 12 | + |
| 13 | + result = numpy.zeros(len(tt), dtype='bool') |
| 14 | + j = 0 |
| 15 | + tprec = tt[j] |
| 16 | + for i, v in enumerate(annsamp): |
| 17 | + while True: |
| 18 | + d = False |
| 19 | + if j+1 == len(tt): |
| 20 | + result[j] = 1 |
| 21 | + break |
| 22 | + tnow = tt[j+1] |
| 23 | + if tprec <= v and v <= tnow: |
| 24 | + if v-tprec < tnow-v: |
| 25 | + result[j] = 1 |
| 26 | + else: |
| 27 | + result[j+1] = 1 |
| 28 | + d = True |
| 29 | + j += 1 |
| 30 | + tprec = tnow |
| 31 | + if d: |
| 32 | + break |
| 33 | + return numpy.where(result==1)[0].astype('int64') |
| 34 | + |
| 35 | + |
| 36 | +def resample_sig(x, fs, fs_target): |
| 37 | + # x: a numpy.array containing the signal |
| 38 | + # fs: the current frequency |
| 39 | + # fs_target: the target frequency |
| 40 | + |
| 41 | + # Resample a signal |
| 42 | + |
| 43 | + t = numpy.arange(x.shape[0]).astype('float64') |
| 44 | + |
| 45 | + if fs == fs_target: |
| 46 | + return x, t |
| 47 | + |
| 48 | + new_length = int(x.shape[0]*fs_target/fs) |
| 49 | + xx, tt = signal.resample(x, num=new_length, t=t) |
| 50 | + assert xx.shape == tt.shape and xx.shape[0] == new_length |
| 51 | + assert numpy.all(numpy.diff(tt) > 0) |
| 52 | + return xx, tt |
| 53 | + |
| 54 | + |
| 55 | +def resample_singlechan(x, ann, fs, fs_target): |
| 56 | + # x: a numpy.array containing the signal |
| 57 | + # ann: an Annotation object |
| 58 | + # fs: the current frequency |
| 59 | + # fs_target: the target frequency |
| 60 | + |
| 61 | + # Resample a single-channel signal with its annotations |
| 62 | + |
| 63 | + xx, tt = resample_sig(x, fs, fs_target) |
| 64 | + |
| 65 | + new_annsamp = resample_ann(tt, ann.annsamp) |
| 66 | + assert ann.annsamp.shape == new_annsamp.shape |
| 67 | + |
| 68 | + new_ann = Annotation(ann.recordname, ann.annotator, new_annsamp, ann.anntype, ann.num, ann.subtype, ann.chan, ann.aux, ann.fs) |
| 69 | + return xx, new_ann |
| 70 | + |
| 71 | + |
| 72 | +def resample_multichan(xs, ann, fs, fs_target, resamp_ann_chan=0): |
| 73 | + # xs: a numpy.ndarray containing the signals as returned by wfdb.srdsamp |
| 74 | + # ann: an Annotation object |
| 75 | + # fs: the current frequency |
| 76 | + # fs_target: the target frequency |
| 77 | + # resample_ann_channel: the signal channel that is used to compute new annotation indexes |
| 78 | + |
| 79 | + # Resample multiple channels with their annotations |
| 80 | + |
| 81 | + assert resamp_ann_chan < xs.shape[1] |
| 82 | + |
| 83 | + lx = [] |
| 84 | + lt = None |
| 85 | + for chan in range(xs.shape[1]): |
| 86 | + xx, tt = resample_sig(xs[:, chan], fs, fs_target) |
| 87 | + lx.append(xx) |
| 88 | + if chan == resamp_ann_chan: |
| 89 | + lt = tt |
| 90 | + |
| 91 | + new_annsamp = resample_ann(lt, ann.annsamp) |
| 92 | + assert ann.annsamp.shape == new_annsamp.shape |
| 93 | + |
| 94 | + new_ann = Annotation(ann.recordname, ann.annotator, new_annsamp, ann.anntype, ann.num, ann.subtype, ann.chan, ann.aux, ann.fs) |
| 95 | + return numpy.column_stack(lx), new_ann |
| 96 | + |
| 97 | + |
| 98 | +def normalize(x, lb=0, ub=1): |
| 99 | + # lb: Lower bound |
| 100 | + # ub: Upper bound |
| 101 | + |
| 102 | + # Resizes a signal between the lower and upper bound |
| 103 | + |
| 104 | + mid = ub - (ub - lb) / 2 |
| 105 | + min_v = numpy.min(x) |
| 106 | + max_v = numpy.max(x) |
| 107 | + mid_v = max_v - (max_v - min_v) / 2 |
| 108 | + coef = (ub - lb) / (max_v - min_v) |
| 109 | + return x * coef - (mid_v * coef) + mid |
0 commit comments