]> git.mikk.net Git - mtbl-rs/commitdiff
Refactor, again (WIP)
authorChris Mikkelson <chris@mikk.net>
Wed, 16 Apr 2025 05:06:47 +0000 (00:06 -0500)
committerChris Mikkelson <chris@mikk.net>
Wed, 16 Apr 2025 05:16:00 +0000 (00:16 -0500)
Break up some larger files into smaller chunks, also decouple Iter
and Iterator again, in favor of implementing IntoIterator for
Iter implementations.

Still a few remaining nits to work out -- the IntoIterator trait
isn't coming into scope where I expect it, so may need to back
off of `impl Iter` in favor of an associated type on Source which
requires IntoIterator.

19 files changed:
src/bin/mtbl_dump.rs
src/entry.rs
src/iter.rs [deleted file]
src/iter/dupmerge.rs [new file with mode: 0644]
src/iter/dupsort.rs [moved from src/dupsort.rs with 65% similarity]
src/iter/filter.rs [moved from src/filter.rs with 59% similarity]
src/iter/map.rs [moved from src/map.rs with 78% similarity]
src/iter/merger.rs [new file with mode: 0644]
src/iter/mod.rs [new file with mode: 0644]
src/iter/prefix.rs [new file with mode: 0644]
src/iter/range.rs [new file with mode: 0644]
src/lib.rs
src/merge.rs [deleted file]
src/merger.rs
src/reader/block.rs
src/reader/mod.rs
src/sorter.rs
src/source/adapters.rs [new file with mode: 0644]
src/source/mod.rs [moved from src/source.rs with 70% similarity]

