.nth(1)
.expect("Usage: mtbl_dump <filename>");
let reader = mtbl::reader::from_file(fname);
- for e in reader.iter() {
+ for e in reader.iter().into_iter() {
println!("{:?}: {:?}", e.key(), e.value());
}
}
+use crate::iter::merger::Mergeable;
use std::cmp::Ordering;
use std::sync::Arc;
-#[derive(Debug, Clone, Eq)]
+#[derive(Debug, Clone)]
pub struct Entry {
key: Arc<Vec<u8>>,
value: Arc<Vec<u8>>,
}
}
+impl Mergeable for Entry {}
+
impl PartialOrd<[u8]> for Entry {
fn partial_cmp(&self, other: &[u8]) -> Option<Ordering> {
Some(self.key().cmp(other))
}
}
-impl PartialEq<Entry> for Entry {
+impl PartialEq for Entry {
fn eq(&self, other: &Entry) -> bool {
self.key() == other.key()
}
}
+impl Eq for Entry {}
+
+impl PartialOrd for Entry {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.key().cmp(other.key()))
+ }
+}
+
+impl Ord for Entry {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.key().cmp(other.key())
+ }
+}
+++ /dev/null
-use crate::dupsort::DupsortIter;
-use crate::filter::FilterIter;
-use crate::map::MapIter;
-use crate::merge::MergeIter;
-use crate::Entry;
-use std::cmp::Ordering;
-use std::iter::Iterator;
-
-pub trait SeekableIter: Iterator {
- fn seek(&mut self, key: &[u8]);
-
- fn merge_func<F>(self, merge_func: F) -> MergeIter<Self, F>
- where
- F: Fn(&mut Vec<u8>, &Entry),
- Self: Sized,
- {
- MergeIter::new(self, merge_func)
- }
-
- fn dupsort_func<F>(self, dupsort_func: F) -> DupsortIter<Self, F>
- where
- F: FnMut(&Self::Item, &Self::Item) -> Ordering,
- Self: Sized,
- {
- DupsortIter::new(self, dupsort_func)
- }
-
- fn filter_func<F>(self, filter_func: F) -> FilterIter<Self, F>
- where
- F: FnMut(&Self::Item, &mut Vec<u8>) -> bool,
- Self: Sized,
- {
- FilterIter::new(self, filter_func)
- }
-
- fn map_func<F, O>(self, map_func: F) -> MapIter<Self, F, O>
- where
- F: FnMut(Self::Item) -> O,
- Self: Sized,
- {
- MapIter::new(self, map_func)
- }
-}
-
-impl<I: SeekableIter + ?Sized> SeekableIter for Box<I> {
- fn seek(&mut self, key: &[u8]) {
- self.as_mut().seek(key);
- }
-}
-
-pub struct PrefixIter<I>
-where
- I: SeekableIter,
- I::Item: AsRef<[u8]>,
-{
- iter: I,
- prefix: Vec<u8>,
-}
-
-impl<I> PrefixIter<I>
-where
- I: SeekableIter,
- I::Item: AsRef<[u8]>,
-{
- pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self {
- iter.seek(prefix.as_ref());
- Self {
- iter,
- prefix: Vec::from(prefix.as_ref()),
- }
- }
-}
-
-impl<I> Iterator for PrefixIter<I>
-where
- I: SeekableIter,
- I::Item: AsRef<[u8]>,
-{
- type Item = I::Item;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.iter
- .next()
- .filter(|e| e.as_ref().starts_with(self.prefix.as_slice()))
- }
-}
-
-impl<I> SeekableIter for PrefixIter<I>
-where
- I: SeekableIter,
- I::Item: AsRef<[u8]>,
-{
- fn seek(&mut self, key: &[u8]) {
- self.iter.seek(key);
- }
-}
-
-pub struct RangeIter<I> {
- iter: I,
- start: Vec<u8>,
- end: Vec<u8>,
-}
-
-impl<I> RangeIter<I>
-where
- I: SeekableIter,
-{
- pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self {
- iter.seek(start.as_ref());
- Self {
- iter,
- start: Vec::from(start.as_ref()),
- end: Vec::from(end.as_ref()),
- }
- }
-}
-
-impl<I> Iterator for RangeIter<I>
-where
- I: SeekableIter,
- I::Item: PartialOrd<[u8]>,
-{
- type Item = I::Item;
- fn next(&mut self) -> Option<Self::Item> {
- self.iter.next().filter(|i| i <= self.end.as_slice())
- }
-}
-
-impl<I> SeekableIter for RangeIter<I>
-where
- I: SeekableIter,
- I::Item: PartialOrd<[u8]>,
-{
- fn seek(&mut self, key: &[u8]) {
- if key <= self.start.as_slice() {
- self.iter.seek(self.start.as_slice());
- } else if key > self.end.as_slice() {
- self.iter.seek(self.end.as_slice());
- } else {
- self.iter.seek(key);
- }
- }
-}
--- /dev/null
+use crate::iter::{IntoIter, Iter};
+
+pub struct MergeIter<I, F>
+where
+ I: Iter,
+ I::Item: PartialEq,
+ F: FnMut(&mut I::Item, &I::Item),
+{
+ prev: Option<I::Item>,
+ cur: Option<I::Item>,
+ iter: I,
+ merge_func: F,
+}
+
+impl<I, F> MergeIter<I, F>
+where
+ I: Iter,
+ I::Item: PartialEq,
+ F: FnMut(&mut I::Item, &I::Item),
+{
+ pub fn new(iter: I, merge_func: F) -> Self {
+ Self {
+ prev: None,
+ cur: None,
+ iter,
+ merge_func,
+ }
+ }
+}
+
+impl<I, F> Iter for MergeIter<I, F>
+where
+ I: Iter,
+ I::Item: PartialEq,
+ F: FnMut(&mut I::Item, &I::Item),
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let mut cur = self.prev.take().or_else(|| self.iter.next())?;
+ while let Some(e) = self.iter.next() {
+ if cur == e {
+ (self.merge_func)(&mut cur, &e);
+ } else {
+ self.prev.replace(e);
+ break;
+ }
+ }
+ Some(cur)
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.prev.take();
+ self.iter.seek(key);
+ }
+}
+
+impl<I, F> IntoIterator for MergeIter<I, F>
+where
+ I: Iter,
+ I::Item: PartialEq,
+ F: FnMut(&mut I::Item, &I::Item),
+{
+ type Item = I::Item;
+ type IntoIter = IntoIter<Self>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
+
+#[cfg(disable)]
+#[test]
+fn test_merge_func() {
+ use crate::source::test_source::TestSource;
+
+ let ts = TestSource(
+ (1u8..8)
+ .flat_map(|n| vec![n; n as usize])
+ .map(|n| Entry::new(vec![n], vec![1]))
+ .collect(),
+ );
+
+ let s = ts.merge_func(|v, e| {
+ v[0] += e.value()[0];
+ });
+
+ assert_eq!(
+ Vec::from_iter(s.iter()),
+ (1u8..8)
+ .map(|n| Entry::new(vec![n], vec![n]))
+ .collect::<Vec<_>>()
+ )
+}
-use crate::{SeekableIter, Source};
+use crate::iter::{IntoIter, Iter};
use std::cmp::Ordering;
-pub struct DupsortSource<S>
-where
- S: Source,
-{
- source: S,
- #[allow(clippy::type_complexity)]
- dupsort_func: Box<dyn Fn(&S::Item, &S::Item) -> Ordering>,
-}
-
-impl<S> DupsortSource<S>
-where
- S: Source,
-{
- pub fn new<F>(source: S, dupsort_func: F) -> Self
- where
- F: Fn(&S::Item, &S::Item) -> Ordering + 'static,
- {
- Self {
- source,
- dupsort_func: Box::new(dupsort_func),
- }
- }
-}
-
-impl<S> Source for DupsortSource<S>
-where
- S: Source,
- S::Item: PartialEq,
-{
- type Item = S::Item;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
- self.source.iter().dupsort_func(&self.dupsort_func)
- }
-}
-
#[derive(Debug)]
pub struct DupsortIter<I, F>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(&I::Item, &I::Item) -> Ordering,
{
run: Vec<I::Item>,
impl<I, F> DupsortIter<I, F>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(&I::Item, &I::Item) -> Ordering,
{
pub fn new(iter: I, dupsort_func: F) -> Self {
}
}
-impl<I, F> Iterator for DupsortIter<I, F>
+impl<I, F> IntoIterator for DupsortIter<I, F>
where
- I: SeekableIter,
+ I: Iter,
I::Item: PartialEq,
F: Fn(&I::Item, &I::Item) -> Ordering,
{
type Item = I::Item;
+ type IntoIter = IntoIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
+
+impl<I, F> Iter for DupsortIter<I, F>
+where
+ I: Iter,
+ I::Item: PartialEq,
+ F: Fn(&I::Item, &I::Item) -> Ordering,
+{
+ type Item = I::Item;
+
+ fn seek(&mut self, key: &[u8]) {
+ self.run.clear();
+ self.next.take();
+ self.iter.seek(key);
+ }
fn next(&mut self) -> Option<Self::Item> {
self.run.pop().or_else(|| {
self.run
.push(self.next.take().or_else(|| self.iter.next())?);
- // println!("2: {:?} / {:?}", self.next, self.run);
- for e in &mut self.iter {
+ while let Some(e) = self.iter.next() {
if e == self.run[0] {
self.run.push(e);
continue;
}
}
-impl<I, F> SeekableIter for DupsortIter<I, F>
-where
- I: SeekableIter,
- I::Item: PartialEq,
- F: Fn(&I::Item, &I::Item) -> Ordering,
-{
- fn seek(&mut self, key: &[u8]) {
- self.run.clear();
- self.next.take();
- self.iter.seek(key);
- }
-}
-
+#[cfg(disable)]
#[test]
fn test_dupsort() {
use crate::source::test_source::TestSource;
-use crate::{SeekableIter, Source};
+use crate::Iter;
pub struct FilterIter<I, F> {
iter: I,
impl<I, F> FilterIter<I, F>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
{
pub fn new(iter: I, filter: F) -> Self {
}
}
-impl<I, F> Iterator for FilterIter<I, F>
+impl<I, F> Iter for FilterIter<I, F>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
{
type Item = I::Item;
}
}
}
-}
-impl<I, F> SeekableIter for FilterIter<I, F>
-where
- I: SeekableIter,
- F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
fn seek(&mut self, key: &[u8]) {
self.iter.seek(key);
}
}
-pub struct FilterSource<S>
-where
- S: Source,
-{
- source: S,
- #[allow(clippy::type_complexity)]
- filter_func: Box<dyn Fn(&S::Item, &mut Vec<u8>) -> bool>,
-}
-
-impl<S> FilterSource<S>
-where
- S: Source,
-{
- pub fn new<F>(source: S, filter_func: F) -> Self
- where
- F: Fn(&S::Item, &mut Vec<u8>) -> bool + 'static,
- {
- Self {
- source,
- filter_func: Box::new(filter_func),
- }
- }
-}
-
-impl<S> Source for FilterSource<S>
-where
- S: Source,
-{
- type Item = S::Item;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
- self.source.iter().filter_func(&self.filter_func)
- }
-}
-
#[test]
fn test_filter() {
use crate::source::test_source::TestSource;
+ use crate::source::Source;
use crate::Entry;
+ use std::iter::IntoIterator;
let ts =
TestSource((0u8..10).map(|n| Entry::new(vec![n], vec![])).collect()).filter(|e, sv| {
assert_eq!(
vec![0, 2, 8],
- ts.iter().map(|e| e.key()[0]).collect::<Vec<u8>>()
+ ts.map(|e| e.key()[0])
+ .iter()
+ .into_iter()
+ .collect::<Vec<u8>>()
);
}
-use crate::{SeekableIter, Source};
+use crate::{
+ iter::{IntoIter, Iter},
+ Source,
+};
pub struct MapIter<I, F, O>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(I::Item) -> O,
{
iter: I,
impl<I, F, O> MapIter<I, F, O>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(I::Item) -> O,
{
pub fn new(iter: I, mapf: F) -> Self {
}
}
-impl<I, F, O> Iterator for MapIter<I, F, O>
+impl<I, F, O> Iter for MapIter<I, F, O>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(I::Item) -> O,
{
type Item = O;
let item = self.iter.next()?;
Some((self.mapf)(item))
}
+
+ fn seek(&mut self, key: &[u8]) {
+ self.iter.seek(key);
+ }
}
-impl<I, F, O> SeekableIter for MapIter<I, F, O>
+impl<I, F, O> IntoIterator for MapIter<I, F, O>
where
- I: SeekableIter,
+ I: Iter,
F: FnMut(I::Item) -> O,
{
- fn seek(&mut self, key: &[u8]) {
- self.iter.seek(key);
+ type Item = O;
+ type IntoIter = IntoIter<Self>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
}
}
S: Source,
{
type Item = O;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
- self.source.iter().map_func(&self.map)
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
+ self.source.iter().map(&self.map)
}
}
#[cfg(test)]
mod test {
use crate::source::test_source::TestSource;
- use crate::Entry;
- use crate::Source;
+ use crate::{Entry, Iter, Source};
struct CompVec(Vec<u8>);
impl PartialEq<[u8]> for CompVec {
let ts = TestSource((0u8..10).map(|i| Entry::new(vec![i], vec![])).collect())
.map(|e| CompVec(Vec::from(e.key())));
assert_eq!(
- ts.iter().map(|cv| cv.0).collect::<Vec<_>>(),
+ ts.iter().map(|cv| cv.0).into_iter().collect::<Vec<_>>(),
(0u8..10).map(|i| vec![i]).collect::<Vec<_>>()
);
assert_eq!(
ts.get_range(&[2u8], &[7u8])
.map(|cv| cv.0)
+ .into_iter()
.collect::<Vec<_>>(),
(2u8..8)
.map(|i| {
--- /dev/null
+use crate::Iter;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+
+pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {}
+
+struct MergeEntry<I: Iter> {
+ e: I::Item,
+ it: I,
+}
+
+impl<I: Iter> PartialEq for MergeEntry<I>
+where
+ I::Item: PartialEq,
+{
+ fn eq(&self, other: &Self) -> bool {
+ self.e == other.e
+ }
+}
+
+impl<I: Iter> Eq for MergeEntry<I> where I::Item: PartialEq {}
+
+// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics
+impl<I: Iter> PartialOrd for MergeEntry<I>
+where
+ I::Item: PartialOrd,
+{
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ other.e.partial_cmp(&self.e)
+ }
+}
+
+impl<I: Iter> Ord for MergeEntry<I>
+where
+ I::Item: Ord,
+{
+ fn cmp(&self, other: &Self) -> Ordering {
+ other.e.cmp(&self.e)
+ }
+}
+
+pub struct MergeIter<I: Iter> {
+ heap: BinaryHeap<MergeEntry<I>>,
+ finished: Vec<I>,
+ last_key: Vec<u8>,
+}
+
+impl<I> From<I> for MergeIter<I::Item>
+where
+ I: Iterator,
+ I::Item: Iter,
+ <I::Item as Iter>::Item: Mergeable,
+{
+ fn from(iters: I) -> Self {
+ let mut v: Vec<I::Item> = Vec::new();
+ let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
+ Some(e) => Some(MergeEntry { e, it }),
+ None => {
+ v.push(it);
+ None
+ }
+ }));
+ MergeIter {
+ finished: v,
+ heap: h,
+ last_key: Vec::new(),
+ }
+ }
+}
+
+impl<I: Iter> Iter for MergeIter<I>
+where
+ I::Item: Mergeable,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let cur;
+ {
+ let mut next = self.heap.peek_mut()?;
+
+ cur = next.e.clone();
+ self.last_key.clear();
+ self.last_key.extend(cur.as_ref());
+
+ if let Some(e) = next.it.next() {
+ next.e = e;
+ return Some(cur);
+ }
+ }
+ self.heap.pop();
+ Some(cur)
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ if key >= self.last_key.as_ref() {
+ loop {
+ match self.heap.peek_mut() {
+ None => return,
+ Some(mut head) => {
+ if &head.e >= key {
+ return;
+ }
+ head.it.seek(key);
+ if let Some(e) = head.it.next() {
+ head.e = e;
+ continue;
+ }
+ }
+ }
+ self.heap.pop();
+ }
+ }
+
+ // backwards seek; reset heap
+ let mut finished: Vec<I> = Vec::new();
+ let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
+ for mut it in self
+ .heap
+ .drain()
+ .map(|me| me.it)
+ .chain(self.finished.drain(..))
+ {
+ it.seek(key);
+ match it.next() {
+ Some(e) => heap_entries.push(MergeEntry { e, it }),
+ None => finished.push(it),
+ }
+ }
+ self.heap = BinaryHeap::from_iter(heap_entries);
+ self.finished = finished;
+ self.last_key.clear();
+ self.last_key.extend(key);
+ }
+}
+
+#[cfg(disabled)]
+#[cfg(test)]
+mod test {
+ use super::Merger;
+ use crate::source::test_source::TestSource;
+ use crate::Entry;
+ use crate::Source;
+
+ fn tnum(m: u8) -> Vec<u8> {
+ Vec::from_iter((1u8..255).filter(|i| i % m == 0))
+ }
+
+ fn test_source(m: u8) -> TestSource {
+ TestSource(Vec::from_iter(
+ tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
+ ))
+ }
+
+ #[test]
+ fn test_merge() {
+ let range = 1..8;
+ let iters: Vec<_> = range
+ .clone()
+ .map(test_source)
+ .map(|s| s.into_boxed())
+ .collect();
+ let s = Merger::from(iters);
+ let mut v = Vec::<u8>::new();
+ for i in range {
+ v.extend(tnum(i))
+ }
+ v.sort();
+ let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
+ assert_eq!(v2, v);
+ }
+
+ #[test]
+ fn test_binheap() {
+ use std::collections::BinaryHeap;
+
+ let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
+ let vs = v.as_slice();
+ let h = BinaryHeap::from_iter(vs.iter().copied());
+ assert_ne!(h.into_vec(), v);
+ }
+}
--- /dev/null
+use std::cmp::Ordering;
+pub mod dupmerge;
+pub mod dupsort;
+pub mod filter;
+pub mod map;
+pub mod merger;
+pub mod prefix;
+pub mod range;
+
+use dupmerge::MergeIter;
+use dupsort::DupsortIter;
+use filter::FilterIter;
+use map::MapIter;
+
+pub trait Iter {
+ type Item;
+
+ fn seek(&mut self, key: &[u8]);
+
+ fn next(&mut self) -> Option<Self::Item>;
+
+ fn seek_to(mut self, key: &[u8]) -> Self
+ where
+ Self: Sized,
+ {
+ self.seek(key);
+ self
+ }
+
+ fn dup_merge<F>(self, merge_func: F) -> MergeIter<Self, F>
+ where
+ Self::Item: PartialEq + Clone,
+ F: FnMut(&mut Self::Item, &Self::Item),
+ Self: Sized,
+ {
+ MergeIter::new(self, merge_func)
+ }
+
+ fn dup_sort<F>(self, dupsort_func: F) -> DupsortIter<Self, F>
+ where
+ Self: Sized,
+ F: FnMut(&Self::Item, &Self::Item) -> Ordering,
+ {
+ DupsortIter::new(self, dupsort_func)
+ }
+
+ fn filter<F>(self, filter_func: F) -> FilterIter<Self, F>
+ where
+ F: FnMut(&Self::Item, &mut Vec<u8>) -> bool,
+ Self: Sized,
+ {
+ FilterIter::new(self, filter_func)
+ }
+
+ fn map<F, O>(self, map_func: F) -> MapIter<Self, F, O>
+ where
+ F: FnMut(Self::Item) -> O,
+ Self: Sized,
+ {
+ MapIter::new(self, map_func)
+ }
+}
+
+impl<I: Iter + ?Sized> Iter for Box<I> {
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.as_mut().next()
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.as_mut().seek(key);
+ }
+}
+
+pub struct IntoIter<I: Iter>(pub I);
+
+impl<I: Iter> IntoIter<I> {
+ fn new(i: I) -> Self {
+ Self(i)
+ }
+}
+
+impl<I> Iterator for IntoIter<I>
+where
+ I: Iter,
+{
+ type Item = I::Item;
+ fn next(&mut self) -> Option<Self::Item> {
+ self.0.next()
+ }
+}
--- /dev/null
+use crate::iter::{IntoIter, Iter};
+
+pub struct PrefixIter<I>
+where
+ I: Iter,
+ I::Item: AsRef<[u8]>,
+{
+ iter: I,
+ prefix: Vec<u8>,
+}
+
+impl<I> PrefixIter<I>
+where
+ I: Iter,
+ I::Item: AsRef<[u8]>,
+{
+ pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self {
+ iter.seek(prefix.as_ref());
+ Self {
+ iter,
+ prefix: Vec::from(prefix.as_ref()),
+ }
+ }
+}
+
+impl<I> Iter for PrefixIter<I>
+where
+ I: Iter,
+ I::Item: AsRef<[u8]>,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.iter
+ .next()
+ .filter(|e| e.as_ref().starts_with(self.prefix.as_slice()))
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ self.iter.seek(key);
+ }
+}
+
+impl<I> IntoIterator for PrefixIter<I>
+where
+ I: Iter,
+ I::Item: AsRef<[u8]>,
+{
+ type Item = I::Item;
+ type IntoIter = IntoIter<Self>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
--- /dev/null
+use crate::iter::{IntoIter, Iter};
+
+pub struct RangeIter<I> {
+ iter: I,
+ start: Vec<u8>,
+ end: Vec<u8>,
+}
+
+impl<I> RangeIter<I>
+where
+ I: Iter,
+{
+ pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self {
+ iter.seek(start.as_ref());
+ Self {
+ iter,
+ start: Vec::from(start.as_ref()),
+ end: Vec::from(end.as_ref()),
+ }
+ }
+}
+
+impl<I> Iter for RangeIter<I>
+where
+ I: Iter,
+ I::Item: PartialOrd<[u8]>,
+{
+ type Item = I::Item;
+ fn next(&mut self) -> Option<Self::Item> {
+ self.iter.next().filter(|i| i <= self.end.as_slice())
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ if key <= self.start.as_slice() {
+ self.iter.seek(self.start.as_slice());
+ } else if key > self.end.as_slice() {
+ self.iter.seek(self.end.as_slice());
+ } else {
+ self.iter.seek(key);
+ }
+ }
+}
+
+impl<I> IntoIterator for RangeIter<I>
+where
+ I: Iter,
+ I::Item: PartialOrd<[u8]>,
+{
+ type Item = I::Item;
+ type IntoIter = IntoIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
mod compression;
-mod dupsort;
mod entry;
-mod filter;
mod iter;
-mod map;
-mod merge;
mod merger;
pub mod reader;
pub mod sorter;
pub use compression::Compression;
pub use entry::Entry;
pub use fileset::Fileset;
-pub use iter::SeekableIter;
+pub use iter::Iter;
pub use merger::Merger;
pub use reader::Reader;
pub use source::Source;
+++ /dev/null
-use crate::{Entry, SeekableIter, Source};
-
-pub struct MergeSource<S: Source> {
- source: S,
- #[allow(clippy::type_complexity)]
- merge_func: Box<dyn Fn(&mut Vec<u8>, &Entry)>,
-}
-
-impl<S> MergeSource<S>
-where
- S: Source,
-{
- pub fn new<F>(source: S, merge_func: F) -> Self
- where
- F: Fn(&mut Vec<u8>, &Entry) + 'static,
- {
- Self {
- source,
- merge_func: Box::new(merge_func),
- }
- }
-}
-impl<S> Source for MergeSource<S>
-where
- S: Source<Item = Entry>,
-{
- type Item = S::Item;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
- self.source.iter().merge_func(&self.merge_func)
- }
-}
-
-pub struct MergeIter<I, F: Fn(&mut Vec<u8>, &Entry)> {
- prev: Option<Entry>,
- iter: I,
- merge_func: F,
-}
-
-impl<I, F> MergeIter<I, F>
-where
- F: Fn(&mut Vec<u8>, &Entry),
-{
- pub fn new(iter: I, merge_func: F) -> Self {
- Self {
- prev: None,
- iter,
- merge_func,
- }
- }
-}
-
-impl<I, F> Iterator for MergeIter<I, F>
-where
- I: Iterator<Item = Entry>,
- F: Fn(&mut Vec<u8>, &Entry),
-{
- type Item = Entry;
- fn next(&mut self) -> Option<Self::Item> {
- let mut cur = self.prev.take().or_else(|| self.iter.next())?;
- for e in &mut self.iter {
- if e.key() == cur.key() {
- (self.merge_func)(cur.value_mut(), &e);
- } else {
- self.prev.replace(e);
- break;
- }
- }
- Some(cur)
- }
-}
-
-impl<I, F> SeekableIter for MergeIter<I, F>
-where
- I: SeekableIter<Item = Entry>,
- F: Fn(&mut Vec<u8>, &Entry),
-{
- fn seek(&mut self, key: &[u8]) {
- self.prev.take();
- self.iter.seek(key);
- }
-}
-
-#[test]
-fn test_merge_func() {
- use crate::source::test_source::TestSource;
-
- let ts = TestSource(
- (1u8..8)
- .flat_map(|n| vec![n; n as usize])
- .map(|n| Entry::new(vec![n], vec![1]))
- .collect(),
- );
-
- let s = ts.merge_func(|v, e| {
- v[0] += e.value()[0];
- });
-
- assert_eq!(
- Vec::from_iter(s.iter()),
- (1u8..8)
- .map(|n| Entry::new(vec![n], vec![n]))
- .collect::<Vec<_>>()
- )
-}
-use crate::{Entry, SeekableIter, Source};
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
+use crate::iter::{
+ merger::{MergeIter, Mergeable},
+ Iter,
+};
+use crate::Source;
pub struct Merger<S> {
sources: Vec<S>,
}
-impl<S, I> From<I> for Merger<S>
-where
- I: IntoIterator<Item = S>,
-{
- fn from(i: I) -> Self {
- Merger {
- sources: Vec::from_iter(i),
+impl<S> Merger<S> {
+ pub fn new() -> Self {
+ Self {
+ sources: Vec::new(),
}
}
-}
-struct MergeEntry<I: SeekableIter> {
- e: Entry,
- it: I,
-}
-
-impl<I: SeekableIter> PartialEq for MergeEntry<I> {
- fn eq(&self, other: &Self) -> bool {
- self.e.key() == other.e.key()
- }
-}
-impl<I: SeekableIter> Eq for MergeEntry<I> {}
-
-impl<I: SeekableIter> PartialOrd for MergeEntry<I> {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- Some(other.e.key().cmp(self.e.key()))
+ pub fn add(&mut self, source: S) {
+ self.sources.push(source);
}
}
-impl<I: SeekableIter> Ord for MergeEntry<I> {
- fn cmp(&self, other: &Self) -> Ordering {
- other.e.key().cmp(self.e.key())
- }
-}
-
-pub struct MergeIter<I: SeekableIter> {
- heap: BinaryHeap<MergeEntry<I>>,
- finished: Vec<I>,
- last_key: Vec<u8>,
-}
-
-impl<I> From<I> for MergeIter<I::Item>
+impl<S, I> From<I> for Merger<S>
where
- I: Iterator,
- I::Item: SeekableIter<Item = Entry>,
+ I: IntoIterator<Item = S>,
+ S: Source,
+ S::Item: Mergeable,
{
- fn from(iters: I) -> Self {
- let mut v: Vec<I::Item> = Vec::new();
- let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
- Some(e) => Some(MergeEntry { e, it }),
- None => {
- v.push(it);
- None
- }
- }));
- MergeIter {
- finished: v,
- heap: h,
- last_key: Vec::new(),
- }
- }
-}
-
-impl<I: SeekableIter<Item = Entry>> Iterator for MergeIter<I> {
- type Item = Entry;
-
- fn next(&mut self) -> Option<Self::Item> {
- let cur;
- {
- let mut next = self.heap.peek_mut()?;
- cur = next.e.clone();
- if let Some(e) = next.it.next() {
- next.e = e;
- self.last_key.clear();
- self.last_key.extend(next.e.key());
- return Some(cur);
- }
- }
- self.heap.pop();
- Some(cur)
- }
-}
-
-impl<I: SeekableIter<Item = Entry>> SeekableIter for MergeIter<I> {
- fn seek(&mut self, key: &[u8]) {
- if key > self.last_key.as_slice() {
- loop {
- match self.heap.peek_mut() {
- None => return,
- Some(mut head) => {
- if head.e.key() >= key {
- self.last_key.clear();
- self.last_key.extend_from_slice(head.e.key());
- return;
- }
- head.it.seek(key);
- if let Some(e) = head.it.next() {
- head.e = e;
- continue;
- }
- }
- }
- self.heap.pop();
- }
- }
-
- // backwards seek; reset heap
- let mut finished: Vec<I> = Vec::new();
- let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
- for mut it in self
- .heap
- .drain()
- .map(|me| me.it)
- .chain(self.finished.drain(..))
- {
- it.seek(key);
- match it.next() {
- Some(e) => heap_entries.push(MergeEntry { e, it }),
- None => finished.push(it),
- }
+ fn from(i: I) -> Self {
+ Merger {
+ sources: Vec::from_iter(i),
}
- self.heap = BinaryHeap::from_iter(heap_entries);
- self.finished = finished;
- self.last_key.clear();
- self.last_key.extend_from_slice(key);
}
}
impl<S> Source for Merger<S>
where
- S: Source<Item = Entry>,
+ S: Source,
+ S::Item: Mergeable,
{
type Item = S::Item;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
MergeIter::from(self.sources.iter().map(|s| s.iter()))
}
}
+#[cfg(disabled)]
#[cfg(test)]
mod test {
use super::Merger;
-use crate::{Entry, Result, SeekableIter};
+use crate::iter::{IntoIter, Iter};
+use crate::{Entry, Result};
use integer_encoding::VarInt;
use std::mem::size_of;
.decode(&self.data.as_ref()[self.restart_off..], idx)
}
}
+
+impl<D: AsRef<[u8]>> From<Block<D>> for BlockIter<D> {
+ fn from(b: Block<D>) -> BlockIter<D> {
+ BlockIter {
+ block: b,
+ cur_ent: None,
+ off: 0,
+ }
+ }
+}
+
impl<D: AsRef<[u8]>> IntoIterator for Block<D> {
type Item = Entry;
- type IntoIter = BlockIter<D>;
+ type IntoIter = IntoIter<BlockIter<D>>;
fn into_iter(self) -> Self::IntoIter {
- Self::IntoIter {
+ IntoIter(BlockIter {
block: self,
cur_ent: None,
off: 0,
- }
+ })
}
}
}
}
-impl<D: AsRef<[u8]>> Iterator for BlockIter<D> {
+impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
type Item = Entry;
fn next(&mut self) -> Option<Self::Item> {
self.decode().cloned()
}
-}
-impl<D: AsRef<[u8]>> SeekableIter for BlockIter<D> {
fn seek(&mut self, key: &[u8]) {
// TODO: "galloping search"
if self.block.restart_count > 0 {
#[cfg(test)]
mod test {
- use crate::reader::block::Block;
+ use crate::reader::block::{Block, BlockIter};
use crate::writer::block_builder::BlockBuilder;
use crate::Entry;
- use crate::SeekableIter;
+ use crate::Iter;
fn build_block(n: u32, skip: u32, r: usize) -> Block<Vec<u8>> {
let mut bb = BlockBuilder::default();
fn test_block_seek() {
let n = 40;
let b = build_block(n, 10, 10);
- let mut bi = b.into_iter();
+ let mut bi = BlockIter::from(b);
bi.seek(&u32::to_be_bytes(40));
assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40));
bi.seek(&u32::to_be_bytes(32));
use crate::metadata::Metadata;
use crate::Source;
-use crate::{Entry, SeekableIter};
+use crate::{iter::IntoIter, Entry, Iter};
use integer_encoding::VarInt;
pub(crate) mod block;
use crate::compression::CBuf;
off += prelude;
block::Block::new(CBuf::Buf(self.data.clone_range(off, size)))
.expect("bad block")
- .into_iter()
+ .into()
}
}
self.data_iter.replace(
block::Block::new(comp.uncompress(self.reader.data.clone_range(data_off, size))?)
.expect("bad block")
- .into_iter(),
+ .into(),
);
Some(())
}
}
-impl<D: AsRef<[u8]>> Iterator for ReaderIter<D> {
+impl<D: AsRef<[u8]>> Iter for ReaderIter<D> {
type Item = Entry;
+
fn next(&mut self) -> Option<Self::Item> {
if self.data_iter.is_none() {
self.next_block()
}
}
}
-}
-impl<D: AsRef<[u8]>> SeekableIter for ReaderIter<D> {
fn seek(&mut self, key: &[u8]) {
// TODO: detect and skip unneeded seek in iter.
self.index_iter.seek(key);
}
}
+impl<D: AsRef<[u8]>> IntoIterator for ReaderIter<D> {
+ type Item = Entry;
+ type IntoIter = IntoIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
+
impl<D: AsRef<[u8]>> Source for Reader<D> {
type Item = Entry;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
ReaderIter {
reader: self.clone(),
next_offset: 0,
-use crate::{merge::MergeSource, Entry, Merger, Reader, SeekableIter, Source, Writer};
+use crate::{Entry, Iter, Merger, Reader, Source, Writer};
use memmap::Mmap;
use std::cell::Cell;
-pub struct Sorter<F: Fn(&mut Vec<u8>, &Entry)> {
+pub struct Sorter<F: Fn(&mut Entry, &Entry)> {
batch: Cell<Vec<Entry>>,
batch_size: usize,
max_size: usize,
impl<F> Sorter<F>
where
- F: Fn(&mut Vec<u8>, &Entry) + 'static,
+ F: Fn(&mut Entry, &Entry) + 'static,
{
pub fn new(max_size: usize, merge_func: F) -> Self {
Self {
self.batch_size += esize;
}
- pub fn source(mut self) -> MergeSource<Merger<Reader<Mmap>>> {
+ pub fn source(mut self) -> impl Source<Item = Entry> {
+ // DupmergeSource<Merger<Reader<Mmap>>> {
if !self.batch.get_mut().is_empty() {
self.write_chunk();
}
- Merger::from(self.readers).merge_func(self.merge_func)
+ Merger::from(self.readers).dup_merge(self.merge_func)
}
pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
- self.source().iter().for_each(|e| w.add(e).unwrap());
+ self.source()
+ .iter()
+ .into_iter()
+ .for_each(|e| w.add(e).unwrap());
}
fn write_chunk(&mut self) {
self.batch
.take()
.iter()
- .merge_func(&self.merge_func)
+ .dup_merge(&self.merge_func)
+ .into_iter()
.for_each(|e| {
w.add(e).unwrap();
});
--- /dev/null
+use crate::{Iter, Source};
+use std::cmp::Ordering;
+
+pub struct DupmergeSource<S, F> {
+ pub(super) source: S,
+ pub(super) merge: F,
+}
+
+impl<S, F> Source for DupmergeSource<S, F>
+where
+ S: Source,
+ S::Item: PartialEq + Clone,
+ F: Fn(&mut S::Item, &S::Item),
+{
+ type Item = S::Item;
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
+ self.source.iter().dup_merge(&self.merge)
+ }
+}
+
+pub struct DupsortSource<S, F> {
+ pub(super) source: S,
+ pub(super) dupsort: F,
+}
+
+impl<S, F> Source for DupsortSource<S, F>
+where
+ S: Source,
+ S::Item: PartialEq,
+ F: Fn(&S::Item, &S::Item) -> Ordering,
+{
+ type Item = S::Item;
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
+ self.source.iter().dup_sort(&self.dupsort)
+ }
+}
+
+pub struct FilterSource<S, F> {
+ pub(super) source: S,
+ pub(super) filter: F,
+}
+
+impl<S, F> Source for FilterSource<S, F>
+where
+ S: Source,
+ F: Fn(&S::Item, &mut Vec<u8>) -> bool,
+{
+ type Item = S::Item;
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
+ self.source.iter().filter(&self.filter)
+ }
+}
+
+pub struct MapSource<S, F, O>
+where
+ S: Source,
+ F: Fn(S::Item) -> O,
+{
+ pub(super) source: S,
+ pub(super) map: F,
+}
+
+impl<S, F, O> Source for MapSource<S, F, O>
+where
+ S: Source,
+ F: Fn(S::Item) -> O,
+{
+ type Item = O;
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
+ self.source.iter().map(&self.map)
+ }
+}
-use crate::dupsort::DupsortSource;
-use crate::filter::FilterSource;
-use crate::iter::{PrefixIter, RangeIter};
-use crate::map::MapSource;
-use crate::merge::MergeSource;
-use crate::{Entry, SeekableIter};
+use crate::iter::{prefix::PrefixIter, range::RangeIter, Iter};
+use crate::Entry;
+use std::cmp::Ordering;
+
+mod adapters;
+pub use adapters::DupmergeSource;
+pub use adapters::DupsortSource;
+pub use adapters::FilterSource;
+pub use adapters::MapSource;
pub trait Source {
type Item;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item>;
- fn get(&self, key: &[u8]) -> RangeIter<impl SeekableIter<Item = Self::Item>>
+ fn iter(&self) -> impl Iter<Item = Self::Item>;
+
+ fn get(&self, key: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
where
Self::Item: PartialOrd<[u8]>,
{
RangeIter::new(self.iter(), key, key)
}
- fn get_prefix(&self, prefix: &[u8]) -> PrefixIter<impl SeekableIter<Item = Self::Item>>
+
+ fn get_prefix(&self, prefix: &[u8]) -> PrefixIter<impl Iter<Item = Self::Item>>
where
Self::Item: AsRef<[u8]>,
{
PrefixIter::new(self.iter(), prefix)
}
- fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter<impl SeekableIter<Item = Self::Item>>
+
+ fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
where
Self::Item: PartialOrd<[u8]>,
{
RangeIter::new(self.iter(), start, end)
}
- fn merge_func<F>(self, merge_func: F) -> MergeSource<Self>
+ fn dup_merge<F>(self, merge: F) -> DupmergeSource<Self, F>
where
Self: Sized,
- F: Fn(&mut Vec<u8>, &Entry) + 'static,
+ F: Fn(&mut Entry, &Entry) + 'static,
{
- MergeSource::new(self, merge_func)
+ DupmergeSource {
+ source: self,
+ merge,
+ }
}
- fn dupsort_func<F>(self, dupsort_func: F) -> DupsortSource<Self>
+ fn dupsort<F>(self, dupsort: F) -> DupsortSource<Self, F>
where
Self: Sized,
- F: Fn(&Self::Item, &Self::Item) -> std::cmp::Ordering + 'static,
+ F: Fn(&Self::Item, &Self::Item) -> Ordering + 'static,
{
- DupsortSource::new(self, dupsort_func)
+ DupsortSource {
+ source: self,
+ dupsort,
+ }
}
- fn filter<F>(self, filter_func: F) -> FilterSource<Self>
+ fn filter<F>(self, filter: F) -> FilterSource<Self, F>
where
Self: Sized,
F: Fn(&Self::Item, &mut Vec<u8>) -> bool + 'static,
{
- FilterSource::new(self, filter_func)
+ FilterSource {
+ source: self,
+ filter,
+ }
}
- fn map<F, O>(self, map_func: F) -> MapSource<Self, O>
+ fn map<F, O>(self, map: F) -> MapSource<Self, F, O>
where
Self: Sized,
F: Fn(Self::Item) -> O + 'static,
{
- MapSource::new(self, map_func)
+ MapSource { source: self, map }
}
fn into_boxed(self) -> Box<dyn DynSource<Item = Self::Item>>
pub trait DynSource {
type Item;
- fn iter(&self) -> Box<dyn SeekableIter<Item = Self::Item> + '_>;
+ fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_>;
}
impl<S: Source + ?Sized> DynSource for Box<S> {
type Item = S::Item;
- fn iter(&self) -> Box<dyn SeekableIter<Item = Self::Item> + '_> {
+ fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_> {
Box::new(self.as_ref().iter())
}
}
impl<D: DynSource + ?Sized> Source for D {
type Item = D::Item;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
DynSource::iter(self)
}
}
vec: &'a Vec<Entry>,
}
-impl Iterator for VecIter<'_> {
+impl Iter for VecIter<'_> {
type Item = Entry;
fn next(&mut self) -> Option<Self::Item> {
if self.index > self.vec.len() {
Some(e)
}
}
-}
-impl SeekableIter for VecIter<'_> {
fn seek(&mut self, key: &[u8]) {
let mut left = 0;
let mut right = self.vec.len() - 1;
impl Source for Vec<Entry> {
type Item = Entry;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
VecIter {
index: 0,
vec: self,
pub mod test_source {
use crate::Entry;
- use crate::SeekableIter;
+ use crate::Iter;
use crate::Source;
pub struct TestSource(pub Vec<Entry>);
off: usize,
}
- impl Iterator for TestIter<'_> {
+ impl Iter for TestIter<'_> {
type Item = Entry;
fn next(&mut self) -> Option<Self::Item> {
self.off += 1;
Some(item.clone())
}
- }
- impl SeekableIter for TestIter<'_> {
fn seek(&mut self, key: &[u8]) {
self.off = 0;
while self.off < self.source.0.len() && self.source.0[self.off].key() < key {
}
}
+ impl IntoIterator for TestIter<'_> {
+ type Item = Entry;
+ type IntoIter = crate::iter::IntoIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ crate::iter::IntoIter(self)
+ }
+ }
+
impl Source for TestSource {
type Item = Entry;
- fn iter(&self) -> impl SeekableIter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> {
TestIter {
source: self,
off: 0,
use super::test_source::TestSource;
#[allow(unused_imports)]
use super::Source;
+ #[allow(unused_imports)]
+ use crate::iter::Iter;
use crate::Entry;
#[allow(dead_code)]