Rust implementation of the CVM algorithm for counting distinct elements in a stream
1//! An implementation of the CVM fast element counting algorithm presented in
2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34>
3//!
4//! This implementation uses a treap data structure as the buffer, following Knuth's original design.
5
6mod treap;
7
8use crate::treap::Treap;
9use rand::rngs::StdRng;
10use rand::{Rng, SeedableRng};
11
/// How the caller expressed the reliability requirement for the CVM algorithm.
///
/// Both variants wrap a probability that must lie strictly between 0.0 and 1.0. They are two
/// views of the same quantity, related by `delta = 1.0 - confidence`.
#[derive(Debug, Clone, Copy)]
pub enum ConfidenceSpec {
    /// Probability of failure: the chance that the estimate misses the accuracy target.
    Delta(f64),
    /// Probability of success: the chance that the estimate meets the accuracy target.
    Confidence(f64),
}

impl ConfidenceSpec {
    /// Return the failure probability (`delta`) that this specification corresponds to.
    ///
    /// A `Delta` value is passed through unchanged; `Confidence(c)` becomes `1.0 - c`.
    /// No validation happens here — call [`Self::validate`] first.
    fn to_delta(self) -> f64 {
        match self {
            ConfidenceSpec::Delta(delta) => delta,
            ConfidenceSpec::Confidence(confidence) => 1.0 - confidence,
        }
    }

