# Source code for vtool.score_normalization

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import numpy as np
import utool as ut
import ubelt as ub
import scipy.interpolate
from functools import partial


def check_unused_kwargs(kwargs, expected_keys):
    """Print a notice listing any keyword arguments that were not consumed."""
    leftover = set(kwargs.keys()) - set(expected_keys)
    if len(leftover) > 0:
        print('unused kwargs keys = %r' % (leftover,))
def testdata_score_normalier(
    tp_bumps=None,
    tn_bumps=None,
    tp_scale=1.0,
    tn_scale=1.0,
    min_clip=None,
    **kwargs
):
    """
    Build a ScoreNormalizer fit on synthetic gaussian score data.

    Args:
        tp_bumps (list): (loc, size) pairs of true-positive gaussian bumps
            (default: [(6.5, 256)])
        tn_bumps (list): (loc, size) pairs of true-negative gaussian bumps
            (default: [(3.5, 256)])
        tp_scale (float): stddev of each true-positive bump
        tn_scale (float): stddev of each true-negative bump
        min_clip (float): if given, clamp all scores below this value
        **kwargs: forwarded to the ScoreNormalizer constructor

    Returns:
        tuple: (encoder, data, labels)
    """
    # FIX: the bump lists were mutable default arguments; use None sentinels
    # so callers cannot accidentally share/mutate the defaults across calls.
    if tp_bumps is None:
        tp_bumps = [(6.5, 256)]
    if tn_bumps is None:
        tn_bumps = [(3.5, 256)]
    # Fixed seed keeps the demo data deterministic.
    rng = np.random.RandomState(seed=0)
    # Get a training sample
    tp_support = np.hstack(
        [rng.normal(loc=loc, scale=tp_scale, size=(size,)) for loc, size in tp_bumps]
    )
    tn_support = np.hstack(
        [rng.normal(loc=loc, scale=tn_scale, size=(size,)) for loc, size in tn_bumps]
    )
    if min_clip is not None:
        tp_support[tp_support < min_clip] = min_clip
        tn_support[tn_support < min_clip] = min_clip
    data = np.hstack((tp_support, tn_support))
    labels = np.array([True] * len(tp_support) + [False] * len(tn_support))
    encoder = ScoreNormalizer(**kwargs)
    encoder.fit(data, labels)
    return encoder, data, labels
def get_left_area(ydata, xdata, index_list):
    """
    Trapezoidal area under ``ydata`` to the left of each index point.

    Args:
        ydata (ndarray): curve values
        xdata (ndarray): sample locations corresponding to ``ydata``
        index_list (sequence): indices at which to stop integrating (inclusive)

    Returns:
        ndarray: area of ``ydata[:ix + 1]`` for each ``ix`` in ``index_list``
    """
    # FIX: np.trapz was removed in NumPy 2.0 in favor of np.trapezoid;
    # resolve whichever name this NumPy provides.
    _trapz = getattr(np, 'trapezoid', None) or getattr(np, 'trapz')
    left_area = np.array(
        [_trapz(ydata[: ix + 1], xdata[: ix + 1]) for ix in index_list]
    )
    return left_area
def get_right_area(ydata, xdata, index_list):
    """
    Trapezoidal area under ``ydata`` to the right of each index point.

    Args:
        ydata (ndarray): curve values
        xdata (ndarray): sample locations corresponding to ``ydata``
        index_list (sequence): indices at which to start integrating

    Returns:
        ndarray: area of ``ydata[ix:]`` for each ``ix`` in ``index_list``
    """
    # FIX: np.trapz was removed in NumPy 2.0 in favor of np.trapezoid;
    # resolve whichever name this NumPy provides.
    _trapz = getattr(np, 'trapezoid', None) or getattr(np, 'trapz')
    right_area = np.array([_trapz(ydata[ix:], xdata[ix:]) for ix in index_list])
    return right_area
class ScoreNormVisualizeClass(object):
    """
    Mixin holding the plotting helpers used by ScoreNormalizer.

    # HACK; eventually move all individual plots into a class structure
    """

    def _hack_vizlearn(encoder, **kwargs):
        # Resolve the (score_thresh, prob_thresh) pair for visualization:
        # either learn it against a target_tpr passed by the caller, or fall
        # back to the intersection-based learn_threshold2 heuristic.
        if 'target_tpr' in kwargs:
            print('_HACK VIZLERAN TARGET')
            verbose = ut.VERBOSE
            score_thresh = encoder.learn_threshold(verbose=verbose, **kwargs)
            prob_thresh = encoder.learned_thresh
            # prob_thresh = encoder.normalize_scores(score_thresh)
        else:
            # prob_thresh = encoder.learned_thresh
            # score_thresh = encoder.inverse_normalize(prob_thresh)
            print('_HACK VIZLERAN2')
            score_thresh = encoder.learn_threshold2()
            prob_thresh = encoder.normalize_scores(score_thresh)
        return score_thresh, prob_thresh

    def _plot_score_support_hist(encoder, fnum, pnum=(1, 1, 1), **kwargs):
        # Histogram of the tn/tp support with the learned conditional
        # densities overlaid and the learned threshold marked.
        import wbia.plottool as pt

        fnum = pt.ensure_fnum(fnum)
        tup = encoder.get_partitioned_support()
        tp_support, tn_support, part_attrs = tup
        score_thresh, prob_thresh = encoder._hack_vizlearn(**kwargs)
        true_color = pt.TRUE_BLUE  # pt.TRUE_GREEN
        false_color = pt.FALSE_RED
        support_kw = dict(
            score_lbls=('trueneg', 'truepos'),
            score_colors=(false_color, true_color),
            titlesuf=kwargs.get('titlesuf', ''),
        )
        score_range = kwargs.get('score_range', None)
        pt.plot_score_histograms(
            (tn_support, tp_support),
            score_thresh=score_thresh,
            score_label='score',
            fnum=fnum,
            pnum=pnum,
            bin_width=kwargs.get('bin_width', None),
            num_bins=kwargs.get('num_bins', None),
            overlay_prob_given_list=(encoder.p_score_given_tn, encoder.p_score_given_tp),
            overlay_score_domain=encoder.score_domain,
            xlim=score_range,
            **support_kw
        )

    def _plot_roc(encoder, fnum, pnum, **kwargs):
        # ROC curve of the normalized probabilities against the labels,
        # with the operating point implied by the learned threshold.
        import vtool as vt
        import wbia.plottool as pt  # NOQA

        tup = encoder.get_partitioned_support()
        tp_support, tn_support, part_attrs = tup
        scores = np.hstack([tn_support, tp_support])
        labels = np.array([False] * len(tn_support) + [True] * len(tp_support))
        # probs = encoder.normalize_scores(scores)
        probs = normalize_scores(encoder.score_domain, encoder.p_tp_given_score, scores)
        confusions = vt.ConfusionMetrics().fit(probs, labels)
        score_thresh, prob_thresh = encoder._hack_vizlearn(**kwargs)
        # target_tpr = None
        target_tpr = confusions.get_metric_at_thresh('tpr', prob_thresh)
        # print('target_tpr = %r' % (target_tpr,))
        ROCInteraction = vt.interact_roc_factory(
            confusions, target_tpr, show_operating_point=True
        )
        fnum = pt.ensure_fnum(fnum)
        ROCInteraction.static_plot(fnum, pnum, **kwargs)

    def _plot_prebayes(encoder, fnum, pnum, **kwargs):
        # Plot the class-conditional score densities p(score|tn), p(score|tp)
        # before Bayes rule is applied.
        score_thresh, prob_thresh = encoder._hack_vizlearn(**kwargs)
        plot_prebayes_pdf(
            encoder.score_domain,
            encoder.p_score_given_tn,
            encoder.p_score_given_tp,
            encoder.p_score,
            score_thresh=score_thresh,
            cfgstr='',
            fnum=fnum,
            pnum=pnum,
        )

    def _plot_postbayes(encoder, fnum, pnum, **kwargs):
        # Plot the posterior probabilities p(tn|score), p(tp|score)
        # after Bayes rule is applied.
        score_thresh, prob_thresh = encoder._hack_vizlearn(**kwargs)
        plot_postbayes_pdf(
            encoder.score_domain,
            encoder.p_tn_given_score,
            encoder.p_tp_given_score,
            prob_thresh=prob_thresh,
            score_thresh=score_thresh,
            cfgstr='',
            fnum=fnum,
            pnum=pnum,
        )
