Skip to content

Adds optional EDF header reader #270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 18, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 189 additions & 1 deletion wfdb/io/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,8 @@ def check_np_array(item, field_name, ndim, parent_class, channel_num=None):
raise TypeError(error_msg)


def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False):
def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False,
header_only=False, verbose=False):
"""
Convert EDF formatted files to MIT format.

Expand Down Expand Up @@ -1384,6 +1385,13 @@ def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False):
record_only : bool, optional
Whether to only return the record information (True) or not (False).
If false, this function will generate both a .dat and .hea file.
header_only : bool, optional
Whether to only return the header information (True) or not (False).
If true, this function will only return `['fs', 'sig_len', 'n_sig',
'base_date', 'base_time', 'units', 'sig_name', 'comments']`.
verbose : bool, optional
Whether to print all the information read about the file (True) or
not (False).

Returns
-------
Expand Down Expand Up @@ -1412,6 +1420,186 @@ def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False):
r = requests.get(file_url, allow_redirects=False)
open(record_name, 'wb').write(r.content)

# Temporary to return only the EDF header.. will later replace the
# current MNE package approach
if header_only:
# Open the desired file
edf_file = open(record_name, mode='rb')

# Remove the file if the `delete_file` flag is set
if pn_dir is not None and delete_file:
os.remove(record_name)

# Version of this data format (8 bytes)
version = struct.unpack('<8s', edf_file.read(8))[0].decode()

# Check to see that the input is an EDF file. (This check will detect
# most but not all other types of files.)
if version != '0 ':
raise Exception('Input does not appear to be EDF -- no conversion attempted')
else:
if verbose:
print('EDF version number: {}'.format(version.strip()))

# Local patient identification (80 bytes)
patient_id = struct.unpack('<80s', edf_file.read(80))[0].decode()
if verbose:
print('Patient ID: {}'.format(patient_id))

# Local recording identification (80 bytes)
# Bob Kemp recommends using this field to encode the start date
# including an abbreviated month name in English and a full (4-digit)
# year, as is done here if this information is available in the input
# record. EDF+ requires this.
record_id = struct.unpack('<80s', edf_file.read(80))[0].decode()
if verbose:
print('Recording ID: {}'.format(record_id))

# Start date of recording (dd.mm.yy) (8 bytes)
start_date = struct.unpack('<8s', edf_file.read(8))[0].decode()
if verbose:
print('Recording Date: {}'.format(start_date))
start_day, start_month, start_year = [int(i) for i in start_date.split('.')]
# This should work for a while
if start_year < 1970:
start_year += 1900
if start_year < 1970:
start_year += 100

# Start time of recording (hh.mm.ss) (8 bytes)
start_time = struct.unpack('<8s', edf_file.read(8))[0].decode()
if verbose:
print('Recording Time: {}'.format(start_time))
start_hour, start_minute, start_second = [int(i) for i in start_time.split('.')]

# Number of bytes in header (8 bytes)
header_bytes = int(struct.unpack('<8s', edf_file.read(8))[0].decode())
if verbose:
print('Number of bytes in header record: {}'.format(header_bytes))

# Reserved (44 bytes)
reserved_notes = struct.unpack('<44s', edf_file.read(44))[0].decode().strip()
if reserved_notes != '':
if verbose:
print('Free Space: {}'.format(reserved_notes))

# Number of blocks (-1 if unknown) (8 bytes)
num_blocks = int(struct.unpack('<8s', edf_file.read(8))[0].decode())
if verbose:
print('Number of data records: {}'.format(num_blocks))

# Duration of a block, in seconds (8 bytes)
block_duration = float(struct.unpack('<8s', edf_file.read(8))[0].decode())
if verbose:
print('Duration of each data record in seconds: {}'.format(block_duration))
if block_duration <= 0.0:
block_duration = 1.0

# Number of signals (4 bytes)
n_sig = int(struct.unpack('<4s', edf_file.read(4))[0].decode())
if verbose:
print('Number of signals: {}'.format(n_sig))
if n_sig < 1:
raise Exception('Done: not any signals left to read')

# Label (e.g., EEG FpzCz or Body temp) (16 bytes each)
sig_labels = []
for _ in range(n_sig):
sig_labels.append(struct.unpack('<16s', edf_file.read(16))[0].decode().strip())
if verbose:
print('Signal Labels: {}'.format(sig_labels))

# Transducer type (e.g., AgAgCl electrode) (80 bytes each)
transducer_types = []
for _ in range(n_sig):
transducer_types.append(struct.unpack('<80s', edf_file.read(80))[0].decode().strip())
if verbose:
print('Transducer Types: {}'.format(transducer_types))

# Physical dimension (e.g., uV or degreeC) (8 bytes each)
physical_dims = []
for _ in range(n_sig):
physical_dims.append(struct.unpack('<8s', edf_file.read(8))[0].decode().strip())
if verbose:
print('Physical Dimensions: {}'.format(physical_dims))

# Physical minimum (e.g., -500 or 34) (8 bytes each)
physical_min = np.array([])
for _ in range(n_sig):
physical_min = np.append(physical_min, float(struct.unpack('<8s', edf_file.read(8))[0].decode()))
if verbose:
print('Physical Minimums: {}'.format(physical_min))

# Physical maximum (e.g., 500 or 40) (8 bytes each)
physical_max = np.array([])
for _ in range(n_sig):
physical_max = np.append(physical_max, float(struct.unpack('<8s', edf_file.read(8))[0].decode()))
if verbose:
print('Physical Maximums: {}'.format(physical_max))

# Digital minimum (e.g., -2048) (8 bytes each)
digital_min = np.array([])
for _ in range(n_sig):
digital_min = np.append(digital_min, float(struct.unpack('<8s', edf_file.read(8))[0].decode()))
if verbose:
print('Digital Minimums: {}'.format(digital_min))

# Digital maximum (e.g., 2047) (8 bytes each)
digital_max = np.array([])
for _ in range(n_sig):
digital_max = np.append(digital_max, float(struct.unpack('<8s', edf_file.read(8))[0].decode()))
if verbose:
print('Digital Maximums: {}'.format(digital_max))

# Prefiltering (e.g., HP:0.1Hz LP:75Hz) (80 bytes each)
prefilter_info = []
for _ in range(n_sig):
prefilter_info.append(struct.unpack('<80s', edf_file.read(80))[0].decode().strip())
if verbose:
print('Prefiltering Information: {}'.format(prefilter_info))

# Number of samples per block (8 bytes each)
samps_per_block = []
for _ in range(n_sig):
samps_per_block.append(int(struct.unpack('<8s', edf_file.read(8))[0].decode()))
if verbose:
print('Number of Samples per Record: {}'.format(samps_per_block))

# The last 32*nsig bytes in the header are unused
for _ in range(n_sig):
struct.unpack('<32s', edf_file.read(32))[0].decode()

# Pre-process the acquired data before creating the record
sample_rate = [int(i/block_duration) for i in samps_per_block]
fs = functools.reduce(math.gcd, sample_rate)
sig_len = int(num_blocks * block_duration * fs)
base_time = datetime.time(start_hour, start_minute, start_second)
base_date = datetime.date(start_year, start_month, start_day)
comments = []

units = n_sig * ['']
for i,f in enumerate(physical_dims):
if f == 'n/a':
label = sig_labels[i].lower().split()[0]
if label in list(SIG_UNITS.keys()):
units[i] = SIG_UNITS[label]
else:
units[i] = 'n/a'
else:
f = f.replace('µ','u') # Maybe more weird symbols to check for?
units[i] = f

return {
'fs': fs,
'sig_len': sig_len,
'n_sig': n_sig,
'base_date': base_date,
'base_time': base_time,
'units': physical_dims,
'sig_name': sig_labels,
'comments': comments
}

edf_data = mne.io.read_raw_edf(record_name, preload=True)

if pn_dir is not None and delete_file:
Expand Down