Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

Use treap in cvm algorithm impl

author
Stephan Hügel
date (Jun 29, 2025, 10:35 PM +0100) commit 7230c0ac parent 12c964c2 change-id kmpxzukn
+32 -24
+32 -24
src/lib.rs
··· 1 1 //! An implementation of the CVM fast element counting algorithm presented in 2 2 //! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34> 3 + //! 4 + //! This implementation uses a treap data structure as the buffer, following Knuth's original design. 5 + 6 + mod treap; 3 7 8 + use crate::treap::Treap; 4 9 use rand::rngs::StdRng; 5 10 use rand::{Rng, SeedableRng}; 6 11 7 - use rustc_hash::FxHashSet; 8 - use std::hash::Hash; 9 - 10 12 /// A counter implementing the CVM algorithm 11 13 /// 14 + /// This implementation uses a treap (randomized binary search tree) as the buffer, 15 + /// which provides `O(log n)` operations while maintaining the probabilistic properties 16 + /// needed for the algorithm. 17 + /// 12 18 /// Note that the CVM struct's buffer takes ownership of its elements. 13 - pub struct CVM<T: PartialEq + Eq + Hash> { 19 + pub struct CVM<T: Ord> { 14 20 buf_size: usize, 15 - buf: FxHashSet<T>, 21 + buf: Treap<T>, 16 22 probability: f64, 17 23 rng: StdRng, 18 24 } 19 25 20 - impl<T: PartialEq + Eq + Hash> CVM<T> { 26 + impl<T: Ord> CVM<T> { 21 27 /// Initialise the algorithm 22 28 /// 23 - /// epsilon: how close you want your estimate to be to the true number of distinct elements. 24 - /// A smaller ε means you require a more precise estimate. 25 - /// For example, ε = 0.05 means you want your estimate to be within 5% of the actual value. 26 - /// An epsilon of 0.8 is a good starting point for most applications. 29 + /// `epsilon`: how close you want your estimate to be to the true number of distinct elements. 30 + /// A smaller `ε` means you require a more precise estimate. 31 + /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value. 32 + /// An epsilon of `0.8` is a good starting point for most applications. 27 33 /// 28 - /// delta: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 34 + /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 29 35 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a 30 36 /// higher chance the estimate might be outside the desired range. 31 - /// A delta of 0.1 is a good starting point for most applications. 37 + /// A `delta` of `0.1` is a good starting point for most applications. 32 38 /// 33 - /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 39 + /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 34 40 /// the more accurate the result will be. 35 41 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self { 36 42 let bufsize = buffer_size(epsilon, delta, stream_size); 37 43 Self { 38 44 buf_size: bufsize, 39 - buf: FxHashSet::with_capacity_and_hasher(bufsize, Default::default()), 45 + buf: Treap::new(), 40 46 probability: 1.0, 41 47 rng: StdRng::from_entropy(), 42 48 } 43 49 } 44 50 /// Add an element, potentially updating the unique element count 45 51 pub fn process_element(&mut self, elem: T) { 46 - // We should switch to a treap (as per Knuth) to avoid the hash overhead, but FxHash 47 - // is still a lot faster than linear searching a Vec, even at small (1000) buffer sizes 48 - // Round 0: if an element exists, remove it. Element is added back due to probability 1 49 - // When buffer is full, remove half the elements 50 - // Round 1: if an element exists, remove it. Element MAY be added back due to probability 0.5 52 + // The algorithm works as follows: 53 + // 1. If element exists in buffer, remove it (this ensures proper sampling) 54 + // 2. Add element back with current probability 55 + // 3. If buffer is full, remove ~half the elements and halve the probability 56 + // This creates a geometric sampling scheme that provides an unbiased estimate 51 57 if self.buf.contains(&elem) { 52 58 self.buf.remove(&elem); 53 59 } 54 60 if self.rng.gen_bool(self.probability) { 55 - self.buf.insert(elem); 61 + self.buf.insert(elem, &mut self.rng); 56 62 } 57 63 while self.buf.len() == self.buf_size { 58 64 self.clear_about_half(); ··· 61 67 } 62 68 // remove around half of the elements at random 63 69 fn clear_about_half(&mut self) { 64 - self.buf.retain(|_| self.rng.gen_bool(0.5)); 70 + // Need to capture rng reference to use in closure 71 + let rng = &mut self.rng; 72 + self.buf.retain(|_| rng.gen_bool(0.5)); 65 73 } 66 74 /// Calculate the current unique element count. You can continue to add elements after calling this method. 67 75 pub fn calculate_final_result(&self) -> f64 { ··· 82 90 path::Path, 83 91 }; 84 92 85 - use super::*; 86 93 use regex::Regex; 94 + use std::collections::HashSet; 87 95 88 96 fn open_file<P>(filename: P) -> BufReader<File> 89 97 where ··· 93 101 BufReader::new(f) 94 102 } 95 103 96 - fn line_to_word(re: &Regex, hs: &mut FxHashSet<String>, line: &str) { 104 + fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) { 97 105 let words = line.split(' '); 98 106 words.for_each(|word| { 99 107 let clean_word = re.replace_all(word, "").to_lowercase(); ··· 105 113 let input_file = "benches/kiy.txt"; 106 114 let re = Regex::new(r"[^\w\s]").unwrap(); 107 115 let br = open_file(input_file); 108 - let mut hs = FxHashSet::with_hasher(Default::default()); 116 + let mut hs = HashSet::new(); 109 117 br.lines() 110 118 .for_each(|line| line_to_word(&re, &mut hs, &line.unwrap())); 111 119 assert_eq!(hs.len(), 9016)