from __future__ import annotations
from typing import Union, List
import collections.abc
import copy
import numpy as np
import scipy as sp
import scipy.sparse  # ensure the sparse submodule is loaded so sp.sparse resolves
class WeightedMinHash(object):
"""New weighted MinHash is generated by
:class:`WeightedMinHashGenerator`.
You can also initialize a weighted MinHash by using the state
from an existing one.
Args:
seed (int): The random seed used to generate this weighted
MinHash.
hashvalues (numpy.ndarray): The internal state of this weighted MinHash.
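
    Example:
        A minimal usage sketch; the generator call and vector below are only
        illustrative:

        >>> mg = WeightedMinHashGenerator(4)
        >>> wm = mg.minhash([1, 2, 3, 4])
        >>> restored = WeightedMinHash(wm.seed, wm.digest())
        >>> restored == wm
        True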
"""
def __init__(self, seed: int, hashvalues: np.ndarray) -> None:
self.seed = seed
self.hashvalues = hashvalues
def jaccard(self, other: WeightedMinHash) -> float:
"""Estimate the `weighted Jaccard similarity`_ between the
multi-sets represented by this weighted MinHash and the other.
Args:
other (WeightedMinHash): The other weighted MinHash.
Returns:
float: The weighted Jaccard similarity between 0.0 and 1.0.
Raises:
ValueError: If the two weighted MinHash objects have different
seeds or different numbers of hash values.
.. _`weighted Jaccard similarity`: http://mathoverflow.net/questions/123339/weighted-jaccard-similarity
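
        Example:
            A minimal usage sketch comparing two sketches from the same
            generator; the dimension and vectors are only illustrative:

            >>> mg = WeightedMinHashGenerator(4)
            >>> wm1 = mg.minhash([1, 2, 3, 4])
            >>> wm2 = mg.minhash([1, 2, 3, 5])
            >>> estimate = wm1.jaccard(wm2)  # a float between 0.0 and 1.0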
"""
        if other.seed != self.seed:
            raise ValueError(
                "Cannot compute Jaccard given WeightedMinHash objects with "
                "different seeds"
            )
        if len(self) != len(other):
            raise ValueError(
                "Cannot compute Jaccard given WeightedMinHash objects with "
                "different numbers of hash values"
            )
# Check how many pairs of (k, t) hashvalues are equal
intersection = 0
for this, that in zip(self.hashvalues, other.hashvalues):
if np.array_equal(this, that):
intersection += 1
return float(intersection) / float(len(self))
def digest(self) -> np.ndarray:
"""Export the hash values, which is the internal state of the
weighted MinHash.
Returns:
numpy.ndarray: The hash values which is a Numpy array.
"""
return copy.copy(self.hashvalues)
def copy(self) -> WeightedMinHash:
"""
Returns:
WeightedMinHash: A copy of this weighted MinHash by exporting
its state.
"""
return WeightedMinHash(self.seed, self.digest())
def __len__(self) -> int:
"""
Returns:
int: The number of hash values.
"""
return len(self.hashvalues)
def __eq__(self, other) -> bool:
"""
        Returns:
            bool: True if and only if the two weighted MinHash objects have
            equal seeds and hash values.
"""
return (
type(self) is type(other)
and self.seed == other.seed
and np.array_equal(self.hashvalues, other.hashvalues)
)
class WeightedMinHashGenerator(object):
"""The weighted MinHash generator is used for creating
new :class:`WeightedMinHash` objects.
This weighted MinHash implementation is based on Sergey Ioffe's paper,
`Improved Consistent Sampling, Weighted Minhash and L1 Sketching
<http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36928.pdf>`_
Args:
dim (int): The number of dimensions of the input Jaccard vectors.
        sample_size (int): The number of samples (i.e., hash values) to use
            for creating the weighted MinHash.
        seed (int): The random seed used to sample the random parameters.
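
    Example:
        A minimal usage sketch; the dimension, sample size, and seed below
        are only illustrative:

        >>> mg = WeightedMinHashGenerator(100, sample_size=256, seed=12)
        >>> wm = mg.minhash(list(range(1, 101)))
        >>> len(wm)
        256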
"""
def __init__(self, dim: int, sample_size: int = 128, seed: int = 1) -> None:
self.dim = dim
self.sample_size = sample_size
self.seed = seed
generator = np.random.RandomState(seed=seed)
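        # Per Ioffe's Consistent Weighted Sampling, draw for every
        # (sample, dimension) pair: r ~ Gamma(2, 1), ln(c) with c ~ Gamma(2, 1),
        # and beta ~ Uniform(0, 1).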
self.rs = generator.gamma(2, 1, (sample_size, dim)).astype(np.float32)
self.ln_cs = np.log(generator.gamma(2, 1, (sample_size, dim))).astype(
np.float32
)
self.betas = generator.uniform(0, 1, (sample_size, dim)).astype(np.float32)
def minhash(self, v: np.ndarray) -> WeightedMinHash:
"""Create a new weighted MinHash given a weighted Jaccard vector.
Each dimension is an integer
frequency of the corresponding element in the multi-set represented
by the vector.
Args:
v (numpy.ndarray): The Jaccard vector.
Returns:
WeightedMinHash: The weighted MinHash.
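
        Example:
            A minimal usage sketch; the dimension and frequencies below are
            only illustrative:

            >>> mg = WeightedMinHashGenerator(5)
            >>> wm = mg.minhash([3, 0, 1, 0, 2])
            >>> wm.digest().shape
            (128, 2)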
"""
if not isinstance(v, collections.abc.Iterable):
raise TypeError("Input vector must be an iterable")
if not len(v) == self.dim:
raise ValueError("Input dimension mismatch, expecting %d" % self.dim)
if not isinstance(v, np.ndarray):
v = np.array(v, dtype=np.float32)
elif v.dtype != np.float32:
v = v.astype(np.float32)
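        # Each of the sample_size rows stores a pair (k, t_k): the selected
        # dimension k and its integer value t_k.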
hashvalues = np.zeros((self.sample_size, 2), dtype=int)
vzeros = v == 0
if vzeros.all():
raise ValueError("Input is all zeros")
v[vzeros] = np.nan
vlog = np.log(v)
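        # For each sample i, Ioffe's Consistent Weighted Sampling computes,
        # per dimension k:
        #   t_k     = floor(ln(v_k) / r_k + beta_k)
        #   ln(y_k) = r_k * (t_k - beta_k)
        #   ln(a_k) = ln(c_k) - ln(y_k) - r_k
        # and keeps the dimension minimizing a_k together with its t_k.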
for i in range(self.sample_size):
t = np.floor((vlog / self.rs[i]) + self.betas[i])
ln_y = (t - self.betas[i]) * self.rs[i]
ln_a = self.ln_cs[i] - ln_y - self.rs[i]
k = np.nanargmin(ln_a)
hashvalues[i][0], hashvalues[i][1] = k, int(t[k])
return WeightedMinHash(self.seed, hashvalues)
def minhash_many(
self, X: Union[sp.sparse.spmatrix, np.ndarray]
) -> List[Union[WeightedMinHash, None]]:
"""Create new WeightedMinHash instances given a matrix of weighted
Jaccard vectors. In the input matrix X, each row corresponds to a
multi-set, and each column stores the integer frequency of the
element of a dimension.
Note:
This method is experimental and does not yield the same MinHash
hash values as :meth:`minhash`.
Args:
X (Union[scipy.sparse.spmatrix, numpy.ndarray]): A matrix of Jaccard
vectors (rows).
Returns:
            List[Union[WeightedMinHash, None]]: A list of length X.shape[0].
            Each element is either a :class:`WeightedMinHash` instance or None
            (if the original row in X is empty).
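
        Example:
            A minimal usage sketch; the second (all-zero) row yields None.
            The matrix below is only illustrative:

            >>> mg = WeightedMinHashGenerator(3)
            >>> sketches = mg.minhash_many(np.array([[1, 2, 0], [0, 0, 0]]))
            >>> sketches[1] is None
            True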
"""
# Input validation
if not isinstance(X, (sp.sparse.spmatrix, np.ndarray)):
raise TypeError("Input X must be a sparse matrix or numpy matrix")
if X.ndim != 2:
raise ValueError("Input must have two dimensions")
if X.shape[1] != self.dim:
raise ValueError("Input dimension mismatch, expecting %d" % self.dim)
# Clean up X
X = sp.sparse.csr_matrix(X, dtype=np.float32, copy=True)
X.sort_indices()
num_docs = X.shape[0]
all_hashvalues = [None for _ in range(num_docs)]
# Grab nonzero index information
ridx, cidx = X.nonzero()
rowends = X.indptr.tolist()
rowends.pop(0)
rowends.append(X.nnz)
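        # rowends[i] marks the end (exclusive) of row i's nonzero entries
        # within ridx/cidx.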
it_doc, doc_begin, doc_end = None, 0, rowends[0]
# Generate temporary data
        rs_cidx = np.array(self.rs, copy=True)[:, cidx]  # sample_size x nnz
        betas_cidx = np.array(self.betas, copy=True)[:, cidx]  # sample_size x nnz
        ln_cs_cidx = np.array(self.ln_cs, copy=True)[:, cidx]  # sample_size x nnz
        log_data = np.log(X[ridx, cidx].getA1())
        log_data = np.vstack([log_data] * self.sample_size)  # sample_size x nnz
# Unary transformations
        t = np.floor(log_data / rs_cidx + betas_cidx)  # sample_size x nnz
ln_y = (t - betas_cidx + 1) * rs_cidx
ln_a = ln_cs_cidx - ln_y
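        # Note that (t - beta + 1) * r == r * (t - beta) + r, so ln_a equals
        # ln(c) - ln(y) - r, the same quantity minimized in minhash(); only
        # the floating-point grouping differs.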
# Compute all samples simultaneously to take advantage of numpy's tight
# C for loops to improve performance
for it_doc in range(X.shape[0]):
doc_end = rowends[it_doc]
if doc_begin != doc_end:
doc_cidx = cidx[doc_begin:doc_end]
doc_ln_a = ln_a[:, doc_begin:doc_end]
doc_argmin = np.argmin(doc_ln_a, axis=1)
doc_k = doc_cidx[doc_argmin]
all_hashvalues[it_doc] = np.zeros((self.sample_size, 2), dtype=int)
hashvalues = all_hashvalues[it_doc]
hashvalues[:, 0], hashvalues[:, 1] = (
doc_k,
t[np.arange(self.sample_size), doc_begin + doc_argmin],
)
doc_begin = doc_end
# Create the WeightedMinHash instances for non-empty documents
ret = [None] * X.shape[0]
for it_doc, hashvalues in enumerate(all_hashvalues):
if hashvalues is not None:
ret[it_doc] = WeightedMinHash(self.seed, hashvalues)
return ret