diff --git a/wfdb/io/_header.py b/wfdb/io/_header.py index 142a69f7..7322ee87 100644 --- a/wfdb/io/_header.py +++ b/wfdb/io/_header.py @@ -1,5 +1,4 @@ import datetime -import re from typing import Collection, List, Tuple import numpy as np @@ -7,7 +6,7 @@ from wfdb.io import _signal from wfdb.io import util - +from wfdb.io.header import HeaderSyntaxError, rx_record, rx_segment, rx_signal """ Notes @@ -32,12 +31,6 @@ so that the user doesn't need to. But when reading, it should be clear that the fields are missing. -If all of the fields were filled out in a WFDB header file, they would appear -in this order with these seperators: - -RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE -FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO CHECKSUM BLOCK_SIZE DESCRIPTION - """ int_types = (int, np.int64, np.int32, np.int16, np.int8) float_types = (float, np.float64, np.float32) + int_types @@ -135,53 +128,6 @@ # Specifications of all WFDB header fields, except for comments FIELD_SPECS = pd.concat((RECORD_SPECS, SIGNAL_SPECS, SEGMENT_SPECS)) -# Regexp objects for reading headers -# Record line -_rx_record = re.compile( - r""" - [ \t]* (?P[-\w]+) - /?(?P\d*) - [ \t]+ (?P\d+) - [ \t]* (?P\d*\.?\d*) - /*(?P-?\d*\.?\d*) - \(?(?P-?\d*\.?\d*)\)? - [ \t]* (?P\d*) - [ \t]* (?P\d{,2}:?\d{,2}:?\d{,2}\.?\d{,6}) - [ \t]* (?P\d{,2}/?\d{,2}/?\d{,4}) - """, - re.VERBOSE, -) - -# Signal line -_rx_signal = re.compile( - r""" - [ \t]* (?P~?[-\w]*\.?[\w]*) - [ \t]+ (?P\d+) - x?(?P\d*) - :?(?P\d*) - \+?(?P\d*) - [ \t]* (?P-?\d*\.?\d*e?[\+-]?\d*) - \(?(?P-?\d*)\)? - /?(?P[\w\^\-\?%\/]*) - [ \t]* (?P\d*) - [ \t]* (?P-?\d*) - [ \t]* (?P-?\d*) - [ \t]* (?P-?\d*) - [ \t]* (?P\d*) - [ \t]* (?P[\S]?[^\t\n\r\f\v]*) - """, - re.VERBOSE, -) - -# Segment line -_rx_segment = re.compile( - r""" - [ \t]* (?P[-\w]*~?) - [ \t]+ (?P\d+) - """, - re.VERBOSE, -) - class BaseHeaderMixin(object): """ @@ -1013,37 +959,6 @@ def wfdb_strptime(time_string: str) -> datetime.time: return datetime.datetime.strptime(time_string, time_fmt).time() -def parse_header_content( - header_content: str, -) -> Tuple[List[str], List[str]]: - """ - Parse the text of a header file. - - Parameters - ---------- - header_content: str - The string content of the full header file - - Returns - ------- - header_lines : List[str] - A list of all the non-comment lines - comment_lines : List[str] - A list of all the comment lines - """ - header_lines, comment_lines = [], [] - for line in header_content.splitlines(): - line = line.strip() - # Comment line - if line.startswith("#"): - comment_lines.append(line) - # Non-empty non-comment line = header line. - elif line: - header_lines.append(line) - - return header_lines, comment_lines - - def _parse_record_line(record_line: str) -> dict: """ Extract fields from a record line string into a dictionary. @@ -1063,7 +978,7 @@ def _parse_record_line(record_line: str) -> dict: record_fields = {} # Read string fields from record line - match = _rx_record.match(record_line) + match = rx_record.match(record_line) if match is None: raise HeaderSyntaxError("invalid syntax in record line") ( @@ -1139,7 +1054,7 @@ def _parse_signal_lines(signal_lines): # Read string fields from signal line for ch in range(n_sig): - match = _rx_signal.match(signal_lines[ch]) + match = rx_signal.match(signal_lines[ch]) if match is None: raise HeaderSyntaxError("invalid syntax in signal line") ( @@ -1213,7 +1128,7 @@ def _read_segment_lines(segment_lines): # Read string fields from signal line for i in range(len(segment_lines)): - match = _rx_segment.match(segment_lines[i]) + match = rx_segment.match(segment_lines[i]) if match is None: raise HeaderSyntaxError("invalid syntax in segment line") ( @@ -1226,7 +1141,3 @@ def _read_segment_lines(segment_lines): segment_fields["seg_len"][i] = int(segment_fields["seg_len"][i]) return segment_fields - - -class HeaderSyntaxError(ValueError): - """Invalid syntax found in a WFDB header file.""" diff --git a/wfdb/io/convert/csv.py b/wfdb/io/convert/csv.py index cd8134d7..3cfd25a2 100644 --- a/wfdb/io/convert/csv.py +++ b/wfdb/io/convert/csv.py @@ -1,10 +1,8 @@ -import datetime import os import numpy as np import pandas as pd -from wfdb.io import _header from wfdb.io.annotation import format_ann_from_df, Annotation, wrann from wfdb.io.record import Record, wrsamp @@ -121,16 +119,13 @@ def csv_to_wfdb( The base counter value is a floating-point number that specifies the counter value corresponding to sample 0. If absent, the base counter value is taken to be 0. - base_time : str, optional + base_time : datetime.time, optional This field can be present only if the number of samples is also present. It gives the time of day that corresponds to the beginning of the - record, in 'HH:MM:SS' format (using a 24-hour clock; thus '13:05:00', or - '13:5:0', represent 1:05 pm). If this field is absent, the time-conversion - functions assume a value of '0:0:0', corresponding to midnight. - base_date : str, optional + record. + base_date : datetime.date, optional This field can be present only if the base time is also present. It contains - the date that corresponds to the beginning of the record, in 'DD/MM/YYYY' - format (e.g., '25/4/1989' is '25 April 1989'). + the date that corresponds to the beginning of the record. comments : list, optional A list of string comments to be written to the header file. Each string entry represents a new line to be appended to the bottom of the header @@ -416,12 +411,6 @@ def csv_to_wfdb( if verbose: print("Signal block size: {}".format(block_size)) - # Change the dates and times into `datetime` objects - if base_time: - base_time = _header.wfdb_strptime(base_time) - if base_date: - base_date = datetime.datetime.strptime(base_date, "%d/%m/%Y").date() - # Convert array to floating point p_signal = p_signal.astype("float64") diff --git a/wfdb/io/header.py b/wfdb/io/header.py new file mode 100644 index 00000000..0f0f7eed --- /dev/null +++ b/wfdb/io/header.py @@ -0,0 +1,127 @@ +""" +Module for parsing header files. + +This module will eventually replace _header.py + +""" +import datetime +import re +from typing import List, Tuple + + +class HeaderSyntaxError(ValueError): + """Invalid syntax found in a WFDB header file.""" + + +# Record line pattern. Format: +# RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE +rx_record = re.compile( + r""" + [ \t]* (?P[-\w]+) + /?(?P\d*) + [ \t]+ (?P\d+) + [ \t]* (?P\d*\.?\d*) + /*(?P-?\d*\.?\d*) + \(?(?P-?\d*\.?\d*)\)? + [ \t]* (?P\d*) + [ \t]* (?P\d{,2}:?\d{,2}:?\d{,2}\.?\d{,6}) + [ \t]* (?P\d{,2}/?\d{,2}/?\d{,4}) + """, + re.VERBOSE, +) + +# Signal line pattern. Format: +# FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO CHECKSUM BLOCK_SIZE DESCRIPTION +rx_signal = re.compile( + r""" + [ \t]* (?P~?[-\w]*\.?[\w]*) + [ \t]+ (?P\d+) + x?(?P\d*) + :?(?P\d*) + \+?(?P\d*) + [ \t]* (?P-?\d*\.?\d*e?[\+-]?\d*) + \(?(?P-?\d*)\)? + /?(?P[\w\^\-\?%\/]*) + [ \t]* (?P\d*) + [ \t]* (?P-?\d*) + [ \t]* (?P-?\d*) + [ \t]* (?P-?\d*) + [ \t]* (?P\d*) + [ \t]* (?P[\S]?[^\t\n\r\f\v]*) + """, + re.VERBOSE, +) + +# Segment line +rx_segment = re.compile( + r""" + [ \t]* (?P[-\w]*~?) + [ \t]+ (?P\d+) + """, + re.VERBOSE, +) + + +def wfdb_strptime(time_string: str) -> datetime.time: + """ + Given a time string in an acceptable WFDB format, return + a datetime.time object. + + Valid formats: SS, MM:SS, HH:MM:SS, all with and without microsec. + + Parameters + ---------- + time_string : str + The time to be converted to a datetime.time object. + + Returns + ------- + datetime.time object + The time converted from str format. + + """ + n_colons = time_string.count(":") + + if n_colons == 0: + time_fmt = "%S" + elif n_colons == 1: + time_fmt = "%M:%S" + elif n_colons == 2: + time_fmt = "%H:%M:%S" + + if "." in time_string: + time_fmt += ".%f" + + return datetime.datetime.strptime(time_string, time_fmt).time() + + +def parse_header_content( + header_content: str, +) -> Tuple[List[str], List[str]]: + """ + Parse the text of a header file. + + Parameters + ---------- + header_content: str + The string content of the full header file + + Returns + ------- + header_lines : List[str] + A list of all the non-comment lines + comment_lines : List[str] + A list of all the comment lines + + """ + header_lines, comment_lines = [], [] + for line in header_content.splitlines(): + line = line.strip() + # Comment line + if line.startswith("#"): + comment_lines.append(line) + # Non-empty non-comment line = header line. + elif line: + header_lines.append(line) + + return header_lines, comment_lines diff --git a/wfdb/io/record.py b/wfdb/io/record.py index 8d096ab4..3b9766a6 100644 --- a/wfdb/io/record.py +++ b/wfdb/io/record.py @@ -11,6 +11,7 @@ from wfdb.io import _signal from wfdb.io import _url from wfdb.io import download +from wfdb.io import header from wfdb.io import util @@ -1840,7 +1841,7 @@ def rdheader(record_name, pn_dir=None, rd_segments=False): header_content = download._stream_header(file_name, pn_dir) # Separate comment and non-comment lines - header_lines, comment_lines = _header.parse_header_content(header_content) + header_lines, comment_lines = header.parse_header_content(header_content) # Get fields from record line record_fields = _header._parse_record_line(header_lines[0])