]> git.mikk.net Git - mtbl-rs/commitdiff
Rearranged source code to be more granular
authorChris Mikkelson <chris@mikk.net>
Thu, 17 Apr 2025 21:44:46 +0000 (16:44 -0500)
committerChris Mikkelson <chris@mikk.net>
Thu, 17 Apr 2025 21:44:46 +0000 (16:44 -0500)
Also made a bunch of other changes

14 files changed:
merger.rs [new file with mode: 0644]
src/bin/mtbl_dump.rs
src/iter/dupmerge.rs
src/iter/dupsort.rs
src/iter/filter.rs
src/iter/map.rs
src/iter/merger.rs
src/iter/mod.rs
src/merger.rs
src/reader/mod.rs
src/sorter.rs
src/source/adapters.rs
src/source/mod.rs
tests/rwtest.rs

diff --git a/merger.rs b/merger.rs
new file mode 100644 (file)
index 0000000..6194754
--- /dev/null
+++ b/merger.rs
@@ -0,0 +1,211 @@
+use crate::iter::dupmerge::Mergeable;
+
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+
+pub struct Merger<S> {
+    sources: Vec<S>,
+}
+
+pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {}
+
+impl<S, I> From<I> for Merger<S>
+where
+    I: IntoIterator<Item = S>,
+    S: Source,
+    S::Item: Mergeable,
+{
+    fn from(i: I) -> Self {
+        Merger {
+            sources: Vec::from_iter(i),
+        }
+    }
+}
+
+struct MergeEntry<I: Iter> {
+    e: I::Item,
+    it: I,
+}
+
+impl<I: Iter> PartialEq for MergeEntry<I>
+where
+    I::Item: PartialEq,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.e == other.e
+    }
+}
+
+impl<I: Iter> Eq for MergeEntry<I> where I::Item: PartialEq {}
+
+// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics
+impl<I: Iter> PartialOrd for MergeEntry<I>
+where
+    I::Item: PartialOrd,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        other.e.partial_cmp(&self.e)
+    }
+}
+
+impl<I: Iter> Ord for MergeEntry<I>
+where
+    I::Item: Ord,
+{
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.e.cmp(&self.e)
+    }
+}
+
+pub struct MergeIter<I: Iter> {
+    heap: BinaryHeap<MergeEntry<I>>,
+    finished: Vec<I>,
+    last_key: Vec<u8>,
+}
+
+impl<I> From<I> for MergeIter<I::Item>
+where
+    I: Iterator,
+    I::Item: Iter,
+    <I::Item as Iter>::Item: Mergeable,
+{
+    fn from(iters: I) -> Self {
+        let mut v: Vec<I::Item> = Vec::new();
+        let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
+            Some(e) => Some(MergeEntry { e, it }),
+            None => {
+                v.push(it);
+                None
+            }
+        }));
+        MergeIter {
+            finished: v,
+            heap: h,
+            last_key: Vec::new(),
+        }
+    }
+}
+
+impl<I: Iter> Iter for MergeIter<I>
+where
+    I::Item: Mergeable,
+{
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let cur;
+        {
+            let mut next = self.heap.peek_mut()?;
+
+            cur = next.e.clone();
+            self.last_key.clear();
+            self.last_key.extend(cur.as_ref());
+
+            if let Some(e) = next.it.next() {
+                next.e = e;
+                return Some(cur);
+            }
+        }
+        self.heap.pop();
+        Some(cur)
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        if key >= self.last_key.as_ref() {
+            loop {
+                match self.heap.peek_mut() {
+                    None => return,
+                    Some(mut head) => {
+                        if &head.e >= key {
+                            return;
+                        }
+                        head.it.seek(key);
+                        if let Some(e) = head.it.next() {
+                            head.e = e;
+                            continue;
+                        }
+                    }
+                }
+                self.heap.pop();
+            }
+        }
+
+        // backwards seek; reset heap
+        let mut finished: Vec<I> = Vec::new();
+        let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
+        for mut it in self
+            .heap
+            .drain()
+            .map(|me| me.it)
+            .chain(self.finished.drain(..))
+        {
+            it.seek(key);
+            match it.next() {
+                Some(e) => heap_entries.push(MergeEntry { e, it }),
+                None => finished.push(it),
+            }
+        }
+        self.heap = BinaryHeap::from_iter(heap_entries);
+        self.finished = finished;
+        self.last_key.clear();
+        self.last_key.extend(key);
+    }
+}
+
+impl<S> Source for Merger<S>
+where
+    S: Source,
+    S::Item: Mergeable,
+{
+    type Item = S::Item;
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        MergeIter::from(self.sources.iter().map(|s| s.iter()))
+    }
+}
+
+#[cfg(disabled)]
+#[cfg(test)]
+mod test {
+    use super::Merger;
+    use crate::source::test_source::TestSource;
+    use crate::Entry;
+    use crate::Source;
+
+    fn tnum(m: u8) -> Vec<u8> {
+        Vec::from_iter((1u8..255).filter(|i| i % m == 0))
+    }
+
+    fn test_source(m: u8) -> TestSource {
+        TestSource(Vec::from_iter(
+            tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
+        ))
+    }
+
+    #[test]
+    fn test_merge() {
+        let range = 1..8;
+        let iters: Vec<_> = range
+            .clone()
+            .map(test_source)
+            .map(|s| s.into_boxed())
+            .collect();
+        let s = Merger::from(iters);
+        let mut v = Vec::<u8>::new();
+        for i in range {
+            v.extend(tnum(i))
+        }
+        v.sort();
+        let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
+        assert_eq!(v2, v);
+    }
+
+    #[test]
+    fn test_binheap() {
+        use std::collections::BinaryHeap;
+
+        let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
+        let vs = v.as_slice();
+        let h = BinaryHeap::from_iter(vs.iter().copied());
+        assert_ne!(h.into_vec(), v);
+    }
+}
index cfeeb6fb4250e354ba9cffff6bc84d69dc7204b5..c1de7244bd2920c6d71e88f6db5fcb88fa9c01c7 100644 (file)
@@ -5,7 +5,7 @@ fn main() {
         .nth(1)
         .expect("Usage: mtbl_dump <filename>");
     let reader = mtbl::reader::from_file(fname);
-    for e in reader.iter().into_iter() {
+    for e in reader.iter() {
         println!("{:?}: {:?}", e.key(), e.value());
     }
 }
index 7a6a3ce0f4ccb1fb8664562f755774e911ea8740..2508a6ac408c915a939256764cde76b59a6921a1 100644 (file)
@@ -7,7 +7,6 @@ where
     F: FnMut(&mut I::Item, &I::Item),
 {
     prev: Option<I::Item>,
-    cur: Option<I::Item>,
     iter: I,
     merge_func: F,
 }
@@ -21,7 +20,6 @@ where
     pub fn new(iter: I, merge_func: F) -> Self {
         Self {
             prev: None,
-            cur: None,
             iter,
             merge_func,
         }
@@ -69,10 +67,11 @@ where
     }
 }
 
