From e407b43c573b8d644a2139ba7878058e77540b66 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Thu, 17 Apr 2025 16:44:46 -0500 Subject: [PATCH] Rearranged source code to be more granular Also made a bunch of other changes --- merger.rs | 211 +++++++++++++++++++++++++++++++++++++++++ src/bin/mtbl_dump.rs | 2 +- src/iter/dupmerge.rs | 9 +- src/iter/dupsort.rs | 4 +- src/iter/filter.rs | 14 ++- src/iter/map.rs | 39 +------- src/iter/merger.rs | 18 +++- src/iter/mod.rs | 21 +++- src/merger.rs | 13 ++- src/reader/mod.rs | 2 +- src/sorter.rs | 9 +- src/source/adapters.rs | 8 +- src/source/mod.rs | 24 +++-- tests/rwtest.rs | 5 +- 14 files changed, 302 insertions(+), 77 deletions(-) create mode 100644 merger.rs diff --git a/merger.rs b/merger.rs new file mode 100644 index 0000000..6194754 --- /dev/null +++ b/merger.rs @@ -0,0 +1,211 @@ +use crate::iter::dupmerge::Mergeable; + +use std::cmp::Ordering; +use std::collections::BinaryHeap; + +pub struct Merger { + sources: Vec, +} + +pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {} + +impl From for Merger +where + I: IntoIterator, + S: Source, + S::Item: Mergeable, +{ + fn from(i: I) -> Self { + Merger { + sources: Vec::from_iter(i), + } + } +} + +struct MergeEntry { + e: I::Item, + it: I, +} + +impl PartialEq for MergeEntry +where + I::Item: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + self.e == other.e + } +} + +impl Eq for MergeEntry where I::Item: PartialEq {} + +// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics +impl PartialOrd for MergeEntry +where + I::Item: PartialOrd, +{ + fn partial_cmp(&self, other: &Self) -> Option { + other.e.partial_cmp(&self.e) + } +} + +impl Ord for MergeEntry +where + I::Item: Ord, +{ + fn cmp(&self, other: &Self) -> Ordering { + other.e.cmp(&self.e) + } +} + +pub struct MergeIter { + heap: BinaryHeap>, + finished: Vec, + last_key: Vec, +} + +impl From for MergeIter +where + I: Iterator, + I::Item: Iter, + ::Item: Mergeable, +{ + fn from(iters: I) -> Self { + let mut v: Vec = Vec::new(); + let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() { + Some(e) => Some(MergeEntry { e, it }), + None => { + v.push(it); + None + } + })); + MergeIter { + finished: v, + heap: h, + last_key: Vec::new(), + } + } +} + +impl Iter for MergeIter +where + I::Item: Mergeable, +{ + type Item = I::Item; + + fn next(&mut self) -> Option { + let cur; + { + let mut next = self.heap.peek_mut()?; + + cur = next.e.clone(); + self.last_key.clear(); + self.last_key.extend(cur.as_ref()); + + if let Some(e) = next.it.next() { + next.e = e; + return Some(cur); + } + } + self.heap.pop(); + Some(cur) + } + + fn seek(&mut self, key: &[u8]) { + if key >= self.last_key.as_ref() { + loop { + match self.heap.peek_mut() { + None => return, + Some(mut head) => { + if &head.e >= key { + return; + } + head.it.seek(key); + if let Some(e) = head.it.next() { + head.e = e; + continue; + } + } + } + self.heap.pop(); + } + } + + // backwards seek; reset heap + let mut finished: Vec = Vec::new(); + let mut heap_entries: Vec> = Vec::new(); + for mut it in self + .heap + .drain() + .map(|me| me.it) + .chain(self.finished.drain(..)) + { + it.seek(key); + match it.next() { + Some(e) => heap_entries.push(MergeEntry { e, it }), + None => finished.push(it), + } + } + self.heap = BinaryHeap::from_iter(heap_entries); + self.finished = finished; + self.last_key.clear(); + self.last_key.extend(key); + } +} + +impl Source for Merger +where + S: Source, + S::Item: Mergeable, +{ + type Item = S::Item; + fn iter(&self) -> impl Iter { + MergeIter::from(self.sources.iter().map(|s| s.iter())) + } +} + +#[cfg(disabled)] +#[cfg(test)] +mod test { + use super::Merger; + use crate::source::test_source::TestSource; + use crate::Entry; + use crate::Source; + + fn tnum(m: u8) -> Vec { + Vec::from_iter((1u8..255).filter(|i| i % m == 0)) + } + + fn test_source(m: u8) -> TestSource { + TestSource(Vec::from_iter( + tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])), + )) + } + + #[test] + fn test_merge() { + let range = 1..8; + let iters: Vec<_> = range + .clone() + .map(test_source) + .map(|s| s.into_boxed()) + .collect(); + let s = Merger::from(iters); + let mut v = Vec::::new(); + for i in range { + v.extend(tnum(i)) + } + v.sort(); + let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0])); + assert_eq!(v2, v); + } + + #[test] + fn test_binheap() { + use std::collections::BinaryHeap; + + let v: Vec = vec![1, 8, 2, 9, 4, 7, 3]; + let vs = v.as_slice(); + let h = BinaryHeap::from_iter(vs.iter().copied()); + assert_ne!(h.into_vec(), v); + } +} diff --git a/src/bin/mtbl_dump.rs b/src/bin/mtbl_dump.rs index cfeeb6f..c1de724 100644 --- a/src/bin/mtbl_dump.rs +++ b/src/bin/mtbl_dump.rs @@ -5,7 +5,7 @@ fn main() { .nth(1) .expect("Usage: mtbl_dump "); let reader = mtbl::reader::from_file(fname); - for e in reader.iter().into_iter() { + for e in reader.iter() { println!("{:?}: {:?}", e.key(), e.value()); } } diff --git a/src/iter/dupmerge.rs b/src/iter/dupmerge.rs index 7a6a3ce..2508a6a 100644 --- a/src/iter/dupmerge.rs +++ b/src/iter/dupmerge.rs @@ -7,7 +7,6 @@ where F: FnMut(&mut I::Item, &I::Item), { prev: Option, - cur: Option, iter: I, merge_func: F, } @@ -21,7 +20,6 @@ where pub fn new(iter: I, merge_func: F) -> Self { Self { prev: None, - cur: None, iter, merge_func, } @@ -69,10 +67,11 @@ where } } -#[cfg(disable)] #[test] fn test_merge_func() { use crate::source::test_source::TestSource; + use crate::Entry; + use crate::Source; let ts = TestSource( (1u8..8) @@ -81,8 +80,8 @@ fn test_merge_func() { .collect(), ); - let s = ts.merge_func(|v, e| { - v[0] += e.value()[0]; + let s = ts.dup_merge(|v, e| { + v.value_mut()[0] += e.value()[0]; }); assert_eq!( diff --git a/src/iter/dupsort.rs b/src/iter/dupsort.rs index 04dd4b7..07e067f 100644 --- a/src/iter/dupsort.rs +++ b/src/iter/dupsort.rs @@ -76,11 +76,11 @@ where } } -#[cfg(disable)] #[test] fn test_dupsort() { use crate::source::test_source::TestSource; use crate::Entry; + use crate::Source; let ts = TestSource( (1u8..10) @@ -88,7 +88,7 @@ fn test_dupsort() { .collect(), ); - let s = ts.dupsort_func(|a, b| a.value()[0].cmp(&b.value()[0])); + let s = ts.dupsort(|a, b| a.value()[0].cmp(&b.value()[0])); assert_eq!( Vec::from_iter(s.iter()), diff --git a/src/iter/filter.rs b/src/iter/filter.rs index b069d71..4ce3bf6 100644 --- a/src/iter/filter.rs +++ b/src/iter/filter.rs @@ -1,4 +1,4 @@ -use crate::Iter; +use crate::iter::{IntoIter, Iter}; pub struct FilterIter { iter: I, @@ -46,6 +46,18 @@ where } } +impl IntoIterator for FilterIter +where + I: Iter, + F: FnMut(&I::Item, &mut Vec) -> bool, +{ + type Item = I::Item; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} + #[test] fn test_filter() { use crate::source::test_source::TestSource; diff --git a/src/iter/map.rs b/src/iter/map.rs index 628467f..e2d336e 100644 --- a/src/iter/map.rs +++ b/src/iter/map.rs @@ -1,7 +1,4 @@ -use crate::{ - iter::{IntoIter, Iter}, - Source, -}; +use crate::iter::{IntoIter, Iter}; pub struct MapIter where @@ -52,40 +49,6 @@ where } } -pub struct MapSource -where - S: Source, -{ - source: S, - #[allow(clippy::type_complexity)] - map: Box O>, -} - -impl MapSource -where - S: Source, -{ - pub fn new(source: S, map: F) -> Self - where - F: Fn(S::Item) -> O + 'static, - { - Self { - source, - map: Box::new(map), - } - } -} - -impl Source for MapSource -where - S: Source, -{ - type Item = O; - fn iter(&self) -> impl Iter { - self.source.iter().map(&self.map) - } -} - #[cfg(test)] mod test { use crate::source::test_source::TestSource; diff --git a/src/iter/merger.rs b/src/iter/merger.rs index 744991c..aa8d4d9 100644 --- a/src/iter/merger.rs +++ b/src/iter/merger.rs @@ -1,4 +1,4 @@ -use crate::Iter; +use crate::iter::{IntoIter, Iter}; use std::cmp::Ordering; use std::collections::BinaryHeap; @@ -134,13 +134,23 @@ where } } -#[cfg(disabled)] +impl IntoIterator for MergeIter +where + I::Item: Mergeable, +{ + type Item = I::Item; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} + #[cfg(test)] mod test { - use super::Merger; use crate::source::test_source::TestSource; use crate::Entry; - use crate::Source; + use crate::Merger; + use crate::{Iter, Source}; fn tnum(m: u8) -> Vec { Vec::from_iter((1u8..255).filter(|i| i % m == 0)) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 69123c9..af0338a 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -73,14 +73,27 @@ impl Iter for Box { } } -pub struct IntoIter(pub I); +pub struct BoxedIter<'a, T>(pub Box + 'a>); +impl<'a, T> Iter for BoxedIter<'a, T> { + type Item = T; + fn next(&mut self) -> Option { + self.0.as_mut().next() + } + fn seek(&mut self, key: &[u8]) { + self.0.as_mut().seek(key) + } +} -impl IntoIter { - fn new(i: I) -> Self { - Self(i) +impl<'a, T> IntoIterator for BoxedIter<'a, T> { + type Item = T; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) } } +pub struct IntoIter(pub I); + impl Iterator for IntoIter where I: Iter, diff --git a/src/merger.rs b/src/merger.rs index e6ea6c6..4668b36 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -8,6 +8,14 @@ pub struct Merger { sources: Vec, } +impl Default for Merger { + fn default() -> Self { + Self { + sources: Vec::new(), + } + } +} + impl Merger { pub fn new() -> Self { Self { @@ -39,18 +47,17 @@ where S::Item: Mergeable, { type Item = S::Item; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { MergeIter::from(self.sources.iter().map(|s| s.iter())) } } -#[cfg(disabled)] #[cfg(test)] mod test { use super::Merger; use crate::source::test_source::TestSource; use crate::Entry; - use crate::Source; + use crate::{Iter, Source}; fn tnum(m: u8) -> Vec { Vec::from_iter((1u8..255).filter(|i| i % m == 0)) diff --git a/src/reader/mod.rs b/src/reader/mod.rs index f5d8483..f749f22 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -186,7 +186,7 @@ impl> IntoIterator for ReaderIter { impl> Source for Reader { type Item = Entry; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { ReaderIter { reader: self.clone(), next_offset: 0, diff --git a/src/sorter.rs b/src/sorter.rs index d164452..4bed5ab 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -1,3 +1,4 @@ +use crate::source::DynSource; use crate::{Entry, Iter, Merger, Reader, Source, Writer}; use memmap::Mmap; use std::cell::Cell; @@ -35,16 +36,18 @@ where self.batch_size += esize; } - pub fn source(mut self) -> impl Source { - // DupmergeSource>> { + pub fn source(mut self) -> Box> { if !self.batch.get_mut().is_empty() { self.write_chunk(); } - Merger::from(self.readers).dup_merge(self.merge_func) + Merger::from(self.readers) + .dup_merge(self.merge_func) + .into_boxed() } pub fn write(self, mut w: Writer) { self.source() + .as_ref() // XXX - need to further wrap Box in BoxedSource .iter() .into_iter() .for_each(|e| w.add(e).unwrap()); diff --git a/src/source/adapters.rs b/src/source/adapters.rs index 4eaadbc..ac34be1 100644 --- a/src/source/adapters.rs +++ b/src/source/adapters.rs @@ -13,7 +13,7 @@ where F: Fn(&mut S::Item, &S::Item), { type Item = S::Item; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { self.source.iter().dup_merge(&self.merge) } } @@ -30,7 +30,7 @@ where F: Fn(&S::Item, &S::Item) -> Ordering, { type Item = S::Item; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { self.source.iter().dup_sort(&self.dupsort) } } @@ -46,7 +46,7 @@ where F: Fn(&S::Item, &mut Vec) -> bool, { type Item = S::Item; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { self.source.iter().filter(&self.filter) } } @@ -66,7 +66,7 @@ where F: Fn(S::Item) -> O, { type Item = O; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { self.source.iter().map(&self.map) } } diff --git a/src/source/mod.rs b/src/source/mod.rs index 24638fc..253dda6 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -1,4 +1,4 @@ -use crate::iter::{prefix::PrefixIter, range::RangeIter, Iter}; +use crate::iter::{prefix::PrefixIter, range::RangeIter, BoxedIter, IntoIter, Iter}; use crate::Entry; use std::cmp::Ordering; @@ -11,7 +11,7 @@ pub use adapters::MapSource; pub trait Source { type Item; - fn iter(&self) -> impl Iter; + fn iter(&self) -> impl Iter + IntoIterator; fn get(&self, key: &[u8]) -> RangeIter> where @@ -89,20 +89,20 @@ pub trait Source { pub trait DynSource { type Item; - fn iter(&self) -> Box + '_>; + fn iter(&self) -> BoxedIter<'_, Self::Item>; } impl DynSource for Box { type Item = S::Item; - fn iter(&self) -> Box + '_> { - Box::new(self.as_ref().iter()) + fn iter(&self) -> BoxedIter<'_, Self::Item> { + BoxedIter(Box::new(self.as_ref().iter())) } } impl Source for D { type Item = D::Item; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { DynSource::iter(self) } } @@ -139,9 +139,17 @@ impl Iter for VecIter<'_> { } } +impl<'a> IntoIterator for VecIter<'a> { + type Item = Entry; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} + impl Source for Vec { type Item = Entry; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { VecIter { index: 0, vec: self, @@ -192,7 +200,7 @@ pub mod test_source { impl Source for TestSource { type Item = Entry; - fn iter(&self) -> impl Iter { + fn iter(&self) -> impl Iter + IntoIterator { TestIter { source: self, off: 0, diff --git a/tests/rwtest.rs b/tests/rwtest.rs index efc808b..d2c6328 100644 --- a/tests/rwtest.rs +++ b/tests/rwtest.rs @@ -1,6 +1,5 @@ use mtbl::{Entry, Reader, Source, Writer}; -//#[test] #[test] fn test_write_readback() { let mut store = Vec::::new(); @@ -17,14 +16,14 @@ fn test_write_readback() { assert!(store.len() > 512); let r = Reader::new(store); let ri = r.iter(); - assert_eq!(ri.collect::>(), reference); + assert_eq!(ri.into_iter().collect::>(), reference); // test range let start = u32::to_be_bytes(192); let end = u32::to_be_bytes(256); let rangei = r.get_range(&start, &end); assert_eq!( - rangei.collect::>(), + rangei.into_iter().collect::>(), reference .into_iter() .filter(|e| e.key() >= &u32::to_be_bytes(192) && e.key() <= &u32::to_be_bytes(256)) -- 2.50.1