Rust implementation of the CVM algorithm for counting distinct elements in a stream
1//! An implementation of the CVM fast element counting algorithm presented in
2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. https://doi.org/10.4230/LIPIcs.ESA.2022.34
3
4use rand::rngs::ThreadRng;
5use rand::Rng;
6
7/// A counter implementing the CVM algorithm
8///
9/// Note that the CVM struct's buffer takes ownership of its elements.
10pub struct CVM<T: PartialOrd + PartialEq> {
11 buf_size: usize,
12 buf: Vec<T>,
13 probability: f64,
14 rng: ThreadRng,
15}
16
17impl<T: PartialOrd + PartialEq> CVM<T> {
18 /// Initialise the algorithm
19 ///
20 /// epsilon: how close you want your estimate to be to the true number of distinct elements.
21 /// A smaller ε means you require a more precise estimate.
22 /// For example, ε = 0.05 means you want your estimate to be within 5% of the actual value.
23 /// An epsilon of 0.8 is a good starting point for most applications.
24 ///
25 /// delta: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence
26 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a
27 /// higher chance the estimate might be outside the desired range.
28 /// A delta of 0.1 is a good starting point for most applications.
29 ///
30 /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size,
31 /// the more accurate the result will be.
32 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
33 let bufsize = buffer_size(epsilon, delta, stream_size);
34 Self {
35 buf_size: bufsize,
36 buf: Vec::with_capacity(bufsize),
37 probability: 1.0,
38 rng: rand::thread_rng(),
39 }
40 }
41 /// Add an element, potentially updating the unique element count
42 pub fn process_element(&mut self, elem: T) {
43 // linear search
44 // I think this will be faster than a hashset for practical sizes
45 // Should really switch to a treap as per Knuth
46 if let Some(pos) = self.buf.iter().position(|x| *x == elem) {
47 self.buf.swap_remove(pos);
48 }
49 if self.rng.gen_bool(self.probability) {
50 self.buf.push(elem);
51 }
52 while self.buf.len() == self.buf_size {
53 self.clear_about_half();
54 self.probability /= 2.0;
55 }
56 }
57 // remove around half of the elements at random
58 fn clear_about_half(&mut self) {
59 self.buf.retain(|_| self.rng.gen_bool(0.5));
60 }
61 /// Calculate the current unique element count. You can continue to add elements after calling this method.
62 pub fn calculate_final_result(&self) -> f64 {
63 self.buf.len() as f64 / self.probability
64 }
65}
66
67// Calculate threshold (buf_size) value for the F0-Estimator algorithm
68fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
69 ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
70}