-#[cfg(disable)]
 #[test]
 fn test_merge_func() {
     use crate::source::test_source::TestSource;
+    use crate::Entry;
+    use crate::Source;
 
     let ts = TestSource(
         (1u8..8)
@@ -81,8 +80,8 @@ fn test_merge_func() {
             .collect(),
     );
 
-    let s = ts.merge_func(|v, e| {
-        v[0] += e.value()[0];
+    let s = ts.dup_merge(|v, e| {
+        v.value_mut()[0] += e.value()[0];
     });
 
     assert_eq!(
index 04dd4b71c0eded5252facb2b0b35655f19ac38ef..07e067fe7cf430ed0c05682108ba311278650e44 100644 (file)
@@ -76,11 +76,11 @@ where
     }
 }
 
-#[cfg(disable)]
 #[test]
 fn test_dupsort() {
     use crate::source::test_source::TestSource;
     use crate::Entry;
+    use crate::Source;
 
     let ts = TestSource(
         (1u8..10)
@@ -88,7 +88,7 @@ fn test_dupsort() {
             .collect(),
     );
 
-    let s = ts.dupsort_func(|a, b| a.value()[0].cmp(&b.value()[0]));
+    let s = ts.dupsort(|a, b| a.value()[0].cmp(&b.value()[0]));
 
     assert_eq!(
         Vec::from_iter(s.iter()),
index b069d71b8c5763ac0772d86fc26ba9023ecd0892..4ce3bf60acaeb98359e7dbe573a496c107d32e21 100644 (file)
@@ -1,4 +1,4 @@
-use crate::Iter;
+use crate::iter::{IntoIter, Iter};
 
 pub struct FilterIter<I, F> {
     iter: I,
@@ -46,6 +46,18 @@ where
     }
 }
 
+impl<I, F> IntoIterator for FilterIter<I, F>
+where
+    I: Iter,
+    F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
+{
+    type Item = I::Item;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
+
 #[test]
 fn test_filter() {
     use crate::source::test_source::TestSource;
index 628467f360e90907cf9c22266036755e0c3537c7..e2d336ee1377f470aa76a7d6594a7f1b08664e1d 100644 (file)
@@ -1,7 +1,4 @@
-use crate::{
-    iter::{IntoIter, Iter},
-    Source,
-};
+use crate::iter::{IntoIter, Iter};
 
 pub struct MapIter<I, F, O>
 where
@@ -52,40 +49,6 @@ where
     }
 }
 
-pub struct MapSource<S, O>
-where
-    S: Source,
-{
-    source: S,
-    #[allow(clippy::type_complexity)]
-    map: Box<dyn Fn(S::Item) -> O>,
-}
-
-impl<S, O> MapSource<S, O>
-where
-    S: Source,
-{
-    pub fn new<F>(source: S, map: F) -> Self
-    where
-        F: Fn(S::Item) -> O + 'static,
-    {
-        Self {
-            source,
-            map: Box::new(map),
-        }
-    }
-}
-
-impl<S, O> Source for MapSource<S, O>
-where
-    S: Source,
-{
-    type Item = O;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
-        self.source.iter().map(&self.map)
-    }
-}
-
 #[cfg(test)]
 mod test {
     use crate::source::test_source::TestSource;
index 744991cf03993df82629155e141030af51a8bf5f..aa8d4d9cc8a6cd2bf5ee9d992d959a5e3f45a7d0 100644 (file)
@@ -1,4 +1,4 @@
-use crate::Iter;
+use crate::iter::{IntoIter, Iter};
 use std::cmp::Ordering;
 use std::collections::BinaryHeap;
 
@@ -134,13 +134,23 @@ where
     }
 }
 
-#[cfg(disabled)]
+impl<I: Iter> IntoIterator for MergeIter<I>
+where
+    I::Item: Mergeable,
+{
+    type Item = I::Item;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
+
 #[cfg(test)]
 mod test {
-    use super::Merger;
     use crate::source::test_source::TestSource;
     use crate::Entry;
-    use crate::Source;
+    use crate::Merger;
+    use crate::{Iter, Source};
 
     fn tnum(m: u8) -> Vec<u8> {
         Vec::from_iter((1u8..255).filter(|i| i % m == 0))
index 69123c9b74f5dcce8cc8a58527a121c812e4d91c..af0338a3e9a99b826d0cf51283e4bb039865b2bf 100644 (file)
@@ -73,14 +73,27 @@ impl<I: Iter + ?Sized> Iter for Box<I> {
     }
 }
 
-pub struct IntoIter<I: Iter>(pub I);
+pub struct BoxedIter<'a, T>(pub Box<dyn Iter<Item = T> + 'a>);
+impl<'a, T> Iter for BoxedIter<'a, T> {
+    type Item = T;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.as_mut().next()
+    }
+    fn seek(&mut self, key: &[u8]) {
+        self.0.as_mut().seek(key)
+    }
+}
 
-impl<I: Iter> IntoIter<I> {
-    fn new(i: I) -> Self {
-        Self(i)
+impl<'a, T> IntoIterator for BoxedIter<'a, T> {
+    type Item = T;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
     }
 }
 
+pub struct IntoIter<I: Iter>(pub I);
+
 impl<I> Iterator for IntoIter<I>
 where
     I: Iter,
index e6ea6c6148c04605c9fb473a3d7d7b2f198a038c..4668b3618c6cc220e0eb5b5d03d4db01c3f232de 100644 (file)
@@ -8,6 +8,14 @@ pub struct Merger<S> {
     sources: Vec<S>,
 }
 
+impl<S> Default for Merger<S> {
+    fn default() -> Self {
+        Self {
+            sources: Vec::new(),
+        }
+    }
+}
+
 impl<S> Merger<S> {
     pub fn new() -> Self {
         Self {
@@ -39,18 +47,17 @@ where
     S::Item: Mergeable,
 {
     type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         MergeIter::from(self.sources.iter().map(|s| s.iter()))
     }
 }
 
-#[cfg(disabled)]
 #[cfg(test)]
 mod test {
     use super::Merger;
     use crate::source::test_source::TestSource;
     use crate::Entry;
-    use crate::Source;
+    use crate::{Iter, Source};
 
     fn tnum(m: u8) -> Vec<u8> {
         Vec::from_iter((1u8..255).filter(|i| i % m == 0))
index f5d8483acbf0b2f07e5dccf4d00228d7807d8fc2..f749f223a7dfd659fa2fdca2b4cb71ea2f7f67c5 100644 (file)
@@ -186,7 +186,7 @@ impl<D: AsRef<[u8]>> IntoIterator for ReaderIter<D> {
 impl<D: AsRef<[u8]>> Source for Reader<D> {
     type Item = Entry;
 
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         ReaderIter {
             reader: self.clone(),
             next_offset: 0,
index d16445239ce0a2af768b8cf4bab3e024ff63f1b5..4bed5ab2b2d68b0fd27e99db6d48e55b3c095629 100644 (file)
@@ -1,3 +1,4 @@
+use crate::source::DynSource;
 use crate::{Entry, Iter, Merger, Reader, Source, Writer};
 use memmap::Mmap;
 use std::cell::Cell;
@@ -35,16 +36,18 @@ where
         self.batch_size += esize;
     }
 
-    pub fn source(mut self) -> impl Source<Item = Entry> {
-        // DupmergeSource<Merger<Reader<Mmap>>> {
+    pub fn source(mut self) -> Box<dyn DynSource<Item = Entry>> {
         if !self.batch.get_mut().is_empty() {
             self.write_chunk();
         }
-        Merger::from(self.readers).dup_merge(self.merge_func)
+        Merger::from(self.readers)
+            .dup_merge(self.merge_func)
+            .into_boxed()
     }
 
     pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
         self.source()
+            .as_ref() // XXX - need to further wrap Box<dyn DynSource> in  BoxedSource
             .iter()
             .into_iter()
             .for_each(|e| w.add(e).unwrap());
index 4eaadbca972c342feaa882e19d42729862c3e9c3..ac34be16812df611c333d437d60f4aec9b5ed65c 100644 (file)
@@ -13,7 +13,7 @@ where
     F: Fn(&mut S::Item, &S::Item),
 {
     type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         self.source.iter().dup_merge(&self.merge)
     }
 }
@@ -30,7 +30,7 @@ where
     F: Fn(&S::Item, &S::Item) -> Ordering,
 {
     type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         self.source.iter().dup_sort(&self.dupsort)
     }
 }
@@ -46,7 +46,7 @@ where
     F: Fn(&S::Item, &mut Vec<u8>) -> bool,
 {
     type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         self.source.iter().filter(&self.filter)
     }
 }
@@ -66,7 +66,7 @@ where
     F: Fn(S::Item) -> O,
 {
     type Item = O;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         self.source.iter().map(&self.map)
     }
 }
index 24638fc9c3fb054e4ae9193ee1e38f711eabeb72..253dda6ed29de4af2b4303bce4d9d10be2f22ae8 100644 (file)
@@ -1,4 +1,4 @@
-use crate::iter::{prefix::PrefixIter, range::RangeIter, Iter};
+use crate::iter::{prefix::PrefixIter, range::RangeIter, BoxedIter, IntoIter, Iter};
 use crate::Entry;
 use std::cmp::Ordering;
 
@@ -11,7 +11,7 @@ pub use adapters::MapSource;
 pub trait Source {
     type Item;
 
-    fn iter(&self) -> impl Iter<Item = Self::Item>;
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item>;
 
     fn get(&self, key: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
     where
@@ -89,20 +89,20 @@ pub trait Source {
 pub trait DynSource {
     type Item;
 
-    fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_>;
+    fn iter(&self) -> BoxedIter<'_, Self::Item>;
 }
 
 impl<S: Source + ?Sized> DynSource for Box<S> {
     type Item = S::Item;
 
-    fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_> {
-        Box::new(self.as_ref().iter())
+    fn iter(&self) -> BoxedIter<'_, Self::Item> {
+        BoxedIter(Box::new(self.as_ref().iter()))
     }
 }
 
 impl<D: DynSource + ?Sized> Source for D {
     type Item = D::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         DynSource::iter(self)
     }
 }
@@ -139,9 +139,17 @@ impl Iter for VecIter<'_> {
     }
 }
 
+impl<'a> IntoIterator for VecIter<'a> {
+    type Item = Entry;
+    type IntoIter = IntoIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        IntoIter(self)
+    }
+}
+
 impl Source for Vec<Entry> {
     type Item = Entry;
-    fn iter(&self) -> impl Iter<Item = Self::Item> {
+    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
         VecIter {
             index: 0,
             vec: self,
@@ -192,7 +200,7 @@ pub mod test_source {
 
     impl Source for TestSource {
         type Item = Entry;
-        fn iter(&self) -> impl Iter<Item = Self::Item> {
+        fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
             TestIter {
                 source: self,
                 off: 0,
index efc808b366f6b9ecddff894b365b63fd607643d5..d2c6328b9808afcb65654e38f69a38e6cd1b474f 100644 (file)
@@ -1,6 +1,5 @@
 use mtbl::{Entry, Reader, Source, Writer};
 
-//#[test]
 #[test]
 fn test_write_readback() {
     let mut store = Vec::<u8>::new();
@@ -17,14 +16,14 @@ fn test_write_readback() {
     assert!(store.len() > 512);
     let r = Reader::new(store);
     let ri = r.iter();
-    assert_eq!(ri.collect::<Vec<_>>(), reference);
+    assert_eq!(ri.into_iter().collect::<Vec<_>>(), reference);
 
     // test range
     let start = u32::to_be_bytes(192);
     let end = u32::to_be_bytes(256);
     let rangei = r.get_range(&start, &end);
     assert_eq!(
-        rangei.collect::<Vec<_>>(),
+        rangei.into_iter().collect::<Vec<_>>(),
         reference
             .into_iter()
             .filter(|e| e.key() >= &u32::to_be_bytes(192) && e.key() <= &u32::to_be_bytes(256))