-use crate::cursor::Cursor;
+use super::{Cursor, KVIter};
pub struct Filter<C, F>
where
}
}
+impl<C, F> IntoIterator for Filter<C, F>
+where
+ C: Cursor,
+ C::Value: Clone,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
+{
+ type Item = (Vec<u8>, C::Value);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ Cursor::to_iter(self)
+ }
+}
+
// TODO: Map, FilterMap and associated methods on Cursor.
// Implement timeout semantics in FilterMap, mapping value to
// Result<V,E> where E can indicate timeout. Requires rest of
}
}
-// TODO: Map, FilterMap and associated methods on Cursor.
-// Implement timeout semantics in FilterMap, mapping value to
-// Result<V,E> where E can indicate timeout. Requires rest of
-// stack to handle the result, but equivalent to dnstable_res_timeout.
+impl<C, F, O> IntoIterator for FilterMap<C, F, O>
+where
+ C: Cursor,
+ O: Clone,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+ type Item = (Vec<u8>, O);
+ type IntoIter = FilterMapIter<C, F, O>;
+
+ fn into_iter(mut self) -> Self::IntoIter {
+ if self.get().is_none() {
+ self.advance();
+ }
+ FilterMapIter {
+ cursor: self,
+ first: true,
+ }
+ }
+}
+
+struct FilterMapIter<C, F, O>
+where
+ C: Cursor,
+ O: Clone,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+ cursor: FilterMap<C, F, O>,
+ first: bool,
+}
+
+impl<C, F, O> Iterator for FilterMapIter<C, F, O>
+where
+ C: Cursor,
+ O: Clone,
+ F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
+{
+ type Item = (Vec<u8>, O);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if !self.first {
+ self.cursor.advance();
+ }
+ self.first = false;
+ let (k, _) = self.cursor.get()?;
+ self.cursor.value.take().map(|v| (Vec::from(k), v))
+ }
+}
#[cfg(test)]
mod test {
-use crate::cursor::{Cursor, KVIter};
+use super::Cursor;
pub struct Map<C, F, O>
where
}
}
-impl<'a, C, F, O> Iterator for &'a Map<C, F, O>
+impl<C, F, O> IntoIterator for Map<C, F, O>
where
C: Cursor,
F: FnMut((&[u8], &C::Value)) -> O,
{
- type Item = (&'a [u8], &'a O);
- fn next(&mut self) -> Option<Self::Item> {
- self.get()
+ type Item = (Vec<u8>, O);
+ type IntoIter = MapIter<C, F, O>;
+ fn into_iter(mut self) -> Self::IntoIter {
+ if self.get().is_none() {
+ self.advance();
+ }
+ MapIter {
+ cursor: self,
+ first: true,
+ }
}
}
-impl<C, F, O> IntoIterator for Map<C, F, O>
+struct MapIter<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value)) -> O,
+{
+ cursor: Map<C, F, O>,
+ first: bool,
+}
+
+impl<C, F, O> Iterator for MapIter<C, F, O>
where
C: Cursor,
F: FnMut((&[u8], &C::Value)) -> O,
- O: Clone,
{
type Item = (Vec<u8>, O);
- type IntoIter = KVIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- KVIter(self)
+ fn next(&mut self) -> Option<Self::Item> {
+ if !self.first {
+ self.cursor.advance();
+ }
+ self.first = false;
+ let (k, _) = self.cursor.get()?;
+ self.cursor.value.take().map(|v| (Vec::from(k), v))
}
}
--- /dev/null
+use super::Cursor;
+use std::borrow::Borrow;
+
+struct Merge<C, F>
+where
+ C: Cursor,
+ C::Value: ToOwned,
+ F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+ cursor: C,
+ merge: F,
+ needs_advance: bool,
+ merge_key: Vec<u8>,
+ merge_val: Option<<C::Value as ToOwned>::Owned>,
+}
+
+impl<C, F> Merge<C, F>
+where
+ C: Cursor,
+ C::Value: ToOwned,
+ F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+ pub fn new(cursor: C, merge: F) -> Self {
+ Self {
+ cursor,
+ merge,
+ needs_advance: true,
+ merge_key: Vec::new(),
+ merge_val: None,
+ }
+ }
+
+ fn do_merge(&mut self) {
+ match self.cursor.get() {
+ None => {
+ self.merge_val.take();
+ return;
+ }
+ Some((k, v)) => {
+ k.clone_into(&mut self.merge_key);
+ self.merge_val
+ .as_mut()
+ .map(|mv| v.clone_into(mv))
+ .or_else(|| {
+ self.merge_val.insert(v.to_owned());
+ None
+ });
+ }
+ }
+
+ while let Some((k, v)) = self.cursor.get() {
+ if k != self.merge_key.as_slice() {
+ break;
+ }
+ self.merge_val.as_mut().map(|mv| (self.merge)(mv, v));
+ }
+ }
+}
+
+impl<C, F> Cursor for Merge<C, F>
+where
+ C: Cursor,
+ C::Value: ToOwned,
+ F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ let key = self.merge_key.as_slice();
+ self.merge_val.as_ref().map(|val| (key, val.borrow()))
+ }
+
+ fn advance(&mut self) {
+ self.cursor.advance();
+ self.do_merge();
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.cursor.seek(key);
+ self.do_merge();
+ }
+}
+
+impl<C, F> IntoIterator for Merge<C, F>
+where
+ C: Cursor,
+ C::Value: ToOwned,
+ F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+ type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+ type IntoIter = MergeIter<C, F>;
+ fn into_iter(mut self) -> Self::IntoIter {
+ if self.get().is_none() {
+ self.advance()
+ }
+ MergeIter {
+ cursor: self,
+ first: true,
+ }
+ }
+}
+
+pub struct MergeIter<C, F>
+where
+ C: Cursor,
+ C::Value: ToOwned,
+ F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+ cursor: Merge<C, F>,
+ first: bool,
+}
+
+impl<C, F> Iterator for MergeIter<C, F>
+where
+ C: Cursor,
+ C::Value: ToOwned,
+ F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+{
+ type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+ fn next(&mut self) -> Option<Self::Item> {
+ if !self.first {
+ self.cursor.advance();
+ }
+ self.first = false;
+ self.cursor
+ .merge_val
+ .take()
+ .map(|mv| (self.cursor.merge_key.clone(), mv))
+ }
+}
--- /dev/null
+use super::{Cursor, KVIter};
+use std::cmp::Reverse;
+use std::collections::BinaryHeap;
+
+pub struct MergeCursor<C: Cursor + Ord> {
+ pending: Vec<C>,
+ active: BinaryHeap<Reverse<C>>,
+ last: Vec<u8>,
+}
+
+impl<C: Cursor> MergeCursor<KeyOrd<C>> {
+ fn from<I>(iter: I) -> Self
+ where
+ I: IntoIterator<Item = C>,
+ {
+ Self {
+ pending: iter.into_iter().map(|c| KeyOrd(c)).collect::<Vec<_>>(),
+ active: BinaryHeap::new(),
+ last: Vec::new(),
+ }
+ }
+}
+
+impl<C: Cursor> MergeCursor<KeyValOrd<C>>
+where
+ C::Value: Ord,
+{
+ fn from<I>(iter: I) -> Self
+ where
+ I: IntoIterator<Item = C>,
+ {
+ Self {
+ pending: iter.into_iter().map(|c| KeyValOrd(c)).collect::<Vec<_>>(),
+ active: BinaryHeap::new(),
+ last: Vec::new(),
+ }
+ }
+}
+
+impl<C: Cursor + Ord> Cursor for MergeCursor<C> {
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.active.peek().map(|Reverse(c)| c.get()).flatten()
+ }
+
+ fn advance(&mut self) {
+ if !self.pending.is_empty() {
+ self.active = BinaryHeap::from_iter(self.pending.drain(..).map(|mut c| {
+ c.advance();
+ Reverse(c)
+ }));
+ self.active
+ .peek()
+ .map(|Reverse(c)| c.get().map(|(k, _)| k.clone_into(&mut self.last)));
+ } else if let Some(mut rcur) = self.active.peek_mut() {
+ rcur.0.get().map(|(k, _)| k.clone_into(&mut self.last));
+ rcur.0.advance();
+ // XXX cannot remove a cursor when `get()` return None after an advance.
+ // Finished cursors will remain at the bottom of the heap.
+ }
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ if !self.pending.is_empty() {
+ self.active = BinaryHeap::from_iter(self.pending.drain(..).map(|mut c| {
+ c.seek(key);
+ Reverse(c)
+ }));
+ } else if key >= self.last.as_slice() {
+ while let Some(mut rcur) = self.active.peek_mut() {
+ let cur = &mut rcur.0;
+ if let Some((k, _)) = cur.get() {
+ if k >= key {
+ break;
+ }
+ cur.seek(key);
+ }
+ }
+ } else {
+ /* backward seek, reset all cursors */
+ self.active = BinaryHeap::from_iter(self.active.drain().map(|Reverse(mut c)| {
+ c.seek(key);
+ Reverse(c)
+ }));
+ }
+ key.clone_into(&mut self.last);
+ }
+}
+
+impl<C> IntoIterator for MergeCursor<C>
+where
+ C: Cursor + Ord,
+ C::Value: Clone,
+{
+ type Item = (Vec<u8>, C::Value);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ Cursor::to_iter(self)
+ }
+}
+
+fn test_merger_construction<C: Cursor>(i: impl Iterator<Item = C>)
+where
+ C::Value: Ord,
+{
+ let _ = MergeCursor::<KeyOrd<_>>::from(i);
+}
+
+pub struct KeyOrd<C: Cursor>(C);
+pub struct KeyValOrd<C: Cursor>(C);
+
+// Ordering implementation for KeyOrd, KeyValOrd
+impl<C: Cursor> Cursor for KeyOrd<C> {
+ type Value = C::Value;
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.0.get()
+ }
+ fn advance(&mut self) {
+ self.0.advance();
+ }
+ fn seek(&mut self, key: &[u8]) {
+ self.0.seek(key);
+ }
+}
+
+impl<C: Cursor> Cursor for KeyValOrd<C>
+where
+ C::Value: Ord,
+{
+ type Value = C::Value;
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.0.get()
+ }
+ fn advance(&mut self) {
+ self.0.advance();
+ }
+ fn seek(&mut self, key: &[u8]) {
+ self.0.seek(key);
+ }
+}
+
+impl<C: Cursor> PartialEq for KeyOrd<C> {
+ fn eq(&self, other: &Self) -> bool {
+ let sv = self.0.get().map(|(k, _)| k);
+ let ov = other.0.get().map(|(k, _)| k);
+ sv == ov
+ }
+}
+
+impl<C: Cursor> PartialEq for KeyValOrd<C>
+where
+ C::Value: Ord,
+{
+ fn eq(&self, other: &Self) -> bool {
+ let sv = self.0.get();
+ let ov = other.0.get();
+ sv == ov
+ }
+}
+
+impl<C: Cursor> Eq for KeyOrd<C> {}
+impl<C: Cursor> Eq for KeyValOrd<C> where C::Value: Ord {}
+
+impl<C: Cursor> PartialOrd for KeyOrd<C> {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ let ks = self.0.get().map(|(k, _)| k);
+ let ko = other.0.get().map(|(k, _)| k);
+ Some(ks.cmp(&ko))
+ }
+}
+
+impl<C: Cursor> PartialOrd for KeyValOrd<C>
+where
+ C::Value: Ord,
+{
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ let sp = self.0.get();
+ let op = other.0.get();
+ Some(sp.cmp(&op))
+ }
+}
+
+impl<C: Cursor> Ord for KeyOrd<C> {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ let sk = self.0.get().map(|(k, _)| k);
+ let ok = other.0.get().map(|(k, _)| k);
+ sk.cmp(&ok)
+ }
+}
+
+impl<C: Cursor> Ord for KeyValOrd<C>
+where
+ C::Value: Ord,
+{
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ let sk = self.0.get();
+ let ok = other.0.get();
+ sk.cmp(&ok)
+ }
+}
pub mod filter;
pub mod filtermap;
pub mod map;
+pub mod merge;
+pub mod merger;
+pub mod range;
pub trait Cursor {
type Value;
fn seek(&mut self, key: &[u8]);
// provided methods
- fn into_iter(self) -> KVIter<Self>
+ fn to_iter(mut self) -> KVIter<Self>
where
Self: Sized,
Self::Value: Clone,
{
- KVIter(self)
+ if self.get().is_none() {
+ self.advance();
+ }
+ KVIter(self, true)
}
fn filter<F>(self, filter: F) -> filter::Filter<Self, F>
}
}
-pub struct KVIter<T>(pub T);
+pub struct KVIter<T>(T, bool);
impl<C> Iterator for KVIter<C>
where
{
type Item = (Vec<u8>, C::Value);
fn next(&mut self) -> Option<Self::Item> {
- self.0.advance();
+ let first = self.1;
+ if !first {
+ self.0.advance();
+ }
+ self.1 = false;
self.0.get().map(|(k, v)| (Vec::from(k), v.clone()))
}
}
self.0.seek(key);
}
}
+
+impl<C: Cursor> Cursor for Box<C> {
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.as_ref().get()
+ }
+
+ fn advance(&mut self) {
+ self.as_mut().advance();
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.as_mut().seek(key);
+ }
+}
--- /dev/null
+use super::Cursor;
+
+pub struct RangeCursor<C: Cursor> {
+ cursor: C,
+ begin: Vec<u8>,
+ end: Vec<u8>,
+}
+
+impl<C: Cursor> RangeCursor<C> {
+ pub fn new(cursor: C, begin: &[u8], end: &[u8]) -> Self {
+ let begin = Vec::from(begin.as_ref());
+ let end = Vec::from(end);
+ Self { cursor, begin, end }
+ }
+}
+
+impl<C: Cursor> Cursor for RangeCursor<C> {
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.cursor
+ .get()
+ .filter(|(k, _)| *k >= self.begin.as_slice() && *k <= self.end.as_slice())
+ }
+
+ fn advance(&mut self) {
+ self.cursor.advance();
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.cursor.seek(key);
+ }
+}
+
+pub struct PrefixCursor<C: Cursor> {
+ cursor: C,
+ prefix: Vec<u8>,
+}
+
+impl<C: Cursor> PrefixCursor<C> {
+ pub fn new(cursor: C, prefix: &[u8]) -> Self {
+ let prefix = Vec::from(prefix);
+ Self { cursor, prefix }
+ }
+}
+
+impl<C: Cursor> Cursor for PrefixCursor<C> {
+ type Value = C::Value;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.cursor
+ .get()
+ .filter(|(k, _)| k.starts_with(self.prefix.as_slice()))
+ }
+
+ fn advance(&mut self) {
+ self.cursor.advance();
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.cursor.seek(key);
+ }
+}