From dc0aa0fe5bb3f306a62d04e4d9bbbcb6e9d9cc67 Mon Sep 17 00:00:00 2001
From: Chen Xie
Date: Mon, 27 Jun 2022 20:37:18 -0700
Subject: [PATCH 1/2] Add new dataclasses for WFDB metadata

---
 wfdb/io/_header.py | 140 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 129 insertions(+), 11 deletions(-)

diff --git a/wfdb/io/_header.py b/wfdb/io/_header.py
index 15c8065a..28a7c05a 100644
--- a/wfdb/io/_header.py
+++ b/wfdb/io/_header.py
@@ -1,6 +1,7 @@
+from dataclasses import dataclass
 import datetime
 import re
-from typing import List, Tuple
+from typing import Collection, List, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -35,9 +36,6 @@
 If all of the fields were filled out in a WFDB header file, they would appear
 in this order with these seperators:
 
-RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE
-FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO CHECKSUM BLOCK_SIZE DESCRIPTION
-
 """
 int_types = (int, np.int64, np.int32, np.int16, np.int8)
 float_types = (float, np.float64, np.float32) + int_types
@@ -135,8 +133,128 @@
 # Specifications of all WFDB header fields, except for comments
 FIELD_SPECS = pd.concat((RECORD_SPECS, SIGNAL_SPECS, SEGMENT_SPECS))
 
-# Regexp objects for reading headers
-# Record line
+
+@dataclass
+class SignalInfo:
+    """
+    Signal specification fields for one signal
+    """
+
+    file_name: Optional[str] = None
+    fmt: Optional[str] = None
+    samps_per_frame: Optional[int] = None
+    skew: Optional[int] = None
+    byte_offset: Optional[int] = None
+    adc_gain: Optional[float] = None
+    baseline: Optional[int] = None
+    units: Optional[str] = None
+    adc_res: Optional[int] = None
+    adc_zero: Optional[int] = None
+    init_value: Optional[int] = None
+    checksum: Optional[int] = None
+    block_size: Optional[int] = None
+    sig_name: Optional[str] = None
+
+
+class SignalSet:
+    """
+    Wrapper for a set of signal information. Provides useful access/modify methods.
+    """
+
+    def __init__(self, signals: List[SignalInfo]):
+        self._signal_info = signals
+        try:
+            self._generate_name_map()
+        except ValueError:
+            pass
+
+    def _generate_name_map(self):
+        """
+        Generate mapping of channel names to channel indices to allow
+        for access by both index and name.
+
+        Raises
+        ------
+        ValueError
+            Raises unless all channel names are present and unique.
+
+        """
+        self._channel_inds = None
+        channel_inds = {}
+
+        for ch, signal in enumerate(self._signal_info):
+            sig_name = signal.sig_name
+            if not sig_name or sig_name in channel_inds:
+                raise ValueError(
+                    "Cannot generate name map: channel names are not unique"
+                )
+            channel_inds[sig_name] = ch
+
+        self._channel_inds = channel_inds
+
+    def __getitem__(self, key: Union[int, str]):
+        if isinstance(key, str):
+            if not self._channel_inds:
+                raise KeyError("Channel name mapping not available")
+            key = self._channel_inds[key]
+        return self._signal_info[key]
+
+
+@dataclass
+class _RecordFields:
+    """
+    Record specification fields for a record.
+
+    Used by helper functions and to be inherited by class RecordInfo.
+
+    """
+
+    name: Optional[str] = None
+    n_seg: Optional[int] = None
+    n_sig: Optional[int] = None
+    fs: Optional[float] = None
+    counter_freq: Optional[float] = None
+    base_counter: Optional[float] = None
+    sig_len: Optional[int] = None
+    base_time: Optional[datetime.time] = None
+    base_date: Optional[datetime.date] = None
+
+
+@dataclass
+class RecordInfo(_RecordFields):
+    """
+    The core object encapsulating WFDB metadata for a single-segment record.
+    Contains record specification fields and signal specification fields.
+    """
+
+    # All signal fields are encapsulated under this field
+    signals: Optional[SignalSet] = None
+
+    comments: Optional[List[str]] = None
+
+
+@dataclass
+class SegmentFields:
+    """
+    Segment specification fields for a single segment.
+    """
+
+    seg_name: Optional[str] = None
+    seg_len: Optional[int] = None
+
+
+@dataclass
+class MultiRecord(_RecordFields):
+    """
+    The core object encapsulating WFDB metadata for a multi-segment record.
+    Contains record specification fields and segment specification fields.
+    """
+
+    segments: Optional[List[SegmentFields]] = None
+
+
+# Record line pattern. Format:
+# RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE
 _rx_record = re.compile(
     r"""
     [ \t]* (?P<record_name>[-\w]+)
