# -*- coding: utf-8 -*-
"""
vt
python -m utool.util_inspect check_module_usage --pat="matching.py"
"""
from __future__ import absolute_import, division, print_function
from vtool import _rhomb_dist
import six
import warnings
import pandas as pd
import utool as ut
import ubelt as ub
import numpy as np
import parse
from collections import namedtuple
from .util_math import TAU
try:
import pyhesaff
except ImportError:
pyhesaff = None
pass
# Result of feature assignment: matched index pairs (fm), match distances,
# normalizer feature indices in annot1, and normalizer distances.
AssignTup = namedtuple('AssignTup', ('fm', 'match_dist', 'norm_fx1', 'norm_dist'))
class MatchingError(Exception):
    """Raised when one-vs-one feature matching cannot be completed."""
# Feature-extraction parameters, mirrored from pyhesaff's defaults when it is
# importable (otherwise empty).
if pyhesaff is not None:
    VSONE_FEAT_CONFIG = [
        ut.ParamInfo(key, val)
        for key, val in pyhesaff.get_hesaff_default_params().items()
    ]
else:
    VSONE_FEAT_CONFIG = []

# Nearest-neighbor assignment parameters for one-vs-one matching.
VSONE_ASSIGN_CONFIG = [
    ut.ParamInfo('checks', 20),
    ut.ParamInfo('symmetric', False),
    ut.ParamInfo(
        'weight',
        None,
        valid_values=[None, 'fgweights'],
    ),
    ut.ParamInfo('K', 1, min_=1),
    ut.ParamInfo('Knorm', 1, min_=1),
]

# Lowe-style ratio-test threshold.
VSONE_RATIO_CONFIG = [
    ut.ParamInfo('ratio_thresh', 0.625, min_=0.0, max_=1.0),
]

# Spatial-verification parameters; all hidden from nice-reprs when sv_on=False.
VSONE_SVER_CONFIG = [
    ut.ParamInfo('sv_on', True),
    ut.ParamInfo('thresh_bins', tuple(), type_=eval, hideif=lambda cfg: not cfg['sv_on']),
    ut.ParamInfo(
        'refine_method',
        'homog',
        valid_values=['homog', 'affine'],
        hideif=lambda cfg: not cfg['sv_on'],
    ),
    ut.ParamInfo(
        'sver_xy_thresh', 0.01, min_=0.0, max_=None, hideif=lambda cfg: not cfg['sv_on']
    ),
    ut.ParamInfo(
        'sver_ori_thresh',
        TAU / 4.0,
        min_=0.0,
        max_=TAU,
        hideif=lambda cfg: not cfg['sv_on'],
    ),
    ut.ParamInfo(
        'sver_scale_thresh', 2.0, min_=1.0, max_=None, hideif=lambda cfg: not cfg['sv_on']
    ),
]

# Chip-normalization parameters (histogram / adaptive equalization, median blur).
NORM_CHIP_CONFIG = [
    # ---
    ut.ParamInfo('histeq', False, hideif=False),
    # ---
    ut.ParamInfo('adapteq', False, hideif=False),
    ut.ParamInfo('adapteq_ksize', 8, hideif=lambda cfg: not cfg['adapteq']),
    ut.ParamInfo('adapteq_limit', 2.0, hideif=lambda cfg: not cfg['adapteq']),
    # ---
    ut.ParamInfo('medianblur', False, hideif=False),
    ut.ParamInfo('medianblur_thresh', 50, hideif=lambda cfg: not cfg['medianblur']),
    ut.ParamInfo('medianblur_ksize1', 3, hideif=lambda cfg: not cfg['medianblur']),
    ut.ParamInfo('medianblur_ksize2', 5, hideif=lambda cfg: not cfg['medianblur']),
    # ---
]

# Full default one-vs-one pipeline config, and a name -> ParamInfo lookup used
# by PairwiseMatch._take_params for per-key default fallback.
VSONE_DEFAULT_CONFIG = VSONE_ASSIGN_CONFIG + VSONE_RATIO_CONFIG + VSONE_SVER_CONFIG
VSONE_PI_DICT = {pi.varname: pi for pi in VSONE_DEFAULT_CONFIG}
def demodata_match(cfgdict=None, apply=True, use_cache=True, recompute=False):
    """
    Construct a (cached) demo PairwiseMatch between two test images.

    Args:
        cfgdict (dict | None): matching configuration (default: {})
        apply (bool): if True, run ``apply_all`` on the match before caching
        use_cache (bool): enable on-disk caching of the computed match
        recompute (bool): force recomputation even on a cache hit

    Returns:
        PairwiseMatch: match between the 'easy1.png' / 'easy2.png' annots
    """
    import vtool as vt
    from vtool.inspect_matches import lazy_test_annot
    if cfgdict is None:
        # avoid a shared mutable default argument
        cfgdict = {}
    if not apply:
        # an unapplied match carries no expensive state worth caching
        use_cache = False
    # hashid based on the state of the code
    function_sig = ut.get_func_sourcecode(vt.matching.PairwiseMatch)
    hashid = ut.hash_data(function_sig)
    ub.util_hash._HASHABLE_EXTENSIONS._register_agressive_extensions()
    cfgstr = ub.hash_data(cfgdict) + hashid
    cacher = ub.Cacher('test_match_v5', cfgstr=cfgstr, appname='vtool',
                       enabled=use_cache)
    match = cacher.tryload()
    annot1 = lazy_test_annot('easy1.png')
    annot2 = lazy_test_annot('easy2.png')
    if match is None or recompute:
        match = vt.PairwiseMatch(annot1, annot2)
        if apply:
            match.apply_all(cfgdict)
            cacher.save(match)
    else:
        # cached matches drop their annot dicts on pickle; reattach them
        match.annot1 = annot1
        match.annot2 = annot2
    return match
class PairwiseMatch(ub.NiceRepr):
    """
    Newest (Sept-16-2016) object oriented one-vs-one matching interface

    Creates an object holding two annotations. Then a pipeline of operations
    can be applied to generate score and refine the matches.

    Note:
        The annotation dictionaries are required to have certain attributes.

        Required annotation attributes:
            (kpts, vecs) OR rchip OR rchip_fpath

        Optional annotation attributes:
            aid, nid, flann, rchip, dlen_sqrd, weight

    Ignore:
        >>> from vtool.matching import *  # NOQA
        >>> import vtool as vt
        >>> imgR = vt.imread(ut.grab_test_imgpath('easy1.png'))
        >>> imgL = vt.imread(ut.grab_test_imgpath('easy2.png'))
        >>> annot1 = {'rchip': imgR}
        >>> annot2 = {'rchip': imgL}
        >>> match = vt.PairwiseMatch(annot1, annot2)
        >>> match.apply_all({'refine_method': 'affine', 'affine_invariance': False, 'rotation_invariance': False})
        >>> dsize = imgR.shape[0:2][::-1]
        >>> imgR_warp = vt.warpHomog(imgR, match.H_12, dsize)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(imgL, pnum=(2, 1, 1))
        >>> kwplot.imshow(imgR_warp, pnum=(2, 1, 2))
        >>> # xdoctest: +REQUIRES(--gui)
        >>> import wbia.guitool as gt
        >>> gt.ensure_qapp()
        >>> match.ishow()
    """
def __init__(match, annot1=None, annot2=None):
if not isinstance(annot1, ut.LazyDict):
annot1 = ut.LazyDict(annot1)
if not isinstance(annot2, ut.LazyDict):
annot2 = ut.LazyDict(annot2)
match.annot1 = annot1
match.annot2 = annot2
match.fm = None
match.fs = None
match.H_21 = None
match.H_12 = None
match.verbose = False
match.local_measures = ub.odict([])
match.global_measures = ub.odict([])
match._inplace_default = False
    @staticmethod
    def _available_params():
        """Return the module-level mapping of parameter name -> ParamInfo."""
        return VSONE_PI_DICT
