Skip to content

Commit

Permalink
Merge pull request #1018 from peterhcharlton/add_msptdfastv1.1
Browse files Browse the repository at this point in the history
Add MSPTDfast algorithm
  • Loading branch information
DominiqueMakowski authored Aug 5, 2024
2 parents 8195d2f + 5d3e044 commit ee87db7
Showing 1 changed file with 245 additions and 1 deletion.
246 changes: 245 additions & 1 deletion neurokit2/ppg/ppg_findpeaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,26 @@ def ppg_findpeaks(
* Bishop, S. M., & Ercole, A. (2018). Multi-scale peak and trough detection optimised for
periodic and quasi-periodic neuroscience data. In Intracranial Pressure & Neuromonitoring XVI
(pp. 189-195). Springer International Publishing.
* Charlton, P. H. et al. (2024). MSPTDfast: An Efficient Photoplethysmography Beat Detection
Algorithm. Proc CinC.
"""
method = method.lower()
if method in ["elgendi"]:
peaks = _ppg_findpeaks_elgendi(ppg_cleaned, sampling_rate, show=show, **kwargs)
elif method in ["msptd", "bishop2018", "bishop"]:
peaks, _ = _ppg_findpeaks_bishop(ppg_cleaned, show=show, **kwargs)
elif method in ["msptdfast", "msptdfastv1", "charlton2024", "charlton"]:
peaks, onsets = _ppg_findpeaks_charlton(ppg_cleaned, sampling_rate, show=show, **kwargs)
else:
raise ValueError(
"`method` not found. Must be one of the following: 'elgendi', 'bishop'."
"`method` not found. Must be one of the following: 'elgendi', 'bishop', 'charlton'."
)

# Prepare output.
info = {"PPG_Peaks": peaks}
if 'onsets' in locals():
info["PPG_Onsets"] = onsets

return info

Expand Down Expand Up @@ -239,3 +245,241 @@ def _ppg_findpeaks_bishop(
ax0.set_title("PPG Peaks (Method by Bishop et al., 2018)")

return peaks, onsets


def _ppg_findpeaks_charlton(
signal,
sampling_rate=1000,
show=False,
):
"""Implementation of Charlton et al (2024) MSPTDfast: An Efficient Photoplethysmography
Beat Detection Algorithm. 2024 Computing in Cardiology (CinC), Karlsruhe, Germany,
doi:10.1101/2024.07.18.24310627.
"""

# Inner functions

def find_m_max(x, N, max_scale, m_max):
"""Find local maxima scalogram for peaks
"""

for k in range(1, max_scale + 1): # scalogram scales
for i in range(k + 2, N - k + 2):
if x[i - 2] > x[i - k - 2] and x[i - 2] > x[i + k - 2]:
m_max[k - 1, i - 2] = True

return m_max

def find_m_min(x, N, max_scale, m_min):
"""Find local minima scalogram for onsets
"""

for k in range(1, max_scale + 1): # scalogram scales
for i in range(k + 2, N - k + 2):
if x[i - 2] < x[i - k - 2] and x[i - 2] < x[i + k - 2]:
m_min[k - 1, i - 2] = True

return m_min

def find_lms_using_msptd_approach(max_scale, x, options):
"""Find local maxima (or minima) scalogram(s) using the
MSPTD approach
"""

# Setup
N = len(x)

# Find local maxima scalogram (if required)
if options["find_pks"]:
m_max = np.full((max_scale, N), False) # matrix for maxima
m_max = find_m_max(x, N, max_scale, m_max)
else:
m_max = None

# Find local minima scalogram (if required)
if options["find_trs"]:
m_min = np.full((max_scale, N), False) # matrix for minima
m_min = find_m_min(x, N, max_scale, m_min)
else:
m_min = None

return m_max, m_min

def downsample(win_sig, ds_factor):
"""Downsamples signal by picking out every nth sample, where n is
specified by ds_factor
"""

return win_sig[::ds_factor]

def detect_peaks_and_onsets_using_msptd(signal, fs, options):
"""Detect peaks and onsets in a PPG signal using a modified MSPTD approach
(where the modifications are those specified in Charlton et al. 2024)
"""

# Setup
N = len(signal)
L = int(np.ceil(N / 2) - 1)

# Step 0: Don't calculate scales outside the range of plausible HRs

plaus_hr_hz = np.array(options['plaus_hr_bpm']) / 60 # in Hz
init_scales = np.arange(1, L + 1)
durn_signal = len(signal) / fs
init_scales_fs = (L / init_scales) / durn_signal
if options['use_reduced_lms_scales']:
init_scales_inc_log = init_scales_fs >= plaus_hr_hz[0]
else:
init_scales_inc_log = np.ones_like(init_scales_fs, dtype=bool) # DIDN"T FULLY UNDERSTAND

max_scale_index = np.where(init_scales_inc_log)[0] # DIDN"T FULLY UNDERSTAND THIS AND NEXT FEW LINES
if max_scale_index.size > 0:
max_scale = max_scale_index[-1] + 1 # Add 1 to convert from 0-based to 1-based index
else:
max_scale = None # Or handle the case where no scales are valid

# Step 1: calculate local maxima and local minima scalograms

# - detrend
x = scipy.signal.detrend(signal, type="linear")

# - populate LMS matrices
[m_max, m_min] = find_lms_using_msptd_approach(max_scale, x, options)

# Step 2: find the scale with the most local maxima (or local minima)

# - row-wise summation (i.e. sum each row)
if options["find_pks"]:
gamma_max = np.sum(m_max, axis=1) # the "axis=1" option makes it row-wise
if options["find_trs"]:
gamma_min = np.sum(m_min, axis=1)
# - find scale with the most local maxima (or local minima)
if options["find_pks"]:
lambda_max = np.argmax(gamma_max)
if options["find_trs"]:
lambda_min = np.argmax(gamma_min)

# Step 3: Use lambda to remove all elements of m for which k>lambda
first_scale_to_include = np.argmax(init_scales_inc_log)
if options["find_pks"]:
m_max = m_max[first_scale_to_include:lambda_max + 1, :]
if options["find_trs"]:
m_min = m_min[first_scale_to_include:lambda_min + 1, :]

# Step 4: Find peaks (and onsets)
# - column-wise summation
if options["find_pks"]:
m_max_sum = np.sum(m_max == False, axis=0)
peaks = np.where(m_max_sum == 0)[0].astype(int)
else:
peaks = []

if options["find_trs"]:
m_min_sum = np.sum(m_min == False, axis=0)
onsets = np.where(m_min_sum == 0)[0].astype(int)
else:
onsets = []

return peaks, onsets

# ~~~ Main function ~~~

# Specify settings
# - version: optimal selection (CinC 2024)
options = {
'find_trs': True, # whether or not to find onsets
'find_pks': True, # whether or not to find peaks
'do_ds': True, # whether or not to do downsampling
'ds_freq': 20, # the target downsampling frequency
'use_reduced_lms_scales': True, # whether or not to reduce the number of scales (default 30 bpm)
'win_len': 8, # duration of individual windows for analysis
'win_overlap': 0.2, # proportion of window overlap
'plaus_hr_bpm': [30, 200] # range of plausible HRs (only the lower bound is used)
}

# Split into overlapping windows
no_samps_in_win = options["win_len"] * sampling_rate
if len(signal) <= no_samps_in_win:
win_starts = 0
win_ends = len(signal) - 1
else:
win_offset = round(no_samps_in_win * (1 - options["win_overlap"]))
win_starts = list(range(0, len(signal) - no_samps_in_win + 1, win_offset))
win_ends = [start + 1 + no_samps_in_win for start in win_starts]
if win_ends[-1] < len(signal):
win_starts.append(len(signal) - 1 - no_samps_in_win)
win_ends.append(len(signal))
# this ensures that the windows include the entire signal duration

# Set up downsampling if the sampling frequency is particularly high
if options["do_ds"]:
min_fs = options["ds_freq"]
if sampling_rate > min_fs:
ds_factor = int(np.floor(sampling_rate / min_fs))
ds_fs = sampling_rate / np.floor(sampling_rate / min_fs)
else:
options["do_ds"] = False

# detect peaks and onsets in each window
peaks = []
onsets = []

# cycle through each window
for win_no in range(len(win_starts)):
# Extract this window's data
win_sig = signal[win_starts[win_no]:win_ends[win_no]]

# Downsample signal
if options['do_ds']:
rel_sig = downsample(win_sig, ds_factor)
rel_fs = ds_fs
else:
rel_sig = win_sig
rel_fs = sampling_rate

# Detect peaks and onsets
p, t = detect_peaks_and_onsets_using_msptd(rel_sig, rel_fs, options)

# Resample peaks
if options['do_ds']:
p = [peak * ds_factor for peak in p]
t = [onset * ds_factor for onset in t]

# Correct peak indices by finding highest point within tolerance either side of detected peaks
tol_durn = 0.05
if rel_fs < 10:
tol_durn = 0.2
elif rel_fs < 20:
tol_durn = 0.1
tol = int(np.ceil(rel_fs * tol_durn))

for pk_no in range(len(p)):
segment = win_sig[(p[pk_no] - tol):(p[pk_no] + tol + 1)]
temp = np.argmax(segment)
p[pk_no] = p[pk_no] - tol + temp

# Correct onset indices by finding highest point within tolerance either side of detected onsets
for onset_no in range(len(t)):
segment = win_sig[(t[onset_no] - tol):(t[onset_no] + tol + 1)]
temp = np.argmin(segment)
t[onset_no] = t[onset_no] - tol + temp

# Store peaks and onsets
win_peaks = [peak + win_starts[win_no] for peak in p]
peaks.extend(win_peaks)
win_onsets = [onset + win_starts[win_no] for onset in t]
onsets.extend(win_onsets)

# Tidy up detected peaks and onsets (by ordering them and only retaining unique ones)
peaks = sorted(set(peaks))
onsets = sorted(set(onsets))

# Plot results (optional)
if show:
_, ax0 = plt.subplots(nrows=1, ncols=1, sharex=True)
ax0.plot(signal, label="signal")
ax0.scatter(peaks, signal[peaks], c="r")
ax0.scatter(onsets, signal[onsets], c="b")
ax0.set_title("PPG Onsets (Method by Charlton et al., 2024)")

return peaks, onsets

0 comments on commit ee87db7

Please sign in to comment.