--- /dev/null
+use crate::{Entry, Iter, Source};
+use std::cmp::Ordering;
+
+pub struct DupsortFunc<S, F>
+where
+ S: Source,
+ F: Fn(&Entry, &Entry) -> Ordering,
+{
+ source: S,
+ dupsort_func: F,
+}
+
+impl<S, F> DupsortFunc<S, F>
+where
+ S: Source,
+ F: Fn(&Entry, &Entry) -> Ordering,
+{
+ pub fn new(source: S, dupsort_func: F) -> Self {
+ Self {
+ source,
+ dupsort_func,
+ }
+ }
+}
+
+impl<'a, S, F> Source for &'a DupsortFunc<S, F>
+where
+ S: Source,
+ F: Fn(&Entry, &Entry) -> Ordering,
+{
+ type It = DupsortFuncIter<'a, S::It, F>;
+ fn iter(&self) -> Self::It {
+ Self::It {
+ run: Vec::new(),
+ next: None,
+ iter: self.source.iter(),
+ dupsort_func: &self.dupsort_func,
+ }
+ }
+ type Get = DupsortFuncIter<'a, S::Get, F>;
+ fn get(&self, key: &[u8]) -> Self::Get {
+ Self::Get {
+ run: Vec::new(),
+ next: None,
+ iter: self.source.get(key),
+ dupsort_func: &self.dupsort_func,
+ }
+ }
+ type Prefix = DupsortFuncIter<'a, S::Prefix, F>;
+ fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix {
+ Self::Prefix {
+ run: Vec::new(),
+ next: None,
+ iter: self.source.get_prefix(prefix),
+ dupsort_func: &self.dupsort_func,
+ }
+ }
+ type Range = DupsortFuncIter<'a, S::Range, F>;
+ fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range {
+ Self::Range {
+ run: Vec::new(),
+ next: None,
+ iter: self.source.get_range(start, end),
+ dupsort_func: &self.dupsort_func,
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct DupsortFuncIter<'a, I, F>
+where
+ I: Iter,
+ F: Fn(&Entry, &Entry) -> Ordering,
+{
+ run: Vec<Entry>,
+ next: Option<Entry>,
+ iter: I,
+ dupsort_func: &'a F,
+}
+
+impl<'a, I, F> Iterator for DupsortFuncIter<'a, I, F>
+where
+ I: Iter,
+ F: Fn(&Entry, &Entry) -> Ordering,
+{
+ type Item = Entry;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.run.pop().or_else(|| {
+ self.run
+ .push(self.next.take().or_else(|| self.iter.next())?);
+
+ // println!("2: {:?} / {:?}", self.next, self.run);
+ while let Some(e) = self.iter.next() {
+ if e.key == self.run[0].key {
+ self.run.push(e);
+ continue;
+ }
+ self.next.replace(e);
+ break;
+ }
+ // sort in reverse order, so self.run.pop() can be the usual
+ // return value.
+ self.run.sort_unstable_by(|a, b| (self.dupsort_func)(b, a));
+ self.run.pop()
+ })
+ }
+}
+
+impl<'a, I, F> Iter for DupsortFuncIter<'a, I, F>
+where
+ I: Iter,
+ F: Fn(&Entry, &Entry) -> Ordering,
+{
+ fn seek(&mut self, key: &[u8]) {
+ self.run.clear();
+ self.next.take();
+ self.iter.seek(key);
+ }
+}
+
+#[test]
+fn test_dupsort() {
+ use crate::source::test_source::TestSource;
+
+ let ts = TestSource(
+ (1u8..10)
+ .map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
+ .flatten()
+ .collect(),
+ );
+
+ let s = ts.dupsort_func(|a, b| a.value[0].cmp(&b.value[0]));
+
+ assert_eq!(
+ Vec::from_iter((&s).iter()),
+ (1u8..10)
+ .map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
+ .flatten()
+ .collect::<Vec<_>>()
+ );
+}
mod compression;
+mod dupsort_func;
mod entry;
mod iter;
+mod merge_func;
mod merger;
pub mod reader;
pub mod source;
--- /dev/null
+use crate::{Entry, Iter, Source};
+
+pub struct MergeFunc<S: Source, F: Fn(&mut Entry, &Entry)> {
+ source: S,
+ merge_func: F,
+}
+
+impl<S, F> MergeFunc<S, F>
+where
+ S: Source,
+ F: Fn(&mut Entry, &Entry),
+{
+ pub fn new(source: S, merge_func: F) -> Self {
+ Self { source, merge_func }
+ }
+}
+
+impl<'a, S, F> Source for &'a MergeFunc<S, F>
+where
+ S: Source,
+ F: Fn(&mut Entry, &Entry) + Clone,
+{
+ type It = MergeFuncIter<'a, S::It, F>;
+ fn iter(&self) -> Self::It {
+ Self::It {
+ prev: None,
+ iter: self.source.iter(),
+ merge_func: &self.merge_func,
+ }
+ }
+ type Get = MergeFuncIter<'a, S::Get, F>;
+ fn get(&self, key: &[u8]) -> Self::Get {
+ MergeFuncIter {
+ prev: None,
+ iter: self.source.get(key),
+ merge_func: &self.merge_func,
+ }
+ }
+ type Prefix = MergeFuncIter<'a, S::Prefix, F>;
+ fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix {
+ MergeFuncIter {
+ prev: None,
+ iter: self.source.get_prefix(prefix),
+ merge_func: &self.merge_func,
+ }
+ }
+ type Range = MergeFuncIter<'a, S::Range, F>;
+ fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range {
+ MergeFuncIter {
+ prev: None,
+ iter: self.source.get_range(start, end),
+ merge_func: &self.merge_func,
+ }
+ }
+}
+
+pub struct MergeFuncIter<'a, I: Iter, F: Fn(&mut Entry, &Entry)> {
+ prev: Option<Entry>,
+ iter: I,
+ merge_func: &'a F,
+}
+
+impl<'a, I, F> Iterator for MergeFuncIter<'a, I, F>
+where
+ I: Iter,
+ F: Fn(&mut Entry, &Entry),
+{
+ type Item = Entry;
+ fn next(&mut self) -> Option<Self::Item> {
+ let mut cur = self.prev.take().or_else(|| self.iter.next())?;
+ while let Some(e) = self.iter.next() {
+ if e.key == cur.key {
+ (self.merge_func)(&mut cur, &e);
+ } else {
+ self.prev.replace(e);
+ break;
+ }
+ }
+ Some(cur)
+ }
+}
+
+impl<'a, I, F> Iter for MergeFuncIter<'a, I, F>
+where
+ I: Iter,
+ F: Fn(&mut Entry, &Entry),
+{
+ fn seek(&mut self, key: &[u8]) {
+ self.prev.take();
+ self.iter.seek(key);
+ }
+}
+
+#[test]
+fn test_merge_func() {
+ use crate::source::test_source::TestSource;
+ use std::sync::Arc;
+
+ let ts = TestSource(
+ (1u8..8)
+ .map(|n| vec![n; n as usize])
+ .flatten()
+ .map(|n| Entry::new(vec![n], vec![1]))
+ .collect(),
+ );
+
+ let s = (&ts).merge_func(|e1, e2| {
+ let v1 = Arc::make_mut(&mut e1.value);
+ v1[0] += e2.value[0];
+ });
+
+ assert_eq!(
+ Vec::from_iter((&s).iter()),
+ (1u8..8)
+ .map(|n| Entry::new(vec![n], vec![n]))
+ .collect::<Vec<_>>()
+ )
+}
+use crate::dupsort_func::DupsortFunc;
use crate::iter::{BoxedIter, PrefixIter, RangeIter};
-use crate::Iter;
+use crate::merge_func::MergeFunc;
+use crate::{Entry, Iter};
use std::marker::PhantomData;
pub trait Source {
fn get(&self, key: &[u8]) -> Self::Get;
fn get_prefix(&self, prefix: &[u8]) -> Self::Prefix;
fn get_range(&self, start: &[u8], end: &[u8]) -> Self::Range;
+
+ fn merge_func<F>(self, merge_func: F) -> MergeFunc<Self, F>
+ where
+ Self: Sized,
+ F: Fn(&mut Entry, &Entry),
+ {
+ MergeFunc::new(self, merge_func)
+ }
+
+ fn dupsort_func<F>(self, dupsort_func: F) -> DupsortFunc<Self, F>
+ where
+ Self: Sized,
+ F: Fn(&Entry, &Entry) -> std::cmp::Ordering,
+ {
+ DupsortFunc::new(self, dupsort_func)
+ }
}
pub trait IterSource {