@staticmethod
def _take_params(config, keys):
"""
take parameter info from config using default values defined in module
constants.
"""
# if isinstance(keys, six.string_types):
# keys = keys.split(', ')
for key in keys:
yield config[key] if key in config else VSONE_PI_DICT[key].default
def __nice__(match):
parts = []
if 'aid' in match.annot1:
aid1 = match.annot1['aid']
aid2 = match.annot2['aid']
vsstr = '%s-vs-%s' % (aid1, aid2)
parts.append(vsstr)
parts.append('None' if match.fm is None else six.text_type(len(match.fm)))
return ' '.join(parts)
def __len__(match):
if match.fm is not None:
return len(match.fm)
else:
return 0
def __getstate__(match):
"""
The state of Pariwise Match ignores most of the annotation objects.
This means that if you need properties of annots, you must reapply
them after you load a PairwiseMatch object.
"""
def _prepare_annot(annot):
if isinstance(annot, ut.LazyDict):
_annot = {}
if 'aid' in annot:
_annot['aid'] = annot['aid']
return _annot
return annot
_annot1 = _prepare_annot(match.annot1)
_annot2 = _prepare_annot(match.annot2)
state = {
'annot1': _annot1,
'annot2': _annot2,
'fm': match.fm,
'fs': match.fs,
'H_21': match.H_21,
'H_12': match.H_12,
'global_measures': match.global_measures,
'local_measures': match.local_measures,
}
return state
    def __setstate__(match, state):
        """Restore pickled state; verbosity is never persisted."""
        match.__dict__.update(state)
        match.verbose = False
    def show(
        match,
        ax=None,
        show_homog=False,
        show_ori=False,
        show_ell=True,
        show_pts=False,
        show_lines=True,
        show_rect=False,
        show_eig=False,
        show_all_kpts=False,
        mask_blend=0,
        ell_alpha=0.6,
        line_alpha=0.35,
        modifysize=False,
        vert=None,
        overlay=True,
        heatmask=False,
        line_lw=1.4,
    ):
        """
        Draw the two chips side by side with keypoint / correspondence
        overlays via ``plottool.show_chipmatch2``.

        Args:
            ax: matplotlib axes to draw into (None creates a new one)
            show_homog (bool): draw using the estimated homography H_12
            show_ori / show_ell / show_pts / show_lines / show_rect /
                show_eig (bool): keypoint and correspondence drawing toggles
            show_all_kpts (bool): draw unmatched keypoints as well
            mask_blend (float): blend strength of the probability mask;
                0 disables masking
            ell_alpha / line_alpha (float): overlay transparencies
            modifysize (bool): allow the plot routine to resize chips
            vert: stack chips vertically if True (None lets plottool decide)
            overlay (bool): if False, draw only the chips (no keypoints)
            heatmask (bool): render a score heat-mask overlay
            line_lw (float): line width of correspondence lines

        Returns:
            tuple: (ax, xywh1, xywh2)
        """
        if match.verbose:
            print('[match] show')
        import wbia.plottool as pt
        annot1 = match.annot1
        annot2 = match.annot2
        try:
            # prefer normalized chips when they have been computed
            rchip1 = annot1['nchip']
            rchip2 = annot2['nchip']
        except KeyError:
            print('Warning: nchip not set. fallback to rchip')
            rchip1 = annot1['rchip']
            rchip2 = annot2['rchip']
        if overlay:
            kpts1 = annot1['kpts']
            kpts2 = annot2['kpts']
        else:
            # no overlay: force every keypoint-related option off
            kpts1 = kpts2 = None
            show_homog = False
            show_ori = False
            show_ell = False
            show_pts = False
            show_lines = False
            show_rect = False
            show_eig = False
            # show_all_kpts = False
            # mask_blend = 0
        if mask_blend:
            # blend each chip with its (resized) probability mask
            import vtool as vt
            mask1 = vt.resize(annot1['probchip_img'], vt.get_size(rchip1))
            mask2 = vt.resize(annot2['probchip_img'], vt.get_size(rchip2))
            # vt.blend_images_average(vt.mask1, 1.0, alpha=mask_blend)
            rchip1 = vt.blend_images_mult_average(rchip1, mask1, alpha=mask_blend)
            rchip2 = vt.blend_images_mult_average(rchip2, mask2, alpha=mask_blend)
        fm = match.fm
        fs = match.fs
        H1 = match.H_12 if show_homog else None
        # H2 = match.H_21 if show_homog else None
        ax, xywh1, xywh2 = pt.show_chipmatch2(
            rchip1,
            rchip2,
            kpts1,
            kpts2,
            fm,
            fs,
            colorbar_=False,
            H1=H1,
            ax=ax,
            modifysize=modifysize,
            ori=show_ori,
            rect=show_rect,
            eig=show_eig,
            ell=show_ell,
            pts=show_pts,
            draw_lines=show_lines,
            all_kpts=show_all_kpts,
            line_alpha=line_alpha,
            ell_alpha=ell_alpha,
            vert=vert,
            heatmask=heatmask,
            line_lw=line_lw,
        )
        return ax, xywh1, xywh2