class ScoreNormalizer(ut.Cachable, ScoreNormVisualizeClass):
    """
    Conforms to scikit-learn Estimator interface

    CommandLine:
        python -m vtool.score_normalization --test-ScoreNormalizer --show --cmd

    Kwargs:
        tpr (float): target true positive rate (default .90)
        fpr (float): target false positive rate (default None)
        reverse (bool): True if lower scores are better, False if higher
            scores are better (default=None)

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.score_normalization import *  # NOQA
        >>> import vtool as vt
        >>> encoder = ScoreNormalizer()
        >>> X, y = vt.demodata.testdata_binary_scores()
        >>> attrs = {'index': np.arange(len(y)) * ((2 * y) - 1)}
        >>> encoder.fit(X, y, attrs)
        >>> # xdoctest: +REQUIRES(--show)
        >>> encoder.visualize()
        >>> ut.show_if_requested()
    """

    def __init__(encoder, **kwargs):
        # Parameters consumed by learn_score_normalization
        encoder.learn_kw = ut.update_existing(
            dict(
                gridsize=1024,
                adjust=8,
                monotonize=False,
                # monotonize=True,
                # clip_factor=(ut.PHI + 1),
                clip_factor=None,
                reverse=None,
                p_tp_method='eq',
            ),
            kwargs,
        )
        # check_unused_kwargs(kwargs, encoder.learn_kw.keys())
        encoder.thresh_kw = ut.update_existing(
            dict(
                # Target recall for learned threshold
                tpr=None,
                fpr=None,
            ),
            kwargs,
        )
        assert not any(
            key.startswith('target_') for key in kwargs
        ), 'old interface of target_<metric> used just use <metric>'
        # Default to targeting a 90% true-positive rate when the caller
        # specified neither tpr nor fpr.
        if not any(encoder.thresh_kw.values()):
            encoder.thresh_kw['tpr'] = 0.90
        # Support data
        encoder.support = dict(
            X=None,
            y=None,
            attrs=None,
        )
        # Learned score normalization
        encoder.score_domain = None
        encoder.p_tp_given_score = None
        encoder.p_tn_given_score = None
        encoder.p_score_given_tn = None
        encoder.p_score_given_tp = None
        encoder.p_score = None
        # Learneed classification threshold
        encoder.learned_thresh = None
        # Learned interpolation function
        encoder.interp_fn = None

    def __getstate__(encoder):
        """
        Drop the unpicklable scipy interpolator before pickling; it is
        rebuilt in __setstate__.

        CommandLine:
            python -m vtool.score_normalization --test-__getstate__

        Example:
            >>> # ENABLE_DOCTEST
            >>> from vtool.score_normalization import *  # NOQA
            >>> encoder = ScoreNormalizer()
            >>> from six.moves import cPickle as pickle
            >>> dump = pickle.dumps(encoder)
            >>> encoder2 = pickle.loads(dump)
        """
        state_dict = encoder.__dict__.copy()
        state_dict['interp_fn'] = None
        return state_dict

    def get_prefix(encoder):
        # NOTE(review): presumably consumed by the ut.Cachable mixin as a
        # cache-file name prefix — confirm against utool.Cachable.
        return 'ScoreNorm_'

    def __setstate__(encoder, state_dict):
        # Restore pickled state, then rebuild the interpolator that
        # __getstate__ dropped.
        encoder.__dict__.update(state_dict)
        encoder._update_interp_fn()

    def fit(encoder, X, y, attrs=None, verbose=False, finite_only=True):
        """
        Fits estimator to data

        Args:
            X (ndarray): one dimensional scores
            y (ndarray): binary labels
            attrs (dict): dictionary of data attributes
        """
        # NOTE(review): `finite_only` is accepted but unused here; the
        # non-finite filtering actually happens inside get_support().
        # Record support
        encoder.support['X'] = X
        encoder.support['y'] = y
        encoder.support['attrs'] = {} if attrs is None else attrs
        encoder.learn_probabilities(verbose=verbose)
        try:
            encoder.learn_threshold(verbose=verbose)
        except Exception as ex:
            # Best-effort: a normalizer without a learned threshold is still
            # usable for normalize_scores, so only warn.
            ut.printex(ex, 'could not learn thresh', iswarning=True)

    @staticmethod
    def _to_xy(tp_scores, tn_scores, part_attrs=None):
        # Partitioned -> flat representation.
        return flatten_scores(tp_scores, tn_scores, part_attrs)

    @staticmethod
    def _to_partitioned(X, y, attrs={}):
        # Flat -> partitioned representation.
        # NOTE(review): mutable default `attrs={}` appears read-only
        # downstream — verify partition_scores never mutates it.
        return partition_scores(X, y, attrs)

    def fit_partitioned(encoder, tp_scores, tn_scores, part_attrs=None, **kwargs):
        """convinience func to fit only scores that have been separated instead of labeled"""
        fitargs = flatten_scores(tp_scores, tn_scores, part_attrs)
        return encoder.fit(*fitargs, **kwargs)

    def get_partitioned_support(encoder):
        """convinience get prepartitioned data"""
        X, y, attrs = encoder.get_support()
        return partition_scores(X, y, attrs)

    def get_support(encoder, finite_only=True):
        """ return X, y, and attrs """
        X = encoder.support['X']
        y = encoder.support['y']
        attrs = encoder.support['attrs']
        if finite_only:
            # Drop nan/inf scores and the corresponding labels/attributes.
            mask = np.isfinite(X)
            X = X.compress(mask)
            y = y.compress(mask)
            attrs = ut.map_dict_vals(partial(np.compress, mask), attrs)
        return X, y, attrs

    def learn_probabilities(encoder, verbose=False):
        """
        Kernel density estimation
        """
        # X, y = encoder.get_support()
        tp_support, tn_support, part_attrs = encoder.get_partitioned_support()
        # heuristic: if tp scores average lower than tn scores, lower is better
        encoder.learn_kw['reverse'] = tp_support.mean() < tn_support.mean()
        if verbose:
            print('[scorenorm] setting reverse = %r' % (encoder.learn_kw['reverse']))
        tup = learn_score_normalization(
            tp_support, tn_support, return_all=True, verbose=verbose, **encoder.learn_kw
        )
        # unpack
        (
            score_domain,
            p_tp_given_score,
            p_tn_given_score,
            p_score_given_tp,
            p_score_given_tn,
            p_score,
        ) = tup
        encoder.score_domain = score_domain
        encoder.p_tp_given_score = p_tp_given_score
        encoder.p_tn_given_score = p_tn_given_score
        encoder.p_score_given_tn = p_score_given_tn
        encoder.p_score_given_tp = p_score_given_tp
        encoder.p_score = p_score
        encoder._update_interp_fn()

    def _update_interp_fn(encoder):
        """
        Internal call to update interpolation function.
        Used when learning and when loading from cache.
        """
        if encoder.p_tp_given_score is not None:
            encoder.interp_fn = scipy.interpolate.interp1d(
                encoder.score_domain,
                encoder.p_tp_given_score,
                kind='linear',
                copy=False,
                assume_sorted=False,
            )

    def learn_threshold2(encoder):
        """
        Finds a cutoff where the probability of a truepos stats becoming
        greater than probability of trueneg

        CommandLine:
            python -m vtool.score_normalization --exec-learn_threshold2 --show

        Example:
            >>> from vtool.score_normalization import *  # NOQA
            >>> import vtool as vt
            >>> #encoder, X, y = testdata_score_normalier([(3.5, 256), (9.5, 1024), (15.5, 2048)], [(6.5, 256), (12.5, 5064), (18.5, 128)], adjust=1, p_tp_method='ratio')
            >>> encoder, X, y = testdata_score_normalier([(3.5, 64), (9.5, 1024), (15.5, 5064)], [(6.5, 256), (12.5, 2048), (18.5, 128)], adjust=1, p_tp_method='ratio')
            >>> #encoder, X, y = testdata_score_normalier(adjust=1)
            >>> #encoder, X, y = testdata_score_normalier([(3.5, 2048)], [(30.5, 128)], tn_scale=.1, adjust=1)
            >>> #encoder, X, y = testdata_score_normalier([(0, 64)], [(-.1, 12)], adjust=8, min_clip=0)
            >>> locals_ = ut.exec_func_src(encoder.learn_threshold2)
            >>> exec(ut.execstr_dict(locals_))
            >>> # xdoctest: +REQUIRES(--show)
            >>> import wbia.plottool as pt
            >>> pt.ensureqt()
            >>> #pt.plot(xdata[0:-2], np.diff(np.diff(closeness)))
            >>> #maxima_x, maxima_y, argmaxima = vt.hist_argmaxima(closeness)
            >>> fnum = 100
            >>> pt.multi_plot(xdata, [tp_curve, tn_curve, closeness, ],
            >>>               label_list=['p(tp | s)', 'p(tn | s)', 'closeness', ], marker='',
            >>>               linewidth_list=[4, 4, 1,], title='intersection points',
            >>>               pnum=(4, 1, 1), fnum=fnum, xmax=xdata.max(), xmin=0)
            >>> pt.plot(xdata[argmaxima], closeness[argmaxima], 'rx', label='closeness maxima')
            >>> pt.plot(x_submax, y_submax, 'o', label='chosen')
            >>> #pt.plot(xdata[argmaxima], curveness[argmaxima], 'rx', label='curveness maxima')
            >>> pt.legend()
            >>> #pt.plot(x_submax, y_submax, 'o')
            >>> pt.plot(xdata[argmaxima], tp_curve[argmaxima], 'rx')
            >>> pt.plot(xdata[argmaxima], tn_curve[argmaxima], 'rx')
            >>> pt.plot(xdata[argmaxima], tp_curve[argmaxima], 'rx')
            >>> pt.plot(xdata[argmaxima], tn_curve[argmaxima], 'rx')
            >>> #pt.plot(xdata[argmaxima], encoder.interp_fn(x_submax), 'rx')
            >>> _mkinterp = ut.partial(
            >>>     scipy.interpolate.interp1d, kind='linear', copy=False,
            >>>     assume_sorted=False, bounds_error=False)
            >>> _interp_sgtn = _mkinterp(xdata, tn_curve)
            >>> _interp_sgtp = _mkinterp(xdata, tp_curve)
            >>> pt.plot(x_submax, _interp_sgtn(x_submax), 'go')
            >>> pt.plot(x_submax, _interp_sgtp(x_submax), 'bx')
            >>> #
            >>> pt.multi_plot(xdata[argmaxima], [tp_area, fp_area, tn_area, fn_area], title='intersection areas',
            >>>               label_list=['tp_area', 'fp_area', 'tn_area', 'fn_area'], markers=['o', 'd', 'o', '.'],
            >>>               pnum=(4, 1, 2), fnum=fnum, xmax=xdata.max(), xmin=0)
            >>> #
            >>> pt.multi_plot(xdata[argmaxima], [lr_pos, lr_neg, acc], title='intersection quality (liklihood ratios)',
            >>>               label_list=['lr_pos=tp/fp', 'lr_neg=fn/tn', 'acc'], markers=['o', 'o', '*'],
            >>>               pnum=(4, 1, 3), fnum=fnum, xmax=xdata.max(), xmin=0)
            >>> #
            >>> pnum_ = pt.make_pnum_nextgen(4, 3, start=9)
            >>> encoder._plot_score_support_hist(fnum=fnum, pnum=pnum_())
            >>> #encoder._plot_prebayes(fnum=fnum, pnum=pnum_())
            >>> encoder._plot_postbayes(fnum=fnum, pnum=pnum_())
            >>> encoder._plot_roc(fnum=fnum, pnum=pnum_())
            >>> pt.adjust_subplots(hspace=.5, top=.95, bottom=.08)
            >>> pt.show_if_requested()
        """
        import vtool as vt

        if False:
            # New stuff with area should make this irrelevant
            # weights = encoder.p_score_given_tp
            weights = encoder.p_score_given_tn
            values = encoder.score_domain
            mean = np.average(values, weights=weights)
            std = np.sqrt(np.average((values - mean) ** 2, weights=weights))
            score_cutoff = mean + (2.5 * std)
            cutx = np.sum(encoder.score_domain < score_cutoff)
            xdata = encoder.score_domain[:cutx]
            tp_curve = encoder.p_score_given_tp[:cutx]
            tn_curve = encoder.p_score_given_tn[:cutx]
        else:
            xdata = encoder.score_domain
            tp_curve = encoder.p_score_given_tp
            tn_curve = encoder.p_score_given_tn

        if 0:
            _p = np.where(xdata > 50)[0][0]
            xdata = xdata[0:_p]
            tp_curve = tp_curve[0:_p]
            tn_curve = tn_curve[0:_p]

        # print('xdata = %r' % (xdata,))
        # print('tp_curve = %r' % (tp_curve,))
        # print('tn_curve = %r' % (tn_curve,))

        # tp_curve[:] = .1
        # tn_curve[:] = .5

        # Find locations of intersection: local maxima of the (shifted)
        # negative absolute difference between the two conditional densities.
        closeness = -np.abs(tp_curve - tn_curve)
        closeness = closeness - closeness.min()
        # closeness
        # print('closeness = %r' % (closeness,))
        # argmaxima = vt.hist_argmaxima2(closeness)
        # print('argmaxima = %r' % (argmaxima,))
        argmaxima = vt.hist_argmaxima2(closeness)
        # argmaxima = np.arange(2, len(closeness) - 2)

        # curvature = -np.gradient(np.gradient(closeness))
        # curveness = (curvature - curvature.min()) / (curvature.max() - curvature.min())

        # Remove maxima points with almost no curvature
        # if False:
        #     if len(argmaxima) > 1:
        #         #curveness[argmaxima]
        #         curvature_ = curvature[argmaxima]
        #         valid = curvature_ > 1e-5
        #         #valid = curvature[argmaxima] > np.median(curvature[argmaxima])
        #         #valid = curvature[argmaxima] > np.median(curvature[argmaxima])
        #         if np.any(valid):
        #             argmaxima = argmaxima[valid]

        # argmaxima2 = vt.hist_argmaxima2(-deriv_no2)
        # if len(np.intersect1d(argmaxima2, argmaxima)) > 0:
        #     argmaxima = np.intersect1d(argmaxima2, argmaxima)

        # Now find which intersection points are "best"
        # TODO: have user specify metric that they care about
        # https://en.wikipedia.org/wiki/Confusion_matrix
        if True:
            # Use area under curves to determine the probability density of tp,
            # fp, tn, fn at each candidate threshold.
            tp_area = get_right_area(tp_curve, xdata, argmaxima)
            fp_area = get_right_area(tn_curve, xdata, argmaxima)
            tn_area = get_left_area(tn_curve, xdata, argmaxima)
            fn_area = get_left_area(tp_curve, xdata, argmaxima)  # NOQA

            # Choose the location of intersection that performs best on some test
            # statistic. (Positive likelihood ratio)
            lr_pos_ = tp_area / fp_area
            lr_neg_ = fn_area / tn_area
            # Accuracy is (tp + tn) / total
            acc = (tp_area + tn_area) / 2
            # print('lr_neg = %r' % (lr_neg,))
            # print('lr_pos = %r' % (lr_pos,))

            # Normalize likelihood into range 0 to 1
            pos_norm = max(1, vt.safe_max(lr_pos_, fill=1, finite=True, nans=False))
            neg_norm = max(1, vt.safe_max(lr_neg_, fill=1, finite=True, nans=False))
            lr_pos = lr_pos_ / pos_norm  # NOQA
            lr_neg = lr_neg_ / neg_norm  # NOQA

            # chosen_metric = lr_pos
            chosen_metric = acc

            # Invalidate impossible values
            isvalid = np.isfinite(chosen_metric)
            if np.any(isvalid):
                valid_argmaxima = argmaxima[isvalid]
                chosen_metric = chosen_metric[isvalid]
            else:
                valid_argmaxima = argmaxima

            # Invalidate values based on "reasonable" heuristics
            # reasonable_tp = tp_area > .1
            # reasonable_fp = tn_area > .1
            # reasonable_flags = np.logical_and(reasonable_tp, reasonable_fp)
            # if np.any(reasonable_flags):
            #     lr_pos = lr_pos.compress(reasonable_flags)
            #     lr_neg = lr_neg.compress(reasonable_flags)

            # Choose a finite argmax
            sortx = chosen_metric.argsort()[::-1]
            closeness_argmax = valid_argmaxima[sortx[0]]
            # maxpos = valid_argmaxima[lr_pos.argmax()]
            # print('lr_pos.argmax = %r' % (lr_pos.argmax,))
            # Hack for infinity and nans. bring thems out of the 0 and 1
            # range, but only by a bit.
            # lr_pos[np.isnan(lr_pos)] = -.1
            # lr_neg[np.isnan(lr_neg)] = -.1
            # lr_pos[np.isinf(lr_pos)] = 1.1
            # lr_neg[np.isinf(lr_neg)] = 1.1
        else:
            closeness_argmax = closeness.argmax()

        # Refine the chosen maximum to sub-bin precision, except at the
        # domain edges where interpolation is not possible.
        if closeness_argmax == len(closeness) - 1:
            y_submax = closeness[-2:-1]
            x_submax = xdata[-2:-1]
        elif closeness_argmax == 0:
            y_submax = closeness[0:1]
            x_submax = xdata[0:1]
        else:
            # argmaxima, hist_, centers = maxpos, closeness, xdata
            x_submax, y_submax = vt.interpolate_submaxima(
                np.array([closeness_argmax]), closeness, xdata
            )
        score_thresh = x_submax[0]

        # Debug hook: re-run the doctest plotting section once, guarded by
        # `encoder.block` to avoid infinite recursion.
        if ut.get_argflag('--debug-scorethresh') and not getattr(encoder, 'block', False):
            encoder.block = True
            ut.exec_func_doctest(
                encoder.learn_threshold2,
                start_sentinal='import wbia.plottool as pt',
                end_sentinal='pt.show_if_requested()',
            )
            encoder.block = False
        return score_thresh

    def learn_threshold(encoder, verbose=False, **thresh_kw):
        """
        Learns cutoff threshold that achieves the target confusion metric
        Typically a desired false positive rate (recall) is specified
        """
        import vtool as vt

        # select a cutoff threshold
        # import sklearn.metrics
        if len(thresh_kw) > 0:
            # Accept the legacy `target_<metric>` spelling by stripping the prefix.
            _thresh_kw = ut.map_dict_keys(lambda x: x.replace('target_', ''), thresh_kw)
        else:
            _thresh_kw = encoder.thresh_kw
        # Select threshold that gives target confusion
        _selected_items = [item for item in _thresh_kw.items() if item[1] is not None]
        assert len(_selected_items) == 1, 'Can only specify one desired confusion metric'
        # choose how to optimize the threshold
        metric, value = _selected_items[0]
        # Get classifier confusions (maybe dont need probs here)
        X, y, attrs = encoder.get_support()
        probs = encoder.normalize_scores(X)
        if False:
            confusions_score = vt.ConfusionMetrics().fit(-X, y, verbose=verbose)
            confusions_prob = vt.ConfusionMetrics().fit(probs, y, verbose=verbose)
            _score_thresh = confusions_score.get_thresh_at_metric(metric, value)
            _prob_thresh = confusions_prob.get_thresh_at_metric(metric, value)
            _inv_score = encoder.inverse_normalize(_prob_thresh)
            _inv_prob = encoder.normalize_scores(-_score_thresh)
            _inv_prob - _prob_thresh
            _inv_score - (-_score_thresh)
        confusions = vt.ConfusionMetrics().fit(probs, y, verbose=verbose)
        prob_thresh = confusions.get_thresh_at_metric(metric, value)
        # target_value = confusions.get_metric_at_thresh(metric, prob_thresh)
        # check_thresh = confusions.get_thresh_at_metric(metric, target_value)
        # Map the probability threshold back into raw score space.
        score_thresh = encoder.inverse_normalize(prob_thresh)
        if verbose:
            print(
                '[scorenorm] Learning threshold to achieve %s=%.5f'
                % (
                    metric.upper(),
                    value,
                )
            )
            if encoder.learned_thresh is not None:
                print('[scorenorm] * learned_thresh = %.5f' % (encoder.learned_thresh,))
            else:
                print('[scorenorm] * learned_thresh = %r' % (encoder.learned_thresh,))
            print('[scorenorm] * score_thresh = %.5f' % (score_thresh,))
            if metric == 'tpr':
                print('[scorenorm] * fpr = %r' % (confusions.get_fpr_at_recall(value),))
            elif metric == 'fpr':
                print('[scorenorm] * tpr = %r' % (confusions.get_recall_at_fpr(value),))
        # TODO: maybe do not change state?
        encoder.learned_thresh = prob_thresh
        return score_thresh

    def inverse_normalize(encoder, probs):
        # Invert the learned score -> probability mapping by interpolating
        # the swapped (probability, score) pairs.
        inverse_interp = scipy.interpolate.interp1d(
            encoder.p_tp_given_score,
            encoder.score_domain,
            kind='linear',
            copy=False,
            assume_sorted=False,
        )
        scores = inverse_interp(probs)
        return scores

    def normalize_scores(encoder, X):
        # Map raw scores to probabilities p(tp | score); scalars are wrapped
        # into an array and unwrapped again on return.
        is_iterable = ut.isiterable(X)
        if not is_iterable:
            X = np.array([X])
        prob = normalize_scores(
            encoder.score_domain, encoder.p_tp_given_score, X, interp_fn=encoder.interp_fn
        )
        if not is_iterable:
            prob = prob[0]
        return prob

    def predict(encoder, X):
        """Predict true or false of ``X``."""
        prob = encoder.normalize_scores(X)
        pred = prob > encoder.learned_thresh
        return pred

    def get_accuracy(encoder, X, y):
        # Fraction of predictions that match the labels.
        pred = encoder.predict(X)
        is_correct = pred == y
        accuracy = (is_correct).mean()
        return accuracy

    def get_error_indicies(encoder, X, y):
        r"""
        Returns the indicies of the most difficult type I and type II errors.

        Example:
            >>> # DISABLE_DOCTEST
            >>> from vtool.score_normalization import *  # NOQA
            >>> encoder, X, y = testdata_score_normalier()
            >>> (fp_indicies, fn_indicies) = encoder.get_error_indicies(X, y)
            >>> fp_X = X.take(fp_indicies)[0:3]
            >>> fn_X = X.take(fn_indicies)[0:3]
            >>> result = 'fp_X = ' + ub.repr2(fp_X)
            >>> result += '\nfn_X = ' + ub.repr2(fn_X)
            >>> print(result)
            fp_X = np.array([ 6.196, 5.912, 5.804])
            fn_X = np.array([ 3.947, 4.277, 4.43 ])
        """
        prob = encoder.normalize_scores(X)
        pred = prob > encoder.learned_thresh
        is_correct = pred == y
        # Find misspredictions
        is_error = np.logical_not(is_correct)
        # get indexes of misspredictions
        error_indicies = np.where(is_error)[0]
        # Separate by Type I and Type II error
        error_y = y.take(error_indicies)
        fp_indicies_ = error_indicies.compress(np.logical_not(error_y))
        fn_indicies_ = error_indicies.compress(error_y)
        # Sort errors by difficulty
        fp_sortx = prob.take(fp_indicies_).argsort()[::-1]
        fn_sortx = prob.take(fn_indicies_).argsort()
        fp_indicies = fp_indicies_.take(fp_sortx)
        fn_indicies = fn_indicies_.take(fn_sortx)
        return fp_indicies, fn_indicies

    def get_correct_indices(encoder, X, y):
        r"""
        Args:
            X (ndarray): data
            y (ndarray): labels

        Returns:
            tuple: (fp_indicies, fn_indicies)

        CommandLine:
            python -m vtool.score_normalization --test-get_correct_indices

        Example:
            >>> # DISABLE_DOCTEST
            >>> from vtool.score_normalization import *  # NOQA
            >>> encoder, X, y = testdata_score_normalier()
            >>> (tp_indicies, tn_indicies) = encoder.get_correct_indices(X, y)
            >>> tp_X = X.take(tp_indicies)[0:3]
            >>> tn_X = X.take(tn_indicies)[0:3]
            >>> result = 'tp_X = ' + ub.repr2(tp_X)
            >>> result += '\ntn_X = ' + ub.repr2(tn_X)
            >>> print(result)
            tp_X = np.array([ 8.883, 8.77 , 8.759])
            tn_X = np.array([ 0.727, 0.76 , 0.841])
        """
        prob = encoder.normalize_scores(X)
        pred = prob > encoder.learned_thresh
        is_correct = pred == y
        # get indexes of misspredictions
        correct_indicies = np.where(is_correct)[0]
        # Separate by Type I and Type II error
        correct_y = y.take(correct_indicies)
        tp_indicies_ = correct_indicies.compress(correct_y)
        tn_indicies_ = correct_indicies.compress(np.logical_not(correct_y))
        # Sort by most correct cases
        tp_sortx = prob.take(tp_indicies_).argsort()[::-1]
        tn_sortx = prob.take(tn_indicies_).argsort()
        tp_indicies = tp_indicies_.take(tp_sortx)
        tn_indicies = tn_indicies_.take(tn_sortx)
        return tp_indicies, tn_indicies

    def get_confusion_indicies(encoder, X, y):
        """
        combination of get_correct_indices and get_error_indicies
        """
        prob = encoder.normalize_scores(X)
        pred = prob > encoder.learned_thresh
        # Find correct and misspredictions
        is_correct = pred == y
        is_error = np.logical_not(is_correct)
        correct_indicies = np.where(is_correct)[0]
        error_indicies = np.where(is_error)[0]
        # Separate by Correct and Type I and Type II error
        error_y = y.take(error_indicies)
        correct_y = y.take(correct_indicies)
        fp_indicies_ = error_indicies.compress(np.logical_not(error_y))
        fn_indicies_ = error_indicies.compress(error_y)
        tp_indicies_ = correct_indicies.compress(correct_y)
        tn_indicies_ = correct_indicies.compress(np.logical_not(correct_y))
        # Sort errors by difficulty and other cases by correctness
        fp_sortx = prob.take(fp_indicies_).argsort()[::-1]
        fn_sortx = prob.take(fn_indicies_).argsort()
        tp_sortx = prob.take(tp_indicies_).argsort()[::-1]
        tn_sortx = prob.take(tn_indicies_).argsort()
        indicies = ut.DynStruct()
        indicies.tp = tp_indicies_.take(tp_sortx)
        indicies.tn = tn_indicies_.take(tn_sortx)
        indicies.fp = fp_indicies_.take(fp_sortx)
        indicies.fn = fn_indicies_.take(fn_sortx)
        return indicies

    def visualize(encoder, **kwargs):
        r"""
        shows details about the score normalizer

        Kwargs:
            fnum
            figtitle
            with_hist
            interactive
            with_scores
            with_roc
            with_precision_recall

        CommandLine:
            python -m vtool.score_normalization --exec-ScoreNormalizer.visualize:0 --show
            python -m vtool.score_normalization --exec-ScoreNormalizer.visualize:1 --show

        Example:
            >>> # UNSTABLE_DOCTEST
            >>> from vtool.score_normalization import *  # NOQA
            >>> import vtool as vt
            >>> encoder = ScoreNormalizer()
            >>> X, y = vt.demodata.testdata_binary_scores()
            >>> encoder.fit(X, y)
            >>> kwargs = dict(
            >>>     with_pr=True, interactive=True, with_roc=True,
            >>>     with_hist=True)
            >>> encoder.visualize(**kwargs)
            >>> ut.show_if_requested()

        Example:
            >>> # UNSTABLE_DOCTEST
            >>> from vtool.score_normalization import *  # NOQA
            >>> import vtool as vt
            >>> encoder = ScoreNormalizer()
            >>> X, y = vt.demodata.testdata_binary_scores()
            >>> encoder.fit(X, y)
            >>> kwargs = dict(
            >>>     with_pr=True, interactive=True, with_roc=True, with_hist=True,
            >>>     with_scores=False, with_prebayes=False, with_postbayes=False)
            >>> encoder.visualize(target_tpr=.95, **kwargs)
            >>> ut.show_if_requested()
        """
        # import wbia.plottool as pt
        default_kw = dict(
            with_scores=False,
            with_roc=True,
            # with_roc=False,
            # with_precision_recall=True,
            with_precision_recall=False,
            # with_hist=False,
            with_hist=True,
            fnum=None,
            figtitle=None,
            # interactive=None,
            interactive=False,
            use_stems=None,
            attr_callback=None,
            with_prebayes=True,
            with_postbayes=True,
            score_range=None,
            bin_width=None,
            logscale=False,
        )
        alias_dict = {'with_pr': 'with_precision_recall'}
        # inspect_kw = ut.update_existing(default_kw, kwargs, alias_dict=alias_dict)
        inspect_kw = ut.update_dict(default_kw, kwargs, alias_dict=alias_dict)
        print('inspect_kw = %r' % (inspect_kw,))
        # Whatever is not a plot option is forwarded to the threshold learner.
        other_kw = ut.delete_dict_keys(
            kwargs.copy(), list(inspect_kw.keys()) + list(alias_dict.keys())
        )
        score_thresh, prob_thresh = encoder._hack_vizlearn(**other_kw)
        tup = encoder.get_partitioned_support()
        tp_support, tn_support, part_attrs = tup
        inter = inspect_pdfs(
            tn_support,
            tp_support,
            encoder.score_domain,
            encoder.p_tp_given_score,
            encoder.p_tn_given_score,
            encoder.p_score_given_tp,
            encoder.p_score_given_tn,
            encoder.p_score,
            prob_thresh=prob_thresh,
            score_thresh=score_thresh,
            part_attrs=part_attrs,
            thresh_kw=encoder.thresh_kw,
            **inspect_kw
        )
        return inter
