Source code for pybert.utility.jitter

"""
Jitter utilities for PyBERT.

Original author: David Banas <capn.freako@gmail.com>

Original date:   June 16, 2024

Copyright (c) 2024 David Banas; all rights reserved World wide.

A partial extraction of the old `pybert/utility.py`, as part of a refactoring.
"""

from typing import Optional

import numpy as np
from numpy import (  # type: ignore
    argmax, array, concatenate, diag, diff, flip,
    histogram, mean, ones, real, reshape, resize, sign,
    sort, sqrt, where, zeros
)
from numpy.fft import fft, ifft  # type: ignore
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit

from ..common import Rvec

from .math import gaus_pdf
from .sigproc import moving_average

debug          = False


[docs] def find_crossing_times( # pylint: disable=too-many-arguments,too-many-positional-arguments t: Rvec, x: Rvec, min_delay: float = 0.0, rising_first: bool = True, min_init_dev: float = 0.1, thresh: float = 0.0, ) -> Rvec: """ Finds the threshold crossing times of the input signal. Args: t: Vector of sample times. Intervals do NOT need to be uniform. x: Sampled input vector. Keyword Args: min_delay: Minimum delay required, before allowing crossings. (Helps avoid false crossings at beginning of signal.) Default: 0 rising_first: When True, start with the first rising edge found. When this option is True, the first rising edge crossing is the first crossing returned. This is the desired behavior for PyBERT, because we always initialize the bit stream with [0, 0, 1, 1], in order to provide a known synchronization point for jitter analysis. Default: True min_init_dev: The minimum initial deviation from zero, which must be detected before searching for crossings. Normalized to maximum input signal magnitude. Default: 0.1 thresh: Vertical crossing threshold. Returns: Array of signal threshold crossing times. """ if len(t) != len(x): raise ValueError(f"len(t) ({len(t)}) and len(x) ({len(x)}) need to be the same.") t = array(t) x = array(x) try: max_mag_x = np.max(abs(x)) except Exception: # pylint: disable=broad-exception-caught print("len(x):", len(x)) raise min_mag_x = min_init_dev * max_mag_x i = 0 while abs(x[i]) < min_mag_x: i += 1 if i >= len(x): raise RuntimeError("Input signal minimum deviation not detected!") x = x[i:] - thresh t = t[i:] sign_x = sign(x) sign_x = where(sign_x, sign_x, ones(len(sign_x))) # "0"s can produce duplicate xings. diff_sign_x = diff(sign_x) xing_ix = where(diff_sign_x)[0] xings = [t[i] + (t[i + 1] - t[i]) * x[i] / (x[i] - x[i + 1]) for i in xing_ix] if not xings: return array([]) i = 0 if min_delay: if min_delay >= xings[-1]: raise RuntimeError(f"min_delay ({min_delay}) must be less than last crossing time ({xings[-1]}).") while xings[i] < min_delay: i += 1 if debug: print(f"min_delay: {min_delay}") print(f"rising_first: {rising_first}") print(f"i: {i}") print(f"max_mag_x: {max_mag_x}") print(f"min_mag_x: {min_mag_x}") print(f"xings[0]: {xings[0]}") print(f"xings[i]: {xings[i]}") try: if rising_first and diff_sign_x[xing_ix[i]] < 0.0: i += 1 except Exception: # pylint: disable=broad-exception-caught print("len(diff_sign_x):", len(diff_sign_x)) print("len(xing_ix):", len(xing_ix)) print("i:", i) raise return array(xings[i:])
[docs] def find_crossings( # pylint: disable=too-many-arguments,too-many-positional-arguments t: Rvec, x: Rvec, amplitude: float = 1.0, min_delay: float = 0.0, rising_first: bool = True, min_init_dev: float = 0.1, mod_type: int = 0 ) -> Rvec: """ Find the crossing times in a signal, according to the modulation type. Args: t: The times associated with each signal sample. x: The signal samples. Keyword Args: amplitude: The nominal signal amplitude. (Used for determining thresholds, in the case of some modulation types.) Default: 1.0 min_delay(float): The earliest possible sample time we want returned. Default: 0 rising_first: When True, start with the first rising edgefound. When this option is True, the first rising edge crossing is the first crossing returned. This is the desired behavior for PyBERT, because we always initialize the bit stream with [0, 0, 1, 1], in order to provide a known synchronization point for jitter analysis. Default: True min_init_dev: The minimum initial deviation from zero, which must be detected before searching for crossings. Normalized to maximum input signal magnitude. Default: 0.1 mod_type: The modulation type. Allowed values are: {0: NRZ, 1: Duo-binary, 2: PAM-4} Default: 0 Returns: The signal threshold crossing times. """ if mod_type not in [0, 1, 2]: raise ValueError(f"ERROR: pybert_util.find_crossings(): Unknown modulation type: {mod_type}") xings = [] if mod_type == 0: # NRZ xings.append( find_crossing_times(t, x, min_delay=min_delay, rising_first=rising_first, min_init_dev=min_init_dev) ) elif mod_type == 1: # Duo-binary xings.append( find_crossing_times( t, x, min_delay=min_delay, rising_first=rising_first, min_init_dev=min_init_dev, thresh=(-0.5 * amplitude), ) ) xings.append( find_crossing_times( t, x, min_delay=min_delay, rising_first=rising_first, min_init_dev=min_init_dev, thresh=(0.5 * amplitude), ) ) elif mod_type == 2: # PAM-4 (Enabling the +/-0.67 cases yields multiple ideal crossings at the same edge.) xings.append( find_crossing_times( t, x, min_delay=min_delay, rising_first=rising_first, min_init_dev=min_init_dev, thresh=(0.0 * amplitude), ) ) else: raise ValueError(f"Unknown modulation type: {mod_type}") return sort(concatenate(xings))
[docs] def calc_jitter( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,too-many-positional-arguments ui: float, nui: int, pattern_len: int, ideal_xings: Rvec, actual_xings: Rvec, rel_thresh: float = 3.0, num_bins: int = 101, zero_mean: bool = True, dbg_obj: Optional[object] = None, smooth_width: int = 5 ) -> tuple[Rvec, Rvec, float, float, float, float, float, float, Rvec, Rvec, Rvec, Rvec, Rvec, Rvec, Rvec, Rvec, float, float]: """ Calculate the jitter in a set of actual zero crossings, given the ideal crossings and unit interval. Args: ui: The nominal unit interval (s). nui: The number of unit intervals spanned by the input signal. pattern_len: The number of unit intervals, before input symbol stream repeats. ideal_xings: The ideal zero crossing locations of the edges (s). actual_xings: The actual zero crossing locations of the edges (s). Keyword Args: rel_thresh: The threshold for determining periodic jitter spectral components (sigma). Default: 3.0 num_bins: The number of bins to use, when forming histograms. Default: 101 zero_mean: Force the mean jitter to zero, when True. Default: True dbg_obj: Object for stashing debugging info. Default: None smooth_width: Width of smoothing window to use when calculating moving averages. Default: 5 Returns: A tuple containing - The total jitter. - The times (taken from 'ideal_xings') corresponding to the returned jitter values. - The peak to peak jitter due to intersymbol interference (ISI). - The peak to peak jitter due to duty cycle distortion (DCD). - The peak to peak jitter due to uncorrelated periodic sources (Pj). - The standard deviation of the jitter due to uncorrelated unbounded random sources (Rj). - Dual-Dirac peak to peak jitter. - Dual-Dirac random jitter. - The data independent jitter. - Threshold for determining periodic components. - The spectral magnitude of the total jitter. - The spectral magnitude of the data independent jitter. - The frequencies corresponding to the spectrum components. - The smoothed histogram of the total jitter. - The smoothed histogram of the data-independent jitter. - The bin center values for both histograms. - The mean of the Gaussian distribution best fitted to the right tail. - The mean of the Gaussian distribution best fitted to the left tail. Raises: ValueError: If input checking fails, or curve fitting goes awry. Notes: 1. The actual crossings should arrive pre-aligned to the ideal crossings. And both should start near zero time. """ # Check inputs. if not ideal_xings.all(): raise ValueError("calc_jitter(): zero length ideal crossings vector received!") if not actual_xings.all(): raise ValueError("calc_jitter(): zero length actual crossings vector received!") num_patterns = nui // pattern_len if num_patterns == 0: raise ValueError("\n".join([ "Need at least one full pattern repetition!", f"(pattern_len: {pattern_len}; nui: {nui})",])) xings_per_pattern = where(ideal_xings > (pattern_len * ui))[0][0] if xings_per_pattern % 2 or not xings_per_pattern: raise ValueError("\n".join([ "pybert.utility.calc_jitter(): Odd number of (or, no) crossings per pattern detected!", f"xings_per_pattern: {xings_per_pattern}", f"min(ideal_xings): {min(ideal_xings)}",])) # Assemble the TIE track. i = 0 jitterL = [] t_jitter = [] skip_next_ideal_xing = False for ideal_xing in ideal_xings: if skip_next_ideal_xing: t_jitter.append(ideal_xing) skip_next_ideal_xing = False continue # Confine our attention to those actual crossings occuring # within the interval [-UI/2, +UI/2] centered around the # ideal crossing. min_t = ideal_xing - ui / 2.0 max_t = ideal_xing + ui / 2.0 while i < len(actual_xings) and actual_xings[i] < min_t: i += 1 if i == len(actual_xings): # We've exhausted the list of actual crossings; we're done. break if actual_xings[i] > max_t: # Means the xing we're looking for didn't occur, in the actual signal. jitterL.append( 3.0 * ui / 4.0) # Pad the jitter w/ alternating +/- 3UI/4. # noqa: E201 jitterL.append(-3.0 * ui / 4.0) # (Will get pulled into [-UI/2, UI/2], later. skip_next_ideal_xing = True # If we missed one, we missed two. else: # Noise may produce several crossings. xings = [] # We find all those within the interval [-UI/2, +UI/2] j = i # centered around the ideal crossing, and take the average. while j < len(actual_xings) and actual_xings[j] <= max_t: xings.append(actual_xings[j]) j += 1 tie = mean(xings) - ideal_xing jitterL.append(tie) t_jitter.append(ideal_xing) jitter = array(jitterL) # ToDo: Report this in the status bar. if len(jitter) == 0 or len(t_jitter) == 0: print("No crossings found!", flush=True) if debug: print("mean(jitter):", mean(jitter)) print("len(jitter):", len(jitter)) if zero_mean: jitter -= mean(jitter) # Do the jitter decomposition. # - Separate the rising and falling edges, shaped appropriately for averaging over the pattern period. tie_risings = jitter.take(list(range(0, len(jitter), 2))) # type: ignore tie_fallings = jitter.take(list(range(1, len(jitter), 2))) # type: ignore tie_risings.resize(num_patterns * xings_per_pattern // 2, refcheck=False) # type: ignore tie_fallings.resize(num_patterns * xings_per_pattern // 2, refcheck=False) # type: ignore tie_risings = reshape(tie_risings, (num_patterns, xings_per_pattern // 2)) tie_fallings = reshape(tie_fallings, (num_patterns, xings_per_pattern // 2)) # - Use averaging to remove the uncorrelated components, before calculating data dependent components. tie_risings_ave = tie_risings.mean(axis=0) tie_fallings_ave = tie_fallings.mean(axis=0) isi = max(tie_risings_ave.ptp(), tie_fallings_ave.ptp()) isi = min(isi, ui) # Cap the ISI at the unit interval. dcd = abs(mean(tie_risings_ave) - mean(tie_fallings_ave)) # - Subtract the data dependent jitter from the original TIE track, in order to yield the data independent jitter. _jitter = jitter.copy() # "refcheck=False": accommodating Tox: _jitter.resize(num_patterns * xings_per_pattern, refcheck=False) # type: ignore tie_ave = resize(reshape(_jitter, (num_patterns, xings_per_pattern)).mean(axis=0), len(jitter)) tie_ind = jitter - tie_ave if zero_mean: tie_ind -= mean(tie_ind) # - Calculate the total and data-independent jitter spectrums, for display purposes only. # -- Calculate the relevant time/frequency vectors. osf = 1 # jitter oversampling factor t0 = ui / osf # jitter sampling period t = array([n * t0 for n in range(nui * osf)]) # jitter samples time vector f0 = 1.0 / (ui * nui) # jitter samples fundamental frequency _f = [n * f0 for n in range(len(t) // 2)] # [0:f0:fNyquist) f = array(_f + [1 / (2 * t0)] + list(-1 * flip(array(_f[1:])))) # [0:f0:fN) ++ [fN:-f0:0) half_len = len(f) // 2 # for spectrum plotting convenience # -- Make TIE vector uniformly sampled in time, via interpolation, for use as input to `fft()`. # spl = UnivariateSpline(t_jitter, jitter) # Way of the future, but does funny things. :( try: spl = interp1d(t_jitter, jitter, bounds_error=False, fill_value="extrapolate") tie_interp = spl(t) y = fft(tie_interp) jitter_spectrum = abs(y[:half_len]) except Exception as err: # pylint: disable=broad-exception-caught print(f"t_jitter: {t_jitter}") print(f"jitter: {jitter}") print(f"Error calculating data dependent TIE: {err}", flush=True) jitter_spectrum = zeros(half_len) jitter_freqs = f[:half_len] # -- Repeat for data-independent jitter. try: spl = interp1d(t_jitter, tie_ind, bounds_error=False, fill_value="extrapolate") tie_ind_interp = spl(t) y = fft(tie_ind_interp) y_mag = abs(y) tie_ind_spectrum = y_mag[:half_len] except Exception as err: # pylint: disable=broad-exception-caught print(f"t_jitter: {t_jitter}") print(f"tie_ind: {tie_ind}") print(f"Error calculating data independent TIE: {err}", flush=True) y = zeros(half_len) # type: ignore y_mag = zeros(half_len) tie_ind_spectrum = zeros(half_len) # -- Perform spectral extraction of Pj from the data independent jitter, # -- using a threshold based on a moving average to identify the periodic components. win_width = 100 y_mean = moving_average(y_mag, n=win_width) y_var = moving_average((y_mag - y_mean) ** 2, n=win_width) y_sigma = sqrt(y_var) thresh = y_mean + rel_thresh * y_sigma y_per = where(y_mag > thresh, y, zeros(len(y))) # Periodic components are those lying above the threshold. y_rnd = where(y_mag > thresh, zeros(len(y)), y) # Random components are those lying below. tie_per = real(ifft(y_per)) pj = np.ptp(tie_per) tie_rnd = real(ifft(y_rnd)) rj = sqrt(mean((tie_rnd - tie_rnd.mean())**2)) # -- Do dual Dirac fitting of the data-independent jitter histogram, to determine Pj/Rj. # --- Generate a smoothed version of the TIE histogram, for better peak identification. # --- (Have to work in ps when curve fitting, or `curve_fit()` blows up.) use_my_hist = True # False will yield misleading jitter distribution plots! def my_hist(x, density=False): """ Calculates the probability *mass* function (PMF) of the input vector (or, the probability *density* function (PDF) if so directed), enforcing an output range of [-UI/2, +UI/2], sweeping everything in [-UI, -UI/2] into the first bin, and everything in [UI/2, UI] into the last bin. """ bin_edges = array([-ui] + [-ui / 2.0 + i * ui / (num_bins - 2) for i in range(num_bins - 1)] + [ui]) bin_centers = [-ui / 2] + list((bin_edges[1:-2] + bin_edges[2:-1]) / 2) + [ui / 2] hist, _ = histogram(x, bin_edges) hist = hist / hist.sum() # PMF if density: hist /= diff(bin_edges) return (hist, bin_centers, bin_edges) if use_my_hist: hist_ind, centers, edges = my_hist(tie_ind, density=True) hist_tot, _, _ = my_hist(jitter, density=True) centers = array(centers) else: hist_ind, edges = histogram(tie_ind, bins=num_bins, density=True) hist_tot, _ = histogram(jitter, bins=num_bins, density=True) centers = (edges[:-1] + edges[1:]) / 2 hist_ind_smooth = array(moving_average(hist_ind, n=smooth_width)) hist_tot_smooth = array(moving_average(hist_tot, n=smooth_width)) hist_dd = hist_tot_smooth # Trying to avoid any residual peak at zero, which can confuse the algorithm: center_ix = (num_bins - 1) / 2 # May be fractional. peak_ixs = array(list(filter(lambda x: abs(x - center_ix) > 1, where(diff(sign(diff(hist_dd))) < 0)[0] + 1))) neg_peak_ixs = list(filter(lambda x: x < center_ix, peak_ixs)) if neg_peak_ixs: neg_peak_loc = neg_peak_ixs[argmax(hist_dd[neg_peak_ixs])] else: neg_peak_loc = int(center_ix) pos_peak_ixs = list(filter(lambda x: x > center_ix, peak_ixs)) if pos_peak_ixs: pos_peak_loc = pos_peak_ixs[argmax(hist_dd[pos_peak_ixs])] else: pos_peak_loc = int(center_ix) pjDD = centers[pos_peak_loc] - centers[neg_peak_loc] # --- Stash debugging info if an object was provided. if dbg_obj: dbg_obj.hist_ind_smooth = hist_ind_smooth # type: ignore dbg_obj.centers = centers # type: ignore dbg_obj.hist_ind = hist_ind # type: ignore dbg_obj.peak_ixs = peak_ixs # type: ignore dbg_obj.neg_peak_ixs = neg_peak_ixs # type: ignore dbg_obj.pos_peak_ixs = pos_peak_ixs # type: ignore # --- Fit the tails and average the results, to determine Rj. pos_max = hist_dd[pos_peak_loc] neg_max = hist_dd[neg_peak_loc] dd_soltn = [[pos_max, neg_max],] pos_tail_ixs = where(hist_dd[pos_peak_loc:] < pos_max / 2)[0] neg_tail_ixs = where(hist_dd[:neg_peak_loc] < neg_max / 2)[0] if len(pos_tail_ixs) > 0 and len(neg_tail_ixs) > 0: pos_tail_ix = pos_tail_ixs[0] + pos_peak_loc # index of first piece of right tail neg_tail_ix = neg_tail_ixs[-1] # index of last piece of left tail dd_soltn[0].append(pos_tail_ix) dd_soltn[0].append(neg_tail_ix) # Don't send first or last histogram elements to curve fitter, due to their special nature. try: popt, pcov = curve_fit(gaus_pdf, centers[pos_tail_ix:-1] * 1e12, hist_dd[pos_tail_ix:-1] * 1e-12) mu_pos, sigma_pos = popt mu_pos *= 1e-12 # back to (s) sigma_pos *= 1e-12 err_pos = sqrt(diag(pcov)) * 1e-12 dd_soltn += [[mu_pos, sigma_pos, err_pos],] except Exception as err: # pylint: disable=broad-exception-caught mu_pos = 0 sigma_pos = 0 dd_soltn += [[err],] try: popt, pcov = curve_fit(gaus_pdf, centers[1:neg_tail_ix] * 1e12, hist_dd[1:neg_tail_ix] * 1e-12) mu_neg, sigma_neg = popt mu_neg *= 1e-12 # back to (s) sigma_neg *= 1e-12 err_neg = sqrt(diag(pcov)) * 1e-12 dd_soltn += [[mu_neg, sigma_neg, err_neg],] except Exception as err: # pylint: disable=broad-exception-caught mu_neg = 0 sigma_neg = 0 dd_soltn += [[err],] else: mu_pos = 0 sigma_pos = 0 mu_neg = 0 sigma_neg = 0 rjDD = (sigma_pos + sigma_neg) / 2 if dbg_obj: dbg_obj.dd_soltn = dd_soltn # type: ignore return ( # pylint: disable=duplicate-code jitter, array(t_jitter), isi, dcd, float(pj), rj, pjDD, rjDD, tie_ind, thresh[:half_len], jitter_spectrum, tie_ind_spectrum, jitter_freqs, hist_tot_smooth, hist_ind_smooth, centers, # Returning just one requires `use_my_hist` True. mu_pos, mu_neg )