Skip to content

Commit 42e7fe8

Browse files
committed
windaq file reading
1 parent 378a802 commit 42e7fe8

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed

wfdb/io/windaq.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import struct
2+
3+
import numpy as np
4+
5+
# https://www.dataq.com/resources/pdfs/misc/ff.pdf
6+
7+
"""
8+
- the data file header
9+
- the acquired ADC data
10+
- the data file trailer.
11+
12+
The header contains 35 elements of various byte sizes describing virtually every data acquisition parameter,
13+
as well as various relative references to the ADC data and trailer elements. The headers currently have
14+
space for up to 256 channels, depending on MAX Channels
15+
16+
2 versions: standard and multiplexer
17+
18+
"""
19+
20+
21+
def rdwindaq(file_name, sampfrom, sampto, channels):
22+
"""
23+
Read values from a windaq file
24+
"""
25+
26+
fp = open(file_name, 'rb')
27+
28+
fields = rdheader(f)
29+
30+
signal = rdsignal(f)
31+
32+
signal = dac(signal, fields)
33+
34+
return signal, fields
35+
36+
37+
38+
def rdheader(fp):
39+
"""
40+
Read header info of the windaq file
41+
"""
42+
# element 1, bytes 0-1
43+
n_sig = struct.unpack('>B', fp.read(1))[0]
44+
max_channel_flag = struct.unpack('>B', fp.read(1))[0]
45+
# flag specifies whether max_channels >= 144. Look at 7 lsb.
46+
if max_channel_flag:
47+
n_sig = n_sig & 254
48+
# max channels = 29. Look at 5 lsb (with max=29, not 31)
49+
else:
50+
n_sig = n_sig & 29
51+
52+
# element 2, bytes 2-3
53+
oversamp_factor = struct.unpack('>H', fp.read(2))[0]
54+
# element 3, byte 4
55+
header_channel_offset = struct.unpack('b', fp.read(1))[0]
56+
# element 4, byte 5
57+
bytes_per_channel_info = struct.unpack('b', fp.read(1))[0]
58+
# element 5, bytes 6-7
59+
header_size = struct.unpack('>h', fp.read(2))[0]
60+
# element 6, bytes 8-11. Number of ADC data bytes (if it were unpacked)
61+
# Use element 27 to determine if it were packed
62+
data_size = struct.unpack('>L', fp.read(4))[0]
63+
# element 7, bytes 12-15
64+
trailer_size = struct.unpack('>L', fp.read(4))[0]
65+
# element 8, bytes 16-17
66+
user_ann_size = struct.unpack('>I', fp.read(1))[0]
67+
# Skip to element 13
68+
fp.seek(10)
69+
# element 13, bytes 28-35 - "time between channel samples"
70+
channel_dt = struct.unpack('>d', fp.read(8))[0]
71+
# For sampling rate, assume it is not AT-CODAS legacy version
72+
# Use element 1 and 13 to calculate fs
73+
fs = n_sig / channel_dt
74+
# element 14, bytes 36-39
75+
start_time_offset = struct.unpack('>l', fp.read(4))[0]
76+
# Skip to element 27
77+
fp.seek(60)
78+
# element 27, bytes 100-101
79+
packing_info = struct.unpack('>H', fp.read(2))[0]
80+
# bit 14 of this element contains 'packed' label
81+
# (packed files are WinDaq/Pro+ files with at least one channel
82+
# that has a sample rate divisor other than 1)
83+
packed = bool(packing_info & 8192):
84+
# Skip to element 34
85+
fp.seek(8)
86+
87+
# element 34, bytes 110 to header_size - 3
88+
# element 4 (currently 36) bytes of info per channel
89+
slope = []
90+
intercept = []
91+
units = []
92+
sample_rate_divisor = []
93+
for ch in range(n_sig):
94+
# Skip to item 3
95+
fp.seek(8)
96+
# item 3, bytes 8-15, 8 bytes
97+
slope.append(struct.unpack('>d', fp.read(8))[0])
98+
# item 4, bytes 16-23, 8 bytes
99+
intercept.append(struct.unpack('>d', fp.read(8))[0])
100+
# item 5, bytes 24-29, 6 bytes
101+
unit_chars = struct.iter_unpack('c', fp.read(8))
102+
# only 4 bytes are used
103+
units.append(str(units[:4]).strip())
104+
# skip to item 7
105+
fp.seek(1)
106+
# item 7, byte 31, 1 byte. Used for packed files only.
107+
sample_rate_divisor.append(struct.unpack('b', fp.read(1))[0])
108+
# skip to next channel
109+
fp.seek(4)
110+
111+
# Adjust number of data bytes if file is not packed
112+
if packed:
113+
pass
114+
else:
115+
pass
116+
117+
fields = {'n_sig':nsig, 'sig_len':sig_len, 'fmt':fmt,
118+
'slope': slope, 'intercept':intercept, 'units':units,
119+
'sample_rate_divisor':sample_rate_divisor}
120+
121+
return fields
122+
123+
124+
def rdsignal(f, data_size, n_sig, packed):
125+
"""
126+
Read the signal
127+
"""
128+
sig_len = int(data_size / 2 / n_sig)
129+
130+
signal = np.fromfile(f, dtype='>i2', count=sig_len * n_sig)
131+
132+
# Keep 14 lsb
133+
np.right_shift(signal, 2, out=signal)
134+
# Assume no differential treatment for packed and unpacked files?
135+
signal = signal.reshape((sig_len, n_sig))
136+
137+
return signal
138+
139+
140+
def dac(signal, fields):
141+
"""
142+
Perform dac
143+
"""
144+
145+
return signal

0 commit comments

Comments
 (0)