From 5d3e044d202621d5e4668f2d6db98289ad177b01 Mon Sep 17 00:00:00 2001
From: Peter H Charlton
Date: Mon, 29 Jul 2024 13:36:29 +0100
Subject: [PATCH] Add MSPTDfast algorithm

Added MSPTDfast algorithm (v.1.1) for PPG beat detection.
---
 neurokit2/ppg/ppg_findpeaks.py | 246 ++++++++++++++++++++++++++++++++-
 1 file changed, 245 insertions(+), 1 deletion(-)

diff --git a/neurokit2/ppg/ppg_findpeaks.py b/neurokit2/ppg/ppg_findpeaks.py
index e0750fb523..4fdaf4b7e6 100644
--- a/neurokit2/ppg/ppg_findpeaks.py
+++ b/neurokit2/ppg/ppg_findpeaks.py
@@ -68,6 +68,8 @@ def ppg_findpeaks(
     * Bishop, S. M., & Ercole, A. (2018). Multi-scale peak and trough detection optimised for
       periodic and quasi-periodic neuroscience data. In Intracranial Pressure & Neuromonitoring
       XVI (pp. 189-195). Springer International Publishing.
+    * Charlton, P. H. et al. (2024). MSPTDfast: An Efficient Photoplethysmography Beat Detection
+      Algorithm. In 2024 Computing in Cardiology (CinC). doi:10.1101/2024.07.18.24310627.
 
     """
     method = method.lower()
@@ -75,13 +77,17 @@
         peaks = _ppg_findpeaks_elgendi(ppg_cleaned, sampling_rate, show=show, **kwargs)
     elif method in ["msptd", "bishop2018", "bishop"]:
         peaks, _ = _ppg_findpeaks_bishop(ppg_cleaned, show=show, **kwargs)
+    elif method in ["msptdfast", "msptdfastv1", "charlton2024", "charlton"]:
+        peaks, onsets = _ppg_findpeaks_charlton(ppg_cleaned, sampling_rate, show=show, **kwargs)
     else:
         raise ValueError(
-            "`method` not found. Must be one of the following: 'elgendi', 'bishop'."
+            "`method` not found. Must be one of the following: 'elgendi', 'bishop', 'charlton'."
         )
 
     # Prepare output.
     info = {"PPG_Peaks": peaks}
+    if "onsets" in locals():
+        info["PPG_Onsets"] = onsets
 
     return info
 
@@ -239,3 +245,241 @@ def _ppg_findpeaks_bishop(
         ax0.set_title("PPG Peaks (Method by Bishop et al., 2018)")
 
     return peaks, onsets
+
+
+def _ppg_findpeaks_charlton(
+    signal,
+    sampling_rate=1000,
+    show=False,
+    **kwargs,
+):
+    """Implementation of MSPTDfast (v.1.1), the PPG beat detection algorithm described in:
+    Charlton, P. H. et al. (2024). MSPTDfast: An Efficient Photoplethysmography Beat Detection
+    Algorithm. In 2024 Computing in Cardiology (CinC), Karlsruhe, Germany.
+    doi:10.1101/2024.07.18.24310627.
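+
+    Parameters
+    ----------
+    signal : Union[list, np.array, pd.Series]
+        The cleaned PPG channel, as returned by `ppg_clean()`.
+    sampling_rate : int
+        The sampling frequency of `signal` (in Hz, i.e., samples/second).
+    show : bool
+        If True, plot the signal with the detected peaks and onsets.
+
+    Returns
+    -------
+    peaks, onsets : np.array
+        Indices of the detected systolic peaks and pulse onsets.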
+ """ + + # Inner functions + + def find_m_max(x, N, max_scale, m_max): + """Find local maxima scalogram for peaks + """ + + for k in range(1, max_scale + 1): # scalogram scales + for i in range(k + 2, N - k + 2): + if x[i - 2] > x[i - k - 2] and x[i - 2] > x[i + k - 2]: + m_max[k - 1, i - 2] = True + + return m_max + + def find_m_min(x, N, max_scale, m_min): + """Find local minima scalogram for onsets + """ + + for k in range(1, max_scale + 1): # scalogram scales + for i in range(k + 2, N - k + 2): + if x[i - 2] < x[i - k - 2] and x[i - 2] < x[i + k - 2]: + m_min[k - 1, i - 2] = True + + return m_min + + def find_lms_using_msptd_approach(max_scale, x, options): + """Find local maxima (or minima) scalogram(s) using the + MSPTD approach + """ + + # Setup + N = len(x) + + # Find local maxima scalogram (if required) + if options["find_pks"]: + m_max = np.full((max_scale, N), False) # matrix for maxima + m_max = find_m_max(x, N, max_scale, m_max) + else: + m_max = None + + # Find local minima scalogram (if required) + if options["find_trs"]: + m_min = np.full((max_scale, N), False) # matrix for minima + m_min = find_m_min(x, N, max_scale, m_min) + else: + m_min = None + + return m_max, m_min + + def downsample(win_sig, ds_factor): + """Downsamples signal by picking out every nth sample, where n is + specified by ds_factor + """ + + return win_sig[::ds_factor] + + def detect_peaks_and_onsets_using_msptd(signal, fs, options): + """Detect peaks and onsets in a PPG signal using a modified MSPTD approach + (where the modifications are those specified in Charlton et al. 2024) + """ + + # Setup + N = len(signal) + L = int(np.ceil(N / 2) - 1) + + # Step 0: Don't calculate scales outside the range of plausible HRs + + plaus_hr_hz = np.array(options['plaus_hr_bpm']) / 60 # in Hz + init_scales = np.arange(1, L + 1) + durn_signal = len(signal) / fs + init_scales_fs = (L / init_scales) / durn_signal + if options['use_reduced_lms_scales']: + init_scales_inc_log = init_scales_fs >= plaus_hr_hz[0] + else: + init_scales_inc_log = np.ones_like(init_scales_fs, dtype=bool) # DIDN"T FULLY UNDERSTAND + + max_scale_index = np.where(init_scales_inc_log)[0] # DIDN"T FULLY UNDERSTAND THIS AND NEXT FEW LINES + if max_scale_index.size > 0: + max_scale = max_scale_index[-1] + 1 # Add 1 to convert from 0-based to 1-based index + else: + max_scale = None # Or handle the case where no scales are valid + + # Step 1: calculate local maxima and local minima scalograms + + # - detrend + x = scipy.signal.detrend(signal, type="linear") + + # - populate LMS matrices + [m_max, m_min] = find_lms_using_msptd_approach(max_scale, x, options) + + # Step 2: find the scale with the most local maxima (or local minima) + + # - row-wise summation (i.e. 
+    def detect_peaks_and_onsets_using_msptd(signal, fs, options):
+        """Detect peaks and onsets in a PPG signal using a modified MSPTD approach
+        (where the modifications are those specified in Charlton et al. 2024)."""
+
+        # Setup
+        N = len(signal)
+        L = int(np.ceil(N / 2) - 1)
+
+        # Step 0: Don't calculate scales outside the range of plausible HRs
+
+        plaus_hr_hz = np.array(options["plaus_hr_bpm"]) / 60  # in Hz
+        init_scales = np.arange(1, L + 1)
+        durn_signal = len(signal) / fs
+        init_scales_fs = (L / init_scales) / durn_signal
+        if options["use_reduced_lms_scales"]:
+            # exclude scales corresponding to frequencies below the lowest
+            # plausible HR, as these cannot correspond to beats
+            init_scales_inc_log = init_scales_fs >= plaus_hr_hz[0]
+        else:
+            # otherwise retain all scales
+            init_scales_inc_log = np.ones_like(init_scales_fs, dtype=bool)
+
+        # identify the largest scale to be included in the scalograms
+        max_scale_index = np.where(init_scales_inc_log)[0]
+        if max_scale_index.size > 0:
+            max_scale = max_scale_index[-1] + 1  # add 1 to convert from 0-based to 1-based index
+        else:
+            max_scale = None  # no scales lie within the plausible HR range
+
+        # Step 1: calculate local maxima and local minima scalograms
+
+        # - detrend
+        x = scipy.signal.detrend(signal, type="linear")
+
+        # - populate LMS matrices
+        m_max, m_min = find_lms_using_msptd_approach(max_scale, x, options)
+
+        # Step 2: find the scale with the most local maxima (or local minima)
+
+        # - row-wise summation (i.e. sum each row)
+        if options["find_pks"]:
+            gamma_max = np.sum(m_max, axis=1)  # the "axis=1" option makes it row-wise
+        if options["find_trs"]:
+            gamma_min = np.sum(m_min, axis=1)
+        # - find scale with the most local maxima (or local minima)
+        if options["find_pks"]:
+            lambda_max = np.argmax(gamma_max)
+        if options["find_trs"]:
+            lambda_min = np.argmax(gamma_min)
+
+        # Step 3: Use lambda to remove all elements of m for which k > lambda
+        first_scale_to_include = np.argmax(init_scales_inc_log)
+        if options["find_pks"]:
+            m_max = m_max[first_scale_to_include:lambda_max + 1, :]
+        if options["find_trs"]:
+            m_min = m_min[first_scale_to_include:lambda_min + 1, :]
+
+        # Step 4: Find peaks (and onsets)
+        # - a peak (or onset) is a sample which is a local maximum (or minimum) at
+        #   every retained scale, i.e. a column of the scalogram which is all True
+        if options["find_pks"]:
+            peaks = np.where(np.all(m_max, axis=0))[0].astype(int)
+        else:
+            peaks = []
+
+        if options["find_trs"]:
+            onsets = np.where(np.all(m_min, axis=0))[0].astype(int)
+        else:
+            onsets = []
+
+        return peaks, onsets
+
+    # ~~~ Main function ~~~
+
+    # Specify settings
+    # - version: optimal selection (CinC 2024)
+    options = {
+        "find_trs": True,  # whether or not to find onsets
+        "find_pks": True,  # whether or not to find peaks
+        "do_ds": True,  # whether or not to do downsampling
+        "ds_freq": 20,  # the target downsampling frequency (Hz)
+        "use_reduced_lms_scales": True,  # whether or not to reduce the number of scales (default 30 bpm)
+        "win_len": 8,  # duration of individual windows for analysis (s)
+        "win_overlap": 0.2,  # proportion of window overlap
+        "plaus_hr_bpm": [30, 200],  # range of plausible HRs (only the lower bound is used)
+    }
+
+    # Split into overlapping windows
+    no_samps_in_win = options["win_len"] * sampling_rate
+    if len(signal) <= no_samps_in_win:
+        # a single window covering the whole signal
+        win_starts = [0]
+        win_ends = [len(signal)]
+    else:
+        win_offset = round(no_samps_in_win * (1 - options["win_overlap"]))
+        win_starts = list(range(0, len(signal) - no_samps_in_win + 1, win_offset))
+        win_ends = [start + 1 + no_samps_in_win for start in win_starts]
+        if win_ends[-1] < len(signal):
+            win_starts.append(len(signal) - 1 - no_samps_in_win)
+            win_ends.append(len(signal))
+        # this ensures that the windows include the entire signal duration
+
+    # Set up downsampling if the sampling frequency is particularly high
+    if options["do_ds"]:
+        min_fs = options["ds_freq"]
+        if sampling_rate > min_fs:
+            ds_factor = int(np.floor(sampling_rate / min_fs))
+            ds_fs = sampling_rate / ds_factor
+        else:
+            options["do_ds"] = False
+
+    # detect peaks and onsets in each window
+    peaks = []
+    onsets = []
+
+    # cycle through each window
+    for win_no in range(len(win_starts)):
+        # Extract this window's data
+        win_sig = signal[win_starts[win_no]:win_ends[win_no]]
+
+        # Downsample signal
+        if options["do_ds"]:
+            rel_sig = downsample(win_sig, ds_factor)
+            rel_fs = ds_fs
+        else:
+            rel_sig = win_sig
+            rel_fs = sampling_rate
+
+        # Detect peaks and onsets
+        p, t = detect_peaks_and_onsets_using_msptd(rel_sig, rel_fs, options)
+
+        # Convert the detected indices back to the original sampling rate
+        if options["do_ds"]:
+            p = [peak * ds_factor for peak in p]
+            t = [onset * ds_factor for onset in t]
+
+        # Correct peak indices by finding the highest point within a tolerance either
+        # side of each detected peak
+        tol_durn = 0.05  # tolerance in seconds
+        if rel_fs < 10:
+            tol_durn = 0.2
+        elif rel_fs < 20:
+            tol_durn = 0.1
+        tol = int(np.ceil(rel_fs * tol_durn))  # tolerance in samples
+
+        for pk_no in range(len(p)):
+            # clip the search segment to the edges of the window
+            seg_start = max(p[pk_no] - tol, 0)
+            seg_end = min(p[pk_no] + tol + 1, len(win_sig))
+            segment = win_sig[seg_start:seg_end]
+            p[pk_no] = seg_start + np.argmax(segment)
+        # Correct onset indices by finding the lowest point within a tolerance either
+        # side of each detected onset
+        for onset_no in range(len(t)):
+            seg_start = max(t[onset_no] - tol, 0)
+            seg_end = min(t[onset_no] + tol + 1, len(win_sig))
+            segment = win_sig[seg_start:seg_end]
+            t[onset_no] = seg_start + np.argmin(segment)
+
+        # Store peaks and onsets (expressed as indices of the whole signal)
+        win_peaks = [peak + win_starts[win_no] for peak in p]
+        peaks.extend(win_peaks)
+        win_onsets = [onset + win_starts[win_no] for onset in t]
+        onsets.extend(win_onsets)
+
+    # Tidy up detected peaks and onsets (by ordering them and only retaining unique ones)
+    peaks = np.array(sorted(set(peaks)), dtype=int)
+    onsets = np.array(sorted(set(onsets)), dtype=int)
+
+    # Plot results (optional)
+    if show:
+        _, ax0 = plt.subplots(nrows=1, ncols=1, sharex=True)
+        ax0.plot(signal, label="signal")
+        ax0.scatter(peaks, signal[peaks], c="r")
+        ax0.scatter(onsets, signal[onsets], c="b")
+        ax0.set_title("PPG Peaks and Onsets (Method by Charlton et al., 2024)")
+
+    return peaks, onsets
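
A minimal usage sketch (illustration only, not part of the patch; it assumes the patch has been applied and uses the existing NeuroKit2 functions ppg_simulate() and ppg_clean()):

    import neurokit2 as nk

    # Simulate and clean 30 s of PPG sampled at 100 Hz
    ppg = nk.ppg_simulate(duration=30, sampling_rate=100, heart_rate=75)
    ppg_clean = nk.ppg_clean(ppg, sampling_rate=100)

    # Detect beats with MSPTDfast; pulse onsets are returned alongside peaks
    info = nk.ppg_findpeaks(ppg_clean, sampling_rate=100, method="charlton2024", show=True)
    peaks, onsets = info["PPG_Peaks"], info["PPG_Onsets"]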