Rust implementation of the CVM algorithm for counting distinct elements in a stream
//! An implementation of the CVM fast element counting algorithm presented in
//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34>
//!
//! This implementation uses a treap data structure as the buffer, following Knuth's original design.
5
6mod treap;
7
8use crate::treap::Treap;
9use rand::Rng;
10use rand::SeedableRng;
11use rand::rngs::StdRng;
12
/// Specification for confidence level in the CVM algorithm
///
/// The same requirement can be phrased either as a failure probability
/// (`delta`) or as a success probability (`confidence`); the two are related
/// by `delta = 1 - confidence`.
#[derive(Debug, Clone, Copy)]
pub enum ConfidenceSpec {
    /// Specify delta directly (probability of failure)
    Delta(f64),
    /// Specify confidence level (probability of success)
    Confidence(f64),
}

impl ConfidenceSpec {
    /// Convert to delta value for internal use
    ///
    /// `Delta` is returned unchanged; `Confidence(c)` becomes `1.0 - c`.
    fn to_delta(self) -> f64 {
        match self {
            ConfidenceSpec::Delta(delta) => delta,
            ConfidenceSpec::Confidence(confidence) => 1.0 - confidence,
        }
    }

    /// Validate the confidence specification
    ///
    /// # Errors
    ///
    /// Returns an error message unless the wrapped value lies strictly between
    /// 0.0 and 1.0. NaN is rejected as well.
    fn validate(self) -> Result<Self, String> {
        let (value, message) = match self {
            ConfidenceSpec::Delta(delta) => {
                (delta, "Delta must be between 0.0 and 1.0 (exclusive)")
            }
            ConfidenceSpec::Confidence(confidence) => (
                confidence,
                "Confidence must be between 0.0 and 1.0 (exclusive)",
            ),
        };

        // Phrase the check as "is inside the open interval" rather than
        // "is outside it": NaN compares false against every bound, so a
        // `<=`/`>=` rejection test would silently let NaN through.
        if value > 0.0 && value < 1.0 {
            Ok(self)
        } else {
            Err(message.to_string())
        }
    }
}
51
52/// Builder for constructing CVM instances with validation and defaults
53///
54/// # Examples
55///
56/// ```
57/// use cvmcount::CVM;
58///
59/// // Using defaults (`epsilon=0.8`, `confidence=0.9`, `size=1000`)
60/// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
61///
62/// // Custom parameters
63/// let cvm: CVM<i32> = CVM::<i32>::builder()
64/// .epsilon(0.05) // 5 % accuracy
65/// .confidence(0.99) // 99 % confidence
66/// .estimated_size(10_000)
67/// .build()
68/// .unwrap();
69///
70/// // Using delta instead of confidence
71/// let cvm: CVM<String> = CVM::<String>::builder()
72/// .epsilon(0.1)
73/// .delta(0.01) // 1 % failure probability
74/// .build()
75/// .unwrap();
76/// ```
77#[derive(Debug, Clone, Default)]
78pub struct CVMBuilder {
79 epsilon: Option<f64>,
80 confidence_spec: Option<ConfidenceSpec>,
81 stream_size: Option<usize>,
82}
83
84impl CVMBuilder {
85 /// Create a new builder with default values
86 pub fn new() -> Self {
87 Self::default()
88 }
89
90 /// Set the epsilon parameter (accuracy requirement)
91 ///
92 /// `Epsilon` determines how close you want your estimate to be to the true number
93 /// of distinct elements. A smaller `ε` means you require a more precise estimate.
94 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the
95 /// actual value.
96 ///
97 /// Must be between 0.0 and 1.0 (exclusive).
98 pub fn epsilon(mut self, epsilon: f64) -> Self {
99 self.epsilon = Some(epsilon);
100 self
101 }
102
103 /// Set the confidence level (probability that the estimate will be accurate)
104 ///
105 /// Confidence represents how certain you want to be that the algorithm's
106 /// estimate will fall within the desired accuracy range. For example,
107 /// `confidence = 0.99` means you're 99 % sure the estimate will be accurate.
108 ///
109 /// Must be between 0.0 and 1.0 (exclusive).
110 /// Cannot be used together with [`Self::delta`] – the last one called will be used.
111 pub fn confidence(mut self, confidence: f64) -> Self {
112 self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence));
113 self
114 }
115
116 /// Set the delta parameter (probability of failure)
117 ///
118 /// Delta represents the probability that the algorithm's estimate will fall
119 /// outside the desired accuracy range. For example, `delta = 0.01` means there's
120 /// a 1 % chance the estimate will be inaccurate.
121 ///
122 /// Must be between 0.0 and 1.0 (exclusive).
123 /// Cannot be used together with [`Self::confidence()`] – the last one called will be used.
124 pub fn delta(mut self, delta: f64) -> Self {
125 self.confidence_spec = Some(ConfidenceSpec::Delta(delta));
126 self
127 }
128
129 /// Set the estimated stream size
130 ///
131 /// This is used to determine buffer size and can be a loose approximation.
132 /// The closer it is to the actual stream size, the more accurate the results
133 /// will be.
134 pub fn estimated_size(mut self, size: usize) -> Self {
135 self.stream_size = Some(size);
136 self
137 }
138
139 /// Build the CVM instance with validation
140 ///
141 /// Uses the following defaults if not specified:
142 /// - `epsilon: 0.8` (good starting point for most applications)
143 /// - `confidence: 0.9` (90 % confidence, equivalent to delta = 0.1)
144 /// - `estimated_size: 1000`
145 ///
146 /// Returns an error if any parameters are invalid.
147 pub fn build<T: Ord>(self) -> Result<CVM<T>, String> {
148 // Validate and get epsilon
149 let epsilon = self.epsilon.unwrap_or(0.8);
150 if epsilon <= 0.0 || epsilon >= 1.0 {
151 return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string());
152 }
153
154 // Validate and get delta
155 let confidence_spec = self
156 .confidence_spec
157 .unwrap_or(ConfidenceSpec::Confidence(0.9));
158 let validated_spec = confidence_spec.validate()?;
159 let delta = validated_spec.to_delta();
160
161 // Validate and get stream size
162 let stream_size = self.stream_size.unwrap_or(1000);
163 if stream_size == 0 {
164 return Err("Stream size must be greater than 0".to_string());
165 }
166
167 Ok(CVM::new(epsilon, delta, stream_size))
168 }
169}
170
171/// A counter implementing the CVM algorithm
172///
173/// This implementation uses a treap (randomized binary search tree) as the buffer,
174/// which provides `O(log n)` operations while maintaining the probabilistic properties
175/// needed for the algorithm.
176///
177/// Note that the CVM struct's buffer takes ownership of its elements.
178pub struct CVM<T: Ord> {
179 buf_size: usize,
180 buf: Treap<T>,
181 probability: f64,
182 rng: StdRng,
183}
184
185impl<T: Ord> CVM<T> {
186 /// Create a new builder for constructing CVM instances
187 ///
188 /// The builder provides a more ergonomic way to construct CVM instances with
189 /// validation and sensible defaults.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// use cvmcount::CVM;
195 ///
196 /// // Using defaults
197 /// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
198 ///
199 /// // Custom configuration
200 /// let cvm: CVM<i32> = CVM::<i32>::builder()
201 /// .epsilon(0.05)
202 /// .confidence(0.99)
203 /// .estimated_size(10_000)
204 /// .build()
205 /// .unwrap();
206 /// ```
207 pub fn builder() -> CVMBuilder {
208 CVMBuilder::new()
209 }
210
211 /// Initialise the algorithm
212 ///
213 /// `epsilon`: how close you want your estimate to be to the true number of distinct elements.
214 /// A smaller `ε` means you require a more precise estimate.
215 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value.
216 /// An epsilon of `0.8` is a good starting point for most applications.
217 ///
218 /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence
219 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a
220 /// higher chance the estimate might be outside the desired range.
221 /// A `delta` of `0.1` is a good starting point for most applications.
222 ///
223 /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
224 /// the more accurate the result will be.
225 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
226 let bufsize = buffer_size(epsilon, delta, stream_size);
227 Self {
228 buf_size: bufsize,
229 buf: Treap::new(),
230 probability: 1.0,
231 rng: StdRng::from_os_rng(),
232 }
233 }
234 /// Add an element, potentially updating the unique element count
235 pub fn process_element(&mut self, elem: T) {
236 // The algorithm works as follows:
237 // 1. If element exists in buffer, remove it (this ensures proper sampling)
238 // 2. Add element back with current probability
239 // 3. If buffer is full, remove ~half the elements and halve the probability
240 // This creates a geometric sampling scheme that provides an unbiased estimate
241 if self.buf.contains(&elem) {
242 self.buf.remove(&elem);
243 }
244 if self.rng.random_bool(self.probability) {
245 self.buf.insert(elem, &mut self.rng);
246 }
247 while self.buf.len() == self.buf_size {
248 self.clear_about_half();
249 self.probability /= 2.0;
250 }
251 }
252 // remove around half of the elements at random
253 fn clear_about_half(&mut self) {
254 // Need to capture rng reference to use in closure
255 let rng = &mut self.rng;
256 self.buf.retain(|_| rng.random_bool(0.5));
257 }
258 /// Process an entire iterator of owned values and return the final estimate
259 ///
260 /// This is a convenience method that processes all elements from an iterator
261 /// and returns the final count estimate. The iterator must yield owned values
262 /// that the CVM can take ownership of.
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// use cvmcount::CVM;
268 ///
269 /// // Process owned strings
270 /// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
271 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
272 /// let estimate = cvm.process_stream(words);
273 /// assert!(estimate > 0.0);
274 ///
275 /// // Process numeric data
276 /// let numbers = vec![1, 2, 3, 2, 1, 4];
277 /// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
278 /// let estimate = cvm.process_stream(numbers);
279 /// assert!(estimate > 0.0);
280 ///
281 /// // When you have borrowed data, clone explicitly
282 /// let borrowed_words = vec!["hello", "world", "hello"];
283 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
284 /// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
285 /// assert!(estimate > 0.0);
286 /// ```
287 pub fn process_stream<I>(&mut self, iter: I) -> f64
288 where
289 I: IntoIterator<Item = T>,
290 {
291 for item in iter {
292 self.process_element(item);
293 }
294 self.calculate_final_result()
295 }
296
297 /// Calculate the current unique element count. You can continue to add elements after calling this method.
298 pub fn calculate_final_result(&self) -> f64 {
299 self.buf.len() as f64 / self.probability
300 }
301}
302
303/// Extension trait for iterators to estimate distinct count directly
304///
305/// This trait provides convenient methods to estimate distinct counts from iterators
306/// without manually creating and managing a CVM instance.
307///
308/// # Examples
309///
310/// ```
311/// use cvmcount::{CVM, EstimateDistinct};
312///
313/// // Simple usage with default parameters
314/// let numbers = vec![1, 2, 3, 2, 1, 4, 5];
315/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
316/// assert!(estimate > 0.0);
317///
318/// // Using builder pattern for more control
319/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
320/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
321/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
322/// assert!(estimate > 0.0);
323/// ```
324pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized {
325 /// Estimate distinct count using the CVM algorithm with specified parameters
326 ///
327 /// # Arguments
328 ///
329 /// * `epsilon` - Accuracy requirement (smaller = more accurate)
330 /// * `delta` - Failure probability (smaller = more confident)
331 /// * `estimated_size` - Rough estimate of total stream size
332 ///
333 /// # Returns
334 ///
335 /// The estimated number of distinct elements
336 fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 {
337 let mut cvm = CVM::new(epsilon, delta, estimated_size);
338 cvm.process_stream(self)
339 }
340
341 /// Estimate distinct count using a builder for more ergonomic configuration
342 ///
343 /// # Arguments
344 ///
345 /// * `builder` - A configured CVMBuilder instance
346 ///
347 /// # Returns
348 ///
349 /// Result containing the estimated number of distinct elements or an error message
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use cvmcount::{CVM, EstimateDistinct};
355 ///
356 /// let data = vec![1, 2, 3, 2, 1];
357 /// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99);
358 /// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap();
359 /// assert!(estimate > 0.0);
360 /// ```
361 fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> {
362 let mut cvm: CVM<T> = builder.build()?;
363 Ok(cvm.process_stream(self))
364 }
365}
366
367/// Implement EstimateDistinct for all iterators that yield Ord types
368impl<T: Ord, I: Iterator<Item = T>> EstimateDistinct<T> for I {}
369
// Compute the buffer threshold (buf_size) for the F0-Estimator algorithm:
// ceil((12 / epsilon^2) * log2(8 * stream_size / delta)).
// The float-to-usize cast saturates, so a NaN or negative intermediate yields 0.
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    let accuracy_factor = 12.0 / epsilon.powf(2.0);
    let log_term = ((8.0 * stream_size as f64) / delta).log2();
    (accuracy_factor * log_term).ceil() as usize
}
374
#[cfg(test)]
mod tests {
    use std::{
        fs::File,
        io::{BufRead, BufReader},
        path::Path,
    };

