--- /dev/null
+use crate::cursor::Cursor;
+
+pub struct Filter<C, F>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+ cursor: C,
+ filter: F,
+ seekto: Vec<u8>,
+}
+
+impl<C, F> Filter<C, F>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+ fn find_match(&mut self) {
+ while let Some((k, v)) = self.cursor.get() {
+ self.seekto.clear();
+ if (self.filter)((k, v), &mut self.seekto) {
+ break;
+ }
+ if self.seekto.is_empty() {
+ self.cursor.advance();
+ } else {
+ self.cursor.seek(self.seekto.as_slice());
+ }
+ }
+ }
+
+ pub fn new(cursor: C, filter: F) -> Self {
+ Self {
+ cursor,
+ filter,
+ seekto: Vec::new(),
+ }
+ }
+}
+
+impl<C, F> Cursor for Filter<C, F>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.cursor.get()
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.cursor.seek(key);
+ self.find_match();
+ }
+
+ fn advance(&mut self) {
+ self.cursor.advance();
+ self.find_match();
+ }
+}
+
+// TODO: Map, FilterMap and associated methods on Cursor.
+// Implement timeout semantics in FilterMap, mapping value to
+// Result<V,E> where E can indicate timeout. Requires rest of
+// stack to handle the result, but equivalent to dnstable_res_timeout.
+
+#[cfg(test)]
+mod test {
+ use super::{Cursor, Filter};
+
+ struct TestSource<T>(Vec<(Vec<u8>, T)>, Option<usize>);
+
+ impl TestSource<u64> {
+ fn from(i: impl IntoIterator<Item = u64>) -> Self {
+ Self(
+ i.into_iter()
+ .map(|idx| (Vec::from(idx.to_be_bytes()), idx))
+ .collect(),
+ None,
+ )
+ }
+ }
+
+ impl<T: Ord> Cursor for TestSource<T> {
+ type Value = T;
+ fn get(&self) -> Option<(&[u8], &T)> {
+ self.1
+ .filter(|i| *i < self.0.len())
+ .map(|i| (self.0[i].0.as_slice(), &self.0[i].1))
+ }
+ fn advance(&mut self) {
+ match self.1 {
+ Some(offset) => self.1.replace(offset + 1),
+ None => self.1.replace(0),
+ };
+ }
+ fn seek(&mut self, key: &[u8]) {
+ match self.0.binary_search_by_key(&key, |(k, _)| k) {
+ Ok(idx) => self.1.replace(idx),
+ Err(idx) => self.1.replace(idx),
+ };
+ }
+ }
+
+ #[test]
+ fn test_filter() {
+ let range = 1u64..64;
+ let ts = TestSource::<u64>::from(range.clone());
+ let filter = Filter::new(ts, |(_, v), _| v % 3 == 0);
+ let check = range.into_iter().filter(|i| i % 3 == 0).collect::<Vec<_>>();
+ assert_eq!(
+ filter.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
+ check
+ )
+ }
+}
--- /dev/null
+use super::Cursor;
+
+pub struct FilterMap<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+ cursor: C,
+ filtermap: F,
+ value: Option<O>,
+ seekto: Vec<u8>,
+}
+
+impl<C, F, O> FilterMap<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+ fn find_match(&mut self) {
+ while let Some((k, v)) = self.cursor.get() {
+ self.seekto.clear();
+ match (self.filtermap)((k, v), &mut self.seekto) {
+ Some(val) => {
+ self.value.replace(val);
+ break;
+ }
+ None => {
+ if self.seekto.is_empty() {
+ self.cursor.advance();
+ } else {
+ self.cursor.seek(self.seekto.as_slice());
+ }
+ }
+ }
+ }
+ }
+
+ pub fn new(cursor: C, filtermap: F) -> Self {
+ Self {
+ cursor,
+ filtermap,
+ value: None,
+ seekto: Vec::new(),
+ }
+ }
+}
+
+impl<C, F, O> Cursor for FilterMap<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+ type Value = O;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ let val = self.value.as_ref()?;
+ self.cursor.get().map(|(k, _)| (k, val))
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.cursor.seek(key);
+ self.find_match();
+ }
+
+ fn advance(&mut self) {
+ self.cursor.advance();
+ self.find_match();
+ }
+}
+
+// TODO: Map, FilterMap and associated methods on Cursor.
+// Implement timeout semantics in FilterMap, mapping value to
+// Result<V,E> where E can indicate timeout. Requires rest of
+// stack to handle the result, but equivalent to dnstable_res_timeout.
+
+#[cfg(test)]
+mod test {
+ use super::Cursor;
+ use super::FilterMap;
+
+ struct TestSource<T>(Vec<(Vec<u8>, T)>, Option<usize>);
+
+ impl TestSource<u64> {
+ fn from(i: impl IntoIterator<Item = u64>) -> Self {
+ Self(
+ i.into_iter()
+ .map(|idx| (Vec::from(idx.to_be_bytes()), idx))
+ .collect(),
+ None,
+ )
+ }
+ }
+
+ impl<T: Ord> Cursor for TestSource<T> {
+ type Value = T;
+ fn get(&self) -> Option<(&[u8], &T)> {
+ self.1
+ .filter(|i| *i < self.0.len())
+ .map(|i| (self.0[i].0.as_slice(), &self.0[i].1))
+ }
+ fn advance(&mut self) {
+ match self.1 {
+ Some(offset) => self.1.replace(offset + 1),
+ None => self.1.replace(0),
+ };
+ }
+ fn seek(&mut self, key: &[u8]) {
+ match self.0.binary_search_by_key(&key, |(k, _)| k) {
+ Ok(idx) => self.1.replace(idx),
+ Err(idx) => self.1.replace(idx),
+ };
+ }
+ }
+
+ #[test]
+ fn test_filter() {
+ let range = 1u64..64;
+ let fmap = |i| if i % 3 == 0 { Some(i as u16) } else { None };
+ let ts = TestSource::<u64>::from(range.clone());
+ let filter = FilterMap::new(ts, |(_, i), _| fmap(*i));
+ let check = range.into_iter().filter_map(fmap).collect::<Vec<_>>();
+ assert_eq!(
+ filter.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
+ check
+ )
+ }
+}
--- /dev/null
+use crate::cursor::{Cursor, KVIter};
+
+pub struct Map<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value)) -> O,
+{
+ cursor: C,
+ map: F,
+ value: Option<O>,
+}
+
+impl<C, F, O> Cursor for Map<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value)) -> O,
+{
+ type Value = O;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ let val = self.value.as_ref()?;
+ self.cursor.get().map(|(k, _)| (k, val))
+ }
+
+ fn advance(&mut self) {
+ self.cursor.advance();
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.cursor.seek(key);
+ }
+}
+
+impl<'a, C, F, O> Iterator for &'a Map<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value)) -> O,
+{
+ type Item = (&'a [u8], &'a O);
+ fn next(&mut self) -> Option<Self::Item> {
+ self.get()
+ }
+}
+
+impl<C, F, O> IntoIterator for Map<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value)) -> O,
+ O: Clone,
+{
+ type Item = (Vec<u8>, O);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ KVIter(self)
+ }
+}
--- /dev/null
+pub mod filter;
+pub mod filtermap;
+pub mod map;
+
+pub trait Cursor {
+ type Value;
+
+ // required methods
+ fn get(&self) -> Option<(&[u8], &Self::Value)>;
+ fn advance(&mut self);
+ fn seek(&mut self, key: &[u8]);
+
+ // provided methods
+ fn into_iter(self) -> KVIter<Self>
+ where
+ Self: Sized,
+ Self::Value: Clone,
+ {
+ KVIter(self)
+ }
+
+ fn filter<F>(self, filter: F) -> filter::Filter<Self, F>
+ where
+ Self: Sized,
+ F: FnMut((&[u8], &Self::Value), &mut Vec<u8>) -> bool,
+ {
+ filter::Filter::new(self, filter)
+ }
+
+ fn filtermap<F, O>(self, filtermap: F) -> filtermap::FilterMap<Self, F, O>
+ where
+ Self: Sized,
+ F: FnMut((&[u8], &Self::Value), &mut Vec<u8>) -> Option<O>,
+ {
+ filtermap::FilterMap::new(self, filtermap)
+ }
+}
+
+pub struct KVIter<T>(pub T);
+
+impl<C> Iterator for KVIter<C>
+where
+ C: Cursor,
+ C::Value: Clone,
+{
+ type Item = (Vec<u8>, C::Value);
+ fn next(&mut self) -> Option<Self::Item> {
+ self.0.advance();
+ self.0.get().map(|(k, v)| (Vec::from(k), v.clone()))
+ }
+}
+
+impl<C: Cursor> Cursor for KVIter<C> {
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.0.get()
+ }
+
+ fn advance(&mut self) {
+ self.0.advance();
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.0.seek(key);
+ }
+}
mod compression;
+mod cursor;
mod entry;
mod iter;
mod merger;