# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import numpy as np
import utool as ut
import ubelt as ub
import functools # NOQA
from six import next
from six.moves import zip, range
[docs]def safe_vstack(tup, default_shape=(0,), default_dtype=np.float32):
"""stacks a tuple even if it is empty"""
try:
return np.vstack(tup)
except ValueError:
return np.empty(default_shape, dtype=default_dtype)
[docs]def pad_vstack(arrs, fill_value=0):
"""Stacks values and pads arrays with different lengths with zeros"""
total = max(map(len, arrs))
padded = [np.hstack([a, np.full(total - len(a), fill_value)]) for a in arrs]
return np.vstack(padded)
[docs]def safe_cat(tup, axis=0, default_shape=(0,), default_dtype=np.float32):
"""
stacks a tuple even if it is empty
Also deals with numpy bug where cat fails if an element in sequence is empty
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> import vtool as vt
>>> # test1
>>> tup = []
>>> ut.assert_eq(vt.safe_cat(tup, axis=0).shape, (0,))
>>> # test2
>>> tup = (np.array([[1, 2, 3]]), np.array([[]]))
>>> s = vt.safe_cat(tup, axis=0)
>>> print(ub.hzcat(['s = %s' % (ub.repr2(s), )]))
>>> ut.assert_eq(s.shape, (1, 3))
>>> # test3
>>> tup = (np.array([[1, 2, 3]]), np.array([[3, 4, 5]]))
>>> s = vt.safe_cat(tup, axis=1)
>>> print(ub.hzcat(['s = %s' % (ub.repr2(s), )]))
>>> ut.assert_eq(s.shape, (1, 6))
>>> # test3
>>> tup = (np.array(1), np.array(2), np.array(3))
>>> s = vt.safe_cat(tup, axis=1)
>>> print(ub.hzcat(['s = %s' % (ub.repr2(s), )]))
>>> ut.assert_eq(s.shape, (1, 6))
"""
if tup is None or len(tup) == 0:
stack = np.empty(default_shape, dtype=default_dtype)
else:
try:
stack = np.concatenate(tup, axis=axis)
except ValueError as ex1:
try:
# Ensure everything is at least a 1d array
tup_ = [np.atleast_1d(np.asarray(a)) for a in tup]
# remove empty parts
tup_ = [a for a in tup_ if a.size > 0]
stack = np.concatenate(tup_, axis=axis)
except ValueError:
# if axis == 0:
# stack = np.hstack(tup)
# elif axis == 1:
# stack = np.vstack(tup)
# elif axis == 3:
# stack = np.dstack(tup)
# else:
raise ex1
return stack
# try:
# return np.concatenate(tup, axis=axis)
# except ValueError:
[docs]def argsort_groups(scores_list, reverse=False, rng=np.random, randomize_levels=True):
"""
Sorts each group normally, but randomizes order of level values.
TODO: move to vtool
Args:
scores_list (list):
reverse (bool): (default = True)
rng (module): random number generator(default = numpy.random)
CommandLine:
python -m wbia.init.filter_annots --exec-argsort_groups
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> scores_list = [
>>> np.array([np.nan, np.nan], dtype=np.float32),
>>> np.array([np.nan, 2], dtype=np.float32),
>>> np.array([4, 1, 1], dtype=np.float32),
>>> np.array([7, 3, 3, 0, 9, 7, 5, 8], dtype=np.float32),
>>> np.array([2, 4], dtype=np.float32),
>>> np.array([np.nan, 4, np.nan, 8, np.nan, 9], dtype=np.float32),
>>> ]
>>> reverse = True
>>> rng = np.random.RandomState(0)
>>> idxs_list = argsort_groups(scores_list, reverse, rng)
>>> result = 'idxs_list = %s' % (ut.repr4(idxs_list, with_dtype=False),)
>>> print(result)
"""
scores_list_ = [
np.array(scores, copy=True).astype(np.float32) for scores in scores_list
]
breakers_list = [rng.rand(len(scores)) for scores in scores_list_]
# replace nan with -inf, or inf randomize order between equal values
replval = -np.inf if reverse else np.inf
# Ensure that nans are ordered last
for scores in scores_list_:
scores[np.isnan(scores)] = replval
# The last column is sorted by first with lexsort
scorebreaker_list = [
np.array((breakers, scores))
for scores, breakers in zip(scores_list_, breakers_list)
]
if reverse:
idxs_list = [np.lexsort(scorebreaker)[::-1] for scorebreaker in scorebreaker_list]
else:
idxs_list = [np.lexsort(scorebreaker) for scorebreaker in scorebreaker_list]
return idxs_list
[docs]def check_sift_validity(sift_uint8, lbl=None, verbose=ut.NOT_QUIET):
"""
checks if a SIFT descriptor is valid
"""
if lbl is None:
lbl = ut.get_varname_from_stack(sift_uint8, N=1)
print(
'[checksift] Checking valididty of %d SIFT descriptors. lbl=%s'
% (sift_uint8.shape[0], lbl)
)
is_correct_shape = len(sift_uint8.shape) == 2 and sift_uint8.shape[1] == 128
is_correct_dtype = sift_uint8.dtype == np.uint8
if not is_correct_shape:
print('[checksift] * incorrect shape = %r' % (sift_uint8.shape,))
elif verbose:
print('[checksift] * correct shape = %r' % (sift_uint8.shape,))
if not is_correct_dtype:
print('[checksift] * incorrect dtype = %r' % (sift_uint8.dtype,))
elif verbose:
print('[checksift] * correct dtype = %r' % (sift_uint8.dtype,))
num_sifts = sift_uint8.shape[0]
sift_float01 = sift_uint8 / 512.0
# Check L2 norm
sift_norm = np.linalg.norm(sift_float01, axis=1)
is_normal = np.isclose(sift_norm, 1.0, atol=0.04)
bad_locs_norm = np.where(np.logical_not(is_normal))[0]
if len(bad_locs_norm) > 0:
print('[checksift] * bad norm = %4d/%d' % (len(bad_locs_norm), num_sifts))
else:
print('[checksift] * correctly normalized')
# Check less than thresh=.2
# This check actually is not valid because the SIFT descriptors is
# normalized after it is thresholded
# bad_locs_thresh = np.where((sift_float01 > .2).sum(axis=1))[0]
# print('[checksift] * bad thresh = %4d/%d' % (len(bad_locs_thresh), num_sifts))
# if len(bad_locs_thresh) > 0:
# above_thresh = sift_float01[(sift_float01 > .2)]
# print('[checksift] * components under thresh = %d' % (sift_float01 <= 2).sum())
# print('[checksift] * components above thresh stats = ' +
# ut.get_stats_str(above_thresh, precision=2))
isok = len(bad_locs_norm) == 0 and is_correct_shape and is_correct_dtype
if not isok:
print('[checksift] ERROR. SIFT CHECK FAILED')
return isok
[docs]def get_crop_slices(isfill):
fill_colxs = [np.where(row)[0] for row in isfill]
fill_rowxs = [np.where(col)[0] for col in isfill.T]
nRows, nCols = isfill.shape[0:2]
filled_columns = intersect1d_reduce(fill_colxs)
filled_rows = intersect1d_reduce(fill_rowxs)
consec_rows_list = ut.group_consecutives(filled_rows)
consec_cols_list = ut.group_consecutives(filled_columns)
def get_consec_endpoint(consec_index_list, endpoint):
"""
consec_index_list = consec_cols_list
endpoint = 0
"""
for consec_index in consec_index_list:
if np.any(np.array(consec_index) == endpoint):
return consec_index
def get_min_consec_endpoint(consec_rows_list, endpoint):
consec_index = get_consec_endpoint(consec_rows_list, endpoint)
if consec_index is None:
return endpoint
return max(consec_index)
def get_max_consec_endpoint(consec_rows_list, endpoint):
consec_index = get_consec_endpoint(consec_rows_list, endpoint)
if consec_index is None:
return endpoint + 1
return min(consec_index)
consec_rows_top = get_min_consec_endpoint(consec_rows_list, 0)
consec_rows_bottom = get_max_consec_endpoint(consec_rows_list, nRows - 1)
remove_cols_left = get_min_consec_endpoint(consec_cols_list, 0)
remove_cols_right = get_max_consec_endpoint(consec_cols_list, nCols - 1)
rowslice = slice(consec_rows_top, consec_rows_bottom)
colslice = slice(remove_cols_left, remove_cols_right)
return rowslice, colslice
[docs]def get_undirected_edge_ids(directed_edges):
r"""
Args:
directed_edges (ndarray[ndims=2]):
Returns:
list: edgeid_list
CommandLine:
python -m vtool.other --exec-get_undirected_edge_ids
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> directed_edges = np.array([[1, 2], [2, 1], [2, 3], [3, 1], [1, 1], [2, 3], [3, 2]])
>>> edgeid_list = get_undirected_edge_ids(directed_edges)
>>> result = ('edgeid_list = %s' % (ub.repr2(edgeid_list),))
>>> print(result)
edgeid_list = [0 0 1 2 3 1 1]
"""
# import vtool as vt
undirected_edges = to_undirected_edges(directed_edges)
edgeid_list = compute_unique_data_ids(undirected_edges)
return edgeid_list
[docs]def to_undirected_edges(directed_edges, upper=False):
assert len(directed_edges.shape) == 2 and directed_edges.shape[1] == 2
# flipped = qaid_arr < daid_arr
if upper:
flipped = directed_edges.T[0] > directed_edges.T[1]
else:
flipped = directed_edges.T[0] < directed_edges.T[1]
# standardize edge order
edges_dupl = directed_edges.copy()
edges_dupl[flipped, 0:2] = edges_dupl[flipped, 0:2][:, ::-1]
undirected_edges = edges_dupl
return undirected_edges
[docs]def find_best_undirected_edge_indexes(directed_edges, score_arr=None):
r"""
Args:
directed_edges (ndarray[ndims=2]):
score_arr (ndarray):
Returns:
list: unique_edge_xs
CommandLine:
python -m vtool.other --test-find_best_undirected_edge_indexes
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> directed_edges = np.array([[1, 2], [2, 1], [2, 3], [3, 1], [1, 1], [2, 3], [3, 2]])
>>> score_arr = np.array([1, 1, 1, 1, 1, 1, 2])
>>> unique_edge_xs = find_best_undirected_edge_indexes(directed_edges, score_arr)
>>> result = str(unique_edge_xs)
>>> print(result)
[0 3 4 6]
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> directed_edges = np.array([[1, 2], [2, 1], [2, 3], [3, 1], [1, 1], [2, 3], [3, 2]])
>>> score_arr = None
>>> unique_edge_xs = find_best_undirected_edge_indexes(directed_edges, score_arr)
>>> result = str(unique_edge_xs)
>>> print(result)
[0 2 3 4]
"""
import vtool as vt
# assert len(directed_edges.shape) == 2 and directed_edges.shape[1] == 2
# # flipped = qaid_arr < daid_arr
# flipped = directed_edges.T[0] < directed_edges.T[1]
# # standardize edge order
# edges_dupl = directed_edges.copy()
# edges_dupl[flipped, 0:2] = edges_dupl[flipped, 0:2][:, ::-1]
# edgeid_list = vt.compute_unique_data_ids(edges_dupl)
edgeid_list = get_undirected_edge_ids(directed_edges)
unique_edgeids, groupxs = vt.group_indices(edgeid_list)
# if there is more than one edge in a group take the one with the highest score
if score_arr is None:
unique_edge_xs_list = [groupx[0] for groupx in groupxs]
else:
assert len(score_arr) == len(directed_edges)
score_groups = vt.apply_grouping(score_arr, groupxs)
score_argmaxs = [score_group.argmax() for score_group in score_groups]
unique_edge_xs_list = [
groupx[argmax] for groupx, argmax in zip(groupxs, score_argmaxs)
]
unique_edge_xs = np.array(sorted(unique_edge_xs_list), dtype=np.int32)
return unique_edge_xs
[docs]def argsort_records(arrays, reverse=False):
r"""
Sorts arrays that form records.
Same as lexsort(arrays[::-1]) --- ie. rows are reversed.
Args:
arrays (ndarray): array of records
reverse (bool): (default = False)
Returns:
ndarray: sortx - sorted indicies
CommandLine:
python -m vtool.other --exec-argsort_records
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arrays = np.array([
>>> [1, 1, 1, 2, 2, 2, 3, 4, 5],
>>> [2, 0, 2, 6, 4, 3, 2, 5, 6],
>>> [1, 1, 0, 2, 3, 4, 5, 6, 7],
>>> ],)
>>> reverse = False
>>> sortx = argsort_records(arrays, reverse)
>>> result = ('sortx = %s' % (str(sortx),))
>>> print('lxsrt = %s' % (np.lexsort(arrays[::-1]),))
>>> print(result)
sortx = [1 2 0 5 4 3 6 7 8]
"""
sorting_records = np.rec.fromarrays(arrays)
sort_stride = (-reverse * 2) + 1
sortx = sorting_records.argsort()[::sort_stride]
return sortx
[docs]def unique_rows(arr, directed=True):
"""
Order or columns does not matter if directed = False
"""
if directed:
idx_list = compute_unique_data_ids(arr)
else:
idx_list = get_undirected_edge_ids(arr)
_, unique_rowx = np.unique(idx_list, return_index=True)
unique_arr = arr.take(unique_rowx, axis=0)
return unique_arr
[docs]def compute_ndarray_unique_rowids_unsafe(arr):
"""
arr = np.random.randint(2, size=(10000, 10))
vt.compute_unique_data_ids_(list(map(tuple, arr)))
len(vt.compute_unique_data_ids_(list(map(tuple, arr))))
len(np.unique(vt.compute_unique_data_ids_(list(map(tuple, arr)))))
%timeit vt.compute_unique_data_ids_(list(map(tuple, arr)))
%timeit compute_ndarray_unique_rowids_unsafe(arr)
"""
# no checks performed
void_dtype = np.dtype((np.void, arr.dtype.itemsize * arr.shape[1]))
# assert arr.flags['C_CONTIGUOUS']
arr_void_view = arr.view(void_dtype)
unique, rowids = np.unique(arr_void_view, return_inverse=True)
return rowids
# np.ascontiguousarray(arr).data == arr.data
# assert arr.data == arr_void_view.data
[docs]def nonunique_row_flags(arr):
import vtool as vt
from vtool.numpy_utils import unique_row_indexes
unique_rowx = unique_row_indexes(arr)
unique_flags = vt.index_to_boolmask(unique_rowx, len(arr))
nonunique_flags = np.logical_not(unique_flags)
return nonunique_flags
[docs]def nonunique_row_indexes(arr):
"""rows that are not unique (does not include the first instance of each pattern)
Args:
arr (ndarray): 2d array
Returns:
ndarray: nonunique_rowx
SeeAlso:
unique_row_indexes
nonunique_row_flags
CommandLine:
python -m vtool.other --test-unique_row_indexes
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arr = np.array([[0, 0], [0, 1], [1, 0], [1, 1], [0, 0], [.534, .432], [.534, .432], [1, 0], [0, 1]])
>>> nonunique_rowx = unique_row_indexes(arr)
>>> result = ('nonunique_rowx = %s' % (ub.repr2(nonunique_rowx),))
>>> print(result)
nonunique_rowx = np.array([4, 6, 7, 8], dtype=np.int64)
"""
nonunique_flags = nonunique_row_flags(arr)
nonunique_rowx = np.where(nonunique_flags)[0]
return nonunique_rowx
[docs]def compute_unique_data_ids(data):
"""
This is actually faster than compute_unique_integer_data_ids it seems
CommandLine:
python -m vtool.other --test-compute_unique_data_ids
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> data = np.array([[0, 0], [0, 1], [1, 0], [1, 1], [0, 0], [.534, .432], [.534, .432], [1, 0], [0, 1]])
>>> dataid_list = compute_unique_data_ids(data)
>>> result = 'dataid_list = ' + ub.repr2(dataid_list, with_dtype=True)
>>> print(result)
dataid_list = np.array([0, 1, 2, 3, 0, 4, 4, 2, 1], dtype=np.int32)
"""
# construct a unique id for every edge
hashable_rows = [tuple(row_.tolist()) for row_ in data]
dataid_list = np.array(compute_unique_data_ids_(hashable_rows), dtype=np.int32)
return dataid_list
[docs]def compute_unique_data_ids_(hashable_rows, iddict_=None):
if iddict_ is None:
iddict_ = {}
for row in hashable_rows:
if row not in iddict_:
iddict_[row] = len(iddict_)
dataid_list = ut.dict_take(iddict_, hashable_rows)
return dataid_list
[docs]def compute_unique_arr_dataids(arr):
"""specialized version for speed when arr is an ndarray"""
iddict_ = {}
hashable_rows = list(map(tuple, arr.tolist()))
for row in hashable_rows:
if row not in iddict_:
iddict_[row] = len(iddict_)
dataid_list = np.array([iddict_[row] for row in hashable_rows])
return dataid_list
[docs]def compute_unique_integer_data_ids(data):
r"""
This is actually slower than compute_unique_data_ids it seems
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> # build test data
>>> data = np.array([[0, 0], [0, 1], [1, 1], [0, 0], [0, 0], [0, 1], [1, 1], [0, 0], [9, 0]])
>>> data = np.random.randint(1000, size=(1000, 2))
>>> # execute function
>>> result1 = compute_unique_data_ids(data)
>>> result2 = compute_unique_integer_data_ids(data)
>>> # verify results
>>> print(result)
%timeit compute_unique_data_ids(data)
%timeit compute_unique_integer_data_ids(data)
"""
# construct a unique id for every edge
ncols = data.shape[1]
# get the number of decimal places to shift
exp_step = np.ceil(np.log10(data.max()))
offsets = [int(10 ** (ix * exp_step)) for ix in reversed(range(0, ncols))]
dataid_list = np.array(
[sum([item * offset for item, offset in zip(row, offsets)]) for row in data]
)
return dataid_list
[docs]def trytake(list_, index_list):
return None if list_ is None else list_take_(list_, index_list)
[docs]def list_take_(list_, index_list):
if isinstance(list_, np.ndarray):
return list_.take(index_list, axis=0)
else:
return list(ub.take(list_, index_list))
[docs]def compress2(arr, flag_list, axis=None, out=None):
"""
Wrapper around numpy compress that makes the signature more similar to take
"""
return np.compress(flag_list, arr, axis=axis, out=out)
[docs]def take2(arr, index_list, axis=None, out=None):
"""
Wrapper around numpy compress that makes the signature more similar to take
"""
return np.take(arr, index_list, axis=axis, out=out)
[docs]def list_compress_(list_, flag_list):
if isinstance(list_, np.ndarray):
return list_.compress(flag_list, axis=0)
else:
return list(ub.compress(list_, flag_list))
[docs]def index_partition(item_list, part1_items):
"""
returns two lists. The first are the indecies of items in item_list that
are in part1_items. the second is the indices in item_list that are not
in part1_items. items in part1_items that are not in item_list are
ignored
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> item_list = ['dist', 'fg', 'distinctiveness']
>>> part1_items = ['fg', 'distinctiveness']
>>> part1_indexes, part2_indexes = index_partition(item_list, part1_items)
>>> ut.assert_eq(part1_indexes.tolist(), [1, 2])
>>> ut.assert_eq(part2_indexes.tolist(), [0])
"""
part1_indexes_ = [item_list.index(item) for item in part1_items if item in item_list]
part1_indexes = np.array(part1_indexes_)
part2_indexes = np.setdiff1d(np.arange(len(item_list)), part1_indexes)
# FIXME: use dtype np.int_
part1_indexes = part1_indexes.astype(np.int32)
part2_indexes = part2_indexes.astype(np.int32)
return part1_indexes, part2_indexes
# def partition_Nones(item_list):
# """
# Example:
# >>> # ENABLE_DOCTEST
# >>> from vtool.other import * # NOQA
# >>> item_list = ['foo', None, None, 'bar']
# >>> part1_indexes, part2_indexes = partition_Nones(item_list)
# """
# # part1_indexes_ = ut.list_where(item_list)
# part1_indexes_ = [index for index, item in enumerate(item_list) if item is not None]
# part1_indexes = np.array(part1_indexes_)
# part2_indexes = np.setdiff1d(np.arange(len(item_list)), part1_indexes)
# return part1_indexes, part2_indexes
[docs]def rebuild_partition(part1_vals, part2_vals, part1_indexes, part2_indexes):
r"""
Inverts work done by index_partition
Args:
part1_vals (list):
part2_vals (list):
part1_indexes (dict):
part2_indexes (dict):
CommandLine:
python -m vtool.other --test-rebuild_partition
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> item_list = ['dist', 'fg', 'distinctiveness']
>>> part1_items = ['fg', 'distinctiveness']
>>> part1_indexes, part2_indexes = index_partition(item_list, part1_items)
>>> part1_vals = ut.take(item_list, part1_indexes)
>>> part2_vals = ut.take(item_list, part2_indexes)
>>> val_list = rebuild_partition(part1_vals, part2_vals, part1_indexes, part2_indexes)
>>> assert val_list == item_list, 'incorrect inversin'
>>> print(val_list)
"""
val_list = [None] * (len(part1_indexes) + len(part2_indexes))
for idx, val in zip(part1_indexes, part1_vals):
val_list[idx] = val
for idx, val in zip(part2_indexes, part2_vals):
val_list[idx] = val
return val_list
[docs]def weighted_average_scoring(fsv, weight_filtxs, nonweight_filtxs):
r"""
does \frac{\sum_i w^f_i * w^d_i * r_i}{\sum_i w^f_i, w^d_i}
to get a weighed average of ratio scores
If we normalize the weight part to add to 1 then we can get per-feature
scores.
References:
http://en.wikipedia.org/wiki/Weighted_arithmetic_mean
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> fsv = np.array([
... [ 0.82992172, 1.56136119, 0.66465378],
... [ 0.8000412 , 2.14719748, 1. ],
... [ 0.80848503, 2.6816361 , 1. ],
... [ 0.86761665, 2.70189977, 1. ],
... [ 0.8004055 , 1.58753884, 0.92178345],])
>>> weight_filtxs = np.array([1, 2], dtype=np.int32)
>>> nonweight_filtxs = np.array([0], dtype=np.int32)
>>> new_fs = weighted_average_scoring(fsv, weight_filtxs, nonweight_filtxs)
>>> result = new_fs
>>> print(result)
"""
weight_fs = fsv.T.take(weight_filtxs, axis=0).T.prod(axis=1)
nonweight_fs = fsv.T.take(nonweight_filtxs, axis=0).T.prod(axis=1)
weight_fs_norm01 = weight_fs / weight_fs.sum()
# weight_fs_norm01[np.isnan(weight_fs_norm01)] = 0.0
# If weights are nan, fill them with zeros
weight_fs_norm01 = np.nan_to_num(weight_fs_norm01)
new_fs = np.multiply(nonweight_fs, weight_fs_norm01)
return new_fs
[docs]def assert_zipcompress(arr_list, flags_list, axis=None):
num_flags = [len(flags) for flags in flags_list]
if axis is None:
num_arrs = [arr.size for arr in arr_list]
else:
num_arrs = [arr.shape[axis] for arr in arr_list]
assert num_flags == num_arrs, 'not able to zipcompress'
[docs]def zipcompress_safe(arr_list, flags_list, axis=None):
arr_list = list(arr_list)
flags_list = list(flags_list)
assert_zipcompress(arr_list, flags_list, axis=axis)
return zipcompress(arr_list, flags_list, axis)
[docs]def zipcompress(arr_list, flags_list, axis=None):
return [
np.compress(flags, arr, axis=axis) for arr, flags in zip(arr_list, flags_list)
]
[docs]def ziptake(arr_list, indices_list, axis=None):
return [arr.take(indices, axis=axis) for arr, indices in zip(arr_list, indices_list)]
[docs]def zipcat(arr1_list, arr2_list, axis=None):
r"""
Args:
arr1_list (list):
arr2_list (list):
axis (None): (default = None)
Returns:
list:
CommandLine:
python -m vtool.other --exec-zipcat --show
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arr1_list = [np.array([0, 0, 0]), np.array([0, 0, 0, 0])]
>>> arr2_list = [np.array([1, 1, 1]), np.array([1, 1, 1, 1])]
>>> axis = None
>>> arr3_list = zipcat(arr1_list, arr2_list, axis)
>>> arr3_list0 = zipcat(arr1_list, arr2_list, axis=0)
>>> arr3_list1 = zipcat(arr1_list, arr2_list, axis=1)
>>> arr3_list2 = zipcat(arr1_list, arr2_list, axis=2)
>>> print('arr3_list = %s' % (ut.repr3(arr3_list),))
>>> print('arr3_list0 = %s' % (ut.repr3(arr3_list0),))
>>> print('arr3_list2 = %s' % (ut.repr3(arr3_list2),))
"""
import vtool as vt
assert len(arr1_list) == len(arr2_list), 'lists must correspond'
if axis is None:
arr1_iter = arr1_list
arr2_iter = arr2_list
else:
arr1_iter = [vt.atleast_nd(arr1, axis + 1) for arr1 in arr1_list]
arr2_iter = [vt.atleast_nd(arr2, axis + 1) for arr2 in arr2_list]
arrs_iter = list(zip(arr1_iter, arr2_iter))
arr3_list = [np.concatenate(arrs, axis=axis) for arrs in arrs_iter]
return arr3_list
[docs]def atleast_nd(arr, n, tofront=False):
r"""
View inputs as arrays with at least n dimensions.
TODO: Commit to numpy
Args:
arr (array_like): One array-like object. Non-array inputs are
converted to arrays. Arrays that already have n or more dimensions
are preserved.
n (int):
tofront (bool): if True new dims are added to the front of the array
CommandLine:
python -m vtool.other --exec-atleast_nd --show
Returns:
ndarray :
An array with ``a.ndim >= n``. Copies are avoided where possible,
and views with three or more dimensions are returned. For example,
a 1-D array of shape ``(N,)`` becomes a view of shape
``(1, N, 1)``, and a 2-D array of shape ``(M, N)`` becomes a view of shape
``(M, N, 1)``.
See Also:
atleast_1d, atleast_2d, atleast_3d
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> n = 2
>>> arr = np.array([1, 1, 1])
>>> arr_ = atleast_nd(arr, n)
>>> result = ub.repr2(arr_.tolist())
>>> print(result)
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> n = 4
>>> arr1 = [1, 1, 1]
>>> arr2 = np.array(0)
>>> arr3 = np.array([[[[[1]]]]])
>>> arr1_ = atleast_nd(arr1, n)
>>> arr2_ = atleast_nd(arr2, n)
>>> arr3_ = atleast_nd(arr3, n)
>>> result1 = ub.repr2(arr1_.tolist())
>>> result2 = ub.repr2(arr2_.tolist())
>>> result3 = ub.repr2(arr3_.tolist())
>>> result = '\n'.join([result1, result2, result3])
>>> print(result)
"""
arr_ = np.asanyarray(arr)
ndims = len(arr_.shape)
if n is not None and ndims < n:
# append the required number of dimensions to the end
if tofront:
expander = (None,) * (n - ndims) + (Ellipsis,)
else:
expander = (Ellipsis,) + (None,) * (n - ndims)
arr_ = arr_[expander]
return arr_
[docs]def ensure_shape(arr, dimshape):
"""
Ensures that an array takes a certain shape. The total size of the array
must not change.
Args:
arr (ndarray): array to change the shape of
dimshape (tuple): desired shape (Nones can be used to broadcast
dimensions)
Returns:
ndarray - the input array, which has been modified inplace.
CommandLine:
python -m vtool.other ensure_shape
Doctest:
>>> from vtool.other import * # NOQA
>>> arr = np.zeros((7, 7))
>>> dimshape = (None, None, 3)
>>> arr2 = ensure_shape(np.array([[1, 2]]), (None, 2))
>>> assert arr2.shape == (1, 2)
>>> arr3 = ensure_shape(np.array([]), (None, 2))
>>> assert arr3.shape == (0, 2)
"""
if isinstance(dimshape, tuple):
n = len(dimshape)
else:
n = dimshape
dimshape = None
arr_ = atleast_nd(arr, n)
if dimshape is not None:
newshape = tuple(
[d1 if d2 is None else d2 for d1, d2 in zip(arr_.shape, dimshape)]
)
arr_.shape = newshape
return arr_
[docs]def significant_shape(arr):
"""find the shape without trailing 1's"""
sig_dim = 0
for i, dim in enumerate(arr.shape, start=1):
if dim != 1:
sig_dim = i
sig_shape = arr.shape[0:sig_dim]
return sig_shape
[docs]def atleast_shape(arr, dimshape):
"""
Ensures that an array takes a certain shape. The total size of the array
must not change.
Args:
arr (ndarray): array to change the shape of
dimshape (tuple): desired shape (Nones can be used to broadcast
dimensions)
Returns:
ndarray - the input array, which has been modified inplace.
CommandLine:
python -m vtool.other ensure_shape
Doctest:
>>> from vtool.other import * # NOQA
>>> arr = np.zeros((7, 7))
>>> assert atleast_shape(arr, (1, 1, 3,)).shape == (7, 7, 3)
>>> assert atleast_shape(arr, (1, 1, 2, 4,)).shape == (7, 7, 2, 4)
>>> assert atleast_shape(arr, (1, 1,)).shape == (7, 7,)
>>> assert atleast_shape(arr, (1, 1, 1)).shape == (7, 7, 1)
>>> assert atleast_shape(np.zeros(()), (1,)).shape == (1,)
>>> assert atleast_shape(np.zeros(()), tuple()).shape == tuple()
>>> assert atleast_shape(np.zeros(()), (1, 2, 3,)).shape == (1, 2, 3)
>>> ut.assert_raises(ValueError, atleast_shape, arr, (2, 2))
>>> assert atleast_shape(np.zeros((7, 7, 3)), (1, 1, 3)).shape == (7, 7, 3)
>>> ut.assert_raises(ValueError, atleast_shape, np.zeros((7, 7, 3)), (1, 1, 4))
"""
n = len(dimshape)
sig_shape = significant_shape(arr)
if n < len(sig_shape):
raise ValueError(
'len(dimshape)={} must be >= than '
'len(significant_shape(arr)={})'.format(n, sig_shape)
)
arr_ = atleast_nd(arr, n)
for d1, d2 in zip(arr_.shape, dimshape):
if d2 > 1 and d1 != 1 and d1 != d2:
raise ValueError('cannot broadcast {} to {}'.format(arr_.shape, dimshape))
reps = tuple(
1 if d2 is None or (d1 == d2) else d2 for d1, d2 in zip(arr_.shape, dimshape)
)
arr_ = np.tile(arr_, reps)
return arr_
[docs]def atleast_3channels(arr, copy=True):
r"""
Ensures that there are 3 channels in the image
Args:
arr (ndarray[N, M, ...]): the image
copy (bool): Always copies if True, if False, then copies only when the
size of the array must change.
Returns:
ndarray: with shape (N, M, C), where C in {3, 4}
CommandLine:
python -m vtool.other atleast_3channels
Doctest:
>>> from vtool.image import * # NOQA
>>> import vtool as vt
>>> assert atleast_3channels(np.zeros((10, 10))).shape[-1] == 3
>>> assert atleast_3channels(np.zeros((10, 10, 1))).shape[-1] == 3
>>> assert atleast_3channels(np.zeros((10, 10, 3))).shape[-1] == 3
>>> assert atleast_3channels(np.zeros((10, 10, 4))).shape[-1] == 4
"""
# atleast_shape(arr, (None, None, 3))
ndims = len(arr.shape)
if ndims == 2:
res = np.tile(arr[:, :, None], 3)
return res
elif ndims == 3:
h, w, c = arr.shape
if c == 1:
res = np.tile(arr, 3)
elif c in [3, 4]:
res = arr.copy() if copy else arr
else:
raise ValueError('Cannot handle ndims={}'.format(ndims))
else:
raise ValueError('Cannot handle arr.shape={}'.format(arr.shape))
return res
[docs]def iter_reduce_ufunc(ufunc, arr_iter, out=None):
"""
constant memory iteration and reduction
applys ufunc from left to right over the input arrays
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arr_list = [
... np.array([0, 1, 2, 3, 8, 9]),
... np.array([4, 1, 2, 3, 4, 5]),
... np.array([0, 5, 2, 3, 4, 5]),
... np.array([1, 1, 6, 3, 4, 5]),
... np.array([0, 1, 2, 7, 4, 5])
... ]
>>> memory = np.array([9, 9, 9, 9, 9, 9])
>>> gen_memory = memory.copy()
>>> def arr_gen(arr_list, gen_memory):
... for arr in arr_list:
... gen_memory[:] = arr
... yield gen_memory
>>> print('memory = %r' % (memory,))
>>> print('gen_memory = %r' % (gen_memory,))
>>> ufunc = np.maximum
>>> res1 = iter_reduce_ufunc(ufunc, iter(arr_list), out=None)
>>> res2 = iter_reduce_ufunc(ufunc, iter(arr_list), out=memory)
>>> res3 = iter_reduce_ufunc(ufunc, arr_gen(arr_list, gen_memory), out=memory)
>>> print('res1 = %r' % (res1,))
>>> print('res2 = %r' % (res2,))
>>> print('res3 = %r' % (res3,))
>>> print('memory = %r' % (memory,))
>>> print('gen_memory = %r' % (gen_memory,))
>>> assert np.all(res1 == res2)
>>> assert np.all(res2 == res3)
"""
# Get first item in iterator
try:
initial = next(arr_iter)
except StopIteration:
return
# Populate the outvariable if specified otherwise make a copy of the first
# item to be the output memory
if out is not None:
out[:] = initial
else:
out = initial.copy()
# Iterate and reduce
for arr in arr_iter:
ufunc(out, arr, out=out)
return out
[docs]def clipnorm(arr, min_, max_, out=None):
r"""
normalizes arr to the range 0 to 1 using min\_ and max\_ as clipping bounds
"""
if max_ == 1 and min_ == 0:
if out is not None:
out[:] = arr
else:
out = arr.copy()
return out
out_args = tuple() if out is None else (out,)
arr_ = np.subtract(arr, min_, *out_args)
arr_ = np.divide(arr_, max_ - min_, *out_args)
arr_ = np.clip(arr_, 0.0, 1.0, *out_args)
return arr_
[docs]def intersect1d_reduce(arr_list, assume_unique=False):
arr_iter = iter(arr_list)
out = next(arr_iter)
for arr in arr_iter:
out = np.intersect1d(out, arr, assume_unique=assume_unique)
return out
[docs]def componentwise_dot(arr1, arr2):
"""
a dot product is a componentwise multiplication of
two vector and then a sum.
Args:
arr1 (ndarray)
arr2 (ndarray):
Returns:
ndarray: cosangle
Example:
>>> # DISABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> np.random.seed(0)
>>> arr1 = np.random.rand(3, 128)
>>> arr1 = arr1 / np.linalg.norm(arr1, axis=1)[:, None]
>>> arr2 = arr1
>>> cosangle = componentwise_dot(arr1, arr2)
>>> result = str(cosangle)
>>> print(result)
[ 1. 1. 1.]
"""
cosangle = np.multiply(arr1, arr2).sum(axis=-1).T
return cosangle
[docs]def intersect2d_indices(A, B):
r"""
Args:
A (ndarray[ndims=2]):
B (ndarray[ndims=2]):
Returns:
tuple: (ax_list, bx_list)
CommandLine:
python -m vtool.other --test-intersect2d_indices
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> # build test data
>>> A = np.array([[ 158, 171], [ 542, 297], [ 955, 1113], [ 255, 1254], [ 976, 1255], [ 170, 1265]])
>>> B = np.array([[ 117, 211], [ 158, 171], [ 255, 1254], [ 309, 328], [ 447, 1148], [ 750, 357], [ 976, 1255]])
>>> # execute function
>>> (ax_list, bx_list) = intersect2d_indices(A, B)
>>> # verify results
>>> result = str((ax_list, bx_list))
>>> print(result)
"""
flag_list1, flag_list2 = intersect2d_flags(A, B)
ax_list = np.flatnonzero(flag_list1)
bx_list = np.flatnonzero(flag_list2)
return ax_list, bx_list
[docs]def intersect2d_flags(A, B):
r"""
Checks intersection of rows of A against rows of B
Args:
A (ndarray[ndims=2]):
B (ndarray[ndims=2]):
Returns:
tuple: (flag_list1, flag_list2)
CommandLine:
python -m vtool.other --test-intersect2d_flags
SeeAlso:
np.in1d - the one dimensional version
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> A = np.array([[609, 307], [ 95, 344], [ 1, 690]])
>>> B = np.array([[ 422, 1148], [ 422, 968], [ 481, 1148], [ 750, 1132], [ 759, 159]])
>>> (flag_list1, flag_list2) = intersect2d_flags(A, B)
>>> result = str((flag_list1, flag_list2))
>>> print(result)
"""
A_, B_, C_ = intersect2d_structured_numpy(A, B)
flag_list1 = flag_intersection(A_, C_)
flag_list2 = flag_intersection(B_, C_)
return flag_list1, flag_list2
[docs]def flag_intersection(arr1, arr2):
r"""
Flags the rows in `arr1` that contain items in `arr2`
Returns:
ndarray: flags where len(flags) == len(arr1)
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arr1 = np.array([0, 1, 2, 3, 4, 5])
>>> arr2 = np.array([2, 6, 4])
>>> flags = flag_intersection(arr1, arr2)
>>> assert len(flags) == len(arr1)
>>> result = ('flags = %s' % (ub.repr2(flags),))
>>> print(result)
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> import vtool as vt
>>> arr1 = np.array([[0, 0], [0, 1], [0, 2], [0, 3], [0, 4], [0, 5]])
>>> arr2 = np.array([[0, 2], [0, 6], [0, 4], [3, 0]])
>>> arr1, arr2 = vt.structure_rows(arr1, arr2)
>>> flags = flag_intersection(arr1, arr2)
>>> assert len(flags) == len(arr1)
>>> result = ('flags = %s' % (ub.repr2(flags),))
>>> print(result)
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arr1 = np.array([0, 1, 2, 3, 4, 5])
>>> arr2 = np.array([])
>>> flags = flag_intersection(arr1, arr2)
>>> assert len(flags) == len(arr1)
>>> flags = flag_intersection(np.array([]), np.array([2, 6, 4]))
>>> assert len(flags) == 0
Ignore:
>>> setup = ut.codeblock(
>>> r'''
import vtool as vt
import numpy as np
rng = np.random.RandomState(0)
arr1 = rng.randint(0, 100, 100000).reshape(-1, 2)
arr2 = rng.randint(0, 100, 1000).reshape(-1, 2)
arr1_, arr2_ = vt.structure_rows(arr1, arr2)
''')
>>> stmt_list = ut.codeblock(
>>> '''
np.array([row in arr2_ for row in arr1_])
np.logical_or.reduce([arr1_ == row_ for row_ in arr2_]).ravel()
vt.iter_reduce_ufunc(np.logical_or, (arr1_ == row_ for row_ in arr2_)).ravel()
''').split('\n')
>>> out = ut.timeit_compare(stmt_list, setup=setup, iterations=3)
"""
import vtool as vt
if arr1.size == 0 or arr2.size == 0:
flags = np.full(arr1.shape[0], False, dtype=np.bool_)
# return np.empty((0,), dtype=np.bool_)
else:
# flags = np.logical_or.reduce([arr1 == row for row in arr2]).T[0]
flags = vt.iter_reduce_ufunc(
np.logical_or, (arr1 == row_ for row_ in arr2)
).ravel()
return flags
[docs]def structure_rows(*arrs):
r"""
CommandLine:
python -m vtool.other structure_rows
SeeAlso:
unstructure_rows
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arr1 = np.array([[609, 307], [ 95, 344], [ 1, 690]])
>>> arr2 = np.array([[ 422, 1148], [ 422, 968], [ 481, 1148], [ 750, 1132], [ 759, 159]])
>>> arrs = (arr1, arr2)
>>> structured_arrs = structure_rows(*arrs)
>>> unstructured_arrs = unstructure_rows(*structured_arrs)
>>> assert np.all(unstructured_arrs[0] == arrs[0])
>>> assert np.all(unstructured_arrs[1] == arrs[1])
>>> union_ = np.union1d(*structured_arrs)
>>> union, = unstructure_rows(union_)
>>> assert len(union.shape) == 2
"""
arr0 = arrs[0]
ncols = arr0.shape[1]
dtype = {
'names': ['f%d' % (i,) for i in range(ncols)],
'formats': ncols * [arr0.dtype],
}
for arr in arrs:
assert len(arr.shape) == 2, 'arrays must be 2d'
assert arr.dtype == arr0.dtype, 'arrays must share the same dtype'
assert arr.shape[1] == ncols, 'arrays must share column shape'
structured_arrs = []
for arr in arrs:
arr_ = np.ascontiguousarray(arr).view(dtype)
structured_arrs.append(arr_)
return structured_arrs
[docs]def unstructure_rows(*structured_arrs):
r"""
SeeAlso:
structure_rows
"""
# TODO: assert arr.dtype.fields are all the same type
unstructured_arrs = [
arr.view(list(arr.dtype.fields.values())[0][0]) for arr in structured_arrs
]
unstructured_arrs = []
for arr_ in structured_arrs:
dtype = list(arr_.dtype.fields.values())[0][0]
arr = arr_.view(dtype).reshape(-1, 2)
unstructured_arrs.append(arr)
return unstructured_arrs
[docs]def intersect2d_structured_numpy(arr1, arr2, assume_unique=False):
r"""
Args:
arr1: unstructured 2d array
arr2: unstructured 2d array
Returns:
A\_, B\_, C\_ - structured versions of arr1, and arr2, and their structured intersection
References:
http://stackoverflow.com/questions/16970982/find-unique-rows-in-numpy-array
http://stackoverflow.com/questions/8317022/get-intersecting-rows-across-two-2d-numpy-arrays
"""
ncols = arr1.shape[1]
assert (
arr1.dtype == arr2.dtype
), 'arr1 and arr2 must have the same dtypes.' 'arr1.dtype=%r, arr2.dtype=%r' % (
arr1.dtype,
arr2.dtype,
)
# [('f%d' % i, arr1.dtype) for i in range(ncols)]
# dtype = np.dtype([('f%d' % i, arr1.dtype) for i in range(ncols)])
# dtype = {'names': ['f{}'.format(i) for i in range(ncols)],
# 'formats': ncols * [arr1.dtype]}
dtype = {
'names': ['f%d' % (i,) for i in range(ncols)],
'formats': ncols * [arr1.dtype],
}
# try:
A_ = np.ascontiguousarray(arr1).view(dtype)
B_ = np.ascontiguousarray(arr2).view(dtype)
C_ = np.intersect1d(A_, B_, assume_unique=assume_unique)
# C = np.intersect1d(arr1.view(dtype),
# arr2.view(dtype),
# assume_unique=assume_unique)
# except ValueError:
# C = np.intersect1d(A.copy().view(dtype),
# B.copy().view(dtype),
# assume_unique=assume_unique)
return A_, B_, C_
[docs]def intersect2d_numpy(A, B, assume_unique=False, return_indices=False):
"""
References:
http://stackoverflow.com/questions/8317022/get-intersecting-rows-across-two-2d-numpy-arrays/8317155#8317155
Args:
A (ndarray[ndims=2]):
B (ndarray[ndims=2]):
assume_unique (bool):
Returns:
ndarray[ndims=2]: C
CommandLine:
python -m vtool.other --test-intersect2d_numpy
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> # build test data
>>> A = np.array([[ 0, 78, 85, 283, 396, 400, 403, 412, 535, 552],
... [152, 98, 32, 260, 387, 285, 22, 103, 55, 261]]).T
>>> B = np.array([[403, 85, 412, 85, 815, 463, 613, 552],
... [ 22, 32, 103, 116, 188, 199, 217, 254]]).T
>>> assume_unique = False
>>> # execute function
>>> C, Ax, Bx = intersect2d_numpy(A, B, return_indices=True)
>>> # verify results
>>> result = str((C.T, Ax, Bx))
>>> print(result)
(array([[ 85, 403, 412],
[ 32, 22, 103]]), array([2, 6, 7]), array([0, 1, 2]))
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> A = np.array([[1, 2, 3], [1, 1, 1]])
>>> B = np.array([[1, 2, 3], [1, 2, 14]])
>>> C, Ax, Bx = intersect2d_numpy(A, B, return_indices=True)
>>> result = str((C, Ax, Bx))
>>> print(result)
(array([[1, 2, 3]]), array([0]), array([0]))
"""
nrows, ncols = A.shape
A_, B_, C_ = intersect2d_structured_numpy(A, B, assume_unique)
# This last bit is optional if you're okay with "C" being a structured array...
C = C_.view(A.dtype).reshape(-1, ncols)
if return_indices:
ax_list = np.flatnonzero(flag_intersection(A_, C_))
bx_list = np.flatnonzero(flag_intersection(B_, C_))
return C, ax_list, bx_list
else:
return C
[docs]def nearest_point(x, y, pts, mode='random'):
"""finds the nearest point(s) in pts to (x, y)"""
dists = (pts.T[0] - x) ** 2 + (pts.T[1] - y) ** 2
fx = dists.argmin()
mindist = dists[fx]
other_fx = np.where(mindist == dists)[0]
if len(other_fx) > 0:
if mode == 'random':
np.random.shuffle(other_fx)
fx = other_fx[0]
if mode == 'all':
fx = other_fx
if mode == 'first':
fx = fx
return fx, mindist
[docs]def get_uncovered_mask(covered_array, covering_array):
r"""
Args:
covered_array (ndarray):
covering_array (ndarray):
Returns:
ndarray: flags
CommandLine:
python -m vtool.other --test-get_uncovered_mask
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> covered_array = [1, 2, 3, 4, 5]
>>> covering_array = [2, 4, 5]
>>> flags = get_uncovered_mask(covered_array, covering_array)
>>> result = str(flags)
>>> print(result)
[ True False True False False]
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> covered_array = [1, 2, 3, 4, 5]
>>> covering_array = []
>>> flags = get_uncovered_mask(covered_array, covering_array)
>>> result = str(flags)
>>> print(result)
[ True True True True True]
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> covered_array = np.array([
... [1, 2, 3],
... [4, 5, 6],
... [7, 8, 9],
... ], dtype=np.int32)
>>> covering_array = [2, 4, 5]
>>> flags = get_uncovered_mask(covered_array, covering_array)
>>> result = ub.repr2(flags, with_dtype=True)
>>> print(result)
np.array([[ True, False, True],
[False, False, True],
[ True, True, True]], dtype=np.bool)
Ignore:
covering_array = [1, 2, 3, 4, 5, 6, 7]
%timeit get_uncovered_mask(covered_array, covering_array)
100000 loops, best of 3: 18.6 µs per loop
%timeit get_uncovered_mask2(covered_array, covering_array)
100000 loops, best of 3: 16.9 µs per loop
"""
import vtool as vt
if len(covering_array) == 0:
return np.ones(np.shape(covered_array), dtype=np.bool_)
else:
flags_iter = (np.not_equal(covered_array, item) for item in covering_array)
mask_array = vt.iter_reduce_ufunc(np.logical_and, flags_iter)
return mask_array
# if len(covering_array) == 0:
# return np.ones(np.shape(covered_array), dtype=np.bool_)
# else:
# flags_list = (np.not_equal(covered_array, item) for item in covering_array)
# mask_array = and_lists(*flags_list)
# return mask_array
# def get_uncovered_mask2(covered_array, covering_array):
# if len(covering_array) == 0:
# return np.ones(np.shape(covered_array), dtype=np.bool_)
# else:
# flags_iter = (np.not_equal(covered_array, item) for item in covering_array)
# mask_array = vt.iter_reduce_ufunc(np.logical_and, flags_iter)
# return mask_array
[docs]def get_covered_mask(covered_array, covering_array):
return ~get_uncovered_mask(covered_array, covering_array)
[docs]def mult_lists(*args):
return np.multiply.reduce(args)
[docs]def or_lists(*args):
"""
Like np.logical_and, but can take more than 2 arguments
SeeAlso:
and_lists
"""
flags = np.logical_or.reduce(args)
return flags
[docs]def and_lists(*args):
"""
Like np.logical_and, but can take more than 2 arguments
CommandLine:
python -m vtool.other --test-and_lists
SeeAlso:
or_lists
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arg1 = np.array([1, 1, 1, 1,])
>>> arg2 = np.array([1, 1, 0, 1,])
>>> arg3 = np.array([0, 1, 0, 1,])
>>> args = (arg1, arg2, arg3)
>>> flags = and_lists(*args)
>>> result = str(flags)
>>> print(result)
[False True False True]
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> size = 10000
>>> rng = np.random.RandomState(0)
>>> arg1 = rng.randint(2, size=size)
>>> arg2 = rng.randint(2, size=size)
>>> arg3 = rng.randint(2, size=size)
>>> args = (arg1, arg2, arg3)
>>> flags = and_lists(*args)
>>> # ensure equal division
>>> segments = 5
>>> validx = np.where(flags)[0]
>>> endx = int(segments * (validx.size // (segments)))
>>> parts = np.split(validx[:endx], segments)
>>> result = str(list(map(np.sum, parts)))
>>> print(result)
[243734, 714397, 1204989, 1729375, 2235191]
%timeit reduce(np.logical_and, args)
%timeit np.logical_and.reduce(args) # wins with more data
"""
return np.logical_and.reduce(args)
[docs]def rowwise_operation(arr1, arr2, op):
"""
DEPRICATE THIS IS POSSIBLE WITH STRICTLY BROADCASTING AND
USING np.newaxis
DEPRICATE, numpy has better ways of doing this.
Is the rowwise name correct? Should it be colwise?
performs an operation between an
(N x A x B ... x Z) array with an
(N x 1) array
"""
# FIXME: not sure this is the correct terminology
assert arr1.shape[0] == arr2.shape[0]
broadcast_dimensions = arr1.shape[1:] # need padding for
tileshape = tuple(list(broadcast_dimensions) + [1])
arr2_ = np.rollaxis(np.tile(arr2, tileshape), -1)
rowwise_result = op(arr1, arr2_)
return rowwise_result
[docs]def colwise_operation(arr1, arr2, op):
arr1T = arr1.T
arr2T = arr2.T
rowwise_result = rowwise_operation(arr1T, arr2T, op)
colwise_result = rowwise_result.T
return colwise_result
[docs]def compare_matrix_columns(matrix, columns, comp_op=np.equal, logic_op=np.logical_or):
"""
REPLACE WITH:
qfx2_invalid = logic_op.reduce([comp_op([:, None], qfx2_normnid) for col1 in qfx2_topnid.T])
"""
# FIXME: Generalize
# row_matrix = matrix.T
# row_list = columns.T
return compare_matrix_to_rows(
matrix.T, columns.T, comp_op=comp_op, logic_op=logic_op
).T
[docs]def compare_matrix_to_rows(
row_matrix, row_list, comp_op=np.equal, logic_op=np.logical_or
):
"""
Compares each row in row_list to each row in row matrix using comp_op
Both must have the same number of columns.
Performs logic_op on the results of each individual row
SeeAlso:
wbia.algo.hots.nn_weights.mark_name_valid_normalizers
compop = np.equal
logic_op = np.logical_or
"""
row_result_list = [
np.array([comp_op(matrow, row) for matrow in row_matrix]) for row in row_list
]
output = row_result_list[0]
for row_result in row_result_list[1:]:
logic_op(output, row_result, out=output)
# output = logic_op(output, row_result)
return output
[docs]def norm01(array, dim=None):
"""
normalizes a numpy array from 0 to 1 based in its extent
Args:
array (ndarray):
dim (int):
Returns:
ndarray:
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> array = np.array([ 22, 1, 3, 2, 10, 42, ])
>>> dim = None
>>> array_norm = norm01(array, dim)
>>> result = ub.repr2(array_norm, precision=3)
>>> print(result)
"""
if not ut.is_float(array):
array = array.astype(np.float32)
array_max = array.max(dim)
array_min = array.min(dim)
array_exnt = np.subtract(array_max, array_min)
array_norm = np.divide(np.subtract(array, array_min), array_exnt)
return array_norm
[docs]def weighted_geometic_mean_unnormalized(data, weights):
import vtool as vt
terms = [x ** w for x, w in zip(data, weights)]
termprod = vt.iter_reduce_ufunc(np.multiply, iter(terms))
return termprod
[docs]def weighted_geometic_mean(data, weights):
r"""
Args:
data (list of ndarrays):
weights (ndarray):
Returns:
ndarray
CommandLine:
python -m vtool.other --test-weighted_geometic_mean
References:
https://en.wikipedia.org/wiki/Weighted_geometric_mean
SeeAlso:
scipy.stats.mstats.gmean
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> data = [.9, .5]
>>> weights = np.array([1.0, .5])
>>> gmean_ = weighted_geometic_mean(data, weights)
>>> result = ('gmean_ = %.3f' % (gmean_,))
>>> print(result)
gmean_ = 0.740
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> rng = np.random.RandomState(0)
>>> img1 = rng.rand(4, 4)
>>> img2 = rng.rand(4, 4)
>>> data = [img1, img2]
>>> weights = np.array([.5, .5])
>>> gmean_ = weighted_geometic_mean(data, weights)
>>> result = ub.hzcat(['gmean_ = %s' % (ub.repr2(gmean_, precision=2, with_dtype=True), )])
>>> print(result)
Ignore:
res1 = ((img1 ** .5 * img2 ** .5)) ** 1
res2 = np.sqrt(img1 * img2)
"""
import vtool as vt
terms = [np.asarray(x ** w) for x, w in zip(data, weights)]
termprod = vt.iter_reduce_ufunc(np.multiply, iter(terms))
exponent = 1 / np.sum(weights)
gmean_ = termprod ** exponent
return gmean_
[docs]def grab_webcam_image():
"""
References:
http://opencv-python-tutroals.readthedocs.org/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html
CommandLine:
python -m vtool.other --test-grab_webcam_image --show
Example:
>>> # SCRIPT
>>> from vtool.other import * # NOQA
>>> import vtool as vt
>>> img = grab_webcam_image()
>>> # xdoctest: +REQUIRES(--show)
>>> import wbia.plottool as pt
>>> pt.imshow(img)
>>> vt.imwrite('webcap.jpg', img)
>>> ut.show_if_requested()
"""
import cv2
cap = cv2.VideoCapture(0)
# Capture frame-by-frame
ret, img = cap.read()
# When everything done, release the capture
cap.release()
return img
# def xor_swap(arr1, arr2, inplace=True):
# if not inplace:
# arr1 = arr1.copy()
# arr2 = arr2.copy()
# np.bitwise_xor(arr1, arr2, out=arr1)
# np.bitwise_xor(arr1, arr2, out=arr2)
# np.bitwise_xor(arr1, arr2, out=arr1)
# return arr1, arr2
[docs]def find_first_true_indices(flags_list):
"""
TODO: move to vtool
returns a list of indexes where the index is the first True position
in the corresponding sublist or None if it does not exist
in other words: for each row finds the smallest True column number or None
Args:
flags_list (list): list of lists of booleans
CommandLine:
python -m utool.util_list --test-find_first_true_indices
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> # build test data
>>> flags_list = [[True, False, True],
... [False, False, False],
... [False, True, True],
... [False, False, True]]
>>> # execute function
>>> index_list = find_first_true_indices(flags_list)
>>> # verify results
>>> result = str(index_list)
>>> print(result)
[0, None, 1, 2]
"""
def tryget_fisrt_true(flags):
index_list = np.where(flags)[0]
index = None if len(index_list) == 0 else index_list[0]
return index
index_list = [tryget_fisrt_true(flags) for flags in flags_list]
return index_list
[docs]def find_k_true_indicies(flags_list, k):
r"""
Uses output of either this function or find_first_true_indices
to find the next index of true flags
Args:
flags_list (list): list of lists of booleans
CommandLine:
python -m utool.util_list --test-find_next_true_indices
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> flags_list = [[False, False, True],
... [False, False, False],
... [False, True, True],
... [True, True, True]]
>>> k = 2
>>> indices = find_k_true_indicies(flags_list, k)
>>> result = str(indices)
>>> print(result)
[array([2]), None, array([1, 2]), array([0, 1])]
"""
if False:
import vtool as vt
flags_list = np.array(flags_list)
rowxs, colxs = np.where(flags_list)
first_k_groupxs = [groupx[0:k] for groupx in vt.group_indices(rowxs)[1]]
chosen_xs = np.hstack(first_k_groupxs)
flat_xs = np.ravel_multi_index(
(rowxs.take(chosen_xs), colxs.take(chosen_xs)), flags_list.shape
)
flat_xs
def tryget_k_true(flags):
index_list = np.where(flags)[0]
index = None if len(index_list) == 0 else index_list[0:k]
return index
index_list = [tryget_k_true(flags) for flags in flags_list]
return index_list
[docs]def find_next_true_indices(flags_list, offset_list):
r"""
Uses output of either this function or find_first_true_indices
to find the next index of true flags
Args:
flags_list (list): list of lists of booleans
CommandLine:
python -m utool.util_list --test-find_next_true_indices
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> # build test data
>>> flags_list = [[True, False, True],
... [False, False, False],
... [False, True, True],
... [False, False, True]]
>>> offset_list = find_first_true_indices(flags_list)
>>> # execute function
>>> index_list = find_next_true_indices(flags_list, offset_list)
>>> # verify results
>>> result = str(index_list)
>>> print(result)
[2, None, 2, None]
"""
def tryget_next_true(flags, offset_):
offset = offset_ + 1
relative_flags = flags[offset:]
rel_index_list = np.where(relative_flags)[0]
index = None if len(rel_index_list) == 0 else rel_index_list[0] + offset
return index
index_list = [
None if offset is None else tryget_next_true(flags, offset)
for flags, offset in zip(flags_list, offset_list)
]
return index_list
[docs]def ensure_rng(seed=None):
"""
Returns a numpy random number generator given a seed.
"""
if seed is None:
rng = np.random
elif isinstance(seed, np.random.RandomState):
rng = seed
else:
rng = np.random.RandomState(seed)
return rng
[docs]def safe_extreme(arr, op, fill=np.nan, finite=False, nans=True):
"""
Applies an exterme operation to an 1d array (typically max/min) but ensures
a value is always returned even in operations without identities. The
default identity must be specified using the `fill` argument.
Args:
arr (ndarray): 1d array to take extreme of
op (func): vectorized operation like np.max to apply to array
fill (float): return type if arr has no elements (default = nan)
finite (bool): if True ignores non-finite values (default = False)
nans (bool): if False ignores nans (default = True)
"""
if arr is None:
extreme = fill
else:
arr = np.asarray(arr)
if finite:
arr = arr.compress(np.isfinite(arr))
if not nans:
arr = arr.compress(np.logical_not(np.isnan(arr)))
if len(arr) == 0:
extreme = fill
else:
extreme = op(arr)
return extreme
[docs]def safe_argmax(arr, fill=np.nan, finite=False, nans=True):
"""
Doctest:
>>> from vtool.other import *
>>> assert safe_argmax([np.nan, np.nan], nans=False) == 0
>>> assert safe_argmax([-100, np.nan], nans=False) == 0
>>> assert safe_argmax([np.nan, -100], nans=False) == 1
>>> assert safe_argmax([-100, 0], nans=False) == 1
>>> assert np.isnan(safe_argmax([]))
"""
if len(arr) == 0:
return fill
extreme = safe_max(arr, fill=fill, finite=finite, nans=nans)
if np.isnan(extreme):
arg_extreme = np.where(np.isnan(arr))[0][0]
else:
arg_extreme = np.where(arr == extreme)[0][0]
return arg_extreme
[docs]def safe_max(arr, fill=np.nan, finite=False, nans=True):
r"""
Args:
arr (ndarray): 1d array to take max of
fill (float): return type if arr has no elements (default = nan)
finite (bool): if True ignores non-finite values (default = False)
nans (bool): if False ignores nans (default = True)
CommandLine:
python -m vtool.other safe_max --show
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arrs = [[], [np.nan], [-np.inf, np.nan, np.inf], [np.inf], [np.inf, 1], [0, 1]]
>>> arrs = [np.array(arr) for arr in arrs]
>>> fill = np.nan
>>> results1 = [safe_max(arr, fill, finite=False, nans=True) for arr in arrs]
>>> results2 = [safe_max(arr, fill, finite=True, nans=True) for arr in arrs]
>>> results3 = [safe_max(arr, fill, finite=True, nans=False) for arr in arrs]
>>> results4 = [safe_max(arr, fill, finite=False, nans=False) for arr in arrs]
>>> results = [results1, results2, results3, results4]
>>> result = ('results = %s' % (ub.repr2(results, nl=1),))
>>> print(result)
results = [
[float('nan'), float('nan'), float('nan'), float('inf'), float('inf'), 1],
[float('nan'), float('nan'), float('nan'), float('nan'), 1.0, 1],
[float('nan'), float('nan'), float('nan'), float('nan'), 1.0, 1],
[float('nan'), float('nan'), float('inf'), float('inf'), float('inf'), 1],
]
"""
return safe_extreme(arr, np.max, fill, finite, nans)
[docs]def safe_min(arr, fill=np.nan, finite=False, nans=True):
"""
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> arrs = [[], [np.nan], [-np.inf, np.nan, np.inf], [np.inf], [np.inf, 1], [0, 1]]
>>> arrs = [np.array(arr) for arr in arrs]
>>> fill = np.nan
>>> results1 = [safe_min(arr, fill, finite=False, nans=True) for arr in arrs]
>>> results2 = [safe_min(arr, fill, finite=True, nans=True) for arr in arrs]
>>> results3 = [safe_min(arr, fill, finite=True, nans=False) for arr in arrs]
>>> results4 = [safe_min(arr, fill, finite=False, nans=False) for arr in arrs]
>>> results = [results1, results2, results3, results4]
>>> result = ('results = %s' % (ub.repr2(results, nl=1),))
>>> print(result)
results = [
[float('nan'), float('nan'), float('nan'), float('inf'), 1.0, 0],
[float('nan'), float('nan'), float('nan'), float('nan'), 1.0, 0],
[float('nan'), float('nan'), float('nan'), float('nan'), 1.0, 0],
[float('nan'), float('nan'), float('-inf'), float('inf'), 1.0, 0],
]
"""
return safe_extreme(arr, np.min, fill, finite, nans)
[docs]def safe_div(a, b):
return None if a is None or b is None else a / b
[docs]def multigroup_lookup_naive(lazydict, keys_list, subkeys_list, custom_func):
r"""
Slow version of multigroup_lookup. Makes a call to custom_func for each
item in zip(keys_list, subkeys_list).
SeeAlso:
vt.multigroup_lookup
"""
data_lists = []
for keys, subkeys in zip(keys_list, subkeys_list):
subvals_list = [
custom_func(lazydict, key, [subkey])[0] for key, subkey in zip(keys, subkeys)
]
data_lists.append(subvals_list)
return data_lists
[docs]def multigroup_lookup(lazydict, keys_list, subkeys_list, custom_func):
r"""
Efficiently calls custom_func for each item in zip(keys_list, subkeys_list)
by grouping subkeys to minimize the number of calls to custom_func.
We are given multiple lists of keys, and subvals.
The goal is to group the subvals by keys and apply the subval lookups
(a call to a function) to the key only once and at the same time.
Args:
lazydict (dict of utool.LazyDict):
keys_list (list):
subkeys_list (list):
custom_func (func): must have signature custom_func(lazydict, key, subkeys)
SeeAlso:
vt.multigroup_lookup_naive - unoptomized version, but simple to read
Example:
>>> # SLOW_DOCTEST
>>> # xdoctest: +SKIP
>>> from vtool.other import * # NOQA
>>> import vtool as vt
>>> fpath_list = [ut.grab_test_imgpath(key) for key in ut.util_grabdata.get_valid_test_imgkeys()]
>>> lazydict = {count: vt.testdata_annot_metadata(fpath) for count, fpath in enumerate(fpath_list)}
>>> aids_list = np.array([(3, 2), (0, 2), (1, 2), (2, 3)])
>>> fms = np.array([[2, 5], [2, 3], [2, 1], [3, 4]])
>>> keys_list = aids_list.T
>>> subkeys_list = fms.T
>>> def custom_func(lazydict, key, subkeys):
>>> annot = lazydict[key]
>>> kpts = annot['kpts']
>>> rchip = annot['rchip']
>>> kpts_m = kpts.take(subkeys, axis=0)
>>> warped_patches = vt.get_warped_patches(rchip, kpts_m)[0]
>>> return warped_patches
>>> data_lists1 = multigroup_lookup(lazydict, keys_list, subkeys_list, custom_func)
>>> data_lists2 = multigroup_lookup_naive(lazydict, keys_list, subkeys_list, custom_func)
>>> vt.sver_c_wrapper.asserteq(data_lists1, data_lists2)
Example:
>>> keys_list = [np.array([]), np.array([]), np.array([])]
>>> subkeys_list = [np.array([]), np.array([]), np.array([])]
"""
import vtool as vt
# Group the keys in each multi-list individually
multi_groups = [vt.group_indices(keys) for keys in keys_list]
# Combine keys across multi-lists usings a dict_stack
dict_list = [dict(zip(k, v)) for k, v in multi_groups]
nested_order = ut.dict_stack2(dict_list, default=[])
# Use keys and values for explicit ordering
group_key_list = list(nested_order.keys())
if len(group_key_list) == 0:
return multigroup_lookup_naive(lazydict, keys_list, subkeys_list, custom_func)
group_subxs_list = list(nested_order.values())
# Extract unique and flat subkeys.
# Maintain an information to invert back into multi-list form
group_uf_subkeys_list = []
group_invx_list = []
group_cumsum_list = []
for key, subxs in zip(group_key_list, group_subxs_list):
# Group subkeys for each key
subkey_group = vt.ziptake(subkeys_list, subxs, axis=0)
flat_subkeys, group_cumsum = ut.invertible_flatten2(subkey_group)
unique_subkeys, invx = np.unique(flat_subkeys, return_inverse=True)
# Append info
group_uf_subkeys_list.append(unique_subkeys)
group_invx_list.append(invx)
group_cumsum_list.append(group_cumsum)
# Apply custom function (lookup) to unique each key and its flat subkeys
group_subvals_list = [
custom_func(lazydict, key, subkeys)
for key, subkeys in zip(group_key_list, group_uf_subkeys_list)
]
# Efficiently invert values back into input shape
# First invert the subkey groupings
multi_subvals_list = [[] for _ in range(len(multi_groups))]
_iter = zip(group_key_list, group_subvals_list, group_cumsum_list, group_invx_list)
for key, subvals, group_cumsum, invx in _iter:
nonunique_subvals = list(ub.take(subvals, invx))
unflat_subvals_list = ut.unflatten2(nonunique_subvals, group_cumsum)
for subvals_list, unflat_subvals in zip(multi_subvals_list, unflat_subvals_list):
subvals_list.append(unflat_subvals)
# Then invert the key groupings
data_lists = []
multi_groupxs_list = list(zip(*group_subxs_list))
for subvals_list, groupxs in zip(multi_subvals_list, multi_groupxs_list):
datas = vt.invert_apply_grouping(subvals_list, groupxs)
data_lists.append(datas)
return data_lists
[docs]def asserteq(
output1,
output2,
thresh=1e-8,
nestpath=None,
level=0,
lbl1=None,
lbl2=None,
output_lbl=None,
verbose=True,
iswarning=False,
):
"""
recursive equality checks
asserts that output1 and output2 are close to equal.
"""
failed = False
if lbl1 is None:
lbl1 = ut.get_varname_from_stack(output1, N=1)
if lbl2 is None:
lbl2 = ut.get_varname_from_stack(output2, N=1)
# Setup
if nestpath is None:
# record the path through the nested structure as testing goes on
nestpath = []
# print out these variables in all error cases
common_keys = ['lbl1', 'lbl2', 'level', 'nestpath']
# CHECK: types
try:
assert type(output1) == type(output2), 'types are not equal'
except AssertionError as ex:
print(type(output1))
print(type(output2))
ut.printex(
ex,
'FAILED TYPE CHECKS',
keys=common_keys + [(type, 'output1'), (type, 'output2')],
iswarning=iswarning,
)
failed = True
if not iswarning:
raise
# CHECK: length
if hasattr(output1, '__len__'):
try:
assert len(output1) == len(output2), 'lens are not equal'
except AssertionError as ex:
keys = common_keys + [
(len, 'output1'),
(len, 'output2'),
]
ut.printex(ex, 'FAILED LEN CHECKS. ', keys=keys)
raise
# CHECK: ndarrays
if isinstance(output1, np.ndarray):
ndarray_keys = ['output1.shape', 'output2.shape']
# CHECK: ndarray shape
try:
assert output1.shape == output2.shape, 'ndarray shapes are unequal'
except AssertionError as ex:
keys = common_keys + ndarray_keys
ut.printex(ex, 'FAILED NUMPY SHAPE CHECKS.', keys=keys, iswarning=iswarning)
failed = True
if not iswarning:
raise
# CHECK: ndarray equality
try:
passed, error = ut.almost_eq(output1, output2, thresh, ret_error=True)
assert np.all(passed), 'ndarrays are unequal.'
except AssertionError as ex:
# Statistics on value difference and value difference
# above the thresholds
diff_stats = ut.get_stats(error) # NOQA
error_stats = ut.get_stats(error[error >= thresh]) # NOQA
keys = (
common_keys
+ ndarray_keys
+ [
(len, 'output1'),
(len, 'output2'),
('diff_stats'),
('error_stats'),
('thresh'),
]
)
PRINT_VAL_SAMPLE = True
if PRINT_VAL_SAMPLE:
keys += ['output1', 'output2']
ut.printex(ex, 'FAILED NUMPY CHECKS.', keys=keys, iswarning=iswarning)
failed = True
if not iswarning:
raise
# CHECK: list/tuple items
elif isinstance(output1, (tuple, list)):
for count, (item1, item2) in enumerate(zip(output1, output2)):
# recursive call
try:
asserteq(
item1,
item2,
lbl1=lbl2,
lbl2=lbl1,
thresh=thresh,
nestpath=nestpath + [count],
level=level + 1,
)
except AssertionError as ex:
ut.printex(
ex,
'recursive call failed',
keys=common_keys + ['item1', 'item2', 'count'],
iswarning=iswarning,
)
failed = True
if not iswarning:
raise
# CHECK: scalars
else:
try:
assert output1 == output2, 'output1 != output2'
except AssertionError as ex:
print('nestpath= %r' % (nestpath,))
ut.printex(
ex,
'FAILED SCALAR CHECK.',
keys=common_keys + ['output1', 'output2'],
iswarning=iswarning,
)
failed = True
if not iswarning:
raise
if verbose and level == 0:
if not failed:
print('PASSED %s == %s' % (lbl1, lbl2))
else:
print('WARNING %s != %s' % (lbl1, lbl2))
[docs]def compare_implementations(
func1, func2, args, show_output=False, lbl1='', lbl2='', output_lbl=None
):
"""
tests two different implementations of the same function
"""
print('+ --- BEGIN COMPARE IMPLEMENTATIONS ---')
func1_name = ut.get_funcname(func1)
func2_name = ut.get_funcname(func2)
print('func1_name = %r' % (func1_name,))
print('func2_name = %r' % (func2_name,))
# test both versions
with ub.Timer('time func1=' + func1_name) as t1:
output1 = func1(*args)
with ub.Timer('time func2=' + func2_name) as t2:
output2 = func2(*args)
if t2.ellapsed == 0:
t2.ellapsed = 1e9
print('speedup = %r' % (t1.ellapsed / t2.ellapsed))
try:
asserteq(output1, output2, lbl1=lbl1, lbl2=lbl2, output_lbl=output_lbl)
print('implementations are in agreement :) ')
except AssertionError as ex:
# prints out a nested list corresponding to nested structure
ut.printex(
ex, 'IMPLEMENTATIONS DO NOT AGREE', keys=[('func1_name'), ('func2_name')]
)
raise
finally:
depth_profile1 = ut.depth_profile(output1)
depth_profile2 = ut.depth_profile(output2)
type_profile1 = ut.list_type_profile(output1)
type_profile2 = ut.list_type_profile(output2)
print('depth_profile1 = ' + ub.repr2(depth_profile1))
print('depth_profile2 = ' + ub.repr2(depth_profile2))
print('type_profile1 = ' + (type_profile1))
print('type_profile2 = ' + (type_profile2))
print('L ___ END COMPARE IMPLEMENTATIONS ___')
return output1
[docs]def greedy_setcover(universe, subsets, weights=None):
"""
Copied implmentation of greedy set cover from stack overflow. Needs work.
References:
http://stackoverflow.com/questions/7942312/of-greedy-set-cover-faster
Example:
>>> # SLOW_DOCTEST
>>> # xdoctest: +SKIP
>>> from vtool.other import * # NOQA
>>> import vtool as vt
>>> universe = set([1,2,3,4])
>>> subsets = [set([1,2]), set([1]), set([1,2,3]), set([1]), set([3,4]),
>>> set([4]), set([1,2]), set([3,4]), set([1,2,3,4])]
>>> weights = [1, 1, 2, 2, 2, 3, 3, 4, 4]
>>> chosen, costs = greedy_setcover(universe, subsets, weights)
>>> print('Cover: %r' % (chosen,))
>>> print('Total Cost: %r=sum(%r)' % (sum(costs), costs))
"""
# unchosen = subsets.copy()
uncovered = universe
chosen = []
costs = []
def findMin(subsets, uncovered, weights):
minCost = np.inf
minElement = -1
for i, s in enumerate(subsets):
num_isect = len(s.intersection(uncovered))
try:
cost = weights[i] / num_isect
if cost < minCost:
minCost = cost
minElement = i
except ZeroDivisionError:
pass
return subsets[minElement], weights[minElement]
while len(uncovered) != 0:
S_i, cost = findMin(subsets, uncovered, weights)
chosen.append(S_i)
uncovered = uncovered.difference(S_i)
costs.append(cost)
return chosen, costs
[docs]def find_elbow_point(curve):
"""
Finds the on the curve point furthest from the line defined by the
endpoints of the curve.
Args:
curve (ndarray): a monotonic curve
Returns:
int: tradeoff_idx - this is an elbow point in the curve
References:
http://stackoverflow.com/questions/2018178/trade-off-point-on-curve
CommandLine:
python -m vtool.other find_elbow_point --show
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> curve = np.exp(np.linspace(0, 10, 100))
>>> tradeoff_idx = find_elbow_point(curve)
>>> result = ('tradeoff_idx = %s' % (ub.repr2(tradeoff_idx),))
>>> print(result)
>>> assert tradeoff_idx == 76
>>> # xdoctest: +REQUIRES(--show)
>>> import wbia.plottool as pt
>>> import vtool as vt
>>> point = [tradeoff_idx, curve[tradeoff_idx]]
>>> segment = np.array([[0, len(curve) - 1], [curve[0], curve[-1]]])
>>> e1, e2 = segment.T
>>> dist_point = vt.closest_point_on_line_segment(point, e1, e2)
>>> dist_line = np.array([dist_point, point]).T
>>> pt.plot(curve, 'r', label='curve')
>>> pt.plot(point[0], point[1], 'go', markersize=10, label='tradeoff point')
>>> pt.plot(dist_line[0], dist_line[1], '-xb')
>>> pt.plot(segment[0], segment[1], '-xb')
>>> pt.legend()
>>> ut.show_if_requested()
"""
num_points = len(curve)
all_coords = np.vstack((np.arange(num_points), curve)).T
np.array([np.arange(num_points), curve])
first_point = all_coords[0]
line_vec = all_coords[-1] - all_coords[0]
line_vec_norm = line_vec / np.sqrt(np.sum(line_vec ** 2))
vec_from_first = all_coords - first_point
tiled_line_vec_norm = np.tile(line_vec_norm, (num_points, 1))
scalar_product = np.sum(vec_from_first * tiled_line_vec_norm, axis=1)
vec_from_first_parallel = np.outer(scalar_product, line_vec_norm)
vec_to_line = vec_from_first - vec_from_first_parallel
dist_to_line = np.sqrt(np.sum(vec_to_line ** 2, axis=1))
tradeoff_idx = np.argmax(dist_to_line)
return tradeoff_idx
[docs]def zstar_value(conf_level=0.95):
"""
References:
http://stackoverflow.com/questions/28242593/correct-way-to-obtain-confidence-interval-with-scipy
"""
import scipy.stats as spstats
# distribution =
# spstats.t.interval(.95, df=(ss - 1))[1]
# spstats.norm.interval(.95, df=1)[1]
zstar = spstats.norm.interval(conf_level)[1]
# zstar = spstats.norm.ppf(spstats.norm.cdf(0) + (conf_level / 2))
return zstar
[docs]def calc_error_bars_from_sample(sample_size, num_positive, pop, conf_level=0.95):
"""
Determines a error bars of sample
References:
https://www.qualtrics.com/blog/determining-sample-size/
http://www.surveysystem.com/sscalc.htm
https://en.wikipedia.org/wiki/Sample_size_determination
http://www.surveysystem.com/sample-size-formula.htm
http://courses.wcupa.edu/rbove/Berenson/10th%20ed%20CD-ROM%20topics/section8_7.pdf
https://en.wikipedia.org/wiki/Standard_normal_table
https://www.unc.edu/~rls/s151-2010/class23.pdf
"""
# zValC_lookup = {.95: 3.8416, .99: 6.6564,}
# We sampled ss from a population of pop and got num_positive true cases.
ss = sample_size
# Calculate at this confidence level
zval = zstar_value(conf_level)
# Calculate our plus/minus error in positive percentage
pos_frac = num_positive / ss
pf = (pop - ss) / (pop - 1)
err_frac = zval * np.sqrt((pos_frac) * (1 - pos_frac) * pf / ss)
lines = []
lines.append('population_size = %r' % (pop,))
lines.append('sample_size = %r' % (ss,))
lines.append('num_positive = %r' % (num_positive,))
lines.append(
'positive rate is %.2f%% ± %.2f%% @ %r confidence'
% (100 * pos_frac, 100 * err_frac, conf_level)
)
lines.append(
'positive num is %d ± %d @ %r confidence'
% (int(np.round(pop * pos_frac)), int(np.round(pop * err_frac)), conf_level)
)
print(ut.msgblock('Calculate Sample Error Margin', '\n'.join(lines)))
[docs]def calc_sample_from_error_bars(err_frac, pop, conf_level=0.95, prior=0.5):
"""
Determines a reasonable sample size to achieve desired error bars.
import sympy
p, n, N, z = sympy.symbols('prior, ss, pop, zval')
me = sympy.symbols('err_frac')
expr = (z * sympy.sqrt((p * (1 - p) / n) * ((N - n) / (N - 1))))
equation = sympy.Eq(me, expr)
nexpr = sympy.solve(equation, [n])[0]
nexpr = sympy.simplify(nexpr)
import autopep8
print(autopep8.fix_lines(['ss = ' + str(nexpr)], autopep8._get_options({}, False)))
ss = -pop * prior* (zval**2) *(prior - 1) / ((err_frac ** 2) * pop - (err_frac**2) - prior * (zval**2) * (prior - 1))
ss = pop * prior * zval ** 2 * (prior - 1) / (-err_frac ** 2 * pop + err_frac ** 2 + prior * zval ** 2 * (prior - 1))
"""
# How much confidence ydo you want (in fraction of positive results)
# zVal_lookup = {.95: 1.96, .99: 2.58,}
zval = zstar_value(conf_level)
std = 0.5
zval * std * (1 - std) / err_frac
# margin_error = err_frac
# margin_error = zval * np.sqrt(prior * (1 - prior) / ss)
# margin_error_small = zval * np.sqrt((prior * (1 - prior) / ss) * ((pop - ss) / (pop - 1)))
# prior = .5 # initial uncertainty
# Used for large samples
# ss_large = (prior * (1 - prior)) / ((margin_error / zval) ** 2)
# Used for small samples
ss_numer = pop * prior * zval ** 2 * (1 - prior)
ss_denom = err_frac ** 2 * pop + err_frac ** 2 + prior * zval ** 2 * (1 - prior)
ss_small = ss_numer / ss_denom
# ss_ = ((zval ** 2) * 0.25) / (err_frac ** 2)
# ss = int(np.ceil(ss_ / (1 + ((ss_ - 1) / pop))))
ss = int(np.ceil(ss_small))
lines = []
lines.append('population_size = %r' % (pop,))
lines.append('positive_prior = %r' % (prior,))
lines.append('Desired confidence = %.2f' % (conf_level,))
lines.append('Desired error rate is %.2f%%' % (err_frac * 100))
lines.append('Desired number of errors is %d' % (int(round(err_frac * pop))))
lines.append('Need sample sample size of %r to achive requirements' % (ss,))
print(ut.msgblock('Calculate Required Sample Size', '\n'.join(lines)))
[docs]def inbounds(num, low, high, eq=False):
r"""
Args:
num (scalar or ndarray):
low (scalar or ndarray):
high (scalar or ndarray):
eq (bool):
Returns:
scalar or ndarray: is_inbounds
CommandLine:
xdoctest -m ~/code/vtool/vtool/other.py inbounds
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> import utool as ut
>>> num = np.array([[ 0. , 0.431, 0.279],
... [ 0.204, 0.352, 0.08 ],
... [ 0.107, 0.325, 0.179]])
>>> low = .1
>>> high = .4
>>> eq = False
>>> is_inbounds = inbounds(num, low, high, eq)
>>> result = ub.repr2(is_inbounds, with_dtype=True)
>>> print(result)
"""
import operator as op
less = op.le if eq else op.lt
greater = op.ge if eq else op.gt
and_ = np.logical_and if isinstance(num, np.ndarray) else op.and_
is_inbounds = and_(greater(num, low), less(num, high))
return is_inbounds
[docs]def fromiter_nd(iter_, shape, dtype):
"""
Like np.fromiter but handles iterators that generated
n-dimensional arrays. Slightly faster than np.array.
maybe commit to numpy?
Args:
iter_ (iter): an iterable that generates homogenous ndarrays
shape (tuple): the expected output shape
dtype (dtype): the numpy datatype of the generated ndarrays
Note:
The iterable must yeild a numpy array. It cannot yeild a Python list.
CommandLine:
python -m vtool.other fromiter_nd --show
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> dtype = np.float
>>> total = 11
>>> rng = np.random.RandomState(0)
>>> iter_ = (rng.rand(5, 7, 3) for _ in range(total))
>>> shape = (total, 5, 7, 3)
>>> result = fromiter_nd(iter_, shape, dtype)
>>> assert result.shape == shape
Example:
>>> # ENABLE_DOCTEST
>>> from vtool.other import * # NOQA
>>> dtype = np.int
>>> qfxs = np.array([1, 2, 3])
>>> dfxs = np.array([4, 5, 6])
>>> iter_ = (np.array(x) for x in ut.product(qfxs, dfxs))
>>> total = len(qfxs) * len(dfxs)
>>> shape = (total, 2)
>>> result = fromiter_nd(iter_, shape, dtype)
>>> assert result.shape == shape
"""
num_rows = shape[0]
chunksize = np.prod(shape[1:])
itemsize = np.dtype(dtype).itemsize
# Create dtype that makes an entire ndarray appear as a single item
chunk_dtype = np.dtype((np.void, itemsize * chunksize))
arr = np.fromiter(iter_, count=num_rows, dtype=chunk_dtype)
# Convert back to original dtype and shape
arr = arr.view(dtype)
arr.shape = shape
return arr
[docs]def make_video2(images, outdir):
import vtool as vt
from os.path import join
n = str(int(np.ceil(np.log10(len(images)))))
fmt = 'frame_%0' + n + 'd.png'
ub.ensuredir(outdir)
for count, img in enumerate(images):
fname = join(outdir, fmt % (count))
vt.imwrite(fname, img)
[docs]def make_video(images, outvid=None, fps=5, size=None, is_color=True, format='XVID'):
"""
Create a video from a list of images.
References:
http://www.xavierdupre.fr/blog/2016-03-30_nojs.html
http://opencv-python-tutroals.readthedocs.org/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html
@param outvid output video
@param images list of images to use in the video
@param fps frame per second
@param size size of each frame
@param is_color color
@param format see http://www.fourcc.org/codecs.php
The function relies on http://opencv-python-tutroals.readthedocs.org/en/latest/.
By default, the video will have the size of the first image.
It will resize every image to this size before adding them to the video.
"""
# format = 'MJPG'
# format = 'FMP4'
import cv2
fourcc = cv2.VideoWriter_fourcc(*str(format))
vid = None
for img in images:
if vid is None:
if size is None:
size = img.shape[1], img.shape[0]
vid = cv2.VideoWriter(outvid, fourcc, float(fps), size, is_color)
if size[0] != img.shape[1] and size[1] != img.shape[0]:
img = cv2.resize(img, size)
vid.write(img)
vid.release()
return vid
[docs]def take_col_per_row(arr, colx_list):
"""takes a column from each row
Ignore:
num_rows = 1000
num_cols = 4
arr = np.arange(10 * 4).reshape(10, 4)
colx_list = (np.random.rand(10) * 4).astype(np.int)
%timeit np.array([row[cx] for (row, cx) in zip(arr, colx_list)])
%timeit arr.ravel().take(np.ravel_multi_index((np.arange(len(colx_list)), colx_list), arr.shape))
%timeit arr.ravel().take(colx_list + np.arange(arr.shape[0]) * arr.shape[1])
"""
# out = np.array([row[cx] for (row, cx) in zip(arr, colx_list)])
multix_list = np.ravel_multi_index((np.arange(len(colx_list)), colx_list), arr.shape)
out = arr.ravel().take(multix_list)
return out
if __name__ == '__main__':
"""
CommandLine:
xdoctest -m vtool.other
"""
import xdoctest
xdoctest.doctest_module(__file__)