def partition_scores(X, y, attrs=None):
    """
    convinience helper to translate partitioned to unpartitioned data

    Args:
        tp_scores (ndarray):
        tn_scores (ndarray):
        attrs (dict): (default = None)

    Returns:
        tuple: (scores, labels, attrs)

    CommandLine:
        python -m vtool.score_normalization --test-partition_scores

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.score_normalization import *  # NOQA
        >>> X = np.array([5, 6, 6, 7, 1, 2, 2])
        >>> attrs = {'qaid': np.array([21, 24, 25, 26, 11, 14, 15])}
        >>> y = np.array([1, 1, 1, 1, 0, 0, 0], dtype=np.bool_)
        >>> tup = partition_scores(X, y, attrs)
        >>> resdict = ut.odict(zip(
        >>>     ['tp_scores', 'tn_scores', 'part_attrs'], tup))
        >>> result = ub.repr2(resdict, nobraces=True, with_dtype=False,
        >>>                   explicit=1, nl=2)
        >>> print(result)
        tp_scores=np.array([5, 6, 6, 7]),
        tn_scores=np.array([1, 2, 2]),
        part_attrs=False: 'qaid': np.array([11, 14, 15]),
        True: 'qaid': np.array([21, 24, 25, 26]),,
    """
    import vtool as vt
    import operator

    # Make partitioning: group indices by label value (False/True)
    unique_labels, groupxs = vt.group_indices(y)
    _grouper = partial(vt.apply_grouping, groupxs=groupxs)
    # Group data
    X_parts = _grouper(X)
    # Group attributes: each attr array is grouped the same way as X
    _nested_attrs = ut.map_dict_vals(_grouper, attrs)

    def _getitem(a, b):
        # operator.getitem with swapped arguments, so it can be partial-bound
        # on the index while mapping over the grouped attribute values
        return operator.getitem(b, a)

    part_attrs = {
        label: ut.map_dict_vals(partial(_getitem, lblx), _nested_attrs)
        for lblx, label in enumerate(unique_labels)
    }
    # group_indices returns labels in sorted order, so False precedes True
    assert len(unique_labels) == 2, 'exepcted two groups'
    assert not unique_labels[0], 'expected true negatives to be first'
    assert unique_labels[1], 'expected true positives to be second'
    #
    tn_scores, tp_scores = X_parts
    return tp_scores, tn_scores, part_attrs
