From: Chris Mikkelson Date: Sun, 28 Apr 2024 18:43:04 +0000 (-0500) Subject: Refactor to generic Seekable IntoIterator X-Git-Url: https://git.mikk.net/?a=commitdiff_plain;h=82cd424e076d191455f4ea0b5d25bdc9df06df2d;p=mtbl-rs Refactor to generic Seekable IntoIterator --- diff --git a/src/entry.rs b/src/entry.rs deleted file mode 100644 index 95d7e93..0000000 --- a/src/entry.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::cmp::Ordering; -pub use std::rc::Rc; - -#[derive(Debug, Clone)] -pub struct Entry { - pub key: Rc>, - pub value: Rc>, -} - -impl Entry { - pub fn new() -> Entry { - Entry { - key: Rc::new(Vec::new()), - value: Rc::new(Vec::new()), - } - } - pub fn from_key_value(k: impl AsRef<[u8]>, v: impl AsRef<[u8]>) -> Entry { - let mut key = Vec::::with_capacity(k.as_ref().len()); - let mut value = Vec::::with_capacity(v.as_ref().len()); - - key.extend_from_slice(k.as_ref()); - value.extend_from_slice(v.as_ref()); - - Entry { - key: Rc::new(key), - value: Rc::new(value), - } - } - - pub fn unpack(&self) -> (&[u8], &[u8]) { - (self.key.as_slice(), self.value.as_slice()) - } -} - -// Entries are ordered and compared by key only -impl PartialOrd for Entry { - fn partial_cmp(&self, other: &Self) -> Option { - self.key.partial_cmp(&other.key) - } -} - -impl PartialEq for Entry { - fn eq(&self, other: &Self) -> bool { - self.key == other.key - } -} - -#[cfg(test)] -mod test { - - use crate::*; - - struct TestIter(Entry); - - impl TestIter { - fn new() -> TestIter { - TestIter(Entry::new()) - } - } - - impl Iterator for TestIter { - type Item = Entry; - - fn next(&mut self) -> Option { - println!("key strong_count = {}", Rc::strong_count(&self.0.key)); - Rc::make_mut(&mut self.0.key).push(0); - Some(self.0.clone()) - } - } - - #[test] - fn test_iter() { - for e in TestIter::new() { - if e.key.len() > 3 { - break; - } - } - } - - #[test] - fn test_iter_filter() { - for e in TestIter::new().filter(|x: &Entry| x.key.len() % 2 == 0) { - println!("{:?}", e); - if e.key.len() > 6 { - break; - } - } - } -} diff --git a/src/iter.rs b/src/iter.rs deleted file mode 100644 index eb0e5ef..0000000 --- a/src/iter.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::entry::Entry; -use std::cell::RefCell; -use std::iter::Iterator; -use std::rc::Rc; - -pub trait Entries: Sized { - fn seek(&mut self, key: impl AsRef<[u8]>); - - fn iter_next(&mut self) -> Option; - - fn wrap_iter>) -> O, O: Iterator>( - self, - f: F, - ) -> Seekable> { - let b = IterCell::new(self); - let it = f(b.clone().into_iter()); - Seekable(WrapIter { - inner: b, - outer: it, - }) - } - - fn filter(self, filter: F) -> Seekable> - where - F: FnMut(&Entry, &mut Vec) -> FilterAction, - { - Seekable(Filter { - inner: self, - filter_func: filter, - seek_key: Vec::new(), - }) - } -} - -pub struct Seekable(T); - -impl IntoIterator for Seekable { - type Item = Entry; - type IntoIter = Iter; - - fn into_iter(self) -> Self::IntoIter { - Iter(self.0) - } -} - -impl Entries for Seekable { - fn seek(&mut self, key: impl AsRef<[u8]>) { - self.0.seek(key) - } - - fn iter_next(&mut self) -> Option { - self.0.iter_next() - } -} - -pub struct Iter(T); - -impl Iterator for Iter { - type Item = Entry; - - fn next(&mut self) -> Option { - self.0.iter_next() - } -} - -// WrapIter -pub struct IterCell { - ic: Rc>, -} - -impl Clone for IterCell { - fn clone(&self) -> Self { - IterCell { - ic: self.ic.clone(), - } - } -} - -impl IterCell { - fn new(i: I) -> IterCell { - IterCell { - ic: Rc::new(RefCell::new(i)), - } - } -} - -impl IntoIterator for IterCell { - type Item = Entry; - type IntoIter = Iter>; - - fn into_iter(self) -> Self::IntoIter { - Iter(self) - } -} - -impl Entries for IterCell { - fn seek(&mut self, key: impl AsRef<[u8]>) { - self.ic.borrow_mut().seek(key); - } - - fn iter_next(&mut self) -> Option { - self.ic.borrow_mut().iter_next() - } -} - -pub struct WrapIter> { - inner: IterCell, - outer: O, -} - -impl> Entries for WrapIter { - fn seek(&mut self, key: impl AsRef<[u8]>) { - self.inner.seek(key) - } - - fn iter_next(&mut self) -> Option { - self.outer.next() - } -} - -// Filter - -pub enum FilterAction { - Keep, - Skip, - Seek, -} -pub use FilterAction::*; - -pub struct Filter) -> FilterAction> { - inner: I, - filter_func: F, - seek_key: Vec, -} - -impl Entries for Filter -where - I: Entries, - F: FnMut(&Entry, &mut Vec) -> FilterAction, -{ - fn seek(&mut self, key: impl AsRef<[u8]>) { - self.inner.seek(key.as_ref()); - } - - fn iter_next(&mut self) -> Option { - self.seek_key.clear(); - while let Some(e) = self.inner.iter_next() { - match (self.filter_func)(&e, &mut self.seek_key) { - Skip => continue, - Keep => return Some(e), - Seek => self.inner.seek(self.seek_key.as_slice()), - } - } - None - } -} - -#[cfg(test)] -mod test { - use super::*; - - struct TestIter(u8); - - impl Entries for TestIter { - fn iter_next(&mut self) -> Option { - match self.0 { - 255 => None, - _ => { - let res = Entry::from_key_value(vec![self.0], vec![self.0]); - self.0 = self.0 + 1; - Some(res) - } - } - } - - fn seek(&mut self, kr: impl AsRef<[u8]>) { - self.0 = kr.as_ref()[0]; - } - } - - #[test] - fn test_iter_filter() { - let ti = TestIter(0); - let v: Vec = ti - .filter(|e, k| { - let b = e.key[0]; - if b % 2 > 0 { - if b < 60 { - k.push((b + 1) * 4); - return Seek; - } - return Skip; - } - Keep - }) - .into_iter() - .filter(|e| e.key[0] % 5 == 0) - .collect(); - println!("\n\nTEST: iter_filter"); - println!("length = {}", v.len()); - assert!(v.into_iter().all(|e| e.key[0] % 2 == 0)); - } - - #[test] - fn test_iter_wrap() { - let ti = TestIter(0); - let mut mi = ti.wrap_iter(|i| i.filter(|e| e.key[0] % 2 == 0)); - mi.seek(&[100]); - let v: Vec = mi.into_iter().collect(); - println!("\n\nTEST: iter_wrap"); - println!("{:?}", v); - } -} diff --git a/src/lib.rs b/src/lib.rs index 34db328..5335e69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,2 @@ -pub mod entry; -pub mod iter; - -pub use entry::*; -pub use iter::*; +pub mod seekable; +pub use seekable::Seekable; diff --git a/src/seekable.rs b/src/seekable.rs new file mode 100644 index 0000000..6d422f4 --- /dev/null +++ b/src/seekable.rs @@ -0,0 +1,65 @@ +mod filter_map; +mod merge; +pub use filter_map::FilterMap; +pub use merge::Merger; + +pub trait Seekable: Sized { + type Key: Ord; + type Value; + + fn next(&mut self) -> Option<(Self::Key, Self::Value)>; + fn seek(&mut self, key: &Self::Key); + + fn filter_map(self, func: F) -> FilterMap + where + K: Ord + AsRef, + F: FnMut((Self::Key, Self::Value), &mut Self) -> Option<(K, V)>, + { + FilterMap { next: self, func } + } + + fn merge(iter: L) -> Merger + where + L: Iterator, + { + merge::merge(iter) + } +} + +#[derive(Debug)] +pub struct Iter(T); + +impl Iterator for Iter +where + T: Seekable, +{ + type Item = (T::Key, T::Value); + + fn next(&mut self) -> Option { + self.0.next() + } +} + +#[cfg(test)] +mod test { + use super::{Iter, Seekable}; + + struct Empty; + impl IntoIterator for Empty { + type Item = ((), ()); + type IntoIter = Iter; + fn into_iter(self) -> Iter { + Iter(self) + } + } + + impl Seekable for Empty { + type Value = (); + type Key = (); + + fn next(&mut self) -> Option<((), ())> { + None + } + fn seek(&mut self, _key: &Self::Key) {} + } +} diff --git a/src/seekable/filter_map.rs b/src/seekable/filter_map.rs new file mode 100644 index 0000000..b483199 --- /dev/null +++ b/src/seekable/filter_map.rs @@ -0,0 +1,51 @@ +use crate::seekable::{Iter, Seekable}; + +pub struct FilterMap +where + S: Seekable, + K: Ord + AsRef, + F: FnMut((S::Key, S::Value), &mut S) -> Option<(K, V)>, +{ + pub(crate) next: S, + pub(crate) func: F, +} + +impl IntoIterator for FilterMap +where + S: Seekable, + K: Ord + AsRef, + F: FnMut((S::Key, S::Value), &mut S) -> Option<(K, V)>, +{ + type Item = (K, V); + type IntoIter = Iter; + fn into_iter(self) -> Iter { + Iter(self) + } +} + +impl Seekable for FilterMap +where + S: Seekable, + K: Ord + AsRef, + F: FnMut((S::Key, S::Value), &mut S) -> Option<(K, V)>, +{ + type Key = K; + type Value = V; + + fn next(&mut self) -> Option<(K, V)> { + loop { + match self.next.next() { + None => return None, + Some(v) => { + if let Some(i) = (self.func)(v, &mut self.next) { + return Some(i); + } + } + } + } + } + + fn seek(&mut self, key: &K) { + self.next.seek(key.as_ref()) + } +} diff --git a/src/seekable/merge.rs b/src/seekable/merge.rs new file mode 100644 index 0000000..c099b34 --- /dev/null +++ b/src/seekable/merge.rs @@ -0,0 +1,237 @@ +use crate::seekable::{Iter, Seekable}; +use std::cell::RefCell; +use std::cmp::Ordering; +use std::collections::BinaryHeap; + +pub struct Merger +where + S: Seekable, +{ + active: BinaryHeap>, + ended: Vec, +} + +struct MergerEnt { + cur_item: RefCell<(S::Key, S::Value)>, + rest: S, +} + +impl Eq for MergerEnt {} +impl PartialEq for MergerEnt { + fn eq(&self, other: &Self) -> bool { + let sent = self.cur_item.borrow(); + let oent = other.cur_item.borrow(); + sent.0.eq(&oent.0) + } +} + +// Because std::collections::BinaryHeap implements a max +// heap and we need a min-heap, we reverse the sense of Ord +// for MergerEnt vs. S::Key. We do this instead of using +// std::cmp::Reverse to avoid the additional packing/unpacking +// boilerplate. +impl PartialOrd for MergerEnt { + fn partial_cmp(&self, other: &Self) -> Option { + let sent = self.cur_item.borrow(); + let oent = other.cur_item.borrow(); + oent.0.partial_cmp(&sent.0) + } +} + +impl Ord for MergerEnt { + fn cmp(&self, other: &Self) -> Ordering { + let sent = self.cur_item.borrow(); + let oent = other.cur_item.borrow(); + oent.0.cmp(&sent.0) + } +} + +impl MergerEnt { + fn new(mut source: S) -> Option> { + Some(MergerEnt { + cur_item: RefCell::new(source.next()?), + rest: source, + }) + } + + fn replace(&mut self, v: (S::Key, S::Value)) -> (S::Key, S::Value) { + self.cur_item.replace(v) + } +} + +pub fn merge(sources: L) -> Merger +where + L: Iterator, + S: Seekable, +{ + Merger { + active: BinaryHeap::from( + sources + .into_iter() + .filter_map(|i| Some(MergerEnt::new(i)?)) + .collect::>>(), + ), + ended: Vec::new(), + } +} + +impl IntoIterator for Merger { + type Item = (S::Key, S::Value); + type IntoIter = Iter; + fn into_iter(self) -> Self::IntoIter { + Iter(self) + } +} + +fn heap_reset(m: &mut Merger, key: &S::Key) { + let mut active = Vec::>::new(); + let mut ended = Vec::::new(); + while let Some(e) = m.active.pop() { + m.ended.push(e.rest); + } + while let Some(mut s) = m.ended.pop() { + s.seek(key); + if let Some(next_item) = s.next() { + active.push(MergerEnt { + cur_item: RefCell::new(next_item), + rest: s, + }); + } else { + ended.push(s) + } + } + + m.ended = ended; + m.active = BinaryHeap::from(active); +} + +// forward seek within the heap +fn heap_seek(m: &mut Merger, key: &S::Key) -> Option<()> { + loop { + let mut head = m.active.peek_mut()?; + if head.cur_item.borrow().0 >= *key { + break; + } + head.rest.seek(key); + if let Some(next) = head.rest.next() { + head.replace(next); + continue; + } + drop(head); // release heap for modification + let head = m.active.pop()?; + m.ended.push(head.rest); + } + None +} + +impl Seekable for Merger { + type Key = S::Key; + type Value = S::Value; + + fn next(&mut self) -> Option<(Self::Key, Self::Value)> { + let mut head = self.active.peek_mut()?; + if let Some(next) = head.rest.next() { + return Some(head.replace(next)); + } + drop(head); // release heap for mutation below + let head = self.active.pop()?; + self.ended.push(head.rest); + Some(head.cur_item.into_inner()) + } + + fn seek(&mut self, key: &Self::Key) { + /* seek forward only */ + let head = self.active.peek_mut(); + if head.is_none() { + return; + } + if head.unwrap().cur_item.borrow().0 > *key { + heap_reset(self, key); + return; + } + heap_seek(self, key); + } +} + +#[cfg(test)] +mod test { + use crate::seekable::{Iter, Seekable}; + use std::cmp::Ordering; + + #[derive(Debug)] + struct SeekableVec { + off: usize, + v: Vec, + } + impl IntoIterator for SeekableVec { + type Item = (T, ()); + type IntoIter = Iter; + fn into_iter(self) -> Iter { + Iter(self) + } + } + impl Seekable for SeekableVec { + type Key = T; + type Value = (); + fn next(&mut self) -> Option<(T, ())> { + if self.off >= self.v.len() { + return None; + } + let v = self.v[self.off]; + self.off += 1; + Some((v, ())) + } + fn seek(&mut self, key: &T) { + self.off = 0; + while self.off < self.v.len() { + if self.v[self.off] >= *key { + break; + } + self.off += 1; + } + } + } + impl From> for SeekableVec { + fn from(vec: Vec) -> SeekableVec { + let mut sv = SeekableVec { off: 0, v: vec }; + sv.v.sort(); + sv + } + } + + #[test] + fn test_seekablevec() { + let mut v = vec![3, 1, 4, 1, 5, 9]; + let mut sv = SeekableVec::from(v.clone()); + v.sort(); + sv.seek(&3); + assert_eq!( + sv.into_iter() + .map(|e| e.0) + .cmp(v.into_iter().filter(|i| *i >= 3)), + Ordering::Equal + ); + } + + #[test] + fn test_merge_sv() { + let vpi = vec![3, 1, 4, 1, 5, 9]; + let mut ve = vec![2, 7, 1, 8, 2, 8]; + let mut mv = Seekable::merge( + vec![ + SeekableVec::from(vpi.clone()), + SeekableVec::from(ve.clone()), + ] + .into_iter(), + ); + mv.seek(&5); + ve.extend(vpi); + ve.sort(); + assert_eq!( + mv.into_iter() + .map(|e| e.0) + .cmp(ve.into_iter().filter(|i| *i >= 5)), + Ordering::Equal + ); + } +} diff --git a/tests/entry.rs b/tests/entry.rs deleted file mode 100644 index 343cb68..0000000 --- a/tests/entry.rs +++ /dev/null @@ -1,9 +0,0 @@ -use mtbl; - -#[test] -fn from_key_value() { - let e = mtbl::Entry::from_key_value(vec![0u8, 1, 2, 3], vec![4u8, 5, 6, 7]); - println!("{:?}", e); - let (k, v) = e.unpack(); - println!("{:?}", (k, v)); -}