Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 22 kB View raw
1//! An implementation of the CVM fast element counting algorithm presented in 2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34> 3//! 4//! This implementation uses a treap data structure as the buffer, following Knuth's original design. 5 6mod treap; 7 8use crate::treap::Treap; 9use rand::Rng; 10use rand::SeedableRng; 11use rand::rngs::StdRng; 12 13/// Specification for confidence level in the CVM algorithm 14#[derive(Debug, Clone, Copy)] 15pub enum ConfidenceSpec { 16 /// Specify delta directly (probability of failure) 17 Delta(f64), 18 /// Specify confidence level (probability of success) 19 Confidence(f64), 20} 21 22impl ConfidenceSpec { 23 /// Convert to delta value for internal use 24 fn to_delta(self) -> f64 { 25 match self { 26 ConfidenceSpec::Delta(delta) => delta, 27 ConfidenceSpec::Confidence(confidence) => 1.0 - confidence, 28 } 29 } 30 31 /// Validate the confidence specification 32 fn validate(self) -> Result<Self, String> { 33 match self { 34 ConfidenceSpec::Delta(delta) => { 35 if delta <= 0.0 || delta >= 1.0 { 36 Err("Delta must be between 0.0 and 1.0 (exclusive)".to_string()) 37 } else { 38 Ok(self) 39 } 40 } 41 ConfidenceSpec::Confidence(confidence) => { 42 if confidence <= 0.0 || confidence >= 1.0 { 43 Err("Confidence must be between 0.0 and 1.0 (exclusive)".to_string()) 44 } else { 45 Ok(self) 46 } 47 } 48 } 49 } 50} 51 52/// Builder for constructing CVM instances with validation and defaults 53/// 54/// # Examples 55/// 56/// ``` 57/// use cvmcount::CVM; 58/// 59/// // Using defaults (`epsilon=0.8`, `confidence=0.9`, `size=1000`) 60/// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 61/// 62/// // Custom parameters 63/// let cvm: CVM<i32> = CVM::<i32>::builder() 64/// .epsilon(0.05) // 5 % accuracy 65/// .confidence(0.99) // 99 % confidence 66/// .estimated_size(10_000) 67/// .build() 68/// .unwrap(); 69/// 70/// // Using delta instead of confidence 71/// let cvm: CVM<String> = CVM::<String>::builder() 72/// .epsilon(0.1) 73/// .delta(0.01) // 1 % failure probability 74/// .build() 75/// .unwrap(); 76/// ``` 77#[derive(Debug, Clone, Default)] 78pub struct CVMBuilder { 79 epsilon: Option<f64>, 80 confidence_spec: Option<ConfidenceSpec>, 81 stream_size: Option<usize>, 82} 83 84impl CVMBuilder { 85 /// Create a new builder with default values 86 pub fn new() -> Self { 87 Self::default() 88 } 89 90 /// Set the epsilon parameter (accuracy requirement) 91 /// 92 /// `Epsilon` determines how close you want your estimate to be to the true number 93 /// of distinct elements. A smaller `ε` means you require a more precise estimate. 94 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the 95 /// actual value. 96 /// 97 /// Must be between 0.0 and 1.0 (exclusive). 98 pub fn epsilon(mut self, epsilon: f64) -> Self { 99 self.epsilon = Some(epsilon); 100 self 101 } 102 103 /// Set the confidence level (probability that the estimate will be accurate) 104 /// 105 /// Confidence represents how certain you want to be that the algorithm's 106 /// estimate will fall within the desired accuracy range. For example, 107 /// `confidence = 0.99` means you're 99 % sure the estimate will be accurate. 108 /// 109 /// Must be between 0.0 and 1.0 (exclusive). 110 /// Cannot be used together with [`Self::delta`] – the last one called will be used. 111 pub fn confidence(mut self, confidence: f64) -> Self { 112 self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence)); 113 self 114 } 115 116 /// Set the delta parameter (probability of failure) 117 /// 118 /// Delta represents the probability that the algorithm's estimate will fall 119 /// outside the desired accuracy range. For example, `delta = 0.01` means there's 120 /// a 1 % chance the estimate will be inaccurate. 121 /// 122 /// Must be between 0.0 and 1.0 (exclusive). 123 /// Cannot be used together with [`Self::confidence()`] – the last one called will be used. 124 pub fn delta(mut self, delta: f64) -> Self { 125 self.confidence_spec = Some(ConfidenceSpec::Delta(delta)); 126 self 127 } 128 129 /// Set the estimated stream size 130 /// 131 /// This is used to determine buffer size and can be a loose approximation. 132 /// The closer it is to the actual stream size, the more accurate the results 133 /// will be. 134 pub fn estimated_size(mut self, size: usize) -> Self { 135 self.stream_size = Some(size); 136 self 137 } 138 139 /// Build the CVM instance with validation 140 /// 141 /// Uses the following defaults if not specified: 142 /// - `epsilon: 0.8` (good starting point for most applications) 143 /// - `confidence: 0.9` (90 % confidence, equivalent to delta = 0.1) 144 /// - `estimated_size: 1000` 145 /// 146 /// Returns an error if any parameters are invalid. 147 pub fn build<T: Ord>(self) -> Result<CVM<T>, String> { 148 // Validate and get epsilon 149 let epsilon = self.epsilon.unwrap_or(0.8); 150 if epsilon <= 0.0 || epsilon >= 1.0 { 151 return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string()); 152 } 153 154 // Validate and get delta 155 let confidence_spec = self 156 .confidence_spec 157 .unwrap_or(ConfidenceSpec::Confidence(0.9)); 158 let validated_spec = confidence_spec.validate()?; 159 let delta = validated_spec.to_delta(); 160 161 // Validate and get stream size 162 let stream_size = self.stream_size.unwrap_or(1000); 163 if stream_size == 0 { 164 return Err("Stream size must be greater than 0".to_string()); 165 } 166 167 Ok(CVM::new(epsilon, delta, stream_size)) 168 } 169} 170 171/// A counter implementing the CVM algorithm 172/// 173/// This implementation uses a treap (randomized binary search tree) as the buffer, 174/// which provides `O(log n)` operations while maintaining the probabilistic properties 175/// needed for the algorithm. 176/// 177/// Note that the CVM struct's buffer takes ownership of its elements. 178pub struct CVM<T: Ord> { 179 buf_size: usize, 180 buf: Treap<T>, 181 probability: f64, 182 rng: StdRng, 183} 184 185impl<T: Ord> CVM<T> { 186 /// Create a new builder for constructing CVM instances 187 /// 188 /// The builder provides a more ergonomic way to construct CVM instances with 189 /// validation and sensible defaults. 190 /// 191 /// # Examples 192 /// 193 /// ``` 194 /// use cvmcount::CVM; 195 /// 196 /// // Using defaults 197 /// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 198 /// 199 /// // Custom configuration 200 /// let cvm: CVM<i32> = CVM::<i32>::builder() 201 /// .epsilon(0.05) 202 /// .confidence(0.99) 203 /// .estimated_size(10_000) 204 /// .build() 205 /// .unwrap(); 206 /// ``` 207 pub fn builder() -> CVMBuilder { 208 CVMBuilder::new() 209 } 210 211 /// Initialise the algorithm 212 /// 213 /// `epsilon`: how close you want your estimate to be to the true number of distinct elements. 214 /// A smaller `ε` means you require a more precise estimate. 215 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value. 216 /// An epsilon of `0.8` is a good starting point for most applications. 217 /// 218 /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 219 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a 220 /// higher chance the estimate might be outside the desired range. 221 /// A `delta` of `0.1` is a good starting point for most applications. 222 /// 223 /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 224 /// the more accurate the result will be. 225 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self { 226 let bufsize = buffer_size(epsilon, delta, stream_size); 227 Self { 228 buf_size: bufsize, 229 buf: Treap::new(), 230 probability: 1.0, 231 rng: StdRng::from_os_rng(), 232 } 233 } 234 /// Add an element, potentially updating the unique element count 235 pub fn process_element(&mut self, elem: T) { 236 // The algorithm works as follows: 237 // 1. If element exists in buffer, remove it (this ensures proper sampling) 238 // 2. Add element back with current probability 239 // 3. If buffer is full, remove ~half the elements and halve the probability 240 // This creates a geometric sampling scheme that provides an unbiased estimate 241 if self.buf.contains(&elem) { 242 self.buf.remove(&elem); 243 } 244 if self.rng.random_bool(self.probability) { 245 self.buf.insert(elem, &mut self.rng); 246 } 247 while self.buf.len() == self.buf_size { 248 self.clear_about_half(); 249 self.probability /= 2.0; 250 } 251 } 252 // remove around half of the elements at random 253 fn clear_about_half(&mut self) { 254 // Need to capture rng reference to use in closure 255 let rng = &mut self.rng; 256 self.buf.retain(|_| rng.random_bool(0.5)); 257 } 258 /// Process an entire iterator of owned values and return the final estimate 259 /// 260 /// This is a convenience method that processes all elements from an iterator 261 /// and returns the final count estimate. The iterator must yield owned values 262 /// that the CVM can take ownership of. 263 /// 264 /// # Examples 265 /// 266 /// ``` 267 /// use cvmcount::CVM; 268 /// 269 /// // Process owned strings 270 /// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()]; 271 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 272 /// let estimate = cvm.process_stream(words); 273 /// assert!(estimate > 0.0); 274 /// 275 /// // Process numeric data 276 /// let numbers = vec![1, 2, 3, 2, 1, 4]; 277 /// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap(); 278 /// let estimate = cvm.process_stream(numbers); 279 /// assert!(estimate > 0.0); 280 /// 281 /// // When you have borrowed data, clone explicitly 282 /// let borrowed_words = vec!["hello", "world", "hello"]; 283 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 284 /// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string())); 285 /// assert!(estimate > 0.0); 286 /// ``` 287 pub fn process_stream<I>(&mut self, iter: I) -> f64 288 where 289 I: IntoIterator<Item = T>, 290 { 291 for item in iter { 292 self.process_element(item); 293 } 294 self.calculate_final_result() 295 } 296 297 /// Calculate the current unique element count. You can continue to add elements after calling this method. 298 pub fn calculate_final_result(&self) -> f64 { 299 self.buf.len() as f64 / self.probability 300 } 301} 302 303/// Extension trait for iterators to estimate distinct count directly 304/// 305/// This trait provides convenient methods to estimate distinct counts from iterators 306/// without manually creating and managing a CVM instance. 307/// 308/// # Examples 309/// 310/// ``` 311/// use cvmcount::{CVM, EstimateDistinct}; 312/// 313/// // Simple usage with default parameters 314/// let numbers = vec![1, 2, 3, 2, 1, 4, 5]; 315/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000); 316/// assert!(estimate > 0.0); 317/// 318/// // Using builder pattern for more control 319/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()]; 320/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99); 321/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap(); 322/// assert!(estimate > 0.0); 323/// ``` 324pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized { 325 /// Estimate distinct count using the CVM algorithm with specified parameters 326 /// 327 /// # Arguments 328 /// 329 /// * `epsilon` - Accuracy requirement (smaller = more accurate) 330 /// * `delta` - Failure probability (smaller = more confident) 331 /// * `estimated_size` - Rough estimate of total stream size 332 /// 333 /// # Returns 334 /// 335 /// The estimated number of distinct elements 336 fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 { 337 let mut cvm = CVM::new(epsilon, delta, estimated_size); 338 cvm.process_stream(self) 339 } 340 341 /// Estimate distinct count using a builder for more ergonomic configuration 342 /// 343 /// # Arguments 344 /// 345 /// * `builder` - A configured CVMBuilder instance 346 /// 347 /// # Returns 348 /// 349 /// Result containing the estimated number of distinct elements or an error message 350 /// 351 /// # Examples 352 /// 353 /// ``` 354 /// use cvmcount::{CVM, EstimateDistinct}; 355 /// 356 /// let data = vec![1, 2, 3, 2, 1]; 357 /// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99); 358 /// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap(); 359 /// assert!(estimate > 0.0); 360 /// ``` 361 fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> { 362 let mut cvm: CVM<T> = builder.build()?; 363 Ok(cvm.process_stream(self)) 364 } 365} 366 367/// Implement EstimateDistinct for all iterators that yield Ord types 368impl<T: Ord, I: Iterator<Item = T>> EstimateDistinct<T> for I {} 369 370// Calculate threshold (buf_size) value for the F0-Estimator algorithm 371fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize { 372 ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize 373} 374 375#[cfg(test)] 376mod tests { 377 use std::{ 378 fs::File, 379 io::{BufRead, BufReader}, 380 path::Path, 381 }; 382 383 use super::{CVM, ConfidenceSpec, EstimateDistinct}; 384 use regex::Regex; 385 use std::collections::HashSet; 386 387 fn open_file<P>(filename: P) -> BufReader<File> 388 where 389 P: AsRef<Path>, 390 { 391 let f = File::open(filename).expect("Couldn't read from file"); 392 BufReader::new(f) 393 } 394 395 fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) { 396 let words = line.split(' '); 397 words.for_each(|word| { 398 let clean_word = re.replace_all(word, "").to_lowercase(); 399 hs.insert(clean_word); 400 }) 401 } 402 #[test] 403 fn actual() { 404 let input_file = "benches/kiy.txt"; 405 let re = Regex::new(r"[^\w\s]").unwrap(); 406 let br = open_file(input_file); 407 let mut hs = HashSet::new(); 408 br.lines() 409 .for_each(|line| line_to_word(&re, &mut hs, &line.unwrap())); 410 assert_eq!(hs.len(), 9016) 411 } 412 413 #[test] 414 fn test_builder_defaults() { 415 let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 416 // Verify that it's properly constructed with defaults 417 assert_eq!(cvm.calculate_final_result(), 0.0); // Empty buffer 418 } 419 420 #[test] 421 fn test_builder_custom_params() { 422 let cvm: CVM<i32> = CVM::<i32>::builder() 423 .epsilon(0.05) 424 .confidence(0.99) 425 .estimated_size(5000) 426 .build() 427 .unwrap(); 428 429 // Test that it works by processing some elements 430 let mut cvm = cvm; 431 for i in 0..100 { 432 cvm.process_element(i); 433 } 434 let result = cvm.calculate_final_result(); 435 assert!(result > 0.0); 436 } 437 438 #[test] 439 fn test_builder_delta_vs_confidence() { 440 // Test that confidence and delta give equivalent results 441 let cvm1: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap(); 442 443 let cvm2: CVM<i32> = CVM::<i32>::builder().delta(0.1).build().unwrap(); 444 445 // They should have the same internal configuration 446 // (we can't directly test this without exposing internals, 447 // but we can test they both work) 448 assert_eq!(cvm1.calculate_final_result(), 0.0); 449 assert_eq!(cvm2.calculate_final_result(), 0.0); 450 } 451 452 #[test] 453 fn test_builder_last_wins() { 454 // Test that the last confidence/delta setting wins 455 let cvm: CVM<i32> = CVM::<i32>::builder() 456 .confidence(0.9) 457 .delta(0.05) // This should override confidence 458 .build() 459 .unwrap(); 460 461 assert_eq!(cvm.calculate_final_result(), 0.0); 462 } 463 464 #[test] 465 fn test_builder_validation() { 466 // Test epsilon validation 467 let result = CVM::<i32>::builder().epsilon(0.0).build::<i32>(); 468 assert!(result.is_err()); 469 470 let result = CVM::<i32>::builder().epsilon(1.0).build::<i32>(); 471 assert!(result.is_err()); 472 473 let result = CVM::<i32>::builder().epsilon(-0.5).build::<i32>(); 474 assert!(result.is_err()); 475 476 // Test confidence validation 477 let result = CVM::<i32>::builder().confidence(0.0).build::<i32>(); 478 assert!(result.is_err()); 479 480 let result = CVM::<i32>::builder().confidence(1.0).build::<i32>(); 481 assert!(result.is_err()); 482 483 // Test delta validation 484 let result = CVM::<i32>::builder().delta(0.0).build::<i32>(); 485 assert!(result.is_err()); 486 487 let result = CVM::<i32>::builder().delta(1.0).build::<i32>(); 488 assert!(result.is_err()); 489 490 // Test stream size validation 491 let result = CVM::<i32>::builder().estimated_size(0).build::<i32>(); 492 assert!(result.is_err()); 493 } 494 495 #[test] 496 fn test_builder_method_chaining() { 497 let result = CVM::<String>::builder() 498 .epsilon(0.1) 499 .confidence(0.95) 500 .estimated_size(2000) 501 .build::<String>(); 502 503 assert!(result.is_ok()); 504 } 505 506 #[test] 507 fn test_confidence_spec_conversion() { 508 // Test ConfidenceSpec::to_delta conversion 509 let confidence_spec = ConfidenceSpec::Confidence(0.9); 510 assert!((confidence_spec.to_delta() - 0.1).abs() < f64::EPSILON); 511 512 let delta_spec = ConfidenceSpec::Delta(0.05); 513 assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON); 514 } 515 516 #[test] 517 fn test_process_stream() { 518 let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap(); 519 520 // Test with vector 521 let numbers = vec![1, 2, 3, 2, 1, 4, 5, 3]; 522 let estimate = cvm.process_stream(numbers); 523 assert!(estimate > 0.0); 524 525 // Test with range 526 let mut cvm2: CVM<i32> = CVM::<i32>::builder().build().unwrap(); 527 let estimate2 = cvm2.process_stream(1..=100); 528 assert!(estimate2 > 0.0); 529 } 530 531 #[test] 532 fn test_process_stream_strings() { 533 let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 534 535 // Test with owned strings 536 let words = vec![ 537 "hello".to_string(), 538 "world".to_string(), 539 "hello".to_string(), 540 "rust".to_string(), 541 ]; 542 let estimate = cvm.process_stream(words); 543 assert!(estimate > 0.0); 544 } 545 546 #[test] 547 fn test_process_stream_with_map() { 548 let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 549 550 // Test with borrowed data mapped to owned 551 let borrowed_words = ["hello", "world", "hello", "rust"]; 552 let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string())); 553 assert!(estimate > 0.0); 554 } 555 556 #[test] 557 fn test_estimate_distinct_trait() { 558 // Test simple usage 559 let numbers = vec![1, 2, 3, 2, 1, 4, 5]; 560 let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000); 561 assert!(estimate > 0.0); 562 563 // Test with builder 564 let words = vec![ 565 "hello".to_string(), 566 "world".to_string(), 567 "hello".to_string(), 568 ]; 569 let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99); 570 let estimate = words 571 .into_iter() 572 .estimate_distinct_with_builder(builder) 573 .unwrap(); 574 assert!(estimate > 0.0); 575 } 576 577 #[test] 578 fn test_estimate_distinct_with_cloning() { 579 // Test that explicit cloning works as expected 580 let borrowed_numbers = [1, 2, 3, 2, 1, 4]; 581 let estimate = borrowed_numbers 582 .iter() 583 .cloned() 584 .estimate_distinct_count(0.1, 0.1, 100); 585 assert!(estimate > 0.0); 586 } 587 588 #[test] 589 fn test_streaming_integration_with_file_processing() { 590 // Simulate file processing pattern 591 let lines = vec![ 592 "hello world".to_string(), 593 "world peace".to_string(), 594 "hello rust".to_string(), 595 ]; 596 597 let mut cvm: CVM<String> = CVM::<String>::builder() 598 .epsilon(0.1) 599 .confidence(0.9) 600 .build() 601 .unwrap(); 602 603 // Process words from all lines 604 let words: Vec<String> = lines 605 .into_iter() 606 .flat_map(|line| { 607 line.split_whitespace() 608 .map(|s| s.to_string()) 609 .collect::<Vec<_>>() 610 }) 611 .collect(); 612 let estimate = cvm.process_stream(words); 613 614 assert!(estimate > 0.0); 615 } 616 617 #[test] 618 fn test_streaming_large_dataset() { 619 // Test with a larger dataset to verify the algorithm works 620 let mut cvm: CVM<i32> = CVM::<i32>::builder() 621 .epsilon(0.1) 622 .confidence(0.9) 623 .estimated_size(10_000) 624 .build() 625 .unwrap(); 626 627 // Create data with known distinct count (1000 unique values, repeated) 628 let data: Vec<i32> = (0..1000).cycle().take(10_000).collect(); 629 let estimate = cvm.process_stream(data); 630 631 // The estimate should be reasonably close to 1000 632 // With epsilon=0.1, we expect within 10 % accuracy most of the time 633 assert!(estimate > 500.0 && estimate < 2000.0); 634 } 635}