def flatten_scores(tp_scores, tn_scores, part_attrs=None):
    """
    Convenience helper to translate partitioned data into flat (X, y) form.

    Args:
        tp_scores (ndarray): scores of the true-positive partition
        tn_scores (ndarray): scores of the true-negative partition
        part_attrs (dict): maps label (1/0) to a dict of attribute arrays
            (default = None)

    Returns:
        tuple: (scores, labels) when ``part_attrs`` is None, otherwise
            (scores, labels, attrs)

    CommandLine:
        python -m vtool.score_normalization --test-flatten_scores

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.score_normalization import *  # NOQA
        >>> tp_scores = np.array([5, 6, 6, 7])
        >>> tn_scores = np.array([1, 2, 2])
        >>> part_attrs = {
        ...     1: {'qaid': [21, 24, 25, 26]},
        ...     0: {'qaid': [11, 14, 15]},
        ... }
        >>> tup = flatten_scores(
        ...     tp_scores, tn_scores, part_attrs)
        >>> (X, y, attrs) = tup
        >>> y = y.astype(int)
        >>> resdict = ut.odict(zip(['X', 'y', 'attrs'], [X, y, attrs]))
        >>> result = ub.repr2(resdict, nobraces=True, with_dtype=False,
        >>>                   explicit=1, nl=1)
        >>> print(result)
        X=np.array([5, 6, 6, 7, 1, 2, 2]),
        y=np.array([1, 1, 1, 1, 0, 0, 0]),
        attrs='qaid': np.array([21, 24, 25, 26, 11, 14, 15]),
    """
    # True-positive scores always come first in the flattened arrays.
    scores = np.hstack([tp_scores, tn_scores])
    labels = np.zeros(scores.size, dtype=np.bool_)
    labels[0 : len(tp_scores)] = True
    if part_attrs is None:
        return scores, labels
    else:
        tp_attrs = part_attrs[1]
        tn_attrs = part_attrs[0]
        assert (tp_attrs is None) == (tn_attrs is None), 'must specify both or none'
        assert sorted(tp_attrs.keys()) == sorted(tn_attrs.keys()), 'dicts do not agree'
        # attrs = ut.dict_isect_combine(tp_attrs, tn_attrs, combine_op=np.append)
        # FIX: removed a redundant function-local `from functools import
        # partial`; it shadowed the module-level import for no benefit.
        combine_op = partial(np.append, axis=0)
        attrs = ut.dict_isect_combine(tp_attrs, tn_attrs, combine_op=combine_op)
        # Sanity check: every combined attribute column matches the data length.
        num_attrs = np.array(list(map(len, attrs.values())))
        assert np.all(
            num_attrs == len(scores)
        ), 'num_attrs=%r must agree with data. len(scores)=%r' % (num_attrs, len(scores))
        return scores, labels, attrs