    use super::{CVM, ConfidenceSpec, EstimateDistinct};
    use regex::Regex;
    use std::collections::HashSet;

    /// Open `filename` for buffered reading, panicking if it cannot be opened.
    fn open_file<P>(filename: P) -> BufReader<File>
    where
        P: AsRef<Path>,
    {
        let f = File::open(filename).expect("Couldn't read from file");
        BufReader::new(f)
    }

    /// Split `line` on single spaces, strip every match of `re` from each
    /// piece, lowercase it, and record it in `hs`.
    fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) {
        let words = line.split(' ');
        words.for_each(|word| {
            let clean_word = re.replace_all(word, "").to_lowercase();
            hs.insert(clean_word);
        })
    }

    /// Exact distinct-word count of the benchmark text, computed with a `HashSet`.
    #[test]
    fn actual() {
        let input_file = "benches/kiy.txt";
        let re = Regex::new(r"[^\w\s]").unwrap();
        let br = open_file(input_file);
        let mut hs = HashSet::new();
        br.lines()
            .for_each(|line| line_to_word(&re, &mut hs, &line.unwrap()));
        assert_eq!(hs.len(), 9016)
    }

    #[test]
    fn test_builder_defaults() {
        let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
        // Verify that it's properly constructed with defaults
        assert_eq!(cvm.calculate_final_result(), 0.0); // Empty buffer
    }

    #[test]
    fn test_builder_custom_params() {
        let mut cvm: CVM<i32> = CVM::<i32>::builder()
            .epsilon(0.05)
            .confidence(0.99)
            .estimated_size(5000)
            .build()
            .unwrap();

        // Test that it works by processing some elements
        for i in 0..100 {
            cvm.process_element(i);
        }
        let result = cvm.calculate_final_result();
        assert!(result > 0.0);
    }

    #[test]
    fn test_builder_delta_vs_confidence() {
        // Test that confidence and delta give equivalent results
        let cvm1: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap();

        let cvm2: CVM<i32> = CVM::<i32>::builder().delta(0.1).build().unwrap();

        // This module is a child of the one defining `CVM`, so the private
        // `buf_size` field is visible and the derived configuration can be
        // compared directly.
        assert_eq!(cvm1.buf_size, cvm2.buf_size);
        assert_eq!(cvm1.calculate_final_result(), 0.0);
        assert_eq!(cvm2.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_last_wins() {
        // Test that the last confidence/delta setting wins
        let overridden: CVM<i32> = CVM::<i32>::builder()
            .confidence(0.9)
            .delta(0.05) // This should override confidence
            .build()
            .unwrap();
        let delta_only: CVM<i32> = CVM::<i32>::builder().delta(0.05).build().unwrap();
        let confidence_only: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap();

        // The threshold must match the one derived from delta alone and differ
        // from the one the overridden confidence would have produced.
        assert_eq!(overridden.buf_size, delta_only.buf_size);
        assert_ne!(overridden.buf_size, confidence_only.buf_size);
        assert_eq!(overridden.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_validation() {
        // Test epsilon validation
        let result = CVM::<i32>::builder().epsilon(0.0).build::<i32>();
        assert!(result.is_err());

        let result = CVM::<i32>::builder().epsilon(1.0).build::<i32>();
        assert!(result.is_err());

        let result = CVM::<i32>::builder().epsilon(-0.5).build::<i32>();
        assert!(result.is_err());

        // Test confidence validation
        let result = CVM::<i32>::builder().confidence(0.0).build::<i32>();
        assert!(result.is_err());

        let result = CVM::<i32>::builder().confidence(1.0).build::<i32>();
        assert!(result.is_err());

        // Test delta validation
        let result = CVM::<i32>::builder().delta(0.0).build::<i32>();
        assert!(result.is_err());

        let result = CVM::<i32>::builder().delta(1.0).build::<i32>();
        assert!(result.is_err());

        // Test stream size validation
        let result = CVM::<i32>::builder().estimated_size(0).build::<i32>();
        assert!(result.is_err());
    }

    #[test]
    fn test_builder_method_chaining() {
        let result = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.95)
            .estimated_size(2000)
            .build::<String>();

        assert!(result.is_ok());
    }

    #[test]
    fn test_confidence_spec_conversion() {
        // Test ConfidenceSpec::to_delta conversion
        let confidence_spec = ConfidenceSpec::Confidence(0.9);
        assert!((confidence_spec.to_delta() - 0.1).abs() < f64::EPSILON);

        let delta_spec = ConfidenceSpec::Delta(0.05);
        assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON);
    }

    #[test]
    fn test_process_stream() {
        let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();

        // Test with vector
        let numbers = vec![1, 2, 3, 2, 1, 4, 5, 3];
        let estimate = cvm.process_stream(numbers);
        assert!(estimate > 0.0);

        // Test with range
        let mut cvm2: CVM<i32> = CVM::<i32>::builder().build().unwrap();
        let estimate2 = cvm2.process_stream(1..=100);
        assert!(estimate2 > 0.0);
    }

    #[test]
    fn test_process_stream_strings() {
        let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();

        // Test with owned strings
        let words = vec![
            "hello".to_string(),
            "world".to_string(),
            "hello".to_string(),
            "rust".to_string(),
        ];
        let estimate = cvm.process_stream(words);
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_process_stream_with_map() {
        let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();

        // Test with borrowed data mapped to owned
        let borrowed_words = ["hello", "world", "hello", "rust"];
        let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_estimate_distinct_trait() {
        // Test simple usage
        let numbers = vec![1, 2, 3, 2, 1, 4, 5];
        let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
        assert!(estimate > 0.0);

        // Test with builder
        let words = vec![
            "hello".to_string(),
            "world".to_string(),
            "hello".to_string(),
        ];
        let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
        let estimate = words
            .into_iter()
            .estimate_distinct_with_builder(builder)
            .unwrap();
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_estimate_distinct_with_cloning() {
        // Test that explicit cloning works as expected
        let borrowed_numbers = [1, 2, 3, 2, 1, 4];
        let estimate = borrowed_numbers
            .iter()
            .cloned()
            .estimate_distinct_count(0.1, 0.1, 100);
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_streaming_integration_with_file_processing() {
        // Simulate file processing pattern
        let lines = vec![
            "hello world".to_string(),
            "world peace".to_string(),
            "hello rust".to_string(),
        ];

        let mut cvm: CVM<String> = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.9)
            .build()
            .unwrap();

        // Process words from all lines, streaming them straight into the
        // counter instead of materialising intermediate vectors.
        let words = lines
            .iter()
            .flat_map(|line| line.split_whitespace().map(String::from));
        let estimate = cvm.process_stream(words);

        assert!(estimate > 0.0);
    }

    #[test]
    fn test_streaming_large_dataset() {
        // Test with a larger dataset to verify the algorithm works
        let mut cvm: CVM<i32> = CVM::<i32>::builder()
            .epsilon(0.1)
            .confidence(0.9)
            .estimated_size(10_000)
            .build()
            .unwrap();

        // Create data with known distinct count (1000 unique values, repeated)
        let data = (0..1000).cycle().take(10_000);
        let estimate = cvm.process_stream(data);

        // The estimate should be reasonably close to 1000
        // With epsilon=0.1, we expect within 10 % accuracy most of the time
        assert!(estimate > 500.0 && estimate < 2000.0);
    }
}
635}