[docs] def ishow(match):
"""
CommandLine:
python -m vtool.matching ishow --show
Example:
>>> # SCRIPT
>>> # xdoctest: +REQUIRES(module:wbia)
>>> from vtool.matching import * # NOQA
>>> import vtool as vt
>>> import wbia.guitool as gt
>>> gt.ensure_qapp()
>>> match = demodata_match(use_cache=False)
>>> self = match.ishow()
>>> self.disp_config['show_homog'] = True
>>> self.update()
>>> # xdoctest: +REQUIRES(--show)
>>> gt.qtapp_loop(qwin=self, freq=10)
"""
from vtool.inspect_matches import MatchInspector
self = MatchInspector(match=match)
self.show()
return self
[docs] def add_global_measures(match, global_keys):
for key in global_keys:
match.global_measures[key] = (match.annot1[key], match.annot2[key])
[docs] def add_local_measures(match, xy=True, scale=True):
import vtool as vt
if xy:
key_ = 'norm_xys'
norm_xy1 = match.annot1[key_].take(match.fm.T[0], axis=1)
norm_xy2 = match.annot2[key_].take(match.fm.T[1], axis=1)
match.local_measures['norm_x1'] = norm_xy1[0]
match.local_measures['norm_y1'] = norm_xy1[1]
match.local_measures['norm_x2'] = norm_xy2[0]
match.local_measures['norm_y2'] = norm_xy2[1]
if scale:
kpts1_m = match.annot1['kpts'].take(match.fm.T[0], axis=0)
kpts2_m = match.annot2['kpts'].take(match.fm.T[1], axis=0)
match.local_measures['scale1'] = vt.get_scales(kpts1_m)
match.local_measures['scale2'] = vt.get_scales(kpts2_m)
[docs] def matched_vecs2(match):
return match.annot2['vecs'].take(match.fm.T[1], axis=0)
def _next_instance(match, inplace=None):
"""
Returns either the same or a new instance of a match object with the
same global attributes.
"""
if inplace is None:
inplace = match._inplace_default
if inplace:
match_ = match
else:
match_ = match.__class__(match.annot1, match.annot2)
match_.H_21 = match.H_21
match_.H_12 = match.H_12
match_._inplace_default = match._inplace_default
match_.global_measures = match.global_measures.copy()
return match_
[docs] def copy(match):
match_ = match._next_instance(inplace=False)
if match.fm is not None:
match_.fm = match.fm.copy()
match_.fs = match.fs.copy()
match_.local_measures = ub.map_vals(lambda a: a.copy(), match.local_measures)
return match_
[docs] def compress(match, flags, inplace=None):
if match.verbose:
print('[match] compressing {}/{}'.format(sum(flags), len(flags)))
match_ = match._next_instance(inplace)
match_.fm = match.fm.compress(flags, axis=0)
match_.fs = match.fs.compress(flags, axis=0)
match_.local_measures = ub.map_vals(
lambda a: a.compress(flags), match.local_measures
)
return match_
[docs] def take(match, indicies, inplace=None):
match_ = match._next_instance(inplace)
match_.fm = match.fm.take(indicies, axis=0)
match_.fs = match.fs.take(indicies, axis=0)
match_.local_measures = ub.map_vals(
lambda a: a.take(indicies), match.local_measures
)
return match_
[docs] def assign(match, cfgdict={}, verbose=None):
"""
Assign feature correspondences between annots
Example:
>>> # xdoctest: +REQUIRES(module:pyhesaff)
>>> from vtool.matching import * # NOQA
>>> cfgdict = {'symmetric': True}
>>> match = demodata_match({}, apply=False)
>>> m1 = match.copy().assign({'symmetric': False})
>>> m2 = match.copy().assign({'symmetric': True})
Example:
>>> # xdoctest: +REQUIRES(module:wbia)
>>> from vtool.matching import * # NOQA
>>> grid = {
>>> 'symmetric': [True, False],
>>> }
>>> for cfgdict in ut.all_dict_combinations(grid):
>>> match = demodata_match(cfgdict, apply=False)
>>> match.assign()
"""
params = match._take_params(
cfgdict, ['K', 'Knorm', 'symmetric', 'checks', 'weight']
)
params = list(params)
K, Knorm, symmetric, checks, weight_key = params
annot1 = match.annot1
annot2 = match.annot2
if match.verbose:
print('[match] assign')
print('[match] params = ' + ub.repr2(params))
ensure_metadata_vsone(annot1, annot2, cfgdict)
allow_shrink = True # TODO: parameterize?
# Search for nearest neighbors
if symmetric:
tup = symmetric_correspondence(annot1, annot2, K, Knorm, checks, allow_shrink)
else:
tup = asymmetric_correspondence(
annot1, annot2, K, Knorm, checks, allow_shrink
)
fm, match_dist, norm_dist, fx1_norm, fx2_norm = tup
ratio = np.divide(match_dist, norm_dist)
# convert so bigger is better
ratio_score = 1.0 - ratio
# remove local measure that can no longer apply
match.local_measures = ub.dict_diff(
match.local_measures, ['sver_err_xy', 'sver_err_scale', 'sver_err_ori']
)
match.local_measures['match_dist'] = match_dist
match.local_measures['norm_dist'] = norm_dist
match.local_measures['ratio'] = ratio
match.local_measures['ratio_score'] = ratio_score
if weight_key is None:
match.fs = ratio_score
else:
# Weight the scores with specified attribute
# (e.g. foregroundness weights)
weight1 = annot1[weight_key].take(fm.T[0], axis=0)
weight2 = annot2[weight_key].take(fm.T[1], axis=0)
weight = np.sqrt(weight1 * weight2)
weighted_ratio_score = ratio_score * weight
match.local_measures[weight_key] = weight
match.local_measures['weighted_ratio_score'] = weighted_ratio_score
match.local_measures['weighted_norm_dist'] = norm_dist * weight
match.fs = weighted_ratio_score
match.fm = fm
match.fm_norm1 = np.vstack([fx1_norm, fm.T[1]]).T
if fx2_norm is None:
match.fm_norm2 = None
else:
match.fm_norm2 = np.vstack([fm.T[1], fx2_norm]).T
return match
[docs] def ratio_test_flags(match, cfgdict={}):
(ratio_thresh,) = match._take_params(cfgdict, ['ratio_thresh'])
if match.verbose:
print('[match] apply_ratio_test')
print('[match] ratio_thresh = {!r}'.format(ratio_thresh))
ratio = match.local_measures['ratio']
flags = ratio < ratio_thresh
return flags
[docs] def sver_flags(match, cfgdict={}, return_extra=False):
"""
Example:
>>> # xdoctest: +REQUIRES(module:pyhesaff)
>>> from vtool.matching import * # NOQA
>>> cfgdict = {'symmetric': True, 'newsym': True}
>>> match = demodata_match(cfgdict, apply=False)
>>> cfgbase = {'symmetric': True, 'ratio_thresh': .8}
>>> cfgdict = ut.dict_union(cfgbase, dict(thresh_bins=[.5, .6, .7, .8]))
>>> match = match.assign(cfgbase)
>>> match.apply_ratio_test(cfgdict, inplace=True)
>>> flags1 = match.sver_flags(cfgdict)
>>> flags2 = match.sver_flags(cfgbase)
"""
from vtool import spatial_verification as sver
import vtool as vt
def _run_sver(kpts1, kpts2, fm, match_weights, **sver_kw):
svtup = sver.spatially_verify_kpts(
kpts1, kpts2, fm, match_weights=match_weights, **sver_kw
)
if svtup is None:
errors = [np.empty(0), np.empty(0), np.empty(0)]
inliers = []
H_12 = np.eye(3)
else:
(inliers, errors, H_12) = svtup[0:3]
svtup = (inliers, errors, H_12)
return svtup
(
sver_xy_thresh,
sver_ori_thresh,
sver_scale_thresh,
refine_method,
thresh_bins,
) = match._take_params(
cfgdict,
[
'sver_xy_thresh',
'sver_ori_thresh',
'sver_scale_thresh',
'refine_method',
'thresh_bins',
],
)
kpts1 = match.annot1['kpts']
kpts2 = match.annot2['kpts']
dlen_sqrd2 = match.annot2['dlen_sqrd']
sver_kw = dict(
xy_thresh=sver_xy_thresh,
ori_thresh=sver_ori_thresh,
scale_thresh=sver_scale_thresh,
refine_method=refine_method,
dlen_sqrd2=dlen_sqrd2,
)
if thresh_bins:
sver_tups = []
n_fm = len(match.fm)
xy_err = np.full(n_fm, fill_value=np.inf)
scale_err = np.full(n_fm, fill_value=np.inf)
ori_err = np.full(n_fm, fill_value=np.inf)
agg_errors = (xy_err, scale_err, ori_err)
agg_inlier_flags = np.zeros(n_fm, dtype=np.bool_)
agg_H_12 = None
prev_best = 50
for thresh in thresh_bins:
ratio = match.local_measures['ratio']
# These are of len(match.fm)=1000
# 100 of these are True
ratio_flags = ratio < thresh
ratio_idxs = np.where(ratio_flags)[0]
if len(ratio_idxs) == 0:
continue
# Filter matches at this level of the ratio test
fm = match.fm[ratio_flags]
match_weights = match.fs[ratio_flags]
svtup = _run_sver(kpts1, kpts2, fm, match_weights, **sver_kw)
(inliers, errors, H_12) = svtup
n_inliers = len(inliers)
if agg_H_12 is None or (n_inliers < 100 and n_inliers > prev_best):
# pick a homography from a lower ratio threshold if
# possible. TODO: check for H_12 = np.eye
agg_H_12 = H_12
prev_best = n_inliers
# these are of len(fm)=100
# flags = vt.index_to_boolmask(inliers, len(fm))
# print(errors[0][inliers].mean())
# Find places that passed the ratio and were inliers
agg_inlier_flags[ratio_idxs[inliers]] = True
for agg_err, err in zip(agg_errors, errors):
if len(err):
current_err = agg_err[ratio_idxs]
agg_err[ratio_idxs] = np.minimum(current_err, err)
sver_tups.append(svtup)
if return_extra:
return agg_inlier_flags, agg_errors, agg_H_12
else:
return agg_inlier_flags
else:
# match_weights = np.ones(len(fm))
fm = match.fm
match_weights = match.fs
svtup = _run_sver(kpts1, kpts2, fm, match_weights, **sver_kw)
(inliers, errors, H_12) = svtup
flags = vt.index_to_boolmask(inliers, len(fm))
if return_extra:
return flags, errors, H_12
else:
return flags
[docs] def apply_all(match, cfgdict):
if match.verbose:
print('[match] apply_all')
match.H_21 = None
match.H_12 = None
match.local_measures = ut.odict([])
match.assign(cfgdict)
match.apply_ratio_test(cfgdict, inplace=True)
(sv_on,) = match._take_params(cfgdict, ['sv_on'])
if sv_on:
match.apply_sver(cfgdict, inplace=True)
[docs] def apply_ratio_test(match, cfgdict={}, inplace=None):
flags = match.ratio_test_flags(cfgdict)
match_ = match.compress(flags, inplace=inplace)
return match_
[docs] def apply_sver(match, cfgdict={}, inplace=None):
"""
Ignore:
>>> from vtool.matching import * # NOQA
>>> cfgdict = {'symmetric': True, 'ratio_thresh': .8,
>>> 'thresh_bins': [.5, .6, .7, .8]}
>>> match = demodata_match(cfgdict, apply=False)
>>> match = match.assign(cfgbase)
>>> match.apply_ratio_test(cfgdict, inplace=True)
>>> flags1 = match.apply_sver(cfgdict)
"""
if match.verbose:
print('[match] apply_sver')
flags, errors, H_12 = match.sver_flags(cfgdict, return_extra=True)
match_ = match.compress(flags, inplace=inplace)
errors_ = [e.compress(flags) for e in errors]
match_.local_measures['sver_err_xy'] = errors_[0]
match_.local_measures['sver_err_scale'] = errors_[1]
match_.local_measures['sver_err_ori'] = errors_[2]
match_.H_12 = H_12
return match_
    def _make_global_feature_vector(match, global_keys=None):
        """
        Global annotation properties and deltas.

        For each global measure ``k`` with values ``(v1, v2)`` this emits
        ``global(k_1)`` / ``global(k_2)`` dimensions (elementwise for
        iterable values) plus a ``global(delta_k)`` dimension, then derives
        min/max orderings for symmetric keys and a gps/time speed feature.

        Args:
            global_keys (list | None): subset of global measures to use
                (default: all, sorted)

        Returns:
            OrderedDict: feature name -> value
        """
        import vtool as vt
        feat = ut.odict([])
        if global_keys is None:
            # speed should need to be requested
            global_keys = sorted(match.global_measures.keys())
        global_measures = ut.dict_subset(match.global_measures, global_keys)
        for k, v in global_measures.items():
            v1, v2 = v
            # missing values become nan so numeric deltas stay defined
            if v1 is None:
                v1 = np.nan
            if v2 is None:
                v2 = np.nan
            if ut.isiterable(v1):
                # vector-valued measures (e.g. gps) expand elementwise
                for i in range(len(v1)):
                    feat['global({}_1[{}])'.format(k, i)] = v1[i]
                    feat['global({}_2[{}])'.format(k, i)] = v2[i]
                if k == 'gps':
                    # great-circle distance between coordinate pairs
                    delta = vt.haversine(v1, v2)
                else:
                    delta = np.abs(v1 - v2)
            else:
                feat['global({}_1)'.format(k)] = v1
                feat['global({}_2)'.format(k)] = v2
                # if k == 'yaw':
                #     # delta = vt.ori_distance(v1, v2)
                #     delta = vt.cyclic_distance(v1, v2, modulo=vt.TAU)
                if k == 'view':
                    # viewpoint distance via the precomputed rhombicuboctahedron table
                    delta = _rhomb_dist.VIEW_INT_DIST[(v1, v2)]
                    # delta = vt.cyclic_distance(v1, v2, modulo=8)
                else:
                    delta = np.abs(v1 - v2)
            feat['global(delta_{})'.format(k)] = delta
            assert k != 'yaw', 'yaw is depricated'
        # Impose ordering on these keys to add symmetry
        keys_to_order = ['qual', 'view']
        for key in keys_to_order:
            k1 = 'global({}_1)'.format(key)
            k2 = 'global({}_2)'.format(key)
            if k1 in feat and k2 in feat:
                minv, maxv = np.sort([feat[k1], feat[k2]])
                feat['global(min_{})'.format(key)] = minv
                feat['global(max_{})'.format(key)] = maxv
        if 'global(delta_gps)' in feat and 'global(delta_time)' in feat:
            # NOTE(review): divides by 360, not 3600 — confirm the unit of
            # delta_time before relying on global(speed)
            hour_delta = feat['global(delta_time)'] / 360
            km_delta = feat['global(delta_gps)']
            if hour_delta == 0:
                if km_delta == 0:
                    # same place at the same time: zero speed, not nan
                    feat['global(speed)'] = 0
                else:
                    # displacement with no elapsed time: undefined speed
                    feat['global(speed)'] = np.nan
            else:
                feat['global(speed)'] = km_delta / hour_delta
        return feat
