From: Chris Mikkelson Date: Sat, 5 Apr 2025 22:26:47 +0000 (-0500) Subject: Refactor to allow for dyn-compatible Source, iteration over arbitrary X-Git-Url: https://git.mikk.net/?a=commitdiff_plain;h=a3012a2504c08d222dea0fd4a4cfcc9fee02a972;p=mtbl-rs Refactor to allow for dyn-compatible Source, iteration over arbitrary types. --- diff --git a/src/dupsort_func.rs b/src/dupsort_func.rs index a9b8b92..96d0bf1 100644 --- a/src/dupsort_func.rs +++ b/src/dupsort_func.rs @@ -1,81 +1,63 @@ -use crate::entry::HasPrefix; use crate::{SeekableIter, Source}; use std::cmp::Ordering; +use std::marker::PhantomData; -pub struct DupsortFunc +pub struct DupsortFunc<'a, S, F> where - S: Source, + S: Source<'a>, F: Fn(&S::Item, &S::Item) -> Ordering, { source: S, dupsort_func: F, + phantom: PhantomData<&'a char>, } -impl DupsortFunc +impl<'a, S, F> DupsortFunc<'a, S, F> where - S: Source, + S: Source<'a>, F: Fn(&S::Item, &S::Item) -> Ordering, { pub fn new(source: S, dupsort_func: F) -> Self { Self { source, dupsort_func, + phantom: PhantomData, } } } -impl Source for DupsortFunc +impl<'a, S, F> Source<'a> for DupsortFunc<'a, S, F> where - S: Source, + S: Source<'a>, S::Item: PartialEq, - F: Fn(&S::Item, &S::Item) -> Ordering, + F: Fn(&S::Item, &S::Item) -> Ordering + 'a, { + type Iter = DupsortFuncIter; type Item = S::Item; - fn iter(&self) -> impl SeekableIter { + + fn iter(&'a self) -> Self::Iter { self.source.iter().dupsort_func(&self.dupsort_func) } - fn get(&self, key: &[u8]) -> impl SeekableIter - where - S::Item: PartialOrd<[u8]>, - { - self.source.get(key).dupsort_func(&self.dupsort_func) - } - fn get_prefix(&self, prefix: &[u8]) -> impl SeekableIter - where - S::Item: HasPrefix, - { - self.source - .get_prefix(prefix) - .dupsort_func(&self.dupsort_func) - } - fn get_range(&self, start: &[u8], end: &[u8]) -> impl SeekableIter - where - S::Item: PartialOrd<[u8]>, - { - self.source - .get_range(start, end) - .dupsort_func(&self.dupsort_func) - } } #[derive(Debug)] -pub struct DupsortFuncIter<'a, I, F> +pub struct DupsortFuncIter where I: SeekableIter, - F: Fn(&I::Item, &I::Item) -> Ordering, + F: FnMut(&I::Item, &I::Item) -> Ordering, { run: Vec, next: Option, iter: I, - dupsort_func: &'a F, + dupsort_func: F, } -impl<'a, I, F> DupsortFuncIter<'a, I, F> +impl DupsortFuncIter where I: SeekableIter, - F: Fn(&I::Item, &I::Item) -> Ordering, + F: FnMut(&I::Item, &I::Item) -> Ordering, { - pub fn new(iter: I, dupsort_func: &'a F) -> Self { + pub fn new(iter: I, dupsort_func: F) -> Self { Self { run: Vec::new(), next: None, @@ -85,7 +67,7 @@ where } } -impl<'a, I, F> Iterator for DupsortFuncIter<'a, I, F> +impl Iterator for DupsortFuncIter where I: SeekableIter, I::Item: PartialEq, @@ -115,7 +97,7 @@ where } } -impl<'a, I, F> SeekableIter for DupsortFuncIter<'a, I, F> +impl SeekableIter for DupsortFuncIter where I: SeekableIter, I::Item: PartialEq, diff --git a/src/entry.rs b/src/entry.rs index 9fe4fc6..ce1ede2 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -58,18 +58,14 @@ impl PartialEq<[u8]> for Entry { } } -impl PartialEq for Entry { - fn eq(&self, other: &Entry) -> bool { - self.key() == other.key() +impl AsRef<[u8]> for Entry { + fn as_ref(&self) -> &[u8] { + self.key() } } -pub trait HasPrefix { - fn has_prefix(&self, prefix: &[u8]) -> bool; -} - -impl HasPrefix for Entry { - fn has_prefix(&self, prefix: &[u8]) -> bool { - self.key().starts_with(prefix) +impl PartialEq for Entry { + fn eq(&self, other: &Entry) -> bool { + self.key() == other.key() } } diff --git a/src/filter.rs b/src/filter.rs index 459c50f..2aed71d 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,99 +1,88 @@ -use crate::{entry::HasPrefix, SeekableIter, Source}; +use crate::{SeekableIter, Source}; +use std::marker::PhantomData; -pub struct FilterIter<'a, I, F> { +pub struct FilterIter { iter: I, - filter_func: &'a F, + filter: F, + seek_key: Vec, } -impl<'a, I, F> FilterIter<'a, I, F> +impl FilterIter where I: SeekableIter, - F: Fn(&I::Item, &mut dyn SeekableIter) -> bool, + F: FnMut(&I::Item, &mut Vec) -> bool, { - pub fn new(iter: I, filter_func: &'a F) -> Self { - Self { iter, filter_func } + pub fn new(iter: I, filter: F) -> Self { + let seek_key = Vec::new(); + Self { + iter, + filter, + seek_key, + } } } -impl<'a, I, F> Iterator for FilterIter<'a, I, F> +impl Iterator for FilterIter where I: SeekableIter, - F: Fn(&I::Item, &mut dyn SeekableIter) -> bool, + F: FnMut(&I::Item, &mut Vec) -> bool, { type Item = I::Item; fn next(&mut self) -> Option { - while let Some(e) = self.iter.next() { - if (self.filter_func)(&e, &mut self.iter) { - return Some(e); + loop { + let item = self.iter.next()?; + self.seek_key.clear(); + if (self.filter)(&item, &mut self.seek_key) { + return Some(item); + } + if self.seek_key.len() > 0 { + self.iter.seek(self.seek_key.as_slice()); } } - None } } -impl<'a, I, F> SeekableIter for FilterIter<'a, I, F> +impl SeekableIter for FilterIter where I: SeekableIter, - F: Fn(&I::Item, &mut dyn SeekableIter) -> bool, + F: FnMut(&I::Item, &mut Vec) -> bool, { fn seek(&mut self, key: &[u8]) { self.iter.seek(key); } } -pub struct FilterSource { +pub struct FilterSource<'a, S, F> { source: S, filter_func: F, + phantom: PhantomData<&'a char>, } -impl FilterSource +impl<'a, S, F> FilterSource<'a, S, F> where - S: Source, - F: Fn(&S::Item, &mut dyn SeekableIter) -> bool, + S: Source<'a>, + F: Fn(&S::Item, &mut Vec) -> bool, { pub fn new(source: S, filter_func: F) -> Self { Self { source, filter_func, + phantom: PhantomData, } } } -impl Source for FilterSource +impl<'a, S, F> Source<'a> for FilterSource<'a, S, F> where - S: Source, - F: Fn(&S::Item, &mut dyn SeekableIter) -> bool, + S: Source<'a>, + F: Fn(&S::Item, &mut Vec) -> bool + 'a, { + type Iter = FilterIter; type Item = S::Item; - fn iter(&self) -> impl SeekableIter { + fn iter(&'a self) -> Self::Iter { self.source.iter().filter_func(&self.filter_func) } - - fn get(&self, key: &[u8]) -> impl SeekableIter - where - Self::Item: PartialOrd<[u8]>, - { - self.source.get(key).filter_func(&self.filter_func) - } - - fn get_prefix(&self, prefix: &[u8]) -> impl SeekableIter - where - Self::Item: HasPrefix, - { - self.source - .get_prefix(prefix) - .filter_func(&self.filter_func) - } - - fn get_range(&self, start: &[u8], end: &[u8]) -> impl SeekableIter - where - Self::Item: PartialOrd<[u8]>, - { - self.source - .get_range(start, end) - .filter_func(&self.filter_func) - } } #[test] @@ -107,19 +96,20 @@ fn test_filter() { .map(|n| Entry::new(vec![n], vec![])) .collect(), ) - .filter(|e, it| { + .filter(|e, sv| { // pass only even values if e.key()[0] % 2 != 0 { return false; } else if e.key()[0] == 4 { // at key 4, seek to 8 - it.seek(vec![8].as_slice()); + sv.push(8); + return false; } true }); assert_eq!( - vec![0, 2, 4, 8], + vec![0, 2, 8], ts.iter().map(|e| e.key()[0]).collect::>() ); } diff --git a/src/iter.rs b/src/iter.rs index 0a7209c..b50e3a8 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -1,5 +1,4 @@ use crate::dupsort_func::DupsortFuncIter; -use crate::entry::HasPrefix; use crate::filter::FilterIter; use crate::merge_func::MergeFuncIter; use crate::Entry; @@ -9,40 +8,44 @@ use std::iter::Iterator; pub trait SeekableIter: Iterator { fn seek(&mut self, key: &[u8]); - fn merge_func<'a, F>(self, merge_func: &'a F) -> MergeFuncIter<'a, Self, F> + fn merge_func(self, merge_func: F) -> MergeFuncIter where - F: Fn(&mut Vec, &Entry), + F: FnMut(&mut Vec, &Entry), Self: Sized, { MergeFuncIter::new(self, merge_func) } - fn dupsort_func<'a, F>(self, dupsort_func: &'a F) -> DupsortFuncIter<'a, Self, F> + fn dupsort_func(self, dupsort_func: F) -> DupsortFuncIter where - F: Fn(&Self::Item, &Self::Item) -> Ordering, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, Self: Sized, { DupsortFuncIter::new(self, dupsort_func) } - fn filter_func<'a, F>(self, filter_func: &'a F) -> FilterIter<'a, Self, F> + fn filter_func(self, filter_func: F) -> FilterIter where - F: Fn(&Self::Item, &mut dyn SeekableIter) -> bool, + F: FnMut(&Self::Item, &mut Vec) -> bool, Self: Sized, { FilterIter::new(self, filter_func) } } -pub struct PrefixIter { +pub struct PrefixIter +where + I: SeekableIter, + I::Item: AsRef<[u8]>, +{ iter: I, prefix: Vec, } -impl PrefixIter +impl PrefixIter where - I: SeekableIter, - E: HasPrefix, + I: SeekableIter, + I::Item: AsRef<[u8]>, { pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self { iter.seek(prefix.as_ref()); @@ -53,26 +56,24 @@ where } } -impl Iterator for PrefixIter +impl Iterator for PrefixIter where - I: SeekableIter, - E: HasPrefix, + I: SeekableIter, + I::Item: AsRef<[u8]>, { - type Item = E; + type Item = I::Item; + fn next(&mut self) -> Option { - let item = self.iter.next()?; - if item.has_prefix(self.prefix.as_slice()) { - Some(item) - } else { - None - } + self.iter + .next() + .filter(|e| e.as_ref().starts_with(self.prefix.as_slice())) } } -impl SeekableIter for PrefixIter +impl SeekableIter for PrefixIter where - I: SeekableIter, - E: HasPrefix, + I: SeekableIter, + I::Item: AsRef<[u8]>, { fn seek(&mut self, key: &[u8]) { self.iter.seek(key); @@ -85,7 +86,11 @@ pub struct RangeIter { end: Vec, } -impl RangeIter { +impl RangeIter +where + I: SeekableIter, + I::Item: AsRef<[u8]>, +{ pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self { iter.seek(start.as_ref()); Self { @@ -96,33 +101,29 @@ impl RangeIter { } } -impl Iterator for RangeIter +impl Iterator for RangeIter where - I: SeekableIter, - E: PartialOrd<[u8]>, + I: SeekableIter, + I::Item: AsRef<[u8]>, { - type Item = E; + type Item = I::Item; fn next(&mut self) -> Option { - let item = self.iter.next()?; - if item <= *self.end.as_slice() { - Some(item) - } else { - None - } + self.iter + .next() + .filter(|i| i.as_ref() <= self.end.as_slice()) } } -impl SeekableIter for RangeIter +impl SeekableIter for RangeIter where - I: SeekableIter, - E: PartialOrd<[u8]>, + I: SeekableIter, + I::Item: AsRef<[u8]>, { fn seek(&mut self, key: &[u8]) { if key <= self.start.as_slice() { self.iter.seek(self.start.as_slice()); } else if key > self.end.as_slice() { self.iter.seek(self.end.as_slice()); - self.iter.next(); } else { self.iter.seek(key); } diff --git a/src/merge_func.rs b/src/merge_func.rs index 7f55bd6..ef10380 100644 --- a/src/merge_func.rs +++ b/src/merge_func.rs @@ -1,53 +1,49 @@ use crate::{Entry, SeekableIter, Source}; +use std::marker::PhantomData; -pub struct MergeFunc, &Entry)> { +pub struct MergeFunc<'a, S: Source<'a>, F: Fn(&mut Vec, &Entry)> { source: S, merge_func: F, + phantom: PhantomData<&'a char>, } -impl MergeFunc +impl<'a, S, F> MergeFunc<'a, S, F> where - S: Source, + S: Source<'a>, F: Fn(&mut Vec, &Entry), { pub fn new(source: S, merge_func: F) -> Self { - Self { source, merge_func } + Self { + source, + merge_func, + phantom: PhantomData, + } } } -impl Source for MergeFunc +impl<'a, S, F> Source<'a> for MergeFunc<'a, S, F> where - S: Source, - F: Fn(&mut Vec, &Entry), + S: Source<'a, Item = Entry>, + F: Fn(&mut Vec, &Entry) + 'a, { + type Iter = MergeFuncIter; type Item = Entry; - fn iter(&self) -> impl SeekableIter { + fn iter(&'a self) -> Self::Iter { self.source.iter().merge_func(&self.merge_func) } - fn get(&self, key: &[u8]) -> impl SeekableIter { - self.source.get(key).merge_func(&self.merge_func) - } - fn get_prefix(&self, prefix: &[u8]) -> impl SeekableIter { - self.source.get_prefix(prefix).merge_func(&self.merge_func) - } - fn get_range(&self, start: &[u8], end: &[u8]) -> impl SeekableIter { - self.source - .get_range(start, end) - .merge_func(&self.merge_func) - } } -pub struct MergeFuncIter<'a, I, F: Fn(&mut Vec, &Entry)> { +pub struct MergeFuncIter, &Entry)> { prev: Option, iter: I, - merge_func: &'a F, + merge_func: F, } -impl<'a, I, F> MergeFuncIter<'a, I, F> +impl MergeFuncIter where - F: Fn(&mut Vec, &Entry), + F: FnMut(&mut Vec, &Entry), { - pub fn new(iter: I, merge_func: &'a F) -> Self { + pub fn new(iter: I, merge_func: F) -> Self { Self { prev: None, iter, @@ -56,10 +52,10 @@ where } } -impl<'a, I, F> Iterator for MergeFuncIter<'a, I, F> +impl Iterator for MergeFuncIter where I: Iterator, - F: Fn(&mut Vec, &Entry), + F: FnMut(&mut Vec, &Entry), { type Item = Entry; fn next(&mut self) -> Option { @@ -76,10 +72,10 @@ where } } -impl<'a, I, F> SeekableIter for MergeFuncIter<'a, I, F> +impl SeekableIter for MergeFuncIter where I: SeekableIter, - F: Fn(&mut Vec, &Entry), + F: FnMut(&mut Vec, &Entry), { fn seek(&mut self, key: &[u8]) { self.prev.take(); diff --git a/src/merger.rs b/src/merger.rs index 3c99c1a..a5e2bb3 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -134,23 +134,15 @@ impl> SeekableIter for MergeIter { } } -impl> Source for Merger { +impl<'a, S> Source<'a> for Merger +where + S: Source<'a, Item = Entry>, +{ type Item = Entry; - fn iter(&self) -> impl SeekableIter { + type Iter = MergeIter; + fn iter(&'a self) -> Self::Iter { MergeIter::from(self.sources.iter().map(|s| s.iter())) } - - fn get(&self, key: &[u8]) -> impl SeekableIter { - MergeIter::from(self.sources.iter().map(|s| s.get(key))) - } - - fn get_prefix(&self, prefix: &[u8]) -> impl SeekableIter { - MergeIter::from(self.sources.iter().map(|s| s.get_prefix(prefix))) - } - - fn get_range(&self, start: &[u8], end: &[u8]) -> impl SeekableIter { - MergeIter::from(self.sources.iter().map(|s| s.get_range(start, end))) - } } #[cfg(test)] diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 4ba0d7e..e5659a3 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -176,10 +176,11 @@ impl> SeekableIter for ReaderIter { } } -impl> Source for Reader { +impl<'a, D: AsRef<[u8]>> Source<'a> for Reader { type Item = Entry; + type Iter = ReaderIter; - fn iter(&self) -> impl SeekableIter { + fn iter(&'a self) -> Self::Iter { ReaderIter { reader: self.clone(), next_offset: 0, diff --git a/src/sorter.rs b/src/sorter.rs index 7685530..c8bab83 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -35,15 +35,19 @@ where self.batch_size += esize; } - pub fn source(mut self) -> impl Source { + pub fn source<'a>(&'a mut self) -> impl Source<'a, Item = Entry> { if self.batch.get_mut().len() > 0 { self.write_chunk(); } - Merger::from(self.readers).merge_func(self.merge_func) + Merger::from(self.readers.clone()).merge_func(&self.merge_func) } - pub fn write(self, mut w: Writer) { - self.source().iter().for_each(|e| { + pub fn write(mut self, mut w: Writer) { + if self.batch.get_mut().len() > 0 { + self.write_chunk(); + } + let m = Merger::from(self.readers).merge_func(self.merge_func); + m.iter().for_each(|e| { w.add(e).unwrap(); }); } diff --git a/src/source.rs b/src/source.rs index c7c6c00..6a8015d 100644 --- a/src/source.rs +++ b/src/source.rs @@ -1,34 +1,25 @@ use crate::dupsort_func::DupsortFunc; -use crate::entry::HasPrefix; use crate::filter::FilterSource; use crate::iter::{PrefixIter, RangeIter}; use crate::merge_func::MergeFunc; use crate::{Entry, SeekableIter}; -pub trait Source { - type Item; +pub trait Source<'a> { + type Iter: SeekableIter; + type Item: AsRef<[u8]>; - fn iter(&self) -> impl SeekableIter; - fn get(&self, key: &[u8]) -> impl SeekableIter - where - Self::Item: PartialOrd<[u8]>, - { + fn iter(&'a self) -> Self::Iter; + fn get(&'a self, key: &[u8]) -> RangeIter { RangeIter::new(self.iter(), key, key) } - fn get_prefix(&self, prefix: &[u8]) -> impl SeekableIter - where - Self::Item: HasPrefix, - { + fn get_prefix(&'a self, prefix: &[u8]) -> PrefixIter { PrefixIter::new(self.iter(), prefix) } - fn get_range(&self, start: &[u8], end: &[u8]) -> impl SeekableIter - where - Self::Item: PartialOrd<[u8]>, - { + fn get_range(&'a self, start: &[u8], end: &[u8]) -> RangeIter { RangeIter::new(self.iter(), start, end) } - fn merge_func(self, merge_func: F) -> MergeFunc + fn merge_func(self, merge_func: F) -> MergeFunc<'a, Self, F> where Self: Sized, F: Fn(&mut Vec, &Entry), @@ -36,7 +27,7 @@ pub trait Source { MergeFunc::new(self, merge_func) } - fn dupsort_func(self, dupsort_func: F) -> DupsortFunc + fn dupsort_func(self, dupsort_func: F) -> DupsortFunc<'a, Self, F> where Self: Sized, F: Fn(&Self::Item, &Self::Item) -> std::cmp::Ordering, @@ -44,16 +35,16 @@ pub trait Source { DupsortFunc::new(self, dupsort_func) } - fn filter(self, filter_func: F) -> FilterSource + fn filter(self, filter_func: F) -> FilterSource<'a, Self, F> where Self: Sized, - F: Fn(&Self::Item, &mut dyn SeekableIter) -> bool, + F: Fn(&Self::Item, &mut Vec) -> bool, { FilterSource::new(self, filter_func) } } -struct VecIter<'a> { +pub struct VecIter<'a> { index: usize, vec: &'a Vec, } @@ -87,9 +78,10 @@ impl<'a> SeekableIter for VecIter<'a> { } } -impl Source for Vec { +impl<'a> Source<'a> for Vec { type Item = Entry; - fn iter(&self) -> impl SeekableIter { + type Iter = VecIter<'a>; + fn iter(&'a self) -> Self::Iter { VecIter { index: 0, vec: self, @@ -97,7 +89,6 @@ impl Source for Vec { } } -#[cfg(test)] pub mod test_source { use crate::Entry; use crate::SeekableIter; @@ -133,9 +124,10 @@ pub mod test_source { } } - impl Source for TestSource { + impl<'a> Source<'a> for TestSource { type Item = Entry; - fn iter(&self) -> impl SeekableIter { + type Iter = TestIter<'a>; + fn iter(&'a self) -> Self::Iter { TestIter { source: self, off: 0, @@ -144,12 +136,13 @@ pub mod test_source { } } -#[cfg(test)] pub mod test { use super::test_source::TestSource; + #[allow(unused_imports)] use super::Source; use crate::Entry; + #[allow(dead_code)] fn test_source() -> TestSource { TestSource(vec![ Entry::new(vec![0, 0, 0, 0], vec![0]),