Rust implementation of the CVM algorithm for counting distinct elements in a stream
1//! A randomized binary search tree (treap) implementation
2//!
3//! A treap maintains both BST property (for keys) and heap property (for priorities).
4//!
5//! This implementation was inspired by the treap exploration in <https://github.com/apanda/cvm>
6//! (BSD-2-Clause license), but is an independent implementation tailored specifically
7//! for the CVM algorithm's requirements.
8//!
9//! ## Key Differences from apanda/cvm treap:
10//!
11//! 1. **Simpler structure**: We don't use a separate Element type; keys and priorities are
12//! stored directly in nodes
13//! 2. **Random priorities**: apanda's implementation expects explicit priorities, while ours
14//! generates random priorities at insertion time
15//! 3. **No allocation tracking**: apanda uses `alloc_counter` for performance analysis
16//! 4. **Simplified delete**: Our delete returns a bool, apanda's has more complex handling
17//! 5. **Retain operation**: We added a specialized `retain` method for CVM's "clear half"
18//! 6. **No Display trait**: We focus on the minimal API needed for CVM
19//! 7. **Insert behavior**: apanda's `insert_or_replace` updates existing elements; ours
20//! keeps the original (no update) which is what CVM needs
21//!
22//! ## Design Decisions
23//!
24//! Unlike general-purpose treap implementations, this one is optimized for CVM:
25//! - No key-value mapping: CVM only needs to track unique elements
26//! - Simplified API: Only operations needed for CVM are implemented
27//! - Efficient `retain`: Optimized for the "clear about half" operation
28//! - RNG integration: Accepts an external RNG for consistent randomness
29
30use rand::Rng;
31use std::cmp::Ordering;
32
33/// A node in the treap
34struct Node<T> {
35 key: T,
36 priority: u32,
37 left: Option<Box<Node<T>>>,
38 right: Option<Box<Node<T>>>,
39}
40
41impl<T> Node<T> {
42 fn new(key: T, priority: u32) -> Self {
43 Node {
44 key,
45 priority,
46 left: None,
47 right: None,
48 }
49 }
50}
51
52/// A treap data structure
53///
54/// Key differences from typical treap implementations:
55/// 1. Priorities are generated at insertion time using the provided RNG
56/// 2. The `retain` operation is optimized for the CVM algorithm's "clear half" operation
57/// 3. No support for key-value pairs - only keys are stored (values are implicit)
58/// 4. No split operation as it's not needed for CVM
59/// 5. Insert doesn't update existing keys - matching CVM's requirement
60pub struct Treap<T> {
61 root: Option<Box<Node<T>>>,
62 size: usize,
63}
64
65impl<T: Ord> Treap<T> {
66 /// Create a new empty treap
67 pub fn new() -> Self {
68 Treap {
69 root: None,
70 size: 0,
71 }
72 }
73
74 /// Get the number of elements in the treap
75 pub fn len(&self) -> usize {
76 self.size
77 }
78
79 /// Check if the treap is empty
80 #[allow(dead_code)]
81 pub fn is_empty(&self) -> bool {
82 self.size == 0
83 }
84
85 /// Insert a key with a random priority
86 ///
87 /// Returns `true` if the key was inserted, `false` if it already existed.
88 pub fn insert<R: Rng>(&mut self, key: T, rng: &mut R) -> bool {
89 let priority = rng.random();
90 let (new_root, inserted) = Self::insert_node(self.root.take(), key, priority);
91 self.root = new_root;
92 if inserted {
93 self.size += 1;
94 }
95 inserted
96 }
97
98 /// Check if the treap contains a key
99 pub fn contains(&self, key: &T) -> bool {
100 Self::contains_node(&self.root, key)
101 }
102
103 /// Remove a key from the treap
104 pub fn remove(&mut self, key: &T) -> bool {
105 let (new_root, removed) = Self::remove_node(self.root.take(), key);
106 self.root = new_root;
107 if removed {
108 self.size -= 1;
109 }
110 removed
111 }
112
113 /// Clear the treap
114 #[allow(dead_code)]
115 pub fn clear(&mut self) {
116 self.root = None;
117 self.size = 0;
118 }
119
120 /// Apply a function to each element, removing those for which it returns false
121 pub fn retain<F>(&mut self, mut f: F)
122 where
123 F: FnMut(&T) -> bool,
124 {
125 let (new_root, new_size) = Self::retain_node(self.root.take(), &mut f);
126 self.root = new_root;
127 self.size = new_size;
128 }
129
130 // Helper function to insert a node
131 // Returns (new_tree, was_inserted) tuple
132 fn insert_node(
133 node: Option<Box<Node<T>>>,
134 key: T,
135 priority: u32,
136 ) -> (Option<Box<Node<T>>>, bool) {
137 match node {
138 None => (Some(Box::new(Node::new(key, priority))), true),
139 Some(mut n) => match key.cmp(&n.key) {
140 Ordering::Less => {
141 let (new_left, inserted) = Self::insert_node(n.left, key, priority);
142 n.left = new_left;
143 // Maintain heap property (only rotate if we actually inserted)
144 let result = if inserted && n.left.as_ref().unwrap().priority > n.priority {
145 Self::rotate_right(n)
146 } else {
147 Some(n)
148 };
149 (result, inserted)
150 }
151 Ordering::Greater => {
152 let (new_right, inserted) = Self::insert_node(n.right, key, priority);
153 n.right = new_right;
154 // Maintain heap property (only rotate if we actually inserted)
155 let result = if inserted && n.right.as_ref().unwrap().priority > n.priority {
156 Self::rotate_left(n)
157 } else {
158 Some(n)
159 };
160 (result, inserted)
161 }
162 Ordering::Equal => (Some(n), false), // Key already exists, do nothing
163 },
164 }
165 }
166
167 // Helper function to check if a node contains a key
168 fn contains_node(node: &Option<Box<Node<T>>>, key: &T) -> bool {
169 match node {
170 None => false,
171 Some(n) => match key.cmp(&n.key) {
172 Ordering::Less => Self::contains_node(&n.left, key),
173 Ordering::Greater => Self::contains_node(&n.right, key),
174 Ordering::Equal => true,
175 },
176 }
177 }
178
179 // Helper function to remove a node
180 fn remove_node(node: Option<Box<Node<T>>>, key: &T) -> (Option<Box<Node<T>>>, bool) {
181 match node {
182 None => (None, false),
183 Some(mut n) => match key.cmp(&n.key) {
184 Ordering::Less => {
185 let (new_left, removed) = Self::remove_node(n.left, key);
186 n.left = new_left;
187 (Some(n), removed)
188 }
189 Ordering::Greater => {
190 let (new_right, removed) = Self::remove_node(n.right, key);
191 n.right = new_right;
192 (Some(n), removed)
193 }
194 Ordering::Equal => {
195 // Found the node to remove
196 (Self::merge(n.left, n.right), true)
197 }
198 },
199 }
200 }
201
202 // Merge two subtrees
203 fn merge(left: Option<Box<Node<T>>>, right: Option<Box<Node<T>>>) -> Option<Box<Node<T>>> {
204 match (left, right) {
205 (None, right) => right,
206 (left, None) => left,
207 (Some(l), Some(r)) => {
208 if l.priority > r.priority {
209 let mut l = l;
210 l.right = Self::merge(l.right, Some(r));
211 Some(l)
212 } else {
213 let mut r = r;
214 r.left = Self::merge(Some(l), r.left);
215 Some(r)
216 }
217 }
218 }
219 }
220
221 // Rotate right
222 fn rotate_right(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> {
223 let mut new_root = node.left.take().unwrap();
224 node.left = new_root.right.take();
225 new_root.right = Some(node);
226 Some(new_root)
227 }
228
229 // Rotate left
230 fn rotate_left(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> {
231 let mut new_root = node.right.take().unwrap();
232 node.right = new_root.left.take();
233 new_root.left = Some(node);
234 Some(new_root)
235 }
236
237 // Retain nodes that satisfy the predicate
238 fn retain_node<F>(node: Option<Box<Node<T>>>, f: &mut F) -> (Option<Box<Node<T>>>, usize)
239 where
240 F: FnMut(&T) -> bool,
241 {
242 match node {
243 None => (None, 0),
244 Some(mut n) => {
245 let (new_left, left_size) = Self::retain_node(n.left, f);
246 let (new_right, right_size) = Self::retain_node(n.right, f);
247
248 if f(&n.key) {
249 n.left = new_left;
250 n.right = new_right;
251 (Some(n), left_size + right_size + 1)
252 } else {
253 // Remove this node by merging its subtrees
254 let merged = Self::merge(new_left, new_right);
255 (merged, left_size + right_size)
256 }
257 }
258 }
259 }
260}
261
262impl<T: Ord> Default for Treap<T> {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use super::*;
271 use rand::SeedableRng;
272 use rand::rngs::StdRng;
273
274 // Helper: collect keys via in-order traversal (should yield sorted order for valid BST)
275 fn collect_inorder<T: Ord + Clone>(node: &Option<Box<Node<T>>>) -> Vec<T> {
276 match node {
277 None => vec![],
278 Some(n) => {
279 let mut result = collect_inorder(&n.left);
280 result.push(n.key.clone());
281 result.extend(collect_inorder(&n.right));
282 result
283 }
284 }
285 }
286
287 // Helper: verify max-heap property (parent priority >= child priorities)
288 fn verify_heap_property<T: Ord>(node: &Option<Box<Node<T>>>) -> bool {
289 match node {
290 None => true,
291 Some(n) => {
292 let left_ok = n.left.as_ref().is_none_or(|l| n.priority >= l.priority);
293 let right_ok = n.right.as_ref().is_none_or(|r| n.priority >= r.priority);
294 left_ok
295 && right_ok
296 && verify_heap_property(&n.left)
297 && verify_heap_property(&n.right)
298 }
299 }
300 }
301
302 #[test]
303 fn test_insert_and_contains() {
304 let mut treap = Treap::new();
305 let mut rng = StdRng::seed_from_u64(42);
306
307 treap.insert(5, &mut rng);
308 treap.insert(3, &mut rng);
309 treap.insert(7, &mut rng);
310
311 assert!(treap.contains(&5));
312 assert!(treap.contains(&3));
313 assert!(treap.contains(&7));
314 assert!(!treap.contains(&1));
315 assert_eq!(treap.len(), 3);
316 }
317
318 #[test]
319 fn test_remove() {
320 let mut treap = Treap::new();
321 let mut rng = StdRng::seed_from_u64(42);
322
323 treap.insert(5, &mut rng);
324 treap.insert(3, &mut rng);
325 treap.insert(7, &mut rng);
326
327 assert!(treap.remove(&3));
328 assert!(!treap.contains(&3));
329 assert_eq!(treap.len(), 2);
330
331 assert!(!treap.remove(&3)); // Already removed
332 }
333
334 #[test]
335 fn test_retain() {
336 let mut treap = Treap::new();
337 let mut rng = StdRng::seed_from_u64(42);
338
339 for i in 0..10 {
340 treap.insert(i, &mut rng);
341 }
342
343 treap.retain(|&x| x % 2 == 0);
344 assert_eq!(treap.len(), 5);
345
346 for i in 0..10 {
347 if i % 2 == 0 {
348 assert!(treap.contains(&i));
349 } else {
350 assert!(!treap.contains(&i));
351 }
352 }
353 }
354
355 #[test]
356 fn test_duplicate_insertion() {
357 let mut treap = Treap::new();
358 let mut rng = StdRng::seed_from_u64(42);
359
360 // Insert elements
361 assert!(treap.insert(5, &mut rng)); // First insertion returns true
362 assert!(treap.insert(3, &mut rng));
363 assert!(treap.insert(7, &mut rng));
364 assert_eq!(treap.len(), 3);
365
366 // Try to insert duplicates - should return false and not change size
367 assert!(!treap.insert(5, &mut rng));
368 assert!(!treap.insert(3, &mut rng));
369 assert!(!treap.insert(7, &mut rng));
370 assert_eq!(treap.len(), 3); // Size unchanged
371
372 // Verify elements still exist
373 assert!(treap.contains(&5));
374 assert!(treap.contains(&3));
375 assert!(treap.contains(&7));
376
377 // Remove and re-insert should work
378 assert!(treap.remove(&5));
379 assert_eq!(treap.len(), 2);
380 assert!(treap.insert(5, &mut rng)); // Re-insertion returns true
381 assert_eq!(treap.len(), 3);
382 }
383
384 #[test]
385 fn test_bst_property() {
386 let mut treap = Treap::new();
387 let mut rng = StdRng::seed_from_u64(123);
388
389 // Insert elements in random order
390 let elements = vec![50, 25, 75, 10, 30, 60, 90, 5, 15, 27, 35];
391 for elem in &elements {
392 treap.insert(*elem, &mut rng);
393 }
394
395 // In-order traversal should yield sorted keys
396 let inorder = collect_inorder(&treap.root);
397 let mut sorted = elements.clone();
398 sorted.sort();
399 assert_eq!(inorder, sorted);
400
401 // Test after some removals
402 treap.remove(&25);
403 treap.remove(&75);
404 let inorder_after = collect_inorder(&treap.root);
405 let expected: Vec<i32> = sorted.into_iter().filter(|&x| x != 25 && x != 75).collect();
406 assert_eq!(inorder_after, expected);
407 }
408
409 #[test]
410 fn test_heap_property() {
411 let mut treap = Treap::new();
412 let mut rng = StdRng::seed_from_u64(456);
413
414 // Insert many elements
415 for i in 0..100 {
416 treap.insert(i, &mut rng);
417 assert!(
418 verify_heap_property(&treap.root),
419 "Heap property violated after inserting {}",
420 i
421 );
422 }
423
424 // Test after removals
425 for i in (0..100).step_by(3) {
426 treap.remove(&i);
427 assert!(
428 verify_heap_property(&treap.root),
429 "Heap property violated after removing {}",
430 i
431 );
432 }
433
434 // Test after retain
435 treap.retain(|&x| x % 2 == 0);
436 assert!(
437 verify_heap_property(&treap.root),
438 "Heap property violated after retain"
439 );
440 }
441
442 #[test]
443 fn test_stress_insert_remove() {
444 let mut treap = Treap::new();
445 let mut rng = StdRng::seed_from_u64(789);
446
447 // Insert 1000 elements
448 for i in 0..1000 {
449 treap.insert(i, &mut rng);
450 }
451 assert_eq!(treap.len(), 1000);
452
453 // Verify all elements present
454 for i in 0..1000 {
455 assert!(treap.contains(&i), "Element {} should be present", i);
456 }
457
458 // Remove every other element
459 for i in (0..1000).step_by(2) {
460 assert!(treap.remove(&i), "Should remove {}", i);
461 }
462 assert_eq!(treap.len(), 500);
463
464 // Verify correct elements remain
465 for i in 0..1000 {
466 if i % 2 == 0 {
467 assert!(!treap.contains(&i), "Element {} should be removed", i);
468 } else {
469 assert!(treap.contains(&i), "Element {} should remain", i);
470 }
471 }
472
473 // Re-insert removed elements
474 for i in (0..1000).step_by(2) {
475 assert!(treap.insert(i, &mut rng), "Should insert {}", i);
476 }
477 assert_eq!(treap.len(), 1000);
478
479 // Verify invariants
480 let inorder = collect_inorder(&treap.root);
481 let expected: Vec<i32> = (0..1000).collect();
482 assert_eq!(inorder, expected);
483 assert!(verify_heap_property(&treap.root));
484 }
485
486 #[test]
487 fn test_empty_tree_operations() {
488 let mut treap: Treap<i32> = Treap::new();
489 let mut rng = StdRng::seed_from_u64(999);
490
491 // Operations on empty treap
492 assert!(treap.is_empty());
493 assert_eq!(treap.len(), 0);
494 assert!(!treap.contains(&42));
495 assert!(!treap.remove(&42));
496
497 // Retain on empty treap (should be no-op)
498 treap.retain(|_| true);
499 assert!(treap.is_empty());
500
501 // Clear on empty treap
502 treap.clear();
503 assert!(treap.is_empty());
504
505 // Insert then clear
506 treap.insert(1, &mut rng);
507 treap.insert(2, &mut rng);
508 assert_eq!(treap.len(), 2);
509 treap.clear();
510 assert!(treap.is_empty());
511 assert!(!treap.contains(&1));
512 assert!(!treap.contains(&2));
513 }
514
515 #[test]
516 fn test_single_element() {
517 let mut treap = Treap::new();
518 let mut rng = StdRng::seed_from_u64(111);
519
520 // Single element operations
521 treap.insert(42, &mut rng);
522 assert_eq!(treap.len(), 1);
523 assert!(treap.contains(&42));
524 assert!(!treap.contains(&0));
525
526 // Duplicate of single element
527 assert!(!treap.insert(42, &mut rng));
528 assert_eq!(treap.len(), 1);
529
530 // Remove single element
531 assert!(treap.remove(&42));
532 assert!(treap.is_empty());
533 assert!(!treap.contains(&42));
534
535 // Re-insert
536 treap.insert(42, &mut rng);
537 assert_eq!(treap.len(), 1);
538
539 // Retain that keeps the element
540 treap.retain(|&x| x == 42);
541 assert_eq!(treap.len(), 1);
542
543 // Retain that removes the element
544 treap.retain(|&x| x != 42);
545 assert!(treap.is_empty());
546 }
547}