def _make_local_summary_feature_vector(
match, local_keys=None, summary_ops=None, bin_key=None, bins=None
):
r"""
Summary statistics of local features
CommandLine:
python -m vtool.matching _make_local_summary_feature_vector
Example:
>>> # xdoctest: +REQUIRES(module:pyhesaff)
>>> from vtool.matching import * # NOQA
>>> import vtool as vt
>>> cfgdict = {}
>>> match = demodata_match(cfgdict, recompute=0)
>>> match.apply_all(cfgdict)
>>> summary_ops = {'len', 'sum'}
>>> bin_key = 'ratio'
>>> bins = 2
>>> #bins = [.625, .725, .9]
>>> bins = [.625, .9]
>>> local_keys = ['ratio', 'norm_dist', 'ratio_score']
>>> bin_key = None
>>> local_keys = None
>>> summary_ops = 'all'
>>> feat = match._make_local_summary_feature_vector(
>>> local_keys=local_keys,
>>> bin_key=bin_key, summary_ops=summary_ops, bins=bins)
>>> result = ('feat = %s' % (ub.repr2(feat, nl=2),))
>>> print(result)
"""
if summary_ops is None:
summary_ops = {'sum', 'mean', 'std', 'len'}
if summary_ops == 'all':
summary_ops = set(SUM_OPS.keys()).union({'len'})
if local_keys is None:
local_measures = match.local_measures
else:
local_measures = ut.dict_subset(match.local_measures, local_keys)
feat = ut.odict([])
if bin_key is not None:
if bins is None:
raise ValueError('must choose bins')
# bins = [.625, .725, .9]
# binned ratio feature vectors
if isinstance(bins, int):
bins = np.linspace(0, 1.0, bins + 1)
else:
bins = list(bins)
bin_ids = np.searchsorted(bins, match.local_measures[bin_key])
dimkey_fmt = '{opname}({measure}[{bin_key}<{binval}])'
for binid, binval in enumerate(bins, start=1):
fxs = np.where(bin_ids <= binid)[0]
if 'len' in summary_ops:
dimkey = dimkey_fmt.format(
opname='len',
measure='matches',
bin_key=bin_key,
binval=binval,
)
feat[dimkey] = len(fxs)
for opname in sorted(summary_ops - {'len'}):
op = SUM_OPS[opname]
for k, vs in local_measures.items():
dimkey = dimkey_fmt.format(
opname=opname,
measure=k,
bin_key=bin_key,
binval=binval,
)
feat[dimkey] = op(vs[fxs])
else:
if True:
dimkey_fmt = '{opname}({measure})'
if 'len' in summary_ops:
dimkey = dimkey_fmt.format(opname='len', measure='matches')
feat[dimkey] = len(match.fm)
for opname in sorted(summary_ops - {'len'}):
op = SUM_OPS[opname]
for k, vs in local_measures.items():
dimkey = dimkey_fmt.format(opname=opname, measure=k)
feat[dimkey] = op(vs)
else:
# OLD
if 'len' in summary_ops:
feat['len(matches)'] = len(match.fm)
if 'sum' in summary_ops:
for k, vs in local_measures.items():
feat['sum(%s)' % (k,)] = vs.sum()
if 'mean' in summary_ops:
for k, vs in local_measures.items():
feat['mean(%s)' % (k,)] = np.mean(vs)
if 'std' in summary_ops:
for k, vs in local_measures.items():
feat['std(%s)' % (k,)] = np.std(vs)
if 'med' in summary_ops:
for k, vs in local_measures.items():
feat['med(%s)' % (k,)] = np.median(vs)
return feat
    def _make_local_top_feature_vector(
        match, local_keys=None, sorters='ratio', indices=3
    ):
        """
        Selected subsets of top features.

        Args:
            local_keys (list | None): subset of local measures to take values
                from (default: all)
            sorters (str | list): local measure(s) used to rank matches
                (descending argsort)
            indices (int | slice | list): which ranks to take; an int n means
                the top n

        Returns:
            OrderedDict: 'loc[sorter,rank](measure)' -> value
        """
        if local_keys is None:
            local_measures = match.local_measures
        else:
            local_measures = ut.dict_subset(match.local_measures, local_keys)
        # Convert indices to an explicit list
        if isinstance(indices, int):
            indices = slice(indices)
        if isinstance(indices, slice):
            # assert indices.stop is not None, 'indices must have maximum value'
            # clamp the slice to the number of matches actually available
            indices = list(range(*indices.indices(len(match.fm))))
            # indices = list(range(*indices.indices(indices.stop)))
        if len(indices) == 0:
            return {}
        # TODO: some sorters might want descending orders
        sorters = ut.ensure_iterable(sorters)
        # argsort descending, then fancy-index the requested ranks per sorter
        chosen_xs = [
            match.local_measures[sorter].argsort()[::-1][indices] for sorter in sorters
        ]
        loc_fmt = 'loc[{sorter},{rank}]({measure})'
        feat = ut.odict(
            [
                (loc_fmt.format(sorter=sorter, rank=rank, measure=k), v)
                for sorter, topxs in zip(sorters, chosen_xs)
                for k, vs in local_measures.items()
                for rank, v in zip(indices, vs[topxs])
            ]
        )
        return feat
[docs] def make_feature_vector(
match,
local_keys=None,
global_keys=None,
summary_ops=None,
sorters='ratio',
indices=3,
bin_key=None,
bins=None,
):
"""
Constructs the pairwise feature vector that represents a match
Args:
local_keys (None): (default = None)
global_keys (None): (default = None)
summary_ops (None): (default = None)
sorters (str): (default = 'ratio')
indices (int): (default = 3)
Returns:
dict: feat
CommandLine:
python -m vtool.matching make_feature_vector
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.matching import * # NOQA
>>> import vtool as vt
>>> match = demodata_match({})
>>> feat = match.make_feature_vector(indices=[0, 1])
>>> result = ('feat = %s' % (ub.repr2(feat, nl=2),))
>>> print(result)
"""
feat = ut.odict([])
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=RuntimeWarning)
feat.update(match._make_global_feature_vector(global_keys))
feat.update(
match._make_local_summary_feature_vector(
local_keys, summary_ops, bin_key=bin_key, bins=bins
)
)
feat.update(
match._make_local_top_feature_vector(
local_keys, sorters=sorters, indices=indices
)
)
return feat
def invsum(x):
    """Return the sum of the elementwise reciprocals of ``x``."""
    reciprocals = 1 / x
    return np.sum(reciprocals)
def csum(x):
    """Return the sum of the complements ``(1 - x)``."""
    complements = 1 - x
    return complements.sum()
# Different summary options available for pairwise feature vecs.
# 'len' is handled specially by the callers and is not listed here.
SUM_OPS = {
    # 'len' : len,
    'invsum': invsum,
    # 'csum' : csum,
    'sum': np.sum,
    'mean': np.mean,
    'std': np.std,
    'med': np.median,
}
@ut.reloadable_class
class AnnotPairFeatInfo(object):
    """
    Information class about feature dimensions of PairwiseMatch.

    Notes:
        Can be used to compute marginal importances over groups of features
        used in the pairwise one-vs-one scoring algorithm

        Can be used to construct an appropriate cfgdict for a new
        PairwiseMatch.

    CommandLine:
        python -m vtool.matching AnnotPairFeatInfo

    Example:
        >>> # xdoctest: +REQUIRES(module:pyhesaff)
        >>> from vtool.matching import *  # NOQA
        >>> import vtool as vt
        >>> match = demodata_match({})
        >>> match.add_global_measures(['time', 'gps'])
        >>> index = pd.MultiIndex.from_tuples([(1, 2)], names=('aid1', 'aid2'))
        >>> # Feat info without bins
        >>> feat = match.make_feature_vector()
        >>> X = pd.DataFrame(feat, index=index)
        >>> print(X.keys())
        >>> featinfo = AnnotPairFeatInfo(X)
        >>> pairfeat_cfg, global_keys = featinfo.make_pairfeat_cfg()
        >>> print('pairfeat_cfg = %r' % (pairfeat_cfg,))
        >>> print('global_keys = %r' % (global_keys,))
        >>> assert 'delta' not in global_keys
        >>> assert 'max' not in global_keys
        >>> ut.cprint(featinfo.get_infostr(), 'blue')
        >>> # Feat info with bins
        >>> feat = match.make_feature_vector(indices=0, bins=[.7, .8], bin_key='ratio')
        >>> X = pd.DataFrame(feat, index=index)
        >>> print(X.keys())
        >>> featinfo = AnnotPairFeatInfo(X)
        >>> pairfeat_cfg, global_keys = featinfo.make_pairfeat_cfg()
        >>> print('pairfeat_cfg = %s' % (ut.repr4(pairfeat_cfg),))
        >>> print('global_keys = %r' % (global_keys,))
        >>> ut.cprint(featinfo.get_infostr(), 'blue')
    """