[docs]def learn_score_normalization( tp_support, tn_support, gridsize=1024, adjust=8, return_all=False, monotonize=True, clip_factor=(ut.PHI + 1), verbose=False, reverse=False, p_tp_method='eq', ): r""" Takes collected data and applys parzen window density estimation and bayes rule. #True positive scores must be larger than true negative scores. FIXME: might be an issue with pdfs summing to 1 here. Args: tp_support (ndarray): tn_support (ndarray): gridsize (int): default 512 adjust (int): default 8 return_all (bool): default False monotonize (bool): default True clip_factor (float): default phi ** 2 Returns: tuple: (score_domain, p_tp_given_score, p_tn_given_score, p_score_given_tp, p_score_given_tn, p_score) CommandLine: python -m vtool.score_normalization --test-learn_score_normalization Example: >>> # ENABLE_DOCTEST >>> from vtool.score_normalization import * # NOQA >>> tp_support = np.linspace(100, 10000, 512) >>> tn_support = np.linspace(0, 120, 512) >>> gridsize = 1024 >>> adjust = 8 >>> return_all = False >>> monotonize = True >>> clip_factor = 2.6180339887499997 >>> verbose = True >>> reverse = False >>> (score_domain, p_tp_given_score) = learn_score_normalization(tp_support, tn_support) >>> result = '%.2f' % (np.diff(p_tp_given_score).sum()) >>> print(result) 0.99 """ import vtool as vt if verbose: print('[scorenorm] Learning normalization pdf') print('[scorenorm] * tp_support.shape=%r' % (tp_support.shape,)) print('[scorenorm] * tn_support.shape=%r' % (tn_support.shape,)) print('[scorenorm] * estimating true positive pdf, ') print('[scorenorm] * monotonize = %r' % (monotonize,)) print('stats.tp_support = ' + ut.get_stats_str(tp_support, use_nan=True)) print('stats.tn_support = ' + ut.get_stats_str(tn_support, use_nan=True)) next_ = ut.next_counter(1) total = 8 # import utool # utool.embed() # Find good score domain range if True: min_score, max_score = find_clip_range( tp_support, tn_support, clip_factor, reverse ) else: min_score = min(tp_support.min(), 
tn_support.min()) max_score = min(tp_support.max(), tn_support.max()) score_domain = np.linspace(min_score, max_score, gridsize) # Estimate true positive/negative density if verbose: print('[scorenorm] %d/%d estimating true negative pdf' % (next_(), total)) score_tp_pdf = vt.estimate_pdf(tp_support, gridsize=gridsize, adjust=adjust) # assert score_tp_pdf.bw != 0, 'error bandwidth estimated to be 0' if verbose: print('[scorenorm] %d/%d estimating true negative pdf' % (next_(), total)) score_tn_pdf = vt.estimate_pdf(tn_support, gridsize=gridsize, adjust=adjust) # assert score_tn_pdf.bw != 0, 'error bandwidth estimated to be 0' if verbose: print('[scorenorm] %d/%d estimating score domain' % (next_(), total)) # Evaluate true negative density if verbose: print('[scorenorm] %d/%d evaluating tp density' % (next_(), total)) p_score_given_tp = score_tp_pdf.evaluate(score_domain) if verbose: print('[scorenorm] %d/%d evaluating tn density' % (next_(), total)) p_score_given_tn = score_tn_pdf.evaluate(score_domain) # Not sure why the pdfs returned from statsmodels dont integrate to 1 if verbose: print( '[sn.pre]stats.score_domain = ' + ut.get_stats_str(score_domain, use_nan=True, precision=5) ) print( '[sn.pre]stats:p_score_given_tn = ' + ut.get_stats_str(p_score_given_tn, use_nan=True, precision=5) ) print( '[sn.pre]stats:p_score_given_tp = ' + ut.get_stats_str(p_score_given_tp, use_nan=True, precision=5) ) # print('stats.tn_support = ' + ut.get_stats_str(tn_support, use_nan=True)) problems = [] try: assert not np.any(np.isnan(p_score_given_tp)), 'Need more positive support' except AssertionError as ex: # NOQA print( '[sn.pre]stats:tpsupport = ' + ut.get_stats_str(score_tp_pdf.support, use_nan=True, precision=5) ) problems += [str(ex)] try: assert not np.any(np.isnan(p_score_given_tn)), 'Need more negative support' except AssertionError as ex: # NOQA print( '[sn.pre]stats:tnsupport = ' + ut.get_stats_str(score_tn_pdf.support, use_nan=True, precision=5) ) problems += [str(ex)] 
if problems: raise AssertionError(', '.join(problems)) if True: # Make sure we still have probability functions area_tp = np.trapz(p_score_given_tp, score_domain) area_tn = np.trapz(p_score_given_tn, score_domain) if verbose: print('pre.area_tp = %r' % (area_tp,)) print('pre.area_tn = %r' % (area_tn,)) # normalize to ensure p_score_given_tp = p_score_given_tp / area_tp p_score_given_tn = p_score_given_tn / area_tn area_tp = np.trapz(p_score_given_tp, score_domain) area_tn = np.trapz(p_score_given_tn, score_domain) # if ut.DEBUG2: if verbose: print('norm.area_tp = %r' % (area_tp,)) print('norm.area_tn = %r' % (area_tn,)) assert np.isclose(area_tp, 1.0), area_tp assert np.isclose(area_tn, 1.0), area_tn if verbose: print('[scorenorm] %d/%d evaluating posterior probabilities' % (next_(), total)) print('p_tp_method = %r' % (p_tp_method,)) # For inbalanced data there are several methods we might want to use to # calculate p_tp. not always going to be equal probability of true and # positive cases if p_tp_method == 'eq': p_tp = 0.5 elif p_tp_method == 'ratio': p_tp = len(tp_support) / (len(tp_support) + len(tn_support)) else: raise NotImplementedError('p_tp_method = %r' % (p_tp_method,)) # Average to get probablity of any score p_score = (np.array(p_score_given_tp) + np.array(p_score_given_tn)) / 2.0 # Apply bayes p_tp_given_score = ut.bayes_rule(p_score_given_tp, p_tp, p_score) if ut.DEBUG2: assert np.isclose(np.trapz(p_score, score_domain), 1.0) assert np.isclose(np.trapz(p_score, p_tp_given_score), 1.0) if np.any(np.isnan(p_tp_given_score)): p_tp_given_score = vt.interpolate_nans(p_tp_given_score) if verbose: # np.trapz(p_tp_given_score / np.trapz(p_tp_given_score, score_domain), score_domain) print( 'stats:p_score_given_tn = ' + ut.get_stats_str(p_score_given_tn, newlines=0, use_nan=True, precision=5) ) print( 'stats:p_score_given_tp = ' + ut.get_stats_str(p_score_given_tp, newlines=0, use_nan=True, precision=5) ) print( 'stats:p_score = ' + ut.get_stats_str(p_score, 
newlines=0, use_nan=True, precision=5) ) print( 'stats:p_tp_given_score = ' + ut.get_stats_str(p_tp_given_score, newlines=0, use_nan=True, precision=5) ) if monotonize: if reverse: if verbose: print('[scorenorm] %d/%d monotonize decreasing' % (next_(), total)) p_tp_given_score = vt.ensure_monotone_strictly_decreasing( p_tp_given_score, left_endpoint=1.0, right_endpoint=0.0 ) else: if verbose: print('[scorenorm] %d/%d monotonize increasing' % (next_(), total)) # if False: # flags = ~np.isnan(p_tp_given_score) # pt.plot(score_domain[flags], p_tp_given_score[flags]) # pt.plot(score_domain, p_tp_given_score) p_tp_given_score = vt.ensure_monotone_strictly_increasing( p_tp_given_score, left_endpoint=0.0, right_endpoint=1.0 ) if return_all: if verbose: print('[scorenorm] %d/%d returning all' % (next_(), total)) p_tn_given_score = 1 - p_tp_given_score return ( score_domain, p_tp_given_score, p_tn_given_score, p_score_given_tp, p_score_given_tn, p_score, ) else: if verbose: print('[scorenorm] %d/%d returning minimum' % (next_(), total)) return (score_domain, p_tp_given_score)
def find_clip_range(tp_support, tn_support, clip_factor=(1 + 5 ** 0.5) / 2 + 1, reverse=None):
    """
    TODO: generalize to arbitrary domains (not just 0->inf)

    Finds score to clip true positives past. This is useful when the highest
    true positive scores can be much larger than the highest true negative
    score.

    Args:
        tp_support (ndarray): true positive scores
        tn_support (ndarray): true negative scores
        clip_factor (float): factor of the true negative domain to search for
            true positives. Defaults to the golden ratio plus one
            (same value as the old ``ut.PHI + 1``). ``None`` disables clipping.
        reverse (bool): True when lower scores are better; inferred from the
            class means when None.

    Returns:
        tuple: (min_score, max_score)

    CommandLine:
        python -m vtool.score_normalization --test-find_clip_range

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.score_normalization import *  # NOQA
        >>> tp_support = np.array([100, 200, 50000])
        >>> tn_support = np.array([10, 30, 110])
        >>> min_score, max_score = find_clip_range(tp_support, tn_support)
        >>> result = '%.4f, %.4f' % ((min_score, max_score))
        >>> print(result)
        10.0000, 287.9837
    """
    if reverse is None:
        # Infer polarity: the class with the larger mean is the "high" class
        mean_tp_score = tp_support.mean()
        mean_tn_score = tn_support.mean()
        reverse = mean_tp_score < mean_tn_score
    if not reverse:
        # Normal case where higher scores are better
        high_scores = tp_support
        low_scores = tn_support
    else:
        high_scores = tn_support
        low_scores = tp_support

    max_high_score = high_scores.max()
    max_low_score = low_scores.max()
    min_high_score = high_scores.min()
    min_low_score = low_scores.min()
    abs_max_score = max(max_high_score, max_low_score)
    abs_min_score = min(min_high_score, min_low_score)

    if clip_factor is None:
        # No clipping requested; return the raw extrema
        return abs_min_score, abs_max_score

    # FIXME: allow for true positive scores to be low, or not bounded at 0
    if max_low_score < max_high_score:
        # The high class overshoots the low class; clip the overshoot when it
        # exceeds clip_factor times the low class's maximum.
        overshoot_factor = max_high_score / max_low_score
        if overshoot_factor > clip_factor:
            max_score = max_low_score * clip_factor
        else:
            max_score = max_high_score
    else:
        # Do not clip if the low class can score at least as high as the high
        # class. BUGFIX: previously max_score was left unassigned on this
        # path, raising UnboundLocalError at the return statement.
        max_score = abs_max_score
    min_score = abs_min_score
    return min_score, max_score