    /// Check that the wrapped probability lies strictly between 0.0 and 1.0.
    ///
    /// # Errors
    ///
    /// Returns a message naming the offending parameter when the value is `NaN`, infinite,
    /// or otherwise outside the open interval `(0.0, 1.0)`.
    fn validate(self) -> Result<Self, String> {
        let (label, value) = match self {
            ConfidenceSpec::Delta(delta) => ("Delta", delta),
            ConfidenceSpec::Confidence(confidence) => ("Confidence", confidence),
        };
        // Deliberately a positive "is inside the interval" test: every comparison involving
        // NaN is false, so the negated form `value <= 0.0 || value >= 1.0` would accept NaN.
        if value > 0.0 && value < 1.0 {
            Ok(self)
        } else {
            Err(format!("{} must be between 0.0 and 1.0 (exclusive)", label))
        }
    }
}
50
51/// Builder for constructing CVM instances with validation and defaults
52///
53/// # Examples
54///
55/// ```
56/// use cvmcount::CVM;
57///
58/// // Using defaults (`epsilon=0.8`, `confidence=0.9`, `size=1000`)
59/// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
60///
61/// // Custom parameters
62/// let cvm: CVM<i32> = CVM::<i32>::builder()
63/// .epsilon(0.05) // 5 % accuracy
64/// .confidence(0.99) // 99 % confidence
65/// .estimated_size(10_000)
66/// .build()
67/// .unwrap();
68///
69/// // Using delta instead of confidence
70/// let cvm: CVM<String> = CVM::<String>::builder()
71/// .epsilon(0.1)
72/// .delta(0.01) // 1 % failure probability
73/// .build()
74/// .unwrap();
75/// ```
76#[derive(Debug, Clone, Default)]
77pub struct CVMBuilder {
78 epsilon: Option<f64>,
79 confidence_spec: Option<ConfidenceSpec>,
80 stream_size: Option<usize>,
81}
82
83impl CVMBuilder {
84 /// Create a new builder with default values
85 pub fn new() -> Self {
86 Self::default()
87 }
88
89 /// Set the epsilon parameter (accuracy requirement)
90 ///
91 /// `Epsilon` determines how close you want your estimate to be to the true number
92 /// of distinct elements. A smaller `ε` means you require a more precise estimate.
93 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the
94 /// actual value.
95 ///
96 /// Must be between 0.0 and 1.0 (exclusive).
97 pub fn epsilon(mut self, epsilon: f64) -> Self {
98 self.epsilon = Some(epsilon);
99 self
100 }
101
102 /// Set the confidence level (probability that the estimate will be accurate)
103 ///
104 /// Confidence represents how certain you want to be that the algorithm's
105 /// estimate will fall within the desired accuracy range. For example,
106 /// `confidence = 0.99` means you're 99 % sure the estimate will be accurate.
107 ///
108 /// Must be between 0.0 and 1.0 (exclusive).
109 /// Cannot be used together with [`Self::delta`] – the last one called will be used.
110 pub fn confidence(mut self, confidence: f64) -> Self {
111 self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence));
112 self
113 }
114
115 /// Set the delta parameter (probability of failure)
116 ///
117 /// Delta represents the probability that the algorithm's estimate will fall
118 /// outside the desired accuracy range. For example, `delta = 0.01` means there's
119 /// a 1 % chance the estimate will be inaccurate.
120 ///
121 /// Must be between 0.0 and 1.0 (exclusive).
122 /// Cannot be used together with [`Self::confidence()`] – the last one called will be used.
123 pub fn delta(mut self, delta: f64) -> Self {
124 self.confidence_spec = Some(ConfidenceSpec::Delta(delta));
125 self
126 }
127
128 /// Set the estimated stream size
129 ///
130 /// This is used to determine buffer size and can be a loose approximation.
131 /// The closer it is to the actual stream size, the more accurate the results
132 /// will be.
133 pub fn estimated_size(mut self, size: usize) -> Self {
134 self.stream_size = Some(size);
135 self
136 }
137
138 /// Build the CVM instance with validation
139 ///
140 /// Uses the following defaults if not specified:
141 /// - `epsilon: 0.8` (good starting point for most applications)
142 /// - `confidence: 0.9` (90 % confidence, equivalent to delta = 0.1)
143 /// - `estimated_size: 1000`
144 ///
145 /// Returns an error if any parameters are invalid.
146 pub fn build<T: Ord>(self) -> Result<CVM<T>, String> {
147 // Validate and get epsilon
148 let epsilon = self.epsilon.unwrap_or(0.8);
149 if epsilon <= 0.0 || epsilon >= 1.0 {
150 return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string());
151 }
152
153 // Validate and get delta
154 let confidence_spec = self
155 .confidence_spec
156 .unwrap_or(ConfidenceSpec::Confidence(0.9));
157 let validated_spec = confidence_spec.validate()?;
158 let delta = validated_spec.to_delta();
159
160 // Validate and get stream size
161 let stream_size = self.stream_size.unwrap_or(1000);
162 if stream_size == 0 {
163 return Err("Stream size must be greater than 0".to_string());
164 }
165
166 Ok(CVM::new(epsilon, delta, stream_size))
167 }
168}
169
170/// A counter implementing the CVM algorithm
171///
172/// This implementation uses a treap (randomized binary search tree) as the buffer,
173/// which provides `O(log n)` operations while maintaining the probabilistic properties
174/// needed for the algorithm.
175///
176/// Note that the CVM struct's buffer takes ownership of its elements.
177pub struct CVM<T: Ord> {
178 buf_size: usize,
179 buf: Treap<T>,
180 probability: f64,
181 rng: StdRng,
182}
183
184impl<T: Ord> CVM<T> {
185 /// Create a new builder for constructing CVM instances
186 ///
187 /// The builder provides a more ergonomic way to construct CVM instances with
188 /// validation and sensible defaults.
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use cvmcount::CVM;
194 ///
195 /// // Using defaults
196 /// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
197 ///
198 /// // Custom configuration
199 /// let cvm: CVM<i32> = CVM::<i32>::builder()
200 /// .epsilon(0.05)
201 /// .confidence(0.99)
202 /// .estimated_size(10_000)
203 /// .build()
204 /// .unwrap();
205 /// ```
206 pub fn builder() -> CVMBuilder {
207 CVMBuilder::new()
208 }
209
210 /// Initialise the algorithm
211 ///
212 /// `epsilon`: how close you want your estimate to be to the true number of distinct elements.
213 /// A smaller `ε` means you require a more precise estimate.
214 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value.
215 /// An epsilon of `0.8` is a good starting point for most applications.
216 ///
217 /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence
218 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a
219 /// higher chance the estimate might be outside the desired range.
220 /// A `delta` of `0.1` is a good starting point for most applications.
221 ///
222 /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
223 /// the more accurate the result will be.
224 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
225 let bufsize = buffer_size(epsilon, delta, stream_size);
226 Self {
227 buf_size: bufsize,
228 buf: Treap::new(),
229 probability: 1.0,
230 rng: StdRng::from_entropy(),
231 }
232 }
233 /// Add an element, potentially updating the unique element count
234 pub fn process_element(&mut self, elem: T) {
235 // The algorithm works as follows:
236 // 1. If element exists in buffer, remove it (this ensures proper sampling)
237 // 2. Add element back with current probability
238 // 3. If buffer is full, remove ~half the elements and halve the probability
239 // This creates a geometric sampling scheme that provides an unbiased estimate
240 if self.buf.contains(&elem) {
241 self.buf.remove(&elem);
242 }
243 if self.rng.gen_bool(self.probability) {
244 self.buf.insert(elem, &mut self.rng);
245 }
246 while self.buf.len() == self.buf_size {
247 self.clear_about_half();
248 self.probability /= 2.0;
249 }
250 }
251 // remove around half of the elements at random
252 fn clear_about_half(&mut self) {
253 // Need to capture rng reference to use in closure
254 let rng = &mut self.rng;
255 self.buf.retain(|_| rng.gen_bool(0.5));
256 }
257 /// Calculate the current unique element count. You can continue to add elements after calling this method.
258 pub fn calculate_final_result(&self) -> f64 {
259 self.buf.len() as f64 / self.probability
260 }
261}
262
// Threshold (`buf_size`) for the F0-Estimator algorithm:
//   ceil( (12 / epsilon^2) * log2(8 * stream_size / delta) )
// The final `as usize` cast saturates, so a negative or NaN intermediate becomes 0.
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    let accuracy_factor = 12.0 / epsilon.powf(2.0);
    let confidence_factor = ((8.0 * stream_size as f64) / delta).log2();
    let threshold = (accuracy_factor * confidence_factor).ceil();
    threshold as usize
}
267
#[cfg(test)]
mod tests {
    use std::{
        fs::File,
        io::{BufRead, BufReader},
        path::Path,
    };