def __init__(featinfo, columns=None, importances=None):
featinfo.importances = importances
if columns is not None:
if hasattr(columns, 'columns'):
featinfo.columns = columns.columns
else:
featinfo.columns = columns
else:
featinfo.columns = list(importances.keys())
if importances is not None:
assert isinstance(importances, dict), 'must be a dict'
featinfo._summary_keys = sorted(SUM_OPS.keys()) + ['len']
    def make_pairfeat_cfg(featinfo):
        """
        Reverse-engineer a ``make_feature_vector`` configuration from the
        feature columns this object knows about.

        Returns:
            tuple: (pairfeat_cfg, global_keys)
        """
        # local-measure columns determine the top-rank indices and sorters
        criteria = [('measure_type', '==', 'local')]
        indices = sorted(
            map(int, set(map(featinfo.local_rank, featinfo.select_columns(criteria))))
        )
        sorters = sorted(
            set(map(featinfo.local_sorter, featinfo.select_columns(criteria)))
        )
        criteria = [('measure_type', '==', 'global')]
        global_measures = sorted(
            set(map(featinfo.global_measure, featinfo.select_columns(criteria)))
        )
        global_keys = []
        for key in global_measures:
            parts = key.split('_')
            # strip derived prefixes (max_/min_/delta_) to recover base keys
            offset = parts[0] in {'max', 'min', 'delta'}
            global_keys.append(parts[offset])
        global_keys = sorted(set(global_keys))
        if 'speed' in global_keys:
            # hack: speed is derived from gps/time, not a real global key
            global_keys.remove('speed')  # hack
        summary_cols = featinfo.select_columns([('measure_type', '==', 'summary')])
        summary_ops = sorted(set(map(featinfo.summary_op, summary_cols)))
        summary_measures = sorted(set(map(featinfo.summary_measure, summary_cols)))
        summary_binvals = sorted(set(map(featinfo.summary_binval, summary_cols)))
        summary_binvals = ut.filter_Nones(summary_binvals)
        summary_binkeys = sorted(set(map(featinfo.summary_binkey, summary_cols)))
        summary_binkeys = ut.filter_Nones(summary_binkeys)
        if 'matches' in summary_measures:
            # len(matches) is implied by the 'len' op, not a real measure
            summary_measures.remove('matches')
        if len(summary_binkeys) == 0:
            bin_key = None
        else:
            # only a single bin_key per feature vector is supported
            assert len(summary_binkeys) == 1
            bin_key = summary_binkeys[0]
        pairfeat_cfg = {
            'summary_ops': summary_ops,
            'local_keys': summary_measures,
            'sorters': sorters,
            'indices': indices,
            'bins': ut.lmap(float, summary_binvals),
            'bin_key': bin_key,
        }
        return pairfeat_cfg, global_keys
[docs] def select_columns(featinfo, criteria, op='and'):
"""
Args:
criteria (list): list of tokens denoting selection constraints
can be one of:
measure_type global_measure, local_sorter, local_rank,
local_measure, summary_measure, summary_op, summary_bin,
summary_binval, summary_binkey,
Ignore:
>>> featinfo.select_columns([
>>> ('measure_type', '==', 'local'),
>>> ('local_sorter', 'in', ['weighted_ratio_score', 'lnbnn_norm_dist']),
>>> ], op='and')
"""
if op == 'and':
cols = set(featinfo.columns)
update = cols.intersection_update
elif op == 'or':
cols = set([])
update = cols.update
else:
raise Exception(op)
for group_id, op, value in criteria:
found = featinfo.find(group_id, op, value)
update(found)
return cols
def find(featinfo, group_id, op, value, hack=False):
    """
    Return the columns whose parsed ``group_id`` attribute compares
    truthily against ``value``.

    Args:
        group_id (str): name of a parser method on featinfo, one of:
            measure_type, global_measure, local_sorter, local_rank,
            local_measure, summary_measure, summary_op, summary_bin,
            summary_binval, summary_binkey
        op (str or callable): a comparison operator name (e.g. '==') or a
            binary predicate ``op(parsed_value, value)``
        value: the value compared against each column's parsed attribute;
            its type drives coercion of the parsed (string) attribute
        hack (bool): if True, also keep columns whose attribute parses to
            None (legacy behavior of unclear purpose)

    Returns:
        list: matching column names

    Ignore:
        group_id = 'summary_op'
        op = '=='
        value = 'len'
    """
    if isinstance(op, str):
        # Map operator names like '==' to callables.
        # NOTE: previously checked six.text_type via a redundant local
        # `import six` (six is already imported at module level), which
        # missed byte-string operator names under Python 2.
        opdict = ut.get_comparison_operators()
        op = opdict.get(op)
    grouper = getattr(featinfo, group_id)
    found = []
    for col in featinfo.columns:
        value1 = grouper(col)
        if value1 is None:
            # TODO: turn hack off and ensure that doesn't break anything
            if hack:
                # Only filter out/in comparable things (legacy behavior)
                found.append(col)
        else:
            # value1 is guaranteed non-None here (redundant re-check removed)
            try:
                # Coerce the parsed string attribute to the query's type
                if isinstance(value, int):
                    value1 = int(value1)
                elif isinstance(value, float):
                    value1 = float(value1)
                elif isinstance(value, list):
                    if len(value) > 0 and isinstance(value[0], int):
                        value1 = int(value1)
                if op(value1, value):
                    found.append(col)
            except Exception:
                # uncomparable/uncoercible values simply do not match
                pass
    return found
def group_importance(featinfo, item):
    """
    Build a one-row DataFrame of importance stats for one group of keys.

    Args:
        item (tuple): ``(name, keys)`` pair as produced by dict.items()

    Returns:
        pd.DataFrame: single row indexed by ``name`` with columns
            ave_w (average weight), weight (total), num (key count)
    """
    name, keys = item
    num = len(keys)
    total = sum(ut.take(featinfo.importances, keys))
    row = (total / num, total, num)
    return pd.DataFrame([row], columns=['ave_w', 'weight', 'num'], index=[name])
def print_margins(featinfo, group_id, ignore_trivial=True):
    """
    Print the marginal feature importance of a grouping of the columns.

    Args:
        group_id (str or list): either the name of a grouper method on
            featinfo (each group is a set of columns sharing that parsed
            attribute), or a list of selection criteria for
            ``select_columns`` (each selected column is its own group).
        ignore_trivial (bool): if True, skip printing when there are
            fewer than two groups.
    """
    columns = featinfo.columns
    if isinstance(group_id, list):
        # group_id is a criteria list: every selected column becomes a
        # singleton group
        cols = featinfo.select_columns(criteria=group_id)
        _keys = [(c, [c]) for c in cols]
        try:
            _weights = pd.concat(ut.lmap(featinfo.group_importance, _keys))
        except ValueError:
            # pd.concat raises ValueError when _keys is empty; fall back
            # to a plain list (handled by the broad except below)
            _weights = []
            pass
        nice = str(group_id)
    else:
        # group_id names a parser method; bucket columns by its output
        grouper = getattr(featinfo, group_id)
        _keys = ut.group_items(columns, ut.lmap(grouper, columns))
        _weights = pd.concat(ut.lmap(featinfo.group_importance, _keys.items()))
        nice = ut.get_funcname(grouper).replace('_', ' ')
        nice = ut.pluralize(nice)
    try:
        # sort groups by decreasing average weight
        _weights = _weights.iloc[_weights['ave_w'].argsort()[::-1]]
    except Exception:
        # _weights may be the empty-list fallback, which has no 'ave_w'
        pass
    if not ignore_trivial or len(_weights) > 1:
        ut.cprint('\nMarginal importance of ' + nice, 'white')
        print(_weights)
def group_counts(featinfo, item):
    """
    Build a one-row DataFrame counting the keys in one group.

    Args:
        item (tuple): ``(name, keys)`` pair as produced by dict.items()

    Returns:
        pd.DataFrame: single row indexed by ``name`` with a 'num' column
    """
    name, keys = item
    return pd.DataFrame([(len(keys),)], columns=['num'], index=[name])
def print_counts(featinfo, group_id):
    """
    Print how many columns fall into each group of the given grouper.

    Args:
        group_id (str): name of a parser method on featinfo used to
            bucket the columns.
    """
    grouper = getattr(featinfo, group_id)
    columns = featinfo.columns
    buckets = ut.group_items(columns, ut.lmap(grouper, columns))
    counts = pd.concat(ut.lmap(featinfo.group_counts, buckets.items()))
    # largest groups first
    counts = counts.iloc[counts['num'].argsort()[::-1]]
    nice = ut.pluralize(ut.get_funcname(grouper).replace('_', ' '))
    print('\nCounts of ' + nice)
    print(counts)
