From: Chris Mikkelson Date: Fri, 6 Sep 2024 21:31:07 +0000 (-0500) Subject: Multiple cleanups; flesh out sorter logic X-Git-Url: https://git.mikk.net/?a=commitdiff_plain;h=bf637239b8b1d6cf02fc2dacfef465bcae71451e;p=mtbl-rs Multiple cleanups; flesh out sorter logic Give merge function a mutable reference to data only, to prevent accidental mangling of keys. Implement source on Vec for sorter; migrate tests eventually. --- diff --git a/src/merge_func.rs b/src/merge_func.rs index dd25378..1e16f0e 100644 --- a/src/merge_func.rs +++ b/src/merge_func.rs @@ -1,6 +1,7 @@ use crate::{Entry, Iter, Source}; +use std::sync::Arc; -pub struct MergeFunc { +pub struct MergeFunc, &Entry)> { source: S, merge_func: F, } @@ -8,7 +9,7 @@ pub struct MergeFunc { impl MergeFunc where S: Source, - F: Fn(&mut Entry, &Entry), + F: Fn(&mut Vec, &Entry), { pub fn new(source: S, merge_func: F) -> Self { Self { source, merge_func } @@ -18,7 +19,7 @@ where impl Source for MergeFunc where S: Source, - F: Fn(&mut Entry, &Entry) + Clone, + F: Fn(&mut Vec, &Entry), { fn iter(&self) -> impl Iter { MergeFuncIter { @@ -59,14 +60,14 @@ struct MergeFuncIter<'a, I: Iter, F: Fn(&mut Vec, &Entry)> { impl<'a, I, F> Iterator for MergeFuncIter<'a, I, F> where I: Iter, - F: Fn(&mut Entry, &Entry), + F: Fn(&mut Vec, &Entry), { type Item = Entry; fn next(&mut self) -> Option { let mut cur = self.prev.take().or_else(|| self.iter.next())?; while let Some(e) = self.iter.next() { if e.key == cur.key { - (self.merge_func)(&mut cur, &e); + (self.merge_func)(Arc::make_mut(&mut cur.value), &e); } else { self.prev.replace(e); break; @@ -79,7 +80,7 @@ where impl<'a, I, F> Iter for MergeFuncIter<'a, I, F> where I: Iter, - F: Fn(&mut Entry, &Entry), + F: Fn(&mut Vec, &Entry), { fn seek(&mut self, key: &[u8]) { self.prev.take(); @@ -90,7 +91,6 @@ where #[test] fn test_merge_func() { use crate::source::test_source::TestSource; - use std::sync::Arc; let ts = TestSource( (1u8..8) @@ -100,9 +100,8 @@ fn test_merge_func() { .collect(), ); - let s = (&ts).merge_func(|e1, e2| { - let v1 = Arc::make_mut(&mut e1.value); - v1[0] += e2.value[0]; + let s = ts.merge_func(|v, e| { + v[0] += e.value[0]; }); assert_eq!( diff --git a/src/reader/block.rs b/src/reader/block.rs index 5f5f1a0..d456f6b 100644 --- a/src/reader/block.rs +++ b/src/reader/block.rs @@ -114,10 +114,7 @@ impl> BlockIter { } fn bsearch_restart(&mut self, key: &[u8], mut left: usize, mut right: usize) { - let mut counter = 0; while left < right { - counter += 1; - assert!(counter < 10); let mid = (left + right + 1) / 2; self.seek_restart(mid) .map(|rk| { diff --git a/src/sorter.rs b/src/sorter.rs index 72e4823..5012530 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -1,14 +1,9 @@ -use crate::merge_func::MergeFunc; -use crate::{Entry, Merger, Reader, Source}; +use crate::{Entry, Merger, Reader, Source, Writer}; use memmap::Mmap; +use std::cell::Cell; -/* -Sorter - method add(&mut self, e: Entry); - method source(self) -> impl Source -*/ -pub struct Sorter { - batch: Vec, +pub struct Sorter, &Entry)> { + batch: Cell>, batch_size: usize, max_size: usize, merge_func: F, @@ -18,11 +13,11 @@ pub struct Sorter { impl Sorter where - F: Fn(&mut Entry, &Entry), + F: Fn(&mut Vec, &Entry), { pub fn new(max_size: usize, merge_func: F) -> Self { Self { - batch: Vec::new(), + batch: Cell::new(Vec::new()), batch_size: 0, max_size, merge_func, @@ -36,16 +31,34 @@ where if esize + self.batch_size > self.max_size { self.write_chunk(); } - self.batch.push(e); + self.batch.get_mut().push(e); self.batch_size += esize; } - pub fn source(mut self) -> MergeFunc>, F> { - if self.batch.len() > 0 { + pub fn source(mut self) -> impl Source { + if self.batch.get_mut().len() > 0 { self.write_chunk(); } Merger::from(self.readers).merge_func(self.merge_func) } - fn write_chunk(&mut self) {} + pub fn write(self, mut w: Writer) { + self.source().iter().for_each(|e| { + w.add(e).unwrap(); + }); + } + + fn write_chunk(&mut self) { + let mut w = Writer::new(Vec::new()); + self.batch + .get_mut() + .sort_unstable_by(|a, b| a.key.cmp(&b.key)); + self.batch + .take() + .merge_func(&self.merge_func) + .iter() + .for_each(|e| { + w.add(e).unwrap(); + }); + } } diff --git a/src/source.rs b/src/source.rs index 9403378..dd79f88 100644 --- a/src/source.rs +++ b/src/source.rs @@ -18,7 +18,7 @@ pub trait Source { fn merge_func(self, merge_func: F) -> MergeFunc where Self: Sized, - F: Fn(&mut Entry, &Entry), + F: Fn(&mut Vec, &Entry), { MergeFunc::new(self, merge_func) } @@ -32,6 +32,49 @@ pub trait Source { } } +struct VecIter<'a> { + index: usize, + vec: &'a Vec, +} + +impl<'a> Iterator for VecIter<'a> { + type Item = Entry; + fn next(&mut self) -> Option { + if self.index > self.vec.len() { + None + } else { + let e = self.vec[self.index].clone(); + self.index += 1; + Some(e) + } + } +} + +impl<'a> Iter for VecIter<'a> { + fn seek(&mut self, key: &[u8]) { + let mut left = 0; + let mut right = self.vec.len() - 1; + while left < right { + let mid = (left + right + 1) / 2; + if self.vec[mid].key.as_slice() < key { + left = mid; + } else { + right = mid - 1; + } + } + self.index = left; + } +} + +impl Source for Vec { + fn iter(&self) -> impl Iter { + VecIter { + index: 0, + vec: self, + } + } +} + #[cfg(test)] pub mod test_source { use crate::Entry;