index c1de7244bd2920c6d71e88f6db5fcb88fa9c01c7..cfeeb6fb4250e354ba9cffff6bc84d69dc7204b5 100644 (file)
@@ -5,7 +5,7 @@ fn main() {
         .nth(1)
         .expect("Usage: mtbl_dump <filename>");
     let reader = mtbl::reader::from_file(fname);
-    for e in reader.iter() {
+    for e in reader.iter().into_iter() {
         println!("{:?}: {:?}", e.key(), e.value());
     }
 }
index ce1ede2c2a3e1cfdb6506926f3041db9d015dbec..8d990b87b581d00ab99fc477b4c25529cf2f2107 100644 (file)
@@ -1,7 +1,8 @@
+use crate::iter::merger::Mergeable;
 use std::cmp::Ordering;
 use std::sync::Arc;
 
-#[derive(Debug, Clone, Eq)]
+#[derive(Debug, Clone)]
 pub struct Entry {
     key: Arc<Vec<u8>>,
     value: Arc<Vec<u8>>,
@@ -46,6 +47,8 @@ impl Entry {
     }
 }
 
+impl Mergeable for Entry {}
+
 impl PartialOrd<[u8]> for Entry {
     fn partial_cmp(&self, other: &[u8]) -> Option<Ordering> {
         Some(self.key().cmp(other))
@@ -64,8 +67,21 @@ impl AsRef<[u8]> for Entry {
     }
 }
 
-impl PartialEq<Entry> for Entry {
+impl PartialEq for Entry {
     fn eq(&self, other: &Entry) -> bool {
         self.key() == other.key()
     }
 }
+impl Eq for Entry {}
+
+impl PartialOrd for Entry {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.key().cmp(other.key()))
+    }
+}
+
+impl Ord for Entry {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.key().cmp(other.key())
+    }
+}
diff --git a/src/iter.rs b/src/iter.rs
deleted file mode 100644 (file)
index 67a92dc..0000000
+++ /dev/null
@@ -1,143 +0,0 @@
-use crate::dupsort::DupsortIter;
-use crate::filter::FilterIter;
-use crate::map::MapIter;
-use crate::merge::MergeIter;
-use crate::Entry;
-use std::cmp::Ordering;
-use std::iter::Iterator;
-
-pub trait SeekableIter: Iterator {
-    fn seek(&mut self, key: &[u8]);
-
-    fn merge_func<F>(self, merge_func: F) -> MergeIter<Self, F>
-    where
-        F: Fn(&mut Vec<u8>, &Entry),
-        Self: Sized,
-    {
-        MergeIter::new(self, merge_func)
-    }
-
-    fn dupsort_func<F>(self, dupsort_func: F) -> DupsortIter<Self, F>
-    where
-        F: FnMut(&Self::Item, &Self::Item) -> Ordering,
-        Self: Sized,
-    {
-        DupsortIter::new(self, dupsort_func)
-    }
-
-    fn filter_func<F>(self, filter_func: F) -> FilterIter<Self, F>
-    where
-        F: FnMut(&Self::Item, &mut Vec<u8>) -> bool,
-        Self: Sized,
-    {
-        FilterIter::new(self, filter_func)
-    }
-
-    fn map_func<F, O>(self, map_func: F) -> MapIter<Self, F, O>
-    where
-        F: FnMut(Self::Item) -> O,
-        Self: Sized,
-    {
-        MapIter::new(self, map_func)
-    }
-}
-
-impl<I: SeekableIter + ?Sized> SeekableIter for Box<I> {
-    fn seek(&mut self, key: &[u8]) {
-        self.as_mut().seek(key);
-    }
-}
-
-pub struct PrefixIter<I>
-where
-    I: SeekableIter,
-    I::Item: AsRef<[u8]>,
-{
-    iter: I,
-    prefix: Vec<u8>,
-}
-
-impl<I> PrefixIter<I>
-where
-    I: SeekableIter,
-    I::Item: AsRef<[u8]>,
-{
-    pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self {
-        iter.seek(prefix.as_ref());
-        Self {
-            iter,
-            prefix: Vec::from(prefix.as_ref()),
-        }
-    }
-}
-
-impl<I> Iterator for PrefixIter<I>
-where
-    I: SeekableIter,
-    I::Item: AsRef<[u8]>,
-{
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iter
-            .next()
-            .filter(|e| e.as_ref().starts_with(self.prefix.as_slice()))
-    }
-}
-
-impl<I> SeekableIter for PrefixIter<I>
-where
-    I: SeekableIter,
-    I::Item: AsRef<[u8]>,
-{
-    fn seek(&mut self, key: &[u8]) {
-        self.iter.seek(key);
-    }
-}
-
-pub struct RangeIter<I> {
-    iter: I,
-    start: Vec<u8>,
-    end: Vec<u8>,
-}
-
-impl<I> RangeIter<I>
-where
-    I: SeekableIter,
-{
-    pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self {
-        iter.seek(start.as_ref());
-        Self {
-            iter,
-            start: Vec::from(start.as_ref()),
-            end: Vec::from(end.as_ref()),
-        }
-    }
-}
-
-impl<I> Iterator for RangeIter<I>
-where
-    I: SeekableIter,
-    I::Item: PartialOrd<[u8]>,
-{
-    type Item = I::Item;
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iter.next().filter(|i| i <= self.end.as_slice())
-    }
-}
-
-impl<I> SeekableIter for RangeIter<I>
-where
-    I: SeekableIter,
-    I::Item: PartialOrd<[u8]>,
-{
-    fn seek(&mut self, key: &[u8]) {
-        if key <= self.start.as_slice() {
-            self.iter.seek(self.start.as_slice());
-        } else if key > self.end.as_slice() {
-            self.iter.seek(self.end.as_slice());
-        } else {
-            self.iter.seek(key);
-        }
-    }
-}
diff --git a/src/iter/dupmerge.rs b/src/iter/dupmerge.rs
new file mode 100644 (file)
index 0000000..7a6a3ce
--- /dev/null
@@ -0,0 +1,94 @@
+use crate::iter::{IntoIter, Iter};
+
+pub struct MergeIter<I, F>
+where
+    I: Iter,
+    I::Item: PartialEq,
+    F: FnMut(&mut I::Item, &I::Item),
+{
+    prev: Option<I::Item>,
+    cur: Option<I::Item>,
+    iter: I,
+    merge_func: F,
+}
+
+impl<I, F> MergeIter<I, F>
+where
+    I: Iter,
+    I::Item: PartialEq,
+    F: FnMut(&mut I::Item, &I::Item),
+{
+    pub fn new(iter: I, merge_func: F) -> Self {
+        Self {
+            prev: None,
+            cur: None,
+            iter,
+            merge_func,
+        }
+    }
+}
+
+impl<I, F> Iter for MergeIter<I, F>
+where
+    I: Iter,
+    I::Item: PartialEq,
+    F: FnMut(&mut I::Item, &I::Item),
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut cur = self.prev.take().or_else(|| self.iter.next())?;
+        while let Some(e) = self.iter.next() {
+            if cur == e {
+                (self.merge_func)(&mut cur, &e);
+            } else {
+                self.prev.replace(e);
+                break;
+            }
+        }
+        Some(cur)
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.prev.take();
+        self.iter.seek(key);
+    }
+}
+
+impl<I, F> IntoIterator for MergeIter<I, F>
+where
+    I: Iter,
+    I::Item: PartialEq,
+    F: FnMut(&mut I::Item, &I::Item),
+{
+    type Item = I::Item;
+    type IntoIter = IntoIter<Self>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
+
+#[cfg(disable)]
+#[test]
+fn test_merge_func() {
+    use crate::source::test_source::TestSource;
+
+    let ts = TestSource(
+        (1u8..8)
+            .flat_map(|n| vec![n; n as usize])
+            .map(|n| Entry::new(vec![n], vec![1]))
+            .collect(),
+    );
+
+    let s = ts.merge_func(|v, e| {
+        v[0] += e.value()[0];
+    });
+
+    assert_eq!(
+        Vec::from_iter(s.iter()),
+        (1u8..8)
+            .map(|n| Entry::new(vec![n], vec![n]))
+            .collect::<Vec<_>>()
+    )
+}
similarity index 65%
rename from src/dupsort.rs
rename to src/iter/dupsort.rs
index 3c05b604073f447c582bba2192b3195f387c9fef..04dd4b71c0eded5252facb2b0b35655f19ac38ef 100644 (file)
@@ -1,45 +1,10 @@
-use crate::{SeekableIter, Source};
+use crate::iter::{IntoIter, Iter};
 use std::cmp::Ordering;
 
-pub struct DupsortSource<S>
-where
-    S: Source,
-{
-    source: S,
-    #[allow(clippy::type_complexity)]
-    dupsort_func: Box<dyn Fn(&S::Item, &S::Item) -> Ordering>,
-}
-
-impl<S> DupsortSource<S>
-where
-    S: Source,
-{
-    pub fn new<F>(source: S, dupsort_func: F) -> Self
-    where
-        F: Fn(&S::Item, &S::Item) -> Ordering + 'static,
-    {
-        Self {
-            source,
-            dupsort_func: Box::new(dupsort_func),
-        }
-    }
-}
-
-impl<S> Source for DupsortSource<S>
-where
-    S: Source,
-    S::Item: PartialEq,
-{
-    type Item = S::Item;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
-        self.source.iter().dupsort_func(&self.dupsort_func)
-    }
-}
-
 #[derive(Debug)]
 pub struct DupsortIter<I, F>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(&I::Item, &I::Item) -> Ordering,
 {
     run: Vec<I::Item>,
@@ -50,7 +15,7 @@ where
 
 impl<I, F> DupsortIter<I, F>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(&I::Item, &I::Item) -> Ordering,
 {
     pub fn new(iter: I, dupsort_func: F) -> Self {
@@ -63,21 +28,39 @@ where
     }
 }
 
-impl<I, F> Iterator for DupsortIter<I, F>
+impl<I, F> IntoIterator for DupsortIter<I, F>
 where
-    I: SeekableIter,
+    I: Iter,
     I::Item: PartialEq,
     F: Fn(&I::Item, &I::Item) -> Ordering,
 {
     type Item = I::Item;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
+
+impl<I, F> Iter for DupsortIter<I, F>
+where
+    I: Iter,
+    I::Item: PartialEq,
+    F: Fn(&I::Item, &I::Item) -> Ordering,
+{
+    type Item = I::Item;
+
+    fn seek(&mut self, key: &[u8]) {
+        self.run.clear();
+        self.next.take();
+        self.iter.seek(key);
+    }
 
     fn next(&mut self) -> Option<Self::Item> {
         self.run.pop().or_else(|| {
             self.run
                 .push(self.next.take().or_else(|| self.iter.next())?);
 
-            //   println!("2: {:?} / {:?}", self.next, self.run);
-            for e in &mut self.iter {
+            while let Some(e) = self.iter.next() {
                 if e == self.run[0] {
                     self.run.push(e);
                     continue;
@@ -93,19 +76,7 @@ where
     }
 }
 
-impl<I, F> SeekableIter for DupsortIter<I, F>
-where
-    I: SeekableIter,
-    I::Item: PartialEq,
-    F: Fn(&I::Item, &I::Item) -> Ordering,
-{
-    fn seek(&mut self, key: &[u8]) {
-        self.run.clear();
-        self.next.take();
-        self.iter.seek(key);
-    }
-}
-
+#[cfg(disable)]
 #[test]
 fn test_dupsort() {
     use crate::source::test_source::TestSource;
similarity index 59%
rename from src/filter.rs
rename to src/iter/filter.rs
index 9d224f6d09ba0e7f12aebb33159d8e2c388612c6..b069d71b8c5763ac0772d86fc26ba9023ecd0892 100644 (file)
@@ -1,4 +1,4 @@
-use crate::{SeekableIter, Source};
+use crate::Iter;
 
 pub struct FilterIter<I, F> {
     iter: I,
@@ -8,7 +8,7 @@ pub struct FilterIter<I, F> {
 
 impl<I, F> FilterIter<I, F>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
 {
     pub fn new(iter: I, filter: F) -> Self {
@@ -21,9 +21,9 @@ where
     }
 }
 
-impl<I, F> Iterator for FilterIter<I, F>
+impl<I, F> Iter for FilterIter<I, F>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
 {
     type Item = I::Item;
@@ -40,56 +40,18 @@ where
             }
         }
     }
-}
 
-impl<I, F> SeekableIter for FilterIter<I, F>
-where
-    I: SeekableIter,
-    F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
     fn seek(&mut self, key: &[u8]) {
         self.iter.seek(key);
     }
 }
 
-pub struct FilterSource<S>
-where
-    S: Source,
-{
-    source: S,
-    #[allow(clippy::type_complexity)]
-    filter_func: Box<dyn Fn(&S::Item, &mut Vec<u8>) -> bool>,
-}
-
-impl<S> FilterSource<S>
-where
-    S: Source,
-{
-    pub fn new<F>(source: S, filter_func: F) -> Self
-    where
-        F: Fn(&S::Item, &mut Vec<u8>) -> bool + 'static,
-    {
-        Self {
-            source,
-            filter_func: Box::new(filter_func),
-        }
-    }
-}
-
-impl<S> Source for FilterSource<S>
-where
-    S: Source,
-{
-    type Item = S::Item;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
-        self.source.iter().filter_func(&self.filter_func)
-    }
-}
-
 #[test]
 fn test_filter() {
     use crate::source::test_source::TestSource;
+    use crate::source::Source;
     use crate::Entry;
+    use std::iter::IntoIterator;
 
     let ts =
         TestSource((0u8..10).map(|n| Entry::new(vec![n], vec![])).collect()).filter(|e, sv| {
@@ -106,6 +68,9 @@ fn test_filter() {
 
     assert_eq!(
         vec![0, 2, 8],
-        ts.iter().map(|e| e.key()[0]).collect::<Vec<u8>>()
+        ts.map(|e| e.key()[0])
+            .iter()
+            .into_iter()
+            .collect::<Vec<u8>>()
     );
 }
similarity index 78%
rename from src/map.rs
rename to src/iter/map.rs
index a2f746b3ea0a0a194d3fca2f3d7534ab4996f42b..628467f360e90907cf9c22266036755e0c3537c7 100644 (file)
@@ -1,8 +1,11 @@
-use crate::{SeekableIter, Source};
+use crate::{
+    iter::{IntoIter, Iter},
+    Source,
+};
 
 pub struct MapIter<I, F, O>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(I::Item) -> O,
 {
     iter: I,
@@ -11,7 +14,7 @@ where
 
 impl<I, F, O> MapIter<I, F, O>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(I::Item) -> O,
 {
     pub fn new(iter: I, mapf: F) -> Self {
@@ -19,9 +22,9 @@ where
     }
 }
 
-impl<I, F, O> Iterator for MapIter<I, F, O>
+impl<I, F, O> Iter for MapIter<I, F, O>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(I::Item) -> O,
 {
     type Item = O;
@@ -30,15 +33,22 @@ where
         let item = self.iter.next()?;
         Some((self.mapf)(item))
     }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.iter.seek(key);
+    }
 }
 
-impl<I, F, O> SeekableIter for MapIter<I, F, O>
+impl<I, F, O> IntoIterator for MapIter<I, F, O>
 where
-    I: SeekableIter,
+    I: Iter,
     F: FnMut(I::Item) -> O,
 {
-    fn seek(&mut self, key: &[u8]) {
-        self.iter.seek(key);
+    type Item = O;
+    type IntoIter = IntoIter<Self>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
     }
 }
 
@@ -71,16 +81,15 @@ where
     S: Source,
 {
     type Item = O;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
-        self.source.iter().map_func(&self.map)
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        self.source.iter().map(&self.map)
     }
 }
 
 #[cfg(test)]
 mod test {
     use crate::source::test_source::TestSource;
-    use crate::Entry;
-    use crate::Source;
+    use crate::{Entry, Iter, Source};
 
     struct CompVec(Vec<u8>);
     impl PartialEq<[u8]> for CompVec {
@@ -99,12 +108,13 @@ mod test {
         let ts = TestSource((0u8..10).map(|i| Entry::new(vec![i], vec![])).collect())
             .map(|e| CompVec(Vec::from(e.key())));
         assert_eq!(
-            ts.iter().map(|cv| cv.0).collect::<Vec<_>>(),
+            ts.iter().map(|cv| cv.0).into_iter().collect::<Vec<_>>(),
             (0u8..10).map(|i| vec![i]).collect::<Vec<_>>()
         );
         assert_eq!(
             ts.get_range(&[2u8], &[7u8])
                 .map(|cv| cv.0)
+                .into_iter()
                 .collect::<Vec<_>>(),
             (2u8..8)
                 .map(|i| {
diff --git a/src/iter/merger.rs b/src/iter/merger.rs
new file mode 100644 (file)
index 0000000..744991c
--- /dev/null
@@ -0,0 +1,182 @@
+use crate::Iter;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+
+pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {}
+
+struct MergeEntry<I: Iter> {
+    e: I::Item,
+    it: I,
+}
+
+impl<I: Iter> PartialEq for MergeEntry<I>
+where
+    I::Item: PartialEq,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.e == other.e
+    }
+}
+
+impl<I: Iter> Eq for MergeEntry<I> where I::Item: PartialEq {}
+
+// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics
+impl<I: Iter> PartialOrd for MergeEntry<I>
+where
+    I::Item: PartialOrd,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        other.e.partial_cmp(&self.e)
+    }
+}
+
+impl<I: Iter> Ord for MergeEntry<I>
+where
+    I::Item: Ord,
+{
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.e.cmp(&self.e)
+    }
+}
+
+pub struct MergeIter<I: Iter> {
+    heap: BinaryHeap<MergeEntry<I>>,
+    finished: Vec<I>,
+    last_key: Vec<u8>,
+}
+
+impl<I> From<I> for MergeIter<I::Item>
+where
+    I: Iterator,
+    I::Item: Iter,
+    <I::Item as Iter>::Item: Mergeable,
+{
+    fn from(iters: I) -> Self {
+        let mut v: Vec<I::Item> = Vec::new();
+        let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
+            Some(e) => Some(MergeEntry { e, it }),
+            None => {
+                v.push(it);
+                None
+            }
+        }));
+        MergeIter {
+            finished: v,
+            heap: h,
+            last_key: Vec::new(),
+        }
+    }
+}
+
+impl<I: Iter> Iter for MergeIter<I>
+where
+    I::Item: Mergeable,
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let cur;
+        {
+            let mut next = self.heap.peek_mut()?;
+
+            cur = next.e.clone();
+            self.last_key.clear();
+            self.last_key.extend(cur.as_ref());
+
+            if let Some(e) = next.it.next() {
+                next.e = e;
+                return Some(cur);
+            }
+        }
+        self.heap.pop();
+        Some(cur)
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        if key >= self.last_key.as_ref() {
+            loop {
+                match self.heap.peek_mut() {
+                    None => return,
+                    Some(mut head) => {
+                        if &head.e >= key {
+                            return;
+                        }
+                        head.it.seek(key);
+                        if let Some(e) = head.it.next() {
+                            head.e = e;
+                            continue;
+                        }
+                    }
+                }
+                self.heap.pop();
+            }
+        }
+
+        // backwards seek; reset heap
+        let mut finished: Vec<I> = Vec::new();
+        let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
+        for mut it in self
+            .heap
+            .drain()
+            .map(|me| me.it)
+            .chain(self.finished.drain(..))
+        {
+            it.seek(key);
+            match it.next() {
+                Some(e) => heap_entries.push(MergeEntry { e, it }),
+                None => finished.push(it),
+            }
+        }
+        self.heap = BinaryHeap::from_iter(heap_entries);
+        self.finished = finished;
+        self.last_key.clear();
+        self.last_key.extend(key);
+    }
+}
+
+#[cfg(disabled)]
+#[cfg(test)]
+mod test {
+    use super::Merger;
+    use crate::source::test_source::TestSource;
+    use crate::Entry;
+    use crate::Source;
+
+    fn tnum(m: u8) -> Vec<u8> {
+        Vec::from_iter((1u8..255).filter(|i| i % m == 0))
+    }
+
+    fn test_source(m: u8) -> TestSource {
+        TestSource(Vec::from_iter(
+            tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
+        ))
+    }
+
+    #[test]
+    fn test_merge() {
+        let range = 1..8;
+        let iters: Vec<_> = range
+            .clone()
+            .map(test_source)
+            .map(|s| s.into_boxed())
+            .collect();
+        let s = Merger::from(iters);
+        let mut v = Vec::<u8>::new();
+        for i in range {
+            v.extend(tnum(i))
+        }
+        v.sort();
+        let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
+        assert_eq!(v2, v);
+    }
+
+    #[test]
+    fn test_binheap() {
+        use std::collections::BinaryHeap;
+
+        let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
+        let vs = v.as_slice();
+        let h = BinaryHeap::from_iter(vs.iter().copied());
+        assert_ne!(h.into_vec(), v);
+    }
+}
diff --git a/src/iter/mod.rs b/src/iter/mod.rs
new file mode 100644 (file)
index 0000000..69123c9
--- /dev/null
@@ -0,0 +1,92 @@
+use std::cmp::Ordering;
+pub mod dupmerge;
+pub mod dupsort;
+pub mod filter;
+pub mod map;
+pub mod merger;
+pub mod prefix;
+pub mod range;
+
+use dupmerge::MergeIter;
+use dupsort::DupsortIter;
+use filter::FilterIter;
+use map::MapIter;
+
+pub trait Iter {
+    type Item;
+
+    fn seek(&mut self, key: &[u8]);
+
+    fn next(&mut self) -> Option<Self::Item>;
+
+    fn seek_to(mut self, key: &[u8]) -> Self
+    where
+        Self: Sized,
+    {
+        self.seek(key);
+        self
+    }
+
+    fn dup_merge<F>(self, merge_func: F) -> MergeIter<Self, F>
+    where
+        Self::Item: PartialEq + Clone,
+        F: FnMut(&mut Self::Item, &Self::Item),
+        Self: Sized,
+    {
+        MergeIter::new(self, merge_func)
+    }
+
+    fn dup_sort<F>(self, dupsort_func: F) -> DupsortIter<Self, F>
+    where
+        Self: Sized,
+        F: FnMut(&Self::Item, &Self::Item) -> Ordering,
+    {
+        DupsortIter::new(self, dupsort_func)
+    }
+
+    fn filter<F>(self, filter_func: F) -> FilterIter<Self, F>
+    where
+        F: FnMut(&Self::Item, &mut Vec<u8>) -> bool,
+        Self: Sized,
+    {
+        FilterIter::new(self, filter_func)
+    }
+
+    fn map<F, O>(self, map_func: F) -> MapIter<Self, F, O>
+    where
+        F: FnMut(Self::Item) -> O,
+        Self: Sized,
+    {
+        MapIter::new(self, map_func)
+    }
+}
+
+impl<I: Iter + ?Sized> Iter for Box<I> {
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.as_mut().next()
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.as_mut().seek(key);
+    }
+}
+
+pub struct IntoIter<I: Iter>(pub I);
+
+impl<I: Iter> IntoIter<I> {
+    fn new(i: I) -> Self {
+        Self(i)
+    }
+}
+
+impl<I> Iterator for IntoIter<I>
+where
+    I: Iter,
+{
+    type Item = I::Item;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.next()
+    }
+}
diff --git a/src/iter/prefix.rs b/src/iter/prefix.rs
new file mode 100644 (file)
index 0000000..8deb321
--- /dev/null
@@ -0,0 +1,55 @@
+use crate::iter::{IntoIter, Iter};
+
+pub struct PrefixIter<I>
+where
+    I: Iter,
+    I::Item: AsRef<[u8]>,
+{
+    iter: I,
+    prefix: Vec<u8>,
+}
+
+impl<I> PrefixIter<I>
+where
+    I: Iter,
+    I::Item: AsRef<[u8]>,
+{
+    pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self {
+        iter.seek(prefix.as_ref());
+        Self {
+            iter,
+            prefix: Vec::from(prefix.as_ref()),
+        }
+    }
+}
+
+impl<I> Iter for PrefixIter<I>
+where
+    I: Iter,
+    I::Item: AsRef<[u8]>,
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.iter
+            .next()
+            .filter(|e| e.as_ref().starts_with(self.prefix.as_slice()))
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.iter.seek(key);
+    }
+}
+
+impl<I> IntoIterator for PrefixIter<I>
+where
+    I: Iter,
+    I::Item: AsRef<[u8]>,
+{
+    type Item = I::Item;
+    type IntoIter = IntoIter<Self>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
diff --git a/src/iter/range.rs b/src/iter/range.rs
new file mode 100644 (file)
index 0000000..00d559f
--- /dev/null
@@ -0,0 +1,54 @@
+use crate::iter::{IntoIter, Iter};
+
+pub struct RangeIter<I> {
+    iter: I,
+    start: Vec<u8>,
+    end: Vec<u8>,
+}
+
+impl<I> RangeIter<I>
+where
+    I: Iter,
+{
+    pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self {
+        iter.seek(start.as_ref());
+        Self {
+            iter,
+            start: Vec::from(start.as_ref()),
+            end: Vec::from(end.as_ref()),
+        }
+    }
+}
+
+impl<I> Iter for RangeIter<I>
+where
+    I: Iter,
+    I::Item: PartialOrd<[u8]>,
+{
+    type Item = I::Item;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.iter.next().filter(|i| i <= self.end.as_slice())
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        if key <= self.start.as_slice() {
+            self.iter.seek(self.start.as_slice());
+        } else if key > self.end.as_slice() {
+            self.iter.seek(self.end.as_slice());
+        } else {
+            self.iter.seek(key);
+        }
+    }
+}
+
+impl<I> IntoIterator for RangeIter<I>
+where
+    I: Iter,
+    I::Item: PartialOrd<[u8]>,
+{
+    type Item = I::Item;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
index d8060fb78da751a976d363b5b3889fb839a547dc..07b87dc57f439b0f0c675ded0a8b6486a380383d 100644 (file)
@@ -1,10 +1,6 @@
 mod compression;
-mod dupsort;
 mod entry;
-mod filter;
 mod iter;
-mod map;
-mod merge;
 mod merger;
 pub mod reader;
 pub mod sorter;
@@ -14,7 +10,7 @@ mod writer;
 pub use compression::Compression;
 pub use entry::Entry;
 pub use fileset::Fileset;
-pub use iter::SeekableIter;
+pub use iter::Iter;
 pub use merger::Merger;
 pub use reader::Reader;
 pub use source::Source;
diff --git a/src/merge.rs b/src/merge.rs
deleted file mode 100644 (file)
index ae26304..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-use crate::{Entry, SeekableIter, Source};
-
-pub struct MergeSource<S: Source> {
-    source: S,
-    #[allow(clippy::type_complexity)]
-    merge_func: Box<dyn Fn(&mut Vec<u8>, &Entry)>,
-}
-
-impl<S> MergeSource<S>
-where
-    S: Source,
-{
-    pub fn new<F>(source: S, merge_func: F) -> Self
-    where
-        F: Fn(&mut Vec<u8>, &Entry) + 'static,
-    {
-        Self {
-            source,
-            merge_func: Box::new(merge_func),
-        }
-    }
-}
-impl<S> Source for MergeSource<S>
-where
-    S: Source<Item = Entry>,
-{
-    type Item = S::Item;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
-        self.source.iter().merge_func(&self.merge_func)
-    }
-}
-
-pub struct MergeIter<I, F: Fn(&mut Vec<u8>, &Entry)> {
-    prev: Option<Entry>,
-    iter: I,
-    merge_func: F,
-}
-
-impl<I, F> MergeIter<I, F>
-where
-    F: Fn(&mut Vec<u8>, &Entry),
-{
-    pub fn new(iter: I, merge_func: F) -> Self {
-        Self {
-            prev: None,
-            iter,
-            merge_func,
-        }
-    }
-}
-
-impl<I, F> Iterator for MergeIter<I, F>
-where
-    I: Iterator<Item = Entry>,
-    F: Fn(&mut Vec<u8>, &Entry),
-{
-    type Item = Entry;
-    fn next(&mut self) -> Option<Self::Item> {
-        let mut cur = self.prev.take().or_else(|| self.iter.next())?;
-        for e in &mut self.iter {
-            if e.key() == cur.key() {
-                (self.merge_func)(cur.value_mut(), &e);
-            } else {
-                self.prev.replace(e);
-                break;
-            }
-        }
-        Some(cur)
-    }
-}
-
-impl<I, F> SeekableIter for MergeIter<I, F>
-where
-    I: SeekableIter<Item = Entry>,
-    F: Fn(&mut Vec<u8>, &Entry),
-{
-    fn seek(&mut self, key: &[u8]) {
-        self.prev.take();
-        self.iter.seek(key);
-    }
-}
-
-#[test]
-fn test_merge_func() {
-    use crate::source::test_source::TestSource;
-
-    let ts = TestSource(
-        (1u8..8)
-            .flat_map(|n| vec![n; n as usize])
-            .map(|n| Entry::new(vec![n], vec![1]))
-            .collect(),
-    );
-
-    let s = ts.merge_func(|v, e| {
-        v[0] += e.value()[0];
-    });
-
-    assert_eq!(
-        Vec::from_iter(s.iter()),
-        (1u8..8)
-            .map(|n| Entry::new(vec![n], vec![n]))
-            .collect::<Vec<_>>()
-    )
-}
index 5abf0a9dfc565e568de3bbc582ebfd1876b69d4c..e6ea6c6148c04605c9fb473a3d7d7b2f198a038c 100644 (file)
-use crate::{Entry, SeekableIter, Source};
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
+use crate::iter::{
+    merger::{MergeIter, Mergeable},
+    Iter,
+};
+use crate::Source;
 
 pub struct Merger<S> {
     sources: Vec<S>,
 }
 