def normalize_scores(score_domain, p_tp_given_score, scores, interp_fn=None):
    """
    Maps raw scores onto probabilities via a learned monotone mapping.

    Scores inside ``score_domain`` are linearly interpolated against
    ``p_tp_given_score``. Scores below the domain are clamped to 0; scores
    above it are clamped to the midpoint between the last learned probability
    and 1.0. NaN scores are not supported.

    Args:
        score_domain (ndarray): sorted input score domain
        p_tp_given_score (ndarray): learned probability at each domain point
        scores (ndarray): raw scores to normalize
        interp_fn (callable): optional precomputed interpolator; built from
            ``score_domain`` / ``p_tp_given_score`` when None

    Returns:
        ndarray: probabilities, one per input score

    CommandLine:
        python -m vtool.score_normalization --test-normalize_scores

    Example:
        >>> # DISABLE_DOCTEST
        >>> from vtool.score_normalization import *  # NOQA
        >>> score_domain = np.linspace(0, 10, 10)
        >>> p_tp_given_score = (score_domain ** 2) / (score_domain.max() ** 2)
        >>> scores = np.array([-1, 0.0, 0.01, 2.3, 8.0, 9.99, 10.0, 10.1, 11.1])
        >>> prob = normalize_scores(score_domain, p_tp_given_score, scores)
        >>> result = ub.repr2(prob, precision=2, suppress_small=True)
        >>> print(result)
        np.array([ 0.  ,  0.  ,  0.  ,  0.05,  0.64,  1.  ,  1.  ,  1.  ,  1.  ], dtype=np.float64)
    """
    probs = np.zeros(len(scores))
    # Partition the inputs: NaN, below domain, above domain, in-bounds
    nan_mask = np.isnan(scores)
    below_mask = scores < score_domain[0]
    above_mask = scores > score_domain[-1]
    inbounds_mask = ~(nan_mask | below_mask | above_mask)

    # Interpolate the in-bounds scores within the learned domain
    if interp_fn is None:
        # TODO build custom interpolator with correct bound checks
        interp_fn = scipy.interpolate.interp1d(
            score_domain,
            p_tp_given_score,
            kind='linear',
            copy=False,
            assume_sorted=True,
        )
    probs[inbounds_mask] = interp_fn(scores[inbounds_mask])

    # NaNs cannot be mapped to a probability
    assert not np.any(nan_mask), 'cannot normalize nan values'
    # Below-domain scores clamp to zero probability
    probs[below_mask] = 0
    # Above-domain scores clamp halfway between the max learned prob and one
    probs[above_mask] = (p_tp_given_score[-1] + 1.0) / 2.0
    return probs