def feature(featinfo, key):
    """Identity accessor: a feature's name is the dimension key itself."""
    return key
def measure(featinfo, key):
    """Parse the measure name out of a dimension key (binned or not)."""
    for fmt in ('{type}({measure})', '{type}({measure}[{bin}])'):
        hit = parse.parse(fmt, key)
        if hit is not None:
            return hit['measure']
    return None
def global_measure(featinfo, key):
    """Return the measure name from a ``global(...)`` dimension key, else None."""
    hit = parse.parse('global({measure})', key)
    return hit['measure'] if hit is not None else None
# parse-format template shared by the local_* parser methods below
loc_fmt = 'loc[{sorter},{rank}]({measure})'
def local_measure(featinfo, key):
    """Return the measure name from a ``loc[...](...)`` dimension key, else None."""
    hit = parse.parse(featinfo.loc_fmt, key)
    return hit['measure'] if hit is not None else None
def local_sorter(featinfo, key):
    """Return the sorter name from a ``loc[...](...)`` dimension key, else None."""
    hit = parse.parse(featinfo.loc_fmt, key)
    return hit['sorter'] if hit is not None else None
def local_rank(featinfo, key):
    """Return the rank from a ``loc[...](...)`` dimension key, else None."""
    hit = parse.parse(featinfo.loc_fmt, key)
    return hit['rank'] if hit is not None else None
# parse-format templates shared by the summary_* parser methods below
sum_fmt = '{op}({measure})'
binsum_fmt = '{op}({measure}[{bin_key}<{binval}])'
def summary_measure(featinfo, key):
    """Return the measure from a summary dimension key, else None."""
    hit = parse.parse(featinfo.binsum_fmt, key)
    if hit is None:
        hit = parse.parse(featinfo.sum_fmt, key)
    if hit is not None and hit['op'] in featinfo._summary_keys:
        return hit['measure']
    return None
def summary_op(featinfo, key):
    """Return the op (e.g. 'mean') from a summary dimension key, else None."""
    hit = parse.parse(featinfo.binsum_fmt, key)
    if hit is None:
        hit = parse.parse(featinfo.sum_fmt, key)
    if hit is not None and hit['op'] in featinfo._summary_keys:
        return hit['op']
    return None
# def summary_bin(featinfo, key):
# parsed = parse.parse(featinfo.binsum_fmt, key)
# if parsed is not None:
# return parsed['bin_key'] + '<' + parsed['binval']
def summary_binkey(featinfo, key):
    """Return the bin key from a binned summary dimension key, else None."""
    hit = parse.parse(featinfo.binsum_fmt, key)
    return hit['bin_key'] if hit is not None else None
def summary_binval(featinfo, key):
    """Return the bin value from a binned summary dimension key, else None."""
    hit = parse.parse(featinfo.binsum_fmt, key)
    return hit['binval'] if hit is not None else None
def dimkey_grammar(featinfo):
    """
    Build a pyparsing grammar recognizing feature dimension keys.

    The grammar matches the three dimension-name encodings used by this
    class: ``global(<measure>)``, ``loc[<sorter>,<rank>](<measure>)`` and
    ``<op>(<measure>[<bin_key><<binval>])``.

    Returns:
        pyparsing.ParserElement: the combined grammar

    CommandLine:
        python -m vtool.matching AnnotPairFeatInfo.dimkey_grammar

    Example:
        >>> # xdoctest: +REQUIRES(module:pyhesaff)
        >>> from vtool.matching import *  # NOQA
        >>> import vtool as vt
        >>> match = demodata_match({})
        >>> match.add_global_measures(['view', 'qual', 'gps', 'time'])
        >>> index = pd.MultiIndex.from_tuples([(1, 2)], names=('aid1', 'aid2'))
        >>> # Feat info without bins
        >>> feat = match.make_feature_vector()
        >>> X = pd.DataFrame(feat, index=index)
        >>> featinfo = AnnotPairFeatInfo(X)
        >>> feat_grammar = featinfo.dimkey_grammar()
        >>> for key in X.keys():
        >>>     print(key)
        >>>     print(feat_grammar.parseString(key))
    """
    # https://stackoverflow.com/questions/18706631/pyparsing-get-token-location-in-results-name
    import pyparsing as pp
    # _summary_keys = ['sum', 'mean', 'med', 'std', 'len']
    _summary_keys = featinfo._summary_keys
    S = pp.Suppress

    class Nestings(object):
        """allows for a bit of syntactic sugar"""

        def __call__(self, x):
            # paren(x) matches '(' x ')'
            return pp.Suppress('(') + x + pp.Suppress(')')

        def __getitem__(self, x):
            # brak[x] matches '[' x ']'
            return pp.Suppress('[') + x + pp.Suppress(']')

    brak = paren = Nestings()
    unary_id = pp.Regex('[12]')('unary_id')
    # takes care of non-greedy matching of underscores
    # http://stackoverflow.com/questions/1905278/keyword-matching-in-
    unary_measure = pp.Combine(
        pp.Word(pp.alphanums) + pp.ZeroOrMore('_' + ~unary_id + pp.Word(pp.alphanums))
    )
    unary_sub = brak[pp.Word(pp.nums)]
    # Global measures either refer to one annot (e.g. qual_1) or relate both
    global_unary = (unary_measure + S('_') + unary_id + pp.ZeroOrMore(unary_sub))(
        'global_unary'
    )
    global_relation = pp.Word(pp.alphas + '_')('global_relation')
    global_measure = global_unary | global_relation
    global_feature = ('global' + paren(global_measure))('global_feature')
    # Local
    local_measure = pp.Word(pp.alphas + '_')('local_measure')
    local_sorter = pp.Word(pp.alphas + '_')('local_sorter')
    local_rank = pp.Word(pp.nums)('local_rank')
    local_feature = (
        'loc' + brak[local_sorter + S(',') + local_rank] + paren(local_measure)
    )('local_feature')
    # Summary
    summary_measure = pp.Word(pp.alphas + '_')('summary_measure')
    summary_binkey = pp.Word(pp.alphas + '_')('summary_binkey')
    summary_binval = pp.Word(pp.nums + '.')('summary_binval')
    summary_bin = brak[summary_binkey + '<' + summary_binval]('summary_bin')
    summary_op = pp.Or(_summary_keys)('summary_op')
    summary_feature = (
        summary_op + paren(summary_measure + pp.Optional(summary_bin))
    )('summary_feature')
    feat_grammar = local_feature | global_feature | summary_feature
    # NOTE: a dead `if False:` scratch block of manual parse examples was
    # removed here; see the doctest above for working examples.
    return feat_grammar
def measure_type(featinfo, key):
    """
    Classify a dimension key as 'global', 'local', or 'summary'.

    Returns None when the key matches none of the known prefixes.
    """
    if key.startswith('global'):
        return 'global'
    if key.startswith('loc'):
        return 'local'
    for prefix in featinfo._summary_keys:
        if key.startswith(prefix):
            return 'summary'
    return None
def get_infostr(featinfo):
    """
    Summarizes the types (global, local, summary) of features in X based on
    standardized dimension names.

    Returns:
        str: a multi-line report with the total dimension count and the
            distinct measures/sorters/ranks/ops/bins in each category.
    """
    # Bucket every column by its measure type ('global'/'local'/'summary')
    grouped_keys = ub.ddict(list)
    for key in featinfo.columns:
        type_ = featinfo.measure_type(key)
        grouped_keys[type_].append(key)
    # Parse out the distinct attribute values present in each bucket
    info_items = ub.odict(
        [
            (
                'global_measures',
                ut.lmap(featinfo.global_measure, grouped_keys['global']),
            ),
            ('local_sorters', set(map(featinfo.local_sorter, grouped_keys['local']))),
            ('local_ranks', set(map(featinfo.local_rank, grouped_keys['local']))),
            (
                'local_measures',
                set(map(featinfo.local_measure, grouped_keys['local'])),
            ),
            (
                'summary_measures',
                set(map(featinfo.summary_measure, grouped_keys['summary'])),
            ),
            ('summary_ops', set(map(featinfo.summary_op, grouped_keys['summary']))),
            # ('summary_bins', set(map(featinfo.summary_bin,
            #                          grouped_keys['summary']))),
            (
                'summary_binvals',
                set(map(featinfo.summary_binval, grouped_keys['summary'])),
            ),
            (
                'summary_binkeys',
                set(map(featinfo.summary_binkey, grouped_keys['summary'])),
            ),
        ]
    )
    import textwrap

    def _wrap(list_):
        # Sort, comma-join, and wrap at 80 columns with a small indent
        unwrapped = ', '.join(sorted(list_))
        indent = ' ' * 4
        lines_ = textwrap.wrap(unwrapped, width=80 - len(indent))
        lines = [' ' + line for line in lines_]
        return lines

    lines = []
    lines.append('Feature Dimensions: %d' % (len(featinfo.columns)))
    for item in info_items.items():
        key, list_ = item
        # Drop unparsable (None) entries before reporting
        list_ = {a for a in list_ if a is not None}
        if len(list_):
            title = key.replace('_', ' ').title()
            if key.endswith('_measures'):
                # Also report how many raw columns fall in this category
                groupid = key.replace('_measures', '')
                num = len(grouped_keys[groupid])
                title = title + ' (%d)' % (num,)
            lines.append(title + ':')
            if key == 'summary_measures':
                # Avoid repeating the local measures verbatim when the
                # summary measures are a superset of them
                other = info_items['local_measures']
                if other.issubset(list_) and len(other) > 0:
                    remain = list_ - other
                    lines.extend(_wrap(['<same as local_measures>'] + list(remain)))
                else:
                    lines.extend(_wrap(list_))
            else:
                lines.extend(_wrap(list_))
    infostr = '\n'.join(lines)
    return infostr
    # print(infostr)
