From 480455ef8b6154dc6fdd9e8da73c90f1091a53f2 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Thu, 1 May 2025 17:44:51 -0500 Subject: [PATCH] Add "Cursor" abstraction for lending iterator. Uses immutable get() method to return borrowed key, value pair, mostly to avoid copying keys to provide sorting in merger. --- src/cursor/filter.rs | 117 ++++++++++++++++++++++++++++++++++++ src/cursor/filtermap.rs | 127 ++++++++++++++++++++++++++++++++++++++++ src/cursor/map.rs | 56 ++++++++++++++++++ src/cursor/mod.rs | 67 +++++++++++++++++++++ src/lib.rs | 1 + 5 files changed, 368 insertions(+) create mode 100644 src/cursor/filter.rs create mode 100644 src/cursor/filtermap.rs create mode 100644 src/cursor/map.rs create mode 100644 src/cursor/mod.rs diff --git a/src/cursor/filter.rs b/src/cursor/filter.rs new file mode 100644 index 0000000..19c9632 --- /dev/null +++ b/src/cursor/filter.rs @@ -0,0 +1,117 @@ +use crate::cursor::Cursor; + +pub struct Filter +where + C: Cursor, + F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, +{ + cursor: C, + filter: F, + seekto: Vec, +} + +impl Filter +where + C: Cursor, + F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, +{ + fn find_match(&mut self) { + while let Some((k, v)) = self.cursor.get() { + self.seekto.clear(); + if (self.filter)((k, v), &mut self.seekto) { + break; + } + if self.seekto.is_empty() { + self.cursor.advance(); + } else { + self.cursor.seek(self.seekto.as_slice()); + } + } + } + + pub fn new(cursor: C, filter: F) -> Self { + Self { + cursor, + filter, + seekto: Vec::new(), + } + } +} + +impl Cursor for Filter +where + C: Cursor, + F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, +{ + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.cursor.get() + } + + fn seek(&mut self, key: &[u8]) { + self.cursor.seek(key); + self.find_match(); + } + + fn advance(&mut self) { + self.cursor.advance(); + self.find_match(); + } +} + +// TODO: Map, FilterMap and associated methods on Cursor. +// Implement timeout semantics in FilterMap, mapping value to +// Result where E can indicate timeout. Requires rest of +// stack to handle the result, but equivalent to dnstable_res_timeout. + +#[cfg(test)] +mod test { + use super::{Cursor, Filter}; + + struct TestSource(Vec<(Vec, T)>, Option); + + impl TestSource { + fn from(i: impl IntoIterator) -> Self { + Self( + i.into_iter() + .map(|idx| (Vec::from(idx.to_be_bytes()), idx)) + .collect(), + None, + ) + } + } + + impl Cursor for TestSource { + type Value = T; + fn get(&self) -> Option<(&[u8], &T)> { + self.1 + .filter(|i| *i < self.0.len()) + .map(|i| (self.0[i].0.as_slice(), &self.0[i].1)) + } + fn advance(&mut self) { + match self.1 { + Some(offset) => self.1.replace(offset + 1), + None => self.1.replace(0), + }; + } + fn seek(&mut self, key: &[u8]) { + match self.0.binary_search_by_key(&key, |(k, _)| k) { + Ok(idx) => self.1.replace(idx), + Err(idx) => self.1.replace(idx), + }; + } + } + + #[test] + fn test_filter() { + let range = 1u64..64; + let ts = TestSource::::from(range.clone()); + let filter = Filter::new(ts, |(_, v), _| v % 3 == 0); + let check = range.into_iter().filter(|i| i % 3 == 0).collect::>(); + assert_eq!( + filter.into_iter().map(|(_, v)| v).collect::>(), + check + ) + } +} diff --git a/src/cursor/filtermap.rs b/src/cursor/filtermap.rs new file mode 100644 index 0000000..a08f721 --- /dev/null +++ b/src/cursor/filtermap.rs @@ -0,0 +1,127 @@ +use super::Cursor; + +pub struct FilterMap +where + C: Cursor, + F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, +{ + cursor: C, + filtermap: F, + value: Option, + seekto: Vec, +} + +impl FilterMap +where + C: Cursor, + F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, +{ + fn find_match(&mut self) { + while let Some((k, v)) = self.cursor.get() { + self.seekto.clear(); + match (self.filtermap)((k, v), &mut self.seekto) { + Some(val) => { + self.value.replace(val); + break; + } + None => { + if self.seekto.is_empty() { + self.cursor.advance(); + } else { + self.cursor.seek(self.seekto.as_slice()); + } + } + } + } + } + + pub fn new(cursor: C, filtermap: F) -> Self { + Self { + cursor, + filtermap, + value: None, + seekto: Vec::new(), + } + } +} + +impl Cursor for FilterMap +where + C: Cursor, + F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, +{ + type Value = O; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + let val = self.value.as_ref()?; + self.cursor.get().map(|(k, _)| (k, val)) + } + + fn seek(&mut self, key: &[u8]) { + self.cursor.seek(key); + self.find_match(); + } + + fn advance(&mut self) { + self.cursor.advance(); + self.find_match(); + } +} + +// TODO: Map, FilterMap and associated methods on Cursor. +// Implement timeout semantics in FilterMap, mapping value to +// Result where E can indicate timeout. Requires rest of +// stack to handle the result, but equivalent to dnstable_res_timeout. + +#[cfg(test)] +mod test { + use super::Cursor; + use super::FilterMap; + + struct TestSource(Vec<(Vec, T)>, Option); + + impl TestSource { + fn from(i: impl IntoIterator) -> Self { + Self( + i.into_iter() + .map(|idx| (Vec::from(idx.to_be_bytes()), idx)) + .collect(), + None, + ) + } + } + + impl Cursor for TestSource { + type Value = T; + fn get(&self) -> Option<(&[u8], &T)> { + self.1 + .filter(|i| *i < self.0.len()) + .map(|i| (self.0[i].0.as_slice(), &self.0[i].1)) + } + fn advance(&mut self) { + match self.1 { + Some(offset) => self.1.replace(offset + 1), + None => self.1.replace(0), + }; + } + fn seek(&mut self, key: &[u8]) { + match self.0.binary_search_by_key(&key, |(k, _)| k) { + Ok(idx) => self.1.replace(idx), + Err(idx) => self.1.replace(idx), + }; + } + } + + #[test] + fn test_filter() { + let range = 1u64..64; + let fmap = |i| if i % 3 == 0 { Some(i as u16) } else { None }; + let ts = TestSource::::from(range.clone()); + let filter = FilterMap::new(ts, |(_, i), _| fmap(*i)); + let check = range.into_iter().filter_map(fmap).collect::>(); + assert_eq!( + filter.into_iter().map(|(_, v)| v).collect::>(), + check + ) + } +} diff --git a/src/cursor/map.rs b/src/cursor/map.rs new file mode 100644 index 0000000..fada0b8 --- /dev/null +++ b/src/cursor/map.rs @@ -0,0 +1,56 @@ +use crate::cursor::{Cursor, KVIter}; + +pub struct Map +where + C: Cursor, + F: FnMut((&[u8], &C::Value)) -> O, +{ + cursor: C, + map: F, + value: Option, +} + +impl Cursor for Map +where + C: Cursor, + F: FnMut((&[u8], &C::Value)) -> O, +{ + type Value = O; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + let val = self.value.as_ref()?; + self.cursor.get().map(|(k, _)| (k, val)) + } + + fn advance(&mut self) { + self.cursor.advance(); + } + + fn seek(&mut self, key: &[u8]) { + self.cursor.seek(key); + } +} + +impl<'a, C, F, O> Iterator for &'a Map +where + C: Cursor, + F: FnMut((&[u8], &C::Value)) -> O, +{ + type Item = (&'a [u8], &'a O); + fn next(&mut self) -> Option { + self.get() + } +} + +impl IntoIterator for Map +where + C: Cursor, + F: FnMut((&[u8], &C::Value)) -> O, + O: Clone, +{ + type Item = (Vec, O); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + KVIter(self) + } +} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs new file mode 100644 index 0000000..94b0fbe --- /dev/null +++ b/src/cursor/mod.rs @@ -0,0 +1,67 @@ +pub mod filter; +pub mod filtermap; +pub mod map; + +pub trait Cursor { + type Value; + + // required methods + fn get(&self) -> Option<(&[u8], &Self::Value)>; + fn advance(&mut self); + fn seek(&mut self, key: &[u8]); + + // provided methods + fn into_iter(self) -> KVIter + where + Self: Sized, + Self::Value: Clone, + { + KVIter(self) + } + + fn filter(self, filter: F) -> filter::Filter + where + Self: Sized, + F: FnMut((&[u8], &Self::Value), &mut Vec) -> bool, + { + filter::Filter::new(self, filter) + } + + fn filtermap(self, filtermap: F) -> filtermap::FilterMap + where + Self: Sized, + F: FnMut((&[u8], &Self::Value), &mut Vec) -> Option, + { + filtermap::FilterMap::new(self, filtermap) + } +} + +pub struct KVIter(pub T); + +impl Iterator for KVIter +where + C: Cursor, + C::Value: Clone, +{ + type Item = (Vec, C::Value); + fn next(&mut self) -> Option { + self.0.advance(); + self.0.get().map(|(k, v)| (Vec::from(k), v.clone())) + } +} + +impl Cursor for KVIter { + type Value = C::Value; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.0.get() + } + + fn advance(&mut self) { + self.0.advance(); + } + + fn seek(&mut self, key: &[u8]) { + self.0.seek(key); + } +} diff --git a/src/lib.rs b/src/lib.rs index 07b87dc..b093d52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod compression; +mod cursor; mod entry; mod iter; mod merger; -- 2.50.1