Rust implementation of the CVM algorithm for counting distinct elements in a stream
1//! An implementation of the CVM fast element counting algorithm presented in
2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34>
3
4use rand::rngs::ThreadRng;
5use rand::Rng;
6
7use rustc_hash::FxHashSet;
8use std::hash::Hash;
9
10/// A counter implementing the CVM algorithm
11///
12/// Note that the CVM struct's buffer takes ownership of its elements.
13pub struct CVM<T: PartialEq + Eq + Hash> {
14 buf_size: usize,
15 buf: FxHashSet<T>,
16 probability: f64,
17 rng: ThreadRng,
18}
19
20impl<T: PartialEq + Eq + Hash> CVM<T> {
21 /// Initialise the algorithm
22 ///
23 /// epsilon: how close you want your estimate to be to the true number of distinct elements.
24 /// A smaller ε means you require a more precise estimate.
25 /// For example, ε = 0.05 means you want your estimate to be within 5% of the actual value.
26 /// An epsilon of 0.8 is a good starting point for most applications.
27 ///
28 /// delta: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence
29 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a
30 /// higher chance the estimate might be outside the desired range.
31 /// A delta of 0.1 is a good starting point for most applications.
32 ///
33 /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
34 /// the more accurate the result will be.
35 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
36 let bufsize = buffer_size(epsilon, delta, stream_size);
37 Self {
38 buf_size: bufsize,
39 buf: FxHashSet::with_capacity_and_hasher(bufsize, Default::default()),
40 probability: 1.0,
41 rng: rand::thread_rng(),
42 }
43 }
44 /// Add an element, potentially updating the unique element count
45 pub fn process_element(&mut self, elem: T) {
46 // We should switch to a treap (as per Knuth) to avoid the hash overhead, but FxHash
47 // is still a lot faster than linear searching a Vec, even at small (1000) buffer sizes
48 // Round 0: if an element exists, remove it. Element is added back due to probability 1
49 // When buffer is full, remove half the elements
50 // Round 1: if an element exists, remove it. Element MAY be added back due to probability 0.5
51 if self.buf.contains(&elem) {
52 self.buf.remove(&elem);
53 }
54 if self.rng.gen_bool(self.probability) {
55 self.buf.insert(elem);
56 }
57 while self.buf.len() == self.buf_size {
58 self.clear_about_half();
59 self.probability /= 2.0;
60 }
61 }
62 // remove around half of the elements at random
63 fn clear_about_half(&mut self) {
64 self.buf.retain(|_| self.rng.gen_bool(0.5));
65 }
66 /// Calculate the current unique element count. You can continue to add elements after calling this method.
67 pub fn calculate_final_result(&self) -> f64 {
68 self.buf.len() as f64 / self.probability
69 }
70}
71
/// Calculate the threshold (`buf_size`) value for the F0-Estimator algorithm.
///
/// Evaluates `ceil((12 / epsilon²) * log2(8 * stream_size / delta))`.
///
/// The result is never smaller than 1. Degenerate inputs (for example
/// `stream_size == 0`, a NaN `epsilon`, or a `delta` so large that the logarithm
/// is not positive) would otherwise produce 0, which is not a usable threshold.
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    let accuracy_factor = 12.0 / (epsilon * epsilon);
    let log_term = ((8.0 * stream_size as f64) / delta).log2();
    // Float-to-integer `as` casts saturate: NaN and negative values (including
    // negative infinity) become 0, and positive infinity becomes `usize::MAX`.
    let threshold = (accuracy_factor * log_term).ceil() as usize;
    threshold.max(1)
}