Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 17 kB View raw
1//! A randomized binary search tree (treap) implementation 2//! 3//! A treap maintains both BST property (for keys) and heap property (for priorities). 4//! 5//! This implementation was inspired by the treap exploration in <https://github.com/apanda/cvm> 6//! (BSD-2-Clause license), but is an independent implementation tailored specifically 7//! for the CVM algorithm's requirements. 8//! 9//! ## Key Differences from apanda/cvm treap: 10//! 11//! 1. **Simpler structure**: We don't use a separate Element type; keys and priorities are 12//! stored directly in nodes 13//! 2. **Random priorities**: apanda's implementation expects explicit priorities, while ours 14//! generates random priorities at insertion time 15//! 3. **No allocation tracking**: apanda uses `alloc_counter` for performance analysis 16//! 4. **Simplified delete**: Our delete returns a bool, apanda's has more complex handling 17//! 5. **Retain operation**: We added a specialized `retain` method for CVM's "clear half" 18//! 6. **No Display trait**: We focus on the minimal API needed for CVM 19//! 7. **Insert behavior**: apanda's `insert_or_replace` updates existing elements; ours 20//! keeps the original (no update) which is what CVM needs 21//! 22//! ## Design Decisions 23//! 24//! Unlike general-purpose treap implementations, this one is optimized for CVM: 25//! - No key-value mapping: CVM only needs to track unique elements 26//! - Simplified API: Only operations needed for CVM are implemented 27//! - Efficient `retain`: Optimized for the "clear about half" operation 28//! - RNG integration: Accepts an external RNG for consistent randomness 29 30use rand::Rng; 31use std::cmp::Ordering; 32 33/// A node in the treap 34struct Node<T> { 35 key: T, 36 priority: u32, 37 left: Option<Box<Node<T>>>, 38 right: Option<Box<Node<T>>>, 39} 40 41impl<T> Node<T> { 42 fn new(key: T, priority: u32) -> Self { 43 Node { 44 key, 45 priority, 46 left: None, 47 right: None, 48 } 49 } 50} 51 52/// A treap data structure 53/// 54/// Key differences from typical treap implementations: 55/// 1. Priorities are generated at insertion time using the provided RNG 56/// 2. The `retain` operation is optimized for the CVM algorithm's "clear half" operation 57/// 3. No support for key-value pairs - only keys are stored (values are implicit) 58/// 4. No split operation as it's not needed for CVM 59/// 5. Insert doesn't update existing keys - matching CVM's requirement 60pub struct Treap<T> { 61 root: Option<Box<Node<T>>>, 62 size: usize, 63} 64 65impl<T: Ord> Treap<T> { 66 /// Create a new empty treap 67 pub fn new() -> Self { 68 Treap { 69 root: None, 70 size: 0, 71 } 72 } 73 74 /// Get the number of elements in the treap 75 pub fn len(&self) -> usize { 76 self.size 77 } 78 79 /// Check if the treap is empty 80 #[allow(dead_code)] 81 pub fn is_empty(&self) -> bool { 82 self.size == 0 83 } 84 85 /// Insert a key with a random priority 86 /// 87 /// Returns `true` if the key was inserted, `false` if it already existed. 88 pub fn insert<R: Rng>(&mut self, key: T, rng: &mut R) -> bool { 89 let priority = rng.random(); 90 let (new_root, inserted) = Self::insert_node(self.root.take(), key, priority); 91 self.root = new_root; 92 if inserted { 93 self.size += 1; 94 } 95 inserted 96 } 97 98 /// Check if the treap contains a key 99 pub fn contains(&self, key: &T) -> bool { 100 Self::contains_node(&self.root, key) 101 } 102 103 /// Remove a key from the treap 104 pub fn remove(&mut self, key: &T) -> bool { 105 let (new_root, removed) = Self::remove_node(self.root.take(), key); 106 self.root = new_root; 107 if removed { 108 self.size -= 1; 109 } 110 removed 111 } 112 113 /// Clear the treap 114 #[allow(dead_code)] 115 pub fn clear(&mut self) { 116 self.root = None; 117 self.size = 0; 118 } 119 120 /// Apply a function to each element, removing those for which it returns false 121 pub fn retain<F>(&mut self, mut f: F) 122 where 123 F: FnMut(&T) -> bool, 124 { 125 let (new_root, new_size) = Self::retain_node(self.root.take(), &mut f); 126 self.root = new_root; 127 self.size = new_size; 128 } 129 130 // Helper function to insert a node 131 // Returns (new_tree, was_inserted) tuple 132 fn insert_node( 133 node: Option<Box<Node<T>>>, 134 key: T, 135 priority: u32, 136 ) -> (Option<Box<Node<T>>>, bool) { 137 match node { 138 None => (Some(Box::new(Node::new(key, priority))), true), 139 Some(mut n) => match key.cmp(&n.key) { 140 Ordering::Less => { 141 let (new_left, inserted) = Self::insert_node(n.left, key, priority); 142 n.left = new_left; 143 // Maintain heap property (only rotate if we actually inserted) 144 let result = if inserted && n.left.as_ref().unwrap().priority > n.priority { 145 Self::rotate_right(n) 146 } else { 147 Some(n) 148 }; 149 (result, inserted) 150 } 151 Ordering::Greater => { 152 let (new_right, inserted) = Self::insert_node(n.right, key, priority); 153 n.right = new_right; 154 // Maintain heap property (only rotate if we actually inserted) 155 let result = if inserted && n.right.as_ref().unwrap().priority > n.priority { 156 Self::rotate_left(n) 157 } else { 158 Some(n) 159 }; 160 (result, inserted) 161 } 162 Ordering::Equal => (Some(n), false), // Key already exists, do nothing 163 }, 164 } 165 } 166 167 // Helper function to check if a node contains a key 168 fn contains_node(node: &Option<Box<Node<T>>>, key: &T) -> bool { 169 match node { 170 None => false, 171 Some(n) => match key.cmp(&n.key) { 172 Ordering::Less => Self::contains_node(&n.left, key), 173 Ordering::Greater => Self::contains_node(&n.right, key), 174 Ordering::Equal => true, 175 }, 176 } 177 } 178 179 // Helper function to remove a node 180 fn remove_node(node: Option<Box<Node<T>>>, key: &T) -> (Option<Box<Node<T>>>, bool) { 181 match node { 182 None => (None, false), 183 Some(mut n) => match key.cmp(&n.key) { 184 Ordering::Less => { 185 let (new_left, removed) = Self::remove_node(n.left, key); 186 n.left = new_left; 187 (Some(n), removed) 188 } 189 Ordering::Greater => { 190 let (new_right, removed) = Self::remove_node(n.right, key); 191 n.right = new_right; 192 (Some(n), removed) 193 } 194 Ordering::Equal => { 195 // Found the node to remove 196 (Self::merge(n.left, n.right), true) 197 } 198 }, 199 } 200 } 201 202 // Merge two subtrees 203 fn merge(left: Option<Box<Node<T>>>, right: Option<Box<Node<T>>>) -> Option<Box<Node<T>>> { 204 match (left, right) { 205 (None, right) => right, 206 (left, None) => left, 207 (Some(l), Some(r)) => { 208 if l.priority > r.priority { 209 let mut l = l; 210 l.right = Self::merge(l.right, Some(r)); 211 Some(l) 212 } else { 213 let mut r = r; 214 r.left = Self::merge(Some(l), r.left); 215 Some(r) 216 } 217 } 218 } 219 } 220 221 // Rotate right 222 fn rotate_right(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 223 let mut new_root = node.left.take().unwrap(); 224 node.left = new_root.right.take(); 225 new_root.right = Some(node); 226 Some(new_root) 227 } 228 229 // Rotate left 230 fn rotate_left(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 231 let mut new_root = node.right.take().unwrap(); 232 node.right = new_root.left.take(); 233 new_root.left = Some(node); 234 Some(new_root) 235 } 236 237 // Retain nodes that satisfy the predicate 238 fn retain_node<F>(node: Option<Box<Node<T>>>, f: &mut F) -> (Option<Box<Node<T>>>, usize) 239 where 240 F: FnMut(&T) -> bool, 241 { 242 match node { 243 None => (None, 0), 244 Some(mut n) => { 245 let (new_left, left_size) = Self::retain_node(n.left, f); 246 let (new_right, right_size) = Self::retain_node(n.right, f); 247 248 if f(&n.key) { 249 n.left = new_left; 250 n.right = new_right; 251 (Some(n), left_size + right_size + 1) 252 } else { 253 // Remove this node by merging its subtrees 254 let merged = Self::merge(new_left, new_right); 255 (merged, left_size + right_size) 256 } 257 } 258 } 259 } 260} 261 262impl<T: Ord> Default for Treap<T> { 263 fn default() -> Self { 264 Self::new() 265 } 266} 267 268#[cfg(test)] 269mod tests { 270 use super::*; 271 use rand::SeedableRng; 272 use rand::rngs::StdRng; 273 274 // Helper: collect keys via in-order traversal (should yield sorted order for valid BST) 275 fn collect_inorder<T: Ord + Clone>(node: &Option<Box<Node<T>>>) -> Vec<T> { 276 match node { 277 None => vec![], 278 Some(n) => { 279 let mut result = collect_inorder(&n.left); 280 result.push(n.key.clone()); 281 result.extend(collect_inorder(&n.right)); 282 result 283 } 284 } 285 } 286 287 // Helper: verify max-heap property (parent priority >= child priorities) 288 fn verify_heap_property<T: Ord>(node: &Option<Box<Node<T>>>) -> bool { 289 match node { 290 None => true, 291 Some(n) => { 292 let left_ok = n.left.as_ref().is_none_or(|l| n.priority >= l.priority); 293 let right_ok = n.right.as_ref().is_none_or(|r| n.priority >= r.priority); 294 left_ok 295 && right_ok 296 && verify_heap_property(&n.left) 297 && verify_heap_property(&n.right) 298 } 299 } 300 } 301 302 #[test] 303 fn test_insert_and_contains() { 304 let mut treap = Treap::new(); 305 let mut rng = StdRng::seed_from_u64(42); 306 307 treap.insert(5, &mut rng); 308 treap.insert(3, &mut rng); 309 treap.insert(7, &mut rng); 310 311 assert!(treap.contains(&5)); 312 assert!(treap.contains(&3)); 313 assert!(treap.contains(&7)); 314 assert!(!treap.contains(&1)); 315 assert_eq!(treap.len(), 3); 316 } 317 318 #[test] 319 fn test_remove() { 320 let mut treap = Treap::new(); 321 let mut rng = StdRng::seed_from_u64(42); 322 323 treap.insert(5, &mut rng); 324 treap.insert(3, &mut rng); 325 treap.insert(7, &mut rng); 326 327 assert!(treap.remove(&3)); 328 assert!(!treap.contains(&3)); 329 assert_eq!(treap.len(), 2); 330 331 assert!(!treap.remove(&3)); // Already removed 332 } 333 334 #[test] 335 fn test_retain() { 336 let mut treap = Treap::new(); 337 let mut rng = StdRng::seed_from_u64(42); 338 339 for i in 0..10 { 340 treap.insert(i, &mut rng); 341 } 342 343 treap.retain(|&x| x % 2 == 0); 344 assert_eq!(treap.len(), 5); 345 346 for i in 0..10 { 347 if i % 2 == 0 { 348 assert!(treap.contains(&i)); 349 } else { 350 assert!(!treap.contains(&i)); 351 } 352 } 353 } 354 355 #[test] 356 fn test_duplicate_insertion() { 357 let mut treap = Treap::new(); 358 let mut rng = StdRng::seed_from_u64(42); 359 360 // Insert elements 361 assert!(treap.insert(5, &mut rng)); // First insertion returns true 362 assert!(treap.insert(3, &mut rng)); 363 assert!(treap.insert(7, &mut rng)); 364 assert_eq!(treap.len(), 3); 365 366 // Try to insert duplicates - should return false and not change size 367 assert!(!treap.insert(5, &mut rng)); 368 assert!(!treap.insert(3, &mut rng)); 369 assert!(!treap.insert(7, &mut rng)); 370 assert_eq!(treap.len(), 3); // Size unchanged 371 372 // Verify elements still exist 373 assert!(treap.contains(&5)); 374 assert!(treap.contains(&3)); 375 assert!(treap.contains(&7)); 376 377 // Remove and re-insert should work 378 assert!(treap.remove(&5)); 379 assert_eq!(treap.len(), 2); 380 assert!(treap.insert(5, &mut rng)); // Re-insertion returns true 381 assert_eq!(treap.len(), 3); 382 } 383 384 #[test] 385 fn test_bst_property() { 386 let mut treap = Treap::new(); 387 let mut rng = StdRng::seed_from_u64(123); 388 389 // Insert elements in random order 390 let elements = vec![50, 25, 75, 10, 30, 60, 90, 5, 15, 27, 35]; 391 for elem in &elements { 392 treap.insert(*elem, &mut rng); 393 } 394 395 // In-order traversal should yield sorted keys 396 let inorder = collect_inorder(&treap.root); 397 let mut sorted = elements.clone(); 398 sorted.sort(); 399 assert_eq!(inorder, sorted); 400 401 // Test after some removals 402 treap.remove(&25); 403 treap.remove(&75); 404 let inorder_after = collect_inorder(&treap.root); 405 let expected: Vec<i32> = sorted.into_iter().filter(|&x| x != 25 && x != 75).collect(); 406 assert_eq!(inorder_after, expected); 407 } 408 409 #[test] 410 fn test_heap_property() { 411 let mut treap = Treap::new(); 412 let mut rng = StdRng::seed_from_u64(456); 413 414 // Insert many elements 415 for i in 0..100 { 416 treap.insert(i, &mut rng); 417 assert!( 418 verify_heap_property(&treap.root), 419 "Heap property violated after inserting {}", 420 i 421 ); 422 } 423 424 // Test after removals 425 for i in (0..100).step_by(3) { 426 treap.remove(&i); 427 assert!( 428 verify_heap_property(&treap.root), 429 "Heap property violated after removing {}", 430 i 431 ); 432 } 433 434 // Test after retain 435 treap.retain(|&x| x % 2 == 0); 436 assert!( 437 verify_heap_property(&treap.root), 438 "Heap property violated after retain" 439 ); 440 } 441 442 #[test] 443 fn test_stress_insert_remove() { 444 let mut treap = Treap::new(); 445 let mut rng = StdRng::seed_from_u64(789); 446 447 // Insert 1000 elements 448 for i in 0..1000 { 449 treap.insert(i, &mut rng); 450 } 451 assert_eq!(treap.len(), 1000); 452 453 // Verify all elements present 454 for i in 0..1000 { 455 assert!(treap.contains(&i), "Element {} should be present", i); 456 } 457 458 // Remove every other element 459 for i in (0..1000).step_by(2) { 460 assert!(treap.remove(&i), "Should remove {}", i); 461 } 462 assert_eq!(treap.len(), 500); 463 464 // Verify correct elements remain 465 for i in 0..1000 { 466 if i % 2 == 0 { 467 assert!(!treap.contains(&i), "Element {} should be removed", i); 468 } else { 469 assert!(treap.contains(&i), "Element {} should remain", i); 470 } 471 } 472 473 // Re-insert removed elements 474 for i in (0..1000).step_by(2) { 475 assert!(treap.insert(i, &mut rng), "Should insert {}", i); 476 } 477 assert_eq!(treap.len(), 1000); 478 479 // Verify invariants 480 let inorder = collect_inorder(&treap.root); 481 let expected: Vec<i32> = (0..1000).collect(); 482 assert_eq!(inorder, expected); 483 assert!(verify_heap_property(&treap.root)); 484 } 485 486 #[test] 487 fn test_empty_tree_operations() { 488 let mut treap: Treap<i32> = Treap::new(); 489 let mut rng = StdRng::seed_from_u64(999); 490 491 // Operations on empty treap 492 assert!(treap.is_empty()); 493 assert_eq!(treap.len(), 0); 494 assert!(!treap.contains(&42)); 495 assert!(!treap.remove(&42)); 496 497 // Retain on empty treap (should be no-op) 498 treap.retain(|_| true); 499 assert!(treap.is_empty()); 500 501 // Clear on empty treap 502 treap.clear(); 503 assert!(treap.is_empty()); 504 505 // Insert then clear 506 treap.insert(1, &mut rng); 507 treap.insert(2, &mut rng); 508 assert_eq!(treap.len(), 2); 509 treap.clear(); 510 assert!(treap.is_empty()); 511 assert!(!treap.contains(&1)); 512 assert!(!treap.contains(&2)); 513 } 514 515 #[test] 516 fn test_single_element() { 517 let mut treap = Treap::new(); 518 let mut rng = StdRng::seed_from_u64(111); 519 520 // Single element operations 521 treap.insert(42, &mut rng); 522 assert_eq!(treap.len(), 1); 523 assert!(treap.contains(&42)); 524 assert!(!treap.contains(&0)); 525 526 // Duplicate of single element 527 assert!(!treap.insert(42, &mut rng)); 528 assert_eq!(treap.len(), 1); 529 530 // Remove single element 531 assert!(treap.remove(&42)); 532 assert!(treap.is_empty()); 533 assert!(!treap.contains(&42)); 534 535 // Re-insert 536 treap.insert(42, &mut rng); 537 assert_eq!(treap.len(), 1); 538 539 // Retain that keeps the element 540 treap.retain(|&x| x == 42); 541 assert_eq!(treap.len(), 1); 542 543 // Retain that removes the element 544 treap.retain(|&x| x != 42); 545 assert!(treap.is_empty()); 546 } 547}