Rust implementation of the CVM algorithm for counting distinct elements in a stream
1//! An implementation of the CVM fast element counting algorithm presented in
2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34>
3//!
4//! This implementation uses a treap data structure as the buffer, following Knuth's original design.
5
6mod treap;
7
8use crate::treap::Treap;
9use rand::rngs::StdRng;
10use rand::{Rng, SeedableRng};
11
/// Specification for the confidence level of the CVM algorithm.
///
/// The same requirement can be expressed either as a failure probability
/// (`Delta`) or as a success probability (`Confidence`). Both forms are
/// reduced to a delta value before the counter is constructed.
#[derive(Debug, Clone, Copy)]
pub enum ConfidenceSpec {
    /// Specify delta directly (probability of failure)
    Delta(f64),
    /// Specify confidence level (probability of success)
    Confidence(f64),
}

impl ConfidenceSpec {
    /// Convert to the delta value used internally.
    ///
    /// `Delta(d)` yields `d` unchanged; `Confidence(c)` yields `1.0 - c`.
    fn to_delta(self) -> f64 {
        match self {
            ConfidenceSpec::Delta(delta) => delta,
            ConfidenceSpec::Confidence(confidence) => 1.0 - confidence,
        }
    }

    /// Validate the confidence specification.
    ///
    /// # Errors
    ///
    /// Returns an error message naming the offending variant unless the
    /// wrapped value lies strictly between 0.0 and 1.0. NaN is rejected too.
    fn validate(self) -> Result<Self, String> {
        let (value, label) = match self {
            ConfidenceSpec::Delta(delta) => (delta, "Delta"),
            ConfidenceSpec::Confidence(confidence) => (confidence, "Confidence"),
        };
        // Phrased as a positive range check on purpose: every comparison with
        // NaN is false, so a `value <= 0.0 || value >= 1.0` rejection test
        // would let NaN slip through as "valid".
        if value > 0.0 && value < 1.0 {
            Ok(self)
        } else {
            Err(format!("{} must be between 0.0 and 1.0 (exclusive)", label))
        }
    }
}
50
/// Builder for constructing CVM instances with validation and defaults
///
/// Every setting is optional. Anything left unset is replaced by a default
/// when `build` is called, and all values are validated at that point.
///
/// # Examples
///
/// ```
/// use cvmcount::CVM;
///
/// // Using defaults (`epsilon=0.8`, `confidence=0.9`, `size=1000`)
/// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
///
/// // Custom parameters
/// let cvm: CVM<i32> = CVM::<i32>::builder()
///     .epsilon(0.05) // 5 % accuracy
///     .confidence(0.99) // 99 % confidence
///     .estimated_size(10_000)
///     .build()
///     .unwrap();
///
/// // Using delta instead of confidence
/// let cvm: CVM<String> = CVM::<String>::builder()
///     .epsilon(0.1)
///     .delta(0.01) // 1 % failure probability
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct CVMBuilder {
    // Accuracy requirement; `None` means "use the default of 0.8" at build time.
    epsilon: Option<f64>,
    // Confidence or delta, whichever setter was called last; `None` means
    // "use the default confidence of 0.9" at build time.
    confidence_spec: Option<ConfidenceSpec>,
    // Estimated stream length; `None` means "use the default of 1000" at build time.
    stream_size: Option<usize>,
}
82
83impl CVMBuilder {
84 /// Create a new builder with default values
85 pub fn new() -> Self {
86 Self::default()
87 }
88
89 /// Set the epsilon parameter (accuracy requirement)
90 ///
91 /// `Epsilon` determines how close you want your estimate to be to the true number
92 /// of distinct elements. A smaller `ε` means you require a more precise estimate.
93 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the
94 /// actual value.
95 ///
96 /// Must be between 0.0 and 1.0 (exclusive).
97 pub fn epsilon(mut self, epsilon: f64) -> Self {
98 self.epsilon = Some(epsilon);
99 self
100 }
101
102 /// Set the confidence level (probability that the estimate will be accurate)
103 ///
104 /// Confidence represents how certain you want to be that the algorithm's
105 /// estimate will fall within the desired accuracy range. For example,
106 /// `confidence = 0.99` means you're 99 % sure the estimate will be accurate.
107 ///
108 /// Must be between 0.0 and 1.0 (exclusive).
109 /// Cannot be used together with [`Self::delta`] – the last one called will be used.
110 pub fn confidence(mut self, confidence: f64) -> Self {
111 self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence));
112 self
113 }
114
115 /// Set the delta parameter (probability of failure)
116 ///
117 /// Delta represents the probability that the algorithm's estimate will fall
118 /// outside the desired accuracy range. For example, `delta = 0.01` means there's
119 /// a 1 % chance the estimate will be inaccurate.
120 ///
121 /// Must be between 0.0 and 1.0 (exclusive).
122 /// Cannot be used together with [`Self::confidence()`] – the last one called will be used.
123 pub fn delta(mut self, delta: f64) -> Self {
124 self.confidence_spec = Some(ConfidenceSpec::Delta(delta));
125 self
126 }
127
128 /// Set the estimated stream size
129 ///
130 /// This is used to determine buffer size and can be a loose approximation.
131 /// The closer it is to the actual stream size, the more accurate the results
132 /// will be.
133 pub fn estimated_size(mut self, size: usize) -> Self {
134 self.stream_size = Some(size);
135 self
136 }
137
138 /// Build the CVM instance with validation
139 ///
140 /// Uses the following defaults if not specified:
141 /// - `epsilon: 0.8` (good starting point for most applications)
142 /// - `confidence: 0.9` (90 % confidence, equivalent to delta = 0.1)
143 /// - `estimated_size: 1000`
144 ///
145 /// Returns an error if any parameters are invalid.
146 pub fn build<T: Ord>(self) -> Result<CVM<T>, String> {
147 // Validate and get epsilon
148 let epsilon = self.epsilon.unwrap_or(0.8);
149 if epsilon <= 0.0 || epsilon >= 1.0 {
150 return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string());
151 }
152
153 // Validate and get delta
154 let confidence_spec = self
155 .confidence_spec
156 .unwrap_or(ConfidenceSpec::Confidence(0.9));
157 let validated_spec = confidence_spec.validate()?;
158 let delta = validated_spec.to_delta();
159
160 // Validate and get stream size
161 let stream_size = self.stream_size.unwrap_or(1000);
162 if stream_size == 0 {
163 return Err("Stream size must be greater than 0".to_string());
164 }
165
166 Ok(CVM::new(epsilon, delta, stream_size))
167 }
168}
169
/// A counter implementing the CVM algorithm
///
/// This implementation uses a treap (randomized binary search tree) as the buffer,
/// which provides `O(log n)` operations while maintaining the probabilistic properties
/// needed for the algorithm.
///
/// Note that the CVM struct's buffer takes ownership of its elements.
pub struct CVM<T: Ord> {
    // Buffer threshold computed once at construction; reaching it triggers a
    // random thinning of the buffer.
    buf_size: usize,
    // The sampled elements currently retained.
    buf: Treap<T>,
    // Current sampling probability: starts at 1.0 and is halved each time the
    // buffer is thinned.
    probability: f64,
    // Random source for sampling decisions; also handed to the treap on insert.
    rng: StdRng,
}
183
184impl<T: Ord> CVM<T> {
185 /// Create a new builder for constructing CVM instances
186 ///
187 /// The builder provides a more ergonomic way to construct CVM instances with
188 /// validation and sensible defaults.
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use cvmcount::CVM;
194 ///
195 /// // Using defaults
196 /// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
197 ///
198 /// // Custom configuration
199 /// let cvm: CVM<i32> = CVM::<i32>::builder()
200 /// .epsilon(0.05)
201 /// .confidence(0.99)
202 /// .estimated_size(10_000)
203 /// .build()
204 /// .unwrap();
205 /// ```
206 pub fn builder() -> CVMBuilder {
207 CVMBuilder::new()
208 }
209
210 /// Initialise the algorithm
211 ///
212 /// `epsilon`: how close you want your estimate to be to the true number of distinct elements.
213 /// A smaller `ε` means you require a more precise estimate.
214 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value.
215 /// An epsilon of `0.8` is a good starting point for most applications.
216 ///
217 /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence
218 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a
219 /// higher chance the estimate might be outside the desired range.
220 /// A `delta` of `0.1` is a good starting point for most applications.
221 ///
222 /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
223 /// the more accurate the result will be.
224 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
225 let bufsize = buffer_size(epsilon, delta, stream_size);
226 Self {
227 buf_size: bufsize,
228 buf: Treap::new(),
229 probability: 1.0,
230 rng: StdRng::from_entropy(),
231 }
232 }
233 /// Add an element, potentially updating the unique element count
234 pub fn process_element(&mut self, elem: T) {
235 // The algorithm works as follows:
236 // 1. If element exists in buffer, remove it (this ensures proper sampling)
237 // 2. Add element back with current probability
238 // 3. If buffer is full, remove ~half the elements and halve the probability
239 // This creates a geometric sampling scheme that provides an unbiased estimate
240 if self.buf.contains(&elem) {
241 self.buf.remove(&elem);
242 }
243 if self.rng.gen_bool(self.probability) {
244 self.buf.insert(elem, &mut self.rng);
245 }
246 while self.buf.len() == self.buf_size {
247 self.clear_about_half();
248 self.probability /= 2.0;
249 }
250 }
251 // remove around half of the elements at random
252 fn clear_about_half(&mut self) {
253 // Need to capture rng reference to use in closure
254 let rng = &mut self.rng;
255 self.buf.retain(|_| rng.gen_bool(0.5));
256 }
257 /// Process an entire iterator of owned values and return the final estimate
258 ///
259 /// This is a convenience method that processes all elements from an iterator
260 /// and returns the final count estimate. The iterator must yield owned values
261 /// that the CVM can take ownership of.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use cvmcount::CVM;
267 ///
268 /// // Process owned strings
269 /// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
270 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
271 /// let estimate = cvm.process_stream(words);
272 /// assert!(estimate > 0.0);
273 ///
274 /// // Process numeric data
275 /// let numbers = vec![1, 2, 3, 2, 1, 4];
276 /// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
277 /// let estimate = cvm.process_stream(numbers);
278 /// assert!(estimate > 0.0);
279 ///
280 /// // When you have borrowed data, clone explicitly
281 /// let borrowed_words = vec!["hello", "world", "hello"];
282 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
283 /// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
284 /// assert!(estimate > 0.0);
285 /// ```
286 pub fn process_stream<I>(&mut self, iter: I) -> f64
287 where
288 I: IntoIterator<Item = T>,
289 {
290 for item in iter {
291 self.process_element(item);
292 }
293 self.calculate_final_result()
294 }
295
296 /// Calculate the current unique element count. You can continue to add elements after calling this method.
297 pub fn calculate_final_result(&self) -> f64 {
298 self.buf.len() as f64 / self.probability
299 }
300}
301
302/// Extension trait for iterators to estimate distinct count directly
303///
304/// This trait provides convenient methods to estimate distinct counts from iterators
305/// without manually creating and managing a CVM instance.
306///
307/// # Examples
308///
309/// ```
310/// use cvmcount::{CVM, EstimateDistinct};
311///
312/// // Simple usage with default parameters
313/// let numbers = vec![1, 2, 3, 2, 1, 4, 5];
314/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
315/// assert!(estimate > 0.0);
316///
317/// // Using builder pattern for more control
318/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
319/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
320/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
321/// assert!(estimate > 0.0);
322/// ```
323pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized {
324 /// Estimate distinct count using the CVM algorithm with specified parameters
325 ///
326 /// # Arguments
327 ///
328 /// * `epsilon` - Accuracy requirement (smaller = more accurate)
329 /// * `delta` - Failure probability (smaller = more confident)
330 /// * `estimated_size` - Rough estimate of total stream size
331 ///
332 /// # Returns
333 ///
334 /// The estimated number of distinct elements
335 fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 {
336 let mut cvm = CVM::new(epsilon, delta, estimated_size);
337 cvm.process_stream(self)
338 }
339
340 /// Estimate distinct count using a builder for more ergonomic configuration
341 ///
342 /// # Arguments
343 ///
344 /// * `builder` - A configured CVMBuilder instance
345 ///
346 /// # Returns
347 ///
348 /// Result containing the estimated number of distinct elements or an error message
349 ///
350 /// # Examples
351 ///
352 /// ```
353 /// use cvmcount::{CVM, EstimateDistinct};
354 ///
355 /// let data = vec![1, 2, 3, 2, 1];
356 /// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99);
357 /// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap();
358 /// assert!(estimate > 0.0);
359 /// ```
360 fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> {
361 let mut cvm: CVM<T> = builder.build()?;
362 Ok(cvm.process_stream(self))
363 }
364}
365
366/// Implement EstimateDistinct for all iterators that yield Ord types
367impl<T: Ord, I: Iterator<Item = T>> EstimateDistinct<T> for I {}
368
/// Calculate the threshold (`buf_size`) value for the F0-Estimator algorithm.
///
/// Computes `ceil((12 / epsilon^2) * log2(8 * stream_size / delta))` and
/// clamps the result to at least 1.
///
/// The clamp matters for degenerate inputs (`stream_size == 0`, a `delta`
/// larger than `8 * stream_size`, NaN parameters, ...): there the
/// floating-point result is negative, `-inf` or NaN, and the saturating
/// `as usize` cast turns it into 0. A zero threshold would make the
/// `while buf.len() == buf_size` loop in `process_element` spin forever on an
/// empty buffer. Results for valid parameters are unaffected.
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    let accuracy_factor = 12.0 / epsilon.powf(2.0);
    let confidence_factor = ((8.0 * stream_size as f64) / delta).log2();
    // Float-to-int `as` casts saturate: negative values and NaN become 0.
    let threshold = (accuracy_factor * confidence_factor).ceil() as usize;
    threshold.max(1)
}
373
#[cfg(test)]
mod tests {
    use std::{
        fs::File,
        io::{BufRead, BufReader},
        path::Path,
    };