    use super::{CVM, ConfidenceSpec};
    use regex::Regex;
    use std::collections::HashSet;

    // Open `filename` for buffered reading, panicking if it cannot be opened.
    fn open_file<P>(filename: P) -> BufReader<File>
    where
        P: AsRef<Path>,
    {
        BufReader::new(File::open(filename).expect("Couldn't read from file"))
    }

    // Split `line` on single spaces, strip every character matched by `re` from each piece,
    // lowercase it, and add it to `hs`.
    fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) {
        hs.extend(
            line.split(' ')
                .map(|word| re.replace_all(word, "").to_lowercase()),
        );
    }

    // Exact distinct-word count of the benchmark text, computed with a `HashSet`.
    #[test]
    fn actual() {
        let input_file = "benches/kiy.txt";
        let punctuation = Regex::new(r"[^\w\s]").unwrap();
        let reader = open_file(input_file);
        let mut distinct = HashSet::new();
        for line in reader.lines() {
            line_to_word(&punctuation, &mut distinct, &line.unwrap());
        }
        assert_eq!(distinct.len(), 9016)
    }

    // A counter built purely from defaults starts out with an estimate of zero.
    #[test]
    fn test_builder_defaults() {
        let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
        assert_eq!(cvm.calculate_final_result(), 0.0);
    }

    // A fully customised counter accepts elements and reports a positive estimate.
    #[test]
    fn test_builder_custom_params() {
        let mut cvm: CVM<i32> = CVM::<i32>::builder()
            .epsilon(0.05)
            .confidence(0.99)
            .estimated_size(5000)
            .build()
            .unwrap();

        (0..100).for_each(|i| cvm.process_element(i));

        assert!(cvm.calculate_final_result() > 0.0);
    }

    // `confidence(0.9)` and `delta(0.1)` describe the same requirement. The internals are
    // not exposed, so this only checks that both configurations build and start empty.
    #[test]
    fn test_builder_delta_vs_confidence() {
        let via_confidence: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap();
        let via_delta: CVM<i32> = CVM::<i32>::builder().delta(0.1).build().unwrap();

        assert_eq!(via_confidence.calculate_final_result(), 0.0);
        assert_eq!(via_delta.calculate_final_result(), 0.0);
    }

    // Setting delta after confidence replaces the earlier value; the build still succeeds.
    #[test]
    fn test_builder_last_wins() {
        let cvm: CVM<i32> = CVM::<i32>::builder()
            .confidence(0.9)
            .delta(0.05)
            .build()
            .unwrap();

        assert_eq!(cvm.calculate_final_result(), 0.0);
    }

    // Out-of-range values for each parameter make `build` return an error.
    #[test]
    fn test_builder_validation() {
        // epsilon
        assert!(CVM::<i32>::builder().epsilon(0.0).build::<i32>().is_err());
        assert!(CVM::<i32>::builder().epsilon(1.0).build::<i32>().is_err());
        assert!(CVM::<i32>::builder().epsilon(-0.5).build::<i32>().is_err());

        // confidence
        assert!(CVM::<i32>::builder().confidence(0.0).build::<i32>().is_err());
        assert!(CVM::<i32>::builder().confidence(1.0).build::<i32>().is_err());

        // delta
        assert!(CVM::<i32>::builder().delta(0.0).build::<i32>().is_err());
        assert!(CVM::<i32>::builder().delta(1.0).build::<i32>().is_err());

        // stream size
        assert!(
            CVM::<i32>::builder()
                .estimated_size(0)
                .build::<i32>()
                .is_err()
        );
    }

    // All setters can be chained in one expression and yield a valid configuration.
    #[test]
    fn test_builder_method_chaining() {
        let built = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.95)
            .estimated_size(2000)
            .build::<String>();

        assert!(built.is_ok());
    }

    // `to_delta` turns a confidence into `1 - confidence` and passes a delta through.
    #[test]
    fn test_confidence_spec_conversion() {
        let from_confidence = ConfidenceSpec::Confidence(0.9).to_delta();
        assert!((from_confidence - 0.1).abs() < f64::EPSILON);

        let from_delta = ConfidenceSpec::Delta(0.05).to_delta();
        assert!((from_delta - 0.05).abs() < f64::EPSILON);
    }
}
408}