Rust implementation of the CVM algorithm for counting distinct elements in a stream

//! An implementation of the CVM fast element counting algorithm presented in
//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34>

use rand::rngs::ThreadRng;
use rand::Rng;

use rustc_hash::FxHashSet;
use std::hash::Hash;

/// A counter implementing the CVM algorithm
///
/// Note that the CVM struct's buffer takes ownership of its elements.
pub struct CVM<T: PartialEq + Eq + Hash> {
    buf_size: usize,
    buf: FxHashSet<T>,
    probability: f64,
    rng: ThreadRng,
}

impl<T: PartialEq + Eq + Hash> CVM<T> {
    /// Initialise the algorithm
    ///
    /// epsilon: how close you want your estimate to be to the true number of distinct elements.
    /// A smaller ε means you require a more precise estimate.
    /// For example, ε = 0.05 means you want your estimate to be within 5% of the actual value.
    /// An epsilon of 0.8 is a good starting point for most applications.
    ///
    /// delta: the probability that the estimate falls outside the desired accuracy range, so the confidence level is 1 - delta.
    /// A smaller delta (e.g. 0.001, i.e. 99.9 % confidence) means you're very sure the estimate will be accurate, while a larger
    /// delta (e.g. 0.1, i.e. 90 % confidence) means there's a higher chance the estimate might be outside the desired range.
    /// A delta of 0.1 is a good starting point for most applications.
    ///
    /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
    /// the more accurate the result will be.
    pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
        let bufsize = buffer_size(epsilon, delta, stream_size);
        Self {
            buf_size: bufsize,
            buf: FxHashSet::with_capacity_and_hasher(bufsize, Default::default()),
            probability: 1.0,
            rng: rand::thread_rng(),
        }
    }

    /// Add an element, potentially updating the unique element count
    pub fn process_element(&mut self, elem: T) {
        // We should switch to a treap (as per Knuth) to avoid the hash overhead, but FxHash
        // is still a lot faster than linear searching a Vec, even at small (1000) buffer sizes
        // Round 0: if an element exists, remove it. Element is added back due to probability 1
        // When buffer is full, remove half the elements
        // Round 1: if an element exists, remove it. Element MAY be added back due to probability 0.5
        // `remove` is a no-op when the element is absent, so no `contains` check is needed
        self.buf.remove(&elem);
        if self.rng.gen_bool(self.probability) {
            self.buf.insert(elem);
        }
        // Halve the buffer (and the sampling probability) until there is room again
        while self.buf.len() == self.buf_size {
            self.clear_about_half();
            self.probability /= 2.0;
        }
    }

    // Remove around half of the elements at random
    fn clear_about_half(&mut self) {
        self.buf.retain(|_| self.rng.gen_bool(0.5));
    }

    /// Calculate the current unique element count. You can continue to add elements after calling this method.
    pub fn calculate_final_result(&self) -> f64 {
        self.buf.len() as f64 / self.probability
    }
}

// Calculate threshold (buf_size) value for the F0-Estimator algorithm
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
}
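
To experiment with the listing above without pulling in `rand` or `rustc_hash`, the same steps can be sketched using only the standard library. This is a minimal sketch, not the crate's implementation: `SketchCvm`, `XorShift64`, the `seed` parameter and `estimate` are hypothetical names introduced here, the xorshift generator is a simple stand-in for `ThreadRng`, and `std::collections::HashSet` replaces `FxHashSet`. Only `buffer_size` is copied from the listing unchanged.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Tiny xorshift64 generator. An assumption of this sketch: it stands in
/// for `rand::rngs::ThreadRng` and is not part of the original code.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Returns `true` with probability `p`.
    fn gen_bool(&mut self, p: f64) -> bool {
        // The top 53 bits give a uniform float in [0, 1)
        ((self.next_u64() >> 11) as f64) / ((1u64 << 53) as f64) < p
    }
}

/// Same threshold formula as `buffer_size` in the listing.
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
}

/// Hypothetical dependency-free counterpart of the `CVM` struct.
struct SketchCvm<T: Eq + Hash> {
    buf_size: usize,
    buf: HashSet<T>,
    probability: f64,
    rng: XorShift64,
}

impl<T: Eq + Hash> SketchCvm<T> {
    fn new(epsilon: f64, delta: f64, stream_size: usize, seed: u64) -> Self {
        let buf_size = buffer_size(epsilon, delta, stream_size);
        Self {
            buf_size,
            buf: HashSet::with_capacity(buf_size),
            probability: 1.0,
            // xorshift must not be seeded with zero
            rng: XorShift64(seed.max(1)),
        }
    }

    fn process_element(&mut self, elem: T) {
        // Forget any earlier sighting, then re-admit with the current probability
        self.buf.remove(&elem);
        if self.rng.gen_bool(self.probability) {
            self.buf.insert(elem);
        }
        // When the buffer is full, keep each element with probability 0.5
        while self.buf.len() == self.buf_size {
            let rng = &mut self.rng;
            self.buf.retain(|_| rng.gen_bool(0.5));
            self.probability /= 2.0;
        }
    }

    fn estimate(&self) -> f64 {
        self.buf.len() as f64 / self.probability
    }
}

fn main() {
    let stream_size: usize = 10_000;
    let mut counter = SketchCvm::new(0.8, 0.1, stream_size, 0x9E37_79B9_7F4A_7C15);
    // The stream holds 10,000 items but only 1,000 distinct values
    for i in 0..stream_size {
        counter.process_element(i % 1_000);
    }
    println!("buffer size: {}", counter.buf_size);
    println!("estimated distinct elements: {:.0}", counter.estimate());
}
```

While the number of distinct elements stays below the buffer size, the sampling probability remains 1.0 and the estimate is exact; it only becomes approximate once the buffer has filled and been halved at least once.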