@@ -152,7 +270,8 @@
     re.VERBOSE,
 )
 
-# Signal line
+# Signal line pattern. Format:
+# FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO CHECKSUM BLOCK_SIZE DESCRIPTION
 _rx_signal = re.compile(
     r"""
     [ \t]* (?P<file_name>~?[-\w]*\.?[\w]*)
@@ -1104,8 +1223,8 @@ def _read_segment_lines(segment_lines):
         segment_fields[field] = [None] * len(segment_lines)
 
     # Read string fields from signal line
-    for i in range(len(segment_lines)):
-        match = _rx_segment.match(segment_lines[i])
+    for i, line in enumerate(segment_lines):
+        match = _rx_segment.match(line)
         if match is None:
             raise HeaderSyntaxError("invalid syntax in segment line")
         (
@@ -1114,8 +1233,7 @@ def _read_segment_lines(segment_lines):
         ) = match.groups()
 
         # Typecast strings for numerical field
-        if field == "seg_len":
-            segment_fields["seg_len"][i] = int(segment_fields["seg_len"][i])
+        segment_fields["seg_len"][i] = int(segment_fields["seg_len"][i])
 
     return segment_fields
 

From 8e00f254282d293183ad673da7fb355c9bd4b3bc Mon Sep 17 00:00:00 2001
From: Chen Xie
Date: Tue, 28 Jun 2022 21:09:10 -0700
Subject: [PATCH 2/2] Add defaults

---
 wfdb/io/_header.py | 121 ++++++++++++++++++++++++++++++++++++++-------
 wfdb/io/_signal.py |   2 +
 2 files changed, 106 insertions(+), 17 deletions(-)

diff --git a/wfdb/io/_header.py b/wfdb/io/_header.py
index 28a7c05a..45395e08 100644
--- a/wfdb/io/_header.py
+++ b/wfdb/io/_header.py
@@ -1,7 +1,7 @@
 from dataclasses import dataclass
 import datetime
 import re
-from typing import Collection, List, Optional, Tuple, Union
+from typing import Any, Collection, Dict, List, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -200,6 +200,95 @@ def __getitem__(self, key: Union[int, str]):
         return self._signal_info[key]
 
 
+@dataclass
+class WFDBField:
+    is_required: bool
+    data_type: type
+
+    # is_required + has_default?
+
+
+RECORD_FIELDS: Dict[str, WFDBField] = {
+    "record_name": WFDBField(is_required=True, data_type=str),
+    "n_seg": WFDBField(is_required=False, data_type=int),
+    "n_sig": WFDBField(is_required=True, data_type=int),
+    "fs": WFDBField(is_required=False, data_type=float),
+    "counter_freq": WFDBField(is_required=False, data_type=float),
+    "base_counter": WFDBField(is_required=False, data_type=float),
+    "sig_len": WFDBField(is_required=False, data_type=int),
+    "base_time": WFDBField(is_required=False, data_type=datetime.time),
+    "base_date": WFDBField(is_required=False, data_type=datetime.date),
+}
+
+WFDB_FIELDS: Dict[str, WFDBField] = dict(**RECORD_FIELDS)
+
+
+def get_field_default(fields: dict, field_name: str) -> Any:
+    """
+    Gets the default value for a WFDB field, if it has one.
+
+    Returns
+    -------
+    N/A : Any
+        The default value for the field. This may be None, which is different
+        from the field not having a default.
+
+    Raises
+    ------
+    ValueError
+        If the field has no default value
+    HeaderSyntaxError
+        If the field's default value is dependent on another field, which
+        is missing in the 'fields' parameter.
+    """
+    # Signal fields are not in WFDB_FIELDS, so tolerate unknown names here
+    field_spec = WFDB_FIELDS.get(field_name)
+    if field_spec is not None and field_spec.is_required:
+        raise ValueError(f"{field_name} is a required field with no default")
+
+    # Special rules
+    if field_name == "counter_freq":
+        if "fs" not in fields:
+            raise HeaderSyntaxError(
+                "counter_freq should default to fs, which is missing"
+            )
+        return fields["fs"]
+
+    if field_name == "baseline":
+        if "adc_zero" not in fields:
+            raise HeaderSyntaxError(
+                "baseline should default to adc_zero, which is missing"
+            )
+        return fields["adc_zero"]
+
+    if field_name == "init_value":
+        if "adc_zero" not in fields:
+            raise HeaderSyntaxError(
+                "init_value should default to adc_zero, which is missing"
+            )
+        return fields["adc_zero"]
+
+    if field_name == "adc_res":
+        # If this field is missing or zero, it is interpreted to be 12 bits
+        # for amplitude-format signals, or 10 bits for difference-format
+        # signals, unless a lower value is specified by the format field.
+        if "fmt" not in fields:
+            raise HeaderSyntaxError("adc_res depends on fmt, which is missing")
+        fmt = fields["fmt"]
+
+        res = 10 if fmt in _signal.DIFFERENCE_FMTS else 12
+        return min(res, _signal.BIT_RES[fmt])
+
+    if field_name == "n_seg":
+        return None
+    if field_name == "fs":
+        return 250
+    if field_name == "base_counter":
+        return 0
+
+    return None
+
+
 @dataclass
 class _RecordFields:
     """
@@ -209,7 +298,7 @@ class _RecordFields:
 
     """
 
-    name: Optional[str] = None
+    record_name: Optional[str] = None
     n_seg: Optional[int] = None
     n_sig: Optional[int] = None
     fs: Optional[float] = None
@@ -1055,7 +1144,7 @@ def parse_header_content(
     return header_lines, comment_lines
 
 
-def _parse_record_line(record_line: str) -> dict:
+def _parse_record_line(record_line: str) -> _RecordFields:
     """
     Extract fields from a record line string into a dictionary.
 
@@ -1069,14 +1158,19 @@ def _parse_record_line(record_line: str) -> dict:
     record_fields : dict
         The fields for the given record line.
 
+    Raises
+    ------
+    HeaderSyntaxError
+        If the input is not in the form of a valid WFDB record line.
+ """ - # Dictionary for record fields + record_fields = {} # Read string fields from record line match = _rx_record.match(record_line) if match is None: - raise HeaderSyntaxError("invalid syntax in record line") + raise HeaderSyntaxError("Invalid syntax in record line") ( record_fields["record_name"], record_fields["n_seg"], @@ -1089,11 +1183,10 @@ def _parse_record_line(record_line: str) -> dict: record_fields["base_date"], ) = match.groups() - for field in RECORD_SPECS.index: - # Replace empty strings with their read defaults (which are - # mostly None) - if record_fields[field] == "": - record_fields[field] = RECORD_SPECS.loc[field, "read_default"] + for field_name, field_value in record_fields.items(): + # Replace empty strings with the field defaults + if field_value == "": + record_fields[field_name] = RECORD_SPECS.loc[field, "read_default"] # Typecast non-empty strings for non-string (numerical/datetime) # fields else: @@ -1116,13 +1209,7 @@ def _parse_record_line(record_line: str) -> dict: record_fields["base_date"], "%d/%m/%Y" ).date() - # This is not a standard WFDB field, but is useful to set. - if record_fields["base_date"] and record_fields["base_time"]: - record_fields["base_datetime"] = datetime.datetime.combine( - record_fields["base_date"], record_fields["base_time"] - ) - - return record_fields + return _RecordFields(**record_fields) def _parse_signal_lines(signal_lines): diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index c40b1883..9d4f3959 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -18,6 +18,8 @@ COMPRESSED_FMTS = ["508", "516", "524"] # Formats which are stored in offset binary form OFFSET_FMTS = ["80", "160"] +# Formats which are stored in difference format +DIFFERENCE_FMTS = ["8"] # All WFDB dat formats - https://www.physionet.org/physiotools/wag/signal-5.htm DAT_FMTS = ALIGNED_FMTS + UNALIGNED_FMTS + COMPRESSED_FMTS