]> git.mikk.net Git - mtbl-rs/commitdiff
Add "Cursor" abstraction for lending iterator.
authorChris Mikkelson <chris@mikk.net>
Thu, 1 May 2025 22:44:51 +0000 (17:44 -0500)
committerChris Mikkelson <chris@mikk.net>
Thu, 1 May 2025 22:44:51 +0000 (17:44 -0500)
Uses immutable get() method to return borrowed key, value pair, mostly
to avoid copying keys to provide sorting in merger.

src/cursor/filter.rs [new file with mode: 0644]
src/cursor/filtermap.rs [new file with mode: 0644]
src/cursor/map.rs [new file with mode: 0644]
src/cursor/mod.rs [new file with mode: 0644]
src/lib.rs

diff --git a/src/cursor/filter.rs b/src/cursor/filter.rs
new file mode 100644 (file)
index 0000000..19c9632
--- /dev/null
@@ -0,0 +1,117 @@
+use crate::cursor::Cursor;
+
+pub struct Filter<C, F>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+    cursor: C,
+    filter: F,
+    seekto: Vec<u8>,
+}
+
+impl<C, F> Filter<C, F>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+    fn find_match(&mut self) {
+        while let Some((k, v)) = self.cursor.get() {
+            self.seekto.clear();
+            if (self.filter)((k, v), &mut self.seekto) {
+                break;
+            }
+            if self.seekto.is_empty() {
+                self.cursor.advance();
+            } else {
+                self.cursor.seek(self.seekto.as_slice());
+            }
+        }
+    }
+
+    pub fn new(cursor: C, filter: F) -> Self {
+        Self {
+            cursor,
+            filter,
+            seekto: Vec::new(),
+        }
+    }
+}
+
+impl<C, F> Cursor for Filter<C, F>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.cursor.get()
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.cursor.seek(key);
+        self.find_match();
+    }
+
+    fn advance(&mut self) {
+        self.cursor.advance();
+        self.find_match();
+    }
+}
+
+// TODO: Map, FilterMap and associated methods on Cursor.
+// Implement timeout semantics in FilterMap, mapping value to
+// Result<V,E> where E can indicate timeout. Requires rest of
+// stack to handle the result, but equivalent to dnstable_res_timeout.
+
+#[cfg(test)]
+mod test {
+    use super::{Cursor, Filter};
+
+    struct TestSource<T>(Vec<(Vec<u8>, T)>, Option<usize>);
+
+    impl TestSource<u64> {
+        fn from(i: impl IntoIterator<Item = u64>) -> Self {
+            Self(
+                i.into_iter()
+                    .map(|idx| (Vec::from(idx.to_be_bytes()), idx))
+                    .collect(),
+                None,
+            )
+        }
+    }
+
+    impl<T: Ord> Cursor for TestSource<T> {
+        type Value = T;
+        fn get(&self) -> Option<(&[u8], &T)> {
+            self.1
+                .filter(|i| *i < self.0.len())
+                .map(|i| (self.0[i].0.as_slice(), &self.0[i].1))
+        }
+        fn advance(&mut self) {
+            match self.1 {
+                Some(offset) => self.1.replace(offset + 1),
+                None => self.1.replace(0),
+            };
+        }
+        fn seek(&mut self, key: &[u8]) {
+            match self.0.binary_search_by_key(&key, |(k, _)| k) {
+                Ok(idx) => self.1.replace(idx),
+                Err(idx) => self.1.replace(idx),
+            };
+        }
+    }
+
+    #[test]
+    fn test_filter() {
+        let range = 1u64..64;
+        let ts = TestSource::<u64>::from(range.clone());
+        let filter = Filter::new(ts, |(_, v), _| v % 3 == 0);
+        let check = range.into_iter().filter(|i| i % 3 == 0).collect::<Vec<_>>();
+        assert_eq!(
+            filter.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
+            check
+        )
+    }
+}
diff --git a/src/cursor/filtermap.rs b/src/cursor/filtermap.rs
new file mode 100644 (file)
index 0000000..a08f721
--- /dev/null
@@ -0,0 +1,127 @@
+use super::Cursor;
+
+pub struct FilterMap<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+    cursor: C,
+    filtermap: F,
+    value: Option<O>,
+    seekto: Vec<u8>,
+}
+
+impl<C, F, O> FilterMap<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+    fn find_match(&mut self) {
+        while let Some((k, v)) = self.cursor.get() {
+            self.seekto.clear();
+            match (self.filtermap)((k, v), &mut self.seekto) {
+                Some(val) => {
+                    self.value.replace(val);
+                    break;
+                }
+                None => {
+                    if self.seekto.is_empty() {
+                        self.cursor.advance();
+                    } else {
+                        self.cursor.seek(self.seekto.as_slice());
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn new(cursor: C, filtermap: F) -> Self {
+        Self {
+            cursor,
+            filtermap,
+            value: None,
+            seekto: Vec::new(),
+        }
+    }
+}
+
+impl<C, F, O> Cursor for FilterMap<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+    type Value = O;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        let val = self.value.as_ref()?;
+        self.cursor.get().map(|(k, _)| (k, val))
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.cursor.seek(key);
+        self.find_match();
+    }
+
+    fn advance(&mut self) {
+        self.cursor.advance();
+        self.find_match();
+    }
+}
+
+// TODO: Map, FilterMap and associated methods on Cursor.
+// Implement timeout semantics in FilterMap, mapping value to
+// Result<V,E> where E can indicate timeout. Requires rest of
+// stack to handle the result, but equivalent to dnstable_res_timeout.
+
+#[cfg(test)]
+mod test {
+    use super::Cursor;
+    use super::FilterMap;
+
+    struct TestSource<T>(Vec<(Vec<u8>, T)>, Option<usize>);
+
+    impl TestSource<u64> {
+        fn from(i: impl IntoIterator<Item = u64>) -> Self {
+            Self(
+                i.into_iter()
+                    .map(|idx| (Vec::from(idx.to_be_bytes()), idx))
+                    .collect(),
+                None,
+            )
+        }
+    }
+
+    impl<T: Ord> Cursor for TestSource<T> {
+        type Value = T;
+        fn get(&self) -> Option<(&[u8], &T)> {
+            self.1
+                .filter(|i| *i < self.0.len())
+                .map(|i| (self.0[i].0.as_slice(), &self.0[i].1))
+        }
+        fn advance(&mut self) {
+            match self.1 {
+                Some(offset) => self.1.replace(offset + 1),
+                None => self.1.replace(0),
+            };
+        }
+        fn seek(&mut self, key: &[u8]) {
+            match self.0.binary_search_by_key(&key, |(k, _)| k) {
+                Ok(idx) => self.1.replace(idx),
+                Err(idx) => self.1.replace(idx),
+            };
+        }
+    }
+
+    #[test]
+    fn test_filter() {
+        let range = 1u64..64;
+        let fmap = |i| if i % 3 == 0 { Some(i as u16) } else { None };
+        let ts = TestSource::<u64>::from(range.clone());
+        let filter = FilterMap::new(ts, |(_, i), _| fmap(*i));
+        let check = range.into_iter().filter_map(fmap).collect::<Vec<_>>();
+        assert_eq!(
+            filter.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
+            check
+        )
+    }
+}
diff --git a/src/cursor/map.rs b/src/cursor/map.rs
new file mode 100644 (file)
index 0000000..fada0b8
--- /dev/null
@@ -0,0 +1,56 @@
+use crate::cursor::{Cursor, KVIter};
+
+pub struct Map<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value)) -> O,
+{
+    cursor: C,
+    map: F,
+    value: Option<O>,
+}
+
+impl<C, F, O> Cursor for Map<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value)) -> O,
+{
+    type Value = O;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        let val = self.value.as_ref()?;
+        self.cursor.get().map(|(k, _)| (k, val))
+    }
+
+    fn advance(&mut self) {
+        self.cursor.advance();
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.cursor.seek(key);
+    }
+}
+
+impl<'a, C, F, O> Iterator for &'a Map<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value)) -> O,
+{
+    type Item = (&'a [u8], &'a O);
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get()
+    }
+}
+
+impl<C, F, O> IntoIterator for Map<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value)) -> O,
+    O: Clone,
+{
+    type Item = (Vec<u8>, O);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        KVIter(self)
+    }
+}
diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs
new file mode 100644 (file)
index 0000000..94b0fbe
--- /dev/null
@@ -0,0 +1,67 @@
+pub mod filter;
+pub mod filtermap;
+pub mod map;
+
+pub trait Cursor {
+    type Value;
+
+    // required methods
+    fn get(&self) -> Option<(&[u8], &Self::Value)>;
+    fn advance(&mut self);
+    fn seek(&mut self, key: &[u8]);
+
+    // provided methods
+    fn into_iter(self) -> KVIter<Self>
+    where
+        Self: Sized,
+        Self::Value: Clone,
+    {
+        KVIter(self)
+    }
+
+    fn filter<F>(self, filter: F) -> filter::Filter<Self, F>
+    where
+        Self: Sized,
+        F: FnMut((&[u8], &Self::Value), &mut Vec<u8>) -> bool,
+    {
+        filter::Filter::new(self, filter)
+    }
+
+    fn filtermap<F, O>(self, filtermap: F) -> filtermap::FilterMap<Self, F, O>
+    where
+        Self: Sized,
+        F: FnMut((&[u8], &Self::Value), &mut Vec<u8>) -> Option<O>,
+    {
+        filtermap::FilterMap::new(self, filtermap)
+    }
+}
+
+pub struct KVIter<T>(pub T);
+
+impl<C> Iterator for KVIter<C>
+where
+    C: Cursor,
+    C::Value: Clone,
+{
+    type Item = (Vec<u8>, C::Value);
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.advance();
+        self.0.get().map(|(k, v)| (Vec::from(k), v.clone()))
+    }
+}
+
+impl<C: Cursor> Cursor for KVIter<C> {
+    type Value = C::Value;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.0.get()
+    }
+
+    fn advance(&mut self) {
+        self.0.advance();
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        self.0.seek(key);
+    }
+}
index 07b87dc57f439b0f0c675ded0a8b6486a380383d..b093d52335c401e59da820383613bd654e3a7dc3 100644 (file)
@@ -1,4 +1,5 @@
 mod compression;
+mod cursor;
 mod entry;
 mod iter;
 mod merger;