# -*- coding: utf-8 -*-
"""
Wrapper around flann (with caching)
python -m xdoctest vtool.nearest_neighbors
"""
from __future__ import absolute_import, division, print_function
from os.path import exists, normpath, join
import utool as ut
import ubelt as ub
import numpy as np
from vtool._pyflann_backend import FLANN_CLS, pyflann


class AnnoyWrapper(object):
"""
Wrapper for annoy to use the FLANN api
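
    Example (a minimal usage sketch on hypothetical random data; assumes
    ``annoy`` is installed):
        >>> # xdoctest: +SKIP
        >>> rng = np.random.RandomState(0)
        >>> dvecs = rng.rand(1000, 128).astype(np.float32)
        >>> qvecs = rng.rand(10, 128).astype(np.float32)
        >>> index = AnnoyWrapper()
        >>> index.build_index(dvecs, trees=4)
        >>> idxs, dists = index.nn_index(qvecs, 2)
        >>> assert idxs.shape == (10, 2)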
"""
def __init__(self):
self.ann = None
self.params = {
'trees': 8,
'checks': 512,
}

    def build_index(self, dvecs, **kwargs):
        import annoy
        self.params.update(kwargs)
        # annoy needs the vector dimensionality up front
        self.ann = annoy.AnnoyIndex(f=dvecs.shape[1], metric='euclidean')
        for i, dvec in enumerate(dvecs):
            self.ann.add_item(i, dvec)
        # params['trees'] plays the role of FLANN's `trees` parameter
        self.ann.build(n_trees=self.params['trees'])

    def nn_index(self, qvecs, num_neighbs, checks=None):
        if checks is None:
            checks = self.params['checks']
        # np.int / np.float were removed from numpy; use explicit dtypes
        idxs = np.empty((len(qvecs), num_neighbs), dtype=np.int64)
        dists = np.empty((len(qvecs), num_neighbs), dtype=np.float64)
        for i, qvec in enumerate(qvecs):
            # `checks` maps onto annoy's search_k parameter
            idxs[i], dists[i] = self.ann.get_nns_by_vector(
                qvec, n=num_neighbs, search_k=checks, include_distances=True
            )
        return idxs, dists


def test_annoy():
from vtool import demodata
import annoy
import utool
qvecs = demodata.testdata_dummy_sift(2 * 1000)
dvecs = demodata.testdata_dummy_sift(100 * 1000)
dim = dvecs.shape[1]
checks = 200
num_neighbs = 10
num_trees = 8
trials = 10
for timer in utool.Timerit(trials, label='build annoy'):
with timer:
ann = annoy.AnnoyIndex(dim, metric='euclidean')
for i, vec in enumerate(dvecs):
ann.add_item(i, vec)
ann.build(n_trees=num_trees)
for timer in utool.Timerit(trials, label='annoy query'):
with timer:
for qvec in qvecs:
ann.get_nns_by_vector(
qvec, n=num_neighbs, search_k=checks, include_distances=True
)
# ---------------
for timer in utool.Timerit(trials, label='build flann'):
with timer:
flann = FLANN_CLS()
flann.build_index(
dvecs, algorithm='kdtree', trees=num_trees, checks=checks, cores=1
)
for timer in utool.Timerit(trials, label='flann query'):
with timer:
flann.nn_index(qvecs, num_neighbs, checks=checks)
# ---------------
for timer in utool.Timerit(trials, label='build annoy wrapper'):
with timer:
index = AnnoyWrapper()
index.build_index(dvecs, trees=num_trees, checks=checks)
for timer in utool.Timerit(trials, label='query annoy wrapper'):
with timer:
index.nn_index(qvecs, num_neighbs, checks=checks)


def test_cv2_flann():
"""
Ignore:
[name for name in dir(cv2) if 'create' in name.lower()]
[name for name in dir(cv2) if 'stereo' in name.lower()]
ut.grab_zipped_url('https://priithon.googlecode.com/archive/a6117f5e81ec00abcfb037f0f9da2937bb2ea47f.tar.gz', download_dir='.')
"""
import cv2
from vtool import demodata
import wbia.plottool as pt
import vtool as vt
    img1 = vt.imread(ut.grab_test_imgpath('easy1.png'))
    img2 = vt.imread(ut.grab_test_imgpath('easy2.png'))
    # StereoBM and the optical-flow routines require single-channel images,
    # so convert (and size-match) before computing anything
    img1, img2 = vt.convert_image_list_colorspace(
        [img1, img2], 'gray', src_colorspace='bgr'
    )
    img2 = vt.resize(img2, img1.shape[0:2][::-1])
    stereo = cv2.StereoBM_create(numDisparities=16, blockSize=15)
    disparity = stereo.compute(img1, img2)
    pt.imshow(disparity)
    pt.show()
    # In OpenCV 4, DualTVL1 optical flow lives in the contrib optflow module
    flow = cv2.optflow.createOptFlow_DualTVL1()
    flow_field = flow.calc(img1, img2, None)
orb = cv2.ORB_create()
kp1, vecs1 = orb.detectAndCompute(img1, None)
kp2, vecs2 = orb.detectAndCompute(img2, None)
    # FeatureDetector_create / DescriptorExtractor_create were removed in
    # OpenCV 3; SIFT_create covers both roles in modern OpenCV (>= 4.4)
    sift = cv2.SIFT_create()
    skp, sd = sift.detectAndCompute(img1, None)
    tkp, td = sift.detectAndCompute(img2, None)
out = img1.copy()
cv2.drawKeypoints(img1, kp1, outImage=out)
pt.imshow(out)
vecs1 = demodata.testdata_dummy_sift(10)
vecs2 = demodata.testdata_dummy_sift(10) # NOQA
    FLANN_INDEX_KDTREE = 1  # cv2 does not expose the flann enums; kdtree is 1
    flann_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=4)
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)  # or pass an empty dictionary
    flann = cv2.FlannBasedMatcher(index_params, search_params)  # NOQA
    # cv2.flann.Index requires float32 input
    cv2.flann.Index(vecs1.astype(np.float32), index_params)
    cv2.flann.Index(vecs1.astype(np.float32), flann_params)  # NOQA
# def match_flann(desc1, desc2, r_threshold = 0.6):
# flann = cv2.flann_Index(desc2, flann_params)
# idx2, dist = flann.knnSearch(desc1, 2, params = {}) # bug: need to provide empty dict
# mask = dist[:,0] / dist[:,1] < r_threshold
# idx1 = np.arange(len(desc1))
# pairs = np.int32( zip(idx1, idx2[:,0]) )
# return pairs[mask]


def ann_flann_once(dpts, qpts, num_neighbors, flann_params=None):
"""
Finds the approximate nearest neighbors of qpts in dpts
CommandLine:
xdoctest -m ~/code/vtool/vtool/nearest_neighbors.py ann_flann_once:0
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.nearest_neighbors import * # NOQA
>>> np.random.seed(1)
>>> dpts = np.random.randint(0, 255, (5, 128)).astype(np.uint8)
>>> qpts = np.random.randint(0, 255, (5, 128)).astype(np.uint8)
>>> qx2_dx, qx2_dist = ann_flann_once(dpts, qpts, 2)
>>> import ubelt as ub
>>> result = ub.repr2((qx2_dx.T, qx2_dist.T), precision=2, with_dtype=True, nl=2)
>>> print(result)
(
np.array([[3, 3, 3, 3, 0],
[2, 0, 1, 4, 4]], dtype=np.int32),
np.array([[1037329., 1235876., 1168550., 1286435., 1075507.],
[1038324., 1243690., 1304896., 1320598., 1369036.]], dtype=np.float32),
)
Example:
>>> # DISABLE_DOCTEST
>>> # Test upper bounds on sift descriptors
>>> # SeeAlso distance.understanding_pseudomax_props
>>> from vtool.nearest_neighbors import * # NOQA
>>> import vtool as vt
>>> import numpy as np
>>> np.random.seed(1)
>>> # get points on unit sphere
>>> nDpts = 5000 # 5
>>> nQpts = 10000 # 10
>>> dpts = vt.normalize_rows(np.random.rand(nDpts, 128))
>>> qpts = vt.normalize_rows(np.random.rand(nQpts, 128))
>>> qmag = np.sqrt(np.power(qpts, 2).sum(1))
>>> dmag = np.sqrt(np.power(dpts, 2).sum(1))
>>> assert np.all(np.allclose(qmag, 1)), 'not on unit sphere'
>>> assert np.all(np.allclose(dmag, 1)), 'not on unit sphere'
>>> # cast to uint8
>>> uint8_max = 512 # hack
>>> uint8_min = 0 # hack
>>> K = 100 # 2
>>>
>>> qpts8 = np.clip(np.round(qpts * uint8_max), uint8_min, uint8_max).astype(np.uint8)
>>> dpts8 = np.clip(np.round(dpts * uint8_max), uint8_min, uint8_max).astype(np.uint8)
>>> qmag8 = np.sqrt(np.power(qpts8.astype(np.float32), 2).sum(1))
>>> dmag8 = np.sqrt(np.power(dpts8.astype(np.float32), 2).sum(1))
>>> # test
>>> qx2_dx, qx2_dist = ann_flann_once(dpts8, qpts8, K)
>>> biggest_dist = np.sqrt(qx2_dist.max())
>>> print('biggest_dist = %r' % (biggest_dist))
    >>> # Compute the same distance by hand (cast first to avoid uint8 overflow)
    >>> hand_dist = np.sum((qpts8.astype(np.float64) - dpts8[qx2_dx.T[0]].astype(np.float64)) ** 2, axis=1)
    >>> # FLANN returns squared euclidean distances, so this matches qx2_dist
>>> result = ub.hash_data(repr((qx2_dx, qx2_dist)))
>>> print(result)
Example:
>>> # DISABLE_DOCTEST
>>> # Build theoretically maximally distant vectors
>>> b = 512
>>> D = 128
>>> x = np.sqrt((float(b) ** 2) / float(D - 1))
>>> dpts = np.ones((2, 128)) * x
>>> qpts = np.zeros((2, 128))
>>> dpts[:, 0] = 0
>>> qpts[:, 0] = 512
>>> qpts[:, 0::2] = 1
>>> dpts[:, 1::2] = 1
>>> qpts[:, 1::2] = 0
>>> dpts[:, 0::2] = 0
>>> qmag = np.sqrt(np.power(qpts.astype(np.float64), 2).sum(1))
>>> dmag = np.sqrt(np.power(dpts.astype(np.float64), 2).sum(1))
>>> # FIX TO ACTUALLY BE AT THE RIGHT NORM
>>> dpts = (dpts * (512 / np.linalg.norm(dpts, axis=1))[:, None]).astype(np.float32)
>>> qpts = (qpts * (512 / np.linalg.norm(qpts, axis=1))[:, None]).astype(np.float32)
>>> print(np.linalg.norm(dpts))
>>> print(np.linalg.norm(qpts))
>>> dist = np.sqrt(np.sum((qpts - dpts) ** 2, 1))
    >>> # Because of the norm condition, another maximally distant pair of
    >>> # vectors is [1, 0, 0, ..., 0] and [0, 1, 0, ..., 0].
    >>> # Verify that this gives the same distance.
>>> dist2 = np.sqrt((512 ** 2 + 512 ** 2))
>>> print(dist2)
>>> print(dist)
"""
    # qx2_dx: query_index -> nearest database index
    # qx2_dist: query_index -> distance
    flann = FLANN_CLS()
    # copy so neither the caller's dict nor a shared default is mutated
    flann_params = dict(flann_params or {})
    if 'algorithm' not in flann_params:
        flann_params['algorithm'] = 0  # 0 is FLANN's linear (brute-force) index
    flann.build_index(dpts, **flann_params)
    (qx2_dx, qx2_dist) = flann.nn_index(qpts, num_neighbors)
    return (qx2_dx, qx2_dist)


def assign_to_centroids(dpts, qpts, num_neighbors=1, flann_params={}):
"""Helper for akmeans"""
    # pyflann's one-shot `nn` builds the index and queries in a single call
    (qx2_dx, qx2_dist) = FLANN_CLS().nn(dpts, qpts, num_neighbors, **flann_params)
return qx2_dx


def get_flann_params_cfgstr(flann_params):
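    """
    Builds the parameter part of the FLANN cache config string.

    Example (a sketch; the expected output is derived from the
    ``get_flann_cfgstr`` doctest below):
        >>> # xdoctest: +SKIP
        >>> get_flann_params_cfgstr({'algorithm': 'kdtree', 'trees': 4})
        '4kdtree'
    """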
    # Ensure a consistent ordering of the parameter signature
    flann_vals = list(flann_params.values())
    flann_keys = list(flann_params.keys())
    # reverse to maintain backwards compatibility with old cache names
    flann_valsig_ = str(ut.sortedby(flann_vals, flann_keys, reverse=True))
flann_valsig = ut.remove_chars(flann_valsig_, ", '[]")
return flann_valsig


def get_flann_cfgstr(
dpts, flann_params, cfgstr='', use_params_hash=True, use_data_hash=True
):
"""
CommandLine:
python -m vtool.nearest_neighbors --test-get_flann_cfgstr
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.nearest_neighbors import * # NOQA
>>> rng = np.random.RandomState(1)
>>> dpts = rng.randint(0, 255, (10, 128)).astype(np.uint8)
>>> cache_dir = '.'
>>> cfgstr = '_FEAT(alg=heshes)'
>>> flann_params = get_kdtree_flann_params()
>>> result = get_flann_cfgstr(dpts, flann_params, cfgstr)
>>> print(result)
_FEAT(alg=heshes)_FLANN(4kdtree)_DPTS((10,128)xxaotseonmfjkzcr)
"""
flann_cfgstr = cfgstr
if use_params_hash:
flann_valsig = get_flann_params_cfgstr(flann_params)
flann_cfgstr += '_FLANN(' + flann_valsig + ')'
# Generate a unique filename for dpts and flann parameters
if use_data_hash:
# flann is dependent on the dpts
data_hashstr = ut.hashstr_arr27(dpts, '_DPTS')
flann_cfgstr += data_hashstr
return flann_cfgstr


def get_flann_fpath(
dpts,
cache_dir='default',
cfgstr='',
flann_params={},
use_params_hash=True,
use_data_hash=True,
appname='vtool',
verbose=True,
):
"""returns filepath for flann index"""
if cache_dir == 'default':
if verbose:
print('[flann] using default cache dir')
cache_dir = ub.ensure_app_cache_dir(appname)
ub.ensuredir(cache_dir)
flann_cfgstr = get_flann_cfgstr(
dpts,
flann_params,
cfgstr,
use_params_hash=use_params_hash,
use_data_hash=use_data_hash,
)
if verbose:
print('...flann_cache cfgstr = %r: ' % flann_cfgstr)
# Append any user labels
flann_fname = 'flann_index' + flann_cfgstr + '.flann'
flann_fpath = normpath(join(cache_dir, flann_fname))
return flann_fpath


def flann_cache(
dpts,
cache_dir='default',
cfgstr='',
flann_params={},
use_cache=True,
save=True,
use_params_hash=True,
use_data_hash=True,
appname='vtool',
verbose=None,
):
"""
    Tries to load a cached flann index before building a new one.
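
    Example (a minimal sketch; writes an index file into the app cache dir):
        >>> # xdoctest: +SKIP
        >>> rng = np.random.RandomState(1)
        >>> dpts = rng.randint(0, 255, (1000, 128)).astype(np.uint8)
        >>> flann = flann_cache(dpts, flann_params=get_kdtree_flann_params())
        >>> idxs, dists = flann.nn_index(dpts[0:5], 1)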
"""
if verbose is None:
verbose = int(ut.NOT_QUIET)
if verbose is True:
verbose = 2
if verbose > 1:
print('+--- START CACHED FLANN INDEX ')
if len(dpts) == 0:
raise ValueError('cannot build flann when len(dpts) == 0. (prevents a segfault)')
flann_fpath = get_flann_fpath(
dpts,
cache_dir,
cfgstr,
flann_params,
use_params_hash=use_params_hash,
use_data_hash=use_data_hash,
appname=appname,
verbose=verbose,
)
# Load the index if it exists
flann = FLANN_CLS()
flann.flann_fpath = flann_fpath
if use_cache and exists(flann_fpath):
try:
flann.load_index(flann_fpath, dpts)
if verbose > 0:
print('...flann cache hit: %d vectors' % (len(dpts)))
if verbose > 1:
print('L___ END FLANN INDEX ')
return flann
except Exception as ex:
ut.printex(ex, '... cannot load index', iswarning=True)
# Rebuild the index otherwise
if verbose > 0:
print('...flann cache miss.')
    num_dpts = len(dpts)
if verbose > 1 or (verbose > 0 and num_dpts > 1e6):
print('...building kdtree over %d points (this may take a sec).' % num_dpts)
if num_dpts == 0:
print(
'WARNING: CANNOT BUILD FLANN INDEX OVER 0 POINTS. THIS MAY BE A SIGN OF A DEEPER ISSUE'
)
return flann
flann.build_index(dpts, **flann_params)
if verbose > 1:
print('flann.save_index(%r)' % ut.path_ndir_split(flann_fpath, n=2))
if save:
flann.save_index(flann_fpath)
if verbose > 1:
print('L___ END CACHED FLANN INDEX ')
return flann


def flann_augment(
dpts, new_dpts, cache_dir, cfgstr, new_cfgstr, flann_params, use_cache=True, save=True
):
"""
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.nearest_neighbors import * # NOQA
>>> import vtool.demodata as demodata # NOQA
>>> dpts = demodata.get_dummy_dpts(ut.get_nth_prime(10))
>>> new_dpts = demodata.get_dummy_dpts(ut.get_nth_prime(9))
>>> cache_dir = ut.get_app_resource_dir('vtool')
>>> cfgstr = '_testcfg'
>>> new_cfgstr = '_new_testcfg'
>>> flann_params = get_kdtree_flann_params()
>>> use_cache = False
>>> save = False
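        >>> # exercise the augmentation (an illustrative sketch using the
        >>> # setup above):
        >>> flann = flann_augment(dpts, new_dpts, cache_dir, cfgstr,
        ...                       new_cfgstr, flann_params, use_cache, save)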
"""
    flann = flann_cache(dpts, cache_dir, cfgstr, flann_params, use_cache=use_cache)
flann.add_points(new_dpts)
if save:
aug_dpts = np.vstack((dpts, new_dpts))
new_flann_fpath = get_flann_fpath(aug_dpts, cache_dir, new_cfgstr, flann_params)
flann.save_index(new_flann_fpath)
return flann


def get_kdtree_flann_params():
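    """Default FLANN kdtree parameters used by the caching helpers above."""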
flann_params = {'algorithm': 'kdtree', 'trees': 4}
return flann_params


def get_flann_params(algorithm='kdtree', **kwargs):
    """
    Returns the flann params that are relevant to the given algorithm
References:
http://www.cs.ubc.ca/research/flann/uploads/FLANN/flann_manual-1.8.4.pdf
Args:
algorithm (str): (default = 'kdtree')
Returns:
dict: flann_params
CommandLine:
python -m vtool.nearest_neighbors --test-get_flann_params --algo=kdtree
python -m vtool.nearest_neighbors --test-get_flann_params --algo=kmeans
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.nearest_neighbors import * # NOQA
>>> algorithm = ut.get_argval('--algo', default='kdtree')
>>> flann_params = get_flann_params(algorithm)
>>> result = ('flann_params = %s' % (ub.repr2(flann_params),))
>>> print(result)
"""
    # autotuned and lsh are included so their branches below are reachable
    _algorithm_options = [
        'linear', 'kdtree', 'kmeans', 'composite', 'kdtree_single',
        'autotuned', 'lsh',
    ]
    _centersinit_options = [
        'random',
        'gonzales',
        'kmeanspp',
    ]
# Search params (for all algos)
assert algorithm in _algorithm_options
flann_params = {'algorithm': algorithm}
    if algorithm != 'linear':
        flann_params.update({'random_seed': -1})
    if algorithm in ['kdtree', 'composite']:
        # kdtree index parameters
        flann_params.update(
            {
                'trees': 4,
                'checks': 32,  # how many leaves (features) to check in one search
            }
        )
    # note: deliberately not elif, so 'composite' gets both parameter sets
    if algorithm in ['kmeans', 'composite']:
        # kmeans index parameters
        flann_params.update(
            {
                'branching': 32,
                'iterations': 5,
                'centers_init': _centersinit_options[2],
                'cb_index': 0.5,  # cluster boundary index for searching the kmeans tree
                'checks': 32,  # how many leaves (features) to check in one search
            }
        )
elif algorithm == 'autotuned':
flann_params.update(
{
'algorithm': 'autotuned',
'target_precision': 0.01, # precision desired (used for autotuning, -1 otherwise)
'build_weight': 0.01, # build tree time weighting factor
                'memory_weight': 0.0,  # index memory weighting factor
'sample_fraction': 0.001, # what fraction of the dataset to use for autotuning
}
)
elif algorithm == 'lsh':
flann_params.update(
{'table_number_': 12, 'key_size_': 20, 'multi_probe_level_': 2}
)
flann_params = ut.update_existing(flann_params, kwargs, assert_exists=True)
return flann_params


def tune_flann(
dpts,
target_precision=0.90,
build_weight=0.50,
memory_weight=0.00,
sample_fraction=0.01,
):
r"""
References:
http://www.cs.ubc.ca/research/flann/uploads/FLANN/flann_pami2014.pdf
http://www.cs.ubc.ca/research/flann/uploads/FLANN/flann_manual-1.8.4.pdf
http://docs.opencv.org/trunk/modules/flann/doc/flann_fast_approximate_nearest_neighbor_search.html
    Math:
        The autotuner chooses parameters :math:`\theta` minimizing the cost

        .. math::
            \text{cost} = \frac{s + w_b \, b}
                               {\min_{\theta}(s + w_b \, b)}
                          + w_m \, m

        where :math:`s` is search time, :math:`b` is build time, :math:`m` is
        index memory, :math:`w_b` is ``build_weight``, and :math:`w_m` is
        ``memory_weight``.
Args:
dpts (ndarray):
target_precision (float): number between 0 and 1 representing desired
accuracy. Higher values are more accurate.
build_weight (float): importance weight given to minimizing build time
relative to search time. This number can range from 0 to infinity.
typically because building is a more complex computation you want
to keep the number relatively low, (less than 1) otherwise you'll
end up getting a linear search (no build time).
memory_weight (float): Importance of memory relative to total speed.
A value less than 1 gives more importance to the time spent and a
value greater than 1 gives more importance to the memory usage.
sample_fraction (float): number between 0 and 1 representing the
fraction of the input data to use in the optimization. A higher
number uses more data.
Returns:
dict: tuned_params
CommandLine:
python -m vtool.nearest_neighbors --test-tune_flann
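
    Example (an illustrative sketch; autotuning real-sized data can be slow):
        >>> # xdoctest: +SKIP
        >>> rng = np.random.RandomState(0)
        >>> dpts = rng.rand(10000, 128).astype(np.float32)
        >>> tuned_params = tune_flann(dpts, target_precision=0.9)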
"""
with ut.Timer('tuning flann'):
print('Autotuning flann with %d %dD vectors' % (dpts.shape[0], dpts.shape[1]))
print(
'a sample of %d vectors will be used' % (int(dpts.shape[0] * sample_fraction))
)
flann = FLANN_CLS()
# num_data = len(dpts)
flann_atkwargs = dict(
algorithm='autotuned',
target_precision=target_precision,
build_weight=build_weight,
memory_weight=memory_weight,
sample_fraction=sample_fraction,
)
suffix = repr(flann_atkwargs)
badchar_list = ",{}': "
for badchar in badchar_list:
suffix = suffix.replace(badchar, '')
print('flann_atkwargs:')
print(ub.repr2(flann_atkwargs))
print('starting optimization')
tuned_params = flann.build_index(dpts, **flann_atkwargs)
print('finished optimization')
        # The algorithm is sometimes returned as 'default', which is not very
        # useful because the actual default value is embedded in the pyflann
        # module where most users would not care to look. This resolves the
        # real name of the default for you.
for key in ['algorithm', 'centers_init', 'log_level']:
val = tuned_params.get(key, None)
if val == 'default':
dict_ = pyflann.FLANNParameters._translation_[key]
other_algs = ut.dict_find_other_sameval_keys(dict_, 'default')
assert len(other_algs) == 1, 'more than 1 default for key=%r' % (key,)
tuned_params[key] = other_algs[0]
common_params = [
'algorithm',
'checks',
]
relevant_params_dict = dict(
linear=['algorithm'],
# ---
kdtree=['trees'],
# ---
kmeans=['branching', 'iterations', 'centers_init', 'cb_index'],
# ---
lsh=['table_number', 'key_size', 'multi_probe_level'],
)
relevant_params_dict['composite'] = (
relevant_params_dict['kmeans']
+ relevant_params_dict['kdtree']
+ common_params
)
relevant_params_dict['kmeans'] += common_params
relevant_params_dict['kdtree'] += common_params
relevant_params_dict['lsh'] += common_params
# kdtree_single_params = [
# 'leaf_max_size',
# ]
# other_params = [
# 'build_weight',
# 'sorted',
# ]
out_file = 'flann_tuned' + suffix
ut.write_to(out_file, ub.repr2(tuned_params, sorted_=True, newlines=True))
flann.delete_index()
if tuned_params['algorithm'] in relevant_params_dict:
print('relevant_params=')
relevant_params = relevant_params_dict[tuned_params['algorithm']]
print(
ub.repr2(
ut.dict_subset(tuned_params, relevant_params),
sorted_=True,
newlines=True,
)
)
print('irrelevant_params=')
print(
ub.repr2(
ut.dict_setdiff(tuned_params, relevant_params),
sorted_=True,
newlines=True,
)
)
else:
print('unknown tuned algorithm=%r' % (tuned_params['algorithm'],))
print('all_tuned_params=')
print(ub.repr2(tuned_params, sorted_=True, newlines=True))
return tuned_params


def flann_index_time_experiment():
r"""
Shows a plot of how long it takes to build a flann index for a given number of KD-trees
CommandLine:
python -m vtool.nearest_neighbors --test-flann_index_time_experiment
Example:
>>> # SLOW_DOCTEST
>>> # xdoctest: +SKIP
>>> from vtool.nearest_neighbors import * # NOQA
>>> result = flann_index_time_experiment()
>>> print(result)
"""
import vtool as vt
import itertools
class TestDataPool(object):
"""
Perform only a few allocations of test data
"""
def __init__(self):
self.num = 10000
self.data_pool = None
self.alloc_pool(1000000)
def alloc_pool(self, num):
print('[alloc] num = %r' % (num,))
self.num = num
self.data_pool = vt.demodata.testdata_dummy_sift(num)
print(
'[alloc] object size '
+ ut.get_object_size_str(self.data_pool, 'data_pool')
)
def get_testdata(self, num):
if len(self.data_pool) < num:
self.alloc_pool(2 * self.num)
return self.data_pool[0:num]
pool = TestDataPool()
def get_buildtime_data(**kwargs):
flann_params = vt.get_flann_params(**kwargs)
print('flann_params = %r' % (ub.repr2(flann_params),))
data_list = []
num = 1000
print('-----')
for count in ut.ProgIter(itertools.count(), length=-1, freq=1, adjust=False):
num = int(num * 1.2)
print('num = %r' % (num,))
# if num > 1E6:
# break
data = pool.get_testdata(num)
print('object size ' + ut.get_object_size_str(data, 'data'))
flann = FLANN_CLS(**flann_params)
with ut.Timer(verbose=False) as t:
flann.build_index(data)
print('t.ellapsed = %r' % (t.ellapsed,))
if t.ellapsed > 5 or count > 1000:
break
data_list.append((count, num, t.ellapsed))
print('-----')
return data_list, flann_params
data_list1, params1 = get_buildtime_data(trees=1)
data_list2, params2 = get_buildtime_data(trees=2)
data_list4, params4 = get_buildtime_data(trees=4)
data_list8, params8 = get_buildtime_data(trees=8)
data_list16, params16 = get_buildtime_data(trees=16)
import wbia.plottool as pt
def plotdata(data_list):
count_arr = ut.get_list_column(data_list, 1)
time_arr = ut.get_list_column(data_list, 2)
pt.plot2(
count_arr,
time_arr,
marker='-o',
equal_aspect=False,
x_label='num_vectors',
y_label='FLANN build time',
)
plotdata(data_list1)
plotdata(data_list2)
plotdata(data_list4)
plotdata(data_list8)
plotdata(data_list16)
pt.iup()


def invertible_stack(vecs_list, label_list):
"""
Stacks descriptors into a flat structure and returns inverse mapping from
flat database descriptor indexes (dx) to annotation ids (label) and feature
indexes (fx). Feature indexes are w.r.t. annotation indexes.
    Returns:
        idx2_vec - flat descriptor stack
        idx2_label - inverted index into annotations
        idx2_fx - inverted index into features
    Example:
        >>> # DISABLE_DOCTEST
        >>> # example with 2D descriptors
>>> from vtool.nearest_neighbors import * # NOQA
>>> DESC_TYPE = np.uint8
>>> label_list = [1, 2, 3, 4, 5]
>>> vecs_list = [
... np.array([[0, 0], [0, 1]], dtype=DESC_TYPE),
... np.array([[5, 3], [2, 30], [1, 1]], dtype=DESC_TYPE),
... np.empty((0, 2), dtype=DESC_TYPE),
... np.array([[5, 3], [2, 30], [1, 1]], dtype=DESC_TYPE),
... np.array([[3, 3], [42, 42], [2, 6]], dtype=DESC_TYPE),
... ]
>>> idx2_vec, idx2_label, idx2_fx = invertible_stack(vecs_list, label_list)
>>> print(repr(idx2_vec.T))
array([[ 0, 0, 5, 2, 1, 5, 2, 1, 3, 42, 2],
[ 0, 1, 3, 30, 1, 3, 30, 1, 3, 42, 6]], dtype=uint8)
>>> print(repr(idx2_label))
array([1, 1, 2, 2, 2, 4, 4, 4, 5, 5, 5])
>>> print(repr(idx2_fx))
array([0, 1, 0, 1, 2, 0, 1, 2, 0, 1, 2])
"""
# INFER DTYPE? dtype = vecs_list[0].dtype
# Build inverted index of (label, fx) pairs
nFeats = sum(list(map(len, vecs_list)))
nFeat_iter = map(len, vecs_list)
label_nFeat_iter = zip(label_list, map(len, vecs_list))
# generate featx inverted index for each feature in each annotation
_ax2_fx = [list(range(nFeat)) for nFeat in nFeat_iter]
# generate label inverted index for each feature in each annotation
_ax2_label = [[label] * nFeat for (label, nFeat) in label_nFeat_iter]
# Flatten generators into the inverted index
_flatlabels = ub.flatten(_ax2_label)
_flatfeatxs = ub.flatten(_ax2_fx)
idx2_label = np.fromiter(_flatlabels, np.int32, nFeats)
idx2_fx = np.fromiter(_flatfeatxs, np.int32, nFeats)
    # Stack descriptors into a numpy array corresponding to the inverted index
# This might throw a MemoryError
idx2_vec = np.vstack(vecs_list)
return idx2_vec, idx2_label, idx2_fx


if __name__ == '__main__':
"""
CommandLine:
xdoctest -m vtool.nearest_neighbors
"""
import xdoctest
xdoctest.doctest_module(__file__)