"""
Default controller definition for PyBERT class.
Original author: David Banas <capn.freako@gmail.com>
Original date: August 24, 2014 (Copied from pybert.py, as part of a major code cleanup.)
Copyright (c) 2014 David Banas; all rights reserved World wide.
"""
# pylint: disable=too-many-lines
from time import perf_counter
from typing import Any, Callable, Optional, TypeAlias
import numpy as np
import numpy.typing as npt
import scipy.signal as sig
from numpy import ( # type: ignore
argmax,
array,
concatenate,
convolve,
correlate,
diff,
float64,
histogram,
linspace,
mean,
repeat,
resize,
transpose,
where,
zeros,
)
from numpy.fft import rfft, irfft # type: ignore
from numpy.random import normal # type: ignore
from numpy.typing import NDArray # type: ignore
from scipy.signal import iirfilter, lfilter
from scipy.interpolate import interp1d
from pyibisami.ami.parser import AmiName, AmiNode, ami_parse
from pybert.models.dfe import DFE
from pybert.utility import (
calc_eye,
calc_jitter,
calc_resps,
find_crossings,
import_channel,
make_bathtub,
make_ctle,
run_ami_model,
safe_log10,
trim_impulse,
)
from pybert.models.viterbi import ViterbiDecoder_ISI
clock = perf_counter
AmiFloats: TypeAlias = tuple[AmiName, list["float | 'AmiFloats'"]]
DEBUG = False
MIN_BATHTUB_VAL = 1.0e-12
gFc = 1.0e6 # Corner frequency of high-pass filter used to model capacitive coupling of periodic noise.
# pylint: disable=too-many-locals,protected-access,too-many-branches,too-many-statements
[docs]
def my_run_simulation(self, initial_run: bool = False, update_plots: bool = True,
aborted_sim: Optional[Callable[[], bool]] = None):
"""
Runs the simulation.
Args:
self: Reference to an instance of the *PyBERT* class.
Keyword Args:
initial_run: If True, don't update the eye diagrams, since
they haven't been created, yet.
Default: False
update_plots: If True, update the plots, after simulation
completes. This option can be used by larger scripts, which
import *pybert*, in order to avoid graphical back-end
conflicts and speed up this function's execution time.
Default: True
aborted_sim: a function that is used to tell the simulation that the user
has requested to stop/abort the simulation.
Raises:
RuntimeError: If the simulation is aborted by the user or cannot continue.
Notes:
1. When using IBIS-AMI models, we often need to scale the impulse response
by the sample interval, or its inverse, because while IBIS-AMI models
take the impulse response to have units: (V/s), PyBERT uses: (V/sample).
"""
def _check_sim_status():
"""Checks the status of the simulation thread and if this simulation needs to stop."""
if aborted_sim and aborted_sim():
self.status = "Aborted Simulation"
raise RuntimeError("Simulation aborted by User.")
start_time = clock()
self.status = "Running channel..."
# The user sets `seed` to zero to indicate that she wants new bits generated for each run.
if not self.seed:
self.run_count += 1 # Force regeneration of bit stream.
# Pull class variables into local storage, performing unit conversion where necessary.
t = self.t
t_irfft = self.t_irfft
f = self.f
w = self.w
bits = self.bits
symbols = self.symbols
ffe = self.ffe
nbits = self.nbits
nui = self.nui
eye_bits = self.eye_bits
eye_uis = self.eye_uis
nspui = self.nspui
rn = self.rn
pn_mag = self.pn_mag
pn_freq = self.pn_freq * 1.0e6
pattern = self.pattern_
rx_bw = self.rx_bw * 1.0e9
peak_freq = self.peak_freq * 1.0e9
peak_mag = self.peak_mag
ctle_enable = self.ctle_enable
delta_t = self.delta_t * 1.0e-12
alpha = self.alpha
ui = self.ui
gain = self.gain
n_ave = self.n_ave
decision_scaler = self.decision_scaler
n_lock_ave = self.n_lock_ave
dfe_tap_tuners = self.dfe_tap_tuners
rel_lock_tol = self.rel_lock_tol
lock_sustain = self.lock_sustain
bandwidth = self.sum_bw * 1.0e9
rel_thresh = self.thresh
mod_type = self.mod_type[0]
impulse_length = self.impulse_length
# Calculate misc. values.
Ts = t[1]
ts = Ts
fs = 1 / ts
min_len = 30 * nspui
max_len = 100 * nspui
if impulse_length:
min_len = max_len = round(impulse_length / ts)
if mod_type == 2: # PAM-4
nspb = nspui // 2
else:
nspb = nspui
# Generate the ideal over-sampled signal.
#
# Duo-binary is problematic, in that it requires convolution with the ideal duobinary
# impulse response, in order to produce the proper ideal signal.
x = repeat(symbols, nspui)
ideal_signal = x
if mod_type == 1: # Handle duo-binary case.
duob_h = array(([0.5] + [0.0] * (nspui - 1)) * 2)
ideal_signal = convolve(x, duob_h)[: len(t)]
# Calculate the channel response, as well as its (hypothetical)
# solitary effect on the data, for plotting purposes only.
try:
split_time = clock()
chnl_h = self.calc_chnl_h()
_calc_chnl_time = clock() - split_time
# Note: We're not using 'self.ideal_signal', because we rely on the system response to
# create the duobinary waveform. We only create it explicitly, above,
# so that we'll have an ideal reference for comparison.
split_time = clock()
chnl_out = convolve(x, chnl_h)[: len(t)]
_conv_chnl_time = clock() - split_time
if self.debug:
self.log(f"Channel calculation time: {_calc_chnl_time}")
self.log(f"Channel convolution time: {_conv_chnl_time}")
self.channel_perf = nbits * nspb / (clock() - start_time)
except Exception as err:
self.status = f"Exception: channel: {err}"
raise
self.chnl_out = chnl_out
_check_sim_status()
split_time = clock()
self.status = "Running Tx..."
# Calculate Tx output power dissipation.
ffe_out = convolve(symbols, ffe)[: len(symbols)]
if self.use_ch_file:
self.rel_power = mean(ffe_out**2) / self.rs
else:
self.rel_power = mean(ffe_out**2) / self.Z0
# Generate the uncorrelated periodic noise. (Assume capacitive coupling.)
# Generate the ideal rectangular aggressor waveform.
pn_period = 1.0 / pn_freq
pn_samps = int(pn_period / Ts + 0.5)
pn = zeros(pn_samps)
pn[pn_samps // 2:] = pn_mag
self.pn_period = pn_period
self.pn_samps = pn_samps
pn = resize(pn, len(x))
# High pass filter it. (Simulating capacitive coupling.)
(b, a) = iirfilter(2, gFc / (fs / 2), btype="highpass")
pn = lfilter(b, a, pn)[: len(pn)]
self.pn = pn
noise = pn + normal(scale=rn, size=(len(x),))
self.noise = noise
# Tx and Rx modeling are not separable in all cases.
# So, we model each of the 4 possible combinations explicitly.
# For the purposes of tallying possible combinations,
# AMI Init() and PyBERT native are equivalent,
# as both rely on convolving w/ impulse responses.
def get_ctle_h():
"Return the impulse response of the PyBERT native CTLE model."
if self.use_ctle_file:
# FIXME: The new import_channel() implementation breaks this.
ctle_h = import_channel(self.ctle_file, ts, f)
if max(abs(ctle_h)) < 100.0: # step response?
ctle_h = diff(ctle_h) # impulse response is derivative of step response.
else:
ctle_h *= ts # Normalize to (V/sample)
ctle_h.resize(len(t))
ctle_H = rfft(ctle_h) # ToDo: This needs interpolation first.
else:
if ctle_enable:
_, ctle_H = make_ctle(rx_bw, peak_freq, peak_mag, w)
_ctle_h = irfft(ctle_H)
krnl = interp1d(t_irfft, _ctle_h, bounds_error=False, fill_value=0)
ctle_h = krnl(t)
ctle_h *= sum(_ctle_h) / sum(ctle_h)
ctle_h, _ = trim_impulse(ctle_h, front_porch=False, min_len=min_len, max_len=max_len)
else:
ctle_h = array([1.] + [0. for _ in range(min_len - 1)])
return ctle_h
ctle_s = None
clock_times = None
try:
params: list[str] = []
if self.tx_use_ami and self.tx_use_getwave:
tx_out, _, tx_h, tx_out_h, msg, _params = run_ami_model(
self.tx_dll_file, self._tx_cfg, True, ui, ts, chnl_h, x)
params = _params
self.log(f"Tx IBIS-AMI model initialization results:\n{msg}")
tx_getwave_params = list(map(ami_parse, params))
self.log(f"Tx IBIS-AMI model GetWave() output parameters:\n{tx_getwave_params}")
rx_in = convolve(tx_out + noise, chnl_h)[:len(tx_out)]
# Calculate the remaining responses from the impulse responses.
tx_s, tx_p, tx_H = calc_resps(t, tx_h, ui, f)
tx_out_s, tx_out_p, tx_out_H = calc_resps(t, tx_out_h, ui, f)
self.tx_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = "Running CTLE..."
if self.rx_use_ami and self.rx_use_getwave:
ctle_out, _, ctle_h, ctle_out_h, msg, _params = run_ami_model(
self.rx_dll_file, self._rx_cfg, True, ui, ts, tx_out_h, convolve(tx_out, chnl_h))
params = _params
self.log(f"Rx IBIS-AMI model initialization results:\n{msg}")
_rx_getwave_params = list(map(ami_parse, params))
self.log(f"Rx IBIS-AMI model GetWave() output parameters:\n{_rx_getwave_params}")
else: # Rx is either AMI_Init() or PyBERT native.
if self.rx_use_ami: # Rx Init()
_, _, ctle_h, ctle_out_h, msg, _ = run_ami_model(
self.rx_dll_file, self._rx_cfg, False, ui, ts, chnl_h, tx_out)
self.log(f"Rx IBIS-AMI model initialization results:\n{msg}")
ctle_out = convolve(tx_out, ctle_out_h)[:len(tx_out)]
else: # PyBERT native Rx
ctle_h = get_ctle_h()
ctle_out_h = convolve(ctle_h, tx_out_h)[:len(ctle_h)]
ctle_out = convolve(tx_out, convolve(ctle_h, chnl_h))[:len(tx_out)]
else: # Tx is either AMI_Init() or PyBERT native.
if self.tx_use_ami: # Tx is AMI_Init().
rx_in, _, tx_h, tx_out_h, msg, _ = run_ami_model(
self.tx_dll_file, self._tx_cfg, False, ui, ts, chnl_h, x)
self.log(f"Tx IBIS-AMI model initialization results:\n{msg}")
rx_in += noise
else: # Tx is PyBERT native.
# Using `sum` to concatenate:
tx_h = array(sum([[x] + list(zeros(nspui - 1)) for x in ffe], []))
tx_h.resize(len(chnl_h), refcheck=False) # "refcheck=False", to get around Tox failure.
tx_out_h = convolve(tx_h, chnl_h)[: len(chnl_h)]
rx_in = convolve(x, tx_out_h)[:len(x)] + noise
# Calculate the remaining responses from the impulse responses.
tx_s, tx_p, tx_H = calc_resps(t, tx_h, ui, f)
tx_out_s, tx_out_p, tx_out_H = calc_resps(t, tx_out_h, ui, f)
self.tx_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = "Running CTLE..."
if self.rx_use_ami and self.rx_use_getwave:
ctle_out, clock_times, ctle_h, ctle_out_h, msg, _params = run_ami_model(
self.rx_dll_file, self._rx_cfg, True, ui, ts, tx_out_h, rx_in)
params = _params
self.log(f"Rx IBIS-AMI model initialization results:\n{msg}")
# Time evolution of (<root_name>: AmiName, <param_vals>: list[AmiNode]):
# (i.e. - There can be no `AmiAtom`s in the root tuple's second member.)
rx_getwave_params: list[tuple[AmiName, list[AmiNode]]] = list(map(ami_parse, params))
param_vals = {}
def isnumeric(x):
try:
_ = float(x)
return True
except: # noqa: E722, pylint: disable=bare-except
return False
def get_numeric_values(prefix: AmiName, node: AmiNode) -> dict[AmiName, list[np.float64]]:
"Retrieve all numeric values from an AMI node, encoding hierarchy in key names."
pname = node[0]
vals = node[1]
pname_hier = AmiName(prefix + pname)
first_val = vals[0]
if isnumeric(first_val):
return {pname_hier: list(map(float, vals))} # type: ignore
if type(first_val) == AmiNode: # noqa: E721, pylint: disable=unidiomatic-typecheck
subdicts = list(map(lambda nd: get_numeric_values(pname_hier, nd), vals)) # type: ignore
rslt = {}
for subdict in subdicts:
rslt.update(subdict)
return rslt
return {}
for nd in rx_getwave_params[0][1]:
param_vals.update(get_numeric_values(AmiName(""), nd))
for rslt in rx_getwave_params[1:]:
for nd in rslt[1]:
vals_dict = get_numeric_values(AmiName(""), nd)
for pname, pvals in vals_dict.items():
param_vals[pname].extend(pvals)
_tap_weights = []
dfe_tap_keys: list[AmiName] = list(filter(lambda s: s.tolower().contains("tap"), param_vals.keys())) # type: ignore
dfe_tap_keys.sort()
for dfe_tap_key in dfe_tap_keys:
_tap_weights.append(param_vals[dfe_tap_key])
tap_weights: list[list[float]] = list(array(_tap_weights).transpose())
if "cdr_locked" in param_vals:
_lockeds: npt.NDArray[np.float64] = array(param_vals[AmiName("cdr_locked")])
_lockeds = _lockeds.repeat(len(t) // len(_lockeds))
_lockeds.resize(len(t))
else:
_lockeds = zeros(len(t))
lockeds: list[bool] = list(map(bool, _lockeds))
if "cdr_ui" in param_vals:
ui_ests: npt.NDArray[np.float64] = array(param_vals[AmiName("cdr_ui")])
ui_ests = ui_ests.repeat(len(t) // len(ui_ests))
ui_ests.resize(len(t))
else:
ui_ests = zeros(len(t))
else: # Rx is either AMI_Init() or PyBERT native.
if self.rx_use_ami: # Rx Init()
ctle_out, _, ctle_h, ctle_out_h, msg, _ = run_ami_model(
self.rx_dll_file, self._rx_cfg, False, ui, ts, tx_out_h, x)
self.log(f"Rx IBIS-AMI model initialization results:\n{msg}")
ctle_out += noise
else: # PyBERT native Rx
if ctle_enable:
ctle_h = get_ctle_h()
ctle_out_h = convolve(tx_out_h, ctle_h)[:len(tx_out_h)]
ctle_out = convolve(rx_in, ctle_h)[:len(rx_in)]
else:
ctle_h = array([1.] + [0.] * (min_len - 1))
ctle_out_h = tx_out_h
ctle_out = rx_in
except Exception as err:
self.status = f"Exception: {err}"
raise
# Calculate the remaining responses from the impulse responses.
if ctle_s is None:
ctle_s, ctle_p, ctle_H = calc_resps(t, ctle_h, ui, f)
else:
_, ctle_p, ctle_H = calc_resps(t, ctle_h, ui, f)
ctle_out_s, ctle_out_p, ctle_out_H = calc_resps(t, ctle_out_h, ui, f)
# Calculate convolutional delay.
ctle_out.resize(len(t), refcheck=False)
ctle_out_h_main_lobe = where(ctle_out_h >= max(ctle_out_h) / 2.0)[0]
if ctle_out_h_main_lobe.size:
conv_dly_ix = ctle_out_h_main_lobe[0]
else:
conv_dly_ix = int(self.chnl_dly // Ts)
conv_dly = t[conv_dly_ix]
# Stash needed intermediate results, as instance variables.
self.tx_h = tx_h
self.tx_s = tx_s
self.tx_p = tx_p
self.tx_H = tx_H
self.tx_out_h = tx_out_h
self.tx_out_s = tx_out_s
self.tx_out_p = tx_out_p
self.tx_out_H = tx_out_H
self.ideal_signal = ideal_signal
self.rx_in = rx_in
self.ctle_h = ctle_h
self.ctle_s = ctle_s
self.ctle_p = ctle_p
self.ctle_H = ctle_H
self.ctle_out_h = ctle_out_h
self.ctle_out_s = ctle_out_s
self.ctle_out_p = ctle_out_p
self.ctle_out_H = ctle_out_H
self.ctle_out = ctle_out
self.conv_dly = conv_dly
self.conv_dly_ix = conv_dly_ix
self.ctle_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = "Running FFE..."
_check_sim_status()
# FFE output and incremental/cumulative responses.
if any(tap.enabled for tap in self.rx_taps):
# Using `sum` to concatenate:
ffe_h = array(sum([[x.value] + list(zeros(nspui - 1)) for x in self.rx_taps], []))
else:
ffe_h = array([1] + [0] * (nspui - 1))
ffe_h.resize(len(chnl_h), refcheck=False) # "refcheck=False", to get around Tox failure.
ffe_out_h = convolve(ffe_h, ctle_out_h)[: len(chnl_h)]
ffe_out = convolve(ctle_out, ffe_h)[:len(ctle_out)]
# Calculate the remaining responses from the impulse responses.
ffe_s, ffe_p, ffe_H = calc_resps(t, ffe_h, ui, f) # pylint: disable=unused-variable
ffe_out_s, ffe_out_p, ffe_out_H = calc_resps(t, ffe_out_h, ui, f) # pylint: disable=unused-variable
self.ffe_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = "Running DFE/CDR..."
_check_sim_status()
# DFE output and incremental/cumulative responses.
if any(tap.enabled for tap in dfe_tap_tuners):
_gain = gain
_ideal = self.sum_ideal
_n_taps = len(dfe_tap_tuners)
else:
_gain = 0.0
_ideal = True
_n_taps = 0
limits = []
for tuner in self.dfe_tap_tuners:
if tuner.enabled:
limits.append((tuner.min_val, tuner.max_val))
else:
limits.append((0., 0.))
dfe = DFE(_n_taps, _gain, delta_t, alpha, ui, nspui, decision_scaler, mod_type,
n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol,
lock_sustain=lock_sustain, bandwidth=bandwidth, ideal=_ideal,
limits=limits)
if not (self.rx_use_ami and self.rx_use_getwave): # Use PyBERT native DFE/CDR.
dbg_dict: dict[str, Any] = {}
(dfe_out,
tap_weights,
ui_ests,
clocks,
lockeds,
sample_times,
bits_out) = dfe.run(t, ffe_out, use_agc=self.use_agc, dbg_dict=dbg_dict)
self.decision_scaler = dfe.decision_scaler
self.dfe_scalar_values = dbg_dict["scalar_values"]
else: # Process Rx IBIS-AMI GetWave() output.
# Process any valid clock times returned by Rx IBIS-AMI model's GetWave() function if apropos.
dfe_out = array(ffe_out) # In this case, `ffe_out` includes the effects of IBIS-AMI DFE.
dfe_out.resize(len(t))
t_ix = 0
_bits_out = []
clocks = zeros(len(t))
sample_times = []
if self.rx_use_clocks and clock_times is not None:
for clock_time in clock_times:
if clock_time == -1: # "-1" is used to flag "no more valid clock times".
break
sample_time = clock_time + ui / 2 # IBIS-AMI clock times are edge aligned.
while t_ix < len(t) and t[t_ix] < sample_time:
t_ix += 1
if t_ix >= len(t):
self.log("Went beyond system time vector end searching for next clock time!")
break
_, _bits = dfe.decide(ffe_out[t_ix])
_bits_out.extend(_bits)
clocks[t_ix] = 1
sample_times.append(sample_time)
# Process any remaining output, using inferred sampling instants.
if t_ix < (len(t) - 5 * nspui / 4):
# Starting at `nspui/4` handles either case:
# - starting at UI boundary, or
# - starting at last sampling instant.
next_sample_ix = t_ix + nspui // 4 + argmax([sum(abs(ffe_out[t_ix + nspui // 4 + k::nspui]))
for k in range(nspui)])
for t_ix in range(next_sample_ix, len(t), nspui):
_, _bits = dfe.decide(ffe_out[t_ix])
_bits_out.extend(_bits)
clocks[t_ix] = 1
sample_times.append(t[t_ix])
bits_out = array(_bits_out)
start_ix = max(0, len(bits_out) - eye_bits)
end_ix = len(bits_out)
auto_corr = (
1.0 * correlate(bits_out[start_ix: end_ix], bits[start_ix: end_ix], mode="same") / # noqa: W504
sum(bits[start_ix: end_ix])
)
auto_corr = auto_corr[len(auto_corr) // 2:]
self.auto_corr = auto_corr
bit_dly = where(auto_corr == max(auto_corr))[0][0]
first_ref_bit = nbits - eye_bits
bits_ref = bits[first_ref_bit:]
first_tst_bit = first_ref_bit + bit_dly
bits_tst = bits_out[first_tst_bit:]
if len(bits_ref) > len(bits_tst):
bits_ref = bits_ref[: len(bits_tst)]
elif len(bits_tst) > len(bits_ref):
bits_tst = bits_tst[: len(bits_ref)]
bit_errs = where(bits_tst ^ bits_ref)[0]
n_errs = len(bit_errs)
if n_errs and False: # pylint: disable=condition-evals-to-constant
self.log(f"pybert.models.bert.my_run_simulation(): Bit errors detected at indices: {bit_errs}.")
self.bit_errs = n_errs
if len(tap_weights) > 0:
dfe_h = array(
[1.0] + list(zeros(nspui - 1)) + # noqa: W504
sum([[-x] + list(zeros(nspui - 1)) for x in tap_weights[-1]], [])) # sum as concat
dfe_h.resize(len(ctle_out_h), refcheck=False)
else:
dfe_h = array([1.0] + list(zeros(nspui - 1)))
dfe_out_h = convolve(ffe_out_h, dfe_h)[: len(ffe_out_h)]
# Calculate the remaining responses from the impulse responses.
dfe_s, dfe_p, dfe_H = calc_resps(t, dfe_h, ui, f)
dfe_out_s, dfe_out_p, dfe_out_H = calc_resps(t, dfe_out_h, ui, f)
self.dfe_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
_check_sim_status()
# Apply Viterbi decoder if apropos.
self.bit_errs_viterbi = -1 # `-1` flags that Viterbi was not run.
self.viterbi_perf = 0
if self.rx_use_viterbi:
self.status = "Running Viterbi..."
match mod_type:
case 0:
L = 2
case 1:
L = 3
case 2:
L = 4
case _:
raise ValueError(f"Unrecognized modulation type: {mod_type}!")
N = self.rx_viterbi_symbols
sigma = self.rn
pulse_resp_curs_ix = np.argmax(ffe_out_p)
pulse_resp_samps = np.array([ffe_out_p[pulse_resp_curs_ix + n * nspui] for n in range(N)])
decoder = ViterbiDecoder_ISI(L, N, sigma, pulse_resp_samps)
sig_samps = []
for sample_time in filter(lambda x: x <= t[-1], sample_times[first_tst_bit:]):
ix = np.where(t >= sample_time)[0][0]
sig_samps.append(ffe_out[ix])
if self.debug:
self.dbg_dict_viterbi = {}
path = decoder.decode(sig_samps, dbg_dict=self.dbg_dict_viterbi)
else:
path = decoder.decode(sig_samps)
symbols_viterbi = list(map(lambda ix: decoder.states[ix][0][-1], path))
if self.debug:
self.pulse_resp_samps = pulse_resp_samps
self.sig_samps = sig_samps
self.symbols_viterbi = symbols_viterbi
self.dbg_dict_viterbi["decoder"] = decoder
self.dbg_dict_viterbi["path"] = path
bits_tst_viterbi = concatenate(list(map(lambda ss: dfe.decide(ss)[1], symbols_viterbi)))
if len(bits_ref) > len(bits_tst_viterbi):
bits_ref = bits_ref[: len(bits_tst_viterbi)]
elif len(bits_tst_viterbi) > len(bits_ref):
bits_tst_viterbi = bits_tst_viterbi[: len(bits_ref)]
num_viterbi_bits = len(bits_tst_viterbi)
bit_errs_viterbi = where(bits_tst_viterbi ^ bits_ref)[0]
self.bit_errs_viterbi = len(bit_errs_viterbi)
self.viterbi_errs_ixs = bit_errs_viterbi
self.viterbi_perf = num_viterbi_bits * nspb / (clock() - split_time)
split_time = clock()
self.dfe_h = dfe_h
self.dfe_s = dfe_s
self.dfe_p = dfe_p
self.dfe_H = dfe_H
self.dfe_out_h = dfe_out_h
self.dfe_out_s = dfe_out_s
self.dfe_out_p = dfe_out_p
self.dfe_out_H = dfe_out_H
self.dfe_out = dfe_out
self.lockeds = lockeds
self.status = "Analyzing jitter..."
_check_sim_status()
# Save local variables to class instance for state preservation, performing unit conversion where necessary.
self.adaptation = tap_weights
self.ui_ests = array(ui_ests) * 1.0e12 # (ps)
self.clocks = clocks
self.clock_times = sample_times
# Analyze the jitter.
self.thresh_tx = array([])
self.jitter_ext_tx = array([])
self.jitter_tx = array([])
self.jitter_spectrum_tx = array([])
self.jitter_ind_spectrum_tx = array([])
self.thresh_ctle = array([])
self.jitter_ext_ctle = array([])
self.jitter_ctle = array([])
self.jitter_spectrum_ctle = array([])
self.jitter_ind_spectrum_ctle = array([])
self.thresh_dfe = array([])
self.jitter_ext_dfe = array([])
self.jitter_dfe = array([])
self.jitter_spectrum_dfe = array([])
self.jitter_ind_spectrum_dfe = array([])
self.f_MHz_dfe = array([])
self.jitter_rejection_ratio = array([])
# The pattern length must be doubled in the duo-binary and PAM-4 cases anyway, because:
# - in the duo-binary case, the XOR pre-coding can invert every other pattern rep., and
# - in the PAM-4 case, the bits are taken in pairs to form the symbols and we start w/ an odd # of bits.
# So, while it isn't strictly necessary, doubling it in the NRZ case as well provides a certain consistency.
pattern_len = (pow(2, max(pattern)) - 1) * 2
len_x_m1 = len(x) - 1
xing_min_t = (nui - eye_uis) * ui
def eye_xings(xings, ofst=0) -> NDArray[float64]:
"""
Return crossings from that portion of the signal used to generate the eye.
Args:
xings([float]): List of crossings.
Keyword Args:
ofst(float): Time offset to be subtracted from all crossings.
Returns:
[float]: Selected crossings, offset and eye-start corrected.
"""
_xings = array(xings) - ofst
return _xings[where(_xings > xing_min_t)] - xing_min_t
jit_chnl_done: bool = False
jit_tx_done: bool = False
jit_ctle_done: bool = False
jit_dfe_done: bool = False
try:
# - ideal
ideal_xings = find_crossings(t, ideal_signal, decision_scaler, mod_type=mod_type)
self.ideal_xings = ideal_xings
ideal_xings_jit = eye_xings(ideal_xings)
# - channel output
ofst = (argmax(sig.correlate(chnl_out, x)) - len_x_m1) * Ts
actual_xings = find_crossings(t, chnl_out, decision_scaler, mod_type=mod_type)
actual_xings_jit = eye_xings(actual_xings, ofst)
(
tie,
t_jitter,
isi,
dcd,
pj,
rj,
pjDD,
rjDD,
tie_ind,
thresh,
jitter_spectrum,
jitter_ind_spectrum,
spectrum_freqs,
hist,
hist_synth,
bin_centers,
mu_pos,
mu_neg,
) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings_jit, actual_xings_jit, rel_thresh)
self.t_jitter = t_jitter
self.isi_chnl = isi
self.dcd_chnl = dcd
self.pj_chnl = pj
self.rj_chnl = rj
self.pjDD_chnl = pjDD
self.rjDD_chnl = rjDD
self.mu_pos_chnl = mu_pos
self.mu_neg_chnl = mu_neg
self.thresh_chnl = thresh
self.jitter_chnl = hist
self.jitter_ext_chnl = hist_synth
self.jitter_bins = bin_centers
self.jitter_spectrum_chnl = jitter_spectrum
self.jitter_ind_spectrum_chnl = jitter_ind_spectrum
self.f_MHz = array(spectrum_freqs) * 1.0e-6
self.ofst_chnl = ofst
self.tie_chnl = tie
self.tie_ind_chnl = tie_ind
jit_chnl_done = True
# - Tx output
ofst = (argmax(sig.correlate(rx_in, x)) - len_x_m1) * Ts
actual_xings = find_crossings(t, rx_in, decision_scaler, mod_type=mod_type)
actual_xings_jit = eye_xings(actual_xings, ofst)
(
tie,
t_jitter,
isi,
dcd,
pj,
rj,
pjDD,
rjDD,
tie_ind,
thresh,
jitter_spectrum,
jitter_ind_spectrum,
spectrum_freqs,
hist,
hist_synth,
bin_centers,
mu_pos,
mu_neg,
) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings_jit, actual_xings_jit, rel_thresh)
self.isi_tx = isi
self.dcd_tx = dcd
self.pj_tx = pj
self.rj_tx = rj
self.pjDD_tx = pjDD
self.rjDD_tx = rjDD
self.mu_pos_tx = mu_pos
self.mu_neg_tx = mu_neg
self.thresh_tx = thresh
self.jitter_tx = hist
self.jitter_ext_tx = hist_synth
self.jitter_centers_tx = bin_centers
self.jitter_spectrum_tx = jitter_spectrum
self.jitter_ind_spectrum_tx = jitter_ind_spectrum
self.jitter_freqs_tx = spectrum_freqs
self.t_jitter_tx = t_jitter
self.tie_tx = tie
self.tie_ind_tx = tie_ind
jit_tx_done = True
# - CTLE output
ofst = (argmax(sig.correlate(ctle_out, x)) - len_x_m1) * Ts
actual_xings = find_crossings(t, ctle_out, decision_scaler, mod_type=mod_type)
actual_xings_jit = eye_xings(actual_xings, ofst)
(
tie,
t_jitter,
isi,
dcd,
pj,
rj,
pjDD,
rjDD,
tie_ind,
thresh,
jitter_spectrum,
jitter_ind_spectrum,
spectrum_freqs,
hist,
hist_synth,
bin_centers,
mu_pos,
mu_neg,
) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings_jit, actual_xings_jit, rel_thresh)
self.isi_ctle = isi
self.dcd_ctle = dcd
self.pj_ctle = pj
self.rj_ctle = rj
self.pjDD_ctle = pjDD
self.rjDD_ctle = rjDD
self.mu_pos_ctle = mu_pos
self.mu_neg_ctle = mu_neg
self.thresh_ctle = thresh
self.jitter_ctle = hist
self.jitter_ext_ctle = hist_synth
self.jitter_spectrum_ctle = jitter_spectrum
self.jitter_ind_spectrum_ctle = jitter_ind_spectrum
self.tie_ctle = tie
self.tie_ind_ctle = tie_ind
jit_ctle_done = True
# - DFE output
ofst = (argmax(sig.correlate(dfe_out, x)) - len_x_m1) * Ts
actual_xings = find_crossings(t, dfe_out, decision_scaler, mod_type=mod_type)
actual_xings_jit = eye_xings(actual_xings, ofst)
(
tie,
t_jitter,
isi,
dcd,
pj,
rj,
pjDD,
rjDD,
tie_ind,
thresh,
jitter_spectrum,
jitter_ind_spectrum,
spectrum_freqs,
hist,
hist_synth,
bin_centers,
mu_pos,
mu_neg,
) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings_jit, actual_xings_jit, rel_thresh, dbg_obj=self)
self.isi_dfe = isi
self.dcd_dfe = dcd
self.pj_dfe = pj
self.rj_dfe = rj
self.pjDD_dfe = pjDD
self.rjDD_dfe = rjDD
self.mu_pos_dfe = mu_pos
self.mu_neg_dfe = mu_neg
self.thresh_dfe = thresh
self.jitter_dfe = hist
self.jitter_ext_dfe = hist_synth
self.jitter_spectrum_dfe = jitter_spectrum
self.jitter_ind_spectrum_dfe = jitter_ind_spectrum
self.tie_dfe = tie
self.tie_ind_dfe = tie_ind
self.f_MHz_dfe = array(spectrum_freqs) * 1.0e-6
dfe_spec = self.jitter_spectrum_dfe
self.jitter_rejection_ratio = zeros(len(dfe_spec))
jit_dfe_done = True
self.jitter_perf = nbits * nspb / (clock() - split_time)
self.total_perf = nbits * nspb / (clock() - start_time)
split_time = clock()
self.status = "Updating plots..."
except ValueError as err:
self.log(f"The jitter calculation could not be completed, due to the following error:\n{err}",
alert=False)
if jit_chnl_done:
if jit_tx_done:
if jit_ctle_done:
if jit_dfe_done:
self.status = "Exception: But, all finished!"
else:
self.status = "Exception: DFE jitter"
else:
self.status = "Exception: CTLE jitter"
else:
self.status = "Exception: Tx jitter"
else:
self.status = "Exception: channel jitter"
raise
_check_sim_status()
# Update plots.
try:
if update_plots:
update_results(self)
if not initial_run:
update_eyes(self)
self.plotting_perf = nbits * nspb / (clock() - split_time)
self.status = "Ready."
except Exception as err: # pylint: disable=broad-exception-caught
self.log(f"The following error occured, while trying to update the plots:\n{err}")
self.status = "Exception: plotting"
# raise
# Plot updating
# pylint: disable=too-many-locals,protected-access,too-many-statements
[docs]
def update_results(self):
"""Updates all plot data used by GUI.
Args:
self(PyBERT): Reference to an instance of the *PyBERT* class.
"""
# Copy globals into local namespace.
ui = self.ui
samps_per_ui = self.nspui
eye_uis = self.eye_uis
num_ui = self.nui
clock_times = self.clock_times
ui_ests = self.ui_ests
f = self.f
t = self.t
t_ns = self.t_ns
t_ns_chnl = self.t_ns_chnl
t_irfft = self.t_irfft
ignore_until = (num_ui - eye_uis) * ui
ignore_samps = (num_ui - eye_uis) * samps_per_ui
# Misc.
f_GHz = f / 1.0e9
len_f_GHz = len(f_GHz)
len_t = len(t_ns)
self.plotdata.set_data("f_GHz", f_GHz[1:])
self.plotdata.set_data("t_ns_chnl", t_ns_chnl)
self.plotdata.set_data("t_ns_irfft", t_irfft * 1e9)
if len_t > 1000: # to prevent Chaco plotting error with too much data
t_ns_plot = linspace(0, t_ns[-1], 1000)
else:
t_ns_plot = t_ns
self.plotdata.set_data("t_ns", t_ns_plot)
# DFE.
tap_weights = transpose(array(self.adaptation))
if len(tap_weights):
for k, tap_weight in enumerate(tap_weights): # pylint: disable=undefined-loop-variable
self.plotdata.set_data(f"tap{k + 1}_weights", tap_weight)
self.plotdata.set_data("tap_weight_index", list(range(len(tap_weight)))) # pylint: disable=undefined-loop-variable
else:
for k in range(len(self.dfe_tap_tuners)):
self.plotdata.set_data(f"tap{k + 1}_weights", zeros(10))
self.plotdata.set_data("tap_weight_index", list(range(10))) # pylint: disable=undefined-loop-variable
(bin_counts, bin_edges) = histogram(ui_ests, bins=100)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2.0
clock_spec = rfft(ui_ests)
_f0 = 1 / (t[1] * len(t))
spec_freqs = [_f0 * k for k in range(len(t) // 2 + 1)]
self.plotdata.set_data("clk_per_hist_bins", bin_centers)
self.plotdata.set_data("clk_per_hist_vals", bin_counts)
self.plotdata.set_data("clk_spec", safe_log10(abs(clock_spec[1:]) / abs(clock_spec[1]))) # Omit the d.c. value and normalize to fundamental magnitude.
self.plotdata.set_data("clk_freqs", array(spec_freqs[1:]) * ui)
self.plotdata.set_data("dfe_out", self.dfe_out)
self.plotdata.set_data("clocks", self.clocks)
self.plotdata.set_data("lockeds", self.lockeds)
if len_t > 1000: # to prevent Chaco plotting error with too much data
krnl = interp1d(t_ns, self.ui_ests)
ui_ests_plot = krnl(t_ns_plot)
else:
ui_ests_plot = self.ui_ests
self.plotdata.set_data("ui_ests", ui_ests_plot)
# Impulse responses
self.plotdata.set_data("chnl_h", self.chnl_h)
self.plotdata.set_data("tx_h", self.tx_h)
self.plotdata.set_data("tx_out_h", self.tx_out_h)
self.plotdata.set_data("ctle_h", self.ctle_h)
self.plotdata.set_data("ctle_out_h", self.ctle_out_h)
self.plotdata.set_data("dfe_h", self.dfe_h)
self.plotdata.set_data("dfe_out_h", self.dfe_out_h)
# Step responses
self.plotdata.set_data("chnl_s", self.chnl_s)
self.plotdata.set_data("tx_s", self.tx_s)
self.plotdata.set_data("tx_out_s", self.tx_out_s)
self.plotdata.set_data("ctle_s", self.ctle_s)
self.plotdata.set_data("ctle_out_s", self.ctle_out_s)
self.plotdata.set_data("dfe_s", self.dfe_s)
self.plotdata.set_data("dfe_out_s", self.dfe_out_s)
# Pulse responses
self.plotdata.set_data("chnl_p", self.chnl_p)
self.plotdata.set_data("tx_out_p", self.tx_out_p)
self.plotdata.set_data("ctle_out_p", self.ctle_out_p)
self.plotdata.set_data("dfe_out_p", self.dfe_out_p)
# Frequency responses
self.plotdata.set_data("chnl_H_raw", 20.0 * safe_log10(abs(self.chnl_H_raw[1:len_f_GHz])))
self.plotdata.set_data("chnl_H", 20.0 * safe_log10(abs(self.chnl_H[1:len_f_GHz])))
self.plotdata.set_data("chnl_trimmed_H", 20.0 * safe_log10(abs(self.chnl_trimmed_H[1:len_f_GHz])))
self.plotdata.set_data("tx_H", 20.0 * safe_log10(abs(self.tx_H[1:])))
self.plotdata.set_data("tx_out_H", 20.0 * safe_log10(abs(self.tx_out_H[1:len_f_GHz])))
self.plotdata.set_data("ctle_H", 20.0 * safe_log10(abs(self.ctle_H[1:len_f_GHz])))
self.plotdata.set_data("ctle_out_H", 20.0 * safe_log10(abs(self.ctle_out_H[1:len_f_GHz])))
self.plotdata.set_data("dfe_H", 20.0 * safe_log10(abs(self.dfe_H[1:len_f_GHz])))
self.plotdata.set_data("dfe_out_H", 20.0 * safe_log10(abs(self.dfe_out_H[1:len_f_GHz])))
# Outputs
ideal_signal = self.ideal_signal[:len_t]
chnl_out = self.chnl_out[:len_t]
rx_in = self.rx_in[:len_t]
ctle_out = self.ctle_out[:len_t]
dfe_out = self.dfe_out[:len_t]
lockeds = self.lockeds[:len_t]
if len_t > 1000: # to prevent Chaco plotting error with too much data
krnl = interp1d(t_ns, ideal_signal)
ideal_signal_plot = krnl(t_ns_plot)
krnl = interp1d(t_ns, chnl_out)
chnl_out_plot = krnl(t_ns_plot)
krnl = interp1d(t_ns, rx_in)
rx_in_plot = krnl(t_ns_plot)
krnl = interp1d(t_ns, ctle_out)
ctle_out_plot = krnl(t_ns_plot)
krnl = interp1d(t_ns, dfe_out)
dfe_out_plot = krnl(t_ns_plot)
krnl = interp1d(t_ns, lockeds)
lockeds_plot = krnl(t_ns_plot)
else:
ideal_signal_plot = ideal_signal
chnl_out_plot = chnl_out
rx_in_plot = rx_in
ctle_out_plot = ctle_out
dfe_out_plot = dfe_out
lockeds_plot = lockeds
self.plotdata.set_data("ideal_signal", ideal_signal_plot)
self.plotdata.set_data("chnl_out", chnl_out_plot)
self.plotdata.set_data("rx_in", rx_in_plot)
self.plotdata.set_data("ctle_out", ctle_out_plot)
self.plotdata.set_data("dfe_out", dfe_out_plot)
self.plotdata.set_data("dbg_out", lockeds_plot)
# Jitter distributions
jitter_chnl = self.jitter_chnl # These are used again in bathtub curve generation, below.
jitter_tx = self.jitter_tx
jitter_ctle = self.jitter_ctle
jitter_dfe = self.jitter_dfe
jitter_bins = self.jitter_bins
self.plotdata.set_data("jitter_bins", array(self.jitter_bins) * 1e12)
self.plotdata.set_data("jitter_chnl", jitter_chnl * 1e-12) # PDF (/ps)
self.plotdata.set_data("jitter_ext_chnl", self.jitter_ext_chnl * 1e-12)
self.plotdata.set_data("jitter_tx", jitter_tx * 1e-12)
self.plotdata.set_data("jitter_ext_tx", self.jitter_ext_tx * 1e-12)
self.plotdata.set_data("jitter_ctle", jitter_ctle * 1e-12)
self.plotdata.set_data("jitter_ext_ctle", self.jitter_ext_ctle * 1e-12)
self.plotdata.set_data("jitter_dfe", jitter_dfe * 1e-12)
self.plotdata.set_data("jitter_ext_dfe", self.jitter_ext_dfe * 1e-12)
# Jitter spectrums
log10_ui = safe_log10(ui)
self.plotdata.set_data("f_MHz", self.f_MHz[1:])
self.plotdata.set_data("f_MHz_dfe", self.f_MHz_dfe[1:])
self.plotdata.set_data("jitter_spectrum_chnl", 10.0 * (safe_log10(self.jitter_spectrum_chnl[1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_chnl", 10.0 * (safe_log10(self.jitter_ind_spectrum_chnl[1:]) - log10_ui))
self.plotdata.set_data("thresh_chnl", 10.0 * (safe_log10(self.thresh_chnl[1:]) - log10_ui))
self.plotdata.set_data("jitter_spectrum_tx", 10.0 * (safe_log10(self.jitter_spectrum_tx[1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_tx", 10.0 * (safe_log10(self.jitter_ind_spectrum_tx[1:]) - log10_ui))
self.plotdata.set_data("thresh_tx", 10.0 * (safe_log10(self.thresh_tx[1:]) - log10_ui))
self.plotdata.set_data("jitter_spectrum_ctle", 10.0 * (safe_log10(self.jitter_spectrum_ctle[1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_ctle", 10.0 * (safe_log10(self.jitter_ind_spectrum_ctle[1:]) - log10_ui))
self.plotdata.set_data("thresh_ctle", 10.0 * (safe_log10(self.thresh_ctle[1:]) - log10_ui))
self.plotdata.set_data("jitter_spectrum_dfe", 10.0 * (safe_log10(self.jitter_spectrum_dfe[1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_dfe", 10.0 * (safe_log10(self.jitter_ind_spectrum_dfe[1:]) - log10_ui))
self.plotdata.set_data("thresh_dfe", 10.0 * (safe_log10(self.thresh_dfe[1:]) - log10_ui))
self.plotdata.set_data("jitter_rejection_ratio", self.jitter_rejection_ratio[1:])
# Bathtubs
bathtub_chnl = make_bathtub(
jitter_bins, jitter_chnl, min_val=0.1 * MIN_BATHTUB_VAL,
rj=self.rjDD_chnl, mu_r=self.mu_pos_chnl, mu_l=self.mu_neg_chnl, extrap=True)
bathtub_tx = make_bathtub(
jitter_bins, jitter_tx, min_val=0.1 * MIN_BATHTUB_VAL,
rj=self.rjDD_tx, mu_r=self.mu_pos_tx, mu_l=self.mu_neg_tx, extrap=True)
bathtub_ctle = make_bathtub(
jitter_bins, jitter_ctle, min_val=0.1 * MIN_BATHTUB_VAL,
rj=self.rjDD_ctle, mu_r=self.mu_pos_ctle, mu_l=self.mu_neg_ctle, extrap=True)
bathtub_dfe = make_bathtub(
jitter_bins, jitter_dfe, min_val=0.1 * MIN_BATHTUB_VAL,
rj=self.rjDD_dfe, mu_r=self.mu_pos_dfe, mu_l=self.mu_neg_dfe, extrap=True)
self.plotdata.set_data("bathtub_chnl", safe_log10(bathtub_chnl))
self.plotdata.set_data("bathtub_tx", safe_log10(bathtub_tx))
self.plotdata.set_data("bathtub_ctle", safe_log10(bathtub_ctle))
self.plotdata.set_data("bathtub_dfe", safe_log10(bathtub_dfe))
# Eyes
width = 2 * samps_per_ui
xs = linspace(-ui * 1.0e12, ui * 1.0e12, width)
height = 1000
tiny_noise = normal(scale=1e-3, size=len(chnl_out[ignore_samps:])) # to make channel eye easier to view.
chnl_out_noisy = self.chnl_out[ignore_samps:] + tiny_noise
y_max = 1.1 * max(abs(array(chnl_out_noisy)))
eye_chnl = calc_eye(ui, samps_per_ui, height, chnl_out_noisy, y_max)
y_max = 1.1 * max(abs(array(self.rx_in[ignore_samps:])))
eye_tx = calc_eye(ui, samps_per_ui, height, self.rx_in[ignore_samps:], y_max)
y_max = 1.1 * max(abs(array(self.ctle_out[ignore_samps:])))
eye_ctle = calc_eye(ui, samps_per_ui, height, self.ctle_out[ignore_samps:], y_max)
y_max = 1.1 * max(abs(array(self.dfe_out[ignore_samps:])))
self.dfe_eye_ymax = y_max
i = 0
len_clock_times = len(clock_times)
while i < len_clock_times and clock_times[i] < ignore_until:
i += 1
if i >= len(clock_times):
self.log("ERROR: Insufficient coverage in 'clock_times' vector.")
eye_dfe = calc_eye(ui, samps_per_ui, height, self.dfe_out[ignore_samps:], y_max)
else:
eye_dfe = calc_eye(ui, samps_per_ui, height, self.dfe_out[ignore_samps:], y_max, array(clock_times[i:]) - ignore_until)
self.plotdata.set_data("eye_index", xs)
self.plotdata.set_data("eye_chnl", eye_chnl)
self.plotdata.set_data("eye_tx", eye_tx)
self.plotdata.set_data("eye_ctle", eye_ctle)
self.plotdata.set_data("eye_dfe", eye_dfe)
[docs]
def update_eyes(self):
"""Update the heat plots representing the eye diagrams.
Args:
self(PyBERT): Reference to an instance of the *PyBERT* class.
"""
ui = self.ui
samps_per_ui = self.nspui
width = 2 * samps_per_ui
height = 100
xs = linspace(-ui * 1.0e12, ui * 1.0e12, width)
y_max = 1.1 * max(abs(array(self.chnl_out)))
ys = linspace(-y_max, y_max, height)
self.plots_eye.components[0].components[0].index.set_data(xs, ys)
self.plots_eye.components[0].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[0].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[0].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[0].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[0].invalidate_draw()
y_max = 1.1 * max(abs(array(self.rx_in)))
ys = linspace(-y_max, y_max, height)
self.plots_eye.components[1].components[0].index.set_data(xs, ys)
self.plots_eye.components[1].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[1].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[1].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[1].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[1].invalidate_draw()
y_max = 1.1 * max(abs(array(self.dfe_out)))
ys = linspace(-y_max, y_max, height)
self.plots_eye.components[3].components[0].index.set_data(xs, ys)
self.plots_eye.components[3].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[3].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[3].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[3].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[3].invalidate_draw()
self.plots_eye.components[2].components[0].index.set_data(xs, ys)
self.plots_eye.components[2].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[2].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[2].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[2].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[2].invalidate_draw()
self.plots_eye.request_redraw()