    use super::{CVM, ConfidenceSpec, EstimateDistinct};
    use regex::Regex;
    use std::collections::HashSet;

    /// Open `filename` for buffered reading, panicking if it cannot be opened.
    fn open_file<P>(filename: P) -> BufReader<File>
    where
        P: AsRef<Path>,
    {
        BufReader::new(File::open(filename).expect("Couldn't read from file"))
    }

    /// Split `line` on single spaces, strip everything matched by `re` from
    /// each piece, lowercase it, and add it to `hs`.
    fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) {
        hs.extend(
            line.split(' ')
                .map(|word| re.replace_all(word, "").to_lowercase()),
        );
    }

    /// Exact distinct-word count of the benchmark text, as a reference value.
    #[test]
    fn actual() {
        let punctuation = Regex::new(r"[^\w\s]").unwrap();
        let reader = open_file("benches/kiy.txt");
        let mut distinct_words = HashSet::new();
        for line in reader.lines() {
            line_to_word(&punctuation, &mut distinct_words, &line.unwrap());
        }
        assert_eq!(distinct_words.len(), 9016)
    }

    #[test]
    fn test_builder_defaults() {
        let counter = CVM::<String>::builder().build::<String>().unwrap();
        // A freshly built counter has an empty buffer, hence a zero estimate
        assert_eq!(counter.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_custom_params() {
        let mut counter = CVM::<i32>::builder()
            .epsilon(0.05)
            .confidence(0.99)
            .estimated_size(5000)
            .build::<i32>()
            .unwrap();

        // Check that the counter works by feeding it some elements
        (0..100).for_each(|i| counter.process_element(i));
        assert!(counter.calculate_final_result() > 0.0);
    }

    #[test]
    fn test_builder_delta_vs_confidence() {
        // confidence = 0.9 and delta = 0.1 describe the same requirement
        let via_confidence = CVM::<i32>::builder()
            .confidence(0.9)
            .build::<i32>()
            .unwrap();
        let via_delta = CVM::<i32>::builder().delta(0.1).build::<i32>().unwrap();

        // The internal configuration is not exposed, so only check that both
        // counters were built and start out empty
        assert_eq!(via_confidence.calculate_final_result(), 0.0);
        assert_eq!(via_delta.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_last_wins() {
        // The later of confidence()/delta() replaces the earlier one
        let counter = CVM::<i32>::builder()
            .confidence(0.9)
            .delta(0.05) // overrides the confidence set above
            .build::<i32>()
            .unwrap();

        assert_eq!(counter.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_validation() {
        // Epsilon outside (0, 1)
        for bad_epsilon in [0.0, 1.0, -0.5] {
            let result = CVM::<i32>::builder().epsilon(bad_epsilon).build::<i32>();
            assert!(result.is_err());
        }

        // Confidence outside (0, 1)
        for bad_confidence in [0.0, 1.0] {
            let result = CVM::<i32>::builder()
                .confidence(bad_confidence)
                .build::<i32>();
            assert!(result.is_err());
        }

        // Delta outside (0, 1)
        for bad_delta in [0.0, 1.0] {
            let result = CVM::<i32>::builder().delta(bad_delta).build::<i32>();
            assert!(result.is_err());
        }

        // Zero stream size
        let result = CVM::<i32>::builder().estimated_size(0).build::<i32>();
        assert!(result.is_err());
    }

    #[test]
    fn test_builder_method_chaining() {
        let built = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.95)
            .estimated_size(2000)
            .build::<String>();

        assert!(built.is_ok());
    }

    #[test]
    fn test_confidence_spec_conversion() {
        // Confidence is converted as 1 - confidence
        let from_confidence = ConfidenceSpec::Confidence(0.9).to_delta();
        assert!((from_confidence - 0.1).abs() < f64::EPSILON);

        // Delta passes through unchanged
        let from_delta = ConfidenceSpec::Delta(0.05).to_delta();
        assert!((from_delta - 0.05).abs() < f64::EPSILON);
    }

    #[test]
    fn test_process_stream() {
        // From a vector
        let mut vec_counter = CVM::<i32>::builder().build::<i32>().unwrap();
        let vec_estimate = vec_counter.process_stream(vec![1, 2, 3, 2, 1, 4, 5, 3]);
        assert!(vec_estimate > 0.0);

        // From a range
        let mut range_counter = CVM::<i32>::builder().build::<i32>().unwrap();
        let range_estimate = range_counter.process_stream(1..=100);
        assert!(range_estimate > 0.0);
    }

    #[test]
    fn test_process_stream_strings() {
        let mut counter = CVM::<String>::builder().build::<String>().unwrap();

        // Owned strings are moved into the counter
        let owned_words = vec![
            "hello".to_string(),
            "world".to_string(),
            "hello".to_string(),
            "rust".to_string(),
        ];
        assert!(counter.process_stream(owned_words) > 0.0);
    }

    #[test]
    fn test_process_stream_with_map() {
        let mut counter = CVM::<String>::builder().build::<String>().unwrap();

        // Borrowed data is converted to owned values on the fly
        let borrowed_words = ["hello", "world", "hello", "rust"];
        let owned = borrowed_words.iter().map(|s| s.to_string());
        assert!(counter.process_stream(owned) > 0.0);
    }

    #[test]
    fn test_estimate_distinct_trait() {
        // Explicit parameters
        let count_estimate = vec![1, 2, 3, 2, 1, 4, 5]
            .into_iter()
            .estimate_distinct_count(0.1, 0.1, 1000);
        assert!(count_estimate > 0.0);

        // Builder-configured
        let words = vec![
            "hello".to_string(),
            "world".to_string(),
            "hello".to_string(),
        ];
        let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
        let builder_estimate = words
            .into_iter()
            .estimate_distinct_with_builder(builder)
            .unwrap();
        assert!(builder_estimate > 0.0);
    }

    #[test]
    fn test_estimate_distinct_with_cloning() {
        // Borrowed items must be cloned explicitly before estimation
        let borrowed_numbers = [1, 2, 3, 2, 1, 4];
        let owned_items = borrowed_numbers.iter().cloned();
        assert!(owned_items.estimate_distinct_count(0.1, 0.1, 100) > 0.0);
    }

    #[test]
    fn test_streaming_integration_with_file_processing() {
        // Stand-in for lines read from a file
        let lines = vec![
            "hello world".to_string(),
            "world peace".to_string(),
            "hello rust".to_string(),
        ];

        let mut counter = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.9)
            .build::<String>()
            .unwrap();

        // Flatten all lines into one list of owned words
        let all_words: Vec<String> = lines
            .into_iter()
            .flat_map(|line| {
                line.split_whitespace()
                    .map(|word| word.to_string())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert!(counter.process_stream(all_words) > 0.0);
    }

    #[test]
    fn test_streaming_large_dataset() {
        // Larger input, to check the estimate lands in a plausible range
        let mut counter = CVM::<i32>::builder()
            .epsilon(0.1)
            .confidence(0.9)
            .estimated_size(10_000)
            .build::<i32>()
            .unwrap();

        // 1000 distinct values, cycled until 10 000 items have been produced
        let repeated: Vec<i32> = (0..1000).cycle().take(10_000).collect();
        let estimate = counter.process_stream(repeated);

        // With epsilon = 0.1 the estimate is expected within 10 % most of the
        // time; the bounds here are deliberately much looser
        assert!(estimate > 500.0);
        assert!(estimate < 2000.0);
    }
}