-impl<S, I> From<I> for Merger<S>
-where
-    I: IntoIterator<Item = S>,
-{
-    fn from(i: I) -> Self {
-        Merger {
-            sources: Vec::from_iter(i),
+impl<S> Merger<S> {
+    pub fn new() -> Self {
+        Self {
+            sources: Vec::new(),
         }
     }
-}
 
-struct MergeEntry<I: SeekableIter> {
-    e: Entry,
-    it: I,
-}
-
-impl<I: SeekableIter> PartialEq for MergeEntry<I> {
-    fn eq(&self, other: &Self) -> bool {
-        self.e.key() == other.e.key()
-    }
-}
-impl<I: SeekableIter> Eq for MergeEntry<I> {}
-
-impl<I: SeekableIter> PartialOrd for MergeEntry<I> {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(other.e.key().cmp(self.e.key()))
+    pub fn add(&mut self, source: S) {
+        self.sources.push(source);
     }
 }
 
-impl<I: SeekableIter> Ord for MergeEntry<I> {
-    fn cmp(&self, other: &Self) -> Ordering {
-        other.e.key().cmp(self.e.key())
-    }
-}
-
-pub struct MergeIter<I: SeekableIter> {
-    heap: BinaryHeap<MergeEntry<I>>,
-    finished: Vec<I>,
-    last_key: Vec<u8>,
-}
-
-impl<I> From<I> for MergeIter<I::Item>
+impl<S, I> From<I> for Merger<S>
 where
-    I: Iterator,
-    I::Item: SeekableIter<Item = Entry>,
+    I: IntoIterator<Item = S>,
+    S: Source,
+    S::Item: Mergeable,
 {
-    fn from(iters: I) -> Self {
-        let mut v: Vec<I::Item> = Vec::new();
-        let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
-            Some(e) => Some(MergeEntry { e, it }),
-            None => {
-                v.push(it);
-                None
-            }
-        }));
-        MergeIter {
-            finished: v,
-            heap: h,
-            last_key: Vec::new(),
-        }
-    }
-}
-
-impl<I: SeekableIter<Item = Entry>> Iterator for MergeIter<I> {
-    type Item = Entry;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let cur;
-        {
-            let mut next = self.heap.peek_mut()?;
-            cur = next.e.clone();
-            if let Some(e) = next.it.next() {
-                next.e = e;
-                self.last_key.clear();
-                self.last_key.extend(next.e.key());
-                return Some(cur);
-            }
-        }
-        self.heap.pop();
-        Some(cur)
-    }
-}
-
-impl<I: SeekableIter<Item = Entry>> SeekableIter for MergeIter<I> {
-    fn seek(&mut self, key: &[u8]) {
-        if key > self.last_key.as_slice() {
-            loop {
-                match self.heap.peek_mut() {
-                    None => return,
-                    Some(mut head) => {
-                        if head.e.key() >= key {
-                            self.last_key.clear();
-                            self.last_key.extend_from_slice(head.e.key());
-                            return;
-                        }
-                        head.it.seek(key);
-                        if let Some(e) = head.it.next() {
-                            head.e = e;
-                            continue;
-                        }
-                    }
-                }
-                self.heap.pop();
-            }
-        }
-
-        // backwards seek; reset heap
-        let mut finished: Vec<I> = Vec::new();
-        let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
-        for mut it in self
-            .heap
-            .drain()
-            .map(|me| me.it)
-            .chain(self.finished.drain(..))
-        {
-            it.seek(key);
-            match it.next() {
-                Some(e) => heap_entries.push(MergeEntry { e, it }),
-                None => finished.push(it),
-            }
+    fn from(i: I) -> Self {
+        Merger {
+            sources: Vec::from_iter(i),
         }
-        self.heap = BinaryHeap::from_iter(heap_entries);
-        self.finished = finished;
-        self.last_key.clear();
-        self.last_key.extend_from_slice(key);
     }
 }
 
 impl<S> Source for Merger<S>
 where
-    S: Source<Item = Entry>,
+    S: Source,
+    S::Item: Mergeable,
 {
     type Item = S::Item;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
         MergeIter::from(self.sources.iter().map(|s| s.iter()))
     }
 }
 
+#[cfg(disabled)]
 #[cfg(test)]
 mod test {
     use super::Merger;
index d1cabf0987ec868a476683428d95a26d8d6f4215..e152af24dba7f4c07e2507bf7a1c23b9241b98c0 100644 (file)
@@ -1,4 +1,5 @@
-use crate::{Entry, Result, SeekableIter};
+use crate::iter::{IntoIter, Iter};
+use crate::{Entry, Result};
 use integer_encoding::VarInt;
 use std::mem::size_of;
 
@@ -79,16 +80,27 @@ impl<D: AsRef<[u8]>> Block<D> {
             .decode(&self.data.as_ref()[self.restart_off..], idx)
     }
 }
+
+impl<D: AsRef<[u8]>> From<Block<D>> for BlockIter<D> {
+    fn from(b: Block<D>) -> BlockIter<D> {
+        BlockIter {
+            block: b,
+            cur_ent: None,
+            off: 0,
+        }
+    }
+}
+
 impl<D: AsRef<[u8]>> IntoIterator for Block<D> {
     type Item = Entry;
-    type IntoIter = BlockIter<D>;
+    type IntoIter = IntoIter<BlockIter<D>>;
 
     fn into_iter(self) -> Self::IntoIter {
-        Self::IntoIter {
+        IntoIter(BlockIter {
             block: self,
             cur_ent: None,
             off: 0,
-        }
+        })
     }
 }
 
@@ -181,15 +193,13 @@ impl<D: AsRef<[u8]>> BlockIter<D> {
     }
 }
 
