]> git.mikk.net Git - mtbl-rs/commitdiff
Lots of WIP on new lending "Cursor" abstraction
authorChris Mikkelson <chris@mikk.net>
Fri, 2 May 2025 05:42:52 +0000 (00:42 -0500)
committerChris Mikkelson <chris@mikk.net>
Fri, 2 May 2025 05:43:32 +0000 (00:43 -0500)
Added merger of cursors and of duplicates. IntoIterator implementations
for Cursors which maintain a local copy of the value modified to use
take() instead of .clone()ing values. Getting closer to zero copy.

Merger uses Ord wrappers on the cursors themselves to provide duplicate
sorting or unsorting, which removes the need to copy values to wrap them
in an ordering wrapper.

Overall, code is getting simpler with fewer copies, a win-win.

src/cursor/filter.rs
src/cursor/filtermap.rs
src/cursor/map.rs
src/cursor/merge.rs [new file with mode: 0644]
src/cursor/merger.rs [new file with mode: 0644]
src/cursor/mod.rs
src/cursor/range.rs [new file with mode: 0644]

index 19c96323c36015251fe0f25e020d90ba526fd423..a9420e3db5a875a204479aa1c1ffae4e7283efed 100644 (file)
@@ -1,4 +1,4 @@
-use crate::cursor::Cursor;
+use super::{Cursor, KVIter};
 
 pub struct Filter<C, F>
 where
@@ -60,6 +60,19 @@ where
     }
 }
 
+impl<C, F> IntoIterator for Filter<C, F>
+where
+    C: Cursor,
+    C::Value: Clone,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+    type Item = (Vec<u8>, C::Value);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        Cursor::to_iter(self)
+    }
+}
+
 // TODO: Map, FilterMap and associated methods on Cursor.
 // Implement timeout semantics in FilterMap, mapping value to
 // Result<V,E> where E can indicate timeout. Requires rest of
index a08f7217f0a3ee79d8a4728211febedb441c1baa..8177da0d1436282f08cd3467e1455575d22bbea1 100644 (file)
@@ -68,10 +68,53 @@ where
     }
 }
 
-// TODO: Map, FilterMap and associated methods on Cursor.
-// Implement timeout semantics in FilterMap, mapping value to
-// Result<V,E> where E can indicate timeout. Requires rest of
-// stack to handle the result, but equivalent to dnstable_res_timeout.
+impl<C, F, O> IntoIterator for FilterMap<C, F, O>
+where
+    C: Cursor,
+    O: Clone,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+    type Item = (Vec<u8>, O);
+    type IntoIter = FilterMapIter<C, F, O>;
+
+    fn into_iter(mut self) -> Self::IntoIter {
+        if self.get().is_none() {
+            self.advance();
+        }
+        FilterMapIter {
+            cursor: self,
+            first: true,
+        }
+    }
+}
+
+struct FilterMapIter<C, F, O>
+where
+    C: Cursor,
+    O: Clone,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+    cursor: FilterMap<C, F, O>,
+    first: bool,
+}
+
+impl<C, F, O> Iterator for FilterMapIter<C, F, O>
+where
+    C: Cursor,
+    O: Clone,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+    type Item = (Vec<u8>, O);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if !self.first {
+            self.cursor.advance();
+        }
+        self.first = false;
+        let (k, _) = self.cursor.get()?;
+        self.cursor.value.take().map(|v| (Vec::from(k), v))
+    }
+}
 
 #[cfg(test)]
 mod test {
index fada0b8991ae86d0f5e0d4d7c2024e6f4933cf2e..03c0a3faeec5b71937e765ec77b058a02bb81d74 100644 (file)
@@ -1,4 +1,4 @@
-use crate::cursor::{Cursor, KVIter};
+use super::Cursor;
 
 pub struct Map<C, F, O>
 where
@@ -31,26 +31,45 @@ where
     }
 }
 
-impl<'a, C, F, O> Iterator for &'a Map<C, F, O>
+impl<C, F, O> IntoIterator for Map<C, F, O>
 where
     C: Cursor,
     F: FnMut((&[u8], &C::Value)) -> O,
 {
-    type Item = (&'a [u8], &'a O);
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get()
+    type Item = (Vec<u8>, O);
+    type IntoIter = MapIter<C, F, O>;
+    fn into_iter(mut self) -> Self::IntoIter {
+        if self.get().is_none() {
+            self.advance();
+        }
+        MapIter {
+            cursor: self,
+            first: true,
+        }
     }
 }
 
