Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

Make CVM more flexible

Count anything that impls PartialOrd and PartialEQ

+45 -34
+1 -1
Cargo.toml
··· 5 5 license = "MIT OR Apache-2.0" 6 6 repository = "https://github.com/urschrei/cvmcount" 7 7 8 - version = "0.1.4" 8 + version = "0.1.5" 9 9 edition = "2021" 10 10 11 11 [dependencies]
+8 -1
README.md
··· 9 9 ## What does that mean 10 10 The count-distinct problem, or cardinality-estimation problem refers to counting the number of distinct elements in a data stream with repeated elements. As a concrete example, imagine that you want to count the unique words in a book. If you have enough memory, you can keep track of every unique element you encounter. However, you may not have enough working memory due to resource constraints, or the number of potential elements may be enormous. This constraint is referred to as the bounded-storage constraint in the literature. 11 11 12 + In order to overcome this constraint, streaming algorithms have been developed: [Flajolet-Martin](https://en.wikipedia.org/wiki/Flajolet–Martin_algorithm), LogLog, [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog). The algorithm implemented by this library is an improvement on these in one particular sense: it is extremely simple. Instead of hashing, it uses a sampling method to compute an [unbiased estimate](https://www.statlect.com/glossary/unbiased-estimator#:~:text=An%20estimator%20of%20a%20given,Examples) of the cardinality. 12 13 13 - In order to overcome this constraint, streaming algorithms have been developed: [Flajolet-Martin](https://en.wikipedia.org/wiki/Flajolet–Martin_algorithm), LogLog, [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog). The algorithm implemented by this library is an improvement on these in one particular sense: it is extremely simple. Instead of hashing, it uses a sampling method to compute an [unbiased estimate](https://www.statlect.com/glossary/unbiased-estimator#:~:text=An%20estimator%20of%20a%20given,Examples) of the cardinality. 14 + # What is an Element 15 + In this implementation, an element is anything implementing the [`PartialOrd`](https://doc.rust-lang.org/std/cmp/trait.PartialOrd.html) and [`PartialEQ`](https://doc.rust-lang.org/std/cmp/trait.PartialEq.html) traits: various integer flavours, strings, any Struct on which you have implemented the traits. Not `f32` / `f64`, however. 16 + 17 + You will also note that I didn't mention `&str`: that's because the buffer has to keep ownership of its elements. In practice, this is not a problem: relative to its input stream size, the buffer is very small. 14 18 15 19 ## Further Details 16 20 Don Knuth has written about the algorithm (he refers to it as **Algorithm D**) at https://cs.stanford.edu/~knuth/papers/cvm-note.pdf, and does a far better job than I do at explaining it. You will note that on p1 he describes the buffer he uses as a data structure called a [treap](https://en.wikipedia.org/wiki/Treap#:~:text=7%20External%20links-,Description,(randomly%20chosen)%20numeric%20priority.) ··· 19 23 This implementation doesn't use a treap as a buffer; it uses a Vec and performs a binary search during step **D4**. Note in particular his modification of step **D6** on p5: **D6'**: halving the buffer. 20 24 21 25 I may switch to a treap implementation eventually; for many practical applications a binary search is considerably faster than the hashing algorithms under consideration. If your application assumes a buffer containing 100k+ elements, you may wish to consider using a treap. 26 + 27 + # What does this library provide 28 + Two things: the crate / library, and a command-line utility (`cvmcount`) which will count the unique strings in an input text file. 22 29 23 30 # Installation 24 31 Binaries and installation instructions are available for x64 Linux, Apple Silicon and Intel, and x64 Windows in [releases](https://github.com/urschrei/cvmcount/releases)
+22 -28
src/lib.rs
··· 1 - //! An implementation of the CVM fast token counting algorithm presented in 1 + //! An implementation of the CVM fast element counting algorithm presented in 2 2 //! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. https://doi.org/10.4230/LIPIcs.ESA.2022.34 3 3 4 4 use rand::rngs::ThreadRng; 5 5 use rand::Rng; 6 - use regex::Regex; 7 6 8 - pub struct CVM { 7 + pub struct CVM<T: PartialOrd + PartialEq> { 9 8 buf_size: usize, 10 - buf: Vec<String>, 9 + buf: Vec<T>, 11 10 probability: f64, 12 11 rng: ThreadRng, 13 - re: Regex, 14 12 } 15 - impl CVM { 13 + /// A counter implementing the CVM algorithm 14 + impl<T: PartialOrd + PartialEq> CVM<T> { 16 15 /// Initialise the algorithm 17 16 /// 18 17 /// epsilon: how close you want your estimate to be to the true number of distinct elements. ··· 21 20 /// An epsilon of 0.8 is a good starting point for most applications. 22 21 /// 23 22 /// delta: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 24 - /// (e.g., 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g., 90 %) means there's a 23 + /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a 25 24 /// higher chance the estimate might be outside the desired range. 26 25 /// A delta of 0.1 is a good starting point for most applications. 27 26 /// 28 27 /// stream_size: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 29 - /// the more accurate the results 28 + /// the more accurate the result will be 30 29 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self { 31 30 let bufsize = buffer_size(epsilon, delta, stream_size); 32 31 Self { ··· 34 33 buf: Vec::with_capacity(bufsize), 35 34 probability: 1.0, 36 35 rng: rand::thread_rng(), 37 - re: Regex::new(r"[^\w\s]").unwrap(), 38 36 } 39 37 } 40 - /// Count tokens, given a string containing words, e.g. a line of a book 41 - pub fn process_line_tokens(&mut self, line: String) { 42 - let words = line.split(' '); 43 - for word in words { 44 - let clean_word = self.re.replace_all(word, "").to_lowercase(); 45 - // binary search should be pretty fast 46 - // I think this will be faster than a hashset for practical sizes 47 - // but I need some empirical data for this 48 - if let Some(pos) = self.buf.iter().position(|x| *x == clean_word) { 49 - self.buf.swap_remove(pos); 50 - } 51 - if self.rng.gen_bool(self.probability) { 52 - self.buf.push(clean_word); 53 - } 54 - while self.buf.len() == self.buf_size { 55 - self.clear_about_half(); 56 - self.probability /= 2.0; 57 - } 38 + /// Count elements, updating the current unique count 39 + pub fn process_element(&mut self, elem: T) { 40 + // binary search should be pretty fast 41 + // I think this will be faster than a hashset for practical sizes 42 + // but I need some empirical data for this 43 + if let Some(pos) = self.buf.iter().position(|x| *x == elem) { 44 + self.buf.swap_remove(pos); 45 + } 46 + if self.rng.gen_bool(self.probability) { 47 + self.buf.push(elem); 48 + } 49 + while self.buf.len() == self.buf_size { 50 + self.clear_about_half(); 51 + self.probability /= 2.0; 58 52 } 59 53 } 60 54 // remove around half of the elements at random 61 55 fn clear_about_half(&mut self) { 62 56 self.buf.retain(|_| self.rng.gen_bool(0.5)); 63 57 } 64 - /// Calculate the final token count 58 + /// Calculate the current unique element count. You can continue to add elements after calling this method. 65 59 pub fn calculate_final_result(&self) -> f64 { 66 60 self.buf.len() as f64 / self.probability 67 61 }
+14 -4
src/main.rs
··· 1 1 use clap::{arg, crate_version, value_parser, Command}; 2 + use regex::Regex; 2 3 use std::fs::File; 3 4 use std::io::BufRead; 4 5 use std::io::BufReader; ··· 13 14 { 14 15 let f = File::open(filename).expect("Couldn't read from file"); 15 16 BufReader::new(f) 17 + } 18 + 19 + fn line_to_word(re: &Regex, cvm: &mut CVM<String>, line: &str) { 20 + let words = line.split(' '); 21 + words.for_each(|word| { 22 + let clean_word = re.replace_all(word, "").to_lowercase(); 23 + cvm.process_element(clean_word) 24 + }) 16 25 } 17 26 18 27 fn main() { ··· 35 44 .required(true) 36 45 .value_parser(value_parser!(usize))) 37 46 .get_matches(); 47 + 38 48 let input_file = params.get_one::<PathBuf>("tokens").unwrap(); 39 49 let epsilon = params.get_one::<f64>("epsilon").unwrap(); 40 50 let delta = params.get_one::<f64>("delta").unwrap(); 41 51 let stream_size = params.get_one::<usize>("streamsize").unwrap(); 52 + let mut counter: CVM<String> = CVM::new(*epsilon, *delta, *stream_size); 53 + let re = Regex::new(r"[^\w\s]").unwrap(); 42 54 43 - let mut counter = CVM::new(*epsilon, *delta, *stream_size); 44 55 let br = open_file(input_file); 45 - for line in br.lines() { 46 - counter.process_line_tokens(line.unwrap()) 47 - } 56 + br.lines() 57 + .for_each(|line| line_to_word(&re, &mut counter, &line.unwrap())); 48 58 println!( 49 59 "Unique tokens: {:?}", 50 60 counter.calculate_final_result() as i32