-impl<D: AsRef<[u8]>> Iterator for BlockIter<D> {
+impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
     type Item = Entry;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.decode().cloned()
     }
-}
 
-impl<D: AsRef<[u8]>> SeekableIter for BlockIter<D> {
     fn seek(&mut self, key: &[u8]) {
         // TODO: "galloping search"
         if self.block.restart_count > 0 {
@@ -213,10 +223,10 @@ impl<D: AsRef<[u8]>> SeekableIter for BlockIter<D> {
 #[cfg(test)]
 mod test {
 
-    use crate::reader::block::Block;
+    use crate::reader::block::{Block, BlockIter};
     use crate::writer::block_builder::BlockBuilder;
     use crate::Entry;
-    use crate::SeekableIter;
+    use crate::Iter;
 
     fn build_block(n: u32, skip: u32, r: usize) -> Block<Vec<u8>> {
         let mut bb = BlockBuilder::default();
@@ -258,7 +268,7 @@ mod test {
     fn test_block_seek() {
         let n = 40;
         let b = build_block(n, 10, 10);
-        let mut bi = b.into_iter();
+        let mut bi = BlockIter::from(b);
         bi.seek(&u32::to_be_bytes(40));
         assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40));
         bi.seek(&u32::to_be_bytes(32));
index 4ba0d7ebc82b1e7c4f1983ed6dacd1ce14a637e4..f5d8483acbf0b2f07e5dccf4d00228d7807d8fc2 100644 (file)
@@ -1,6 +1,6 @@
 use crate::metadata::Metadata;
 use crate::Source;
-use crate::{Entry, SeekableIter};
+use crate::{iter::IntoIter, Entry, Iter};
 use integer_encoding::VarInt;
 pub(crate) mod block;
 use crate::compression::CBuf;
@@ -105,7 +105,7 @@ impl<D: AsRef<[u8]>> Reader<D> {
         off += prelude;
         block::Block::new(CBuf::Buf(self.data.clone_range(off, size)))
             .expect("bad block")
-            .into_iter()
+            .into()
     }
 }
 
@@ -131,14 +131,15 @@ impl<D: AsRef<[u8]>> ReaderIter<D> {
         self.data_iter.replace(
             block::Block::new(comp.uncompress(self.reader.data.clone_range(data_off, size))?)
                 .expect("bad block")
-                .into_iter(),
+                .into(),
         );
         Some(())
     }
 }
 
-impl<D: AsRef<[u8]>> Iterator for ReaderIter<D> {
+impl<D: AsRef<[u8]>> Iter for ReaderIter<D> {
     type Item = Entry;
+
     fn next(&mut self) -> Option<Self::Item> {
         if self.data_iter.is_none() {
             self.next_block()
@@ -153,9 +154,7 @@ impl<D: AsRef<[u8]>> Iterator for ReaderIter<D> {
             }
         }
     }
-}
 
-impl<D: AsRef<[u8]>> SeekableIter for ReaderIter<D> {
     fn seek(&mut self, key: &[u8]) {
         // TODO: detect and skip unneeded seek in iter.
         self.index_iter.seek(key);
@@ -176,10 +175,18 @@ impl<D: AsRef<[u8]>> SeekableIter for ReaderIter<D> {
     }
 }
 
+impl<D: AsRef<[u8]>> IntoIterator for ReaderIter<D> {
+    type Item = Entry;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
+
 impl<D: AsRef<[u8]>> Source for Reader<D> {
     type Item = Entry;
 
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
         ReaderIter {
             reader: self.clone(),
             next_offset: 0,
index a582cc0d304a1d344768457414b2390dfb5e8987..d16445239ce0a2af768b8cf4bab3e024ff63f1b5 100644 (file)
@@ -1,8 +1,8 @@
-use crate::{merge::MergeSource, Entry, Merger, Reader, SeekableIter, Source, Writer};
+use crate::{Entry, Iter, Merger, Reader, Source, Writer};
 use memmap::Mmap;
 use std::cell::Cell;
 
-pub struct Sorter<F: Fn(&mut Vec<u8>, &Entry)> {
+pub struct Sorter<F: Fn(&mut Entry, &Entry)> {
     batch: Cell<Vec<Entry>>,
     batch_size: usize,
     max_size: usize,
@@ -13,7 +13,7 @@ pub struct Sorter<F: Fn(&mut Vec<u8>, &Entry)> {
 
 impl<F> Sorter<F>
 where
-    F: Fn(&mut Vec<u8>, &Entry) + 'static,
+    F: Fn(&mut Entry, &Entry) + 'static,
 {
     pub fn new(max_size: usize, merge_func: F) -> Self {
         Self {
@@ -35,15 +35,19 @@ where
         self.batch_size += esize;
     }
 
-    pub fn source(mut self) -> MergeSource<Merger<Reader<Mmap>>> {
+    pub fn source(mut self) -> impl Source<Item = Entry> {
+        // DupmergeSource<Merger<Reader<Mmap>>> {
         if !self.batch.get_mut().is_empty() {
             self.write_chunk();
         }
-        Merger::from(self.readers).merge_func(self.merge_func)
+        Merger::from(self.readers).dup_merge(self.merge_func)
     }
 
     pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
-        self.source().iter().for_each(|e| w.add(e).unwrap());
+        self.source()
+            .iter()
+            .into_iter()
+            .for_each(|e| w.add(e).unwrap());
     }
 
     fn write_chunk(&mut self) {
@@ -54,7 +58,8 @@ where
         self.batch
             .take()
             .iter()
-            .merge_func(&self.merge_func)
+            .dup_merge(&self.merge_func)
+            .into_iter()
             .for_each(|e| {
                 w.add(e).unwrap();
             });
diff --git a/src/source/adapters.rs b/src/source/adapters.rs
new file mode 100644 (file)
index 0000000..4eaadbc
--- /dev/null
@@ -0,0 +1,72 @@
+use crate::{Iter, Source};
+use std::cmp::Ordering;
+
+pub struct DupmergeSource<S, F> {
+    pub(super) source: S,
+    pub(super) merge: F,
+}
+
+impl<S, F> Source for DupmergeSource<S, F>
+where
+    S: Source,
+    S::Item: PartialEq + Clone,
+    F: Fn(&mut S::Item, &S::Item),
+{
+    type Item = S::Item;
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        self.source.iter().dup_merge(&self.merge)
+    }
+}
+
+pub struct DupsortSource<S, F> {
+    pub(super) source: S,
+    pub(super) dupsort: F,
+}
+
+impl<S, F> Source for DupsortSource<S, F>
+where
+    S: Source,
+    S::Item: PartialEq,
+    F: Fn(&S::Item, &S::Item) -> Ordering,
+{
+    type Item = S::Item;
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        self.source.iter().dup_sort(&self.dupsort)
+    }
+}
+
+pub struct FilterSource<S, F> {
+    pub(super) source: S,
+    pub(super) filter: F,
+}
+
+impl<S, F> Source for FilterSource<S, F>
+where
+    S: Source,
+    F: Fn(&S::Item, &mut Vec<u8>) -> bool,
+{
+    type Item = S::Item;
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        self.source.iter().filter(&self.filter)
+    }
+}
+
+pub struct MapSource<S, F, O>
+where
+    S: Source,
+    F: Fn(S::Item) -> O,
+{
+    pub(super) source: S,
+    pub(super) map: F,
+}
+
+impl<S, F, O> Source for MapSource<S, F, O>
+where
+    S: Source,
+    F: Fn(S::Item) -> O,
+{
+    type Item = O;
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        self.source.iter().map(&self.map)
+    }
+}
similarity index 70%
rename from src/source.rs
rename to src/source/mod.rs
index 4efbb296fcbf5c0f0f65794df785a48d06fe16a9..24638fc9c3fb054e4ae9193ee1e38f711eabeb72 100644 (file)
@@ -1,63 +1,78 @@
-use crate::dupsort::DupsortSource;
-use crate::filter::FilterSource;
-use crate::iter::{PrefixIter, RangeIter};
-use crate::map::MapSource;
-use crate::merge::MergeSource;
-use crate::{Entry, SeekableIter};
+use crate::iter::{prefix::PrefixIter, range::RangeIter, Iter};
+use crate::Entry;
+use std::cmp::Ordering;
+
+mod adapters;
+pub use adapters::DupmergeSource;
+pub use adapters::DupsortSource;
+pub use adapters::FilterSource;
+pub use adapters::MapSource;
 
 pub trait Source {
     type Item;
 
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item>;
-    fn get(&self, key: &[u8]) -> RangeIter<impl SeekableIter<Item = Self::Item>>
+    fn iter(&self) -> impl Iter<Item = Self::Item>;
+
+    fn get(&self, key: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
     where
         Self::Item: PartialOrd<[u8]>,
     {
         RangeIter::new(self.iter(), key, key)
     }
-    fn get_prefix(&self, prefix: &[u8]) -> PrefixIter<impl SeekableIter<Item = Self::Item>>
+
+    fn get_prefix(&self, prefix: &[u8]) -> PrefixIter<impl Iter<Item = Self::Item>>
     where
         Self::Item: AsRef<[u8]>,
     {
         PrefixIter::new(self.iter(), prefix)
     }
-    fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter<impl SeekableIter<Item = Self::Item>>
+
+    fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
     where
         Self::Item: PartialOrd<[u8]>,
     {
         RangeIter::new(self.iter(), start, end)
     }
 
-    fn merge_func<F>(self, merge_func: F) -> MergeSource<Self>
+    fn dup_merge<F>(self, merge: F) -> DupmergeSource<Self, F>
     where
         Self: Sized,
-        F: Fn(&mut Vec<u8>, &Entry) + 'static,
+        F: Fn(&mut Entry, &Entry) + 'static,
     {
-        MergeSource::new(self, merge_func)
+        DupmergeSource {
+            source: self,
+            merge,
+        }
     }
 
-    fn dupsort_func<F>(self, dupsort_func: F) -> DupsortSource<Self>
+    fn dupsort<F>(self, dupsort: F) -> DupsortSource<Self, F>
     where
         Self: Sized,
-        F: Fn(&Self::Item, &Self::Item) -> std::cmp::Ordering + 'static,
+        F: Fn(&Self::Item, &Self::Item) -> Ordering + 'static,
     {
-        DupsortSource::new(self, dupsort_func)
+        DupsortSource {
+            source: self,
+            dupsort,
+        }
     }
 
-    fn filter<F>(self, filter_func: F) -> FilterSource<Self>
+    fn filter<F>(self, filter: F) -> FilterSource<Self, F>
     where
         Self: Sized,
         F: Fn(&Self::Item, &mut Vec<u8>) -> bool + 'static,
     {
-        FilterSource::new(self, filter_func)
+        FilterSource {
+            source: self,
+            filter,
+        }
     }
 
-    fn map<F, O>(self, map_func: F) -> MapSource<Self, O>
+    fn map<F, O>(self, map: F) -> MapSource<Self, F, O>
     where
         Self: Sized,
         F: Fn(Self::Item) -> O + 'static,
     {
-        MapSource::new(self, map_func)
+        MapSource { source: self, map }
     }
 
     fn into_boxed(self) -> Box<dyn DynSource<Item = Self::Item>>
@@ -74,20 +89,20 @@ pub trait Source {
 pub trait DynSource {
     type Item;
 
-    fn iter(&self) -> Box<dyn SeekableIter<Item = Self::Item> + '_>;
+    fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_>;
 }
 
 impl<S: Source + ?Sized> DynSource for Box<S> {
     type Item = S::Item;
 
-    fn iter(&self) -> Box<dyn SeekableIter<Item = Self::Item> + '_> {
+    fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_> {
         Box::new(self.as_ref().iter())
     }
 }
 
 impl<D: DynSource + ?Sized> Source for D {
     type Item = D::Item;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
         DynSource::iter(self)
     }
 }
@@ -97,7 +112,7 @@ pub struct VecIter<'a> {
     vec: &'a Vec<Entry>,
 }
 
-impl Iterator for VecIter<'_> {
+impl Iter for VecIter<'_> {
     type Item = Entry;
     fn next(&mut self) -> Option<Self::Item> {
         if self.index > self.vec.len() {
@@ -108,9 +123,7 @@ impl Iterator for VecIter<'_> {
             Some(e)
         }
     }
-}
 
-impl SeekableIter for VecIter<'_> {
     fn seek(&mut self, key: &[u8]) {
         let mut left = 0;
         let mut right = self.vec.len() - 1;
@@ -128,7 +141,7 @@ impl SeekableIter for VecIter<'_> {
 
 impl Source for Vec<Entry> {
     type Item = Entry;
-    fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
         VecIter {
             index: 0,
             vec: self,
@@ -138,7 +151,7 @@ impl Source for Vec<Entry> {
 
 pub mod test_source {
     use crate::Entry;
-    use crate::SeekableIter;
+    use crate::Iter;
     use crate::Source;
 
     pub struct TestSource(pub Vec<Entry>);
@@ -148,7 +161,7 @@ pub mod test_source {
         off: usize,
     }
 
-    impl Iterator for TestIter<'_> {
+    impl Iter for TestIter<'_> {
         type Item = Entry;
 
         fn next(&mut self) -> Option<Self::Item> {
@@ -160,9 +173,7 @@ pub mod test_source {
             self.off += 1;
             Some(item.clone())
         }
-    }
 
-    impl SeekableIter for TestIter<'_> {
         fn seek(&mut self, key: &[u8]) {
             self.off = 0;
             while self.off < self.source.0.len() && self.source.0[self.off].key() < key {
@@ -171,9 +182,17 @@ pub mod test_source {
         }
     }
 
+    impl IntoIterator for TestIter<'_> {
+        type Item = Entry;
+        type IntoIter = crate::iter::IntoIter<Self>;
+        fn into_iter(self) -> Self::IntoIter {
+            crate::iter::IntoIter(self)
+        }
+    }
+
     impl Source for TestSource {
         type Item = Entry;
-        fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+        fn iter(&self) -> impl Iter<Item = Self::Item> {
             TestIter {
                 source: self,
                 off: 0,
@@ -186,6 +205,8 @@ pub mod test {
     use super::test_source::TestSource;
     #[allow(unused_imports)]
     use super::Source;
+    #[allow(unused_imports)]
+    use crate::iter::Iter;
     use crate::Entry;
 
     #[allow(dead_code)]