-impl<C, F, O> IntoIterator for Map<C, F, O>
+struct MapIter<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value)) -> O,
+{
+    cursor: Map<C, F, O>,
+    first: bool,
+}
+
+impl<C, F, O> Iterator for MapIter<C, F, O>
 where
     C: Cursor,
     F: FnMut((&[u8], &C::Value)) -> O,
-    O: Clone,
 {
     type Item = (Vec<u8>, O);
-    type IntoIter = KVIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        KVIter(self)
+    fn next(&mut self) -> Option<Self::Item> {
+        if !self.first {
+            self.cursor.advance();
+        }
+        self.first = false;
+        let (k, _) = self.cursor.get()?;
+        self.cursor.value.take().map(|v| (Vec::from(k), v))
     }
 }
diff --git a/src/cursor/merge.rs b/src/cursor/merge.rs
new file mode 100644 (file)
index 0000000..dfa1bb4
--- /dev/null
@@ -0,0 +1,130 @@
+use super::Cursor;
+use std::borrow::Borrow;
+
+struct Merge<C, F>
+where
+    C: Cursor,
+    C::Value: ToOwned,
+    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+    cursor: C,
+    merge: F,
+    needs_advance: bool,
+    merge_key: Vec<u8>,
+    merge_val: Option<<C::Value as ToOwned>::Owned>,
+}
+
+impl<C, F> Merge<C, F>
+where
+    C: Cursor,
+    C::Value: ToOwned,
+    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+    pub fn new(cursor: C, merge: F) -> Self {
+        Self {
+            cursor,
+            merge,
+            needs_advance: true,
+            merge_key: Vec::new(),
+            merge_val: None,
+        }
+    }
+
+    fn do_merge(&mut self) {
+        match self.cursor.get() {
+            None => {
+                self.merge_val.take();
+                return;
+            }
+            Some((k, v)) => {
+                k.clone_into(&mut self.merge_key);
+                self.merge_val
+                    .as_mut()
+                    .map(|mv| v.clone_into(mv))
+                    .or_else(|| {
+                        self.merge_val.insert(v.to_owned());
+                        None
+                    });
+            }
+        }
+
+        while let Some((k, v)) = self.cursor.get() {
+            if k != self.merge_key.as_slice() {
+                break;
+            }
+            self.merge_val.as_mut().map(|mv| (self.merge)(mv, v));
+        }
+    }
+}
+
+impl<C, F> Cursor for Merge<C, F>
+where
+    C: Cursor,
+    C::Value: ToOwned,
+    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        let key = self.merge_key.as_slice();
+        self.merge_val.as_ref().map(|val| (key, val.borrow()))
+    }
+
+    fn advance(&mut self) {
+        self.cursor.advance();
+        self.do_merge();
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.cursor.seek(key);
+        self.do_merge();
+    }
+}
+
+impl<C, F> IntoIterator for Merge<C, F>
+where
+    C: Cursor,
+    C::Value: ToOwned,
+    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+    type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+    type IntoIter = MergeIter<C, F>;
+    fn into_iter(mut self) -> Self::IntoIter {
+        if self.get().is_none() {
+            self.advance()
+        }
+        MergeIter {
+            cursor: self,
+            first: true,
+        }
+    }
+}
+
+pub struct MergeIter<C, F>
+where
+    C: Cursor,
+    C::Value: ToOwned,
+    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+    cursor: Merge<C, F>,
+    first: bool,
+}
+
+impl<C, F> Iterator for MergeIter<C, F>
+where
+    C: Cursor,
+    C::Value: ToOwned,
+    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+    type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+    fn next(&mut self) -> Option<Self::Item> {
+        if !self.first {
+            self.cursor.advance();
+        }
+        self.first = false;
+        self.cursor
+            .merge_val
+            .take()
+            .map(|mv| (self.cursor.merge_key.clone(), mv))
+    }
+}
diff --git a/src/cursor/merger.rs b/src/cursor/merger.rs
new file mode 100644 (file)
index 0000000..49e3080
--- /dev/null
@@ -0,0 +1,201 @@
+use super::{Cursor, KVIter};
+use std::cmp::Reverse;
+use std::collections::BinaryHeap;
+
+pub struct MergeCursor<C: Cursor + Ord> {
+    pending: Vec<C>,
+    active: BinaryHeap<Reverse<C>>,
+    last: Vec<u8>,
+}
+
+impl<C: Cursor> MergeCursor<KeyOrd<C>> {
+    fn from<I>(iter: I) -> Self
+    where
+        I: IntoIterator<Item = C>,
+    {
+        Self {
+            pending: iter.into_iter().map(|c| KeyOrd(c)).collect::<Vec<_>>(),
+            active: BinaryHeap::new(),
+            last: Vec::new(),
+        }
+    }
+}
+
+impl<C: Cursor> MergeCursor<KeyValOrd<C>>
+where
+    C::Value: Ord,
+{
+    fn from<I>(iter: I) -> Self
+    where
+        I: IntoIterator<Item = C>,
+    {
+        Self {
+            pending: iter.into_iter().map(|c| KeyValOrd(c)).collect::<Vec<_>>(),
+            active: BinaryHeap::new(),
+            last: Vec::new(),
+        }
+    }
+}
+
+impl<C: Cursor + Ord> Cursor for MergeCursor<C> {
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.active.peek().map(|Reverse(c)| c.get()).flatten()
+    }
+
+    fn advance(&mut self) {
+        if !self.pending.is_empty() {
+            self.active = BinaryHeap::from_iter(self.pending.drain(..).map(|mut c| {
+                c.advance();
+                Reverse(c)
+            }));
+            self.active
+                .peek()
+                .map(|Reverse(c)| c.get().map(|(k, _)| k.clone_into(&mut self.last)));
+        } else if let Some(mut rcur) = self.active.peek_mut() {
+            rcur.0.get().map(|(k, _)| k.clone_into(&mut self.last));
+            rcur.0.advance();
+            // XXX cannot remove a cursor when `get()` return None after an advance.
+            // Finished cursors will remain at the bottom of the heap.
+        }
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        if !self.pending.is_empty() {
+            self.active = BinaryHeap::from_iter(self.pending.drain(..).map(|mut c| {
+                c.seek(key);
+                Reverse(c)
+            }));
+        } else if key >= self.last.as_slice() {
+            while let Some(mut rcur) = self.active.peek_mut() {
+                let cur = &mut rcur.0;
+                if let Some((k, _)) = cur.get() {
+                    if k >= key {
+                        break;
+                    }
+                    cur.seek(key);
+                }
+            }
+        } else {
+            /* backward seek, reset all cursors */
+            self.active = BinaryHeap::from_iter(self.active.drain().map(|Reverse(mut c)| {
+                c.seek(key);
+                Reverse(c)
+            }));
+        }
+        key.clone_into(&mut self.last);
+    }
+}
+
+impl<C> IntoIterator for MergeCursor<C>
+where
+    C: Cursor + Ord,
+    C::Value: Clone,
+{
+    type Item = (Vec<u8>, C::Value);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        Cursor::to_iter(self)
+    }
+}
+
+fn test_merger_construction<C: Cursor>(i: impl Iterator<Item = C>)
+where
+    C::Value: Ord,
+{
+    let _ = MergeCursor::<KeyOrd<_>>::from(i);
+}
+
+pub struct KeyOrd<C: Cursor>(C);
+pub struct KeyValOrd<C: Cursor>(C);
+
+// Ordering implementation for KeyOrd, KeyValOrd
+impl<C: Cursor> Cursor for KeyOrd<C> {
+    type Value = C::Value;
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.0.get()
+    }
+    fn advance(&mut self) {
+        self.0.advance();
+    }
+    fn seek(&mut self, key: &[u8]) {
+        self.0.seek(key);
+    }
+}
+
+impl<C: Cursor> Cursor for KeyValOrd<C>
+where
+    C::Value: Ord,
+{
+    type Value = C::Value;
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.0.get()
+    }
+    fn advance(&mut self) {
+        self.0.advance();
+    }
+    fn seek(&mut self, key: &[u8]) {
+        self.0.seek(key);
+    }
+}
+
+impl<C: Cursor> PartialEq for KeyOrd<C> {
+    fn eq(&self, other: &Self) -> bool {
+        let sv = self.0.get().map(|(k, _)| k);
+        let ov = other.0.get().map(|(k, _)| k);
+        sv == ov
+    }
+}
+
+impl<C: Cursor> PartialEq for KeyValOrd<C>
+where
+    C::Value: Ord,
+{
+    fn eq(&self, other: &Self) -> bool {
+        let sv = self.0.get();
+        let ov = other.0.get();
+        sv == ov
+    }
+}
+
+impl<C: Cursor> Eq for KeyOrd<C> {}
+impl<C: Cursor> Eq for KeyValOrd<C> where C::Value: Ord {}
+
+impl<C: Cursor> PartialOrd for KeyOrd<C> {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        let ks = self.0.get().map(|(k, _)| k);
+        let ko = other.0.get().map(|(k, _)| k);
+        Some(ks.cmp(&ko))
+    }
+}
+
+impl<C: Cursor> PartialOrd for KeyValOrd<C>
+where
+    C::Value: Ord,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        let sp = self.0.get();
+        let op = other.0.get();
+        Some(sp.cmp(&op))
+    }
+}
+
+impl<C: Cursor> Ord for KeyOrd<C> {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        let sk = self.0.get().map(|(k, _)| k);
+        let ok = other.0.get().map(|(k, _)| k);
+        sk.cmp(&ok)
+    }
+}
+
+impl<C: Cursor> Ord for KeyValOrd<C>
+where
+    C::Value: Ord,
+{
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        let sk = self.0.get();
+        let ok = other.0.get();
+        sk.cmp(&ok)
+    }
+}
index 94b0fbeb1abc4450a6917c510662721232f49b75..756d66839b5a25d68c7314c0221ebf6438a57266 100644 (file)
@@ -1,6 +1,9 @@
 pub mod filter;
 pub mod filtermap;
 pub mod map;
