--- /dev/null
+use crate::iter::dupmerge::Mergeable;
+
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+
+/// A set of sources that is presented as one merged source.
+///
+/// Construct it from any `IntoIterator` of sources via the `From` impl in this file.
+pub struct Merger<S> {
+ /// The wrapped sources, in the order they were supplied.
+ sources: Vec<S>,
+}
+
+/// Marker trait (no methods) for items that can take part in a merge.
+///
+/// An item must be totally ordered against other items (`Ord`), comparable
+/// against a raw byte-slice key (`PartialOrd<[u8]>`), viewable as bytes
+/// (`AsRef<[u8]>`) and cloneable.
+// NOTE(review): the top of this file also has `use crate::iter::dupmerge::Mergeable;`.
+// A local trait with the same name would clash with that import -- confirm which
+// of the two definitions is meant to be used.
+pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {}
+
+/// Gathers any sequence of sources into a `Merger`, preserving their order.
+impl<S, I> From<I> for Merger<S>
+where
+    I: IntoIterator<Item = S>,
+    S: Source,
+    S::Item: Mergeable,
+{
+    fn from(i: I) -> Self {
+        let sources: Vec<S> = i.into_iter().collect();
+        Self { sources }
+    }
+}
+
+/// One live input of the merge: an iterator paired with the item it most
+/// recently produced, i.e. the next item that input will contribute.
+struct MergeEntry<I: Iter> {
+    /// The buffered head item of `it`.
+    e: I::Item,
+    /// The iterator that produced `e`.
+    it: I,
+}
+
+/// Entries are compared by their buffered item only; the iterator is ignored.
+impl<I: Iter> PartialEq for MergeEntry<I>
+where
+    I::Item: PartialEq,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.e == other.e
+    }
+}
+
+// `Eq` promises a full equivalence relation, so it is only offered when the
+// item type itself is `Eq`; a `PartialEq`-only bound cannot guarantee that.
+impl<I: Iter> Eq for MergeEntry<I> where I::Item: Eq {}
+
+// Note: the `PartialOrd` and `Ord` impls for `MergeEntry` are deliberately
+// reversed (`other` is compared against `self`) so that `BinaryHeap`, which is
+// a max-heap, hands out the smallest item first ("min heap" semantics).
+impl<I: Iter> PartialOrd for MergeEntry<I>
+where
+    I::Item: PartialOrd,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        other.e.partial_cmp(&self.e)
+    }
+}
+
+impl<I: Iter> Ord for MergeEntry<I>
+where
+    I::Item: Ord,
+{
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.e.cmp(&self.e)
+    }
+}
+
+/// Iterator that merges several `Iter` inputs through a heap of `MergeEntry` values.
+pub struct MergeIter<I: Iter> {
+ /// Inputs that currently hold a buffered item, ordered by that item.
+ heap: BinaryHeap<MergeEntry<I>>,
+ /// Inputs that produced no item when polled; a backwards seek re-seeks them.
+ finished: Vec<I>,
+ /// Key bytes recorded by `next` and by a backwards `seek`; `seek` compares its target against them.
+ last_key: Vec<u8>,
+}
+
+/// Builds a `MergeIter` from an iterator of inputs, pulling the first item of
+/// every input up front.
+///
+/// Inputs that yield an item go onto the heap; inputs that are empty from the
+/// start are parked in `finished`.
+impl<I> From<I> for MergeIter<I::Item>
+where
+    I: Iterator,
+    I::Item: Iter,
+    <I::Item as Iter>::Item: Mergeable,
+{
+    fn from(iters: I) -> Self {
+        let mut live: Vec<MergeEntry<I::Item>> = Vec::new();
+        let mut finished: Vec<I::Item> = Vec::new();
+        for mut it in iters {
+            match it.next() {
+                Some(e) => live.push(MergeEntry { e, it }),
+                None => finished.push(it),
+            }
+        }
+        Self {
+            heap: BinaryHeap::from(live),
+            finished,
+            last_key: Vec::new(),
+        }
+    }
+}
+
+/// Merged iteration over every input held by a `MergeIter`.
+impl<I: Iter> Iter for MergeIter<I>
+where
+    I::Item: Mergeable,
+{
+    type Item = I::Item;
+
+    /// Returns the smallest buffered item across all inputs, or `None` when no
+    /// input has an item left.
+    ///
+    /// The key bytes of the returned item are stored in `last_key` so that a
+    /// later `seek` can tell whether it moves forwards or backwards.
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut head = self.heap.peek_mut()?;
+
+        self.last_key.clear();
+        self.last_key.extend_from_slice(head.e.as_ref());
+
+        match head.it.next() {
+            // Refill the entry in place; the heap restores its order when
+            // `head` is dropped.
+            Some(e) => Some(std::mem::replace(&mut head.e, e)),
+            // The input ran dry. Park it in `finished` instead of dropping it,
+            // otherwise a later backwards seek could never rewind it.
+            None => {
+                let MergeEntry { e, it } = std::collections::binary_heap::PeekMut::pop(head);
+                self.finished.push(it);
+                Some(e)
+            }
+        }
+    }
+
+    /// Repositions the merge at `key`.
+    ///
+    /// A target at or beyond `last_key` only advances the inputs whose
+    /// buffered item is still behind `key`. A target before `last_key`
+    /// re-seeks every known input, including those parked in `finished`.
+    /// What "seek" means for an individual input is defined by that input's
+    /// own `Iter::seek`; presumably it positions at the first item `>= key`.
+    fn seek(&mut self, key: &[u8]) {
+        if key >= self.last_key.as_slice() {
+            // Forward seek: the heap top is the smallest buffered item, so
+            // once it is no longer behind `key` nothing else is either.
+            while let Some(mut head) = self.heap.peek_mut() {
+                if &head.e >= key {
+                    break;
+                }
+                head.it.seek(key);
+                match head.it.next() {
+                    Some(e) => head.e = e,
+                    // Exhausted by the seek: keep the input around for a
+                    // possible backwards seek rather than dropping it.
+                    None => {
+                        let spent = std::collections::binary_heap::PeekMut::pop(head);
+                        self.finished.push(spent.it);
+                    }
+                }
+            }
+        } else {
+            // Backwards seek: rebuild the heap from every input we still own.
+            let mut finished: Vec<I> = Vec::new();
+            let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
+            for mut it in self
+                .heap
+                .drain()
+                .map(|me| me.it)
+                .chain(self.finished.drain(..))
+            {
+                it.seek(key);
+                match it.next() {
+                    Some(e) => heap_entries.push(MergeEntry { e, it }),
+                    None => finished.push(it),
+                }
+            }
+            self.heap = BinaryHeap::from(heap_entries);
+            self.finished = finished;
+        }
+
+        // Record the target for both directions. Inputs may have skipped
+        // everything below `key`, so a later seek to a smaller key must be
+        // recognised as a backwards seek even if `next` was never called.
+        self.last_key.clear();
+        self.last_key.extend_from_slice(key);
+    }
+}
+
+/// A `Merger` is itself a source: its iterator merges the iterators of all
+/// wrapped sources.
+impl<S> Source for Merger<S>
+where
+    S: Source,
+    S::Item: Mergeable,
+{
+    type Item = S::Item;
+
+    fn iter(&self) -> impl Iter<Item = Self::Item> {
+        let inputs = self.sources.iter().map(|src| src.iter());
+        MergeIter::from(inputs)
+    }
+}
+
+#[cfg(disabled)]
+#[cfg(test)]
+mod test {
+    use super::Merger;
+    use crate::source::test_source::TestSource;
+    use crate::Entry;
+    use crate::Source;
+
+    /// Every value in `1..255` that is a multiple of `m`, in ascending order.
+    fn tnum(m: u8) -> Vec<u8> {
+        (1u8..255).filter(|n| n % m == 0).collect()
+    }
+
+    /// A test source with one entry per multiple of `m`: the key is that
+    /// single byte and the value is a single zero byte.
+    fn test_source(m: u8) -> TestSource {
+        let entries = tnum(m)
+            .into_iter()
+            .map(|n| Entry::new(vec![n], vec![0]))
+            .collect::<Vec<_>>();
+        TestSource(entries)
+    }
+
+    /// Merging the sources for 1..8 must yield the sorted concatenation of
+    /// their keys.
+    #[test]
+    fn test_merge() {
+        let range = 1..8;
+        let sources: Vec<_> = range
+            .clone()
+            .map(|m| test_source(m).into_boxed())
+            .collect();
+        let merged = Merger::from(sources);
+
+        let mut expected: Vec<u8> = range.flat_map(tnum).collect();
+        expected.sort();
+
+        let keys = Vec::from_iter(merged.iter().map(|e| e.key()[0]));
+        assert_eq!(keys, expected);
+    }
+
+    /// Checks that a `BinaryHeap` built from the sample does not keep the
+    /// input order in its backing vector.
+    #[test]
+    fn test_binheap() {
+        use std::collections::BinaryHeap;
+
+        let input: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
+        let heap: BinaryHeap<u8> = input.iter().copied().collect();
+        assert_ne!(heap.into_vec(), input);
+    }
+}
.nth(1)
.expect("Usage: mtbl_dump <filename>");
let reader = mtbl::reader::from_file(fname);
- for e in reader.iter().into_iter() {
+ for e in reader.iter() {
println!("{:?}: {:?}", e.key(), e.value());
}
}
F: FnMut(&mut I::Item, &I::Item),
{
prev: Option<I::Item>,
- cur: Option<I::Item>,
iter: I,
merge_func: F,
}
pub fn new(iter: I, merge_func: F) -> Self {
Self {
prev: None,
- cur: None,
iter,
merge_func,
}
}
}
-#[cfg(disable)]
#[test]
fn test_merge_func() {
use crate::source::test_source::TestSource;
+ use crate::Entry;
+ use crate::Source;
let ts = TestSource(
(1u8..8)
.collect(),
);
- let s = ts.merge_func(|v, e| {
- v[0] += e.value()[0];
+ let s = ts.dup_merge(|v, e| {
+ v.value_mut()[0] += e.value()[0];
});
assert_eq!(
}
}
-#[cfg(disable)]
#[test]
fn test_dupsort() {
use crate::source::test_source::TestSource;
use crate::Entry;
+ use crate::Source;
let ts = TestSource(
(1u8..10)
.collect(),
);
- let s = ts.dupsort_func(|a, b| a.value()[0].cmp(&b.value()[0]));
+ let s = ts.dupsort(|a, b| a.value()[0].cmp(&b.value()[0]));
assert_eq!(
Vec::from_iter(s.iter()),
-use crate::Iter;
+use crate::iter::{IntoIter, Iter};
pub struct FilterIter<I, F> {
iter: I,
}
}
+/// Lets a `FilterIter` be consumed where an `IntoIterator` is expected by
+/// wrapping it in the `IntoIter` adapter.
+impl<I, F> IntoIterator for FilterIter<I, F>
+where
+ I: Iter,
+ F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
+{
+ type Item = I::Item;
+ type IntoIter = IntoIter<Self>;
+ // Moves `self` into the adapter; no items are pulled here.
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
+
#[test]
fn test_filter() {
use crate::source::test_source::TestSource;
-use crate::{
- iter::{IntoIter, Iter},
- Source,
-};
+use crate::iter::{IntoIter, Iter};
pub struct MapIter<I, F, O>
where
}
}
-pub struct MapSource<S, O>
-where
- S: Source,
-{
- source: S,
- #[allow(clippy::type_complexity)]
- map: Box<dyn Fn(S::Item) -> O>,
-}
-
-impl<S, O> MapSource<S, O>
-where
- S: Source,
-{
- pub fn new<F>(source: S, map: F) -> Self
- where
- F: Fn(S::Item) -> O + 'static,
- {
- Self {
- source,
- map: Box::new(map),
- }
- }
-}
-
-impl<S, O> Source for MapSource<S, O>
-where
- S: Source,
-{
- type Item = O;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
- self.source.iter().map(&self.map)
- }
-}
-
#[cfg(test)]
mod test {
use crate::source::test_source::TestSource;
-use crate::Iter;
+use crate::iter::{IntoIter, Iter};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
}
}
-#[cfg(disabled)]
+/// Lets a `MergeIter` be consumed where an `IntoIterator` is expected by
+/// wrapping it in the `IntoIter` adapter.
+impl<I: Iter> IntoIterator for MergeIter<I>
+where
+ I::Item: Mergeable,
+{
+ type Item = I::Item;
+ type IntoIter = IntoIter<Self>;
+ // Moves `self` into the adapter; no items are pulled here.
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
+
#[cfg(test)]
mod test {
- use super::Merger;
use crate::source::test_source::TestSource;
use crate::Entry;
- use crate::Source;
+ use crate::Merger;
+ use crate::{Iter, Source};
fn tnum(m: u8) -> Vec<u8> {
Vec::from_iter((1u8..255).filter(|i| i % m == 0))
}
}
-pub struct IntoIter<I: Iter>(pub I);
+/// A type-erased `Iter`: owns a boxed trait object and forwards every call to it.
+pub struct BoxedIter<'a, T>(pub Box<dyn Iter<Item = T> + 'a>);
+
+impl<'a, T> Iter for BoxedIter<'a, T> {
+    type Item = T;
+
+    /// Forwards to the boxed iterator's `next`.
+    fn next(&mut self) -> Option<Self::Item> {
+        let inner: &mut (dyn Iter<Item = T> + 'a) = &mut *self.0;
+        inner.next()
+    }
+
+    /// Forwards to the boxed iterator's `seek`.
+    fn seek(&mut self, key: &[u8]) {
+        let inner: &mut (dyn Iter<Item = T> + 'a) = &mut *self.0;
+        inner.seek(key)
+    }
+}
-impl<I: Iter> IntoIter<I> {
- fn new(i: I) -> Self {
- Self(i)
+impl<'a, T> IntoIterator for BoxedIter<'a, T> {
+ type Item = T;
+ type IntoIter = IntoIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
}
}
+pub struct IntoIter<I: Iter>(pub I);
+
impl<I> Iterator for IntoIter<I>
where
I: Iter,
sources: Vec<S>,
}
+/// The default `Merger` holds no sources.
+impl<S> Default for Merger<S> {
+ fn default() -> Self {
+ Self {
+ sources: Vec::new(),
+ }
+ }
+}
+
impl<S> Merger<S> {
pub fn new() -> Self {
Self {
S::Item: Mergeable,
{
type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
MergeIter::from(self.sources.iter().map(|s| s.iter()))
}
}
-#[cfg(disabled)]
#[cfg(test)]
mod test {
use super::Merger;
use crate::source::test_source::TestSource;
use crate::Entry;
- use crate::Source;
+ use crate::{Iter, Source};
fn tnum(m: u8) -> Vec<u8> {
Vec::from_iter((1u8..255).filter(|i| i % m == 0))
impl<D: AsRef<[u8]>> Source for Reader<D> {
type Item = Entry;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
ReaderIter {
reader: self.clone(),
next_offset: 0,
+use crate::source::DynSource;
use crate::{Entry, Iter, Merger, Reader, Source, Writer};
use memmap::Mmap;
use std::cell::Cell;
self.batch_size += esize;
}
- pub fn source(mut self) -> impl Source<Item = Entry> {
- // DupmergeSource<Merger<Reader<Mmap>>> {
+ pub fn source(mut self) -> Box<dyn DynSource<Item = Entry>> {
if !self.batch.get_mut().is_empty() {
self.write_chunk();
}
- Merger::from(self.readers).dup_merge(self.merge_func)
+ Merger::from(self.readers)
+ .dup_merge(self.merge_func)
+ .into_boxed()
}
pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
self.source()
+ .as_ref() // XXX - need to further wrap Box<dyn DynSource> in BoxedSource
.iter()
.into_iter()
.for_each(|e| w.add(e).unwrap());
F: Fn(&mut S::Item, &S::Item),
{
type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
self.source.iter().dup_merge(&self.merge)
}
}
F: Fn(&S::Item, &S::Item) -> Ordering,
{
type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
self.source.iter().dup_sort(&self.dupsort)
}
}
F: Fn(&S::Item, &mut Vec<u8>) -> bool,
{
type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
self.source.iter().filter(&self.filter)
}
}
F: Fn(S::Item) -> O,
{
type Item = O;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
self.source.iter().map(&self.map)
}
}
-use crate::iter::{prefix::PrefixIter, range::RangeIter, Iter};
+use crate::iter::{prefix::PrefixIter, range::RangeIter, BoxedIter, IntoIter, Iter};
use crate::Entry;
use std::cmp::Ordering;
pub trait Source {
type Item;
- fn iter(&self) -> impl Iter<Item = Self::Item>;
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item>;
fn get(&self, key: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
where
pub trait DynSource {
type Item;
- fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_>;
+ fn iter(&self) -> BoxedIter<'_, Self::Item>;
}
impl<S: Source + ?Sized> DynSource for Box<S> {
type Item = S::Item;
- fn iter(&self) -> Box<dyn Iter<Item = Self::Item> + '_> {
- Box::new(self.as_ref().iter())
+ fn iter(&self) -> BoxedIter<'_, Self::Item> {
+ BoxedIter(Box::new(self.as_ref().iter()))
}
}
impl<D: DynSource + ?Sized> Source for D {
type Item = D::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
DynSource::iter(self)
}
}
}
}
+/// Lets a `VecIter` be consumed where an `IntoIterator` is expected by
+/// wrapping it in the `IntoIter` adapter.
+impl<'a> IntoIterator for VecIter<'a> {
+ type Item = Entry;
+ type IntoIter = IntoIter<Self>;
+ // Moves `self` into the adapter; no items are pulled here.
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter(self)
+ }
+}
+
impl Source for Vec<Entry> {
type Item = Entry;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
VecIter {
index: 0,
vec: self,
impl Source for TestSource {
type Item = Entry;
- fn iter(&self) -> impl Iter<Item = Self::Item> {
+ fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
TestIter {
source: self,
off: 0,
use mtbl::{Entry, Reader, Source, Writer};
-//#[test]
#[test]
fn test_write_readback() {
let mut store = Vec::<u8>::new();
assert!(store.len() > 512);
let r = Reader::new(store);
let ri = r.iter();
- assert_eq!(ri.collect::<Vec<_>>(), reference);
+ assert_eq!(ri.into_iter().collect::<Vec<_>>(), reference);
// test range
let start = u32::to_be_bytes(192);
let end = u32::to_be_bytes(256);
let rangei = r.get_range(&start, &end);
assert_eq!(
- rangei.collect::<Vec<_>>(),
+ rangei.into_iter().collect::<Vec<_>>(),
reference
.into_iter()
.filter(|e| e.key() >= &u32::to_be_bytes(192) && e.key() <= &u32::to_be_bytes(256))