From 2121678c0183e39de91d97db945a8e8a0713b908 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Fri, 2 May 2025 00:42:52 -0500 Subject: [PATCH] Lots of WIP on new lending "Cursor" abstraction Added merger of cursors and of duplicates. IntoIterator implementations for Cursors which maintain a local copy of the value modified to use take() instead of .clone()ing values. Getting closer to zero copy. Merger uses Ord wrappers on the cursors themselves to provide duplicate sorting or unsorting, which removes the need to copy values to wrap them in an ordering wrapper. Overall, code is getting simpler with fewer copies, a win-win. --- src/cursor/filter.rs | 15 ++- src/cursor/filtermap.rs | 51 +++++++++- src/cursor/map.rs | 39 ++++++-- src/cursor/merge.rs | 130 ++++++++++++++++++++++++++ src/cursor/merger.rs | 201 ++++++++++++++++++++++++++++++++++++++++ src/cursor/mod.rs | 34 ++++++- src/cursor/range.rs | 63 +++++++++++++ 7 files changed, 514 insertions(+), 19 deletions(-) create mode 100644 src/cursor/merge.rs create mode 100644 src/cursor/merger.rs create mode 100644 src/cursor/range.rs diff --git a/src/cursor/filter.rs b/src/cursor/filter.rs index 19c9632..a9420e3 100644 --- a/src/cursor/filter.rs +++ b/src/cursor/filter.rs @@ -1,4 +1,4 @@ -use crate::cursor::Cursor; +use super::{Cursor, KVIter}; pub struct Filter where @@ -60,6 +60,19 @@ where } } +impl IntoIterator for Filter +where + C: Cursor, + C::Value: Clone, + F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, +{ + type Item = (Vec, C::Value); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + Cursor::to_iter(self) + } +} + // TODO: Map, FilterMap and associated methods on Cursor. // Implement timeout semantics in FilterMap, mapping value to // Result where E can indicate timeout. Requires rest of diff --git a/src/cursor/filtermap.rs b/src/cursor/filtermap.rs index a08f721..8177da0 100644 --- a/src/cursor/filtermap.rs +++ b/src/cursor/filtermap.rs @@ -68,10 +68,53 @@ where } } -// TODO: Map, FilterMap and associated methods on Cursor. -// Implement timeout semantics in FilterMap, mapping value to -// Result where E can indicate timeout. Requires rest of -// stack to handle the result, but equivalent to dnstable_res_timeout. +impl IntoIterator for FilterMap +where + C: Cursor, + O: Clone, + F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, +{ + type Item = (Vec, O); + type IntoIter = FilterMapIter; + + fn into_iter(mut self) -> Self::IntoIter { + if self.get().is_none() { + self.advance(); + } + FilterMapIter { + cursor: self, + first: true, + } + } +} + +struct FilterMapIter +where + C: Cursor, + O: Clone, + F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, +{ + cursor: FilterMap, + first: bool, +} + +impl Iterator for FilterMapIter +where + C: Cursor, + O: Clone, + F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, +{ + type Item = (Vec, O); + + fn next(&mut self) -> Option { + if !self.first { + self.cursor.advance(); + } + self.first = false; + let (k, _) = self.cursor.get()?; + self.cursor.value.take().map(|v| (Vec::from(k), v)) + } +} #[cfg(test)] mod test { diff --git a/src/cursor/map.rs b/src/cursor/map.rs index fada0b8..03c0a3f 100644 --- a/src/cursor/map.rs +++ b/src/cursor/map.rs @@ -1,4 +1,4 @@ -use crate::cursor::{Cursor, KVIter}; +use super::Cursor; pub struct Map where @@ -31,26 +31,45 @@ where } } -impl<'a, C, F, O> Iterator for &'a Map +impl IntoIterator for Map where C: Cursor, F: FnMut((&[u8], &C::Value)) -> O, { - type Item = (&'a [u8], &'a O); - fn next(&mut self) -> Option { - self.get() + type Item = (Vec, O); + type IntoIter = MapIter; + fn into_iter(mut self) -> Self::IntoIter { + if self.get().is_none() { + self.advance(); + } + MapIter { + cursor: self, + first: true, + } } } -impl IntoIterator for Map +struct MapIter +where + C: Cursor, + F: FnMut((&[u8], &C::Value)) -> O, +{ + cursor: Map, + first: bool, +} + +impl Iterator for MapIter where C: Cursor, F: FnMut((&[u8], &C::Value)) -> O, - O: Clone, { type Item = (Vec, O); - type IntoIter = KVIter; - fn into_iter(self) -> Self::IntoIter { - KVIter(self) + fn next(&mut self) -> Option { + if !self.first { + self.cursor.advance(); + } + self.first = false; + let (k, _) = self.cursor.get()?; + self.cursor.value.take().map(|v| (Vec::from(k), v)) } } diff --git a/src/cursor/merge.rs b/src/cursor/merge.rs new file mode 100644 index 0000000..dfa1bb4 --- /dev/null +++ b/src/cursor/merge.rs @@ -0,0 +1,130 @@ +use super::Cursor; +use std::borrow::Borrow; + +struct Merge +where + C: Cursor, + C::Value: ToOwned, + F: FnMut(&mut ::Owned, &C::Value), +{ + cursor: C, + merge: F, + needs_advance: bool, + merge_key: Vec, + merge_val: Option<::Owned>, +} + +impl Merge +where + C: Cursor, + C::Value: ToOwned, + F: FnMut(&mut ::Owned, &C::Value), +{ + pub fn new(cursor: C, merge: F) -> Self { + Self { + cursor, + merge, + needs_advance: true, + merge_key: Vec::new(), + merge_val: None, + } + } + + fn do_merge(&mut self) { + match self.cursor.get() { + None => { + self.merge_val.take(); + return; + } + Some((k, v)) => { + k.clone_into(&mut self.merge_key); + self.merge_val + .as_mut() + .map(|mv| v.clone_into(mv)) + .or_else(|| { + self.merge_val.insert(v.to_owned()); + None + }); + } + } + + while let Some((k, v)) = self.cursor.get() { + if k != self.merge_key.as_slice() { + break; + } + self.merge_val.as_mut().map(|mv| (self.merge)(mv, v)); + } + } +} + +impl Cursor for Merge +where + C: Cursor, + C::Value: ToOwned, + F: FnMut(&mut ::Owned, &C::Value), +{ + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + let key = self.merge_key.as_slice(); + self.merge_val.as_ref().map(|val| (key, val.borrow())) + } + + fn advance(&mut self) { + self.cursor.advance(); + self.do_merge(); + } + + fn seek(&mut self, key: &[u8]) { + self.cursor.seek(key); + self.do_merge(); + } +} + +impl IntoIterator for Merge +where + C: Cursor, + C::Value: ToOwned, + F: FnMut(&mut ::Owned, &C::Value), +{ + type Item = (Vec, ::Owned); + type IntoIter = MergeIter; + fn into_iter(mut self) -> Self::IntoIter { + if self.get().is_none() { + self.advance() + } + MergeIter { + cursor: self, + first: true, + } + } +} + +pub struct MergeIter +where + C: Cursor, + C::Value: ToOwned, + F: FnMut(&mut ::Owned, &C::Value), +{ + cursor: Merge, + first: bool, +} + +impl Iterator for MergeIter +where + C: Cursor, + C::Value: ToOwned, + F: FnMut(&mut ::Owned, &C::Value), +{ + type Item = (Vec, ::Owned); + fn next(&mut self) -> Option { + if !self.first { + self.cursor.advance(); + } + self.first = false; + self.cursor + .merge_val + .take() + .map(|mv| (self.cursor.merge_key.clone(), mv)) + } +} diff --git a/src/cursor/merger.rs b/src/cursor/merger.rs new file mode 100644 index 0000000..49e3080 --- /dev/null +++ b/src/cursor/merger.rs @@ -0,0 +1,201 @@ +use super::{Cursor, KVIter}; +use std::cmp::Reverse; +use std::collections::BinaryHeap; + +pub struct MergeCursor { + pending: Vec, + active: BinaryHeap>, + last: Vec, +} + +impl MergeCursor> { + fn from(iter: I) -> Self + where + I: IntoIterator, + { + Self { + pending: iter.into_iter().map(|c| KeyOrd(c)).collect::>(), + active: BinaryHeap::new(), + last: Vec::new(), + } + } +} + +impl MergeCursor> +where + C::Value: Ord, +{ + fn from(iter: I) -> Self + where + I: IntoIterator, + { + Self { + pending: iter.into_iter().map(|c| KeyValOrd(c)).collect::>(), + active: BinaryHeap::new(), + last: Vec::new(), + } + } +} + +impl Cursor for MergeCursor { + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.active.peek().map(|Reverse(c)| c.get()).flatten() + } + + fn advance(&mut self) { + if !self.pending.is_empty() { + self.active = BinaryHeap::from_iter(self.pending.drain(..).map(|mut c| { + c.advance(); + Reverse(c) + })); + self.active + .peek() + .map(|Reverse(c)| c.get().map(|(k, _)| k.clone_into(&mut self.last))); + } else if let Some(mut rcur) = self.active.peek_mut() { + rcur.0.get().map(|(k, _)| k.clone_into(&mut self.last)); + rcur.0.advance(); + // XXX cannot remove a cursor when `get()` return None after an advance. + // Finished cursors will remain at the bottom of the heap. + } + } + + fn seek(&mut self, key: &[u8]) { + if !self.pending.is_empty() { + self.active = BinaryHeap::from_iter(self.pending.drain(..).map(|mut c| { + c.seek(key); + Reverse(c) + })); + } else if key >= self.last.as_slice() { + while let Some(mut rcur) = self.active.peek_mut() { + let cur = &mut rcur.0; + if let Some((k, _)) = cur.get() { + if k >= key { + break; + } + cur.seek(key); + } + } + } else { + /* backward seek, reset all cursors */ + self.active = BinaryHeap::from_iter(self.active.drain().map(|Reverse(mut c)| { + c.seek(key); + Reverse(c) + })); + } + key.clone_into(&mut self.last); + } +} + +impl IntoIterator for MergeCursor +where + C: Cursor + Ord, + C::Value: Clone, +{ + type Item = (Vec, C::Value); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + Cursor::to_iter(self) + } +} + +fn test_merger_construction(i: impl Iterator) +where + C::Value: Ord, +{ + let _ = MergeCursor::>::from(i); +} + +pub struct KeyOrd(C); +pub struct KeyValOrd(C); + +// Ordering implementation for KeyOrd, KeyValOrd +impl Cursor for KeyOrd { + type Value = C::Value; + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.0.get() + } + fn advance(&mut self) { + self.0.advance(); + } + fn seek(&mut self, key: &[u8]) { + self.0.seek(key); + } +} + +impl Cursor for KeyValOrd +where + C::Value: Ord, +{ + type Value = C::Value; + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.0.get() + } + fn advance(&mut self) { + self.0.advance(); + } + fn seek(&mut self, key: &[u8]) { + self.0.seek(key); + } +} + +impl PartialEq for KeyOrd { + fn eq(&self, other: &Self) -> bool { + let sv = self.0.get().map(|(k, _)| k); + let ov = other.0.get().map(|(k, _)| k); + sv == ov + } +} + +impl PartialEq for KeyValOrd +where + C::Value: Ord, +{ + fn eq(&self, other: &Self) -> bool { + let sv = self.0.get(); + let ov = other.0.get(); + sv == ov + } +} + +impl Eq for KeyOrd {} +impl Eq for KeyValOrd where C::Value: Ord {} + +impl PartialOrd for KeyOrd { + fn partial_cmp(&self, other: &Self) -> Option { + let ks = self.0.get().map(|(k, _)| k); + let ko = other.0.get().map(|(k, _)| k); + Some(ks.cmp(&ko)) + } +} + +impl PartialOrd for KeyValOrd +where + C::Value: Ord, +{ + fn partial_cmp(&self, other: &Self) -> Option { + let sp = self.0.get(); + let op = other.0.get(); + Some(sp.cmp(&op)) + } +} + +impl Ord for KeyOrd { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let sk = self.0.get().map(|(k, _)| k); + let ok = other.0.get().map(|(k, _)| k); + sk.cmp(&ok) + } +} + +impl Ord for KeyValOrd +where + C::Value: Ord, +{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let sk = self.0.get(); + let ok = other.0.get(); + sk.cmp(&ok) + } +} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 94b0fbe..756d668 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,6 +1,9 @@ pub mod filter; pub mod filtermap; pub mod map; +pub mod merge; +pub mod merger; +pub mod range; pub trait Cursor { type Value; @@ -11,12 +14,15 @@ pub trait Cursor { fn seek(&mut self, key: &[u8]); // provided methods - fn into_iter(self) -> KVIter + fn to_iter(mut self) -> KVIter where Self: Sized, Self::Value: Clone, { - KVIter(self) + if self.get().is_none() { + self.advance(); + } + KVIter(self, true) } fn filter(self, filter: F) -> filter::Filter @@ -36,7 +42,7 @@ pub trait Cursor { } } -pub struct KVIter(pub T); +pub struct KVIter(T, bool); impl Iterator for KVIter where @@ -45,7 +51,11 @@ where { type Item = (Vec, C::Value); fn next(&mut self) -> Option { - self.0.advance(); + let first = self.1; + if !first { + self.0.advance(); + } + self.1 = false; self.0.get().map(|(k, v)| (Vec::from(k), v.clone())) } } @@ -65,3 +75,19 @@ impl Cursor for KVIter { self.0.seek(key); } } + +impl Cursor for Box { + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.as_ref().get() + } + + fn advance(&mut self) { + self.as_mut().advance(); + } + + fn seek(&mut self, key: &[u8]) { + self.as_mut().seek(key); + } +} diff --git a/src/cursor/range.rs b/src/cursor/range.rs new file mode 100644 index 0000000..ecb8273 --- /dev/null +++ b/src/cursor/range.rs @@ -0,0 +1,63 @@ +use super::Cursor; + +pub struct RangeCursor { + cursor: C, + begin: Vec, + end: Vec, +} + +impl RangeCursor { + pub fn new(cursor: C, begin: &[u8], end: &[u8]) -> Self { + let begin = Vec::from(begin.as_ref()); + let end = Vec::from(end); + Self { cursor, begin, end } + } +} + +impl Cursor for RangeCursor { + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.cursor + .get() + .filter(|(k, _)| *k >= self.begin.as_slice() && *k <= self.end.as_slice()) + } + + fn advance(&mut self) { + self.cursor.advance(); + } + + fn seek(&mut self, key: &[u8]) { + self.cursor.seek(key); + } +} + +pub struct PrefixCursor { + cursor: C, + prefix: Vec, +} + +impl PrefixCursor { + pub fn new(cursor: C, prefix: &[u8]) -> Self { + let prefix = Vec::from(prefix); + Self { cursor, prefix } + } +} + +impl Cursor for PrefixCursor { + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.cursor + .get() + .filter(|(k, _)| k.starts_with(self.prefix.as_slice())) + } + + fn advance(&mut self) { + self.cursor.advance(); + } + + fn seek(&mut self, key: &[u8]) { + self.cursor.seek(key); + } +} -- 2.50.1