Source code for datasketch.hyperloglog

from __future__ import annotations
import struct, copy
from typing import Callable, Optional
import numpy as np
import warnings

try:
    from .hyperloglog_const import _thresholds, _raw_estimate, _bias
except ImportError:
    # For Python 2
    from hyperloglog_const import _thresholds, _raw_estimate, _bias

from datasketch.hashfunc import sha1_hash32, sha1_hash64

# Get the number of bits starting from the first non-zero bit to the right
_bit_length = lambda bits: bits.bit_length()
# For < Python 2.7
if not hasattr(int, "bit_length"):
    _bit_length = lambda bits: len(bin(bits)) - 2 if bits > 0 else 0


[docs] class HyperLogLog(object): """ The HyperLogLog data sketch for estimating cardinality of very large dataset in a single pass. The original HyperLogLog is described `here <http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf>`_. This HyperLogLog implementation is based on: https://github.com/svpcom/hyperloglog Args: p (int): The precision parameter. It is ignored if the `reg` is given. reg (Optional[numpy.ndarray]): The internal state. This argument is for initializing the HyperLogLog from an existing one. hashfunc (Callable): The hash function used by this MinHash. It takes the input passed to the `update` method and returns an integer that can be encoded with 32 bits. The default hash function is based on SHA1 from hashlib_. hashobj (**deprecated**): This argument is deprecated since version 1.4.0. It is a no-op and has been replaced by `hashfunc`. """ __slots__ = ("p", "m", "reg", "alpha", "max_rank", "hashfunc") # The range of the hash values used for HyperLogLog _hash_range_bit = 32 _hash_range_byte = 4 def _get_alpha(self, p): if not (4 <= p <= 16): raise ValueError("p=%d should be in range [4 : 16]" % p) if p == 4: return 0.673 if p == 5: return 0.697 if p == 6: return 0.709 return 0.7213 / (1.0 + 1.079 / (1 << p))
[docs] def __init__( self, p: int = 8, reg: Optional[np.ndarray] = None, hashfunc: Callable = sha1_hash32, hashobj: Optional[object] = None, # Deprecated ): if reg is None: self.p = p self.m = 1 << p self.reg = np.zeros((self.m,), dtype=np.int8) else: # Check if the register has the correct type if not isinstance(reg, np.ndarray): raise ValueError("The imported register must be a numpy.ndarray.") # We have to check if the imported register has the correct length. self.m = reg.size self.p = _bit_length(self.m) - 1 if 1 << self.p != self.m: raise ValueError( "The imported register has \ incorrect size. Expect a power of 2." ) # Generally we trust the user to import register that contains # reasonable counter values, so we don't check for every values. self.reg = reg # Check the hash function. if not callable(hashfunc): raise ValueError("The hashfunc must be a callable.") # Check for use of hashobj and issue warning. if hashobj is not None: warnings.warn( "hashobj is deprecated, use hashfunc instead.", DeprecationWarning ) self.hashfunc = hashfunc # Common settings self.alpha = self._get_alpha(self.p) self.max_rank = self._hash_range_bit - self.p
[docs] def update(self, b) -> None: """ Update the HyperLogLog with a new data value in bytes. The value will be hashed using the hash function specified by the `hashfunc` argument in the constructor. Args: b: The value to be hashed using the hash function specified. Example: To update with a new string value (using the default SHA1 hash function, which requires bytes as input): .. code-block:: python hll = HyperLogLog() hll.update("new value".encode('utf-8')) We can also use a different hash function, for example, `pyfarmhash`: .. code-block:: python import farmhash def _hash_32(b): return farmhash.hash32(b) hll = HyperLogLog(hashfunc=_hash_32) hll.update("new value") """ # Digest the hash object to get the hash value hv = self.hashfunc(b) # Get the index of the register using the first p bits of the hash reg_index = hv & (self.m - 1) # Get the rest of the hash bits = hv >> self.p # Update the register self.reg[reg_index] = max(self.reg[reg_index], self._get_rank(bits))
[docs] def count(self) -> float: """ Estimate the cardinality of the data values seen so far. Returns: float: The estimated cardinality. """ # Use HyperLogLog estimation function e = self.alpha * float(self.m**2) / np.sum(2.0 ** (-self.reg)) # Small range correction small_range_threshold = (5.0 / 2.0) * self.m if abs(e - small_range_threshold) / small_range_threshold < 0.15: warnings.warn( ( "Warning: estimate is close to error correction threshold. " + "Output may not satisfy HyperLogLog accuracy guarantee." ) ) if e <= small_range_threshold: num_zero = self.m - np.count_nonzero(self.reg) return self._linearcounting(num_zero) # Normal range, no correction if e <= (1.0 / 30.0) * (1 << 32): return e # Large range correction return self._largerange_correction(e)
[docs] def merge(self, other: HyperLogLog) -> None: """ Merge the other HyperLogLog with this one, making this the union of the two. Args: other (HyperLogLog): The other HyperLogLog to be merged. """ if self.m != other.m or self.p != other.p: raise ValueError( "Cannot merge HyperLogLog with different\ precisions." ) self.reg = np.maximum(self.reg, other.reg)
[docs] def digest(self) -> np.ndarray: """ Returns: numpy.array: The current internal state. """ return copy.copy(self.reg)
[docs] def copy(self) -> HyperLogLog: """ Create a copy of the current HyperLogLog by exporting its state. Returns: HyperLogLog: A copy of the current HyperLogLog. """ return self.__class__(reg=self.digest(), hashfunc=self.hashfunc)
[docs] def is_empty(self) -> bool: """ Returns: bool: True if the current HyperLogLog is empty - at the state of just initialized. """ if np.any(self.reg): return False return True
[docs] def clear(self) -> None: """ Reset the current HyperLogLog to empty. """ self.reg = np.zeros((self.m,), dtype=np.int8)
[docs] def __len__(self) -> int: """ Returns: int: Get the size of the HyperLogLog as the size of `reg`. """ return len(self.reg)
[docs] def __eq__(self, other: HyperLogLog) -> bool: """ Check equivalence between two HyperLogLogs Args: other (HyperLogLog): Returns: bool: True if both have the same internal state. """ return ( type(self) is type(other) and self.p == other.p and self.m == other.m and np.array_equal(self.reg, other.reg) )
def _get_rank(self, bits): rank = self.max_rank - _bit_length(bits) + 1 if rank <= 0: raise ValueError( "Hash value overflow, maximum size is %d\ bits" % self.max_rank ) return rank def _linearcounting(self, num_zero): return self.m * np.log(self.m / float(num_zero)) def _largerange_correction(self, e): return -(1 << 32) * np.log(1.0 - e / (1 << 32)) @classmethod def union(cls, *hyperloglogs: HyperLogLog) -> HyperLogLog: if len(hyperloglogs) < 2: raise ValueError( "Cannot union less than 2 HyperLogLog\ sketches" ) m = hyperloglogs[0].m if not all(h.m == m for h in hyperloglogs): raise ValueError( "Cannot union HyperLogLog sketches with\ different precisions" ) reg = np.maximum.reduce([h.reg for h in hyperloglogs]) h = cls(reg=reg) return h
[docs] def bytesize(self) -> int: """Get the size of the HyperLogLog in bytes.""" # Since p is no larger than 64, use 8 bits p_size = struct.calcsize("B") # Each register value is no larger than 64, use 8 bits # TODO: is there a way to use 5 bits instead of 8 bits # to store integer in Python? reg_val_size = struct.calcsize("B") return p_size + reg_val_size * self.m
def serialize(self, buf): if len(buf) < self.bytesize(): raise ValueError( "The buffer does not have enough space\ for holding this HyperLogLog." ) fmt = "B%dB" % self.m struct.pack_into(fmt, buf, 0, self.p, *self.reg) @classmethod def deserialize(cls, buf): size = struct.calcsize("B") try: p = struct.unpack_from("B", buf, 0)[0] except TypeError: p = struct.unpack_from("B", buffer(buf), 0)[0] h = cls(p) offset = size try: h.reg = np.array( struct.unpack_from("%dB" % h.m, buf, offset), dtype=np.int8 ) except TypeError: h.reg = np.array( struct.unpack_from("%dB" % h.m, buffer(buf), offset), dtype=np.int8 ) return h
[docs] def __getstate__(self): buf = bytearray(self.bytesize()) self.serialize(buf) return buf
def __setstate__(self, buf): size = struct.calcsize("B") try: p = struct.unpack_from("B", buf, 0)[0] except TypeError: p = struct.unpack_from("B", buffer(buf), 0)[0] self.__init__(p=p) offset = size try: self.reg = np.array( struct.unpack_from("%dB" % self.m, buf, offset), dtype=np.int8 ) except TypeError: self.reg = np.array( struct.unpack_from("%dB" % self.m, buffer(buf), offset), dtype=np.int8 )
[docs] class HyperLogLogPlusPlus(HyperLogLog): """ HyperLogLog++ is an enhanced HyperLogLog `from Google <http://research.google.com/pubs/pub40671.html>`_. Main changes from the original HyperLogLog: 1. Use 64 bits instead of 32 bits for hash function 2. A new small-cardinality estimation scheme 3. Sparse representation (not implemented here) Args: p (int): The precision parameter. It is ignored if the `reg` is given. reg (Optional[numpy.array]): The internal state. This argument is for initializing the HyperLogLog from an existing one. hashfunc (Callable): The hash function used by this MinHash. It takes the input passed to the `update` method and returns an integer that can be encoded with 64 bits. The default hash function is based on SHA1 from hashlib_. hashobj (**deprecated**): This argument is deprecated since version 1.4.0. It is a no-op and has been replaced by `hashfunc`. """ _hash_range_bit = 64 _hash_range_byte = 8
[docs] def __init__( self, p: int = 8, reg: Optional[np.ndarray] = None, hashfunc: Callable = sha1_hash64, hashobj: Optional[object] = None, ): super(HyperLogLogPlusPlus, self).__init__( p=p, reg=reg, hashfunc=hashfunc, hashobj=hashobj )
def _get_threshold(self, p): return _thresholds[p - 4] def _estimate_bias(self, e, p): bias_vector = _bias[p - 4] estimate_vector = _raw_estimate[p - 4] nearest_neighbors = np.argsort((e - estimate_vector) ** 2)[:6] return np.mean(bias_vector[nearest_neighbors])
[docs] def count(self) -> float: """Estimate the cardinality of the data values seen so far.""" num_zero = self.m - np.count_nonzero(self.reg) if num_zero > 0: # linear counting lc = self._linearcounting(num_zero) if lc <= self._get_threshold(self.p): return lc # Use HyperLogLog estimation function e = self.alpha * float(self.m**2) / np.sum(2.0 ** (-self.reg)) if e <= 5 * self.m: return e - self._estimate_bias(e, self.p) else: return e