Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #3 from urschrei/shugel/push-rpuxlkvuoqkp

Switch to a treap

+350 -29
+1 -1
Cargo.toml
··· 5 5 license = "BlueOak-1.0.0" 6 6 repository = "https://github.com/urschrei/cvmcount" 7 7 documentation = "https://docs.rs/cvmcount" 8 - keywords = ["CVM", "count-distinct", "estimation"] 8 + keywords = ["CVM", "count-distinct", "estimation", "treap"] 9 9 categories = ["algorithms", ] 10 10 11 11 version = "0.3.1"
+2 -2
README.md
··· 12 12 In order to overcome this constraint, streaming algorithms have been developed: [Flajolet-Martin](https://en.wikipedia.org/wiki/Flajolet–Martin_algorithm), LogLog, [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog). The algorithm implemented by this library is an improvement on these in one particular sense: it is extremely simple. Instead of hashing, it uses a sampling method to compute an [unbiased estimate](https://www.statlect.com/glossary/unbiased-estimator#:~:text=An%20estimator%20of%20a%20given,Examples) of the cardinality. 13 13 14 14 # What is an Element 15 - In this implementation, an element is anything implementing the `Eq` + `PartialEq` + `Hash` traits: various integer flavours, strings, any Struct on which you have implemented the traits. Not `f32` / `f64`, however. 15 + In this implementation, an element is anything implementing the `Ord` trait: various integer flavours, strings, any Struct on which you have implemented the trait. Not `f32` / `f64`, however (unless wrapped in an ordered wrapper type). 16 16 17 17 ## Ownership 18 18 The buffer has to keep ownership of its elements. In practice, this is not a problem: relative to its input stream size, the buffer is very small. This is also the point of the algorithm: your data set is very large and your working memory is small; you **don't** want to keep the original data around in order to store references to it! Thus, if you have `&str` elements you will need to create new `String`s to store them. If you're processing text data you'll probably want to strip punctuation and regularise the case, so you'll need new `String`s anyway. If you're processing strings containing numeric values, parsing them to the appropriate integer type (which implements `Copy`) first seems like a reasonable approach. ··· 21 21 Don Knuth has written about the algorithm (he refers to it as **Algorithm D**) at https://cs.stanford.edu/~knuth/papers/cvm-note.pdf, and does a far better job than I do at explaining it. You will note that on p1 he describes the buffer he uses as a data structure – called a [treap](https://en.wikipedia.org/wiki/Treap#:~:text=7%20External%20links-,Description,(randomly%20chosen)%20numeric%20priority.) – as a binary tree 22 22 > "that’s capable of holding up to _s_ ordered pairs (_a_, _u_), where _a_ is an element of the stream and _u_ is a real number, 0 ≤ _u_ < 1." 23 23 24 - where _s_ >= 1. Our implementation doesn't use a treap as a buffer; it uses a fast HashSet with the [FxHash](https://docs.rs/fxhash/latest/fxhash/) algorithm: we pay the hash cost when inserting, but search in step **D4** is `O(1)`. The library may switch to a treap implementation eventually. 24 + where _s_ >= 1. This implementation uses a treap as a buffer, following Knuth's original design. While this results in O(log n) operations instead of O(1) for hash-based approaches, it provides better cache locality for small buffers and eliminates hash collision overhead. 25 25 26 26 # What does this library provide 27 27 Two things: the crate / library, and a command-line utility (`cvmcount`) which will count the unique strings in an input text file.
+2 -2
benches/benchmarks.rs
··· 11 11 use rand::{thread_rng, Rng}; 12 12 use regex::Regex; 13 13 14 - use rustc_hash::FxHashSet; 14 + use std::collections::HashSet; 15 15 16 16 // generate 1 million 7-digit random positive integers 17 17 fn generate_random_numbers() -> Vec<i32> { ··· 78 78 |b| { 79 79 let digits = generate_random_numbers(); 80 80 b.iter(|| { 81 - let mut hs = FxHashSet::with_hasher(Default::default()); 81 + let mut hs = HashSet::new(); 82 82 digits.iter().for_each(|digit| { 83 83 hs.insert(digit); 84 84 });
+32 -24
src/lib.rs
··· 1 1 //! An implementation of the CVM fast element counting algorithm presented in 2 2 //! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34> 3 + //! 4 + //! This implementation uses a treap data structure as the buffer, following Knuth's original design. 5 + 6 + mod treap; 3 7 8 + use crate::treap::Treap; 4 9 use rand::rngs::StdRng; 5 10 use rand::{Rng, SeedableRng}; 6 11 7 - use rustc_hash::FxHashSet; 8 - use std::hash::Hash; 9 - 10 12 /// A counter implementing the CVM algorithm 11 13 /// 14 + /// This implementation uses a treap (randomized binary search tree) as the buffer, 15 + /// which provides `O(log n)` operations while maintaining the probabilistic properties 16 + /// needed for the algorithm. 17 + /// 12 18 /// Note that the CVM struct's buffer takes ownership of its elements. 13 - pub struct CVM<T: PartialEq + Eq + Hash> { 19 + pub struct CVM<T: Ord> { 14 20 buf_size: usize, 15 - buf: FxHashSet<T>, 21 + buf: Treap<T>, 16 22 probability: f64, 17 23 rng: StdRng, 18 24 } 19 25 20 - impl<T: PartialEq + Eq + Hash> CVM<T> { 26 + impl<T: Ord> CVM<T> { 21 27 /// Initialise the algorithm 22 28 /// 23 - /// epsilon: how close you want your estimate to be to the true number of distinct elements. 24 - /// A smaller ε means you require a more precise estimate. 25 - /// For example, ε = 0.05 means you want your estimate to be within 5% of the actual value. 26 - /// An epsilon of 0.8 is a good starting point for most applications. 29 + /// `epsilon`: how close you want your estimate to be to the true number of distinct elements. 30 + /// A smaller `ε` means you require a more precise estimate. 31 + /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value. 32 + /// An epsilon of `0.8` is a good starting point for most applications. 27 33 /// 28 - /// delta: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 34 + /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 29 35 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a 30 36 /// higher chance the estimate might be outside the desired range. 31 - /// A delta of 0.1 is a good starting point for most applications. 37 + /// A `delta` of `0.1` is a good starting point for most applications. 32 38 /// 33 - /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 39 + /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 34 40 /// the more accurate the result will be. 35 41 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self { 36 42 let bufsize = buffer_size(epsilon, delta, stream_size); 37 43 Self { 38 44 buf_size: bufsize, 39 - buf: FxHashSet::with_capacity_and_hasher(bufsize, Default::default()), 45 + buf: Treap::new(), 40 46 probability: 1.0, 41 47 rng: StdRng::from_entropy(), 42 48 } 43 49 } 44 50 /// Add an element, potentially updating the unique element count 45 51 pub fn process_element(&mut self, elem: T) { 46 - // We should switch to a treap (as per Knuth) to avoid the hash overhead, but FxHash 47 - // is still a lot faster than linear searching a Vec, even at small (1000) buffer sizes 48 - // Round 0: if an element exists, remove it. Element is added back due to probability 1 49 - // When buffer is full, remove half the elements 50 - // Round 1: if an element exists, remove it. Element MAY be added back due to probability 0.5 52 + // The algorithm works as follows: 53 + // 1. If element exists in buffer, remove it (this ensures proper sampling) 54 + // 2. Add element back with current probability 55 + // 3. If buffer is full, remove ~half the elements and halve the probability 56 + // This creates a geometric sampling scheme that provides an unbiased estimate 51 57 if self.buf.contains(&elem) { 52 58 self.buf.remove(&elem); 53 59 } 54 60 if self.rng.gen_bool(self.probability) { 55 - self.buf.insert(elem); 61 + self.buf.insert(elem, &mut self.rng); 56 62 } 57 63 while self.buf.len() == self.buf_size { 58 64 self.clear_about_half(); ··· 61 67 } 62 68 // remove around half of the elements at random 63 69 fn clear_about_half(&mut self) { 64 - self.buf.retain(|_| self.rng.gen_bool(0.5)); 70 + // Need to capture rng reference to use in closure 71 + let rng = &mut self.rng; 72 + self.buf.retain(|_| rng.gen_bool(0.5)); 65 73 } 66 74 /// Calculate the current unique element count. You can continue to add elements after calling this method. 67 75 pub fn calculate_final_result(&self) -> f64 { ··· 82 90 path::Path, 83 91 }; 84 92 85 - use super::*; 86 93 use regex::Regex; 94 + use std::collections::HashSet; 87 95 88 96 fn open_file<P>(filename: P) -> BufReader<File> 89 97 where ··· 93 101 BufReader::new(f) 94 102 } 95 103 96 - fn line_to_word(re: &Regex, hs: &mut FxHashSet<String>, line: &str) { 104 + fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) { 97 105 let words = line.split(' '); 98 106 words.for_each(|word| { 99 107 let clean_word = re.replace_all(word, "").to_lowercase(); ··· 105 113 let input_file = "benches/kiy.txt"; 106 114 let re = Regex::new(r"[^\w\s]").unwrap(); 107 115 let br = open_file(input_file); 108 - let mut hs = FxHashSet::with_hasher(Default::default()); 116 + let mut hs = HashSet::new(); 109 117 br.lines() 110 118 .for_each(|line| line_to_word(&re, &mut hs, &line.unwrap())); 111 119 assert_eq!(hs.len(), 9016)
+313
src/treap.rs
··· 1 + //! A randomized binary search tree (treap) implementation 2 + //! 3 + //! A treap maintains both BST property (for keys) and heap property (for priorities). 4 + //! 5 + //! This implementation was inspired by the treap exploration in <https://github.com/apanda/cvm> 6 + //! (BSD-2-Clause license), but is an independent implementation tailored specifically 7 + //! for the CVM algorithm's requirements. 8 + //! 9 + //! ## Key Differences from apanda/cvm treap: 10 + //! 11 + //! 1. **Simpler structure**: We don't use a separate Element type; keys and priorities are 12 + //! stored directly in nodes 13 + //! 2. **Random priorities**: apanda's implementation expects explicit priorities, while ours 14 + //! generates random priorities at insertion time 15 + //! 3. **No allocation tracking**: apanda uses `alloc_counter` for performance analysis 16 + //! 4. **Simplified delete**: Our delete returns a bool, apanda's has more complex handling 17 + //! 5. **Retain operation**: We added a specialized `retain` method for CVM's "clear half" 18 + //! 6. **No Display trait**: We focus on the minimal API needed for CVM 19 + //! 7. **Insert behavior**: apanda's `insert_or_replace` updates existing elements; ours 20 + //! keeps the original (no update) which is what CVM needs 21 + //! 22 + //! ## Design Decisions 23 + //! 24 + //! Unlike general-purpose treap implementations, this one is optimized for CVM: 25 + //! - No key-value mapping: CVM only needs to track unique elements 26 + //! - Simplified API: Only operations needed for CVM are implemented 27 + //! - Efficient `retain`: Optimized for the "clear about half" operation 28 + //! - RNG integration: Accepts an external RNG for consistent randomness 29 + 30 + use rand::Rng; 31 + use std::cmp::Ordering; 32 + 33 + /// A node in the treap 34 + struct Node<T> { 35 + key: T, 36 + priority: u32, 37 + left: Option<Box<Node<T>>>, 38 + right: Option<Box<Node<T>>>, 39 + } 40 + 41 + impl<T> Node<T> { 42 + fn new(key: T, priority: u32) -> Self { 43 + Node { 44 + key, 45 + priority, 46 + left: None, 47 + right: None, 48 + } 49 + } 50 + } 51 + 52 + /// A treap data structure 53 + /// 54 + /// Key differences from typical treap implementations: 55 + /// 1. Priorities are generated at insertion time using the provided RNG 56 + /// 2. The `retain` operation is optimized for the CVM algorithm's "clear half" operation 57 + /// 3. No support for key-value pairs - only keys are stored (values are implicit) 58 + /// 4. No split operation as it's not needed for CVM 59 + /// 5. Insert doesn't update existing keys - matching CVM's requirement 60 + pub struct Treap<T> { 61 + root: Option<Box<Node<T>>>, 62 + size: usize, 63 + } 64 + 65 + impl<T: Ord> Treap<T> { 66 + /// Create a new empty treap 67 + pub fn new() -> Self { 68 + Treap { 69 + root: None, 70 + size: 0, 71 + } 72 + } 73 + 74 + /// Get the number of elements in the treap 75 + pub fn len(&self) -> usize { 76 + self.size 77 + } 78 + 79 + /// Check if the treap is empty 80 + #[allow(dead_code)] 81 + pub fn is_empty(&self) -> bool { 82 + self.size == 0 83 + } 84 + 85 + /// Insert a key with a random priority 86 + pub fn insert<R: Rng>(&mut self, key: T, rng: &mut R) { 87 + let priority = rng.gen(); 88 + self.root = Self::insert_node(self.root.take(), key, priority); 89 + self.size += 1; 90 + } 91 + 92 + /// Check if the treap contains a key 93 + pub fn contains(&self, key: &T) -> bool { 94 + Self::contains_node(&self.root, key) 95 + } 96 + 97 + /// Remove a key from the treap 98 + pub fn remove(&mut self, key: &T) -> bool { 99 + let (new_root, removed) = Self::remove_node(self.root.take(), key); 100 + self.root = new_root; 101 + if removed { 102 + self.size -= 1; 103 + } 104 + removed 105 + } 106 + 107 + /// Clear the treap 108 + #[allow(dead_code)] 109 + pub fn clear(&mut self) { 110 + self.root = None; 111 + self.size = 0; 112 + } 113 + 114 + /// Apply a function to each element, removing those for which it returns false 115 + pub fn retain<F>(&mut self, mut f: F) 116 + where 117 + F: FnMut(&T) -> bool, 118 + { 119 + let (new_root, new_size) = Self::retain_node(self.root.take(), &mut f); 120 + self.root = new_root; 121 + self.size = new_size; 122 + } 123 + 124 + // Helper function to insert a node 125 + fn insert_node(node: Option<Box<Node<T>>>, key: T, priority: u32) -> Option<Box<Node<T>>> { 126 + match node { 127 + None => Some(Box::new(Node::new(key, priority))), 128 + Some(mut n) => { 129 + match key.cmp(&n.key) { 130 + Ordering::Less => { 131 + n.left = Self::insert_node(n.left, key, priority); 132 + // Maintain heap property 133 + if n.left.as_ref().unwrap().priority > n.priority { 134 + Self::rotate_right(n) 135 + } else { 136 + Some(n) 137 + } 138 + } 139 + Ordering::Greater => { 140 + n.right = Self::insert_node(n.right, key, priority); 141 + // Maintain heap property 142 + if n.right.as_ref().unwrap().priority > n.priority { 143 + Self::rotate_left(n) 144 + } else { 145 + Some(n) 146 + } 147 + } 148 + Ordering::Equal => Some(n), // Key already exists, do nothing 149 + } 150 + } 151 + } 152 + } 153 + 154 + // Helper function to check if a node contains a key 155 + fn contains_node(node: &Option<Box<Node<T>>>, key: &T) -> bool { 156 + match node { 157 + None => false, 158 + Some(n) => match key.cmp(&n.key) { 159 + Ordering::Less => Self::contains_node(&n.left, key), 160 + Ordering::Greater => Self::contains_node(&n.right, key), 161 + Ordering::Equal => true, 162 + }, 163 + } 164 + } 165 + 166 + // Helper function to remove a node 167 + fn remove_node(node: Option<Box<Node<T>>>, key: &T) -> (Option<Box<Node<T>>>, bool) { 168 + match node { 169 + None => (None, false), 170 + Some(mut n) => match key.cmp(&n.key) { 171 + Ordering::Less => { 172 + let (new_left, removed) = Self::remove_node(n.left, key); 173 + n.left = new_left; 174 + (Some(n), removed) 175 + } 176 + Ordering::Greater => { 177 + let (new_right, removed) = Self::remove_node(n.right, key); 178 + n.right = new_right; 179 + (Some(n), removed) 180 + } 181 + Ordering::Equal => { 182 + // Found the node to remove 183 + (Self::merge(n.left, n.right), true) 184 + } 185 + }, 186 + } 187 + } 188 + 189 + // Merge two subtrees 190 + fn merge(left: Option<Box<Node<T>>>, right: Option<Box<Node<T>>>) -> Option<Box<Node<T>>> { 191 + match (left, right) { 192 + (None, right) => right, 193 + (left, None) => left, 194 + (Some(l), Some(r)) => { 195 + if l.priority > r.priority { 196 + let mut l = l; 197 + l.right = Self::merge(l.right, Some(r)); 198 + Some(l) 199 + } else { 200 + let mut r = r; 201 + r.left = Self::merge(Some(l), r.left); 202 + Some(r) 203 + } 204 + } 205 + } 206 + } 207 + 208 + // Rotate right 209 + fn rotate_right(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 210 + let mut new_root = node.left.take().unwrap(); 211 + node.left = new_root.right.take(); 212 + new_root.right = Some(node); 213 + Some(new_root) 214 + } 215 + 216 + // Rotate left 217 + fn rotate_left(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 218 + let mut new_root = node.right.take().unwrap(); 219 + node.right = new_root.left.take(); 220 + new_root.left = Some(node); 221 + Some(new_root) 222 + } 223 + 224 + // Retain nodes that satisfy the predicate 225 + fn retain_node<F>(node: Option<Box<Node<T>>>, f: &mut F) -> (Option<Box<Node<T>>>, usize) 226 + where 227 + F: FnMut(&T) -> bool, 228 + { 229 + match node { 230 + None => (None, 0), 231 + Some(mut n) => { 232 + let (new_left, left_size) = Self::retain_node(n.left, f); 233 + let (new_right, right_size) = Self::retain_node(n.right, f); 234 + 235 + if f(&n.key) { 236 + n.left = new_left; 237 + n.right = new_right; 238 + (Some(n), left_size + right_size + 1) 239 + } else { 240 + // Remove this node by merging its subtrees 241 + let merged = Self::merge(new_left, new_right); 242 + (merged, left_size + right_size) 243 + } 244 + } 245 + } 246 + } 247 + } 248 + 249 + impl<T: Ord> Default for Treap<T> { 250 + fn default() -> Self { 251 + Self::new() 252 + } 253 + } 254 + 255 + #[cfg(test)] 256 + mod tests { 257 + use super::*; 258 + use rand::rngs::StdRng; 259 + use rand::SeedableRng; 260 + 261 + #[test] 262 + fn test_insert_and_contains() { 263 + let mut treap = Treap::new(); 264 + let mut rng = StdRng::seed_from_u64(42); 265 + 266 + treap.insert(5, &mut rng); 267 + treap.insert(3, &mut rng); 268 + treap.insert(7, &mut rng); 269 + 270 + assert!(treap.contains(&5)); 271 + assert!(treap.contains(&3)); 272 + assert!(treap.contains(&7)); 273 + assert!(!treap.contains(&1)); 274 + assert_eq!(treap.len(), 3); 275 + } 276 + 277 + #[test] 278 + fn test_remove() { 279 + let mut treap = Treap::new(); 280 + let mut rng = StdRng::seed_from_u64(42); 281 + 282 + treap.insert(5, &mut rng); 283 + treap.insert(3, &mut rng); 284 + treap.insert(7, &mut rng); 285 + 286 + assert!(treap.remove(&3)); 287 + assert!(!treap.contains(&3)); 288 + assert_eq!(treap.len(), 2); 289 + 290 + assert!(!treap.remove(&3)); // Already removed 291 + } 292 + 293 + #[test] 294 + fn test_retain() { 295 + let mut treap = Treap::new(); 296 + let mut rng = StdRng::seed_from_u64(42); 297 + 298 + for i in 0..10 { 299 + treap.insert(i, &mut rng); 300 + } 301 + 302 + treap.retain(|&x| x % 2 == 0); 303 + assert_eq!(treap.len(), 5); 304 + 305 + for i in 0..10 { 306 + if i % 2 == 0 { 307 + assert!(treap.contains(&i)); 308 + } else { 309 + assert!(!treap.contains(&i)); 310 + } 311 + } 312 + } 313 + }