From f75ec623b88a96e8013858ed9a2ade5e548769aa Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Mon, 2 Sep 2024 00:28:53 -0500 Subject: [PATCH] Add merge_func and dupsort_func methods to Source --- src/dupsort_func.rs | 142 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + src/merge_func.rs | 118 ++++++++++++++++++++++++++++++++++++ src/source.rs | 20 ++++++- 4 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 src/dupsort_func.rs create mode 100644 src/merge_func.rs diff --git a/src/dupsort_func.rs b/src/dupsort_func.rs new file mode 100644 index 0000000..b0f4ea4 --- /dev/null +++ b/src/dupsort_func.rs @@ -0,0 +1,142 @@ +use crate::{Entry, Iter, Source}; +use std::cmp::Ordering; + +pub struct DupsortFunc +where + S: Source, + F: Fn(&Entry, &Entry) -> Ordering, +{ + source: S, + dupsort_func: F, +} + +impl DupsortFunc +where + S: Source, + F: Fn(&Entry, &Entry) -> Ordering, +{ + pub fn new(source: S, dupsort_func: F) -> Self { + Self { + source, + dupsort_func, + } + } +} + +impl<'a, S, F> Source for &'a DupsortFunc +where + S: Source, + F: Fn(&Entry, &Entry) -> Ordering, +{ + type It = DupsortFuncIter<'a, S::It, F>; + fn iter(&self) -> Self::It { + Self::It { + run: Vec::new(), + next: None, + iter: self.source.iter(), + dupsort_func: &self.dupsort_func, + } + } + type Get = DupsortFuncIter<'a, S::Get, F>; + fn get(&self, key: &[u8]) -> Self::Get { + Self::Get { + run: Vec::new(), + next: None, + iter: self.source.get(key), + dupsort_func: &self.dupsort_func, + } + } + type Prefix = DupsortFuncIter<'a, S::Prefix, F>; + fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix { + Self::Prefix { + run: Vec::new(), + next: None, + iter: self.source.get_prefix(prefix), + dupsort_func: &self.dupsort_func, + } + } + type Range = DupsortFuncIter<'a, S::Range, F>; + fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range { + Self::Range { + run: Vec::new(), + next: None, + iter: self.source.get_range(start, end), + dupsort_func: &self.dupsort_func, + } + } +} + +#[derive(Debug)] +pub struct DupsortFuncIter<'a, I, F> +where + I: Iter, + F: Fn(&Entry, &Entry) -> Ordering, +{ + run: Vec, + next: Option, + iter: I, + dupsort_func: &'a F, +} + +impl<'a, I, F> Iterator for DupsortFuncIter<'a, I, F> +where + I: Iter, + F: Fn(&Entry, &Entry) -> Ordering, +{ + type Item = Entry; + + fn next(&mut self) -> Option { + self.run.pop().or_else(|| { + self.run + .push(self.next.take().or_else(|| self.iter.next())?); + + // println!("2: {:?} / {:?}", self.next, self.run); + while let Some(e) = self.iter.next() { + if e.key == self.run[0].key { + self.run.push(e); + continue; + } + self.next.replace(e); + break; + } + // sort in reverse order, so self.run.pop() can be the usual + // return value. + self.run.sort_unstable_by(|a, b| (self.dupsort_func)(b, a)); + self.run.pop() + }) + } +} + +impl<'a, I, F> Iter for DupsortFuncIter<'a, I, F> +where + I: Iter, + F: Fn(&Entry, &Entry) -> Ordering, +{ + fn seek(&mut self, key: &[u8]) { + self.run.clear(); + self.next.take(); + self.iter.seek(key); + } +} + +#[test] +fn test_dupsort() { + use crate::source::test_source::TestSource; + + let ts = TestSource( + (1u8..10) + .map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j]))) + .flatten() + .collect(), + ); + + let s = ts.dupsort_func(|a, b| a.value[0].cmp(&b.value[0])); + + assert_eq!( + Vec::from_iter((&s).iter()), + (1u8..10) + .map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j]))) + .flatten() + .collect::>() + ); +} diff --git a/src/lib.rs b/src/lib.rs index 4e70a99..2d4c198 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ mod compression; +mod dupsort_func; mod entry; mod iter; +mod merge_func; mod merger; pub mod reader; pub mod source; diff --git a/src/merge_func.rs b/src/merge_func.rs new file mode 100644 index 0000000..781ddb6 --- /dev/null +++ b/src/merge_func.rs @@ -0,0 +1,118 @@ +use crate::{Entry, Iter, Source}; + +pub struct MergeFunc { + source: S, + merge_func: F, +} + +impl MergeFunc +where + S: Source, + F: Fn(&mut Entry, &Entry), +{ + pub fn new(source: S, merge_func: F) -> Self { + Self { source, merge_func } + } +} + +impl<'a, S, F> Source for &'a MergeFunc +where + S: Source, + F: Fn(&mut Entry, &Entry) + Clone, +{ + type It = MergeFuncIter<'a, S::It, F>; + fn iter(&self) -> Self::It { + Self::It { + prev: None, + iter: self.source.iter(), + merge_func: &self.merge_func, + } + } + type Get = MergeFuncIter<'a, S::Get, F>; + fn get(&self, key: &[u8]) -> Self::Get { + MergeFuncIter { + prev: None, + iter: self.source.get(key), + merge_func: &self.merge_func, + } + } + type Prefix = MergeFuncIter<'a, S::Prefix, F>; + fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix { + MergeFuncIter { + prev: None, + iter: self.source.get_prefix(prefix), + merge_func: &self.merge_func, + } + } + type Range = MergeFuncIter<'a, S::Range, F>; + fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range { + MergeFuncIter { + prev: None, + iter: self.source.get_range(start, end), + merge_func: &self.merge_func, + } + } +} + +pub struct MergeFuncIter<'a, I: Iter, F: Fn(&mut Entry, &Entry)> { + prev: Option, + iter: I, + merge_func: &'a F, +} + +impl<'a, I, F> Iterator for MergeFuncIter<'a, I, F> +where + I: Iter, + F: Fn(&mut Entry, &Entry), +{ + type Item = Entry; + fn next(&mut self) -> Option { + let mut cur = self.prev.take().or_else(|| self.iter.next())?; + while let Some(e) = self.iter.next() { + if e.key == cur.key { + (self.merge_func)(&mut cur, &e); + } else { + self.prev.replace(e); + break; + } + } + Some(cur) + } +} + +impl<'a, I, F> Iter for MergeFuncIter<'a, I, F> +where + I: Iter, + F: Fn(&mut Entry, &Entry), +{ + fn seek(&mut self, key: &[u8]) { + self.prev.take(); + self.iter.seek(key); + } +} + +#[test] +fn test_merge_func() { + use crate::source::test_source::TestSource; + use std::sync::Arc; + + let ts = TestSource( + (1u8..8) + .map(|n| vec![n; n as usize]) + .flatten() + .map(|n| Entry::new(vec![n], vec![1])) + .collect(), + ); + + let s = (&ts).merge_func(|e1, e2| { + let v1 = Arc::make_mut(&mut e1.value); + v1[0] += e2.value[0]; + }); + + assert_eq!( + Vec::from_iter((&s).iter()), + (1u8..8) + .map(|n| Entry::new(vec![n], vec![n])) + .collect::>() + ) +} diff --git a/src/source.rs b/src/source.rs index 4fd83b7..de00675 100644 --- a/src/source.rs +++ b/src/source.rs @@ -1,5 +1,7 @@ +use crate::dupsort_func::DupsortFunc; use crate::iter::{BoxedIter, PrefixIter, RangeIter}; -use crate::Iter; +use crate::merge_func::MergeFunc; +use crate::{Entry, Iter}; use std::marker::PhantomData; pub trait Source { @@ -12,6 +14,22 @@ pub trait Source { fn get(&self, key: &[u8]) -> Self::Get; fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix; fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range; + + fn merge_func(self, merge_func: F) -> MergeFunc + where + Self: Sized, + F: Fn(&mut Entry, &Entry), + { + MergeFunc::new(self, merge_func) + } + + fn dupsort_func(self, dupsort_func: F) -> DupsortFunc + where + Self: Sized, + F: Fn(&Entry, &Entry) -> std::cmp::Ordering, + { + DupsortFunc::new(self, dupsort_func) + } } pub trait IterSource { -- 2.50.1