From: Chris Mikkelson Date: Sat, 21 Jun 2025 20:33:18 +0000 (-0500) Subject: Massive refactor #N for borrowing iteration X-Git-Url: https://git.mikk.net/?a=commitdiff_plain;h=6a45e41342df39f1ccd367da8f7bd5c1097376b4;p=mtbl-rs Massive refactor #N for borrowing iteration New "Cursor" abstraction returns a tuple of references to a byte slice key and associated Value type. This tuple replaces Entry (removed), and the cursor's immutable get() allows the merger heap to use the cursor directly for ordering. WIP, several bugs likely remain, but unit tests pass. --- diff --git a/src/bin/mtbl_dump.rs b/src/bin/mtbl_dump.rs index c1de724..c122398 100644 --- a/src/bin/mtbl_dump.rs +++ b/src/bin/mtbl_dump.rs @@ -5,7 +5,7 @@ fn main() { .nth(1) .expect("Usage: mtbl_dump "); let reader = mtbl::reader::from_file(fname); - for e in reader.iter() { - println!("{:?}: {:?}", e.key(), e.value()); + for (k, v) in reader.iter() { + println!("{:?}: {:?}", k, v); } } diff --git a/src/compression.rs b/src/compression.rs index 0a44739..f811834 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -1,3 +1,4 @@ +use crate::Result; use std::{ io::{Read, Write}, ops::Deref, @@ -68,10 +69,7 @@ impl> AsRef<[u8]> for CBuf { } impl Compression { - pub(crate) fn compress>( - &self, - buf: B, - ) -> Result, Box> { + pub(crate) fn compress>(&self, buf: B) -> Result> { match self { Compression::None => Ok(CBuf::Buf(buf)), Compression::Snappy => Ok(CBuf::Vec( @@ -91,27 +89,24 @@ impl Compression { } } - pub(crate) fn uncompress>(&self, buf: B) -> Option> { + pub(crate) fn uncompress>(&self, buf: B) -> Result> { match self { - Compression::None => Some(CBuf::Buf(buf)), - Compression::Snappy => Some(CBuf::Vec( - snap::raw::Decoder::new() - .decompress_vec(buf.as_ref()) - .ok()?, + Compression::None => Ok(CBuf::Buf(buf)), + Compression::Snappy => Ok(CBuf::Vec( + snap::raw::Decoder::new().decompress_vec(buf.as_ref())?, )), Compression::Zlib(_) => { let mut v = Vec::::new(); { let mut dec = flate2::read::ZlibDecoder::new(buf.as_ref()); - dec.read_to_end(&mut v).ok()?; + dec.read_to_end(&mut v)?; } - Some(CBuf::Vec(v)) + Ok(CBuf::Vec(v)) } Compression::Zstd(_) => { - let mut dec = zstd::bulk::Decompressor::new().ok()?; - Some(CBuf::Vec( - dec.decompress(buf.as_ref(), 100 * buf.as_ref().len()) - .ok()?, + let mut dec = zstd::bulk::Decompressor::new()?; + Ok(CBuf::Vec( + dec.decompress(buf.as_ref(), 100 * buf.as_ref().len())?, )) } } diff --git a/src/cursor/filter.rs b/src/cursor/filter.rs index a9420e3..0e27bbd 100644 --- a/src/cursor/filter.rs +++ b/src/cursor/filter.rs @@ -13,11 +13,11 @@ where impl Filter where C: Cursor, + C::Value: std::fmt::Debug, F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, { fn find_match(&mut self) { while let Some((k, v)) = self.cursor.get() { - self.seekto.clear(); if (self.filter)((k, v), &mut self.seekto) { break; } @@ -25,6 +25,7 @@ where self.cursor.advance(); } else { self.cursor.seek(self.seekto.as_slice()); + self.seekto.clear(); } } } @@ -41,6 +42,7 @@ where impl Cursor for Filter where C: Cursor, + C::Value: std::fmt::Debug, F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, { type Value = C::Value; @@ -55,7 +57,14 @@ where } fn advance(&mut self) { - self.cursor.advance(); + if self.seekto.is_empty() { + self.cursor.advance(); + } else { + // A previous filter passed its item but left + // a seek key for the next potential match. + self.cursor.seek(self.seekto.as_slice()); + self.seekto.clear(); + } self.find_match(); } } @@ -63,7 +72,7 @@ where impl IntoIterator for Filter where C: Cursor, - C::Value: Clone, + C::Value: Clone + std::fmt::Debug, F: FnMut((&[u8], &C::Value), &mut Vec) -> bool, { type Item = (Vec, C::Value); @@ -80,7 +89,7 @@ where #[cfg(test)] mod test { - use super::{Cursor, Filter}; + use super::Cursor; struct TestSource(Vec<(Vec, T)>, Option); @@ -120,11 +129,9 @@ mod test { fn test_filter() { let range = 1u64..64; let ts = TestSource::::from(range.clone()); - let filter = Filter::new(ts, |(_, v), _| v % 3 == 0); + let fiter = ts.filter(|(_, v), _| v % 3 == 0).into_iter(); + let tv = Iterator::map(fiter, |(_, v)| v).collect::>(); let check = range.into_iter().filter(|i| i % 3 == 0).collect::>(); - assert_eq!( - filter.into_iter().map(|(_, v)| v).collect::>(), - check - ) + assert_eq!(tv, check) } } diff --git a/src/cursor/filtermap.rs b/src/cursor/filtermap.rs index 8177da0..9b0e5f6 100644 --- a/src/cursor/filtermap.rs +++ b/src/cursor/filtermap.rs @@ -18,7 +18,6 @@ where { fn find_match(&mut self) { while let Some((k, v)) = self.cursor.get() { - self.seekto.clear(); match (self.filtermap)((k, v), &mut self.seekto) { Some(val) => { self.value.replace(val); @@ -29,6 +28,7 @@ where self.cursor.advance(); } else { self.cursor.seek(self.seekto.as_slice()); + self.seekto.clear(); } } } @@ -63,7 +63,12 @@ where } fn advance(&mut self) { - self.cursor.advance(); + if self.seekto.is_empty() { + self.cursor.advance(); + } else { + self.cursor.seek(self.seekto.as_slice()); + self.seekto.clear(); + } self.find_match(); } } @@ -81,21 +86,17 @@ where if self.get().is_none() { self.advance(); } - FilterMapIter { - cursor: self, - first: true, - } + FilterMapIter { cursor: self } } } -struct FilterMapIter +pub struct FilterMapIter where C: Cursor, O: Clone, F: FnMut((&[u8], &C::Value), &mut Vec) -> Option, { cursor: FilterMap, - first: bool, } impl Iterator for FilterMapIter @@ -107,12 +108,13 @@ where type Item = (Vec, O); fn next(&mut self) -> Option { - if !self.first { + let (k, _) = self.cursor.get()?; + let k = Vec::from(k); + let res = self.cursor.value.take().map(|v| (k, v)); + if res.is_some() { self.cursor.advance(); } - self.first = false; - let (k, _) = self.cursor.get()?; - self.cursor.value.take().map(|v| (Vec::from(k), v)) + res } } diff --git a/src/cursor/map.rs b/src/cursor/map.rs index 03c0a3f..ab036b6 100644 --- a/src/cursor/map.rs +++ b/src/cursor/map.rs @@ -10,6 +10,27 @@ where value: Option, } +impl Map +where + C: Cursor, + F: FnMut((&[u8], &C::Value)) -> O, +{ + pub fn new(cursor: C, map: F) -> Self { + Self { + cursor, + map, + value: None, + } + } + + fn update(&mut self) { + self.cursor + .get() + .and_then(|(k, v)| self.value.replace((self.map)((k, v)))) + .or_else(|| self.value.take()); + } +} + impl Cursor for Map where C: Cursor, @@ -24,10 +45,12 @@ where fn advance(&mut self) { self.cursor.advance(); + self.update(); } fn seek(&mut self, key: &[u8]) { self.cursor.seek(key); + self.update(); } } @@ -42,20 +65,16 @@ where if self.get().is_none() { self.advance(); } - MapIter { - cursor: self, - first: true, - } + MapIter { cursor: self } } } -struct MapIter +pub struct MapIter where C: Cursor, F: FnMut((&[u8], &C::Value)) -> O, { cursor: Map, - first: bool, } impl Iterator for MapIter @@ -65,11 +84,12 @@ where { type Item = (Vec, O); fn next(&mut self) -> Option { - if !self.first { + let (k, _) = self.cursor.get()?; + let k = Vec::from(k); // needs to be here to drop borrowed k from self.cursor + let res = self.cursor.value.take().map(|v| (k, v)); + if res.is_some() { self.cursor.advance(); } - self.first = false; - let (k, _) = self.cursor.get()?; - self.cursor.value.take().map(|v| (Vec::from(k), v)) + res } } diff --git a/src/cursor/merge.rs b/src/cursor/merge.rs index dfa1bb4..cc4d4ee 100644 --- a/src/cursor/merge.rs +++ b/src/cursor/merge.rs @@ -1,15 +1,14 @@ use super::Cursor; use std::borrow::Borrow; -struct Merge +pub struct Merge where C: Cursor, C::Value: ToOwned, - F: FnMut(&mut ::Owned, &C::Value), + F: FnMut(&[u8], &C::Value, &mut ::Owned), { cursor: C, merge: F, - needs_advance: bool, merge_key: Vec, merge_val: Option<::Owned>, } @@ -18,13 +17,12 @@ impl Merge where C: Cursor, C::Value: ToOwned, - F: FnMut(&mut ::Owned, &C::Value), + F: FnMut(&[u8], &C::Value, &mut ::Owned), { pub fn new(cursor: C, merge: F) -> Self { Self { cursor, merge, - needs_advance: true, merge_key: Vec::new(), merge_val: None, } @@ -42,17 +40,21 @@ where .as_mut() .map(|mv| v.clone_into(mv)) .or_else(|| { - self.merge_val.insert(v.to_owned()); + self.merge_val = Some(v.to_owned()); None }); } } + self.cursor.advance(); while let Some((k, v)) = self.cursor.get() { if k != self.merge_key.as_slice() { break; } - self.merge_val.as_mut().map(|mv| (self.merge)(mv, v)); + if let Some(mv) = self.merge_val.as_mut() { + (self.merge)(k, v, mv); + } + self.cursor.advance(); } } } @@ -61,7 +63,7 @@ impl Cursor for Merge where C: Cursor, C::Value: ToOwned, - F: FnMut(&mut ::Owned, &C::Value), + F: FnMut(&[u8], &C::Value, &mut ::Owned), { type Value = C::Value; @@ -85,7 +87,7 @@ impl IntoIterator for Merge where C: Cursor, C::Value: ToOwned, - F: FnMut(&mut ::Owned, &C::Value), + F: FnMut(&[u8], &C::Value, &mut ::Owned), { type Item = (Vec, ::Owned); type IntoIter = MergeIter; @@ -93,10 +95,7 @@ where if self.get().is_none() { self.advance() } - MergeIter { - cursor: self, - first: true, - } + MergeIter { cursor: self } } } @@ -104,27 +103,27 @@ pub struct MergeIter where C: Cursor, C::Value: ToOwned, - F: FnMut(&mut ::Owned, &C::Value), + F: FnMut(&[u8], &C::Value, &mut ::Owned), { cursor: Merge, - first: bool, } impl Iterator for MergeIter where C: Cursor, C::Value: ToOwned, - F: FnMut(&mut ::Owned, &C::Value), + F: FnMut(&[u8], &C::Value, &mut ::Owned), { type Item = (Vec, ::Owned); fn next(&mut self) -> Option { - if !self.first { - self.cursor.advance(); - } - self.first = false; - self.cursor + let res = self + .cursor .merge_val .take() - .map(|mv| (self.cursor.merge_key.clone(), mv)) + .map(|mv| (self.cursor.merge_key.clone(), mv)); + if res.is_some() { + self.cursor.advance(); + } + res } } diff --git a/src/cursor/merger.rs b/src/cursor/merger.rs index 49e3080..dd9d27c 100644 --- a/src/cursor/merger.rs +++ b/src/cursor/merger.rs @@ -8,6 +8,19 @@ pub struct MergeCursor { last: Vec, } +impl From for MergeCursor +where + I: IntoIterator, +{ + fn from(iter: I) -> Self { + Self { + pending: Vec::from_iter(iter), + active: BinaryHeap::new(), + last: Vec::new(), + } + } +} +/* impl MergeCursor> { fn from(iter: I) -> Self where @@ -36,12 +49,12 @@ where } } } - +*/ impl Cursor for MergeCursor { type Value = C::Value; fn get(&self) -> Option<(&[u8], &Self::Value)> { - self.active.peek().map(|Reverse(c)| c.get()).flatten() + self.active.peek().and_then(|Reverse(c)| c.get()) } fn advance(&mut self) { @@ -54,8 +67,10 @@ impl Cursor for MergeCursor { .peek() .map(|Reverse(c)| c.get().map(|(k, _)| k.clone_into(&mut self.last))); } else if let Some(mut rcur) = self.active.peek_mut() { - rcur.0.get().map(|(k, _)| k.clone_into(&mut self.last)); - rcur.0.advance(); + if let Some((k, _)) = rcur.0.get() { + k.clone_into(&mut self.last); + rcur.0.advance(); + } // XXX cannot remove a cursor when `get()` return None after an advance. // Finished cursors will remain at the bottom of the heap. } @@ -91,24 +106,17 @@ impl Cursor for MergeCursor { impl IntoIterator for MergeCursor where C: Cursor + Ord, - C::Value: Clone, + C::Value: ToOwned, { - type Item = (Vec, C::Value); + type Item = (Vec, ::Owned); type IntoIter = KVIter; fn into_iter(self) -> Self::IntoIter { Cursor::to_iter(self) } } -fn test_merger_construction(i: impl Iterator) -where - C::Value: Ord, -{ - let _ = MergeCursor::>::from(i); -} - -pub struct KeyOrd(C); -pub struct KeyValOrd(C); +pub struct KeyOrd(pub C); +pub struct KeyValOrd(pub C); // Ordering implementation for KeyOrd, KeyValOrd impl Cursor for KeyOrd { @@ -164,9 +172,7 @@ impl Eq for KeyValOrd where C::Value: Ord {} impl PartialOrd for KeyOrd { fn partial_cmp(&self, other: &Self) -> Option { - let ks = self.0.get().map(|(k, _)| k); - let ko = other.0.get().map(|(k, _)| k); - Some(ks.cmp(&ko)) + Some(self.cmp(other)) } } @@ -175,9 +181,7 @@ where C::Value: Ord, { fn partial_cmp(&self, other: &Self) -> Option { - let sp = self.0.get(); - let op = other.0.get(); - Some(sp.cmp(&op)) + Some(self.cmp(other)) } } diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 756d668..b09ff6b 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -6,7 +6,7 @@ pub mod merger; pub mod range; pub trait Cursor { - type Value; + type Value: ?Sized; // required methods fn get(&self) -> Option<(&[u8], &Self::Value)>; @@ -17,22 +17,31 @@ pub trait Cursor { fn to_iter(mut self) -> KVIter where Self: Sized, - Self::Value: Clone, + Self::Value: ToOwned, { if self.get().is_none() { self.advance(); } - KVIter(self, true) + KVIter { cursor: self } } fn filter(self, filter: F) -> filter::Filter where Self: Sized, + Self::Value: std::fmt::Debug, F: FnMut((&[u8], &Self::Value), &mut Vec) -> bool, { filter::Filter::new(self, filter) } + fn map(self, map: F) -> map::Map + where + Self: Sized, + F: FnMut((&[u8], &Self::Value)) -> O, + { + map::Map::new(self, map) + } + fn filtermap(self, filtermap: F) -> filtermap::FilterMap where Self: Sized, @@ -40,23 +49,34 @@ pub trait Cursor { { filtermap::FilterMap::new(self, filtermap) } + + fn dup_merge(self, merge: F) -> merge::Merge + where + Self: Sized, + Self::Value: ToOwned, + F: FnMut(&[u8], &Self::Value, &mut ::Owned), + { + merge::Merge::new(self, merge) + } } -pub struct KVIter(T, bool); +pub struct KVIter { + cursor: C, +} impl Iterator for KVIter where C: Cursor, - C::Value: Clone, + C::Value: ToOwned, { - type Item = (Vec, C::Value); + type Item = (Vec, ::Owned); fn next(&mut self) -> Option { - let first = self.1; - if !first { - self.0.advance(); + let ires = self.cursor.get(); + let res = ires.map(|(k, v)| (Vec::from(k), v.to_owned())); + if res.is_some() { + self.cursor.advance(); } - self.1 = false; - self.0.get().map(|(k, v)| (Vec::from(k), v.clone())) + res } } @@ -64,19 +84,19 @@ impl Cursor for KVIter { type Value = C::Value; fn get(&self) -> Option<(&[u8], &Self::Value)> { - self.0.get() + self.cursor.get() } fn advance(&mut self) { - self.0.advance(); + self.cursor.advance(); } fn seek(&mut self, key: &[u8]) { - self.0.seek(key); + self.cursor.seek(key); } } -impl Cursor for Box { +impl Cursor for Box { type Value = C::Value; fn get(&self) -> Option<(&[u8], &Self::Value)> { @@ -91,3 +111,99 @@ impl Cursor for Box { self.as_mut().seek(key); } } + +pub(crate) struct VecCursor { + data: Vec<(Vec, Vec)>, + idx: Option, +} + +impl From, Vec)>> for VecCursor { + fn from(data: Vec<(Vec, Vec)>) -> Self { + Self { data, idx: None } + } +} + +impl Cursor for VecCursor { + type Value = [u8]; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.idx.as_ref().map(|idx| { + let pair = &self.data[*idx]; + (pair.0.as_slice(), pair.1.as_slice()) + }) + } + + fn advance(&mut self) { + match self.idx.take() { + Some(idx) => self.idx.replace(idx + 1), + None => self.idx.replace(0), + }; + } + + fn seek(&mut self, key: &[u8]) { + let idx = self + .data + .binary_search_by_key(&Vec::from(key), |(k, _)| k.clone()) + .unwrap_or(self.data.len()); + self.idx.replace(idx); + } +} + +impl IntoIterator for VecCursor { + type Item = (Vec, Vec); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + self.to_iter() + } +} + +struct RangeCursor(Option<(Vec, usize)>, usize); + +impl Cursor for RangeCursor { + type Value = usize; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + self.0.as_ref().map(|(k, v)| (k.as_slice(), v)) + } + + fn advance(&mut self) { + match self.0.take() { + Some((_, mut v)) => { + if v < self.1 { + v += 1; + self.0.replace((v.to_be_bytes().into(), v)); + } + } + None => { + self.0.replace((0usize.to_be_bytes().into(), 0)); + } + } + } + + fn seek(&mut self, key: &[u8]) { + let idx = usize::from_be_bytes(key.try_into().unwrap_or([255; (usize::BITS / 8) as usize])); + if idx < self.1 { + self.0.replace((idx.to_be_bytes().into(), idx)); + } else { + self.0.take(); + } + } +} + +impl IntoIterator for RangeCursor { + type Item = (Vec, usize); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + self.to_iter() + } +} + +#[test] +fn test_range_cursor() { + let rc = RangeCursor(None, 10); + let rcv = rc.to_iter().collect::>(); + let check = (0usize..11) + .map(|i| (Vec::from(i.to_be_bytes()), i)) + .collect::>(); + assert_eq!(rcv, check) +} diff --git a/src/cursor/range.rs b/src/cursor/range.rs index ecb8273..575538f 100644 --- a/src/cursor/range.rs +++ b/src/cursor/range.rs @@ -1,16 +1,24 @@ +use crate::cursor::KVIter; + use super::Cursor; pub struct RangeCursor { cursor: C, begin: Vec, end: Vec, + first: bool, } impl RangeCursor { pub fn new(cursor: C, begin: &[u8], end: &[u8]) -> Self { - let begin = Vec::from(begin.as_ref()); + let begin = Vec::from(begin); let end = Vec::from(end); - Self { cursor, begin, end } + Self { + cursor, + begin, + end, + first: true, + } } } @@ -24,6 +32,11 @@ impl Cursor for RangeCursor { } fn advance(&mut self) { + if self.first && self.cursor.get().is_none() { + self.first = false; + self.cursor.seek(self.begin.as_slice()); + return; + } self.cursor.advance(); } @@ -32,15 +45,31 @@ impl Cursor for RangeCursor { } } +impl IntoIterator for RangeCursor +where + C::Value: ToOwned, +{ + type Item = (Vec, ::Owned); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + Cursor::to_iter(self) + } +} + pub struct PrefixCursor { cursor: C, prefix: Vec, + first: bool, } impl PrefixCursor { pub fn new(cursor: C, prefix: &[u8]) -> Self { let prefix = Vec::from(prefix); - Self { cursor, prefix } + Self { + cursor, + prefix, + first: true, + } } } @@ -54,6 +83,11 @@ impl Cursor for PrefixCursor { } fn advance(&mut self) { + if self.first && self.cursor.get().is_none() { + self.first = false; + self.cursor.seek(self.prefix.as_slice()); + return; + } self.cursor.advance(); } @@ -61,3 +95,14 @@ impl Cursor for PrefixCursor { self.cursor.seek(key); } } + +impl IntoIterator for PrefixCursor +where + C::Value: ToOwned, +{ + type Item = (Vec, ::Owned); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + Cursor::to_iter(self) + } +} diff --git a/src/entry.rs b/src/entry.rs deleted file mode 100644 index 8d990b8..0000000 --- a/src/entry.rs +++ /dev/null @@ -1,87 +0,0 @@ -use crate::iter::merger::Mergeable; -use std::cmp::Ordering; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct Entry { - key: Arc>, - value: Arc>, -} - -impl Entry { - pub fn new(key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self { - Entry { - key: Arc::new(Vec::from(key.as_ref())), - value: Arc::new(Vec::from(value.as_ref())), - } - } - - pub fn replace(&mut self, e: &Entry) { - let key = Arc::make_mut(&mut self.key); - let value = Arc::make_mut(&mut self.value); - key.clear(); - key.extend_from_slice(e.key.as_slice()); - value.clear(); - value.extend_from_slice(e.value.as_slice()); - } - - pub fn clear(&mut self) { - Arc::make_mut(&mut self.key).clear(); - Arc::make_mut(&mut self.value).clear(); - } - - pub fn key(&self) -> &[u8] { - self.key.as_slice() - } - - pub fn key_mut(&mut self) -> &mut Vec { - Arc::make_mut(&mut self.key) - } - - pub fn value(&self) -> &[u8] { - self.value.as_slice() - } - - pub fn value_mut(&mut self) -> &mut Vec { - Arc::make_mut(&mut self.value) - } -} - -impl Mergeable for Entry {} - -impl PartialOrd<[u8]> for Entry { - fn partial_cmp(&self, other: &[u8]) -> Option { - Some(self.key().cmp(other)) - } -} - -impl PartialEq<[u8]> for Entry { - fn eq(&self, other: &[u8]) -> bool { - self.key() == other - } -} - -impl AsRef<[u8]> for Entry { - fn as_ref(&self) -> &[u8] { - self.key() - } -} - -impl PartialEq for Entry { - fn eq(&self, other: &Entry) -> bool { - self.key() == other.key() - } -} -impl Eq for Entry {} - -impl PartialOrd for Entry { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.key().cmp(other.key())) - } -} - -impl Ord for Entry { - fn cmp(&self, other: &Self) -> Ordering { - self.key().cmp(other.key()) - } -} diff --git a/src/fileset.rs b/src/fileset.rs index 6e2ec74..5d3b9a4 100644 --- a/src/fileset.rs +++ b/src/fileset.rs @@ -1,8 +1,8 @@ #![allow(dead_code)] //use memmap::Mmap; -use crate::merger::Merger; use crate::reader::FileReader; +use crate::source::Merger; use crate::Result; use std::collections::HashMap; use std::path::{Path, PathBuf}; diff --git a/src/iter/dupmerge.rs b/src/iter/dupmerge.rs deleted file mode 100644 index 2508a6a..0000000 --- a/src/iter/dupmerge.rs +++ /dev/null @@ -1,93 +0,0 @@ -use crate::iter::{IntoIter, Iter}; - -pub struct MergeIter -where - I: Iter, - I::Item: PartialEq, - F: FnMut(&mut I::Item, &I::Item), -{ - prev: Option, - iter: I, - merge_func: F, -} - -impl MergeIter -where - I: Iter, - I::Item: PartialEq, - F: FnMut(&mut I::Item, &I::Item), -{ - pub fn new(iter: I, merge_func: F) -> Self { - Self { - prev: None, - iter, - merge_func, - } - } -} - -impl Iter for MergeIter -where - I: Iter, - I::Item: PartialEq, - F: FnMut(&mut I::Item, &I::Item), -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - let mut cur = self.prev.take().or_else(|| self.iter.next())?; - while let Some(e) = self.iter.next() { - if cur == e { - (self.merge_func)(&mut cur, &e); - } else { - self.prev.replace(e); - break; - } - } - Some(cur) - } - - fn seek(&mut self, key: &[u8]) { - self.prev.take(); - self.iter.seek(key); - } -} - -impl IntoIterator for MergeIter -where - I: Iter, - I::Item: PartialEq, - F: FnMut(&mut I::Item, &I::Item), -{ - type Item = I::Item; - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -#[test] -fn test_merge_func() { - use crate::source::test_source::TestSource; - use crate::Entry; - use crate::Source; - - let ts = TestSource( - (1u8..8) - .flat_map(|n| vec![n; n as usize]) - .map(|n| Entry::new(vec![n], vec![1])) - .collect(), - ); - - let s = ts.dup_merge(|v, e| { - v.value_mut()[0] += e.value()[0]; - }); - - assert_eq!( - Vec::from_iter(s.iter()), - (1u8..8) - .map(|n| Entry::new(vec![n], vec![n])) - .collect::>() - ) -} diff --git a/src/iter/dupsort.rs b/src/iter/dupsort.rs deleted file mode 100644 index 07e067f..0000000 --- a/src/iter/dupsort.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::iter::{IntoIter, Iter}; -use std::cmp::Ordering; - -#[derive(Debug)] -pub struct DupsortIter -where - I: Iter, - F: FnMut(&I::Item, &I::Item) -> Ordering, -{ - run: Vec, - next: Option, - iter: I, - dupsort_func: F, -} - -impl DupsortIter -where - I: Iter, - F: FnMut(&I::Item, &I::Item) -> Ordering, -{ - pub fn new(iter: I, dupsort_func: F) -> Self { - Self { - run: Vec::new(), - next: None, - iter, - dupsort_func, - } - } -} - -impl IntoIterator for DupsortIter -where - I: Iter, - I::Item: PartialEq, - F: Fn(&I::Item, &I::Item) -> Ordering, -{ - type Item = I::Item; - type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -impl Iter for DupsortIter -where - I: Iter, - I::Item: PartialEq, - F: Fn(&I::Item, &I::Item) -> Ordering, -{ - type Item = I::Item; - - fn seek(&mut self, key: &[u8]) { - self.run.clear(); - self.next.take(); - self.iter.seek(key); - } - - fn next(&mut self) -> Option { - self.run.pop().or_else(|| { - self.run - .push(self.next.take().or_else(|| self.iter.next())?); - - while let Some(e) = self.iter.next() { - if e == self.run[0] { - self.run.push(e); - continue; - } - self.next.replace(e); - break; - } - // sort in reverse order, so self.run.pop() can be the usual - // return value. - self.run.sort_unstable_by(|a, b| (self.dupsort_func)(b, a)); - self.run.pop() - }) - } -} - -#[test] -fn test_dupsort() { - use crate::source::test_source::TestSource; - use crate::Entry; - use crate::Source; - - let ts = TestSource( - (1u8..10) - .flat_map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j]))) - .collect(), - ); - - let s = ts.dupsort(|a, b| a.value()[0].cmp(&b.value()[0])); - - assert_eq!( - Vec::from_iter(s.iter()), - (1u8..10) - .flat_map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j]))) - .collect::>() - ); -} diff --git a/src/iter/filter.rs b/src/iter/filter.rs deleted file mode 100644 index 4ce3bf6..0000000 --- a/src/iter/filter.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::iter::{IntoIter, Iter}; - -pub struct FilterIter { - iter: I, - filter: F, - seek_key: Vec, -} - -impl FilterIter -where - I: Iter, - F: FnMut(&I::Item, &mut Vec) -> bool, -{ - pub fn new(iter: I, filter: F) -> Self { - let seek_key = Vec::new(); - Self { - iter, - filter, - seek_key, - } - } -} - -impl Iter for FilterIter -where - I: Iter, - F: FnMut(&I::Item, &mut Vec) -> bool, -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - loop { - let item = self.iter.next()?; - self.seek_key.clear(); - if (self.filter)(&item, &mut self.seek_key) { - return Some(item); - } - if !self.seek_key.is_empty() { - self.iter.seek(self.seek_key.as_slice()); - } - } - } - - fn seek(&mut self, key: &[u8]) { - self.iter.seek(key); - } -} - -impl IntoIterator for FilterIter -where - I: Iter, - F: FnMut(&I::Item, &mut Vec) -> bool, -{ - type Item = I::Item; - type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -#[test] -fn test_filter() { - use crate::source::test_source::TestSource; - use crate::source::Source; - use crate::Entry; - use std::iter::IntoIterator; - - let ts = - TestSource((0u8..10).map(|n| Entry::new(vec![n], vec![])).collect()).filter(|e, sv| { - // pass only even values - if e.key()[0] % 2 != 0 { - return false; - } else if e.key()[0] == 4 { - // at key 4, seek to 8 - sv.push(8); - return false; - } - true - }); - - assert_eq!( - vec![0, 2, 8], - ts.map(|e| e.key()[0]) - .iter() - .into_iter() - .collect::>() - ); -} diff --git a/src/iter/map.rs b/src/iter/map.rs deleted file mode 100644 index e2d336e..0000000 --- a/src/iter/map.rs +++ /dev/null @@ -1,90 +0,0 @@ -use crate::iter::{IntoIter, Iter}; - -pub struct MapIter -where - I: Iter, - F: FnMut(I::Item) -> O, -{ - iter: I, - mapf: F, -} - -impl MapIter -where - I: Iter, - F: FnMut(I::Item) -> O, -{ - pub fn new(iter: I, mapf: F) -> Self { - Self { iter, mapf } - } -} - -impl Iter for MapIter -where - I: Iter, - F: FnMut(I::Item) -> O, -{ - type Item = O; - - fn next(&mut self) -> Option { - let item = self.iter.next()?; - Some((self.mapf)(item)) - } - - fn seek(&mut self, key: &[u8]) { - self.iter.seek(key); - } -} - -impl IntoIterator for MapIter -where - I: Iter, - F: FnMut(I::Item) -> O, -{ - type Item = O; - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -#[cfg(test)] -mod test { - use crate::source::test_source::TestSource; - use crate::{Entry, Iter, Source}; - - struct CompVec(Vec); - impl PartialEq<[u8]> for CompVec { - fn eq(&self, other: &[u8]) -> bool { - self.0.as_slice().eq(other) - } - } - impl PartialOrd<[u8]> for CompVec { - fn partial_cmp(&self, other: &[u8]) -> Option { - self.0.as_slice().partial_cmp(other) - } - } - - #[test] - fn test_mapsource() { - let ts = TestSource((0u8..10).map(|i| Entry::new(vec![i], vec![])).collect()) - .map(|e| CompVec(Vec::from(e.key()))); - assert_eq!( - ts.iter().map(|cv| cv.0).into_iter().collect::>(), - (0u8..10).map(|i| vec![i]).collect::>() - ); - assert_eq!( - ts.get_range(&[2u8], &[7u8]) - .map(|cv| cv.0) - .into_iter() - .collect::>(), - (2u8..8) - .map(|i| { - println!("{i}"); - vec![i] - }) - .collect::>() - ) - } -} diff --git a/src/iter/merger.rs b/src/iter/merger.rs deleted file mode 100644 index aa8d4d9..0000000 --- a/src/iter/merger.rs +++ /dev/null @@ -1,192 +0,0 @@ -use crate::iter::{IntoIter, Iter}; -use std::cmp::Ordering; -use std::collections::BinaryHeap; - -pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {} - -struct MergeEntry { - e: I::Item, - it: I, -} - -impl PartialEq for MergeEntry -where - I::Item: PartialEq, -{ - fn eq(&self, other: &Self) -> bool { - self.e == other.e - } -} - -impl Eq for MergeEntry where I::Item: PartialEq {} - -// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics -impl PartialOrd for MergeEntry -where - I::Item: PartialOrd, -{ - fn partial_cmp(&self, other: &Self) -> Option { - other.e.partial_cmp(&self.e) - } -} - -impl Ord for MergeEntry -where - I::Item: Ord, -{ - fn cmp(&self, other: &Self) -> Ordering { - other.e.cmp(&self.e) - } -} - -pub struct MergeIter { - heap: BinaryHeap>, - finished: Vec, - last_key: Vec, -} - -impl From for MergeIter -where - I: Iterator, - I::Item: Iter, - ::Item: Mergeable, -{ - fn from(iters: I) -> Self { - let mut v: Vec = Vec::new(); - let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() { - Some(e) => Some(MergeEntry { e, it }), - None => { - v.push(it); - None - } - })); - MergeIter { - finished: v, - heap: h, - last_key: Vec::new(), - } - } -} - -impl Iter for MergeIter -where - I::Item: Mergeable, -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - let cur; - { - let mut next = self.heap.peek_mut()?; - - cur = next.e.clone(); - self.last_key.clear(); - self.last_key.extend(cur.as_ref()); - - if let Some(e) = next.it.next() { - next.e = e; - return Some(cur); - } - } - self.heap.pop(); - Some(cur) - } - - fn seek(&mut self, key: &[u8]) { - if key >= self.last_key.as_ref() { - loop { - match self.heap.peek_mut() { - None => return, - Some(mut head) => { - if &head.e >= key { - return; - } - head.it.seek(key); - if let Some(e) = head.it.next() { - head.e = e; - continue; - } - } - } - self.heap.pop(); - } - } - - // backwards seek; reset heap - let mut finished: Vec = Vec::new(); - let mut heap_entries: Vec> = Vec::new(); - for mut it in self - .heap - .drain() - .map(|me| me.it) - .chain(self.finished.drain(..)) - { - it.seek(key); - match it.next() { - Some(e) => heap_entries.push(MergeEntry { e, it }), - None => finished.push(it), - } - } - self.heap = BinaryHeap::from_iter(heap_entries); - self.finished = finished; - self.last_key.clear(); - self.last_key.extend(key); - } -} - -impl IntoIterator for MergeIter -where - I::Item: Mergeable, -{ - type Item = I::Item; - type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -#[cfg(test)] -mod test { - use crate::source::test_source::TestSource; - use crate::Entry; - use crate::Merger; - use crate::{Iter, Source}; - - fn tnum(m: u8) -> Vec { - Vec::from_iter((1u8..255).filter(|i| i % m == 0)) - } - - fn test_source(m: u8) -> TestSource { - TestSource(Vec::from_iter( - tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])), - )) - } - - #[test] - fn test_merge() { - let range = 1..8; - let iters: Vec<_> = range - .clone() - .map(test_source) - .map(|s| s.into_boxed()) - .collect(); - let s = Merger::from(iters); - let mut v = Vec::::new(); - for i in range { - v.extend(tnum(i)) - } - v.sort(); - let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0])); - assert_eq!(v2, v); - } - - #[test] - fn test_binheap() { - use std::collections::BinaryHeap; - - let v: Vec = vec![1, 8, 2, 9, 4, 7, 3]; - let vs = v.as_slice(); - let h = BinaryHeap::from_iter(vs.iter().copied()); - assert_ne!(h.into_vec(), v); - } -} diff --git a/src/iter/mod.rs b/src/iter/mod.rs deleted file mode 100644 index af0338a..0000000 --- a/src/iter/mod.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::cmp::Ordering; -pub mod dupmerge; -pub mod dupsort; -pub mod filter; -pub mod map; -pub mod merger; -pub mod prefix; -pub mod range; - -use dupmerge::MergeIter; -use dupsort::DupsortIter; -use filter::FilterIter; -use map::MapIter; - -pub trait Iter { - type Item; - - fn seek(&mut self, key: &[u8]); - - fn next(&mut self) -> Option; - - fn seek_to(mut self, key: &[u8]) -> Self - where - Self: Sized, - { - self.seek(key); - self - } - - fn dup_merge(self, merge_func: F) -> MergeIter - where - Self::Item: PartialEq + Clone, - F: FnMut(&mut Self::Item, &Self::Item), - Self: Sized, - { - MergeIter::new(self, merge_func) - } - - fn dup_sort(self, dupsort_func: F) -> DupsortIter - where - Self: Sized, - F: FnMut(&Self::Item, &Self::Item) -> Ordering, - { - DupsortIter::new(self, dupsort_func) - } - - fn filter(self, filter_func: F) -> FilterIter - where - F: FnMut(&Self::Item, &mut Vec) -> bool, - Self: Sized, - { - FilterIter::new(self, filter_func) - } - - fn map(self, map_func: F) -> MapIter - where - F: FnMut(Self::Item) -> O, - Self: Sized, - { - MapIter::new(self, map_func) - } -} - -impl Iter for Box { - type Item = I::Item; - - fn next(&mut self) -> Option { - self.as_mut().next() - } - - fn seek(&mut self, key: &[u8]) { - self.as_mut().seek(key); - } -} - -pub struct BoxedIter<'a, T>(pub Box + 'a>); -impl<'a, T> Iter for BoxedIter<'a, T> { - type Item = T; - fn next(&mut self) -> Option { - self.0.as_mut().next() - } - fn seek(&mut self, key: &[u8]) { - self.0.as_mut().seek(key) - } -} - -impl<'a, T> IntoIterator for BoxedIter<'a, T> { - type Item = T; - type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -pub struct IntoIter(pub I); - -impl Iterator for IntoIter -where - I: Iter, -{ - type Item = I::Item; - fn next(&mut self) -> Option { - self.0.next() - } -} diff --git a/src/iter/prefix.rs b/src/iter/prefix.rs deleted file mode 100644 index 8deb321..0000000 --- a/src/iter/prefix.rs +++ /dev/null @@ -1,55 +0,0 @@ -use crate::iter::{IntoIter, Iter}; - -pub struct PrefixIter -where - I: Iter, - I::Item: AsRef<[u8]>, -{ - iter: I, - prefix: Vec, -} - -impl PrefixIter -where - I: Iter, - I::Item: AsRef<[u8]>, -{ - pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self { - iter.seek(prefix.as_ref()); - Self { - iter, - prefix: Vec::from(prefix.as_ref()), - } - } -} - -impl Iter for PrefixIter -where - I: Iter, - I::Item: AsRef<[u8]>, -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - self.iter - .next() - .filter(|e| e.as_ref().starts_with(self.prefix.as_slice())) - } - - fn seek(&mut self, key: &[u8]) { - self.iter.seek(key); - } -} - -impl IntoIterator for PrefixIter -where - I: Iter, - I::Item: AsRef<[u8]>, -{ - type Item = I::Item; - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} diff --git a/src/iter/range.rs b/src/iter/range.rs deleted file mode 100644 index 00d559f..0000000 --- a/src/iter/range.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::iter::{IntoIter, Iter}; - -pub struct RangeIter { - iter: I, - start: Vec, - end: Vec, -} - -impl RangeIter -where - I: Iter, -{ - pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self { - iter.seek(start.as_ref()); - Self { - iter, - start: Vec::from(start.as_ref()), - end: Vec::from(end.as_ref()), - } - } -} - -impl Iter for RangeIter -where - I: Iter, - I::Item: PartialOrd<[u8]>, -{ - type Item = I::Item; - fn next(&mut self) -> Option { - self.iter.next().filter(|i| i <= self.end.as_slice()) - } - - fn seek(&mut self, key: &[u8]) { - if key <= self.start.as_slice() { - self.iter.seek(self.start.as_slice()); - } else if key > self.end.as_slice() { - self.iter.seek(self.end.as_slice()); - } else { - self.iter.seek(key); - } - } -} - -impl IntoIterator for RangeIter -where - I: Iter, - I::Item: PartialOrd<[u8]>, -{ - type Item = I::Item; - type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} diff --git a/src/lib.rs b/src/lib.rs index b093d52..fdde721 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,24 +1,20 @@ mod compression; mod cursor; -mod entry; -mod iter; -mod merger; +mod fileset; +mod metadata; pub mod reader; pub mod sorter; pub mod source; mod writer; pub use compression::Compression; -pub use entry::Entry; +pub use cursor::Cursor; pub use fileset::Fileset; -pub use iter::Iter; -pub use merger::Merger; pub use reader::Reader; +pub use sorter::Sorter; +pub use source::Merger; pub use source::Source; pub use writer::Writer; -mod fileset; -mod metadata; - type Error = Box; type Result = std::result::Result; diff --git a/src/merger.rs b/src/merger.rs deleted file mode 100644 index 4668b36..0000000 --- a/src/merger.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::iter::{ - merger::{MergeIter, Mergeable}, - Iter, -}; -use crate::Source; - -pub struct Merger { - sources: Vec, -} - -impl Default for Merger { - fn default() -> Self { - Self { - sources: Vec::new(), - } - } -} - -impl Merger { - pub fn new() -> Self { - Self { - sources: Vec::new(), - } - } - - pub fn add(&mut self, source: S) { - self.sources.push(source); - } -} - -impl From for Merger -where - I: IntoIterator, - S: Source, - S::Item: Mergeable, -{ - fn from(i: I) -> Self { - Merger { - sources: Vec::from_iter(i), - } - } -} - -impl Source for Merger -where - S: Source, - S::Item: Mergeable, -{ - type Item = S::Item; - fn iter(&self) -> impl Iter + IntoIterator { - MergeIter::from(self.sources.iter().map(|s| s.iter())) - } -} - -#[cfg(test)] -mod test { - use super::Merger; - use crate::source::test_source::TestSource; - use crate::Entry; - use crate::{Iter, Source}; - - fn tnum(m: u8) -> Vec { - Vec::from_iter((1u8..255).filter(|i| i % m == 0)) - } - - fn test_source(m: u8) -> TestSource { - TestSource(Vec::from_iter( - tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])), - )) - } - - #[test] - fn test_merge() { - let range = 1..8; - let iters: Vec<_> = range - .clone() - .map(test_source) - .map(|s| s.into_boxed()) - .collect(); - let s = Merger::from(iters); - let mut v = Vec::::new(); - for i in range { - v.extend(tnum(i)) - } - v.sort(); - let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0])); - assert_eq!(v2, v); - } - - #[test] - fn test_binheap() { - use std::collections::BinaryHeap; - - let v: Vec = vec![1, 8, 2, 9, 4, 7, 3]; - let vs = v.as_slice(); - let h = BinaryHeap::from_iter(vs.iter().copied()); - assert_ne!(h.into_vec(), v); - } -} diff --git a/src/reader/block.rs b/src/reader/block.rs index e152af2..9f4c044 100644 --- a/src/reader/block.rs +++ b/src/reader/block.rs @@ -1,7 +1,8 @@ -use crate::iter::{IntoIter, Iter}; -use crate::{Entry, Result}; +use crate::cursor::Cursor; +use crate::Result; use integer_encoding::VarInt; use std::mem::size_of; +use std::sync::Arc; #[derive(Debug)] enum RestartType { @@ -81,53 +82,19 @@ impl> Block { } } -impl> From> for BlockIter { - fn from(b: Block) -> BlockIter { - BlockIter { - block: b, - cur_ent: None, - off: 0, - } - } -} - -impl> IntoIterator for Block { - type Item = Entry; - type IntoIter = IntoIter>; - - fn into_iter(self) -> Self::IntoIter { - IntoIter(BlockIter { - block: self, - cur_ent: None, - off: 0, - }) - } -} - #[derive(Debug)] -pub(crate) struct BlockIter> { +pub(crate) struct BlockCursor> { block: Block, - cur_ent: Option, + cur_key: Option>>, pub(super) off: usize, + pub(super) len_val: usize, } -fn get_bytes(b: &[u8], n: usize) -> Result<&[u8]> { - if n > b.len() { - return Err("too long".into()); - } - Ok(&b[0..n]) -} - -impl> BlockIter { - fn seek_restart(&mut self, ridx: usize) -> Option<&[u8]> { - self.off = self.block.restart(ridx).ok()?; - self.decode_restart_key() - } - +impl> BlockCursor { fn bsearch_restart(&mut self, key: &[u8], mut left: usize, mut right: usize) { while left < right { let mid = (left + right).div_ceil(2); - self.seek_restart(mid) + self.get_restart_key(mid) .map(|rk| { if rk < key { left = mid; @@ -140,64 +107,89 @@ impl> BlockIter { None }); } - self.seek_restart(left); + if let Ok(off) = self.block.restart(left) { + self.off = off; + self.decode(); + } } - fn decode_restart_key(&self) -> Option<&[u8]> { - let mut idx = self.off; + // returns key at restart `ridx` and its offset, or None on error. + fn get_restart_key(&self, ridx: usize) -> Option<&[u8]> { let data = self.block.data.as_ref(); + let mut idx = self.block.restart(ridx).ok()?; let (shared_key, len) = usize::decode_var(&data[idx..])?; debug_assert!(shared_key == 0); idx += len; let (unshared_key, len) = usize::decode_var(&data[idx..])?; idx += len; + debug_assert!(unshared_key > 0); let (_len_val, len) = usize::decode_var(&data[idx..])?; idx += len; Some(&data[idx..idx + unshared_key]) } - fn decode(&mut self) -> Option<&Entry> { - let mut idx = self.off; - if idx >= self.block.restart_off { - self.cur_ent.take(); + fn decode(&mut self) -> Option<&Arc>> { + if self.off >= self.block.restart_off { return None; } let data = self.block.data.as_ref(); - let entry = self.cur_ent.get_or_insert(Entry::new([], [])); - - let (shared_key, len) = usize::decode_var(&data[idx..])?; - idx += len; - let (unshared_key, len) = usize::decode_var(&data[idx..])?; - idx += len; - let (len_val, len) = usize::decode_var(&data[idx..])?; - idx += len; - if shared_key > entry.key().len() { - // return Err("shared_key too long".into()); - return None; + let (shared_key, len) = usize::decode_var(data.get(self.off..)?)?; + self.off += len; + let (unshared_key, len) = usize::decode_var(data.get(self.off..)?)?; + self.off += len; + let (len_val, len) = usize::decode_var(data.get(self.off..)?)?; + self.off += len; + debug_assert!(shared_key + unshared_key > 0); + + match self.cur_key { + None => { + if shared_key != 0 { + return None; + } + self.cur_key.replace(Arc::new(Vec::from( + data.get(self.off..self.off + unshared_key)?, + ))); + } + Some(ref mut ak) => { + let key = Arc::make_mut(ak); + key.truncate(shared_key); + key.extend_from_slice(data.get(self.off..self.off + unshared_key)?); + } } + self.off += unshared_key + len_val; + self.len_val = len_val; + self.cur_key.as_ref() + } +} - let key = entry.key_mut(); - key.truncate(shared_key); - key.extend_from_slice(get_bytes(&data[idx..], unshared_key).ok()?); - idx += unshared_key; - - let val = entry.value_mut(); - val.clear(); - val.extend_from_slice(get_bytes(&data[idx..], len_val).ok()?); - idx += len_val; - self.off = idx; - self.cur_ent.as_ref() +impl> From> for BlockCursor { + fn from(block: Block) -> Self { + Self { + block, + cur_key: None, + off: 0, + len_val: 0, + } } } -impl> Iter for BlockIter { - type Item = Entry; +impl> Cursor for BlockCursor { + type Value = [u8]; - fn next(&mut self) -> Option { - self.decode().cloned() + fn get(&self) -> Option<(&[u8], &Self::Value)> { + let key = self.cur_key.as_ref()?.as_slice(); + let data = self.block.data.as_ref(); + let val = data.get(self.off - self.len_val..self.off)?; + Some((key, val)) + } + + fn advance(&mut self) { + if self.decode().is_none() { + self.cur_key.take(); + } } fn seek(&mut self, key: &[u8]) { @@ -206,12 +198,10 @@ impl> Iter for BlockIter { self.bsearch_restart(key, 0, self.block.restart_count - 1); } loop { - let poff = self.off; match self.decode() { None => break, - Some(e) => { - if e.key() >= key { - self.off = poff; + Some(k) => { + if k.as_slice() >= key { return; } } @@ -220,13 +210,40 @@ impl> Iter for BlockIter { } } +pub struct BlockIter> { + cur: BlockCursor, +} + +impl> Iterator for BlockIter { + type Item = (Arc>, Vec); + fn next(&mut self) -> Option { + let key = self.cur.cur_key.as_ref().map(Arc::clone)?; + let res = self.cur.get().map(|(_, v)| (key, Vec::from(v))); + if res.is_some() { + self.cur.advance(); + } + res + } +} + +impl> IntoIterator for BlockCursor { + type Item = (Arc>, Vec); + type IntoIter = BlockIter; + fn into_iter(mut self) -> Self::IntoIter { + if self.get().is_none() { + self.advance(); + } + Self::IntoIter { cur: self } + } +} + #[cfg(test)] mod test { - use crate::reader::block::{Block, BlockIter}; + use crate::cursor::Cursor; + use crate::reader::block::{Block, BlockCursor}; use crate::writer::block_builder::BlockBuilder; - use crate::Entry; - use crate::Iter; + use std::sync::Arc; fn build_block(n: u32, skip: u32, r: usize) -> Block> { let mut bb = BlockBuilder::default(); @@ -242,24 +259,25 @@ mod test { Block::new(v).unwrap() } - fn build_ref(n: u32, skip: u32) -> Vec { - Vec::from_iter( - (0..n) - .map(|i| i * skip) - .map(|i| Entry::new(u32::to_be_bytes(i), u32::to_be_bytes(i * 1024))), - ) + fn build_ref(n: u32, skip: u32) -> Vec<(Arc>, Vec)> { + Vec::from_iter((0..n).map(|i| i * skip).map(|i| { + ( + Arc::new(Vec::from(u32::to_be_bytes(i))), + u32::to_be_bytes(i * 1024).into(), + ) + })) } #[test] fn test_block_iter() { let n = 40; let b = build_block(n, 1, 10); - let bi = b.into_iter(); + let bi = BlockCursor::from(b).into_iter(); assert_eq!( - bi.map(|e| Vec::from(e.key())).collect::>(), + bi.map(|(k, _)| Vec::from(k.as_slice())).collect::>(), build_ref(n, 1) .into_iter() - .map(|e| Vec::from(e.key())) + .map(|(k, _)| Vec::from(k.as_slice())) .collect::>() ); } @@ -268,10 +286,10 @@ mod test { fn test_block_seek() { let n = 40; let b = build_block(n, 10, 10); - let mut bi = BlockIter::from(b); + let mut bi = BlockCursor::from(b); bi.seek(&u32::to_be_bytes(40)); - assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40)); + assert_eq!(bi.get().unwrap().0, &u32::to_be_bytes(40)); bi.seek(&u32::to_be_bytes(32)); - assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40)); + assert_eq!(bi.get().unwrap().0, &u32::to_be_bytes(40)); } } diff --git a/src/reader/mod.rs b/src/reader/mod.rs index f749f22..8471191 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -1,12 +1,11 @@ +use crate::cursor::{Cursor, KVIter}; use crate::metadata::Metadata; use crate::Source; -use crate::{iter::IntoIter, Entry, Iter}; use integer_encoding::VarInt; pub(crate) mod block; use crate::compression::CBuf; use memmap::{Mmap, MmapOptions}; use std::fs::File; -use std::io::Cursor; use std::path::Path; use std::sync::Arc; @@ -21,7 +20,7 @@ pub(crate) struct DataSlice> { impl> Clone for DataSlice { fn clone(&self) -> Self { Self { - data: self.data.clone(), + data: Arc::clone(&self.data), off: self.off, len: self.len, } @@ -88,7 +87,7 @@ pub fn from_file(fname: impl AsRef) -> Reader { impl> Reader { pub fn new(d: D) -> Self { - let cur = Cursor::new(&d.as_ref()[d.as_ref().len() - 512..]); + let cur = std::io::Cursor::new(&d.as_ref()[d.as_ref().len() - 512..]); let metadata = Metadata::read_from(cur).expect("bad meta"); Self { data: DataSlice::new(d), @@ -96,7 +95,7 @@ impl> Reader { } } - fn index_iter(&self) -> block::BlockIter>> { + fn index_iter(&self) -> block::BlockCursor>> { let mut off = self.metadata.index_block_offset; let d = &self.data.as_ref()[off..]; let (size, len_size) = usize::decode_var(d).unwrap(); @@ -109,85 +108,114 @@ impl> Reader { } } -pub struct ReaderIter> { +pub struct ReaderCursor> { reader: Reader, next_offset: usize, - index_iter: block::BlockIter>>, - data_iter: Option>>>, + index_iter: block::BlockCursor>>, + data_iter: Option>>>, } -impl> ReaderIter { - fn next_block(&mut self) -> Option<()> { - if self.next_offset >= self.reader.metadata.index_block_offset { - return None; - } - let (size, len_size) = usize::decode_var(&self.reader.data.as_ref()[self.next_offset..]) - .expect("bad block size"); - let crc_off = self.next_offset + len_size; - // TODO: read crc, optionally verify - let data_off = crc_off + std::mem::size_of::(); +impl> ReaderCursor { + fn get_block(&self, mut off: usize) -> Option<(block::BlockCursor>>, usize)> { + let (size, len_size) = usize::decode_var(self.reader.data.as_ref().get(off..)?)?; + // TODO: read, verify CRC + off += len_size + std::mem::size_of::(); let comp = self.reader.metadata.compression_algorithm; - self.next_offset = data_off + size; - self.data_iter.replace( - block::Block::new(comp.uncompress(self.reader.data.clone_range(data_off, size))?) - .expect("bad block") - .into(), - ); - Some(()) + let data = comp + .uncompress(self.reader.data.clone_range(off, size)) + .ok()?; + let block = block::Block::new(data).ok()?; + Some((block.into(), off + size)) } } -impl> Iter for ReaderIter { - type Item = Entry; +impl> Cursor for ReaderCursor { + type Value = [u8]; + + fn get(&self) -> Option<(&[u8], &Self::Value)> { + match self.data_iter { + Some(ref cur) => cur.get(), + None => None, + } + } - fn next(&mut self) -> Option { - if self.data_iter.is_none() { - self.next_block() - .and_then(|_| self.data_iter.as_mut().unwrap().next()) - } else { - match self.data_iter.as_mut().unwrap().next() { - Some(e) => Some(e), + fn advance(&mut self) { + loop { + match &mut self.data_iter { None => { - self.next_block()?; - self.data_iter.as_mut().unwrap().next() + if self.next_offset >= self.reader.metadata.index_block_offset { + return; + } + match self.get_block(self.next_offset) { + None => return, + Some((cur, next_off)) => { + self.data_iter.replace(cur); + self.next_offset = next_off; + } + } + } + Some(cur) => { + cur.advance(); + match cur.get() { + Some(_) => return, + None => { + self.data_iter.take(); + } + } } } } } fn seek(&mut self, key: &[u8]) { - // TODO: detect and skip unneeded seek in iter. - self.index_iter.seek(key); + if let Some(cur) = self.data_iter.as_mut() { + if let Some((cur_key, _)) = cur.get() { + if key >= cur_key { + cur.seek(key); + if cur.get().is_some() { + return; + } + } + } + } - self.index_iter - .next() - .and_then(|e| { - self.next_offset = usize::decode_var(e.value())?.0; - self.next_block().map(|_| { - self.data_iter.as_mut().unwrap().seek(key); - }) - }) - .or_else(|| { - self.next_offset = usize::MAX; - self.data_iter.take(); - None - }); + self.index_iter.seek(key); + self.data_iter.take(); + if let Some((_, o)) = self.index_iter.get() { + if let Some((off, _)) = usize::decode_var(o) { + if let Some((mut cur, next)) = self.get_block(off) { + cur.seek(key); + self.data_iter.replace(cur); + self.next_offset = next; + } + } + } } } +/* impl> IntoIterator for ReaderIter { - type Item = Entry; + type Item = crate::Result; type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { IntoIter(self) } } +*/ + +impl> IntoIterator for ReaderCursor { + type Item = (Vec, Vec); + type IntoIter = KVIter; + fn into_iter(self) -> Self::IntoIter { + ReaderCursor::to_iter(self) + } +} -impl> Source for Reader { - type Item = Entry; +impl<'a, D: AsRef<[u8]>> Source<'a> for Reader { + type Cur = ReaderCursor; - fn iter(&self) -> impl Iter + IntoIterator { - ReaderIter { + fn iter(&'a self) -> ReaderCursor { + ReaderCursor { reader: self.clone(), next_offset: 0, index_iter: self.index_iter(), diff --git a/src/sorter.rs b/src/sorter.rs index 4bed5ab..e88cf05 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -1,10 +1,12 @@ -use crate::source::DynSource; -use crate::{Entry, Iter, Merger, Reader, Source, Writer}; +use crate::{ + cursor::{Cursor, VecCursor}, + Merger, Reader, Source, Writer, +}; use memmap::Mmap; use std::cell::Cell; -pub struct Sorter { - batch: Cell>, +pub struct Sorter)> { + batch: Cell, Vec)>>, batch_size: usize, max_size: usize, merge_func: F, @@ -14,7 +16,7 @@ pub struct Sorter { impl Sorter where - F: Fn(&mut Entry, &Entry) + 'static, + F: Fn(&[u8], &[u8], &mut Vec) + 'static, { pub fn new(max_size: usize, merge_func: F) -> Self { Self { @@ -27,44 +29,28 @@ where } } - pub fn add(&mut self, e: Entry) { - let esize = e.key().len() + e.value().len(); + pub fn add(&mut self, key: &[u8], val: &[u8]) { + let esize = key.len() + val.len(); if esize + self.batch_size > self.max_size { self.write_chunk(); } - self.batch.get_mut().push(e); + self.batch.get_mut().push((Vec::from(key), Vec::from(val))); self.batch_size += esize; } - pub fn source(mut self) -> Box> { - if !self.batch.get_mut().is_empty() { - self.write_chunk(); - } + pub fn write(self, mut w: Writer) { Merger::from(self.readers) .dup_merge(self.merge_func) - .into_boxed() - } - - pub fn write(self, mut w: Writer) { - self.source() - .as_ref() // XXX - need to further wrap Box in BoxedSource .iter() .into_iter() - .for_each(|e| w.add(e).unwrap()); + .for_each(|(k, v)| w.add(k.as_slice(), v.as_slice()).unwrap()); } fn write_chunk(&mut self) { let mut w = Writer::new(Vec::new()); - self.batch - .get_mut() - .sort_unstable_by(|a, b| a.key().cmp(b.key())); - self.batch - .take() - .iter() - .dup_merge(&self.merge_func) - .into_iter() - .for_each(|e| { - w.add(e).unwrap(); - }); + self.batch.get_mut().sort_unstable_by(|a, b| a.0.cmp(&b.0)); + for (k, v) in VecCursor::from(self.batch.take()).dup_merge(&self.merge_func) { + w.add(k.as_slice(), v.as_slice()).unwrap(); + } } } diff --git a/src/source/adapters.rs b/src/source/adapters.rs index ac34be1..b5b22ec 100644 --- a/src/source/adapters.rs +++ b/src/source/adapters.rs @@ -1,72 +1,55 @@ -use crate::{Iter, Source}; -use std::cmp::Ordering; +use crate::cursor::filter::Filter; +use crate::cursor::map::Map; +use crate::cursor::merge::Merge; +use crate::{cursor::Cursor, Source}; -pub struct DupmergeSource { +pub struct MergeSource { pub(super) source: S, pub(super) merge: F, } -impl Source for DupmergeSource +impl<'a, S, F> Source<'a> for MergeSource where - S: Source, - S::Item: PartialEq + Clone, - F: Fn(&mut S::Item, &S::Item), + S: Source<'a>, + ::Value: ToOwned, + F: Fn(&[u8], &::Value, &mut <::Value as ToOwned>::Owned) + + 'a, { - type Item = S::Item; - fn iter(&self) -> impl Iter + IntoIterator { + type Cur = Merge; + fn iter(&'a self) -> Merge { self.source.iter().dup_merge(&self.merge) } } -pub struct DupsortSource { - pub(super) source: S, - pub(super) dupsort: F, -} - -impl Source for DupsortSource -where - S: Source, - S::Item: PartialEq, - F: Fn(&S::Item, &S::Item) -> Ordering, -{ - type Item = S::Item; - fn iter(&self) -> impl Iter + IntoIterator { - self.source.iter().dup_sort(&self.dupsort) - } -} - pub struct FilterSource { pub(super) source: S, pub(super) filter: F, } -impl Source for FilterSource +impl<'a, S, F> Source<'a> for FilterSource where - S: Source, - F: Fn(&S::Item, &mut Vec) -> bool, + S: Source<'a>, + ::Value: Clone + std::fmt::Debug, + F: Fn((&[u8], &::Value), &mut Vec) -> bool + 'a, { - type Item = S::Item; - fn iter(&self) -> impl Iter + IntoIterator { + type Cur = Filter; + fn iter(&'a self) -> Self::Cur { self.source.iter().filter(&self.filter) } } -pub struct MapSource -where - S: Source, - F: Fn(S::Item) -> O, -{ +pub struct MapSource { pub(super) source: S, pub(super) map: F, } -impl Source for MapSource +impl<'a, S, F, O> Source<'a> for MapSource where - S: Source, - F: Fn(S::Item) -> O, + S: Source<'a>, + F: Fn((&[u8], &::Value)) -> O + 'a, { - type Item = O; - fn iter(&self) -> impl Iter + IntoIterator { + type Cur = Map; + fn iter(&'a self) -> Self::Cur { self.source.iter().map(&self.map) } } diff --git a/src/source/merger.rs b/src/source/merger.rs new file mode 100644 index 0000000..094251b --- /dev/null +++ b/src/source/merger.rs @@ -0,0 +1,29 @@ +use super::Source; +use crate::cursor::{merger::KeyOrd, merger::MergeCursor, Cursor}; + +pub struct Merger { + sources: Vec, +} + +impl From for Merger +where + I: IntoIterator, +{ + fn from(iter: I) -> Self { + Self { + sources: Vec::from_iter(iter), + } + } +} + +impl<'a, S> Source<'a> for Merger +where + S: Source<'a>, + //S::Cur: Ord, + ::Value: ToOwned, +{ + type Cur = MergeCursor>; + fn iter(&'a self) -> Self::Cur { + MergeCursor::from(self.sources.iter().map(|s| KeyOrd(s.iter()))) + } +} diff --git a/src/source/mod.rs b/src/source/mod.rs index 253dda6..c5c1ef7 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -1,65 +1,49 @@ -use crate::iter::{prefix::PrefixIter, range::RangeIter, BoxedIter, IntoIter, Iter}; -use crate::Entry; -use std::cmp::Ordering; +use crate::cursor::{range::PrefixCursor, range::RangeCursor, Cursor}; mod adapters; -pub use adapters::DupmergeSource; -pub use adapters::DupsortSource; pub use adapters::FilterSource; pub use adapters::MapSource; +pub use adapters::MergeSource; +mod merger; +pub use merger::Merger; -pub trait Source { - type Item; +pub trait Source<'a> { + type Cur: Cursor + IntoIterator; - fn iter(&self) -> impl Iter + IntoIterator; + fn iter(&'a self) -> Self::Cur; - fn get(&self, key: &[u8]) -> RangeIter> - where - Self::Item: PartialOrd<[u8]>, - { - RangeIter::new(self.iter(), key, key) + fn get(&'a self, key: &[u8]) -> RangeCursor { + RangeCursor::new(self.iter(), key, key) } - fn get_prefix(&self, prefix: &[u8]) -> PrefixIter> - where - Self::Item: AsRef<[u8]>, - { - PrefixIter::new(self.iter(), prefix) + fn get_prefix(&'a self, prefix: &[u8]) -> PrefixCursor { + PrefixCursor::new(self.iter(), prefix) } - fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter> - where - Self::Item: PartialOrd<[u8]>, - { - RangeIter::new(self.iter(), start, end) + fn get_range(&'a self, start: &[u8], end: &[u8]) -> RangeCursor { + RangeCursor::new(self.iter(), start, end) } - fn dup_merge(self, merge: F) -> DupmergeSource + fn dup_merge(self, merge: F) -> MergeSource where Self: Sized, - F: Fn(&mut Entry, &Entry) + 'static, - { - DupmergeSource { + ::Value: ToOwned, + F: Fn( + &[u8], + &::Value, + &mut <::Value as ToOwned>::Owned, + ) + 'static, + { + MergeSource { source: self, merge, } } - fn dupsort(self, dupsort: F) -> DupsortSource - where - Self: Sized, - F: Fn(&Self::Item, &Self::Item) -> Ordering + 'static, - { - DupsortSource { - source: self, - dupsort, - } - } - fn filter(self, filter: F) -> FilterSource where Self: Sized, - F: Fn(&Self::Item, &mut Vec) -> bool + 'static, + F: Fn(&::Value, &mut Vec) -> bool + 'static, { FilterSource { source: self, @@ -67,189 +51,11 @@ pub trait Source { } } - fn map(self, map: F) -> MapSource + fn map(self, map: F) -> MapSource where Self: Sized, - F: Fn(Self::Item) -> O + 'static, + F: Fn((&[u8], &::Value)) -> O + 'static, { MapSource { source: self, map } } - - fn into_boxed(self) -> Box> - where - Self: Sized + 'static, - { - // Inner Box satisfied DynSource, outer Box required for an owned DynSource trait object. - Box::new(Box::new(self)) - } -} - -// A dyn-compatible variant of Source generating boxed SeekableIters. Necessary to create -// heterogeneous collections of sources. -pub trait DynSource { - type Item; - - fn iter(&self) -> BoxedIter<'_, Self::Item>; -} - -impl DynSource for Box { - type Item = S::Item; - - fn iter(&self) -> BoxedIter<'_, Self::Item> { - BoxedIter(Box::new(self.as_ref().iter())) - } -} - -impl Source for D { - type Item = D::Item; - fn iter(&self) -> impl Iter + IntoIterator { - DynSource::iter(self) - } -} - -pub struct VecIter<'a> { - index: usize, - vec: &'a Vec, -} - -impl Iter for VecIter<'_> { - type Item = Entry; - fn next(&mut self) -> Option { - if self.index > self.vec.len() { - None - } else { - let e = self.vec[self.index].clone(); - self.index += 1; - Some(e) - } - } - - fn seek(&mut self, key: &[u8]) { - let mut left = 0; - let mut right = self.vec.len() - 1; - while left < right { - let mid = (left + right).div_ceil(2); - if self.vec[mid].key() < key { - left = mid; - } else { - right = mid - 1; - } - } - self.index = left; - } -} - -impl<'a> IntoIterator for VecIter<'a> { - type Item = Entry; - type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - IntoIter(self) - } -} - -impl Source for Vec { - type Item = Entry; - fn iter(&self) -> impl Iter + IntoIterator { - VecIter { - index: 0, - vec: self, - } - } -} - -pub mod test_source { - use crate::Entry; - use crate::Iter; - use crate::Source; - - pub struct TestSource(pub Vec); - - pub struct TestIter<'a> { - source: &'a TestSource, - off: usize, - } - - impl Iter for TestIter<'_> { - type Item = Entry; - - fn next(&mut self) -> Option { - let off = self.off; - if off >= self.source.0.len() { - return None; - } - let item = &self.source.0[off]; - self.off += 1; - Some(item.clone()) - } - - fn seek(&mut self, key: &[u8]) { - self.off = 0; - while self.off < self.source.0.len() && self.source.0[self.off].key() < key { - self.off += 1; - } - } - } - - impl IntoIterator for TestIter<'_> { - type Item = Entry; - type IntoIter = crate::iter::IntoIter; - fn into_iter(self) -> Self::IntoIter { - crate::iter::IntoIter(self) - } - } - - impl Source for TestSource { - type Item = Entry; - fn iter(&self) -> impl Iter + IntoIterator { - TestIter { - source: self, - off: 0, - } - } - } -} - -pub mod test { - use super::test_source::TestSource; - #[allow(unused_imports)] - use super::Source; - #[allow(unused_imports)] - use crate::iter::Iter; - use crate::Entry; - - #[allow(dead_code)] - fn test_source() -> TestSource { - TestSource(vec![ - Entry::new(vec![0, 0, 0, 0], vec![0]), - Entry::new(vec![0, 0, 0, 1], vec![1]), - Entry::new(vec![0, 0, 1, 0], vec![2]), - Entry::new(vec![0, 1, 0, 0], vec![3]), - Entry::new(vec![1, 0, 0, 0], vec![4]), - ]) - } - - #[test] - fn test_source_iter() { - let s = &test_source(); - - assert_eq!( - Vec::from_iter(s.iter().map(|e| e.value()[0])), - vec![0, 1, 2, 3, 4] - ); - assert_eq!( - Vec::from_iter(s.get(vec![0, 0, 1, 0].as_slice()).map(|e| e.value()[0])), - vec![2] - ); - assert_eq!( - Vec::from_iter(s.get_prefix(vec![0, 0].as_slice()).map(|e| e.value()[0])), - vec![0, 1, 2] - ); - assert_eq!( - Vec::from_iter( - s.get_range(vec![0, 0, 0, 1].as_slice(), vec![0, 1, 0, 0].as_slice()) - .map(|e| e.value()[0]) - ), - vec![1, 2, 3] - ); - } } diff --git a/src/writer/block_builder.rs b/src/writer/block_builder.rs index da6743f..0742fb3 100644 --- a/src/writer/block_builder.rs +++ b/src/writer/block_builder.rs @@ -105,15 +105,16 @@ impl BlockBuilder { #[cfg(test)] mod test { use super::BlockBuilder; - use crate::reader::block::Block; + use crate::reader::block::{Block, BlockCursor}; use crate::reader::DataSlice; + use std::sync::Arc; #[test] fn test_block_builder() { let mut bb = BlockBuilder::default(); let v = Vec::from_iter((0..16).map(|i| { ( - Vec::from(u32::to_be_bytes(i).as_slice()), + Arc::new(Vec::from(u32::to_be_bytes(i).as_slice())), Vec::from(u32::to_be_bytes(i * 2).as_slice()), ) })); @@ -121,10 +122,9 @@ mod test { let block_len = bb.len(); let block_data = bb.as_slice(); assert_eq!(block_data.len(), block_len); - let bi = Block::new(DataSlice::new(bb.as_slice())) - .unwrap() - .into_iter(); - let vcmp = Vec::from_iter(bi.map(|e| (Vec::from(e.key()), Vec::from(e.value())))); + let bc = BlockCursor::from(Block::new(DataSlice::new(bb.as_slice())).unwrap()); + let bi = bc.into_iter(); + let vcmp = Vec::from_iter(bi); assert_eq!(v, vcmp); } } diff --git a/src/writer/mod.rs b/src/writer/mod.rs index e231e9a..a4ce210 100644 --- a/src/writer/mod.rs +++ b/src/writer/mod.rs @@ -1,5 +1,5 @@ use crate::compression::Compression; -use crate::{Entry, Result}; +use crate::Result; use crc32c::crc32c; use integer_encoding::{FixedIntWriter, VarInt, VarIntWriter}; pub(crate) mod block_builder; @@ -110,16 +110,16 @@ impl Writer { } } - pub fn add(&mut self, e: Entry) -> Result<()> { - let est = e.key().len() + e.value().len() + 15; + pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<()> { + let est = key.len() + val.len() + 15; if self.block.len() + est >= self.blocksize { - bytesep(&mut self.last_key, e.key()); + bytesep(&mut self.last_key, key); self.write_block()?; } - self.meta.add_entry(e.key().len(), e.value().len()); - self.block.add(e.key(), e.value()); + self.meta.add_entry(key.len(), val.len()); + self.block.add(key, val); self.last_key.clear(); - self.last_key.extend_from_slice(e.key()); + self.last_key.extend_from_slice(key); Ok(()) } @@ -186,17 +186,16 @@ impl Drop for Writer { #[cfg(test)] mod test { use super::Writer; - use crate::Entry; #[test] fn test_writer() { let mut out = Vec::::new(); { let mut w = Writer::new(&mut out); - w.add(Entry::new(vec![0], vec![1])).unwrap(); - w.add(Entry::new(vec![0, 0], vec![1])).unwrap(); - w.add(Entry::new(vec![0, 1], vec![1])).unwrap(); - w.add(Entry::new(vec![1, 1], vec![1])).unwrap(); + w.add(vec![0].as_ref(), vec![1].as_ref()).unwrap(); + w.add(vec![0, 0].as_ref(), vec![1].as_ref()).unwrap(); + w.add(vec![0, 1].as_ref(), vec![1].as_ref()).unwrap(); + w.add(vec![1, 1].as_ref(), vec![1].as_ref()).unwrap(); // drops w } assert!(out.len() > 512); diff --git a/tests/rwtest.rs b/tests/rwtest.rs index d2c6328..32084ea 100644 --- a/tests/rwtest.rs +++ b/tests/rwtest.rs @@ -1,22 +1,29 @@ -use mtbl::{Entry, Reader, Source, Writer}; +use mtbl::{Reader, Source, Writer}; #[test] fn test_write_readback() { let mut store = Vec::::new(); - let mut reference = Vec::::new(); + let mut reference = Vec::<(Vec, Vec)>::new(); { + println!("writer starting"); let mut w = Writer::new(&mut store).blocksize(256); for i in 1..1024 { - let e = Entry::new(u32::to_be_bytes(i), u32::to_be_bytes(i * 1024)); - w.add(e.clone()).expect("add failed"); + let e = ( + Vec::from(u32::to_be_bytes(i)), + Vec::from(u32::to_be_bytes(i * 1024)), + ); + w.add(e.0.as_ref(), e.1.as_ref()).expect("add failed"); reference.push(e); } + println!("writer finished"); } assert!(store.len() > 512); + println!("reader start"); let r = Reader::new(store); let ri = r.iter(); assert_eq!(ri.into_iter().collect::>(), reference); + println!("reader finish"); // test range let start = u32::to_be_bytes(192); @@ -26,7 +33,8 @@ fn test_write_readback() { rangei.into_iter().collect::>(), reference .into_iter() - .filter(|e| e.key() >= &u32::to_be_bytes(192) && e.key() <= &u32::to_be_bytes(256)) + .filter(|(k, _)| k.as_slice() >= &u32::to_be_bytes(192) + && k.as_slice() <= &u32::to_be_bytes(256)) .collect::>() ) }