From 089eafdb926661c95d18d7e2132d2feb8feaed21 Mon Sep 17 00:00:00 2001 From: Lucas McCullum Date: Fri, 18 Dec 2020 16:30:48 -0500 Subject: [PATCH] Adds optional EDF header reader --- wfdb/io/record.py | 190 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 189 insertions(+), 1 deletion(-) diff --git a/wfdb/io/record.py b/wfdb/io/record.py index 5b6f1237..3e8cda4a 100644 --- a/wfdb/io/record.py +++ b/wfdb/io/record.py @@ -1349,7 +1349,8 @@ def check_np_array(item, field_name, ndim, parent_class, channel_num=None): raise TypeError(error_msg) -def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False): +def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False, + header_only=False, verbose=False): """ Convert EDF formatted files to MIT format. @@ -1384,6 +1385,13 @@ def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False): record_only : bool, optional Whether to only return the record information (True) or not (False). If false, this function will generate both a .dat and .hea file. + header_only : bool, optional + Whether to only return the header information (True) or not (False). + If true, this function will only return `['fs', 'sig_len', 'n_sig', + 'base_date', 'base_time', 'units', 'sig_name', 'comments']`. + verbose : bool, optional + Whether to print all the information read about the file (True) or + not (False). Returns ------- @@ -1412,6 +1420,186 @@ def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=False): r = requests.get(file_url, allow_redirects=False) open(record_name, 'wb').write(r.content) + # Temporary to return only the EDF header.. 
will later replace the + # current MNE package approach + if header_only: + # Open the desired file + edf_file = open(record_name, mode='rb') + + # Remove the file if the `delete_file` flag is set + if pn_dir is not None and delete_file: + os.remove(record_name) + + # Version of this data format (8 bytes) + version = struct.unpack('<8s', edf_file.read(8))[0].decode() + + # Check to see that the input is an EDF file. (This check will detect + # most but not all other types of files.) + if version != '0 ': + raise Exception('Input does not appear to be EDF -- no conversion attempted') + else: + if verbose: + print('EDF version number: {}'.format(version.strip())) + + # Local patient identification (80 bytes) + patient_id = struct.unpack('<80s', edf_file.read(80))[0].decode() + if verbose: + print('Patient ID: {}'.format(patient_id)) + + # Local recording identification (80 bytes) + # Bob Kemp recommends using this field to encode the start date + # including an abbreviated month name in English and a full (4-digit) + # year, as is done here if this information is available in the input + # record. EDF+ requires this. 
+ record_id = struct.unpack('<80s', edf_file.read(80))[0].decode() + if verbose: + print('Recording ID: {}'.format(record_id)) + + # Start date of recording (dd.mm.yy) (8 bytes) + start_date = struct.unpack('<8s', edf_file.read(8))[0].decode() + if verbose: + print('Recording Date: {}'.format(start_date)) + start_day, start_month, start_year = [int(i) for i in start_date.split('.')] + # This should work for a while + if start_year < 1970: + start_year += 1900 + if start_year < 1970: + start_year += 100 + + # Start time of recording (hh.mm.ss) (8 bytes) + start_time = struct.unpack('<8s', edf_file.read(8))[0].decode() + if verbose: + print('Recording Time: {}'.format(start_time)) + start_hour, start_minute, start_second = [int(i) for i in start_time.split('.')] + + # Number of bytes in header (8 bytes) + header_bytes = int(struct.unpack('<8s', edf_file.read(8))[0].decode()) + if verbose: + print('Number of bytes in header record: {}'.format(header_bytes)) + + # Reserved (44 bytes) + reserved_notes = struct.unpack('<44s', edf_file.read(44))[0].decode().strip() + if reserved_notes != '': + if verbose: + print('Free Space: {}'.format(reserved_notes)) + + # Number of blocks (-1 if unknown) (8 bytes) + num_blocks = int(struct.unpack('<8s', edf_file.read(8))[0].decode()) + if verbose: + print('Number of data records: {}'.format(num_blocks)) + + # Duration of a block, in seconds (8 bytes) + block_duration = float(struct.unpack('<8s', edf_file.read(8))[0].decode()) + if verbose: + print('Duration of each data record in seconds: {}'.format(block_duration)) + if block_duration <= 0.0: + block_duration = 1.0 + + # Number of signals (4 bytes) + n_sig = int(struct.unpack('<4s', edf_file.read(4))[0].decode()) + if verbose: + print('Number of signals: {}'.format(n_sig)) + if n_sig < 1: + raise Exception('Done: not any signals left to read') + + # Label (e.g., EEG FpzCz or Body temp) (16 bytes each) + sig_labels = [] + for _ in range(n_sig): + 
sig_labels.append(struct.unpack('<16s', edf_file.read(16))[0].decode().strip()) + if verbose: + print('Signal Labels: {}'.format(sig_labels)) + + # Transducer type (e.g., AgAgCl electrode) (80 bytes each) + transducer_types = [] + for _ in range(n_sig): + transducer_types.append(struct.unpack('<80s', edf_file.read(80))[0].decode().strip()) + if verbose: + print('Transducer Types: {}'.format(transducer_types)) + + # Physical dimension (e.g., uV or degreeC) (8 bytes each) + physical_dims = [] + for _ in range(n_sig): + physical_dims.append(struct.unpack('<8s', edf_file.read(8))[0].decode().strip()) + if verbose: + print('Physical Dimensions: {}'.format(physical_dims)) + + # Physical minimum (e.g., -500 or 34) (8 bytes each) + physical_min = np.array([]) + for _ in range(n_sig): + physical_min = np.append(physical_min, float(struct.unpack('<8s', edf_file.read(8))[0].decode())) + if verbose: + print('Physical Minimums: {}'.format(physical_min)) + + # Physical maximum (e.g., 500 or 40) (8 bytes each) + physical_max = np.array([]) + for _ in range(n_sig): + physical_max = np.append(physical_max, float(struct.unpack('<8s', edf_file.read(8))[0].decode())) + if verbose: + print('Physical Maximums: {}'.format(physical_max)) + + # Digital minimum (e.g., -2048) (8 bytes each) + digital_min = np.array([]) + for _ in range(n_sig): + digital_min = np.append(digital_min, float(struct.unpack('<8s', edf_file.read(8))[0].decode())) + if verbose: + print('Digital Minimums: {}'.format(digital_min)) + + # Digital maximum (e.g., 2047) (8 bytes each) + digital_max = np.array([]) + for _ in range(n_sig): + digital_max = np.append(digital_max, float(struct.unpack('<8s', edf_file.read(8))[0].decode())) + if verbose: + print('Digital Maximums: {}'.format(digital_max)) + + # Prefiltering (e.g., HP:0.1Hz LP:75Hz) (80 bytes each) + prefilter_info = [] + for _ in range(n_sig): + prefilter_info.append(struct.unpack('<80s', edf_file.read(80))[0].decode().strip()) + if verbose: + print('Prefiltering 
Information: {}'.format(prefilter_info)) + + # Number of samples per block (8 bytes each) + samps_per_block = [] + for _ in range(n_sig): + samps_per_block.append(int(struct.unpack('<8s', edf_file.read(8))[0].decode())) + if verbose: + print('Number of Samples per Record: {}'.format(samps_per_block)) + + # The last 32*nsig bytes in the header are unused + for _ in range(n_sig): + struct.unpack('<32s', edf_file.read(32))[0].decode() + + # Pre-process the acquired data before creating the record + sample_rate = [int(i/block_duration) for i in samps_per_block] + fs = functools.reduce(math.gcd, sample_rate) + sig_len = int(num_blocks * block_duration * fs) + base_time = datetime.time(start_hour, start_minute, start_second) + base_date = datetime.date(start_year, start_month, start_day) + comments = [] + + units = n_sig * [''] + for i,f in enumerate(physical_dims): + if f == 'n/a': + label = sig_labels[i].lower().split()[0] + if label in list(SIG_UNITS.keys()): + units[i] = SIG_UNITS[label] + else: + units[i] = 'n/a' + else: + f = f.replace('µ','u') # Maybe more weird symbols to check for? + units[i] = f + + return { + 'fs': fs, + 'sig_len': sig_len, + 'n_sig': n_sig, + 'base_date': base_date, + 'base_time': base_time, + 'units': physical_dims, + 'sig_name': sig_labels, + 'comments': comments + } + edf_data = mne.io.read_raw_edf(record_name, preload=True) if pn_dir is not None and delete_file: