from __future__ import annotations
import copy
import warnings
from collections.abc import Generator, Iterable
from typing import TYPE_CHECKING, Callable, Optional, Union
try:
from typing import Literal # py3.8+; if older, you can fallback to typing_extensions
except ImportError:
from typing_extensions import Literal
import numpy as np
if TYPE_CHECKING:
from numpy.typing import ArrayLike
# GPU backend
try:
import cupy as cp
except ImportError:
cp = None
from datasketch.hashfunc import sha1_hash32
# The size of a hash value in number of bytes
hashvalue_byte_size = len(bytes(np.int64(42).data))
# http://en.wikipedia.org/wiki/Mersenne_prime
_mersenne_prime = np.uint64((1 << 61) - 1)
_max_hash = np.uint64((1 << 32) - 1)
_hash_range = 1 << 32
_GPU_OK_CACHE: Optional[bool] = None
def _gpu_available() -> bool:
global _GPU_OK_CACHE
if cp is None:
return False
if _GPU_OK_CACHE is not None:
return _GPU_OK_CACHE
try:
_GPU_OK_CACHE = cp.cuda.runtime.getDeviceCount() > 0
except Exception:
_GPU_OK_CACHE = False
return _GPU_OK_CACHE
[docs]
class MinHash:
"""MinHash is a probabilistic data structure for computing
`Jaccard similarity`_ between sets.
Args:
num_perm (int): Number of random permutation functions.
It will be ignored if `hashvalues` is not None.
seed (int): The random seed controls the set of random
permutation functions generated for this MinHash.
gpu_mode : {'disable', 'detect', 'always'}, default 'disable'
Controls GPU use in :meth:`update_batch`.
- ``'disable'`` — always CPU.
- ``'detect'`` — use GPU if available, otherwise CPU.
- ``'always'`` — require GPU; raise ``RuntimeError`` if CuPy/CUDA
is unavailable.
hashfunc (Callable): The hash function used by
this MinHash.
It takes the input passed to the :meth:`update` method and
returns an integer that can be encoded with 32 bits.
The default hash function is based on SHA1 from hashlib_.
Users can use `farmhash` for better performance.
See the example in :meth:`update`.
hashobj (**deprecated**): This argument is deprecated since version
1.4.0. It is a no-op and has been replaced by `hashfunc`.
hashvalues (Optional[Iterable]): The hash values is
the internal state of the MinHash. It can be specified for faster
initialization using the existing :attr:`hashvalues` of another MinHash.
permutations (Optional[Tuple[Iterable, Iterable]]): The permutation
function parameters as a tuple of two lists. This argument
can be specified for faster initialization using the existing
:attr:`permutations` from another MinHash.
Note:
Hashing and permutation *generation* always run on CPU to preserve
existing semantics; only the permutation application and the
columnwise min-reduction inside :meth:`update_batch` may run on GPU.
Note:
To save memory usage, consider using :class:`datasketch.LeanMinHash`.
Note:
Since version 1.1.1, MinHash will only support serialization using
pickle_. ``serialize`` and ``deserialize`` methods are removed,
and are supported in :class:`datasketch.LeanMinHash` instead.
MinHash serialized before version 1.1.1 cannot be deserialized properly
in newer versions (`need to migrate? <https://github.com/ekzhu/datasketch/issues/18>`_).
Note:
Since version 1.1.3, MinHash uses Numpy's random number generator
instead of Python's built-in random package. This change makes the
hash values consistent across different Python versions.
The side-effect is that now MinHash created before version 1.1.3 won't
work (i.e., :meth:`jaccard`, :meth:`merge` and :meth:`union`)
with those created after.
.. _`Jaccard similarity`: https://en.wikipedia.org/wiki/Jaccard_index
.. _hashlib: https://docs.python.org/3.5/library/hashlib.html
.. _`pickle`: https://docs.python.org/3/library/pickle.html
"""
[docs]
def __init__(
self,
num_perm: int = 128,
seed: int = 1,
gpu_mode: Literal["disable", "detect", "always"] = "disable",
hashfunc: Callable = sha1_hash32,
hashobj: Optional[object] = None, # Deprecated.
hashvalues: Optional[ArrayLike] = None,
permutations: Optional[Union[tuple[ArrayLike, ArrayLike], ArrayLike]] = None,
) -> None:
if hashvalues is not None:
num_perm = len(hashvalues)
if num_perm > _hash_range:
# Because 1) we don't want the size to be too large, and
# 2) we are using 4 bytes to store the size value
raise ValueError(
"Cannot have more than %d number of\
permutation functions"
% _hash_range
)
self.seed = seed
self.num_perm = num_perm
# Check the hash function.
if not callable(hashfunc):
raise ValueError("The hashfunc must be a callable.")
self.hashfunc = hashfunc
# Check for use of hashobj and issue warning.
if hashobj is not None:
warnings.warn("hashobj is deprecated, use hashfunc instead.", DeprecationWarning, stacklevel=2)
# Initialize hash values
if hashvalues is not None:
self.hashvalues = self._parse_hashvalues(hashvalues)
else:
self.hashvalues = self._init_hashvalues(num_perm)
# Initalize permutation function parameters
if permutations is not None:
self.permutations = permutations
else:
self.permutations = self._init_permutations(num_perm)
if len(self) != len(self.permutations[0]):
raise ValueError("Numbers of hash values and permutations mismatch")
# GPU state
self._gpu_mode: Literal["disable", "detect", "always"] = gpu_mode
self._a_gpu = None
self._b_gpu = None
def _ensure_gpu_caches(self) -> None:
"""Cache permutation arrays on device. Call only when GPU is available."""
if self._a_gpu is None or self._b_gpu is None:
a, b = self.permutations
self._a_gpu = cp.asarray(a, dtype=cp.uint64)
self._b_gpu = cp.asarray(b, dtype=cp.uint64)
def _init_hashvalues(self, num_perm: int) -> np.ndarray:
return np.ones(num_perm, dtype=np.uint64) * _max_hash
def _init_permutations(self, num_perm: int) -> np.ndarray:
# Create parameters for a random bijective permutation function
# that maps a 32-bit hash value to another 32-bit hash value.
# http://en.wikipedia.org/wiki/Universal_hashing
gen = np.random.RandomState(self.seed)
return np.array(
[
(
gen.randint(1, _mersenne_prime, dtype=np.uint64),
gen.randint(0, _mersenne_prime, dtype=np.uint64),
)
for _ in range(num_perm)
],
dtype=np.uint64,
).T
def _parse_hashvalues(self, hashvalues) -> np.ndarray:
return np.array(hashvalues, dtype=np.uint64)
[docs]
def update(self, b) -> None:
"""Update this MinHash with a new value.
The value will be hashed using the hash function specified by
the `hashfunc` argument in the constructor.
Args:
b: The value to be hashed using the hash function specified.
Example:
To update with a new string value (using the default SHA1 hash
function, which requires bytes as input):
.. code-block:: python
minhash = Minhash()
minhash.update("new value".encode("utf-8"))
We can also use a different hash function, for example, `pyfarmhash`:
.. code-block:: python
import farmhash
def _hash_32(b):
return farmhash.hash32(b)
minhash = MinHash(hashfunc=_hash_32)
minhash.update("new value")
"""
hv = self.hashfunc(b)
a, b = self.permutations
phv = np.bitwise_and((a * hv + b) % _mersenne_prime, _max_hash)
self.hashvalues = np.minimum(phv, self.hashvalues)
[docs]
def update_batch(self, b: Iterable) -> None:
"""Update this MinHash with new values.
The values will be hashed using the hash function specified by
the `hashfunc` argument in the constructor.
Notes:
- Hashing of input values always runs on CPU.
- Permutation application + min-reduction may run on GPU
depending on `gpu_mode`:
'disable' : CPU
'detect' : GPU if available else CPU
'always' : GPU (error if unavailable)
Args:
b (Iterable): Values to be hashed using the hash function specified.
Examples:
Basic usage with string values (default SHA1 hash, requires bytes):
.. code-block:: python
from datasketch import MinHash
m = MinHash()
m.update_batch([s.encode("utf-8") for s in ["token1", "token2"]])
Using GPU mode if available:
.. code-block:: python
from datasketch import MinHash
m = MinHash(num_perm=256, gpu_mode="detect")
m.update_batch([b"token1", b"token2"])
"""
# Hash on CPU to preserve hashfunc semantics
hv_list = [self.hashfunc(_b) for _b in b]
# Optimization: empty batch is a no-op (preserves original behavior)
if not hv_list:
return
a, b = self.permutations # np.ndarray[uint64]
# Decide backend
use_gpu = False
if self._gpu_mode == "always":
if not _gpu_available():
raise RuntimeError("GPU mode 'always' requested but CuPy/CUDA device is unavailable.")
use_gpu = True
elif self._gpu_mode == "detect":
use_gpu = _gpu_available()
else: # 'disable'
use_gpu = False
if use_gpu:
# GPU path (keep indentation minimal as requested)
self._ensure_gpu_caches()
hv_gpu = cp.asarray(hv_list, dtype=cp.uint64).reshape(-1, 1)
phv_gpu = (hv_gpu * self._a_gpu + self._b_gpu) % cp.uint64(_mersenne_prime)
phv_gpu = cp.bitwise_and(phv_gpu, cp.uint64(_max_hash))
base_gpu = cp.asarray(self.hashvalues, dtype=cp.uint64)
# column-wise min without extra vstack
col_min = cp.minimum(base_gpu, cp.min(phv_gpu, axis=0))
self.hashvalues = cp.asnumpy(col_min).astype(np.uint64, copy=False)
return
# CPU path
hv = np.array(hv_list, dtype=np.uint64, ndmin=2).T
phv = (hv * a + b) % _mersenne_prime
phv = np.bitwise_and(phv, _max_hash)
self.hashvalues = np.minimum(self.hashvalues, phv.min(axis=0))
[docs]
def jaccard(self, other: MinHash) -> float:
"""Estimate the `Jaccard similarity`_ (resemblance) between the sets
represented by this MinHash and the other.
Args:
other (MinHash): The other MinHash.
Returns:
float: The Jaccard similarity, which is between 0.0 and 1.0.
Raises:
ValueError: If the two MinHashes have different numbers of
permutation functions or different seeds.
"""
if other.seed != self.seed:
raise ValueError(
"Cannot compute Jaccard given MinHash with\
different seeds"
)
if len(self) != len(other):
raise ValueError(
"Cannot compute Jaccard given MinHash with\
different numbers of permutation functions"
)
return float(np.count_nonzero(self.hashvalues == other.hashvalues)) / float(len(self))
[docs]
def count(self) -> float:
"""Estimate the cardinality count based on the technique described in
`this paper <http://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=365694>`_.
Returns:
int: The estimated cardinality of the set represented by this MinHash.
"""
k = len(self)
return float(k) / np.sum(self.hashvalues / float(_max_hash)) - 1.0
[docs]
def merge(self, other: MinHash) -> None:
"""Merge the other MinHash with this one, making this one the union
of both.
Args:
other (MinHash): The other MinHash.
Raises:
ValueError: If the two MinHashes have different numbers of
permutation functions or different seeds.
"""
if other.seed != self.seed:
raise ValueError(
"Cannot merge MinHash with\
different seeds"
)
if len(self) != len(other):
raise ValueError(
"Cannot merge MinHash with\
different numbers of permutation functions"
)
self.hashvalues = np.minimum(other.hashvalues, self.hashvalues)
[docs]
def digest(self) -> np.ndarray:
"""Export the hash values, which is the internal state of the
MinHash.
Returns:
numpy.ndarray: The hash values which is a Numpy array.
"""
return copy.copy(self.hashvalues)
[docs]
def is_empty(self) -> bool:
"""Returns:
bool: If the current MinHash is empty - at the state of just
initialized.
"""
return not np.any(self.hashvalues != _max_hash)
[docs]
def clear(self) -> None:
"""Clear the current state of the MinHash.
All hash values are reset.
"""
self.hashvalues = self._init_hashvalues(len(self))
[docs]
def copy(self) -> MinHash:
"""Return a copy; preserves gpu_mode (rehydrates caches lazily)."""
return MinHash(
seed=self.seed,
hashfunc=self.hashfunc,
hashvalues=self.digest(),
permutations=self.permutations,
gpu_mode=self._gpu_mode,
)
[docs]
def __len__(self) -> int:
"""Returns:
int: The number of hash values.
"""
return len(self.hashvalues)
[docs]
def __eq__(self, other: MinHash) -> bool:
"""Returns:
bool: If their seeds and hash values are both equal then two are equivalent.
"""
return (
type(self) is type(other) and self.seed == other.seed and np.array_equal(self.hashvalues, other.hashvalues)
)
[docs]
@classmethod
def union(cls, *mhs: MinHash) -> MinHash:
"""Create a MinHash which is the union of the MinHash objects passed as arguments.
Args:
*mhs (MinHash): The MinHash objects to be united. The argument list length is variable,
but must be at least 2.
Returns:
MinHash: a new union MinHash.
Raises:
ValueError: If the number of MinHash objects passed as arguments is less than 2,
or if the MinHash objects passed as arguments have different seeds or
different numbers of permutation functions.
Example:
.. code-block:: python
from datasketch import MinHash
import numpy as np
m1 = MinHash(num_perm=128)
m1.update_batch(np.random.randint(low=0, high=30, size=10))
m2 = MinHash(num_perm=128)
m2.update_batch(np.random.randint(low=0, high=30, size=10))
# Union m1 and m2.
m = MinHash.union(m1, m2)
"""
if len(mhs) < 2:
raise ValueError("Cannot union less than 2 MinHash")
num_perm = len(mhs[0])
seed = mhs[0].seed
if any((seed != m.seed or num_perm != len(m)) for m in mhs):
raise ValueError(
"The unioning MinHash must have the\
same seed and number of permutation functions"
)
hashvalues = np.minimum.reduce([m.hashvalues for m in mhs])
permutations = mhs[0].permutations
return cls(
num_perm=num_perm,
seed=seed,
hashfunc=mhs[0].hashfunc,
hashvalues=hashvalues,
permutations=permutations,
gpu_mode=mhs[0]._gpu_mode,
)
[docs]
@classmethod
def bulk(cls, b: Iterable, **minhash_kwargs) -> list[MinHash]:
"""Compute MinHashes in bulk. This method avoids unnecessary
overhead when initializing many minhashes by reusing the initialized
state.
Args:
b (Iterable): An Iterable of lists of bytes, each list is
hashed in to one MinHash in the output.
**minhash_kwargs: Keyword arguments used to initialize MinHash,
will be used for all minhashes.
Returns:
list[datasketch.MinHash]: A list of computed MinHashes.
Example:
.. code-block:: python
from datasketch import MinHash
data = [[b"token1", b"token2", b"token3"], [b"token4", b"token5", b"token6"]]
minhashes = MinHash.bulk(data, num_perm=64)
"""
return list(cls.generator(b, **minhash_kwargs))
[docs]
@classmethod
def generator(cls, b: Iterable, **minhash_kwargs) -> Generator[MinHash, None, None]:
"""Compute MinHashes in a generator. This method avoids unnecessary
overhead when initializing many minhashes by reusing the initialized
state.
Args:
b (Iterable): An Iterable of lists of bytes, each list is
hashed in to one MinHash in the output.
minhash_kwargs: Keyword arguments used to initialize MinHash,
will be used for all minhashes.
Returns:
Generator[MinHash, None, None]: a generator of computed MinHashes.
Example:
.. code-block:: python
from datasketch import MinHash
data = [[b"token1", b"token2", b"token3"], [b"token4", b"token5", b"token6"]]
for minhash in MinHash.generator(data, num_perm=64):
# do something useful
minhash
"""
m = cls(**minhash_kwargs)
for _b in b:
_m = m.copy()
_m.update_batch(_b)
yield _m
# NOTE:
# CuPy device arrays (`_a_gpu`, `_b_gpu`) are not reliably picklable across
# environments and can inflate pickle size or fail on machines without CUDA.
# We therefore drop device state on pickle so round-tripped MinHash objects
# remain portable; callers can still use `gpu_mode="detect"` to re-enable GPU.
def __getstate__(self):
state = self.__dict__.copy()
# Drop device arrays; keep gpu_mode as-is (portable across envs).
state["_a_gpu"] = None
state["_b_gpu"] = None
return state
def __setstate__(self, state):
self.__dict__.update(state)
# After unpickling we remain on CPU until update_batch decides backend.
# Caches will be recreated on first GPU use via _ensure_gpu_caches().