]> git.mikk.net Git - mtbl-rs/commitdiff
Add merge_func and dupsort_func methods to Source
authorChris Mikkelson <cmikk@fsi.io>
Mon, 2 Sep 2024 05:28:53 +0000 (00:28 -0500)
committerChris Mikkelson <cmikk@fsi.io>
Mon, 2 Sep 2024 05:28:53 +0000 (00:28 -0500)
src/dupsort_func.rs [new file with mode: 0644]
src/lib.rs
src/merge_func.rs [new file with mode: 0644]
src/source.rs

diff --git a/src/dupsort_func.rs b/src/dupsort_func.rs
new file mode 100644 (file)
index 0000000..b0f4ea4
--- /dev/null
@@ -0,0 +1,142 @@
+use crate::{Entry, Iter, Source};
+use std::cmp::Ordering;
+
+pub struct DupsortFunc<S, F>
+where
+    S: Source,
+    F: Fn(&Entry, &Entry) -> Ordering,
+{
+    source: S,
+    dupsort_func: F,
+}
+
+impl<S, F> DupsortFunc<S, F>
+where
+    S: Source,
+    F: Fn(&Entry, &Entry) -> Ordering,
+{
+    pub fn new(source: S, dupsort_func: F) -> Self {
+        Self {
+            source,
+            dupsort_func,
+        }
+    }
+}
+
+impl<'a, S, F> Source for &'a DupsortFunc<S, F>
+where
+    S: Source,
+    F: Fn(&Entry, &Entry) -> Ordering,
+{
+    type It = DupsortFuncIter<'a, S::It, F>;
+    fn iter(&self) -> Self::It {
+        Self::It {
+            run: Vec::new(),
+            next: None,
+            iter: self.source.iter(),
+            dupsort_func: &self.dupsort_func,
+        }
+    }
+    type Get = DupsortFuncIter<'a, S::Get, F>;
+    fn get(&self, key: &[u8]) -> Self::Get {
+        Self::Get {
+            run: Vec::new(),
+            next: None,
+            iter: self.source.get(key),
+            dupsort_func: &self.dupsort_func,
+        }
+    }
+    type Prefix = DupsortFuncIter<'a, S::Prefix, F>;
+    fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix {
+        Self::Prefix {
+            run: Vec::new(),
+            next: None,
+            iter: self.source.get_prefix(prefix),
+            dupsort_func: &self.dupsort_func,
+        }
+    }
+    type Range = DupsortFuncIter<'a, S::Range, F>;
+    fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range {
+        Self::Range {
+            run: Vec::new(),
+            next: None,
+            iter: self.source.get_range(start, end),
+            dupsort_func: &self.dupsort_func,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct DupsortFuncIter<'a, I, F>
+where
+    I: Iter,
+    F: Fn(&Entry, &Entry) -> Ordering,
+{
+    run: Vec<Entry>,
+    next: Option<Entry>,
+    iter: I,
+    dupsort_func: &'a F,
+}
+
+impl<'a, I, F> Iterator for DupsortFuncIter<'a, I, F>
+where
+    I: Iter,
+    F: Fn(&Entry, &Entry) -> Ordering,
+{
+    type Item = Entry;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.run.pop().or_else(|| {
+            self.run
+                .push(self.next.take().or_else(|| self.iter.next())?);
+
+            //   println!("2: {:?} / {:?}", self.next, self.run);
+            while let Some(e) = self.iter.next() {
+                if e.key == self.run[0].key {
+                    self.run.push(e);
+                    continue;
+                }
+                self.next.replace(e);
+                break;
+            }
+            // sort in reverse order, so self.run.pop() can be the usual
+            // return value.
+            self.run.sort_unstable_by(|a, b| (self.dupsort_func)(b, a));
+            self.run.pop()
+        })
+    }
+}
+
+impl<'a, I, F> Iter for DupsortFuncIter<'a, I, F>
+where
+    I: Iter,
+    F: Fn(&Entry, &Entry) -> Ordering,
+{
+    fn seek(&mut self, key: &[u8]) {
+        self.run.clear();
+        self.next.take();
+        self.iter.seek(key);
+    }
+}
+
+#[test]
+fn test_dupsort() {
+    use crate::source::test_source::TestSource;
+
+    let ts = TestSource(
+        (1u8..10)
+            .map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
+            .flatten()
+            .collect(),
+    );
+
+    let s = ts.dupsort_func(|a, b| a.value[0].cmp(&b.value[0]));
+
+    assert_eq!(
+        Vec::from_iter((&s).iter()),
+        (1u8..10)
+            .map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
+            .flatten()
+            .collect::<Vec<_>>()
+    );
+}
index 4e70a990f3058319429749df9d8aed663944bc45..2d4c1985fb06f37357a4fef14dbb4106a9a98422 100644 (file)
@@ -1,6 +1,8 @@
 mod compression;
+mod dupsort_func;
 mod entry;
 mod iter;
+mod merge_func;
 mod merger;
 pub mod reader;
 pub mod source;
diff --git a/src/merge_func.rs b/src/merge_func.rs
new file mode 100644 (file)
index 0000000..781ddb6
--- /dev/null
@@ -0,0 +1,118 @@
+use crate::{Entry, Iter, Source};
+
+pub struct MergeFunc<S: Source, F: Fn(&mut Entry, &Entry)> {
+    source: S,
+    merge_func: F,
+}
+
+impl<S, F> MergeFunc<S, F>
+where
+    S: Source,
+    F: Fn(&mut Entry, &Entry),
+{
+    pub fn new(source: S, merge_func: F) -> Self {
+        Self { source, merge_func }
+    }
+}
+
+impl<'a, S, F> Source for &'a MergeFunc<S, F>
+where
+    S: Source,
+    F: Fn(&mut Entry, &Entry) + Clone,
+{
+    type It = MergeFuncIter<'a, S::It, F>;
+    fn iter(&self) -> Self::It {
+        Self::It {
+            prev: None,
+            iter: self.source.iter(),
+            merge_func: &self.merge_func,
+        }
+    }
+    type Get = MergeFuncIter<'a, S::Get, F>;
+    fn get(&self, key: &[u8]) -> Self::Get {
+        MergeFuncIter {
+            prev: None,
+            iter: self.source.get(key),
+            merge_func: &self.merge_func,
+        }
+    }
+    type Prefix = MergeFuncIter<'a, S::Prefix, F>;
+    fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix {
+        MergeFuncIter {
+            prev: None,
+            iter: self.source.get_prefix(prefix),
+            merge_func: &self.merge_func,
+        }
+    }
+    type Range = MergeFuncIter<'a, S::Range, F>;
+    fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range {
+        MergeFuncIter {
+            prev: None,
+            iter: self.source.get_range(start, end),
+            merge_func: &self.merge_func,
+        }
+    }
+}
+
+pub struct MergeFuncIter<'a, I: Iter, F: Fn(&mut Entry, &Entry)> {
+    prev: Option<Entry>,
+    iter: I,
+    merge_func: &'a F,
+}
+
+impl<'a, I, F> Iterator for MergeFuncIter<'a, I, F>
+where
+    I: Iter,
+    F: Fn(&mut Entry, &Entry),
+{
+    type Item = Entry;
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut cur = self.prev.take().or_else(|| self.iter.next())?;
+        while let Some(e) = self.iter.next() {
+            if e.key == cur.key {
+                (self.merge_func)(&mut cur, &e);
+            } else {
+                self.prev.replace(e);
+                break;
+            }
+        }
+        Some(cur)
+    }
+}
+
+impl<'a, I, F> Iter for MergeFuncIter<'a, I, F>
+where
+    I: Iter,
+    F: Fn(&mut Entry, &Entry),
+{
+    fn seek(&mut self, key: &[u8]) {
+        self.prev.take();
+        self.iter.seek(key);
+    }
+}
+
+#[test]
+fn test_merge_func() {
+    use crate::source::test_source::TestSource;
+    use std::sync::Arc;
+
+    let ts = TestSource(
+        (1u8..8)
+            .map(|n| vec![n; n as usize])
+            .flatten()
+            .map(|n| Entry::new(vec![n], vec![1]))
+            .collect(),
+    );
+
+    let s = (&ts).merge_func(|e1, e2| {
+        let v1 = Arc::make_mut(&mut e1.value);
+        v1[0] += e2.value[0];
+    });
+
+    assert_eq!(
+        Vec::from_iter((&s).iter()),
+        (1u8..8)
+            .map(|n| Entry::new(vec![n], vec![n]))
+            .collect::<Vec<_>>()
+    )
+}
index 4fd83b7db64a5bea6fefe21b35ca53e097bda342..de0067527a827b2c7c0a19f1300ab11597ec418e 100644 (file)
@@ -1,5 +1,7 @@
+use crate::dupsort_func::DupsortFunc;
 use crate::iter::{BoxedIter, PrefixIter, RangeIter};
-use crate::Iter;
+use crate::merge_func::MergeFunc;
+use crate::{Entry, Iter};
 use std::marker::PhantomData;
 
 pub trait Source {
@@ -12,6 +14,22 @@ pub trait Source {
     fn get(&self, key: &[u8]) -> Self::Get;
     fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix;
     fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range;
+
+    fn merge_func<F>(self, merge_func: F) -> MergeFunc<Self, F>
+    where
+        Self: Sized,
+        F: Fn(&mut Entry, &Entry),
+    {
+        MergeFunc::new(self, merge_func)
+    }
+
+    fn dupsort_func<F>(self, dupsort_func: F) -> DupsortFunc<Self, F>
+    where
+        Self: Sized,
+        F: Fn(&Entry, &Entry) -> std::cmp::Ordering,
+    {
+        DupsortFunc::new(self, dupsort_func)
+    }
 }
 
 pub trait IterSource {