+pub mod merge;
+pub mod merger;
+pub mod range;
 
 pub trait Cursor {
     type Value;
@@ -11,12 +14,15 @@ pub trait Cursor {
     fn seek(&mut self, key: &[u8]);
 
     // provided methods
-    fn into_iter(self) -> KVIter<Self>
+    fn to_iter(mut self) -> KVIter<Self>
     where
         Self: Sized,
         Self::Value: Clone,
     {
-        KVIter(self)
+        if self.get().is_none() {
+            self.advance();
+        }
+        KVIter(self, true)
     }
 
     fn filter<F>(self, filter: F) -> filter::Filter<Self, F>
@@ -36,7 +42,7 @@ pub trait Cursor {
     }
 }
 
-pub struct KVIter<T>(pub T);
+pub struct KVIter<T>(T, bool);
 
 impl<C> Iterator for KVIter<C>
 where
@@ -45,7 +51,11 @@ where
 {
     type Item = (Vec<u8>, C::Value);
     fn next(&mut self) -> Option<Self::Item> {
-        self.0.advance();
+        let first = self.1;
+        if !first {
+            self.0.advance();
+        }
+        self.1 = false;
         self.0.get().map(|(k, v)| (Vec::from(k), v.clone()))
     }
 }
@@ -65,3 +75,19 @@ impl<C: Cursor> Cursor for KVIter<C> {
         self.0.seek(key);
     }
 }
