From fabaf7fef3b6aaa2eb88ee3e8f5e5879403ea162 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Wed, 16 Apr 2025 00:06:47 -0500 Subject: [PATCH] Refactor, again (WIP) Break up some larger files into smaller chunks, also decouple Iter and Iterator again, in favor of implementing IntoIterator for Iter implementations. Still a few remaining nits to work out -- the IntoIterator trait isn't coming into scope where I expect it, so may need to back off of `impl Iter` in favor of an associated type on Source which requires IntoIterator. --- src/bin/mtbl_dump.rs | 2 +- src/entry.rs | 20 +++- src/iter.rs | 143 ------------------------ src/iter/dupmerge.rs | 94 ++++++++++++++++ src/{ => iter}/dupsort.rs | 81 +++++--------- src/{ => iter}/filter.rs | 55 ++-------- src/{ => iter}/map.rs | 38 ++++--- src/iter/merger.rs | 182 +++++++++++++++++++++++++++++++ src/iter/mod.rs | 92 ++++++++++++++++ src/iter/prefix.rs | 55 ++++++++++ src/iter/range.rs | 54 +++++++++ src/lib.rs | 6 +- src/merge.rs | 104 ------------------ src/merger.rs | 143 ++++-------------------- src/reader/block.rs | 30 +++-- src/reader/mod.rs | 21 ++-- src/sorter.rs | 19 ++-- src/source/adapters.rs | 72 ++++++++++++ src/{source.rs => source/mod.rs} | 85 +++++++++------ 19 files changed, 750 insertions(+), 546 deletions(-) delete mode 100644 src/iter.rs create mode 100644 src/iter/dupmerge.rs rename src/{ => iter}/dupsort.rs (65%) rename src/{ => iter}/filter.rs (59%) rename src/{ => iter}/map.rs (78%) create mode 100644 src/iter/merger.rs create mode 100644 src/iter/mod.rs create mode 100644 src/iter/prefix.rs create mode 100644 src/iter/range.rs delete mode 100644 src/merge.rs create mode 100644 src/source/adapters.rs rename src/{source.rs => source/mod.rs} (70%) diff --git a/src/bin/mtbl_dump.rs b/src/bin/mtbl_dump.rs index c1de724..cfeeb6f 100644 --- a/src/bin/mtbl_dump.rs +++ b/src/bin/mtbl_dump.rs @@ -5,7 +5,7 @@ fn main() { .nth(1) .expect("Usage: mtbl_dump "); let reader = mtbl::reader::from_file(fname); - for e in reader.iter() { + for e in reader.iter().into_iter() { println!("{:?}: {:?}", e.key(), e.value()); } } diff --git a/src/entry.rs b/src/entry.rs index ce1ede2..8d990b8 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -1,7 +1,8 @@ +use crate::iter::merger::Mergeable; use std::cmp::Ordering; use std::sync::Arc; -#[derive(Debug, Clone, Eq)] +#[derive(Debug, Clone)] pub struct Entry { key: Arc>, value: Arc>, @@ -46,6 +47,8 @@ impl Entry { } } +impl Mergeable for Entry {} + impl PartialOrd<[u8]> for Entry { fn partial_cmp(&self, other: &[u8]) -> Option { Some(self.key().cmp(other)) @@ -64,8 +67,21 @@ impl AsRef<[u8]> for Entry { } } -impl PartialEq for Entry { +impl PartialEq for Entry { fn eq(&self, other: &Entry) -> bool { self.key() == other.key() } } +impl Eq for Entry {} + +impl PartialOrd for Entry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.key().cmp(other.key())) + } +} + +impl Ord for Entry { + fn cmp(&self, other: &Self) -> Ordering { + self.key().cmp(other.key()) + } +} diff --git a/src/iter.rs b/src/iter.rs deleted file mode 100644 index 67a92dc..0000000 --- a/src/iter.rs +++ /dev/null @@ -1,143 +0,0 @@ -use crate::dupsort::DupsortIter; -use crate::filter::FilterIter; -use crate::map::MapIter; -use crate::merge::MergeIter; -use crate::Entry; -use std::cmp::Ordering; -use std::iter::Iterator; - -pub trait SeekableIter: Iterator { - fn seek(&mut self, key: &[u8]); - - fn merge_func(self, merge_func: F) -> MergeIter - where - F: Fn(&mut Vec, &Entry), - Self: Sized, - { - MergeIter::new(self, merge_func) - } - - fn dupsort_func(self, dupsort_func: F) -> DupsortIter - where - F: FnMut(&Self::Item, &Self::Item) -> Ordering, - Self: Sized, - { - DupsortIter::new(self, dupsort_func) - } - - fn filter_func(self, filter_func: F) -> FilterIter - where - F: FnMut(&Self::Item, &mut Vec) -> bool, - Self: Sized, - { - FilterIter::new(self, filter_func) - } - - fn map_func(self, map_func: F) -> MapIter - where - F: FnMut(Self::Item) -> O, - Self: Sized, - { - MapIter::new(self, map_func) - } -} - -impl SeekableIter for Box { - fn seek(&mut self, key: &[u8]) { - self.as_mut().seek(key); - } -} - -pub struct PrefixIter -where - I: SeekableIter, - I::Item: AsRef<[u8]>, -{ - iter: I, - prefix: Vec, -} - -impl PrefixIter -where - I: SeekableIter, - I::Item: AsRef<[u8]>, -{ - pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self { - iter.seek(prefix.as_ref()); - Self { - iter, - prefix: Vec::from(prefix.as_ref()), - } - } -} - -impl Iterator for PrefixIter -where - I: SeekableIter, - I::Item: AsRef<[u8]>, -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - self.iter - .next() - .filter(|e| e.as_ref().starts_with(self.prefix.as_slice())) - } -} - -impl SeekableIter for PrefixIter -where - I: SeekableIter, - I::Item: AsRef<[u8]>, -{ - fn seek(&mut self, key: &[u8]) { - self.iter.seek(key); - } -} - -pub struct RangeIter { - iter: I, - start: Vec, - end: Vec, -} - -impl RangeIter -where - I: SeekableIter, -{ - pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self { - iter.seek(start.as_ref()); - Self { - iter, - start: Vec::from(start.as_ref()), - end: Vec::from(end.as_ref()), - } - } -} - -impl Iterator for RangeIter -where - I: SeekableIter, - I::Item: PartialOrd<[u8]>, -{ - type Item = I::Item; - fn next(&mut self) -> Option { - self.iter.next().filter(|i| i <= self.end.as_slice()) - } -} - -impl SeekableIter for RangeIter -where - I: SeekableIter, - I::Item: PartialOrd<[u8]>, -{ - fn seek(&mut self, key: &[u8]) { - if key <= self.start.as_slice() { - self.iter.seek(self.start.as_slice()); - } else if key > self.end.as_slice() { - self.iter.seek(self.end.as_slice()); - } else { - self.iter.seek(key); - } - } -} diff --git a/src/iter/dupmerge.rs b/src/iter/dupmerge.rs new file mode 100644 index 0000000..7a6a3ce --- /dev/null +++ b/src/iter/dupmerge.rs @@ -0,0 +1,94 @@ +use crate::iter::{IntoIter, Iter}; + +pub struct MergeIter +where + I: Iter, + I::Item: PartialEq, + F: FnMut(&mut I::Item, &I::Item), +{ + prev: Option, + cur: Option, + iter: I, + merge_func: F, +} + +impl MergeIter +where + I: Iter, + I::Item: PartialEq, + F: FnMut(&mut I::Item, &I::Item), +{ + pub fn new(iter: I, merge_func: F) -> Self { + Self { + prev: None, + cur: None, + iter, + merge_func, + } + } +} + +impl Iter for MergeIter +where + I: Iter, + I::Item: PartialEq, + F: FnMut(&mut I::Item, &I::Item), +{ + type Item = I::Item; + + fn next(&mut self) -> Option { + let mut cur = self.prev.take().or_else(|| self.iter.next())?; + while let Some(e) = self.iter.next() { + if cur == e { + (self.merge_func)(&mut cur, &e); + } else { + self.prev.replace(e); + break; + } + } + Some(cur) + } + + fn seek(&mut self, key: &[u8]) { + self.prev.take(); + self.iter.seek(key); + } +} + +impl IntoIterator for MergeIter +where + I: Iter, + I::Item: PartialEq, + F: FnMut(&mut I::Item, &I::Item), +{ + type Item = I::Item; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} + +#[cfg(disable)] +#[test] +fn test_merge_func() { + use crate::source::test_source::TestSource; + + let ts = TestSource( + (1u8..8) + .flat_map(|n| vec![n; n as usize]) + .map(|n| Entry::new(vec![n], vec![1])) + .collect(), + ); + + let s = ts.merge_func(|v, e| { + v[0] += e.value()[0]; + }); + + assert_eq!( + Vec::from_iter(s.iter()), + (1u8..8) + .map(|n| Entry::new(vec![n], vec![n])) + .collect::>() + ) +} diff --git a/src/dupsort.rs b/src/iter/dupsort.rs similarity index 65% rename from src/dupsort.rs rename to src/iter/dupsort.rs index 3c05b60..04dd4b7 100644 --- a/src/dupsort.rs +++ b/src/iter/dupsort.rs @@ -1,45 +1,10 @@ -use crate::{SeekableIter, Source}; +use crate::iter::{IntoIter, Iter}; use std::cmp::Ordering; -pub struct DupsortSource -where - S: Source, -{ - source: S, - #[allow(clippy::type_complexity)] - dupsort_func: Box Ordering>, -} - -impl DupsortSource -where - S: Source, -{ - pub fn new(source: S, dupsort_func: F) -> Self - where - F: Fn(&S::Item, &S::Item) -> Ordering + 'static, - { - Self { - source, - dupsort_func: Box::new(dupsort_func), - } - } -} - -impl Source for DupsortSource -where - S: Source, - S::Item: PartialEq, -{ - type Item = S::Item; - fn iter(&self) -> impl SeekableIter { - self.source.iter().dupsort_func(&self.dupsort_func) - } -} - #[derive(Debug)] pub struct DupsortIter where - I: SeekableIter, + I: Iter, F: FnMut(&I::Item, &I::Item) -> Ordering, { run: Vec, @@ -50,7 +15,7 @@ where impl DupsortIter where - I: SeekableIter, + I: Iter, F: FnMut(&I::Item, &I::Item) -> Ordering, { pub fn new(iter: I, dupsort_func: F) -> Self { @@ -63,21 +28,39 @@ where } } -impl Iterator for DupsortIter +impl IntoIterator for DupsortIter where - I: SeekableIter, + I: Iter, I::Item: PartialEq, F: Fn(&I::Item, &I::Item) -> Ordering, { type Item = I::Item; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} + +impl Iter for DupsortIter +where + I: Iter, + I::Item: PartialEq, + F: Fn(&I::Item, &I::Item) -> Ordering, +{ + type Item = I::Item; + + fn seek(&mut self, key: &[u8]) { + self.run.clear(); + self.next.take(); + self.iter.seek(key); + } fn next(&mut self) -> Option { self.run.pop().or_else(|| { self.run .push(self.next.take().or_else(|| self.iter.next())?); - // println!("2: {:?} / {:?}", self.next, self.run); - for e in &mut self.iter { + while let Some(e) = self.iter.next() { if e == self.run[0] { self.run.push(e); continue; @@ -93,19 +76,7 @@ where } } -impl SeekableIter for DupsortIter -where - I: SeekableIter, - I::Item: PartialEq, - F: Fn(&I::Item, &I::Item) -> Ordering, -{ - fn seek(&mut self, key: &[u8]) { - self.run.clear(); - self.next.take(); - self.iter.seek(key); - } -} - +#[cfg(disable)] #[test] fn test_dupsort() { use crate::source::test_source::TestSource; diff --git a/src/filter.rs b/src/iter/filter.rs similarity index 59% rename from src/filter.rs rename to src/iter/filter.rs index 9d224f6..b069d71 100644 --- a/src/filter.rs +++ b/src/iter/filter.rs @@ -1,4 +1,4 @@ -use crate::{SeekableIter, Source}; +use crate::Iter; pub struct FilterIter { iter: I, @@ -8,7 +8,7 @@ pub struct FilterIter { impl FilterIter where - I: SeekableIter, + I: Iter, F: FnMut(&I::Item, &mut Vec) -> bool, { pub fn new(iter: I, filter: F) -> Self { @@ -21,9 +21,9 @@ where } } -impl Iterator for FilterIter +impl Iter for FilterIter where - I: SeekableIter, + I: Iter, F: FnMut(&I::Item, &mut Vec) -> bool, { type Item = I::Item; @@ -40,56 +40,18 @@ where } } } -} -impl SeekableIter for FilterIter -where - I: SeekableIter, - F: FnMut(&I::Item, &mut Vec) -> bool, -{ fn seek(&mut self, key: &[u8]) { self.iter.seek(key); } } -pub struct FilterSource -where - S: Source, -{ - source: S, - #[allow(clippy::type_complexity)] - filter_func: Box) -> bool>, -} - -impl FilterSource -where - S: Source, -{ - pub fn new(source: S, filter_func: F) -> Self - where - F: Fn(&S::Item, &mut Vec) -> bool + 'static, - { - Self { - source, - filter_func: Box::new(filter_func), - } - } -} - -impl Source for FilterSource -where - S: Source, -{ - type Item = S::Item; - fn iter(&self) -> impl SeekableIter { - self.source.iter().filter_func(&self.filter_func) - } -} - #[test] fn test_filter() { use crate::source::test_source::TestSource; + use crate::source::Source; use crate::Entry; + use std::iter::IntoIterator; let ts = TestSource((0u8..10).map(|n| Entry::new(vec![n], vec![])).collect()).filter(|e, sv| { @@ -106,6 +68,9 @@ fn test_filter() { assert_eq!( vec![0, 2, 8], - ts.iter().map(|e| e.key()[0]).collect::>() + ts.map(|e| e.key()[0]) + .iter() + .into_iter() + .collect::>() ); } diff --git a/src/map.rs b/src/iter/map.rs similarity index 78% rename from src/map.rs rename to src/iter/map.rs index a2f746b..628467f 100644 --- a/src/map.rs +++ b/src/iter/map.rs @@ -1,8 +1,11 @@ -use crate::{SeekableIter, Source}; +use crate::{ + iter::{IntoIter, Iter}, + Source, +}; pub struct MapIter where - I: SeekableIter, + I: Iter, F: FnMut(I::Item) -> O, { iter: I, @@ -11,7 +14,7 @@ where impl MapIter where - I: SeekableIter, + I: Iter, F: FnMut(I::Item) -> O, { pub fn new(iter: I, mapf: F) -> Self { @@ -19,9 +22,9 @@ where } } -impl Iterator for MapIter +impl Iter for MapIter where - I: SeekableIter, + I: Iter, F: FnMut(I::Item) -> O, { type Item = O; @@ -30,15 +33,22 @@ where let item = self.iter.next()?; Some((self.mapf)(item)) } + + fn seek(&mut self, key: &[u8]) { + self.iter.seek(key); + } } -impl SeekableIter for MapIter +impl IntoIterator for MapIter where - I: SeekableIter, + I: Iter, F: FnMut(I::Item) -> O, { - fn seek(&mut self, key: &[u8]) { - self.iter.seek(key); + type Item = O; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) } } @@ -71,16 +81,15 @@ where S: Source, { type Item = O; - fn iter(&self) -> impl SeekableIter { - self.source.iter().map_func(&self.map) + fn iter(&self) -> impl Iter { + self.source.iter().map(&self.map) } } #[cfg(test)] mod test { use crate::source::test_source::TestSource; - use crate::Entry; - use crate::Source; + use crate::{Entry, Iter, Source}; struct CompVec(Vec); impl PartialEq<[u8]> for CompVec { @@ -99,12 +108,13 @@ mod test { let ts = TestSource((0u8..10).map(|i| Entry::new(vec![i], vec![])).collect()) .map(|e| CompVec(Vec::from(e.key()))); assert_eq!( - ts.iter().map(|cv| cv.0).collect::>(), + ts.iter().map(|cv| cv.0).into_iter().collect::>(), (0u8..10).map(|i| vec![i]).collect::>() ); assert_eq!( ts.get_range(&[2u8], &[7u8]) .map(|cv| cv.0) + .into_iter() .collect::>(), (2u8..8) .map(|i| { diff --git a/src/iter/merger.rs b/src/iter/merger.rs new file mode 100644 index 0000000..744991c --- /dev/null +++ b/src/iter/merger.rs @@ -0,0 +1,182 @@ +use crate::Iter; +use std::cmp::Ordering; +use std::collections::BinaryHeap; + +pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {} + +struct MergeEntry { + e: I::Item, + it: I, +} + +impl PartialEq for MergeEntry +where + I::Item: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + self.e == other.e + } +} + +impl Eq for MergeEntry where I::Item: PartialEq {} + +// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics +impl PartialOrd for MergeEntry +where + I::Item: PartialOrd, +{ + fn partial_cmp(&self, other: &Self) -> Option { + other.e.partial_cmp(&self.e) + } +} + +impl Ord for MergeEntry +where + I::Item: Ord, +{ + fn cmp(&self, other: &Self) -> Ordering { + other.e.cmp(&self.e) + } +} + +pub struct MergeIter { + heap: BinaryHeap>, + finished: Vec, + last_key: Vec, +} + +impl From for MergeIter +where + I: Iterator, + I::Item: Iter, + ::Item: Mergeable, +{ + fn from(iters: I) -> Self { + let mut v: Vec = Vec::new(); + let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() { + Some(e) => Some(MergeEntry { e, it }), + None => { + v.push(it); + None + } + })); + MergeIter { + finished: v, + heap: h, + last_key: Vec::new(), + } + } +} + +impl Iter for MergeIter +where + I::Item: Mergeable, +{ + type Item = I::Item; + + fn next(&mut self) -> Option { + let cur; + { + let mut next = self.heap.peek_mut()?; + + cur = next.e.clone(); + self.last_key.clear(); + self.last_key.extend(cur.as_ref()); + + if let Some(e) = next.it.next() { + next.e = e; + return Some(cur); + } + } + self.heap.pop(); + Some(cur) + } + + fn seek(&mut self, key: &[u8]) { + if key >= self.last_key.as_ref() { + loop { + match self.heap.peek_mut() { + None => return, + Some(mut head) => { + if &head.e >= key { + return; + } + head.it.seek(key); + if let Some(e) = head.it.next() { + head.e = e; + continue; + } + } + } + self.heap.pop(); + } + } + + // backwards seek; reset heap + let mut finished: Vec = Vec::new(); + let mut heap_entries: Vec> = Vec::new(); + for mut it in self + .heap + .drain() + .map(|me| me.it) + .chain(self.finished.drain(..)) + { + it.seek(key); + match it.next() { + Some(e) => heap_entries.push(MergeEntry { e, it }), + None => finished.push(it), + } + } + self.heap = BinaryHeap::from_iter(heap_entries); + self.finished = finished; + self.last_key.clear(); + self.last_key.extend(key); + } +} + +#[cfg(disabled)] +#[cfg(test)] +mod test { + use super::Merger; + use crate::source::test_source::TestSource; + use crate::Entry; + use crate::Source; + + fn tnum(m: u8) -> Vec { + Vec::from_iter((1u8..255).filter(|i| i % m == 0)) + } + + fn test_source(m: u8) -> TestSource { + TestSource(Vec::from_iter( + tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])), + )) + } + + #[test] + fn test_merge() { + let range = 1..8; + let iters: Vec<_> = range + .clone() + .map(test_source) + .map(|s| s.into_boxed()) + .collect(); + let s = Merger::from(iters); + let mut v = Vec::::new(); + for i in range { + v.extend(tnum(i)) + } + v.sort(); + let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0])); + assert_eq!(v2, v); + } + + #[test] + fn test_binheap() { + use std::collections::BinaryHeap; + + let v: Vec = vec![1, 8, 2, 9, 4, 7, 3]; + let vs = v.as_slice(); + let h = BinaryHeap::from_iter(vs.iter().copied()); + assert_ne!(h.into_vec(), v); + } +} diff --git a/src/iter/mod.rs b/src/iter/mod.rs new file mode 100644 index 0000000..69123c9 --- /dev/null +++ b/src/iter/mod.rs @@ -0,0 +1,92 @@ +use std::cmp::Ordering; +pub mod dupmerge; +pub mod dupsort; +pub mod filter; +pub mod map; +pub mod merger; +pub mod prefix; +pub mod range; + +use dupmerge::MergeIter; +use dupsort::DupsortIter; +use filter::FilterIter; +use map::MapIter; + +pub trait Iter { + type Item; + + fn seek(&mut self, key: &[u8]); + + fn next(&mut self) -> Option; + + fn seek_to(mut self, key: &[u8]) -> Self + where + Self: Sized, + { + self.seek(key); + self + } + + fn dup_merge(self, merge_func: F) -> MergeIter + where + Self::Item: PartialEq + Clone, + F: FnMut(&mut Self::Item, &Self::Item), + Self: Sized, + { + MergeIter::new(self, merge_func) + } + + fn dup_sort(self, dupsort_func: F) -> DupsortIter + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + DupsortIter::new(self, dupsort_func) + } + + fn filter(self, filter_func: F) -> FilterIter + where + F: FnMut(&Self::Item, &mut Vec) -> bool, + Self: Sized, + { + FilterIter::new(self, filter_func) + } + + fn map(self, map_func: F) -> MapIter + where + F: FnMut(Self::Item) -> O, + Self: Sized, + { + MapIter::new(self, map_func) + } +} + +impl Iter for Box { + type Item = I::Item; + + fn next(&mut self) -> Option { + self.as_mut().next() + } + + fn seek(&mut self, key: &[u8]) { + self.as_mut().seek(key); + } +} + +pub struct IntoIter(pub I); + +impl IntoIter { + fn new(i: I) -> Self { + Self(i) + } +} + +impl Iterator for IntoIter +where + I: Iter, +{ + type Item = I::Item; + fn next(&mut self) -> Option { + self.0.next() + } +} diff --git a/src/iter/prefix.rs b/src/iter/prefix.rs new file mode 100644 index 0000000..8deb321 --- /dev/null +++ b/src/iter/prefix.rs @@ -0,0 +1,55 @@ +use crate::iter::{IntoIter, Iter}; + +pub struct PrefixIter +where + I: Iter, + I::Item: AsRef<[u8]>, +{ + iter: I, + prefix: Vec, +} + +impl PrefixIter +where + I: Iter, + I::Item: AsRef<[u8]>, +{ + pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self { + iter.seek(prefix.as_ref()); + Self { + iter, + prefix: Vec::from(prefix.as_ref()), + } + } +} + +impl Iter for PrefixIter +where + I: Iter, + I::Item: AsRef<[u8]>, +{ + type Item = I::Item; + + fn next(&mut self) -> Option { + self.iter + .next() + .filter(|e| e.as_ref().starts_with(self.prefix.as_slice())) + } + + fn seek(&mut self, key: &[u8]) { + self.iter.seek(key); + } +} + +impl IntoIterator for PrefixIter +where + I: Iter, + I::Item: AsRef<[u8]>, +{ + type Item = I::Item; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} diff --git a/src/iter/range.rs b/src/iter/range.rs new file mode 100644 index 0000000..00d559f --- /dev/null +++ b/src/iter/range.rs @@ -0,0 +1,54 @@ +use crate::iter::{IntoIter, Iter}; + +pub struct RangeIter { + iter: I, + start: Vec, + end: Vec, +} + +impl RangeIter +where + I: Iter, +{ + pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self { + iter.seek(start.as_ref()); + Self { + iter, + start: Vec::from(start.as_ref()), + end: Vec::from(end.as_ref()), + } + } +} + +impl Iter for RangeIter +where + I: Iter, + I::Item: PartialOrd<[u8]>, +{ + type Item = I::Item; + fn next(&mut self) -> Option { + self.iter.next().filter(|i| i <= self.end.as_slice()) + } + + fn seek(&mut self, key: &[u8]) { + if key <= self.start.as_slice() { + self.iter.seek(self.start.as_slice()); + } else if key > self.end.as_slice() { + self.iter.seek(self.end.as_slice()); + } else { + self.iter.seek(key); + } + } +} + +impl IntoIterator for RangeIter +where + I: Iter, + I::Item: PartialOrd<[u8]>, +{ + type Item = I::Item; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} diff --git a/src/lib.rs b/src/lib.rs index d8060fb..07b87dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,6 @@ mod compression; -mod dupsort; mod entry; -mod filter; mod iter; -mod map; -mod merge; mod merger; pub mod reader; pub mod sorter; @@ -14,7 +10,7 @@ mod writer; pub use compression::Compression; pub use entry::Entry; pub use fileset::Fileset; -pub use iter::SeekableIter; +pub use iter::Iter; pub use merger::Merger; pub use reader::Reader; pub use source::Source; diff --git a/src/merge.rs b/src/merge.rs deleted file mode 100644 index ae26304..0000000 --- a/src/merge.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::{Entry, SeekableIter, Source}; - -pub struct MergeSource { - source: S, - #[allow(clippy::type_complexity)] - merge_func: Box, &Entry)>, -} - -impl MergeSource -where - S: Source, -{ - pub fn new(source: S, merge_func: F) -> Self - where - F: Fn(&mut Vec, &Entry) + 'static, - { - Self { - source, - merge_func: Box::new(merge_func), - } - } -} -impl Source for MergeSource -where - S: Source, -{ - type Item = S::Item; - fn iter(&self) -> impl SeekableIter { - self.source.iter().merge_func(&self.merge_func) - } -} - -pub struct MergeIter, &Entry)> { - prev: Option, - iter: I, - merge_func: F, -} - -impl MergeIter -where - F: Fn(&mut Vec, &Entry), -{ - pub fn new(iter: I, merge_func: F) -> Self { - Self { - prev: None, - iter, - merge_func, - } - } -} - -impl Iterator for MergeIter -where - I: Iterator, - F: Fn(&mut Vec, &Entry), -{ - type Item = Entry; - fn next(&mut self) -> Option { - let mut cur = self.prev.take().or_else(|| self.iter.next())?; - for e in &mut self.iter { - if e.key() == cur.key() { - (self.merge_func)(cur.value_mut(), &e); - } else { - self.prev.replace(e); - break; - } - } - Some(cur) - } -} - -impl SeekableIter for MergeIter -where - I: SeekableIter, - F: Fn(&mut Vec, &Entry), -{ - fn seek(&mut self, key: &[u8]) { - self.prev.take(); - self.iter.seek(key); - } -} - -#[test] -fn test_merge_func() { - use crate::source::test_source::TestSource; - - let ts = TestSource( - (1u8..8) - .flat_map(|n| vec![n; n as usize]) - .map(|n| Entry::new(vec![n], vec![1])) - .collect(), - ); - - let s = ts.merge_func(|v, e| { - v[0] += e.value()[0]; - }); - - assert_eq!( - Vec::from_iter(s.iter()), - (1u8..8) - .map(|n| Entry::new(vec![n], vec![n])) - .collect::>() - ) -} diff --git a/src/merger.rs b/src/merger.rs index 5abf0a9..e6ea6c6 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -1,149 +1,50 @@ -use crate::{Entry, SeekableIter, Source}; -use std::cmp::Ordering; -use std::collections::BinaryHeap; +use crate::iter::{ + merger::{MergeIter, Mergeable}, + Iter, +}; +use crate::Source; pub struct Merger { sources: Vec, } -impl From for Merger -where - I: IntoIterator, -{ - fn from(i: I) -> Self { - Merger { - sources: Vec::from_iter(i), +impl Merger { + pub fn new() -> Self { + Self { + sources: Vec::new(), } } -} -struct MergeEntry { - e: Entry, - it: I, -} - -impl PartialEq for MergeEntry { - fn eq(&self, other: &Self) -> bool { - self.e.key() == other.e.key() - } -} -impl Eq for MergeEntry {} - -impl PartialOrd for MergeEntry { - fn partial_cmp(&self, other: &Self) -> Option { - Some(other.e.key().cmp(self.e.key())) + pub fn add(&mut self, source: S) { + self.sources.push(source); } } -impl Ord for MergeEntry { - fn cmp(&self, other: &Self) -> Ordering { - other.e.key().cmp(self.e.key()) - } -} - -pub struct MergeIter { - heap: BinaryHeap>, - finished: Vec, - last_key: Vec, -} - -impl From for MergeIter +impl From for Merger where - I: Iterator, - I::Item: SeekableIter, + I: IntoIterator, + S: Source, + S::Item: Mergeable, { - fn from(iters: I) -> Self { - let mut v: Vec = Vec::new(); - let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() { - Some(e) => Some(MergeEntry { e, it }), - None => { - v.push(it); - None - } - })); - MergeIter { - finished: v, - heap: h, - last_key: Vec::new(), - } - } -} - -impl> Iterator for MergeIter { - type Item = Entry; - - fn next(&mut self) -> Option { - let cur; - { - let mut next = self.heap.peek_mut()?; - cur = next.e.clone(); - if let Some(e) = next.it.next() { - next.e = e; - self.last_key.clear(); - self.last_key.extend(next.e.key()); - return Some(cur); - } - } - self.heap.pop(); - Some(cur) - } -} - -impl> SeekableIter for MergeIter { - fn seek(&mut self, key: &[u8]) { - if key > self.last_key.as_slice() { - loop { - match self.heap.peek_mut() { - None => return, - Some(mut head) => { - if head.e.key() >= key { - self.last_key.clear(); - self.last_key.extend_from_slice(head.e.key()); - return; - } - head.it.seek(key); - if let Some(e) = head.it.next() { - head.e = e; - continue; - } - } - } - self.heap.pop(); - } - } - - // backwards seek; reset heap - let mut finished: Vec = Vec::new(); - let mut heap_entries: Vec> = Vec::new(); - for mut it in self - .heap - .drain() - .map(|me| me.it) - .chain(self.finished.drain(..)) - { - it.seek(key); - match it.next() { - Some(e) => heap_entries.push(MergeEntry { e, it }), - None => finished.push(it), - } + fn from(i: I) -> Self { + Merger { + sources: Vec::from_iter(i), } - self.heap = BinaryHeap::from_iter(heap_entries); - self.finished = finished; - self.last_key.clear(); - self.last_key.extend_from_slice(key); } } impl Source for Merger where - S: Source, + S: Source, + S::Item: Mergeable, { type Item = S::Item; - fn iter(&self) -> impl SeekableIter { + fn iter(&self) -> impl Iter { MergeIter::from(self.sources.iter().map(|s| s.iter())) } } +#[cfg(disabled)] #[cfg(test)] mod test { use super::Merger; diff --git a/src/reader/block.rs b/src/reader/block.rs index d1cabf0..e152af2 100644 --- a/src/reader/block.rs +++ b/src/reader/block.rs @@ -1,4 +1,5 @@ -use crate::{Entry, Result, SeekableIter}; +use crate::iter::{IntoIter, Iter}; +use crate::{Entry, Result}; use integer_encoding::VarInt; use std::mem::size_of; @@ -79,16 +80,27 @@ impl> Block { .decode(&self.data.as_ref()[self.restart_off..], idx) } } + +impl> From> for BlockIter { + fn from(b: Block) -> BlockIter { + BlockIter { + block: b, + cur_ent: None, + off: 0, + } + } +} + impl> IntoIterator for Block { type Item = Entry; - type IntoIter = BlockIter; + type IntoIter = IntoIter>; fn into_iter(self) -> Self::IntoIter { - Self::IntoIter { + IntoIter(BlockIter { block: self, cur_ent: None, off: 0, - } + }) } } @@ -181,15 +193,13 @@ impl> BlockIter { } } -impl> Iterator for BlockIter { +impl> Iter for BlockIter { type Item = Entry; fn next(&mut self) -> Option { self.decode().cloned() } -} -impl> SeekableIter for BlockIter { fn seek(&mut self, key: &[u8]) { // TODO: "galloping search" if self.block.restart_count > 0 { @@ -213,10 +223,10 @@ impl> SeekableIter for BlockIter { #[cfg(test)] mod test { - use crate::reader::block::Block; + use crate::reader::block::{Block, BlockIter}; use crate::writer::block_builder::BlockBuilder; use crate::Entry; - use crate::SeekableIter; + use crate::Iter; fn build_block(n: u32, skip: u32, r: usize) -> Block> { let mut bb = BlockBuilder::default(); @@ -258,7 +268,7 @@ mod test { fn test_block_seek() { let n = 40; let b = build_block(n, 10, 10); - let mut bi = b.into_iter(); + let mut bi = BlockIter::from(b); bi.seek(&u32::to_be_bytes(40)); assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40)); bi.seek(&u32::to_be_bytes(32)); diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 4ba0d7e..f5d8483 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -1,6 +1,6 @@ use crate::metadata::Metadata; use crate::Source; -use crate::{Entry, SeekableIter}; +use crate::{iter::IntoIter, Entry, Iter}; use integer_encoding::VarInt; pub(crate) mod block; use crate::compression::CBuf; @@ -105,7 +105,7 @@ impl> Reader { off += prelude; block::Block::new(CBuf::Buf(self.data.clone_range(off, size))) .expect("bad block") - .into_iter() + .into() } } @@ -131,14 +131,15 @@ impl> ReaderIter { self.data_iter.replace( block::Block::new(comp.uncompress(self.reader.data.clone_range(data_off, size))?) .expect("bad block") - .into_iter(), + .into(), ); Some(()) } } -impl> Iterator for ReaderIter { +impl> Iter for ReaderIter { type Item = Entry; + fn next(&mut self) -> Option { if self.data_iter.is_none() { self.next_block() @@ -153,9 +154,7 @@ impl> Iterator for ReaderIter { } } } -} -impl> SeekableIter for ReaderIter { fn seek(&mut self, key: &[u8]) { // TODO: detect and skip unneeded seek in iter. self.index_iter.seek(key); @@ -176,10 +175,18 @@ impl> SeekableIter for ReaderIter { } } +impl> IntoIterator for ReaderIter { + type Item = Entry; + type IntoIter = IntoIter; + fn into_iter(self) -> Self::IntoIter { + IntoIter(self) + } +} + impl> Source for Reader { type Item = Entry; - fn iter(&self) -> impl SeekableIter { + fn iter(&self) -> impl Iter { ReaderIter { reader: self.clone(), next_offset: 0, diff --git a/src/sorter.rs b/src/sorter.rs index a582cc0..d164452 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -1,8 +1,8 @@ -use crate::{merge::MergeSource, Entry, Merger, Reader, SeekableIter, Source, Writer}; +use crate::{Entry, Iter, Merger, Reader, Source, Writer}; use memmap::Mmap; use std::cell::Cell; -pub struct Sorter, &Entry)> { +pub struct Sorter { batch: Cell>, batch_size: usize, max_size: usize, @@ -13,7 +13,7 @@ pub struct Sorter, &Entry)> { impl Sorter where - F: Fn(&mut Vec, &Entry) + 'static, + F: Fn(&mut Entry, &Entry) + 'static, { pub fn new(max_size: usize, merge_func: F) -> Self { Self { @@ -35,15 +35,19 @@ where self.batch_size += esize; } - pub fn source(mut self) -> MergeSource>> { + pub fn source(mut self) -> impl Source { + // DupmergeSource>> { if !self.batch.get_mut().is_empty() { self.write_chunk(); } - Merger::from(self.readers).merge_func(self.merge_func) + Merger::from(self.readers).dup_merge(self.merge_func) } pub fn write(self, mut w: Writer) { - self.source().iter().for_each(|e| w.add(e).unwrap()); + self.source() + .iter() + .into_iter() + .for_each(|e| w.add(e).unwrap()); } fn write_chunk(&mut self) { @@ -54,7 +58,8 @@ where self.batch .take() .iter() - .merge_func(&self.merge_func) + .dup_merge(&self.merge_func) + .into_iter() .for_each(|e| { w.add(e).unwrap(); }); diff --git a/src/source/adapters.rs b/src/source/adapters.rs new file mode 100644 index 0000000..4eaadbc --- /dev/null +++ b/src/source/adapters.rs @@ -0,0 +1,72 @@ +use crate::{Iter, Source}; +use std::cmp::Ordering; + +pub struct DupmergeSource { + pub(super) source: S, + pub(super) merge: F, +} + +impl Source for DupmergeSource +where + S: Source, + S::Item: PartialEq + Clone, + F: Fn(&mut S::Item, &S::Item), +{ + type Item = S::Item; + fn iter(&self) -> impl Iter { + self.source.iter().dup_merge(&self.merge) + } +} + +pub struct DupsortSource { + pub(super) source: S, + pub(super) dupsort: F, +} + +impl Source for DupsortSource +where + S: Source, + S::Item: PartialEq, + F: Fn(&S::Item, &S::Item) -> Ordering, +{ + type Item = S::Item; + fn iter(&self) -> impl Iter { + self.source.iter().dup_sort(&self.dupsort) + } +} + +pub struct FilterSource { + pub(super) source: S, + pub(super) filter: F, +} + +impl Source for FilterSource +where + S: Source, + F: Fn(&S::Item, &mut Vec) -> bool, +{ + type Item = S::Item; + fn iter(&self) -> impl Iter { + self.source.iter().filter(&self.filter) + } +} + +pub struct MapSource +where + S: Source, + F: Fn(S::Item) -> O, +{ + pub(super) source: S, + pub(super) map: F, +} + +impl Source for MapSource +where + S: Source, + F: Fn(S::Item) -> O, +{ + type Item = O; + fn iter(&self) -> impl Iter { + self.source.iter().map(&self.map) + } +} diff --git a/src/source.rs b/src/source/mod.rs similarity index 70% rename from src/source.rs rename to src/source/mod.rs index 4efbb29..24638fc 100644 --- a/src/source.rs +++ b/src/source/mod.rs @@ -1,63 +1,78 @@ -use crate::dupsort::DupsortSource; -use crate::filter::FilterSource; -use crate::iter::{PrefixIter, RangeIter}; -use crate::map::MapSource; -use crate::merge::MergeSource; -use crate::{Entry, SeekableIter}; +use crate::iter::{prefix::PrefixIter, range::RangeIter, Iter}; +use crate::Entry; +use std::cmp::Ordering; + +mod adapters; +pub use adapters::DupmergeSource; +pub use adapters::DupsortSource; +pub use adapters::FilterSource; +pub use adapters::MapSource; pub trait Source { type Item; - fn iter(&self) -> impl SeekableIter; - fn get(&self, key: &[u8]) -> RangeIter> + fn iter(&self) -> impl Iter; + + fn get(&self, key: &[u8]) -> RangeIter> where Self::Item: PartialOrd<[u8]>, { RangeIter::new(self.iter(), key, key) } - fn get_prefix(&self, prefix: &[u8]) -> PrefixIter> + + fn get_prefix(&self, prefix: &[u8]) -> PrefixIter> where Self::Item: AsRef<[u8]>, { PrefixIter::new(self.iter(), prefix) } - fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter> + + fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter> where Self::Item: PartialOrd<[u8]>, { RangeIter::new(self.iter(), start, end) } - fn merge_func(self, merge_func: F) -> MergeSource + fn dup_merge(self, merge: F) -> DupmergeSource where Self: Sized, - F: Fn(&mut Vec, &Entry) + 'static, + F: Fn(&mut Entry, &Entry) + 'static, { - MergeSource::new(self, merge_func) + DupmergeSource { + source: self, + merge, + } } - fn dupsort_func(self, dupsort_func: F) -> DupsortSource + fn dupsort(self, dupsort: F) -> DupsortSource where Self: Sized, - F: Fn(&Self::Item, &Self::Item) -> std::cmp::Ordering + 'static, + F: Fn(&Self::Item, &Self::Item) -> Ordering + 'static, { - DupsortSource::new(self, dupsort_func) + DupsortSource { + source: self, + dupsort, + } } - fn filter(self, filter_func: F) -> FilterSource + fn filter(self, filter: F) -> FilterSource where Self: Sized, F: Fn(&Self::Item, &mut Vec) -> bool + 'static, { - FilterSource::new(self, filter_func) + FilterSource { + source: self, + filter, + } } - fn map(self, map_func: F) -> MapSource + fn map(self, map: F) -> MapSource where Self: Sized, F: Fn(Self::Item) -> O + 'static, { - MapSource::new(self, map_func) + MapSource { source: self, map } } fn into_boxed(self) -> Box> @@ -74,20 +89,20 @@ pub trait Source { pub trait DynSource { type Item; - fn iter(&self) -> Box + '_>; + fn iter(&self) -> Box + '_>; } impl DynSource for Box { type Item = S::Item; - fn iter(&self) -> Box + '_> { + fn iter(&self) -> Box + '_> { Box::new(self.as_ref().iter()) } } impl Source for D { type Item = D::Item; - fn iter(&self) -> impl SeekableIter { + fn iter(&self) -> impl Iter { DynSource::iter(self) } } @@ -97,7 +112,7 @@ pub struct VecIter<'a> { vec: &'a Vec, } -impl Iterator for VecIter<'_> { +impl Iter for VecIter<'_> { type Item = Entry; fn next(&mut self) -> Option { if self.index > self.vec.len() { @@ -108,9 +123,7 @@ impl Iterator for VecIter<'_> { Some(e) } } -} -impl SeekableIter for VecIter<'_> { fn seek(&mut self, key: &[u8]) { let mut left = 0; let mut right = self.vec.len() - 1; @@ -128,7 +141,7 @@ impl SeekableIter for VecIter<'_> { impl Source for Vec { type Item = Entry; - fn iter(&self) -> impl SeekableIter { + fn iter(&self) -> impl Iter { VecIter { index: 0, vec: self, @@ -138,7 +151,7 @@ impl Source for Vec { pub mod test_source { use crate::Entry; - use crate::SeekableIter; + use crate::Iter; use crate::Source; pub struct TestSource(pub Vec); @@ -148,7 +161,7 @@ pub mod test_source { off: usize, } - impl Iterator for TestIter<'_> { + impl Iter for TestIter<'_> { type Item = Entry; fn next(&mut self) -> Option { @@ -160,9 +173,7 @@ pub mod test_source { self.off += 1; Some(item.clone()) } - } - impl SeekableIter for TestIter<'_> { fn seek(&mut self, key: &[u8]) { self.off = 0; while self.off < self.source.0.len() && self.source.0[self.off].key() < key { @@ -171,9 +182,17 @@ pub mod test_source { } } + impl IntoIterator for TestIter<'_> { + type Item = Entry; + type IntoIter = crate::iter::IntoIter; + fn into_iter(self) -> Self::IntoIter { + crate::iter::IntoIter(self) + } + } + impl Source for TestSource { type Item = Entry; - fn iter(&self) -> impl SeekableIter { + fn iter(&self) -> impl Iter { TestIter { source: self, off: 0, @@ -186,6 +205,8 @@ pub mod test { use super::test_source::TestSource; #[allow(unused_imports)] use super::Source; + #[allow(unused_imports)] + use crate::iter::Iter; use crate::Entry; #[allow(dead_code)] -- 2.50.1