Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

Initial pass at treap

Note differences from far more thorough implementation at https://github.com/apanda/cvm

author
Stephan Hügel
date (Jun 29, 2025, 10:35 PM +0100) commit 12c964c2 parent 9aeebd7d change-id nopxlssp
+313
+313
src/treap.rs
··· 1 + //! A randomized binary search tree (treap) implementation 2 + //! 3 + //! A treap maintains both BST property (for keys) and heap property (for priorities). 4 + //! 5 + //! This implementation was inspired by the treap exploration in <https://github.com/apanda/cvm> 6 + //! (BSD-2-Clause license), but is an independent implementation tailored specifically 7 + //! for the CVM algorithm's requirements. 8 + //! 9 + //! ## Key Differences from apanda/cvm treap: 10 + //! 11 + //! 1. **Simpler structure**: We don't use a separate Element type; keys and priorities are 12 + //! stored directly in nodes 13 + //! 2. **Random priorities**: apanda's implementation expects explicit priorities, while ours 14 + //! generates random priorities at insertion time 15 + //! 3. **No allocation tracking**: apanda uses `alloc_counter` for performance analysis 16 + //! 4. **Simplified delete**: Our delete returns a bool, apanda's has more complex handling 17 + //! 5. **Retain operation**: We added a specialized `retain` method for CVM's "clear half" 18 + //! 6. **No Display trait**: We focus on the minimal API needed for CVM 19 + //! 7. **Insert behavior**: apanda's `insert_or_replace` updates existing elements; ours 20 + //! keeps the original (no update) which is what CVM needs 21 + //! 22 + //! ## Design Decisions 23 + //! 24 + //! Unlike general-purpose treap implementations, this one is optimized for CVM: 25 + //! - No key-value mapping: CVM only needs to track unique elements 26 + //! - Simplified API: Only operations needed for CVM are implemented 27 + //! - Efficient `retain`: Optimized for the "clear about half" operation 28 + //! - RNG integration: Accepts an external RNG for consistent randomness 29 + 30 + use rand::Rng; 31 + use std::cmp::Ordering; 32 + 33 + /// A node in the treap 34 + struct Node<T> { 35 + key: T, 36 + priority: u32, 37 + left: Option<Box<Node<T>>>, 38 + right: Option<Box<Node<T>>>, 39 + } 40 + 41 + impl<T> Node<T> { 42 + fn new(key: T, priority: u32) -> Self { 43 + Node { 44 + key, 45 + priority, 46 + left: None, 47 + right: None, 48 + } 49 + } 50 + } 51 + 52 + /// A treap data structure 53 + /// 54 + /// Key differences from typical treap implementations: 55 + /// 1. Priorities are generated at insertion time using the provided RNG 56 + /// 2. The `retain` operation is optimized for the CVM algorithm's "clear half" operation 57 + /// 3. No support for key-value pairs - only keys are stored (values are implicit) 58 + /// 4. No split operation as it's not needed for CVM 59 + /// 5. Insert doesn't update existing keys - matching CVM's requirement 60 + pub struct Treap<T> { 61 + root: Option<Box<Node<T>>>, 62 + size: usize, 63 + } 64 + 65 + impl<T: Ord> Treap<T> { 66 + /// Create a new empty treap 67 + pub fn new() -> Self { 68 + Treap { 69 + root: None, 70 + size: 0, 71 + } 72 + } 73 + 74 + /// Get the number of elements in the treap 75 + pub fn len(&self) -> usize { 76 + self.size 77 + } 78 + 79 + /// Check if the treap is empty 80 + #[allow(dead_code)] 81 + pub fn is_empty(&self) -> bool { 82 + self.size == 0 83 + } 84 + 85 + /// Insert a key with a random priority 86 + pub fn insert<R: Rng>(&mut self, key: T, rng: &mut R) { 87 + let priority = rng.gen(); 88 + self.root = Self::insert_node(self.root.take(), key, priority); 89 + self.size += 1; 90 + } 91 + 92 + /// Check if the treap contains a key 93 + pub fn contains(&self, key: &T) -> bool { 94 + Self::contains_node(&self.root, key) 95 + } 96 + 97 + /// Remove a key from the treap 98 + pub fn remove(&mut self, key: &T) -> bool { 99 + let (new_root, removed) = Self::remove_node(self.root.take(), key); 100 + self.root = new_root; 101 + if removed { 102 + self.size -= 1; 103 + } 104 + removed 105 + } 106 + 107 + /// Clear the treap 108 + #[allow(dead_code)] 109 + pub fn clear(&mut self) { 110 + self.root = None; 111 + self.size = 0; 112 + } 113 + 114 + /// Apply a function to each element, removing those for which it returns false 115 + pub fn retain<F>(&mut self, mut f: F) 116 + where 117 + F: FnMut(&T) -> bool, 118 + { 119 + let (new_root, new_size) = Self::retain_node(self.root.take(), &mut f); 120 + self.root = new_root; 121 + self.size = new_size; 122 + } 123 + 124 + // Helper function to insert a node 125 + fn insert_node(node: Option<Box<Node<T>>>, key: T, priority: u32) -> Option<Box<Node<T>>> { 126 + match node { 127 + None => Some(Box::new(Node::new(key, priority))), 128 + Some(mut n) => { 129 + match key.cmp(&n.key) { 130 + Ordering::Less => { 131 + n.left = Self::insert_node(n.left, key, priority); 132 + // Maintain heap property 133 + if n.left.as_ref().unwrap().priority > n.priority { 134 + Self::rotate_right(n) 135 + } else { 136 + Some(n) 137 + } 138 + } 139 + Ordering::Greater => { 140 + n.right = Self::insert_node(n.right, key, priority); 141 + // Maintain heap property 142 + if n.right.as_ref().unwrap().priority > n.priority { 143 + Self::rotate_left(n) 144 + } else { 145 + Some(n) 146 + } 147 + } 148 + Ordering::Equal => Some(n), // Key already exists, do nothing 149 + } 150 + } 151 + } 152 + } 153 + 154 + // Helper function to check if a node contains a key 155 + fn contains_node(node: &Option<Box<Node<T>>>, key: &T) -> bool { 156 + match node { 157 + None => false, 158 + Some(n) => match key.cmp(&n.key) { 159 + Ordering::Less => Self::contains_node(&n.left, key), 160 + Ordering::Greater => Self::contains_node(&n.right, key), 161 + Ordering::Equal => true, 162 + }, 163 + } 164 + } 165 + 166 + // Helper function to remove a node 167 + fn remove_node(node: Option<Box<Node<T>>>, key: &T) -> (Option<Box<Node<T>>>, bool) { 168 + match node { 169 + None => (None, false), 170 + Some(mut n) => match key.cmp(&n.key) { 171 + Ordering::Less => { 172 + let (new_left, removed) = Self::remove_node(n.left, key); 173 + n.left = new_left; 174 + (Some(n), removed) 175 + } 176 + Ordering::Greater => { 177 + let (new_right, removed) = Self::remove_node(n.right, key); 178 + n.right = new_right; 179 + (Some(n), removed) 180 + } 181 + Ordering::Equal => { 182 + // Found the node to remove 183 + (Self::merge(n.left, n.right), true) 184 + } 185 + }, 186 + } 187 + } 188 + 189 + // Merge two subtrees 190 + fn merge(left: Option<Box<Node<T>>>, right: Option<Box<Node<T>>>) -> Option<Box<Node<T>>> { 191 + match (left, right) { 192 + (None, right) => right, 193 + (left, None) => left, 194 + (Some(l), Some(r)) => { 195 + if l.priority > r.priority { 196 + let mut l = l; 197 + l.right = Self::merge(l.right, Some(r)); 198 + Some(l) 199 + } else { 200 + let mut r = r; 201 + r.left = Self::merge(Some(l), r.left); 202 + Some(r) 203 + } 204 + } 205 + } 206 + } 207 + 208 + // Rotate right 209 + fn rotate_right(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 210 + let mut new_root = node.left.take().unwrap(); 211 + node.left = new_root.right.take(); 212 + new_root.right = Some(node); 213 + Some(new_root) 214 + } 215 + 216 + // Rotate left 217 + fn rotate_left(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 218 + let mut new_root = node.right.take().unwrap(); 219 + node.right = new_root.left.take(); 220 + new_root.left = Some(node); 221 + Some(new_root) 222 + } 223 + 224 + // Retain nodes that satisfy the predicate 225 + fn retain_node<F>(node: Option<Box<Node<T>>>, f: &mut F) -> (Option<Box<Node<T>>>, usize) 226 + where 227 + F: FnMut(&T) -> bool, 228 + { 229 + match node { 230 + None => (None, 0), 231 + Some(mut n) => { 232 + let (new_left, left_size) = Self::retain_node(n.left, f); 233 + let (new_right, right_size) = Self::retain_node(n.right, f); 234 + 235 + if f(&n.key) { 236 + n.left = new_left; 237 + n.right = new_right; 238 + (Some(n), left_size + right_size + 1) 239 + } else { 240 + // Remove this node by merging its subtrees 241 + let merged = Self::merge(new_left, new_right); 242 + (merged, left_size + right_size) 243 + } 244 + } 245 + } 246 + } 247 + } 248 + 249 + impl<T: Ord> Default for Treap<T> { 250 + fn default() -> Self { 251 + Self::new() 252 + } 253 + } 254 + 255 + #[cfg(test)] 256 + mod tests { 257 + use super::*; 258 + use rand::rngs::StdRng; 259 + use rand::SeedableRng; 260 + 261 + #[test] 262 + fn test_insert_and_contains() { 263 + let mut treap = Treap::new(); 264 + let mut rng = StdRng::seed_from_u64(42); 265 + 266 + treap.insert(5, &mut rng); 267 + treap.insert(3, &mut rng); 268 + treap.insert(7, &mut rng); 269 + 270 + assert!(treap.contains(&5)); 271 + assert!(treap.contains(&3)); 272 + assert!(treap.contains(&7)); 273 + assert!(!treap.contains(&1)); 274 + assert_eq!(treap.len(), 3); 275 + } 276 + 277 + #[test] 278 + fn test_remove() { 279 + let mut treap = Treap::new(); 280 + let mut rng = StdRng::seed_from_u64(42); 281 + 282 + treap.insert(5, &mut rng); 283 + treap.insert(3, &mut rng); 284 + treap.insert(7, &mut rng); 285 + 286 + assert!(treap.remove(&3)); 287 + assert!(!treap.contains(&3)); 288 + assert_eq!(treap.len(), 2); 289 + 290 + assert!(!treap.remove(&3)); // Already removed 291 + } 292 + 293 + #[test] 294 + fn test_retain() { 295 + let mut treap = Treap::new(); 296 + let mut rng = StdRng::seed_from_u64(42); 297 + 298 + for i in 0..10 { 299 + treap.insert(i, &mut rng); 300 + } 301 + 302 + treap.retain(|&x| x % 2 == 0); 303 + assert_eq!(treap.len(), 5); 304 + 305 + for i in 0..10 { 306 + if i % 2 == 0 { 307 + assert!(treap.contains(&i)); 308 + } else { 309 + assert!(!treap.contains(&i)); 310 + } 311 + } 312 + } 313 + }