+
+impl<C: Cursor> Cursor for Box<C> {
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.as_ref().get()
+    }
+
+    fn advance(&mut self) {
+        self.as_mut().advance();
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.as_mut().seek(key);
+    }
+}
diff --git a/src/cursor/range.rs b/src/cursor/range.rs
new file mode 100644 (file)
index 0000000..ecb8273
--- /dev/null
@@ -0,0 +1,63 @@
+use super::Cursor;
+
+pub struct RangeCursor<C: Cursor> {
+    cursor: C,
+    begin: Vec<u8>,
+    end: Vec<u8>,
+}
+
+impl<C: Cursor> RangeCursor<C> {
+    pub fn new(cursor: C, begin: &[u8], end: &[u8]) -> Self {
+        let begin = Vec::from(begin.as_ref());
+        let end = Vec::from(end);
+        Self { cursor, begin, end }
+    }
+}
+
+impl<C: Cursor> Cursor for RangeCursor<C> {
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.cursor
+            .get()
+            .filter(|(k, _)| *k >= self.begin.as_slice() && *k <= self.end.as_slice())
+    }
+
+    fn advance(&mut self) {
+        self.cursor.advance();
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.cursor.seek(key);
+    }
+}
+
+pub struct PrefixCursor<C: Cursor> {
+    cursor: C,
+    prefix: Vec<u8>,
+}
+
+impl<C: Cursor> PrefixCursor<C> {
+    pub fn new(cursor: C, prefix: &[u8]) -> Self {
+        let prefix = Vec::from(prefix);
+        Self { cursor, prefix }
+    }
+}
+
+impl<C: Cursor> Cursor for PrefixCursor<C> {
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.cursor
+            .get()
+            .filter(|(k, _)| k.starts_with(self.prefix.as_slice()))
+    }
+
+    fn advance(&mut self) {
+        self.cursor.advance();
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.cursor.seek(key);
+    }
+}