# DEBUGGING FUNCTIONS
def test_score_normalization(
    tp_support,
    tn_support,
    with_scores=True,
    verbose=True,
    with_roc=True,
    with_precision_recall=False,
    figtitle=None,
    normkw_varydict=None,
):
    """
    Gives an overview of how well threshold can be learned from raw scores.

    DEPRECATE

    Args:
        tp_support (ndarray): raw scores for the positive class
        tn_support (ndarray): raw scores for the negative class
        with_scores (bool): include the sorted-score subplot
        verbose (bool): forwarded to learn_score_normalization
        with_roc (bool): include the ROC subplot
        with_precision_recall (bool): include the precision/recall subplot
        figtitle (str or None): overrides the default figure title
        normkw_varydict (dict or None): grid of keyword-value lists for
            learn_score_normalization; one figure is drawn per combination

    Returns:
        dict: this function's locals() (debugging convenience)

    CommandLine:
        python -m vtool.score_normalization --test-test_score_normalization --show

    CommandLine:
        xdoctest -m ~/code/vtool/vtool/score_normalization.py test_score_normalization

    Ignore:
        >>> # GUI_DOCTEST
        >>> # Shows how score normalization works with gaussian noise
        >>> from vtool.score_normalization import *  # NOQA
        >>> verbose = True
        >>> randstate = np.random.RandomState(seed=0)
        >>> # Get a training sample
        >>> tp_support = randstate.normal(loc=6.5, size=(256,))
        >>> tn_support = randstate.normal(loc=3.5, size=(256,))
        >>> # xdoctest: +REQUIRES(module:plottool)
        >>> test_score_normalization(tp_support, tn_support, verbose=verbose)
        >>> ut.show_if_requested()
    """
    import wbia.plottool as pt  # NOQA

    # Print raw score statistics
    print('tp_support')
    print(ut.repr4(ut.get_stats(tp_support)))
    print('tn_support')
    print(ut.repr4(ut.get_stats(tn_support)))

    # Test (potentially multiple) normalizing configurations
    if normkw_varydict is None:
        normkw_varydict = {
            'monotonize': [False],  # [True, False],
            #'adjust': [1, 4, 8],
            'adjust': [1],
            #'adjust': [8],
        }
    normkw_list = ut.util_dict.all_dict_combinations(normkw_varydict)
    if len(normkw_list) > 32:
        # Each combination produces a figure; cap the grid size
        raise AssertionError('Too many plots to test!')

    for normkw in normkw_list:
        # Learn the appropriate normalization
        tup = learn_score_normalization(
            tp_support, tn_support, return_all=True, verbose=verbose, **normkw
        )
        (
            score_domain,
            p_tp_given_score,
            p_tn_given_score,
            p_score_given_tp,
            p_score_given_tn,
            p_score,
        ) = tup

        if verbose:
            print('plotting pdfs')
        # One fresh figure number per configuration
        fnum = pt.next_fnum()
        inspect_pdfs(
            tn_support,
            tp_support,
            score_domain,
            p_tp_given_score,
            p_tn_given_score,
            p_score_given_tp,
            p_score_given_tn,
            p_score,
            with_scores=with_scores,
            with_roc=with_roc,
            with_precision_recall=with_precision_recall,
            fnum=fnum,
        )

    if figtitle is not None:
        pt.set_figtitle(figtitle)
    else:
        # NOTE: normkw is the last configuration left over from the loop
        pt.set_figtitle('ScoreNorm test' + ub.repr2(normkw, newlines=False))
    # locals() is part of the return contract, so local names must not change
    locals_ = locals()
    return locals_
