Rust implementation of the CVM algorithm for counting distinct elements in a stream

//! An implementation of the CVM fast element counting algorithm presented in
//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. https://doi.org/10.4230/LIPIcs.ESA.2022.34

use rand::rngs::ThreadRng;
use rand::Rng;

/// A counter implementing the CVM algorithm
///
/// Note that the CVM struct's buffer takes ownership of its elements.
pub struct CVM<T: PartialOrd + PartialEq> {
    buf_size: usize,
    buf: Vec<T>,
    probability: f64,
    rng: ThreadRng,
}

impl<T: PartialOrd + PartialEq> CVM<T> {
    /// Initialise the algorithm
    ///
    /// epsilon: how close you want your estimate to be to the true number of distinct elements.
    /// A smaller ε means you require a more precise estimate.
    /// For example, ε = 0.05 means you want your estimate to be within 5% of the actual value.
    /// An epsilon of 0.8 is a good starting point for most applications.
    ///
    /// delta: the probability that the algorithm's estimate falls outside the desired accuracy range, so the
    /// confidence is 1 - delta. A smaller delta (e.g. 0.001, i.e. 99.9 % confidence) means you're very sure the
    /// estimate will be accurate, while a larger delta (e.g. 0.1, i.e. 90 % confidence) means there's a
    /// higher chance the estimate might be outside the desired range.
    /// A delta of 0.1 is a good starting point for most applications.
    ///
    /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
    /// the more accurate the result will be.
    pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
        let bufsize = buffer_size(epsilon, delta, stream_size);
        Self {
            buf_size: bufsize,
            buf: Vec::with_capacity(bufsize),
            probability: 1.0,
            rng: rand::thread_rng(),
        }
    }

    /// Add an element, potentially updating the unique element count
    pub fn process_element(&mut self, elem: T) {
        // linear search
        // I think this will be faster than a hashset for practical sizes
        // Should really switch to a treap as per Knuth
        if let Some(pos) = self.buf.iter().position(|x| *x == elem) {
            self.buf.swap_remove(pos);
        }
        // keep the element with the current sampling probability
        if self.rng.gen_bool(self.probability) {
            self.buf.push(elem);
        }
        // buffer full: thin it out and halve the sampling probability
        while self.buf.len() == self.buf_size {
            self.clear_about_half();
            self.probability /= 2.0;
        }
    }

    // remove around half of the elements at random
    fn clear_about_half(&mut self) {
        // borrow the RNG on its own so the closure does not capture `self`
        let rng = &mut self.rng;
        self.buf.retain(|_| rng.gen_bool(0.5));
    }

    /// Calculate the current unique element count. You can continue to add elements after calling this method.
    pub fn calculate_final_result(&self) -> f64 {
        self.buf.len() as f64 / self.probability
    }
}

// Calculate threshold (buf_size) value for the F0-Estimator algorithm
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
}
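
To get a feel for how epsilon, delta and stream_size drive the buffer size, the sketch below repeats the buffer_size formula from the listing above using only the standard library. The helper buffer_exceeds_stream is a hypothetical name added for illustration and is not part of the original file. It relies on one reading of process_element: if the buffer never fills, the sampling probability stays at 1.0 and the reported count is exact rather than estimated.

```rust
/// Same formula as `buffer_size` in the listing above:
/// ceil((12 / epsilon^2) * log2(8 * stream_size / delta)).
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
}

/// Hypothetical helper (not in the original file): true when the buffer is
/// larger than the stream itself. In that case the buffer can never fill,
/// the sampling probability is never halved, and the count is exact.
fn buffer_exceeds_stream(epsilon: f64, delta: f64, stream_size: usize) -> bool {
    buffer_size(epsilon, delta, stream_size) > stream_size
}
```

With the suggested starting values (epsilon 0.8, delta 0.1), a stream of 1,000 elements gives a buffer of 306 slots, and a stream of 1,000,000 elements gives only 493, because the size grows with the logarithm of the stream length. Tightening epsilon to 0.05 for the 1,000-element stream gives 78,182 slots, which is larger than the stream, so sampling only pays off once the stream is much longer than the buffer.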