use crate::{Entry, Iter, Source};
+use std::sync::Arc;
-pub struct MergeFunc<S: Source, F: Fn(&mut Entry, &Entry)> {
+pub struct MergeFunc<S: Source, F: Fn(&mut Vec<u8>, &Entry)> {
source: S,
merge_func: F,
}
impl<S, F> MergeFunc<S, F>
where
S: Source,
- F: Fn(&mut Entry, &Entry),
+ F: Fn(&mut Vec<u8>, &Entry),
{
pub fn new(source: S, merge_func: F) -> Self {
Self { source, merge_func }
impl<S, F> Source for MergeFunc<S, F>
where
S: Source,
- F: Fn(&mut Entry, &Entry) + Clone,
+ F: Fn(&mut Vec<u8>, &Entry),
{
fn iter(&self) -> impl Iter {
MergeFuncIter {
impl<'a, I, F> Iterator for MergeFuncIter<'a, I, F>
where
I: Iter,
- F: Fn(&mut Entry, &Entry),
+ F: Fn(&mut Vec<u8>, &Entry),
{
type Item = Entry;
fn next(&mut self) -> Option<Self::Item> {
let mut cur = self.prev.take().or_else(|| self.iter.next())?;
while let Some(e) = self.iter.next() {
if e.key == cur.key {
- (self.merge_func)(&mut cur, &e);
+ (self.merge_func)(Arc::make_mut(&mut cur.value), &e);
} else {
self.prev.replace(e);
break;
impl<'a, I, F> Iter for MergeFuncIter<'a, I, F>
where
I: Iter,
- F: Fn(&mut Entry, &Entry),
+ F: Fn(&mut Vec<u8>, &Entry),
{
fn seek(&mut self, key: &[u8]) {
self.prev.take();
#[test]
fn test_merge_func() {
use crate::source::test_source::TestSource;
- use std::sync::Arc;
let ts = TestSource(
(1u8..8)
.collect(),
);
- let s = (&ts).merge_func(|e1, e2| {
- let v1 = Arc::make_mut(&mut e1.value);
- v1[0] += e2.value[0];
+ let s = ts.merge_func(|v, e| {
+ v[0] += e.value[0];
});
assert_eq!(
}
fn bsearch_restart(&mut self, key: &[u8], mut left: usize, mut right: usize) {
- let mut counter = 0;
while left < right {
- counter += 1;
- assert!(counter < 10);
let mid = (left + right + 1) / 2;
self.seek_restart(mid)
.map(|rk| {
-use crate::merge_func::MergeFunc;
-use crate::{Entry, Merger, Reader, Source};
+use crate::{Entry, Merger, Reader, Source, Writer};
use memmap::Mmap;
+use std::cell::Cell;
-/*
-Sorter
- method add(&mut self, e: Entry);
- method source(self) -> impl Source
-*/
-pub struct Sorter<F: Fn(&mut Entry, &Entry)> {
- batch: Vec<Entry>,
+pub struct Sorter<F: Fn(&mut Vec<u8>, &Entry)> {
+ batch: Cell<Vec<Entry>>,
batch_size: usize,
max_size: usize,
merge_func: F,
impl<F> Sorter<F>
where
- F: Fn(&mut Entry, &Entry),
+ F: Fn(&mut Vec<u8>, &Entry),
{
pub fn new(max_size: usize, merge_func: F) -> Self {
Self {
- batch: Vec::new(),
+ batch: Cell::new(Vec::new()),
batch_size: 0,
max_size,
merge_func,
if esize + self.batch_size > self.max_size {
self.write_chunk();
}
- self.batch.push(e);
+ self.batch.get_mut().push(e);
self.batch_size += esize;
}
- pub fn source(mut self) -> MergeFunc<Merger<Reader<Mmap>>, F> {
- if self.batch.len() > 0 {
+ pub fn source(mut self) -> impl Source {
+ if self.batch.get_mut().len() > 0 {
self.write_chunk();
}
Merger::from(self.readers).merge_func(self.merge_func)
}
- fn write_chunk(&mut self) {}
+    /// Drains this sorter into `w`.
+    ///
+    /// Consumes the sorter, obtains its merged view via `source()`, and adds
+    /// every entry that view yields to the writer, in iteration order.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `w.add` fails, because its result is unwrapped.
+    pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
+        let merged = self.source();
+        for entry in merged.iter() {
+            w.add(entry).unwrap();
+        }
+    }
+
+    /// Sorts the pending batch by key, merges entries that share a key with
+    /// `merge_func`, and feeds the result to a fresh in-memory `Writer`.
+    ///
+    /// Afterwards the batch is empty and the size counter is back at zero, so
+    /// the next `add` starts filling a new chunk.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `w.add` fails, because its result is unwrapped.
+    fn write_chunk(&mut self) {
+        let mut w = Writer::new(Vec::new());
+        // Unstable sort: the relative order of entries with equal keys is not
+        // preserved, so the merge function sees them in an unspecified order.
+        self.batch
+            .get_mut()
+            .sort_unstable_by(|a, b| a.key.cmp(&b.key));
+        self.batch
+            .take()
+            .merge_func(&self.merge_func)
+            .iter()
+            .for_each(|e| {
+                w.add(e).unwrap();
+            });
+        // `take()` just emptied the batch; without this reset the counter only
+        // ever grows and every later `add` would trigger another flush.
+        self.batch_size = 0;
+        // NOTE(review): `w` goes out of scope here without being turned into a
+        // reader or stored anywhere visible in this hunk. Confirm whether the
+        // finished chunk is meant to be registered with `self.readers`.
+    }
}
fn merge_func<F>(self, merge_func: F) -> MergeFunc<Self, F>
where
Self: Sized,
- F: Fn(&mut Entry, &Entry),
+ F: Fn(&mut Vec<u8>, &Entry),
{
MergeFunc::new(self, merge_func)
}
}
}
+/// Cursor over a borrowed run of entries.
+///
+/// `index` is the position of the entry that the next call to `next` will
+/// return; `vec` is the borrowed data being walked.
+struct VecIter<'a> {
+    index: usize,
+    // Borrowed as a slice: the cursor only reads elements and never needs
+    // `Vec`-specific API. A `&Vec<Entry>` still coerces at construction.
+    vec: &'a [Entry],
+}
+
+impl<'a> Iterator for VecIter<'a> {
+    type Item = Entry;
+
+    /// Returns a clone of the entry at the cursor and advances by one, or
+    /// `None` once the cursor has moved past the last entry.
+    fn next(&mut self) -> Option<Self::Item> {
+        // `get` is the bounds check: `index == len` must yield `None`, not an
+        // out-of-range index.
+        let e = self.vec.get(self.index)?.clone();
+        self.index += 1;
+        Some(e)
+    }
+}
+
+impl<'a> Iter for VecIter<'a> {
+    /// Positions the cursor at the first entry whose key is not less than
+    /// `key`, so the following `next` yields that entry, or `None` when every
+    /// key is smaller.
+    ///
+    /// The search is a binary search and therefore relies on the entries being
+    /// sorted by key in ascending order.
+    fn seek(&mut self, key: &[u8]) {
+        // `partition_point` returns the lower bound directly and is well
+        // defined for an empty slice, where computing `len() - 1` would
+        // underflow.
+        self.index = self.vec.partition_point(|e| e.key.as_slice() < key);
+    }
+}
+
+/// Lets a plain `Vec<Entry>` be used as a `Source`.
+impl Source for Vec<Entry> {
+    /// Returns a `VecIter` that borrows this vector and starts at index 0.
+    fn iter(&self) -> impl Iter {
+        let (index, vec) = (0, self);
+        VecIter { index, vec }
+    }
+}
+
#[cfg(test)]
pub mod test_source {
use crate::Entry;