# -------- # Plotting # --------
[docs]def plot_prebayes_pdf( score_domain, p_score_given_tn, p_score_given_tp, p_score, cfgstr='', fnum=None, pnum=(1, 1, 1), **kwargs ): import wbia.plottool as pt if fnum is None: fnum = pt.next_fnum() true_color = pt.TRUE_BLUE # pt.TRUE_GREEN false_color = pt.FALSE_RED # unknown_color = pt.UNKNOWN_PURP unknown_color = pt.PURPLE2 # unknown_color = pt.GRAY pt.plots.plot_probabilities( (p_score_given_tn, p_score_given_tp, p_score), ('p(score | tn)', 'p(score | tp)', 'p(score)'), prob_colors=(false_color, true_color, unknown_color), # figtitle='pre_bayes pdf score', figtitle='p(score | truth)', xdata=score_domain, fnum=fnum, pnum=pnum, **kwargs )
[docs]def plot_postbayes_pdf( score_domain, p_tn_given_score, p_tp_given_score, score_thresh=None, prob_thresh=None, cfgstr='', fnum=None, pnum=(1, 1, 1), ): import wbia.plottool as pt if fnum is None: fnum = pt.next_fnum() true_color = pt.TRUE_BLUE # pt.TRUE_GREEN false_color = pt.FALSE_RED pt.plots.plot_probabilities( (p_tn_given_score, p_tp_given_score), ('p(tn | score)', 'p(tp | score)'), prob_colors=( false_color, true_color, ), # figtitle='post_bayes pdf score ' + cfgstr, figtitle='p(truth | score)' + cfgstr, xdata=score_domain, fnum=fnum, pnum=pnum, score_thresh=score_thresh, prob_thresh=prob_thresh, )
def inspect_pdfs(
    tn_support,
    tp_support,
    score_domain,
    p_tp_given_score,
    p_tn_given_score,
    p_score_given_tp,
    p_score_given_tn,
    p_score,
    prob_thresh=None,
    score_thresh=None,
    with_scores=False,
    with_roc=False,
    with_precision_recall=False,
    with_hist=False,
    fnum=None,
    figtitle=None,
    interactive=None,
    use_stems=None,
    part_attrs=None,
    thresh_kw=None,
    attr_callback=None,
    with_prebayes=True,
    with_postbayes=True,
    score_range=None,
    **kwargs
):
    r"""
    Shows plots of learned thresholds

    Builds an ExpandableInteraction containing any combination of subplots:
    sorted raw scores, score histogram, pre-bayes densities, post-bayes
    posteriors, sorted normalized probabilities, ROC, and precision/recall.
    The ``with_*`` flags choose which subplots are drawn; at least one must
    be enabled.

    Args:
        tn_support, tp_support (ndarray): raw negative / positive scores
        score_domain (ndarray): domain of the learned normalizer
        p_tp_given_score, p_tn_given_score (ndarray): posterior curves
        p_score_given_tp, p_score_given_tn, p_score (ndarray): density curves
        prob_thresh, score_thresh (float): optional threshold markers
        part_attrs (dict): presumably maps label (True/False) to per-score
            attribute dicts used on click — TODO confirm against callers
        attr_callback (callable): invoked with the clicked score's attributes

    Returns:
        ExpandableInteraction: the started interaction

    CommandLine:
        python -m vtool.score_normalization --test-ScoreNormalizer --show
        python -m vtool.score_normalization --exec-ScoreNormalizer.visualize --show
    """
    import wbia.plottool as pt  # NOQA
    from wbia.plottool.interactions import ExpandableInteraction
    from wbia.plottool.abstract_interaction import AbstractInteraction
    import vtool as vt
    import wbia.plottool as pt  # NOQA

    if fnum is None:
        fnum = pt.next_fnum()

    # The normalized-probability subplot mirrors the raw-score flag
    with_normscore = with_scores
    # with_prebayes = True
    # with_postbayes = True

    # Booleans sum as 0/1 to count the requested subplots
    nSubplots = (
        with_normscore
        + with_prebayes
        + with_postbayes
        + with_scores
        + with_roc
        + with_precision_recall
        + with_hist
    )
    if nSubplots == 0:
        raise ValueError('Must choose at least one subplot')

    nRows, nCols = pt.get_square_row_cols(nSubplots)
    _pnumiter = pt.make_pnum_nextgen(nRows=nRows, nCols=nCols, nSubplots=nSubplots)
    # print('Always interactive even if: interactive = %r' % (interactive,))
    # Make a plottool interaction
    inter = ExpandableInteraction(fnum, _pnumiter)

    # Stack supports into a single labeled sample: negatives first
    scores = np.hstack([tn_support, tp_support])
    labels = np.array([False] * len(tn_support) + [True] * len(tp_support))
    # probs = encoder.normalize_scores(scores)
    probs = normalize_scores(score_domain, p_tp_given_score, scores)

    confusions = vt.ConfusionMetrics().fit(probs, labels)

    # Hack change confusion prob thresholds to score thresholds
    if False:
        # Fixme: assume sorted
        inverse_interp = scipy.interpolate.interp1d(
            p_tp_given_score, score_domain, kind='linear', copy=False, assume_sorted=False
        )
        confusions._orig_thresholds = confusions.thresholds
        confusions.thresholds = inverse_interp(confusions.thresholds)
        confusions._hackscores = scores

    true_color = pt.TRUE_BLUE  # pt.TRUE_GREEN
    false_color = pt.FALSE_RED

    # Shared kwargs for all support plots
    support_kw = dict(
        score_lbls=kwargs.get('score_lbls', ('trueneg', 'truepos')),
        score_colors=(false_color, true_color),
        logscale=kwargs.get('logscale', False),
    )
    support_sort_kw = dict(
        score_markers=['^', 'v'], markersizes=[5, 5], use_stems=use_stems, **support_kw
    )

    # Interactive sorted-score plot: 't' toggles which class a click selects,
    # clicking looks up the nearest score's attributes and fires the callback.
    # NOTE: methods use 'data_pdf' in place of the conventional 'self'.
    class SortedScoreSupportInteraction(AbstractInteraction):
        def __init__(data_pdf, **kwargs):
            super(SortedScoreSupportInteraction, data_pdf).__init__(**kwargs)
            data_pdf.tn_support = tn_support
            data_pdf.tp_support = tp_support
            data_pdf.part_attrs = part_attrs
            data_pdf.attr_callback = attr_callback
            # selection mode starts on the negative class
            data_pdf.sel_mode = 'tn'

        def toggle_mode(data_pdf):
            # flip between selecting true-negative and true-positive points
            if data_pdf.sel_mode == 'tn':
                data_pdf.sel_mode = 'tp'
            else:
                data_pdf.sel_mode = 'tn'
            print('TOGGLE data_pdf.sel_mode = %r' % (data_pdf.sel_mode,))

        @staticmethod
        def static_plot(fnum, pnum):
            pt.plots.plot_sorted_scores(
                (tn_support, tp_support),
                fnum=fnum,
                pnum=pnum,
                score_label='score',
                thresh=score_thresh,
                **support_sort_kw
            )

        def plot(data_pdf, fnum, pnum):
            data_pdf.static_plot(fnum, pnum)

        def on_key_press(data_pdf, event):
            # print('event = %r' % (event,))
            # print('event.key = %r' % (event.key,))
            if event.key == 't':
                data_pdf.toggle_mode()

        def on_click_inside(data_pdf, event, ex):
            import vtool as vt

            # ax = event.inaxes
            # for l in ax.get_lines():
            #     print(l.get_label())
            # Find the support point nearest the clicked y-position in
            # each class
            tp_index, tp_dist = vt.closest_point(
                event.ydata, data_pdf.tp_support[:, None]
            )
            tn_index, tn_dist = vt.closest_point(
                event.ydata, data_pdf.tn_support[:, None]
            )
            print('closest tp_index = %r, %r' % (tp_index, tp_dist))
            print('closest tn_index = %r, %r' % (tn_index, tn_dist))

            SEL_TP = data_pdf.sel_mode == 'tp'
            print('data_pdf.sel_mode = %r' % (data_pdf.sel_mode,))
            if SEL_TP:
                tp_attrs = data_pdf.part_attrs[True]
                if len(tp_attrs) == 0:
                    print('No positive attrs')
                subattrs = ut.get_dict_column(tp_attrs, tp_index)
            else:
                tn_attrs = data_pdf.part_attrs[False]
                if len(tn_attrs) == 0:
                    print('No negative attrs')
                subattrs = ut.get_dict_column(tn_attrs, tn_index)
            print('subattrs = %r' % (subattrs,))
            if data_pdf.attr_callback is not None:
                print('Executing callback')
                data_pdf.attr_callback(**subattrs)
            # dists = vt.L1(event.ydata, data_pdf.tp_support[:, None])
            # index = dists.argsort()[0]
            # event.xdata
            # Find the nearest label
            pass

    # target_tpr = None
    target_tpr = confusions.get_metric_at_thresh('tpr', prob_thresh)
    # print('target_tpr = %r' % (target_tpr,))
    ROCInteraction = vt.interact_roc_factory(
        confusions, target_tpr, show_operating_point=True
    )

    def _score_support_hist(fnum, pnum):
        # Histogram of the raw scores, optionally overlaid with the
        # learned densities and threshold
        overlay_score_domain = None
        score_thresh_ = None
        if kwargs.get('histoverlay', True):
            overlay_score_domain = score_domain
            score_thresh_ = score_thresh
        print('support_kw = %r' % (support_kw,))
        pt.plot_score_histograms(
            (tn_support, tp_support),
            score_thresh=score_thresh_,
            score_label='score',
            fnum=fnum,
            pnum=pnum,
            bin_width=kwargs.get('bin_width', None),
            num_bins=kwargs.get('num_bins', None),
            overlay_prob_given_list=(p_score_given_tn, p_score_given_tp),
            overlay_score_domain=overlay_score_domain,
            xlim=score_range,
            histnorm=kwargs.get('histnorm', False),
            **support_kw
        )

    def _prob_support_hist(fnum, pnum):
        # Histogram of the normalized probabilities (currently unused below)
        tp_probs = probs[labels]
        tn_probs = probs[np.logical_not(labels)]
        pt.plot_score_histograms(
            (tn_probs, tp_probs),
            score_label='prob',
            fnum=fnum,
            pnum=pnum,
            bin_width=kwargs.get('bin_width', None),
            num_bins=kwargs.get('num_bins', None),
            **support_kw
        )

    def _prob_support_sorted(fnum, pnum):
        # Sorted plot of the normalized probabilities
        tp_probs = probs[labels]
        tn_probs = probs[np.logical_not(labels)]
        pt.plots.plot_sorted_scores(
            (tn_probs, tp_probs),
            fnum=fnum,
            pnum=pnum,
            score_label='prob',
            thresh=prob_thresh,
            **support_sort_kw
        )
        # ax = pt.gca()
        # max_score = max(tn_support.max(), tp_support.max())
        # ax.set_ylim(-max_score, max_score)

    def _prebayes(fnum, pnum):
        plot_prebayes_pdf(
            score_domain,
            p_score_given_tn,
            p_score_given_tp,
            p_score,
            score_thresh=score_thresh,
            cfgstr='',
            fnum=fnum,
            pnum=pnum,
        )

    def _postbayes(fnum, pnum):
        plot_postbayes_pdf(
            score_domain,
            p_tn_given_score,
            p_tp_given_score,
            prob_thresh=prob_thresh,
            score_thresh=score_thresh,
            cfgstr='',
            fnum=fnum,
            pnum=pnum,
        )

    def _precision_recall(fnum, pnum):
        confusions.draw_precision_recall_curve(fnum=fnum, pnum=pnum)

    # Register the requested subplots in a fixed order
    if with_scores:
        inter.append_plot(SortedScoreSupportInteraction)
    if with_hist:
        inter.append_plot(_score_support_hist)
    if with_prebayes:
        inter.append_plot(_prebayes)
    if with_postbayes:
        inter.append_plot(_postbayes)
    if with_normscore:
        inter.append_plot(_prob_support_sorted)
        # inter.append_plot(_prob_support_hist)
    if with_roc:
        inter.append_plot(ROCInteraction)
    if with_precision_recall:
        inter.append_plot(_precision_recall)

    inter.start()

    if figtitle is not None:
        pt.set_figtitle(figtitle)
    return inter
def estimate_pdf(data, gridsize=1024, adjust=1):
    """
    Estimates a univariate kernel density for ``data``.

    References;
        http://statsmodels.sourceforge.net/devel/generated/statsmodels.nonparametric.kde.KDEUnivariate.html
        https://jakevdp.github.io/blog/2013/12/01/kernel-density-estimation/

    Args:
        data (ndarray): 1 dimensional data of float64
        gridsize(int): domain size
        adjust(int): smoothing factor

    Returns:
        ndarray: data_pdf (a fitted statsmodels KDEUnivariate object with
            ``support``, ``cdf``, and ``evaluate``)

    Example:
        >>> # xdoctest: +REQUIRES(module:pyhesaff)
        >>> from vtool.score_normalization import *  # NOQA
        >>> import vtool as vt
        >>> rng = np.random.RandomState(0)
        >>> data = rng.randn(1000)
        >>> data_pdf = vt.estimate_pdf(data)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import wbia.plottool as pt
        >>> pt.plot(data_pdf.support[:-1], np.diff(data_pdf.cdf))
        >>> ut.show_if_requested()
    """
    import statsmodels.nonparametric.kde
    import statsmodels.nonparametric.bandwidths

    # Ensure that a non-zero bandwidth is chosen. The rule-of-thumb selectors
    # can return 0 on degenerate data, so try silverman then scott.
    # bw_choices = ['scott', 'silverman', 'normal_reference']
    for bw in ['silverman', 'scott']:
        bw_value = statsmodels.nonparametric.bandwidths.select_bandwidth(
            data, bw, None
        )
        if bw_value > 0:
            break
    if bw_value == 0:
        # Both selectors collapsed; fall back to a bandwidth derived from the
        # median gap between distinct sorted samples.
        sorted_diffs = np.diff(sorted(data))
        nonzero_diffs = sorted_diffs[sorted_diffs > 0]
        if len(nonzero_diffs) > 0:
            median_diff = np.median(nonzero_diffs)
            bw_value = np.sqrt(median_diff)
        else:
            # data is entirely constant; use a very small value
            bw_value = 1e-9
    try:
        data_pdf = statsmodels.nonparametric.kde.KDEUnivariate(data)
        fitkw = dict(
            kernel='gau',
            bw=bw_value,
            fft=True,
            weights=None,
            adjust=adjust,
            cut=3,
            gridsize=gridsize,
            clip=(-np.inf, np.inf),
        )
        data_pdf.fit(**fitkw)
    except Exception as ex:
        # Surface the inputs that led to the failure, then re-raise
        ut.printex(
            ex,
            '! Exception while estimating kernel density',
            keys=['data', 'gridsize', 'bw_value', 'adjust'],
        )
        raise
    return data_pdf
if __name__ == '__main__': """ CommandLine: xdoctest -m vtool.score_normalization """ import xdoctest xdoctest.doctest_module(__file__)