def empty_neighbors(num_vecs=0, K=0):
    """
    Return empty neighbor-index (int32) and squared-distance (float64)
    arrays, both of shape ``(num_vecs, K)``.
    """
    dims = (num_vecs, K)
    return np.empty(dims, dtype=np.int32), np.empty(dims, dtype=np.float64)
# maximum SIFT matching distance based using uint8 trick from hesaff
# 512 is the effective maximum component value of the uint8-encoded
# descriptors; PSEUDO_MAX_DIST = sqrt(PSEUDO_MAX_DIST_SQRD) is used to
# normalize L2 descriptor distances into [0, 1].
PSEUDO_MAX_VEC_COMPONENT = 512
PSEUDO_MAX_DIST_SQRD = 2 * (PSEUDO_MAX_VEC_COMPONENT ** 2)
PSEUDO_MAX_DIST = np.sqrt(2) * (PSEUDO_MAX_VEC_COMPONENT)
def empty_assign():
    """
    Return an empty correspondence assignment.

    Returns:
        tuple: (fm, match_dist, norm_dist, fx1_norm, fx2_norm) where fm is
            an empty (0, 2) int32 array and the rest are empty 1-d arrays.
    """
    fm = np.empty((0, 2), dtype=np.int32)
    match_dist = np.array([])
    norm_dist = np.array([])
    fx1_norm = np.array([], dtype=np.int32)
    fx2_norm = np.array([], dtype=np.int32)
    return fm, match_dist, norm_dist, fx1_norm, fx2_norm
def symmetric_correspondence(annot1, annot2, K, Knorm, checks, allow_shrink=True):
    """
    Find symmetric (mutual nearest neighbor) feature correspondences.

    Searches in both directions (annot1->annot2 and annot2->annot1) and
    keeps only matches flagged in both.
    """
    if allow_shrink:
        # Shrink K so at least some correspondences can be established
        n_have = min(len(annot1['vecs']), len(annot2['vecs']))
        if n_have < 2:
            return empty_assign()
        if n_have < K + Knorm:
            K, Knorm = n_have - 1, 1
    num_neighbors = K + Knorm
    fx1_to_fx2, fx1_to_dist = normalized_nearest_neighbors(
        annot2['flann'], annot1['vecs'], num_neighbors, checks
    )
    fx2_to_fx1, fx2_to_dist = normalized_nearest_neighbors(
        annot1['flann'], annot2['vecs'], num_neighbors, checks
    )
    (fm, match_dist, fx1_norm, norm_dist1, fx2_norm, norm_dist2) = (
        assign_symmetric_matches(
            fx2_to_fx1, fx2_to_dist, fx1_to_fx2, fx1_to_dist, K, Knorm
        )
    )
    # take the tighter of the two directional normalizer distances
    norm_dist = np.minimum(norm_dist1, norm_dist2)
    return fm, match_dist, norm_dist, fx1_norm, fx2_norm
def asymmetric_correspondence(annot1, annot2, K, Knorm, checks, allow_shrink=True):
    """
    Find asymmetric (one-directional) feature correspondences.

    Matches annot2's descriptors against annot1's flann index only; no
    reciprocity constraint is applied (compare symmetric_correspondence).

    Returns:
        tuple: (fm, match_dist, norm_dist, fx1_norm, fx2_norm); fx2_norm is
            always None in the asymmetric case.
    """
    if allow_shrink:
        # Reduce K to allow some correspondences to be established
        n_have = len(annot1['vecs'])
        if n_have < 2:
            return empty_assign()
        elif n_have < K + Knorm:
            K, Knorm = n_have - 1, 1
    num_neighbors = K + Knorm
    fx2_to_fx1, fx2_to_dist = normalized_nearest_neighbors(
        annot1['flann'], annot2['vecs'], num_neighbors, checks
    )
    # every candidate is considered valid (no symmetry filter)
    fx2_to_flags = np.ones((len(fx2_to_fx1), K), dtype=np.bool_)
    # Assign correspondences
    assigntup = assign_unconstrained_matches(
        fx2_to_fx1, fx2_to_dist, K, Knorm, fx2_to_flags
    )
    fm, match_dist, fx1_norm, norm_dist = assigntup
    fx2_norm = None
    return fm, match_dist, norm_dist, fx1_norm, fx2_norm
def normalized_nearest_neighbors(flann1, vecs2, K, checks=800):
    """
    Compute matches from vecs2 to the features indexed in flann1.

    Distances are square-rooted and scaled into [0, 1] via the pseudo-max
    SIFT distance (uint8 trick).

    Raises:
        MatchingError: if K exceeds the number of indexed features.
    """
    import vtool as vt

    if K == 0:
        neighb_fx1, sqrd = empty_neighbors(len(vecs2), 0)
    elif len(vecs2) == 0:
        neighb_fx1, sqrd = empty_neighbors(0, K)
    elif flann1 is None:
        neighb_fx1, sqrd = empty_neighbors(0, 0)
    elif K > flann1.get_indexed_shape()[0]:
        # Corner case, may be better to throw an assertion error
        raise MatchingError('not enough database features')
    else:
        neighb_fx1, sqrd = flann1.nn_index(vecs2, num_neighbors=K, checks=checks)
    dist = np.sqrt(sqrd.astype(np.float64))
    # normalized SIFT dist
    neighb_dist = np.divide(dist, PSEUDO_MAX_DIST)
    neighb_fx1 = vt.atleast_nd(neighb_fx1, 2)
    neighb_dist = vt.atleast_nd(neighb_dist, 2)
    return neighb_fx1, neighb_dist
def assign_symmetric_matches(
    fx2_to_fx1, fx2_to_dist, fx1_to_fx2, fx1_to_dist, K, Knorm=None
):
    r"""
    Assign correspondences that are mutual nearest neighbors in both
    directions, together with per-direction normalizer features.

    Args:
        fx2_to_fx1 (ndarray): annot1 neighbor indices for each annot2 feat
        fx2_to_dist (ndarray): distances corresponding to fx2_to_fx1
        fx1_to_fx2 (ndarray): annot2 neighbor indices for each annot1 feat
        fx1_to_dist (ndarray): distances corresponding to fx1_to_fx2
        K (int): number of candidate neighbors per feature
        Knorm (int, default=None): number of normalizer neighbors; if None,
            the last neighbor column (rank -1) is used as the normalizer

    Returns:
        tuple: (fm, match_dist, norm_fx1, norm_dist1, norm_fx2, norm_dist2)
            with both directions sorted into a common match order.
    """
    # Rank of the neighbor used as the normalizer
    if Knorm is None:
        basic_norm_rank = -1
    else:
        basic_norm_rank = K + Knorm - 1
    index_dtype = fx2_to_fx1.dtype
    # --- FORWARD DIRECTION (annot2 -> annot1) ---
    # Flag mutual matches, then recover their (row, rank) coordinates
    fx2_to_flags = flag_symmetric_matches(fx2_to_fx1, fx1_to_fx2, K)
    flat_validx2 = np.flatnonzero(fx2_to_flags)
    match_fx2 = np.floor_divide(flat_validx2, K, dtype=index_dtype)
    match_rank2 = np.mod(flat_validx2, K, dtype=index_dtype)
    flat_match_idx2 = np.ravel_multi_index(
        (match_fx2, match_rank2), dims=fx2_to_fx1.shape
    )
    match_fx1 = fx2_to_fx1.take(flat_match_idx2)
    match_dist1 = fx2_to_dist.take(flat_match_idx2)
    fm = np.vstack((match_fx1, match_fx2)).T
    # TODO: for each position in fm need to find corresponding position in fm_
    # Currently just use the last one as a normalizer
    norm_rank = np.array([basic_norm_rank] * len(match_fx2), dtype=match_fx2.dtype)
    # NOTE: fancy indexing (not ravel_multi_index) so a norm_rank of -1
    # (Knorm=None) is valid; a dead ravel_multi_index-based lookup that was
    # immediately overwritten here has been removed.
    norm_fx1 = fx2_to_fx1[match_fx2, norm_rank]
    norm_dist1 = fx2_to_dist[match_fx2, norm_rank]
    # --- REVERSE DIRECTION (annot1 -> annot2) ---
    fx1_to_flags = flag_symmetric_matches(fx1_to_fx2, fx2_to_fx1, K)
    flat_validx1 = np.flatnonzero(fx1_to_flags)
    match_fx1_ = np.floor_divide(flat_validx1, K, dtype=index_dtype)
    match_rank1 = np.mod(flat_validx1, K, dtype=index_dtype)
    flat_match_idx1 = np.ravel_multi_index(
        (match_fx1_, match_rank1), dims=fx1_to_fx2.shape
    )
    match_fx2_ = fx1_to_fx2.take(flat_match_idx1)
    fm_ = np.vstack((match_fx1_, match_fx2_)).T
    # Same fancy-indexing lookup as the forward direction (handles rank -1)
    norm_fx2_ = fx1_to_fx2[match_fx1_, norm_rank]
    norm_dist2_ = fx1_to_dist[match_fx1_, norm_rank]
    # ---------
    # Align matches with the reverse direction
    # Do this by enforcing a constant sorting. No lookup necessary
    sortx = np.lexsort(fm.T)
    sortx_ = np.lexsort(fm_.T)
    a_fm = fm.take(sortx, axis=0)
    a_match_dist = match_dist1[sortx]
    a_norm_fx1 = norm_fx1[sortx]
    a_norm_dist1 = norm_dist1[sortx]
    a_norm_fx2_ = norm_fx2_[sortx_]
    a_norm_dist2_ = norm_dist2_[sortx_]
    # NOTE: a discarded `np.minimum(...)` call was removed here; the caller
    # (symmetric_correspondence) combines the two normalizer distances.
    assigntup = (a_fm, a_match_dist, a_norm_fx1, a_norm_dist1, a_norm_fx2_, a_norm_dist2_)
    return assigntup
def assign_unconstrained_matches(
    fx2_to_fx1, fx2_to_dist, K, Knorm=None, fx2_to_flags=None
):
    """
    Assigns vsone matches using results of nearest neighbors.

    Args:
        fx2_to_fx1 (ndarray): annot1 neighbor indices for each annot2 feat
        fx2_to_dist (ndarray): distances corresponding to fx2_to_fx1
        K (int): number of assigned neighbors per feature
        Knorm (int, default=None): number of normalizer neighbors; if None,
            the last neighbor column (rank -1) is used as the normalizer
        fx2_to_flags (ndarray, default=None): boolean validity mask over the
            first K columns; if None every candidate is valid

    Returns:
        AssignTup: (fm, match_dist, norm_fx1, norm_dist)

    Ignore:
        fx2_to_dist = np.arange(fx2_to_fx1.size).reshape(fx2_to_fx1.shape)

    CommandLine:
        python -m vtool.matching --test-assign_unconstrained_matches --show
        python -m vtool.matching assign_unconstrained_matches:0
        python -m vtool.matching assign_unconstrained_matches:1

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.matching import *  # NOQA
        >>> fx2_to_fx1, fx2_to_dist = empty_neighbors(0, 0)
        >>> K = 1
        >>> Knorm = 1
        >>> fx2_to_flags = None
        >>> assigntup = assign_unconstrained_matches(fx2_to_fx1, fx2_to_dist, K,
        >>>                                          Knorm, fx2_to_flags)
        >>> fm, match_dist, norm_fx1, norm_dist = assigntup
        >>> result = ub.repr2(assigntup, precision=3, nobr=True, with_dtype=True)
        >>> print(result)

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.matching import *  # NOQA
        >>> fx2_to_fx1 = np.array([[ 77,  971, 22],
        >>>                        [116,  120, 34],
        >>>                        [122,  128, 99],
        >>>                        [1075, 692, 102],
        >>>                        [ 530,  45, 120],
        >>>                        [  45, 530, 77]], dtype=np.int32)
        >>> fx2_to_dist = np.array([[ 0.059, 0.238, .3],
        >>>                         [ 0.021, 0.240, .4],
        >>>                         [ 0.039, 0.247, .5],
        >>>                         [ 0.149, 0.151, .6],
        >>>                         [ 0.226, 0.244, .7],
        >>>                         [ 0.215, 0.236, .8]], dtype=np.float32)
        >>> K = 1
        >>> Knorm = 1
        >>> fx2_to_flags = np.array([[1, 1], [0, 1], [1, 1], [0, 1], [1, 1], [1, 1]])
        >>> fx2_to_flags = fx2_to_flags[:, 0:K]
        >>> assigntup = assign_unconstrained_matches(fx2_to_fx1, fx2_to_dist, K,
        >>>                                          Knorm, fx2_to_flags)
        >>> fm, match_dist, norm_fx1, norm_dist = assigntup
        >>> result = ub.repr2(assigntup, precision=3, nobr=True, with_dtype=True)
        >>> print(result)
        >>> assert len(fm.shape) == 2 and fm.shape[1] == 2
        >>> assert ub.allsame(list(map(len, assigntup)))
    """
    # Infer the valid internal query feature indexes and ranks
    index_dtype = fx2_to_fx1.dtype
    if fx2_to_flags is None:
        # make everything valid
        flat_validx = np.arange(len(fx2_to_fx1) * K, dtype=index_dtype)
    else:
        flat_validx = np.flatnonzero(fx2_to_flags)
    match_fx2 = np.floor_divide(flat_validx, K, dtype=index_dtype)
    match_rank = np.mod(flat_validx, K, dtype=index_dtype)
    flat_match_idx = np.ravel_multi_index((match_fx2, match_rank), dims=fx2_to_fx1.shape)
    match_fx1 = fx2_to_fx1.take(flat_match_idx)
    match_dist = fx2_to_dist.take(flat_match_idx)
    fm = np.vstack((match_fx1, match_fx2)).T
    if Knorm is None:
        basic_norm_rank = -1
    else:
        basic_norm_rank = K + Knorm - 1
    # Currently just use the last one as a normalizer
    norm_rank = np.array([basic_norm_rank] * len(match_fx2), dtype=match_fx2.dtype)
    # NOTE: fancy indexing (not ravel_multi_index) so a norm_rank of -1
    # (Knorm=None) is valid; a dead ravel_multi_index-based lookup that was
    # immediately overwritten here has been removed.
    norm_fx1 = fx2_to_fx1[match_fx2, norm_rank]
    norm_dist = fx2_to_dist[match_fx2, norm_rank]
    assigntup = AssignTup(fm, match_dist, norm_fx1, norm_dist)
    return assigntup
def flag_sym_slow(fx1_to_fx2, fx2_to_fx1, K):
    """
    Flag which matches in fx1_to_fx2 are reciprocated by fx2_to_fx1.

    Much slower version of flag_symmetric_matches, but more clear.
    """
    flag_rows = []
    for fx1, neighbors in enumerate(fx1_to_fx2):
        # fx1's top-K candidates in the other image
        fx2_m = neighbors[:K]
        # each candidate's own top-K candidates back in this image
        reverse_m = fx2_to_fx1[fx2_m, :K]
        # reciprocal iff fx1 appears among those reverse candidates
        flag_rows.append((reverse_m == fx1).sum(axis=1).astype(np.bool_))
    return np.array(flag_rows)
def flag_symmetric_matches(fx2_to_fx1, fx1_to_fx2, K=2):
    """
    Flag which matches in fx2_to_fx1 are reciprocated by fx1_to_fx2.

    Example:
        >>> # ENABLE_DOCTEST
        >>> from vtool.matching import *  # NOQA
        >>> K = 2
        >>> fx2_to_fx1 = np.array([[ 0,  1],  # 0
        >>>                        [ 1,  4],  # 1
        >>>                        [ 3,  4],  # 2
        >>>                        [ 2,  3]], dtype=np.int32)  # 3
        >>> fx1_to_fx2 = np.array([[ 0,  1],  # 0
        >>>                        [ 2,  1],  # 1
        >>>                        [ 0,  1],  # 2
        >>>                        [ 3,  1],  # 3
        >>>                        [ 0,  1]], dtype=np.int32)  # 4
        >>> fx2_to_flagsA = flag_symmetric_matches(fx2_to_fx1, fx1_to_fx2, K)
        >>> fx2_to_flagsB = flag_sym_slow(fx2_to_fx1, fx1_to_fx2, K)
        >>> assert np.all(fx2_to_flagsA == fx2_to_flagsB)
        >>> result = ub.repr2(fx2_to_flagsB)
        >>> print(result)
    """
    forward = fx2_to_fx1[:, :K]
    backward = fx1_to_fx2[:, :K]
    # For every candidate of every fx2 feature, look up that candidate's
    # own top-K candidates in the reverse direction.
    recip = backward[forward.ravel()].reshape((len(fx2_to_fx1), K, K))
    # recip[fx2, k, i] is the i-th reverse candidate of the k-th neighbor
    # of fx2; the match is symmetric when fx2 appears among them.
    own_idx = np.arange(len(forward))[:, None, None]
    return np.any(recip == own_idx, axis=2)
# Run this module's doctests when executed as a script
if __name__ == '__main__':
    """
    CommandLine:
        python ~/code/vtool/vtool/matching.py
    """
    import xdoctest
    xdoctest.doctest_module(__file__)