]> git.mikk.net Git - mtbl-rs/commitdiff
Massive refactor #N for borrowing iteration borrow-take-3
authorChris Mikkelson <chris@mikk.net>
Sat, 21 Jun 2025 20:33:18 +0000 (15:33 -0500)
committerChris Mikkelson <chris@mikk.net>
Sat, 21 Jun 2025 20:36:32 +0000 (15:36 -0500)
New "Cursor" abstraction returns a tuple of references to a byte slice
key and associated Value type. This tuple replaces Entry (removed), and
the cursor's immutable get() allows the merger heap to use the cursor
directly for ordering.

WIP, several bugs likely remain, but unit tests pass.

30 files changed:
src/bin/mtbl_dump.rs
src/compression.rs
src/cursor/filter.rs
src/cursor/filtermap.rs
src/cursor/map.rs
src/cursor/merge.rs
src/cursor/merger.rs
src/cursor/mod.rs
src/cursor/range.rs
src/entry.rs [deleted file]
src/fileset.rs
src/iter/dupmerge.rs [deleted file]
src/iter/dupsort.rs [deleted file]
src/iter/filter.rs [deleted file]
src/iter/map.rs [deleted file]
src/iter/merger.rs [deleted file]
src/iter/mod.rs [deleted file]
src/iter/prefix.rs [deleted file]
src/iter/range.rs [deleted file]
src/lib.rs
src/merger.rs [deleted file]
src/reader/block.rs
src/reader/mod.rs
src/sorter.rs
src/source/adapters.rs
src/source/merger.rs [new file with mode: 0644]
src/source/mod.rs
src/writer/block_builder.rs
src/writer/mod.rs
tests/rwtest.rs

index c1de7244bd2920c6d71e88f6db5fcb88fa9c01c7..c1223987fa613254389f09d8b84591499a7a510b 100644 (file)
@@ -5,7 +5,7 @@ fn main() {
         .nth(1)
         .expect("Usage: mtbl_dump <filename>");
     let reader = mtbl::reader::from_file(fname);
-    for e in reader.iter() {
-        println!("{:?}: {:?}", e.key(), e.value());
+    for (k, v) in reader.iter() {
+        println!("{:?}: {:?}", k, v);
     }
 }
index 0a447395b3ef9acd4bfbac98f358b57618828770..f8118347f8407158f0087ba0a7f80bc121c7b573 100644 (file)
@@ -1,3 +1,4 @@
+use crate::Result;
 use std::{
     io::{Read, Write},
     ops::Deref,
@@ -68,10 +69,7 @@ impl<B: AsRef<[u8]>> AsRef<[u8]> for CBuf<B> {
 }
 
 impl Compression {
-    pub(crate) fn compress<B: AsRef<[u8]>>(
-        &self,
-        buf: B,
-    ) -> Result<CBuf<B>, Box<dyn std::error::Error>> {
+    pub(crate) fn compress<B: AsRef<[u8]>>(&self, buf: B) -> Result<CBuf<B>> {
         match self {
             Compression::None => Ok(CBuf::Buf(buf)),
             Compression::Snappy => Ok(CBuf::Vec(
@@ -91,27 +89,24 @@ impl Compression {
         }
     }
 
-    pub(crate) fn uncompress<B: AsRef<[u8]>>(&self, buf: B) -> Option<CBuf<B>> {
+    pub(crate) fn uncompress<B: AsRef<[u8]>>(&self, buf: B) -> Result<CBuf<B>> {
         match self {
-            Compression::None => Some(CBuf::Buf(buf)),
-            Compression::Snappy => Some(CBuf::Vec(
-                snap::raw::Decoder::new()
-                    .decompress_vec(buf.as_ref())
-                    .ok()?,
+            Compression::None => Ok(CBuf::Buf(buf)),
+            Compression::Snappy => Ok(CBuf::Vec(
+                snap::raw::Decoder::new().decompress_vec(buf.as_ref())?,
             )),
             Compression::Zlib(_) => {
                 let mut v = Vec::<u8>::new();
                 {
                     let mut dec = flate2::read::ZlibDecoder::new(buf.as_ref());
-                    dec.read_to_end(&mut v).ok()?;
+                    dec.read_to_end(&mut v)?;
                 }
-                Some(CBuf::Vec(v))
+                Ok(CBuf::Vec(v))
             }
             Compression::Zstd(_) => {
-                let mut dec = zstd::bulk::Decompressor::new().ok()?;
-                Some(CBuf::Vec(
-                    dec.decompress(buf.as_ref(), 100 * buf.as_ref().len())
-                        .ok()?,
+                let mut dec = zstd::bulk::Decompressor::new()?;
+                Ok(CBuf::Vec(
+                    dec.decompress(buf.as_ref(), 100 * buf.as_ref().len())?,
                 ))
             }
         }
index a9420e3db5a875a204479aa1c1ffae4e7283efed..0e27bbd46492a68c8ac823aac9cb5405cb85d6cd 100644 (file)
@@ -13,11 +13,11 @@ where
 impl<C, F> Filter<C, F>
 where
     C: Cursor,
+    C::Value: std::fmt::Debug,
     F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
 {
     fn find_match(&mut self) {
         while let Some((k, v)) = self.cursor.get() {
-            self.seekto.clear();
             if (self.filter)((k, v), &mut self.seekto) {
                 break;
             }
@@ -25,6 +25,7 @@ where
                 self.cursor.advance();
             } else {
                 self.cursor.seek(self.seekto.as_slice());
+                self.seekto.clear();
             }
         }
     }
@@ -41,6 +42,7 @@ where
 impl<C, F> Cursor for Filter<C, F>
 where
     C: Cursor,
+    C::Value: std::fmt::Debug,
     F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
 {
     type Value = C::Value;
@@ -55,7 +57,14 @@ where
     }
 
     fn advance(&mut self) {
-        self.cursor.advance();
+        if self.seekto.is_empty() {
+            self.cursor.advance();
+        } else {
+            // A previous filter passed its item but left
+            // a seek key for the next potential match.
+            self.cursor.seek(self.seekto.as_slice());
+            self.seekto.clear();
+        }
         self.find_match();
     }
 }
@@ -63,7 +72,7 @@ where
 impl<C, F> IntoIterator for Filter<C, F>
 where
     C: Cursor,
-    C::Value: Clone,
+    C::Value: Clone + std::fmt::Debug,
     F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
 {
     type Item = (Vec<u8>, C::Value);
@@ -80,7 +89,7 @@ where
 
 #[cfg(test)]
 mod test {
-    use super::{Cursor, Filter};
+    use super::Cursor;
 
     struct TestSource<T>(Vec<(Vec<u8>, T)>, Option<usize>);
 
@@ -120,11 +129,9 @@ mod test {
     fn test_filter() {
         let range = 1u64..64;
         let ts = TestSource::<u64>::from(range.clone());
-        let filter = Filter::new(ts, |(_, v), _| v % 3 == 0);
+        let fiter = ts.filter(|(_, v), _| v % 3 == 0).into_iter();
+        let tv = Iterator::map(fiter, |(_, v)| v).collect::<Vec<_>>();
         let check = range.into_iter().filter(|i| i % 3 == 0).collect::<Vec<_>>();
-        assert_eq!(
-            filter.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
-            check
-        )
+        assert_eq!(tv, check)
     }
 }
index 8177da0d1436282f08cd3467e1455575d22bbea1..9b0e5f6efa7d6bfc72367bab985e14194c4f2a70 100644 (file)
@@ -18,7 +18,6 @@ where
 {
     fn find_match(&mut self) {
         while let Some((k, v)) = self.cursor.get() {
-            self.seekto.clear();
             match (self.filtermap)((k, v), &mut self.seekto) {
                 Some(val) => {
                     self.value.replace(val);
@@ -29,6 +28,7 @@ where
                         self.cursor.advance();
                     } else {
                         self.cursor.seek(self.seekto.as_slice());
+                        self.seekto.clear();
                     }
                 }
             }
@@ -63,7 +63,12 @@ where
     }
 
     fn advance(&mut self) {
-        self.cursor.advance();
+        if self.seekto.is_empty() {
+            self.cursor.advance();
+        } else {
+            self.cursor.seek(self.seekto.as_slice());
+            self.seekto.clear();
+        }
         self.find_match();
     }
 }
@@ -81,21 +86,17 @@ where
         if self.get().is_none() {
             self.advance();
         }
-        FilterMapIter {
-            cursor: self,
-            first: true,
-        }
+        FilterMapIter { cursor: self }
     }
 }
 
-struct FilterMapIter<C, F, O>
+pub struct FilterMapIter<C, F, O>
 where
     C: Cursor,
     O: Clone,
     F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
 {
     cursor: FilterMap<C, F, O>,
-    first: bool,
 }
 
 impl<C, F, O> Iterator for FilterMapIter<C, F, O>
@@ -107,12 +108,13 @@ where
     type Item = (Vec<u8>, O);
 
     fn next(&mut self) -> Option<Self::Item> {
-        if !self.first {
+        let (k, _) = self.cursor.get()?;
+        let k = Vec::from(k);
+        let res = self.cursor.value.take().map(|v| (k, v));
+        if res.is_some() {
             self.cursor.advance();
         }
-        self.first = false;
-        let (k, _) = self.cursor.get()?;
-        self.cursor.value.take().map(|v| (Vec::from(k), v))
+        res
     }
 }
 
index 03c0a3faeec5b71937e765ec77b058a02bb81d74..ab036b624ede903cb8d7110ce9d2ff52e3e2012d 100644 (file)
@@ -10,6 +10,27 @@ where
     value: Option<O>,
 }
 
+impl<C, F, O> Map<C, F, O>
+where
+    C: Cursor,
+    F: FnMut((&[u8], &C::Value)) -> O,
+{
+    pub fn new(cursor: C, map: F) -> Self {
+        Self {
+            cursor,
+            map,
+            value: None,
+        }
+    }
+
+    fn update(&mut self) {
+        self.cursor
+            .get()
+            .and_then(|(k, v)| self.value.replace((self.map)((k, v))))
+            .or_else(|| self.value.take());
+    }
+}
+
 impl<C, F, O> Cursor for Map<C, F, O>
 where
     C: Cursor,
@@ -24,10 +45,12 @@ where
 
     fn advance(&mut self) {
         self.cursor.advance();
+        self.update();
     }
 
     fn seek(&mut self, key: &[u8]) {
         self.cursor.seek(key);
+        self.update();
     }
 }
 
@@ -42,20 +65,16 @@ where
         if self.get().is_none() {
             self.advance();
         }
-        MapIter {
-            cursor: self,
-            first: true,
-        }
+        MapIter { cursor: self }
     }
 }
 
-struct MapIter<C, F, O>
+pub struct MapIter<C, F, O>
 where
     C: Cursor,
     F: FnMut((&[u8], &C::Value)) -> O,
 {
     cursor: Map<C, F, O>,
-    first: bool,
 }
 
 impl<C, F, O> Iterator for MapIter<C, F, O>
@@ -65,11 +84,12 @@ where
 {
     type Item = (Vec<u8>, O);
     fn next(&mut self) -> Option<Self::Item> {
-        if !self.first {
+        let (k, _) = self.cursor.get()?;
+        let k = Vec::from(k); // needs to be here to drop borrowed k from self.cursor
+        let res = self.cursor.value.take().map(|v| (k, v));
+        if res.is_some() {
             self.cursor.advance();
         }
-        self.first = false;
-        let (k, _) = self.cursor.get()?;
-        self.cursor.value.take().map(|v| (Vec::from(k), v))
+        res
     }
 }
index dfa1bb455503f2bef0fc19b2b9467bcf5440c27f..cc4d4eede38740d74032d39cbbaca2478501b533 100644 (file)
@@ -1,15 +1,14 @@
 use super::Cursor;
 use std::borrow::Borrow;
 
-struct Merge<C, F>
+pub struct Merge<C, F>
 where
     C: Cursor,
     C::Value: ToOwned,
-    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+    F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
 {
     cursor: C,
     merge: F,
-    needs_advance: bool,
     merge_key: Vec<u8>,
     merge_val: Option<<C::Value as ToOwned>::Owned>,
 }
@@ -18,13 +17,12 @@ impl<C, F> Merge<C, F>
 where
     C: Cursor,
     C::Value: ToOwned,
-    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+    F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
 {
     pub fn new(cursor: C, merge: F) -> Self {
         Self {
             cursor,
             merge,
-            needs_advance: true,
             merge_key: Vec::new(),
             merge_val: None,
         }
@@ -42,17 +40,21 @@ where
                     .as_mut()
                     .map(|mv| v.clone_into(mv))
                     .or_else(|| {
-                        self.merge_val.insert(v.to_owned());
+                        self.merge_val = Some(v.to_owned());
                         None
                     });
             }
         }
 
+        self.cursor.advance();
         while let Some((k, v)) = self.cursor.get() {
             if k != self.merge_key.as_slice() {
                 break;
             }
-            self.merge_val.as_mut().map(|mv| (self.merge)(mv, v));
+            if let Some(mv) = self.merge_val.as_mut() {
+                (self.merge)(k, v, mv);
+            }
+            self.cursor.advance();
         }
     }
 }
@@ -61,7 +63,7 @@ impl<C, F> Cursor for Merge<C, F>
 where
     C: Cursor,
     C::Value: ToOwned,
-    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+    F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
 {
     type Value = C::Value;
 
@@ -85,7 +87,7 @@ impl<C, F> IntoIterator for Merge<C, F>
 where
     C: Cursor,
     C::Value: ToOwned,
-    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+    F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
 {
     type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
     type IntoIter = MergeIter<C, F>;
@@ -93,10 +95,7 @@ where
         if self.get().is_none() {
             self.advance()
         }
-        MergeIter {
-            cursor: self,
-            first: true,
-        }
+        MergeIter { cursor: self }
     }
 }
 
@@ -104,27 +103,27 @@ pub struct MergeIter<C, F>
 where
     C: Cursor,
     C::Value: ToOwned,
-    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+    F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
 {
     cursor: Merge<C, F>,
-    first: bool,
 }
 
 impl<C, F> Iterator for MergeIter<C, F>
 where
     C: Cursor,
     C::Value: ToOwned,
-    F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+    F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
 {
     type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
     fn next(&mut self) -> Option<Self::Item> {
-        if !self.first {
-            self.cursor.advance();
-        }
-        self.first = false;
-        self.cursor
+        let res = self
+            .cursor
             .merge_val
             .take()
-            .map(|mv| (self.cursor.merge_key.clone(), mv))
+            .map(|mv| (self.cursor.merge_key.clone(), mv));
+        if res.is_some() {
+            self.cursor.advance();
+        }
+        res
     }
 }
index 49e30806f38c8d7a99aa7fe02d42be1629aaa600..dd9d27cc5f4c343aba92d5181f4821a8126f2744 100644 (file)
@@ -8,6 +8,19 @@ pub struct MergeCursor<C: Cursor + Ord> {
     last: Vec<u8>,
 }
 
+impl<I, C: Cursor + Ord> From<I> for MergeCursor<C>
+where
+    I: IntoIterator<Item = C>,
+{
+    fn from(iter: I) -> Self {
+        Self {
+            pending: Vec::from_iter(iter),
+            active: BinaryHeap::new(),
+            last: Vec::new(),
+        }
+    }
+}
+/*
 impl<C: Cursor> MergeCursor<KeyOrd<C>> {
     fn from<I>(iter: I) -> Self
     where
@@ -36,12 +49,12 @@ where
         }
     }
 }
-
+*/
 impl<C: Cursor + Ord> Cursor for MergeCursor<C> {
     type Value = C::Value;
 
     fn get(&self) -> Option<(&[u8], &Self::Value)> {
-        self.active.peek().map(|Reverse(c)| c.get()).flatten()
+        self.active.peek().and_then(|Reverse(c)| c.get())
     }
 
     fn advance(&mut self) {
@@ -54,8 +67,10 @@ impl<C: Cursor + Ord> Cursor for MergeCursor<C> {
                 .peek()
                 .map(|Reverse(c)| c.get().map(|(k, _)| k.clone_into(&mut self.last)));
         } else if let Some(mut rcur) = self.active.peek_mut() {
-            rcur.0.get().map(|(k, _)| k.clone_into(&mut self.last));
-            rcur.0.advance();
+            if let Some((k, _)) = rcur.0.get() {
+                k.clone_into(&mut self.last);
+                rcur.0.advance();
+            }
             // XXX cannot remove a cursor when `get()` return None after an advance.
             // Finished cursors will remain at the bottom of the heap.
         }
@@ -91,24 +106,17 @@ impl<C: Cursor + Ord> Cursor for MergeCursor<C> {
 impl<C> IntoIterator for MergeCursor<C>
 where
     C: Cursor + Ord,
-    C::Value: Clone,
+    C::Value: ToOwned,
 {
-    type Item = (Vec<u8>, C::Value);
+    type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
     type IntoIter = KVIter<Self>;
     fn into_iter(self) -> Self::IntoIter {
         Cursor::to_iter(self)
     }
 }
 
-fn test_merger_construction<C: Cursor>(i: impl Iterator<Item = C>)
-where
-    C::Value: Ord,
-{
-    let _ = MergeCursor::<KeyOrd<_>>::from(i);
-}
-
-pub struct KeyOrd<C: Cursor>(C);
-pub struct KeyValOrd<C: Cursor>(C);
+pub struct KeyOrd<C: Cursor>(pub C);
+pub struct KeyValOrd<C: Cursor>(pub C);
 
 // Ordering implementation for KeyOrd, KeyValOrd
 impl<C: Cursor> Cursor for KeyOrd<C> {
@@ -164,9 +172,7 @@ impl<C: Cursor> Eq for KeyValOrd<C> where C::Value: Ord {}
 
 impl<C: Cursor> PartialOrd for KeyOrd<C> {
     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        let ks = self.0.get().map(|(k, _)| k);
-        let ko = other.0.get().map(|(k, _)| k);
-        Some(ks.cmp(&ko))
+        Some(self.cmp(other))
     }
 }
 
@@ -175,9 +181,7 @@ where
     C::Value: Ord,
 {
     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        let sp = self.0.get();
-        let op = other.0.get();
-        Some(sp.cmp(&op))
+        Some(self.cmp(other))
     }
 }
 
index 756d66839b5a25d68c7314c0221ebf6438a57266..b09ff6b8649fadbdaff6cad32eb91a8f15415da5 100644 (file)
@@ -6,7 +6,7 @@ pub mod merger;
 pub mod range;
 
 pub trait Cursor {
-    type Value;
+    type Value: ?Sized;
 
     // required methods
     fn get(&self) -> Option<(&[u8], &Self::Value)>;
@@ -17,22 +17,31 @@ pub trait Cursor {
     fn to_iter(mut self) -> KVIter<Self>
     where
         Self: Sized,
-        Self::Value: Clone,
+        Self::Value: ToOwned,
     {
         if self.get().is_none() {
             self.advance();
         }
-        KVIter(self, true)
+        KVIter { cursor: self }
     }
 
     fn filter<F>(self, filter: F) -> filter::Filter<Self, F>
     where
         Self: Sized,
+        Self::Value: std::fmt::Debug,
         F: FnMut((&[u8], &Self::Value), &mut Vec<u8>) -> bool,
     {
         filter::Filter::new(self, filter)
     }
 
+    fn map<F, O>(self, map: F) -> map::Map<Self, F, O>
+    where
+        Self: Sized,
+        F: FnMut((&[u8], &Self::Value)) -> O,
+    {
+        map::Map::new(self, map)
+    }
+
     fn filtermap<F, O>(self, filtermap: F) -> filtermap::FilterMap<Self, F, O>
     where
         Self: Sized,
@@ -40,23 +49,34 @@ pub trait Cursor {
     {
         filtermap::FilterMap::new(self, filtermap)
     }
+
+    fn dup_merge<F>(self, merge: F) -> merge::Merge<Self, F>
+    where
+        Self: Sized,
+        Self::Value: ToOwned,
+        F: FnMut(&[u8], &Self::Value, &mut <Self::Value as ToOwned>::Owned),
+    {
+        merge::Merge::new(self, merge)
+    }
 }
 
-pub struct KVIter<T>(T, bool);
+pub struct KVIter<C: Cursor> {
+    cursor: C,
+}
 
 impl<C> Iterator for KVIter<C>
 where
     C: Cursor,
-    C::Value: Clone,
+    C::Value: ToOwned,
 {
-    type Item = (Vec<u8>, C::Value);
+    type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
     fn next(&mut self) -> Option<Self::Item> {
-        let first = self.1;
-        if !first {
-            self.0.advance();
+        let ires = self.cursor.get();
+        let res = ires.map(|(k, v)| (Vec::from(k), v.to_owned()));
+        if res.is_some() {
+            self.cursor.advance();
         }
-        self.1 = false;
-        self.0.get().map(|(k, v)| (Vec::from(k), v.clone()))
+        res
     }
 }
 
@@ -64,19 +84,19 @@ impl<C: Cursor> Cursor for KVIter<C> {
     type Value = C::Value;
 
     fn get(&self) -> Option<(&[u8], &Self::Value)> {
-        self.0.get()
+        self.cursor.get()
     }
 
     fn advance(&mut self) {
-        self.0.advance();
+        self.cursor.advance();
     }
 
     fn seek(&mut self, key: &[u8]) {
-        self.0.seek(key);
+        self.cursor.seek(key);
     }
 }
 
-impl<C: Cursor> Cursor for Box<C> {
+impl<C: Cursor + ?Sized> Cursor for Box<C> {
     type Value = C::Value;
 
     fn get(&self) -> Option<(&[u8], &Self::Value)> {
@@ -91,3 +111,99 @@ impl<C: Cursor> Cursor for Box<C> {
         self.as_mut().seek(key);
     }
 }
+
+pub(crate) struct VecCursor {
+    data: Vec<(Vec<u8>, Vec<u8>)>,
+    idx: Option<usize>,
+}
+
+impl From<Vec<(Vec<u8>, Vec<u8>)>> for VecCursor {
+    fn from(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
+        Self { data, idx: None }
+    }
+}
+
+impl Cursor for VecCursor {
+    type Value = [u8];
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.idx.as_ref().map(|idx| {
+            let pair = &self.data[*idx];
+            (pair.0.as_slice(), pair.1.as_slice())
+        })
+    }
+
+    fn advance(&mut self) {
+        match self.idx.take() {
+            Some(idx) => self.idx.replace(idx + 1),
+            None => self.idx.replace(0),
+        };
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        let idx = self
+            .data
+            .binary_search_by_key(&Vec::from(key), |(k, _)| k.clone())
+            .unwrap_or(self.data.len());
+        self.idx.replace(idx);
+    }
+}
+
+impl IntoIterator for VecCursor {
+    type Item = (Vec<u8>, Vec<u8>);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        self.to_iter()
+    }
+}
+
+struct RangeCursor(Option<(Vec<u8>, usize)>, usize);
+
+impl Cursor for RangeCursor {
+    type Value = usize;
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        self.0.as_ref().map(|(k, v)| (k.as_slice(), v))
+    }
+
+    fn advance(&mut self) {
+        match self.0.take() {
+            Some((_, mut v)) => {
+                if v < self.1 {
+                    v += 1;
+                    self.0.replace((v.to_be_bytes().into(), v));
+                }
+            }
+            None => {
+                self.0.replace((0usize.to_be_bytes().into(), 0));
+            }
+        }
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        let idx = usize::from_be_bytes(key.try_into().unwrap_or([255; (usize::BITS / 8) as usize]));
+        if idx < self.1 {
+            self.0.replace((idx.to_be_bytes().into(), idx));
+        } else {
+            self.0.take();
+        }
+    }
+}
+
+impl IntoIterator for RangeCursor {
+    type Item = (Vec<u8>, usize);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        self.to_iter()
+    }
+}
+
+#[test]
+fn test_range_cursor() {
+    let rc = RangeCursor(None, 10);
+    let rcv = rc.to_iter().collect::<Vec<_>>();
+    let check = (0usize..11)
+        .map(|i| (Vec::from(i.to_be_bytes()), i))
+        .collect::<Vec<_>>();
+    assert_eq!(rcv, check)
+}
index ecb8273d9d97c9793b61b892c14b425635c556fc..575538fafcc9da36743dcafcb31f63226be1032e 100644 (file)
@@ -1,16 +1,24 @@
+use crate::cursor::KVIter;
+
 use super::Cursor;
 
 pub struct RangeCursor<C: Cursor> {
     cursor: C,
     begin: Vec<u8>,
     end: Vec<u8>,
+    first: bool,
 }
 
 impl<C: Cursor> RangeCursor<C> {
     pub fn new(cursor: C, begin: &[u8], end: &[u8]) -> Self {
-        let begin = Vec::from(begin.as_ref());
+        let begin = Vec::from(begin);
         let end = Vec::from(end);
-        Self { cursor, begin, end }
+        Self {
+            cursor,
+            begin,
+            end,
+            first: true,
+        }
     }
 }
 
@@ -24,6 +32,11 @@ impl<C: Cursor> Cursor for RangeCursor<C> {
     }
 
     fn advance(&mut self) {
+        if self.first && self.cursor.get().is_none() {
+            self.first = false;
+            self.cursor.seek(self.begin.as_slice());
+            return;
+        }
         self.cursor.advance();
     }
 
@@ -32,15 +45,31 @@ impl<C: Cursor> Cursor for RangeCursor<C> {
     }
 }
 
+impl<C: Cursor> IntoIterator for RangeCursor<C>
+where
+    C::Value: ToOwned,
+{
+    type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        Cursor::to_iter(self)
+    }
+}
+
 pub struct PrefixCursor<C: Cursor> {
     cursor: C,
     prefix: Vec<u8>,
+    first: bool,
 }
 
 impl<C: Cursor> PrefixCursor<C> {
     pub fn new(cursor: C, prefix: &[u8]) -> Self {
         let prefix = Vec::from(prefix);
-        Self { cursor, prefix }
+        Self {
+            cursor,
+            prefix,
+            first: true,
+        }
     }
 }
 
@@ -54,6 +83,11 @@ impl<C: Cursor> Cursor for PrefixCursor<C> {
     }
 
     fn advance(&mut self) {
+        if self.first && self.cursor.get().is_none() {
+            self.first = false;
+            self.cursor.seek(self.prefix.as_slice());
+            return;
+        }
         self.cursor.advance();
     }
 
@@ -61,3 +95,14 @@ impl<C: Cursor> Cursor for PrefixCursor<C> {
         self.cursor.seek(key);
     }
 }
+
+impl<C: Cursor> IntoIterator for PrefixCursor<C>
+where
+    C::Value: ToOwned,
+{
+    type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        Cursor::to_iter(self)
+    }
+}
diff --git a/src/entry.rs b/src/entry.rs
deleted file mode 100644 (file)
index 8d990b8..0000000
+++ /dev/null
@@ -1,87 +0,0 @@
-use crate::iter::merger::Mergeable;
-use std::cmp::Ordering;
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub struct Entry {
-    key: Arc<Vec<u8>>,
-    value: Arc<Vec<u8>>,
-}
-
-impl Entry {
-    pub fn new(key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
-        Entry {
-            key: Arc::new(Vec::from(key.as_ref())),
-            value: Arc::new(Vec::from(value.as_ref())),
-        }
-    }
-
-    pub fn replace(&mut self, e: &Entry) {
-        let key = Arc::make_mut(&mut self.key);
-        let value = Arc::make_mut(&mut self.value);
-        key.clear();
-        key.extend_from_slice(e.key.as_slice());
-        value.clear();
-        value.extend_from_slice(e.value.as_slice());
-    }
-
-    pub fn clear(&mut self) {
-        Arc::make_mut(&mut self.key).clear();
-        Arc::make_mut(&mut self.value).clear();
-    }
-
-    pub fn key(&self) -> &[u8] {
-        self.key.as_slice()
-    }
-
-    pub fn key_mut(&mut self) -> &mut Vec<u8> {
-        Arc::make_mut(&mut self.key)
-    }
-
-    pub fn value(&self) -> &[u8] {
-        self.value.as_slice()
-    }
-
-    pub fn value_mut(&mut self) -> &mut Vec<u8> {
-        Arc::make_mut(&mut self.value)
-    }
-}
-
-impl Mergeable for Entry {}
-
-impl PartialOrd<[u8]> for Entry {
-    fn partial_cmp(&self, other: &[u8]) -> Option<Ordering> {
-        Some(self.key().cmp(other))
-    }
-}
-
-impl PartialEq<[u8]> for Entry {
-    fn eq(&self, other: &[u8]) -> bool {
-        self.key() == other
-    }
-}
-
-impl AsRef<[u8]> for Entry {
-    fn as_ref(&self) -> &[u8] {
-        self.key()
-    }
-}
-
-impl PartialEq for Entry {
-    fn eq(&self, other: &Entry) -> bool {
-        self.key() == other.key()
-    }
-}
-impl Eq for Entry {}
-
-impl PartialOrd for Entry {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.key().cmp(other.key()))
-    }
-}
-
-impl Ord for Entry {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.key().cmp(other.key())
-    }
-}
index 6e2ec74f6394c7d9dc22896721aeda379a339407..5d3b9a4394945abe250fb01c0bd6d0933815a957 100644 (file)
@@ -1,8 +1,8 @@
 #![allow(dead_code)]
 //use memmap::Mmap;
 
-use crate::merger::Merger;
 use crate::reader::FileReader;
+use crate::source::Merger;
 use crate::Result;
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
diff --git a/src/iter/dupmerge.rs b/src/iter/dupmerge.rs
deleted file mode 100644 (file)
index 2508a6a..0000000
+++ /dev/null
@@ -1,93 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-
-pub struct MergeIter<I, F>
-where
-    I: Iter,
-    I::Item: PartialEq,
-    F: FnMut(&mut I::Item, &I::Item),
-{
-    prev: Option<I::Item>,
-    iter: I,
-    merge_func: F,
-}
-
-impl<I, F> MergeIter<I, F>
-where
-    I: Iter,
-    I::Item: PartialEq,
-    F: FnMut(&mut I::Item, &I::Item),
-{
-    pub fn new(iter: I, merge_func: F) -> Self {
-        Self {
-            prev: None,
-            iter,
-            merge_func,
-        }
-    }
-}
-
-impl<I, F> Iter for MergeIter<I, F>
-where
-    I: Iter,
-    I::Item: PartialEq,
-    F: FnMut(&mut I::Item, &I::Item),
-{
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let mut cur = self.prev.take().or_else(|| self.iter.next())?;
-        while let Some(e) = self.iter.next() {
-            if cur == e {
-                (self.merge_func)(&mut cur, &e);
-            } else {
-                self.prev.replace(e);
-                break;
-            }
-        }
-        Some(cur)
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        self.prev.take();
-        self.iter.seek(key);
-    }
-}
-
-impl<I, F> IntoIterator for MergeIter<I, F>
-where
-    I: Iter,
-    I::Item: PartialEq,
-    F: FnMut(&mut I::Item, &I::Item),
-{
-    type Item = I::Item;
-    type IntoIter = IntoIter<Self>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-#[test]
-fn test_merge_func() {
-    use crate::source::test_source::TestSource;
-    use crate::Entry;
-    use crate::Source;
-
-    let ts = TestSource(
-        (1u8..8)
-            .flat_map(|n| vec![n; n as usize])
-            .map(|n| Entry::new(vec![n], vec![1]))
-            .collect(),
-    );
-
-    let s = ts.dup_merge(|v, e| {
-        v.value_mut()[0] += e.value()[0];
-    });
-
-    assert_eq!(
-        Vec::from_iter(s.iter()),
-        (1u8..8)
-            .map(|n| Entry::new(vec![n], vec![n]))
-            .collect::<Vec<_>>()
-    )
-}
diff --git a/src/iter/dupsort.rs b/src/iter/dupsort.rs
deleted file mode 100644 (file)
index 07e067f..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-use std::cmp::Ordering;
-
-#[derive(Debug)]
-pub struct DupsortIter<I, F>
-where
-    I: Iter,
-    F: FnMut(&I::Item, &I::Item) -> Ordering,
-{
-    run: Vec<I::Item>,
-    next: Option<I::Item>,
-    iter: I,
-    dupsort_func: F,
-}
-
-impl<I, F> DupsortIter<I, F>
-where
-    I: Iter,
-    F: FnMut(&I::Item, &I::Item) -> Ordering,
-{
-    pub fn new(iter: I, dupsort_func: F) -> Self {
-        Self {
-            run: Vec::new(),
-            next: None,
-            iter,
-            dupsort_func,
-        }
-    }
-}
-
-impl<I, F> IntoIterator for DupsortIter<I, F>
-where
-    I: Iter,
-    I::Item: PartialEq,
-    F: Fn(&I::Item, &I::Item) -> Ordering,
-{
-    type Item = I::Item;
-    type IntoIter = IntoIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-impl<I, F> Iter for DupsortIter<I, F>
-where
-    I: Iter,
-    I::Item: PartialEq,
-    F: Fn(&I::Item, &I::Item) -> Ordering,
-{
-    type Item = I::Item;
-
-    fn seek(&mut self, key: &[u8]) {
-        self.run.clear();
-        self.next.take();
-        self.iter.seek(key);
-    }
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.run.pop().or_else(|| {
-            self.run
-                .push(self.next.take().or_else(|| self.iter.next())?);
-
-            while let Some(e) = self.iter.next() {
-                if e == self.run[0] {
-                    self.run.push(e);
-                    continue;
-                }
-                self.next.replace(e);
-                break;
-            }
-            // sort in reverse order, so self.run.pop() can be the usual
-            // return value.
-            self.run.sort_unstable_by(|a, b| (self.dupsort_func)(b, a));
-            self.run.pop()
-        })
-    }
-}
-
-#[test]
-fn test_dupsort() {
-    use crate::source::test_source::TestSource;
-    use crate::Entry;
-    use crate::Source;
-
-    let ts = TestSource(
-        (1u8..10)
-            .flat_map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
-            .collect(),
-    );
-
-    let s = ts.dupsort(|a, b| a.value()[0].cmp(&b.value()[0]));
-
-    assert_eq!(
-        Vec::from_iter(s.iter()),
-        (1u8..10)
-            .flat_map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
-            .collect::<Vec<_>>()
-    );
-}
diff --git a/src/iter/filter.rs b/src/iter/filter.rs
deleted file mode 100644 (file)
index 4ce3bf6..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-
-pub struct FilterIter<I, F> {
-    iter: I,
-    filter: F,
-    seek_key: Vec<u8>,
-}
-
-impl<I, F> FilterIter<I, F>
-where
-    I: Iter,
-    F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
-    pub fn new(iter: I, filter: F) -> Self {
-        let seek_key = Vec::new();
-        Self {
-            iter,
-            filter,
-            seek_key,
-        }
-    }
-}
-
-impl<I, F> Iter for FilterIter<I, F>
-where
-    I: Iter,
-    F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        loop {
-            let item = self.iter.next()?;
-            self.seek_key.clear();
-            if (self.filter)(&item, &mut self.seek_key) {
-                return Some(item);
-            }
-            if !self.seek_key.is_empty() {
-                self.iter.seek(self.seek_key.as_slice());
-            }
-        }
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        self.iter.seek(key);
-    }
-}
-
-impl<I, F> IntoIterator for FilterIter<I, F>
-where
-    I: Iter,
-    F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
-    type Item = I::Item;
-    type IntoIter = IntoIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-#[test]
-fn test_filter() {
-    use crate::source::test_source::TestSource;
-    use crate::source::Source;
-    use crate::Entry;
-    use std::iter::IntoIterator;
-
-    let ts =
-        TestSource((0u8..10).map(|n| Entry::new(vec![n], vec![])).collect()).filter(|e, sv| {
-            // pass only even values
-            if e.key()[0] % 2 != 0 {
-                return false;
-            } else if e.key()[0] == 4 {
-                // at key 4, seek to 8
-                sv.push(8);
-                return false;
-            }
-            true
-        });
-
-    assert_eq!(
-        vec![0, 2, 8],
-        ts.map(|e| e.key()[0])
-            .iter()
-            .into_iter()
-            .collect::<Vec<u8>>()
-    );
-}
diff --git a/src/iter/map.rs b/src/iter/map.rs
deleted file mode 100644 (file)
index e2d336e..0000000
+++ /dev/null
@@ -1,90 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-
-pub struct MapIter<I, F, O>
-where
-    I: Iter,
-    F: FnMut(I::Item) -> O,
-{
-    iter: I,
-    mapf: F,
-}
-
-impl<I, F, O> MapIter<I, F, O>
-where
-    I: Iter,
-    F: FnMut(I::Item) -> O,
-{
-    pub fn new(iter: I, mapf: F) -> Self {
-        Self { iter, mapf }
-    }
-}
-
-impl<I, F, O> Iter for MapIter<I, F, O>
-where
-    I: Iter,
-    F: FnMut(I::Item) -> O,
-{
-    type Item = O;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let item = self.iter.next()?;
-        Some((self.mapf)(item))
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        self.iter.seek(key);
-    }
-}
-
-impl<I, F, O> IntoIterator for MapIter<I, F, O>
-where
-    I: Iter,
-    F: FnMut(I::Item) -> O,
-{
-    type Item = O;
-    type IntoIter = IntoIter<Self>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use crate::source::test_source::TestSource;
-    use crate::{Entry, Iter, Source};
-
-    struct CompVec(Vec<u8>);
-    impl PartialEq<[u8]> for CompVec {
-        fn eq(&self, other: &[u8]) -> bool {
-            self.0.as_slice().eq(other)
-        }
-    }
-    impl PartialOrd<[u8]> for CompVec {
-        fn partial_cmp(&self, other: &[u8]) -> Option<std::cmp::Ordering> {
-            self.0.as_slice().partial_cmp(other)
-        }
-    }
-
-    #[test]
-    fn test_mapsource() {
-        let ts = TestSource((0u8..10).map(|i| Entry::new(vec![i], vec![])).collect())
-            .map(|e| CompVec(Vec::from(e.key())));
-        assert_eq!(
-            ts.iter().map(|cv| cv.0).into_iter().collect::<Vec<_>>(),
-            (0u8..10).map(|i| vec![i]).collect::<Vec<_>>()
-        );
-        assert_eq!(
-            ts.get_range(&[2u8], &[7u8])
-                .map(|cv| cv.0)
-                .into_iter()
-                .collect::<Vec<_>>(),
-            (2u8..8)
-                .map(|i| {
-                    println!("{i}");
-                    vec![i]
-                })
-                .collect::<Vec<_>>()
-        )
-    }
-}
diff --git a/src/iter/merger.rs b/src/iter/merger.rs
deleted file mode 100644 (file)
index aa8d4d9..0000000
+++ /dev/null
@@ -1,192 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-
-pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {}
-
-struct MergeEntry<I: Iter> {
-    e: I::Item,
-    it: I,
-}
-
-impl<I: Iter> PartialEq for MergeEntry<I>
-where
-    I::Item: PartialEq,
-{
-    fn eq(&self, other: &Self) -> bool {
-        self.e == other.e
-    }
-}
-
-impl<I: Iter> Eq for MergeEntry<I> where I::Item: PartialEq {}
-
-// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics
-impl<I: Iter> PartialOrd for MergeEntry<I>
-where
-    I::Item: PartialOrd,
-{
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        other.e.partial_cmp(&self.e)
-    }
-}
-
-impl<I: Iter> Ord for MergeEntry<I>
-where
-    I::Item: Ord,
-{
-    fn cmp(&self, other: &Self) -> Ordering {
-        other.e.cmp(&self.e)
-    }
-}
-
-pub struct MergeIter<I: Iter> {
-    heap: BinaryHeap<MergeEntry<I>>,
-    finished: Vec<I>,
-    last_key: Vec<u8>,
-}
-
-impl<I> From<I> for MergeIter<I::Item>
-where
-    I: Iterator,
-    I::Item: Iter,
-    <I::Item as Iter>::Item: Mergeable,
-{
-    fn from(iters: I) -> Self {
-        let mut v: Vec<I::Item> = Vec::new();
-        let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
-            Some(e) => Some(MergeEntry { e, it }),
-            None => {
-                v.push(it);
-                None
-            }
-        }));
-        MergeIter {
-            finished: v,
-            heap: h,
-            last_key: Vec::new(),
-        }
-    }
-}
-
-impl<I: Iter> Iter for MergeIter<I>
-where
-    I::Item: Mergeable,
-{
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let cur;
-        {
-            let mut next = self.heap.peek_mut()?;
-
-            cur = next.e.clone();
-            self.last_key.clear();
-            self.last_key.extend(cur.as_ref());
-
-            if let Some(e) = next.it.next() {
-                next.e = e;
-                return Some(cur);
-            }
-        }
-        self.heap.pop();
-        Some(cur)
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        if key >= self.last_key.as_ref() {
-            loop {
-                match self.heap.peek_mut() {
-                    None => return,
-                    Some(mut head) => {
-                        if &head.e >= key {
-                            return;
-                        }
-                        head.it.seek(key);
-                        if let Some(e) = head.it.next() {
-                            head.e = e;
-                            continue;
-                        }
-                    }
-                }
-                self.heap.pop();
-            }
-        }
-
-        // backwards seek; reset heap
-        let mut finished: Vec<I> = Vec::new();
-        let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
-        for mut it in self
-            .heap
-            .drain()
-            .map(|me| me.it)
-            .chain(self.finished.drain(..))
-        {
-            it.seek(key);
-            match it.next() {
-                Some(e) => heap_entries.push(MergeEntry { e, it }),
-                None => finished.push(it),
-            }
-        }
-        self.heap = BinaryHeap::from_iter(heap_entries);
-        self.finished = finished;
-        self.last_key.clear();
-        self.last_key.extend(key);
-    }
-}
-
-impl<I: Iter> IntoIterator for MergeIter<I>
-where
-    I::Item: Mergeable,
-{
-    type Item = I::Item;
-    type IntoIter = IntoIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use crate::source::test_source::TestSource;
-    use crate::Entry;
-    use crate::Merger;
-    use crate::{Iter, Source};
-
-    fn tnum(m: u8) -> Vec<u8> {
-        Vec::from_iter((1u8..255).filter(|i| i % m == 0))
-    }
-
-    fn test_source(m: u8) -> TestSource {
-        TestSource(Vec::from_iter(
-            tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
-        ))
-    }
-
-    #[test]
-    fn test_merge() {
-        let range = 1..8;
-        let iters: Vec<_> = range
-            .clone()
-            .map(test_source)
-            .map(|s| s.into_boxed())
-            .collect();
-        let s = Merger::from(iters);
-        let mut v = Vec::<u8>::new();
-        for i in range {
-            v.extend(tnum(i))
-        }
-        v.sort();
-        let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
-        assert_eq!(v2, v);
-    }
-
-    #[test]
-    fn test_binheap() {
-        use std::collections::BinaryHeap;
-
-        let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
-        let vs = v.as_slice();
-        let h = BinaryHeap::from_iter(vs.iter().copied());
-        assert_ne!(h.into_vec(), v);
-    }
-}
diff --git a/src/iter/mod.rs b/src/iter/mod.rs
deleted file mode 100644 (file)
index af0338a..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-use std::cmp::Ordering;
-pub mod dupmerge;
-pub mod dupsort;
-pub mod filter;
-pub mod map;
-pub mod merger;
-pub mod prefix;
-pub mod range;
-
-use dupmerge::MergeIter;
-use dupsort::DupsortIter;
-use filter::FilterIter;
-use map::MapIter;
-
-pub trait Iter {
-    type Item;
-
-    fn seek(&mut self, key: &[u8]);
-
-    fn next(&mut self) -> Option<Self::Item>;
-
-    fn seek_to(mut self, key: &[u8]) -> Self
-    where
-        Self: Sized,
-    {
-        self.seek(key);
-        self
-    }
-
-    fn dup_merge<F>(self, merge_func: F) -> MergeIter<Self, F>
-    where
-        Self::Item: PartialEq + Clone,
-        F: FnMut(&mut Self::Item, &Self::Item),
-        Self: Sized,
-    {
-        MergeIter::new(self, merge_func)
-    }
-
-    fn dup_sort<F>(self, dupsort_func: F) -> DupsortIter<Self, F>
-    where
-        Self: Sized,
-        F: FnMut(&Self::Item, &Self::Item) -> Ordering,
-    {
-        DupsortIter::new(self, dupsort_func)
-    }
-
-    fn filter<F>(self, filter_func: F) -> FilterIter<Self, F>
-    where
-        F: FnMut(&Self::Item, &mut Vec<u8>) -> bool,
-        Self: Sized,
-    {
-        FilterIter::new(self, filter_func)
-    }
-
-    fn map<F, O>(self, map_func: F) -> MapIter<Self, F, O>
-    where
-        F: FnMut(Self::Item) -> O,
-        Self: Sized,
-    {
-        MapIter::new(self, map_func)
-    }
-}
-
-impl<I: Iter + ?Sized> Iter for Box<I> {
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.as_mut().next()
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        self.as_mut().seek(key);
-    }
-}
-
-pub struct BoxedIter<'a, T>(pub Box<dyn Iter<Item = T> + 'a>);
-impl<'a, T> Iter for BoxedIter<'a, T> {
-    type Item = T;
-    fn next(&mut self) -> Option<Self::Item> {
-        self.0.as_mut().next()
-    }
-    fn seek(&mut self, key: &[u8]) {
-        self.0.as_mut().seek(key)
-    }
-}
-
-impl<'a, T> IntoIterator for BoxedIter<'a, T> {
-    type Item = T;
-    type IntoIter = IntoIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-pub struct IntoIter<I: Iter>(pub I);
-
-impl<I> Iterator for IntoIter<I>
-where
-    I: Iter,
-{
-    type Item = I::Item;
-    fn next(&mut self) -> Option<Self::Item> {
-        self.0.next()
-    }
-}
diff --git a/src/iter/prefix.rs b/src/iter/prefix.rs
deleted file mode 100644 (file)
index 8deb321..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-
-pub struct PrefixIter<I>
-where
-    I: Iter,
-    I::Item: AsRef<[u8]>,
-{
-    iter: I,
-    prefix: Vec<u8>,
-}
-
-impl<I> PrefixIter<I>
-where
-    I: Iter,
-    I::Item: AsRef<[u8]>,
-{
-    pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self {
-        iter.seek(prefix.as_ref());
-        Self {
-            iter,
-            prefix: Vec::from(prefix.as_ref()),
-        }
-    }
-}
-
-impl<I> Iter for PrefixIter<I>
-where
-    I: Iter,
-    I::Item: AsRef<[u8]>,
-{
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iter
-            .next()
-            .filter(|e| e.as_ref().starts_with(self.prefix.as_slice()))
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        self.iter.seek(key);
-    }
-}
-
-impl<I> IntoIterator for PrefixIter<I>
-where
-    I: Iter,
-    I::Item: AsRef<[u8]>,
-{
-    type Item = I::Item;
-    type IntoIter = IntoIter<Self>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
diff --git a/src/iter/range.rs b/src/iter/range.rs
deleted file mode 100644 (file)
index 00d559f..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-use crate::iter::{IntoIter, Iter};
-
-pub struct RangeIter<I> {
-    iter: I,
-    start: Vec<u8>,
-    end: Vec<u8>,
-}
-
-impl<I> RangeIter<I>
-where
-    I: Iter,
-{
-    pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self {
-        iter.seek(start.as_ref());
-        Self {
-            iter,
-            start: Vec::from(start.as_ref()),
-            end: Vec::from(end.as_ref()),
-        }
-    }
-}
-
-impl<I> Iter for RangeIter<I>
-where
-    I: Iter,
-    I::Item: PartialOrd<[u8]>,
-{
-    type Item = I::Item;
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iter.next().filter(|i| i <= self.end.as_slice())
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        if key <= self.start.as_slice() {
-            self.iter.seek(self.start.as_slice());
-        } else if key > self.end.as_slice() {
-            self.iter.seek(self.end.as_slice());
-        } else {
-            self.iter.seek(key);
-        }
-    }
-}
-
-impl<I> IntoIterator for RangeIter<I>
-where
-    I: Iter,
-    I::Item: PartialOrd<[u8]>,
-{
-    type Item = I::Item;
-    type IntoIter = IntoIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
index b093d52335c401e59da820383613bd654e3a7dc3..fdde721f7674ca3e2e584cc2a1c1d72fa21ecded 100644 (file)
@@ -1,24 +1,20 @@
 mod compression;
 mod cursor;
-mod entry;
-mod iter;
-mod merger;
+mod fileset;
+mod metadata;
 pub mod reader;
 pub mod sorter;
 pub mod source;
 mod writer;
 
 pub use compression::Compression;
-pub use entry::Entry;
+pub use cursor::Cursor;
 pub use fileset::Fileset;
-pub use iter::Iter;
-pub use merger::Merger;
 pub use reader::Reader;
+pub use sorter::Sorter;
+pub use source::Merger;
 pub use source::Source;
 pub use writer::Writer;
 
-mod fileset;
-mod metadata;
-
 type Error = Box<dyn std::error::Error>;
 type Result<T> = std::result::Result<T, Error>;
diff --git a/src/merger.rs b/src/merger.rs
deleted file mode 100644 (file)
index 4668b36..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-use crate::iter::{
-    merger::{MergeIter, Mergeable},
-    Iter,
-};
-use crate::Source;
-
-pub struct Merger<S> {
-    sources: Vec<S>,
-}
-
-impl<S> Default for Merger<S> {
-    fn default() -> Self {
-        Self {
-            sources: Vec::new(),
-        }
-    }
-}
-
-impl<S> Merger<S> {
-    pub fn new() -> Self {
-        Self {
-            sources: Vec::new(),
-        }
-    }
-
-    pub fn add(&mut self, source: S) {
-        self.sources.push(source);
-    }
-}
-
-impl<S, I> From<I> for Merger<S>
-where
-    I: IntoIterator<Item = S>,
-    S: Source,
-    S::Item: Mergeable,
-{
-    fn from(i: I) -> Self {
-        Merger {
-            sources: Vec::from_iter(i),
-        }
-    }
-}
-
-impl<S> Source for Merger<S>
-where
-    S: Source,
-    S::Item: Mergeable,
-{
-    type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
-        MergeIter::from(self.sources.iter().map(|s| s.iter()))
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::Merger;
-    use crate::source::test_source::TestSource;
-    use crate::Entry;
-    use crate::{Iter, Source};
-
-    fn tnum(m: u8) -> Vec<u8> {
-        Vec::from_iter((1u8..255).filter(|i| i % m == 0))
-    }
-
-    fn test_source(m: u8) -> TestSource {
-        TestSource(Vec::from_iter(
-            tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
-        ))
-    }
-
-    #[test]
-    fn test_merge() {
-        let range = 1..8;
-        let iters: Vec<_> = range
-            .clone()
-            .map(test_source)
-            .map(|s| s.into_boxed())
-            .collect();
-        let s = Merger::from(iters);
-        let mut v = Vec::<u8>::new();
-        for i in range {
-            v.extend(tnum(i))
-        }
-        v.sort();
-        let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
-        assert_eq!(v2, v);
-    }
-
-    #[test]
-    fn test_binheap() {
-        use std::collections::BinaryHeap;
-
-        let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
-        let vs = v.as_slice();
-        let h = BinaryHeap::from_iter(vs.iter().copied());
-        assert_ne!(h.into_vec(), v);
-    }
-}
index e152af24dba7f4c07e2507bf7a1c23b9241b98c0..9f4c044ab1caff6e17f88e05817568913a5627fc 100644 (file)
@@ -1,7 +1,8 @@
-use crate::iter::{IntoIter, Iter};
-use crate::{Entry, Result};
+use crate::cursor::Cursor;
+use crate::Result;
 use integer_encoding::VarInt;
 use std::mem::size_of;
+use std::sync::Arc;
 
 #[derive(Debug)]
 enum RestartType {
@@ -81,53 +82,19 @@ impl<D: AsRef<[u8]>> Block<D> {
     }
 }
 
-impl<D: AsRef<[u8]>> From<Block<D>> for BlockIter<D> {
-    fn from(b: Block<D>) -> BlockIter<D> {
-        BlockIter {
-            block: b,
-            cur_ent: None,
-            off: 0,
-        }
-    }
-}
-
-impl<D: AsRef<[u8]>> IntoIterator for Block<D> {
-    type Item = Entry;
-    type IntoIter = IntoIter<BlockIter<D>>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(BlockIter {
-            block: self,
-            cur_ent: None,
-            off: 0,
-        })
-    }
-}
-
 #[derive(Debug)]
-pub(crate) struct BlockIter<D: AsRef<[u8]>> {
+pub(crate) struct BlockCursor<D: AsRef<[u8]>> {
     block: Block<D>,
-    cur_ent: Option<Entry>,
+    cur_key: Option<Arc<Vec<u8>>>,
     pub(super) off: usize,
+    pub(super) len_val: usize,
 }
 
-fn get_bytes(b: &[u8], n: usize) -> Result<&[u8]> {
-    if n > b.len() {
-        return Err("too long".into());
-    }
-    Ok(&b[0..n])
-}
-
-impl<D: AsRef<[u8]>> BlockIter<D> {
-    fn seek_restart(&mut self, ridx: usize) -> Option<&[u8]> {
-        self.off = self.block.restart(ridx).ok()?;
-        self.decode_restart_key()
-    }
-
+impl<D: AsRef<[u8]>> BlockCursor<D> {
     fn bsearch_restart(&mut self, key: &[u8], mut left: usize, mut right: usize) {
         while left < right {
             let mid = (left + right).div_ceil(2);
-            self.seek_restart(mid)
+            self.get_restart_key(mid)
                 .map(|rk| {
                     if rk < key {
                         left = mid;
@@ -140,64 +107,89 @@ impl<D: AsRef<[u8]>> BlockIter<D> {
                     None
                 });
         }
-        self.seek_restart(left);
+        if let Ok(off) = self.block.restart(left) {
+            self.off = off;
+            self.decode();
+        }
     }
 
-    fn decode_restart_key(&self) -> Option<&[u8]> {
-        let mut idx = self.off;
+    // returns key at restart `ridx` and its offset, or None on error.
+    fn get_restart_key(&self, ridx: usize) -> Option<&[u8]> {
         let data = self.block.data.as_ref();
+        let mut idx = self.block.restart(ridx).ok()?;
 
         let (shared_key, len) = usize::decode_var(&data[idx..])?;
         debug_assert!(shared_key == 0);
         idx += len;
         let (unshared_key, len) = usize::decode_var(&data[idx..])?;
         idx += len;
+        debug_assert!(unshared_key > 0);
         let (_len_val, len) = usize::decode_var(&data[idx..])?;
         idx += len;
         Some(&data[idx..idx + unshared_key])
     }
 
-    fn decode(&mut self) -> Option<&Entry> {
-        let mut idx = self.off;
-        if idx >= self.block.restart_off {
-            self.cur_ent.take();
+    fn decode(&mut self) -> Option<&Arc<Vec<u8>>> {
+        if self.off >= self.block.restart_off {
             return None;
         }
 
         let data = self.block.data.as_ref();
-        let entry = self.cur_ent.get_or_insert(Entry::new([], []));
-
-        let (shared_key, len) = usize::decode_var(&data[idx..])?;
-        idx += len;
-        let (unshared_key, len) = usize::decode_var(&data[idx..])?;
-        idx += len;
-        let (len_val, len) = usize::decode_var(&data[idx..])?;
-        idx += len;
 
-        if shared_key > entry.key().len() {
-            // return Err("shared_key too long".into());
-            return None;
+        let (shared_key, len) = usize::decode_var(data.get(self.off..)?)?;
+        self.off += len;
+        let (unshared_key, len) = usize::decode_var(data.get(self.off..)?)?;
+        self.off += len;
+        let (len_val, len) = usize::decode_var(data.get(self.off..)?)?;
+        self.off += len;
+        debug_assert!(shared_key + unshared_key > 0);
+
+        match self.cur_key {
+            None => {
+                if shared_key != 0 {
+                    return None;
+                }
+                self.cur_key.replace(Arc::new(Vec::from(
+                    data.get(self.off..self.off + unshared_key)?,
+                )));
+            }
+            Some(ref mut ak) => {
+                let key = Arc::make_mut(ak);
+                key.truncate(shared_key);
+                key.extend_from_slice(data.get(self.off..self.off + unshared_key)?);
+            }
         }
+        self.off += unshared_key + len_val;
+        self.len_val = len_val;
+        self.cur_key.as_ref()
+    }
+}
 
-        let key = entry.key_mut();
-        key.truncate(shared_key);
-        key.extend_from_slice(get_bytes(&data[idx..], unshared_key).ok()?);
-        idx += unshared_key;
-
-        let val = entry.value_mut();
-        val.clear();
-        val.extend_from_slice(get_bytes(&data[idx..], len_val).ok()?);
-        idx += len_val;
-        self.off = idx;
-        self.cur_ent.as_ref()
+impl<D: AsRef<[u8]>> From<Block<D>> for BlockCursor<D> {
+    fn from(block: Block<D>) -> Self {
+        Self {
+            block,
+            cur_key: None,
+            off: 0,
+            len_val: 0,
+        }
     }
 }
 
-impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
-    type Item = Entry;
+impl<D: AsRef<[u8]>> Cursor for BlockCursor<D> {
+    type Value = [u8];
 
-    fn next(&mut self) -> Option<Self::Item> {
-        self.decode().cloned()
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        let key = self.cur_key.as_ref()?.as_slice();
+        let data = self.block.data.as_ref();
+        let val = data.get(self.off - self.len_val..self.off)?;
+        Some((key, val))
+    }
+
+    fn advance(&mut self) {
+        if self.decode().is_none() {
+            self.cur_key.take();
+        }
     }
 
     fn seek(&mut self, key: &[u8]) {
@@ -206,12 +198,10 @@ impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
             self.bsearch_restart(key, 0, self.block.restart_count - 1);
         }
         loop {
-            let poff = self.off;
             match self.decode() {
                 None => break,
-                Some(e) => {
-                    if e.key() >= key {
-                        self.off = poff;
+                Some(k) => {
+                    if k.as_slice() >= key {
                         return;
                     }
                 }
@@ -220,13 +210,40 @@ impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
     }
 }
 
+pub struct BlockIter<D: AsRef<[u8]>> {
+    cur: BlockCursor<D>,
+}
+
+impl<D: AsRef<[u8]>> Iterator for BlockIter<D> {
+    type Item = (Arc<Vec<u8>>, Vec<u8>);
+    fn next(&mut self) -> Option<Self::Item> {
+        let key = self.cur.cur_key.as_ref().map(Arc::clone)?;
+        let res = self.cur.get().map(|(_, v)| (key, Vec::from(v)));
+        if res.is_some() {
+            self.cur.advance();
+        }
+        res
+    }
+}
+
+impl<D: AsRef<[u8]>> IntoIterator for BlockCursor<D> {
+    type Item = (Arc<Vec<u8>>, Vec<u8>);
+    type IntoIter = BlockIter<D>;
+    fn into_iter(mut self) -> Self::IntoIter {
+        if self.get().is_none() {
+            self.advance();
+        }
+        Self::IntoIter { cur: self }
+    }
+}
+
 #[cfg(test)]
 mod test {
 
-    use crate::reader::block::{Block, BlockIter};
+    use crate::cursor::Cursor;
+    use crate::reader::block::{Block, BlockCursor};
     use crate::writer::block_builder::BlockBuilder;
-    use crate::Entry;
-    use crate::Iter;
+    use std::sync::Arc;
 
     fn build_block(n: u32, skip: u32, r: usize) -> Block<Vec<u8>> {
         let mut bb = BlockBuilder::default();
@@ -242,24 +259,25 @@ mod test {
         Block::new(v).unwrap()
     }
 
-    fn build_ref(n: u32, skip: u32) -> Vec<Entry> {
-        Vec::from_iter(
-            (0..n)
-                .map(|i| i * skip)
-                .map(|i| Entry::new(u32::to_be_bytes(i), u32::to_be_bytes(i * 1024))),
-        )
+    fn build_ref(n: u32, skip: u32) -> Vec<(Arc<Vec<u8>>, Vec<u8>)> {
+        Vec::from_iter((0..n).map(|i| i * skip).map(|i| {
+            (
+                Arc::new(Vec::from(u32::to_be_bytes(i))),
+                u32::to_be_bytes(i * 1024).into(),
+            )
+        }))
     }
 
     #[test]
     fn test_block_iter() {
         let n = 40;
         let b = build_block(n, 1, 10);
-        let bi = b.into_iter();
+        let bi = BlockCursor::from(b).into_iter();
         assert_eq!(
-            bi.map(|e| Vec::from(e.key())).collect::<Vec<_>>(),
+            bi.map(|(k, _)| Vec::from(k.as_slice())).collect::<Vec<_>>(),
             build_ref(n, 1)
                 .into_iter()
-                .map(|e| Vec::from(e.key()))
+                .map(|(k, _)| Vec::from(k.as_slice()))
                 .collect::<Vec<_>>()
         );
     }
@@ -268,10 +286,10 @@ mod test {
     fn test_block_seek() {
         let n = 40;
         let b = build_block(n, 10, 10);
-        let mut bi = BlockIter::from(b);
+        let mut bi = BlockCursor::from(b);
         bi.seek(&u32::to_be_bytes(40));
-        assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40));
+        assert_eq!(bi.get().unwrap().0, &u32::to_be_bytes(40));
         bi.seek(&u32::to_be_bytes(32));
-        assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40));
+        assert_eq!(bi.get().unwrap().0, &u32::to_be_bytes(40));
     }
 }
index f749f223a7dfd659fa2fdca2b4cb71ea2f7f67c5..8471191da753f4b5d0f8d43facb05d3966c0d046 100644 (file)
@@ -1,12 +1,11 @@
+use crate::cursor::{Cursor, KVIter};
 use crate::metadata::Metadata;
 use crate::Source;
-use crate::{iter::IntoIter, Entry, Iter};
 use integer_encoding::VarInt;
 pub(crate) mod block;
 use crate::compression::CBuf;
 use memmap::{Mmap, MmapOptions};
 use std::fs::File;
-use std::io::Cursor;
 use std::path::Path;
 use std::sync::Arc;
 
@@ -21,7 +20,7 @@ pub(crate) struct DataSlice<D: AsRef<[u8]>> {
 impl<D: AsRef<[u8]>> Clone for DataSlice<D> {
     fn clone(&self) -> Self {
         Self {
-            data: self.data.clone(),
+            data: Arc::clone(&self.data),
             off: self.off,
             len: self.len,
         }
@@ -88,7 +87,7 @@ pub fn from_file(fname: impl AsRef<Path>) -> Reader<Mmap> {
 
 impl<D: AsRef<[u8]>> Reader<D> {
     pub fn new(d: D) -> Self {
-        let cur = Cursor::new(&d.as_ref()[d.as_ref().len() - 512..]);
+        let cur = std::io::Cursor::new(&d.as_ref()[d.as_ref().len() - 512..]);
         let metadata = Metadata::read_from(cur).expect("bad meta");
         Self {
             data: DataSlice::new(d),
@@ -96,7 +95,7 @@ impl<D: AsRef<[u8]>> Reader<D> {
         }
     }
 
-    fn index_iter(&self) -> block::BlockIter<CBuf<DataSlice<D>>> {
+    fn index_iter(&self) -> block::BlockCursor<CBuf<DataSlice<D>>> {
         let mut off = self.metadata.index_block_offset;
         let d = &self.data.as_ref()[off..];
         let (size, len_size) = usize::decode_var(d).unwrap();
@@ -109,85 +108,114 @@ impl<D: AsRef<[u8]>> Reader<D> {
     }
 }
 
-pub struct ReaderIter<D: AsRef<[u8]>> {
+pub struct ReaderCursor<D: AsRef<[u8]>> {
     reader: Reader<D>,
     next_offset: usize,
-    index_iter: block::BlockIter<CBuf<DataSlice<D>>>,
-    data_iter: Option<block::BlockIter<CBuf<DataSlice<D>>>>,
+    index_iter: block::BlockCursor<CBuf<DataSlice<D>>>,
+    data_iter: Option<block::BlockCursor<CBuf<DataSlice<D>>>>,
 }
 
-impl<D: AsRef<[u8]>> ReaderIter<D> {
-    fn next_block(&mut self) -> Option<()> {
-        if self.next_offset >= self.reader.metadata.index_block_offset {
-            return None;
-        }
-        let (size, len_size) = usize::decode_var(&self.reader.data.as_ref()[self.next_offset..])
-            .expect("bad block size");
-        let crc_off = self.next_offset + len_size;
-        // TODO: read crc, optionally verify
-        let data_off = crc_off + std::mem::size_of::<u32>();
+impl<D: AsRef<[u8]>> ReaderCursor<D> {
+    fn get_block(&self, mut off: usize) -> Option<(block::BlockCursor<CBuf<DataSlice<D>>>, usize)> {
+        let (size, len_size) = usize::decode_var(self.reader.data.as_ref().get(off..)?)?;
+        // TODO: read, verify CRC
+        off += len_size + std::mem::size_of::<u32>();
         let comp = self.reader.metadata.compression_algorithm;
-        self.next_offset = data_off + size;
-        self.data_iter.replace(
-            block::Block::new(comp.uncompress(self.reader.data.clone_range(data_off, size))?)
-                .expect("bad block")
-                .into(),
-        );
-        Some(())
+        let data = comp
+            .uncompress(self.reader.data.clone_range(off, size))
+            .ok()?;
+        let block = block::Block::new(data).ok()?;
+        Some((block.into(), off + size))
     }
 }
 
-impl<D: AsRef<[u8]>> Iter for ReaderIter<D> {
-    type Item = Entry;
+impl<D: AsRef<[u8]>> Cursor for ReaderCursor<D> {
+    type Value = [u8];
+
+    fn get(&self) -> Option<(&[u8], &Self::Value)> {
+        match self.data_iter {
+            Some(ref cur) => cur.get(),
+            None => None,
+        }
+    }
 
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.data_iter.is_none() {
-            self.next_block()
-                .and_then(|_| self.data_iter.as_mut().unwrap().next())
-        } else {
-            match self.data_iter.as_mut().unwrap().next() {
-                Some(e) => Some(e),
+    fn advance(&mut self) {
+        loop {
+            match &mut self.data_iter {
                 None => {
-                    self.next_block()?;
-                    self.data_iter.as_mut().unwrap().next()
+                    if self.next_offset >= self.reader.metadata.index_block_offset {
+                        return;
+                    }
+                    match self.get_block(self.next_offset) {
+                        None => return,
+                        Some((cur, next_off)) => {
+                            self.data_iter.replace(cur);
+                            self.next_offset = next_off;
+                        }
+                    }
+                }
+                Some(cur) => {
+                    cur.advance();
+                    match cur.get() {
+                        Some(_) => return,
+                        None => {
+                            self.data_iter.take();
+                        }
+                    }
                 }
             }
         }
     }
 
     fn seek(&mut self, key: &[u8]) {
-        // TODO: detect and skip unneeded seek in iter.
-        self.index_iter.seek(key);
+        if let Some(cur) = self.data_iter.as_mut() {
+            if let Some((cur_key, _)) = cur.get() {
+                if key >= cur_key {
+                    cur.seek(key);
+                    if cur.get().is_some() {
+                        return;
+                    }
+                }
+            }
+        }
 
-        self.index_iter
-            .next()
-            .and_then(|e| {
-                self.next_offset = usize::decode_var(e.value())?.0;
-                self.next_block().map(|_| {
-                    self.data_iter.as_mut().unwrap().seek(key);
-                })
-            })
-            .or_else(|| {
-                self.next_offset = usize::MAX;
-                self.data_iter.take();
-                None
-            });
+        self.index_iter.seek(key);
+        self.data_iter.take();
+        if let Some((_, o)) = self.index_iter.get() {
+            if let Some((off, _)) = usize::decode_var(o) {
+                if let Some((mut cur, next)) = self.get_block(off) {
+                    cur.seek(key);
+                    self.data_iter.replace(cur);
+                    self.next_offset = next;
+                }
+            }
+        }
     }
 }
 
+/*
 impl<D: AsRef<[u8]>> IntoIterator for ReaderIter<D> {
-    type Item = Entry;
+    type Item = crate::Result<Entry>;
     type IntoIter = IntoIter<Self>;
     fn into_iter(self) -> Self::IntoIter {
         IntoIter(self)
     }
 }
+*/
+
+impl<D: AsRef<[u8]>> IntoIterator for ReaderCursor<D> {
+    type Item = (Vec<u8>, Vec<u8>);
+    type IntoIter = KVIter<Self>;
+    fn into_iter(self) -> Self::IntoIter {
+        ReaderCursor::to_iter(self)
+    }
+}
 
-impl<D: AsRef<[u8]>> Source for Reader<D> {
-    type Item = Entry;
+impl<'a, D: AsRef<[u8]>> Source<'a> for Reader<D> {
+    type Cur = ReaderCursor<D>;
 
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
-        ReaderIter {
+    fn iter(&'a self) -> ReaderCursor<D> {
+        ReaderCursor {
             reader: self.clone(),
             next_offset: 0,
             index_iter: self.index_iter(),
index 4bed5ab2b2d68b0fd27e99db6d48e55b3c095629..e88cf052002fec345047917058db906b4943500d 100644 (file)
@@ -1,10 +1,12 @@
-use crate::source::DynSource;
-use crate::{Entry, Iter, Merger, Reader, Source, Writer};
+use crate::{
+    cursor::{Cursor, VecCursor},
+    Merger, Reader, Source, Writer,
+};
 use memmap::Mmap;
 use std::cell::Cell;
 
-pub struct Sorter<F: Fn(&mut Entry, &Entry)> {
-    batch: Cell<Vec<Entry>>,
+pub struct Sorter<F: Fn(&[u8], &[u8], &mut Vec<u8>)> {
+    batch: Cell<Vec<(Vec<u8>, Vec<u8>)>>,
     batch_size: usize,
     max_size: usize,
     merge_func: F,
@@ -14,7 +16,7 @@ pub struct Sorter<F: Fn(&mut Entry, &Entry)> {
 
 impl<F> Sorter<F>
 where
-    F: Fn(&mut Entry, &Entry) + 'static,
+    F: Fn(&[u8], &[u8], &mut Vec<u8>) + 'static,
 {
     pub fn new(max_size: usize, merge_func: F) -> Self {
         Self {
@@ -27,44 +29,28 @@ where
         }
     }
 
-    pub fn add(&mut self, e: Entry) {
-        let esize = e.key().len() + e.value().len();
+    pub fn add(&mut self, key: &[u8], val: &[u8]) {
+        let esize = key.len() + val.len();
         if esize + self.batch_size > self.max_size {
             self.write_chunk();
         }
-        self.batch.get_mut().push(e);
+        self.batch.get_mut().push((Vec::from(key), Vec::from(val)));
         self.batch_size += esize;
     }
 
-    pub fn source(mut self) -> Box<dyn DynSource<Item = Entry>> {
-        if !self.batch.get_mut().is_empty() {
-            self.write_chunk();
-        }
+    pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
         Merger::from(self.readers)
             .dup_merge(self.merge_func)
-            .into_boxed()
-    }
-
-    pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
-        self.source()
-            .as_ref() // XXX - need to further wrap Box<dyn DynSource> in  BoxedSource
             .iter()
             .into_iter()
-            .for_each(|e| w.add(e).unwrap());
+            .for_each(|(k, v)| w.add(k.as_slice(), v.as_slice()).unwrap());
     }
 
     fn write_chunk(&mut self) {
         let mut w = Writer::new(Vec::new());
-        self.batch
-            .get_mut()
-            .sort_unstable_by(|a, b| a.key().cmp(b.key()));
-        self.batch
-            .take()
-            .iter()
-            .dup_merge(&self.merge_func)
-            .into_iter()
-            .for_each(|e| {
-                w.add(e).unwrap();
-            });
+        self.batch.get_mut().sort_unstable_by(|a, b| a.0.cmp(&b.0));
+        for (k, v) in VecCursor::from(self.batch.take()).dup_merge(&self.merge_func) {
+            w.add(k.as_slice(), v.as_slice()).unwrap();
+        }
     }
 }
index ac34be16812df611c333d437d60f4aec9b5ed65c..b5b22ecd77aa6bc7dff01c7a29c068498cf078eb 100644 (file)
@@ -1,72 +1,55 @@
-use crate::{Iter, Source};
-use std::cmp::Ordering;
+use crate::cursor::filter::Filter;
+use crate::cursor::map::Map;
+use crate::cursor::merge::Merge;
+use crate::{cursor::Cursor, Source};
 
-pub struct DupmergeSource<S, F> {
+pub struct MergeSource<S, F> {
     pub(super) source: S,
     pub(super) merge: F,
 }
 
-impl<S, F> Source for DupmergeSource<S, F>
+impl<'a, S, F> Source<'a> for MergeSource<S, F>
 where
-    S: Source,
-    S::Item: PartialEq + Clone,
-    F: Fn(&mut S::Item, &S::Item),
+    S: Source<'a>,
+    <S::Cur as Cursor>::Value: ToOwned,
+    F: Fn(&[u8], &<S::Cur as Cursor>::Value, &mut <<S::Cur as Cursor>::Value as ToOwned>::Owned)
+        + 'a,
 {
-    type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
+    type Cur = Merge<S::Cur, &'a F>;
+    fn iter(&'a self) -> Merge<S::Cur, &'a F> {
         self.source.iter().dup_merge(&self.merge)
     }
 }
 
-pub struct DupsortSource<S, F> {
-    pub(super) source: S,
-    pub(super) dupsort: F,
-}
-
-impl<S, F> Source for DupsortSource<S, F>
-where
-    S: Source,
-    S::Item: PartialEq,
-    F: Fn(&S::Item, &S::Item) -> Ordering,
-{
-    type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
-        self.source.iter().dup_sort(&self.dupsort)
-    }
-}
-
 pub struct FilterSource<S, F> {
     pub(super) source: S,
     pub(super) filter: F,
 }
 
-impl<S, F> Source for FilterSource<S, F>
+impl<'a, S, F> Source<'a> for FilterSource<S, F>
 where
-    S: Source,
-    F: Fn(&S::Item, &mut Vec<u8>) -> bool,
+    S: Source<'a>,
+    <S::Cur as Cursor>::Value: Clone + std::fmt::Debug,
+    F: Fn((&[u8], &<S::Cur as Cursor>::Value), &mut Vec<u8>) -> bool + 'a,
 {
-    type Item = S::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
+    type Cur = Filter<S::Cur, &'a F>;
+    fn iter(&'a self) -> Self::Cur {
         self.source.iter().filter(&self.filter)
     }
 }
 
-pub struct MapSource<S, F, O>
-where
-    S: Source,
-    F: Fn(S::Item) -> O,
-{
+pub struct MapSource<S, F> {
     pub(super) source: S,
     pub(super) map: F,
 }
 
-impl<S, F, O> Source for MapSource<S, F, O>
+impl<'a, S, F, O> Source<'a> for MapSource<S, F>
 where
-    S: Source,
-    F: Fn(S::Item) -> O,
+    S: Source<'a>,
+    F: Fn((&[u8], &<S::Cur as Cursor>::Value)) -> O + 'a,
 {
-    type Item = O;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
+    type Cur = Map<S::Cur, &'a F, O>;
+    fn iter(&'a self) -> Self::Cur {
         self.source.iter().map(&self.map)
     }
 }
diff --git a/src/source/merger.rs b/src/source/merger.rs
new file mode 100644 (file)
index 0000000..094251b
--- /dev/null
@@ -0,0 +1,29 @@
+use super::Source;
+use crate::cursor::{merger::KeyOrd, merger::MergeCursor, Cursor};
+
+pub struct Merger<S> {
+    sources: Vec<S>,
+}
+
+impl<I, S> From<I> for Merger<S>
+where
+    I: IntoIterator<Item = S>,
+{
+    fn from(iter: I) -> Self {
+        Self {
+            sources: Vec::from_iter(iter),
+        }
+    }
+}
+
+impl<'a, S> Source<'a> for Merger<S>
+where
+    S: Source<'a>,
+    //S::Cur: Ord,
+    <S::Cur as Cursor>::Value: ToOwned,
+{
+    type Cur = MergeCursor<KeyOrd<S::Cur>>;
+    fn iter(&'a self) -> Self::Cur {
+        MergeCursor::from(self.sources.iter().map(|s| KeyOrd(s.iter())))
+    }
+}
index 253dda6ed29de4af2b4303bce4d9d10be2f22ae8..c5c1ef7d95f6f889dbc52a2896bd563ebc0a2aa3 100644 (file)
@@ -1,65 +1,49 @@
-use crate::iter::{prefix::PrefixIter, range::RangeIter, BoxedIter, IntoIter, Iter};
-use crate::Entry;
-use std::cmp::Ordering;
+use crate::cursor::{range::PrefixCursor, range::RangeCursor, Cursor};
 
 mod adapters;
-pub use adapters::DupmergeSource;
-pub use adapters::DupsortSource;
 pub use adapters::FilterSource;
 pub use adapters::MapSource;
+pub use adapters::MergeSource;
+mod merger;
+pub use merger::Merger;
 
-pub trait Source {
-    type Item;
+pub trait Source<'a> {
+    type Cur: Cursor + IntoIterator;
 
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item>;
+    fn iter(&'a self) -> Self::Cur;
 
-    fn get(&self, key: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
-    where
-        Self::Item: PartialOrd<[u8]>,
-    {
-        RangeIter::new(self.iter(), key, key)
+    fn get(&'a self, key: &[u8]) -> RangeCursor<Self::Cur> {
+        RangeCursor::new(self.iter(), key, key)
     }
 
-    fn get_prefix(&self, prefix: &[u8]) -> PrefixIter<impl Iter<Item = Self::Item>>
-    where
-        Self::Item: AsRef<[u8]>,
-    {
-        PrefixIter::new(self.iter(), prefix)
+    fn get_prefix(&'a self, prefix: &[u8]) -> PrefixCursor<Self::Cur> {
+        PrefixCursor::new(self.iter(), prefix)
     }
 
-    fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
-    where
-        Self::Item: PartialOrd<[u8]>,
-    {
-        RangeIter::new(self.iter(), start, end)
+    fn get_range(&'a self, start: &[u8], end: &[u8]) -> RangeCursor<Self::Cur> {
+        RangeCursor::new(self.iter(), start, end)
     }
 
-    fn dup_merge<F>(self, merge: F) -> DupmergeSource<Self, F>
+    fn dup_merge<F>(self, merge: F) -> MergeSource<Self, F>
     where
         Self: Sized,
-        F: Fn(&mut Entry, &Entry) + 'static,
-    {
-        DupmergeSource {
+        <Self::Cur as Cursor>::Value: ToOwned,
+        F: Fn(
+                &[u8],
+                &<Self::Cur as Cursor>::Value,
+                &mut <<Self::Cur as Cursor>::Value as ToOwned>::Owned,
+            ) + 'static,
+    {
+        MergeSource {
             source: self,
             merge,
         }
     }
 
-    fn dupsort<F>(self, dupsort: F) -> DupsortSource<Self, F>
-    where
-        Self: Sized,
-        F: Fn(&Self::Item, &Self::Item) -> Ordering + 'static,
-    {
-        DupsortSource {
-            source: self,
-            dupsort,
-        }
-    }
-
     fn filter<F>(self, filter: F) -> FilterSource<Self, F>
     where
         Self: Sized,
-        F: Fn(&Self::Item, &mut Vec<u8>) -> bool + 'static,
+        F: Fn(&<Self::Cur as Cursor>::Value, &mut Vec<u8>) -> bool + 'static,
     {
         FilterSource {
             source: self,
@@ -67,189 +51,11 @@ pub trait Source {
         }
     }
 
-    fn map<F, O>(self, map: F) -> MapSource<Self, F, O>
+    fn map<F, O>(self, map: F) -> MapSource<Self, F>
     where
         Self: Sized,
-        F: Fn(Self::Item) -> O + 'static,
+        F: Fn((&[u8], &<Self::Cur as Cursor>::Value)) -> O + 'static,
     {
         MapSource { source: self, map }
     }
-
-    fn into_boxed(self) -> Box<dyn DynSource<Item = Self::Item>>
-    where
-        Self: Sized + 'static,
-    {
-        // Inner Box satisfied DynSource, outer Box required for an owned DynSource trait object.
-        Box::new(Box::new(self))
-    }
-}
-
-// A dyn-compatible variant of Source generating boxed SeekableIters. Necessary to create
-// heterogeneous collections of sources.
-pub trait DynSource {
-    type Item;
-
-    fn iter(&self) -> BoxedIter<'_, Self::Item>;
-}
-
-impl<S: Source + ?Sized> DynSource for Box<S> {
-    type Item = S::Item;
-
-    fn iter(&self) -> BoxedIter<'_, Self::Item> {
-        BoxedIter(Box::new(self.as_ref().iter()))
-    }
-}
-
-impl<D: DynSource + ?Sized> Source for D {
-    type Item = D::Item;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
-        DynSource::iter(self)
-    }
-}
-
-pub struct VecIter<'a> {
-    index: usize,
-    vec: &'a Vec<Entry>,
-}
-
-impl Iter for VecIter<'_> {
-    type Item = Entry;
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.index > self.vec.len() {
-            None
-        } else {
-            let e = self.vec[self.index].clone();
-            self.index += 1;
-            Some(e)
-        }
-    }
-
-    fn seek(&mut self, key: &[u8]) {
-        let mut left = 0;
-        let mut right = self.vec.len() - 1;
-        while left < right {
-            let mid = (left + right).div_ceil(2);
-            if self.vec[mid].key() < key {
-                left = mid;
-            } else {
-                right = mid - 1;
-            }
-        }
-        self.index = left;
-    }
-}
-
-impl<'a> IntoIterator for VecIter<'a> {
-    type Item = Entry;
-    type IntoIter = IntoIter<Self>;
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter(self)
-    }
-}
-
-impl Source for Vec<Entry> {
-    type Item = Entry;
-    fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
-        VecIter {
-            index: 0,
-            vec: self,
-        }
-    }
-}
-
-pub mod test_source {
-    use crate::Entry;
-    use crate::Iter;
-    use crate::Source;
-
-    pub struct TestSource(pub Vec<Entry>);
-
-    pub struct TestIter<'a> {
-        source: &'a TestSource,
-        off: usize,
-    }
-
-    impl Iter for TestIter<'_> {
-        type Item = Entry;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            let off = self.off;
-            if off >= self.source.0.len() {
-                return None;
-            }
-            let item = &self.source.0[off];
-            self.off += 1;
-            Some(item.clone())
-        }
-
-        fn seek(&mut self, key: &[u8]) {
-            self.off = 0;
-            while self.off < self.source.0.len() && self.source.0[self.off].key() < key {
-                self.off += 1;
-            }
-        }
-    }
-
-    impl IntoIterator for TestIter<'_> {
-        type Item = Entry;
-        type IntoIter = crate::iter::IntoIter<Self>;
-        fn into_iter(self) -> Self::IntoIter {
-            crate::iter::IntoIter(self)
-        }
-    }
-
-    impl Source for TestSource {
-        type Item = Entry;
-        fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
-            TestIter {
-                source: self,
-                off: 0,
-            }
-        }
-    }
-}
-
-pub mod test {
-    use super::test_source::TestSource;
-    #[allow(unused_imports)]
-    use super::Source;
-    #[allow(unused_imports)]
-    use crate::iter::Iter;
-    use crate::Entry;
-
-    #[allow(dead_code)]
-    fn test_source() -> TestSource {
-        TestSource(vec![
-            Entry::new(vec![0, 0, 0, 0], vec![0]),
-            Entry::new(vec![0, 0, 0, 1], vec![1]),
-            Entry::new(vec![0, 0, 1, 0], vec![2]),
-            Entry::new(vec![0, 1, 0, 0], vec![3]),
-            Entry::new(vec![1, 0, 0, 0], vec![4]),
-        ])
-    }
-
-    #[test]
-    fn test_source_iter() {
-        let s = &test_source();
-
-        assert_eq!(
-            Vec::from_iter(s.iter().map(|e| e.value()[0])),
-            vec![0, 1, 2, 3, 4]
-        );
-        assert_eq!(
-            Vec::from_iter(s.get(vec![0, 0, 1, 0].as_slice()).map(|e| e.value()[0])),
-            vec![2]
-        );
-        assert_eq!(
-            Vec::from_iter(s.get_prefix(vec![0, 0].as_slice()).map(|e| e.value()[0])),
-            vec![0, 1, 2]
-        );
-        assert_eq!(
-            Vec::from_iter(
-                s.get_range(vec![0, 0, 0, 1].as_slice(), vec![0, 1, 0, 0].as_slice())
-                    .map(|e| e.value()[0])
-            ),
-            vec![1, 2, 3]
-        );
-    }
 }
index da6743f542bf91343a79534816c3d0d8d53f0fcc..0742fb3096c08775e962cfa292e54fd0c45c090a 100644 (file)
@@ -105,15 +105,16 @@ impl BlockBuilder {
 #[cfg(test)]
 mod test {
     use super::BlockBuilder;
-    use crate::reader::block::Block;
+    use crate::reader::block::{Block, BlockCursor};
     use crate::reader::DataSlice;
+    use std::sync::Arc;
 
     #[test]
     fn test_block_builder() {
         let mut bb = BlockBuilder::default();
         let v = Vec::from_iter((0..16).map(|i| {
             (
-                Vec::from(u32::to_be_bytes(i).as_slice()),
+                Arc::new(Vec::from(u32::to_be_bytes(i).as_slice())),
                 Vec::from(u32::to_be_bytes(i * 2).as_slice()),
             )
         }));
@@ -121,10 +122,9 @@ mod test {
         let block_len = bb.len();
         let block_data = bb.as_slice();
         assert_eq!(block_data.len(), block_len);
-        let bi = Block::new(DataSlice::new(bb.as_slice()))
-            .unwrap()
-            .into_iter();
-        let vcmp = Vec::from_iter(bi.map(|e| (Vec::from(e.key()), Vec::from(e.value()))));
+        let bc = BlockCursor::from(Block::new(DataSlice::new(bb.as_slice())).unwrap());
+        let bi = bc.into_iter();
+        let vcmp = Vec::from_iter(bi);
         assert_eq!(v, vcmp);
     }
 }
index e231e9a31a83f059890f2529d96a2ad7cac43f6d..a4ce210d9fa6f50b6c2ee3e0e0dca5417fcd65de 100644 (file)
@@ -1,5 +1,5 @@
 use crate::compression::Compression;
-use crate::{Entry, Result};
+use crate::Result;
 use crc32c::crc32c;
 use integer_encoding::{FixedIntWriter, VarInt, VarIntWriter};
 pub(crate) mod block_builder;
@@ -110,16 +110,16 @@ impl<W: std::io::Write> Writer<W> {
         }
     }
 
-    pub fn add(&mut self, e: Entry) -> Result<()> {
-        let est = e.key().len() + e.value().len() + 15;
+    pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
+        let est = key.len() + val.len() + 15;
         if self.block.len() + est >= self.blocksize {
-            bytesep(&mut self.last_key, e.key());
+            bytesep(&mut self.last_key, key);
             self.write_block()?;
         }
-        self.meta.add_entry(e.key().len(), e.value().len());
-        self.block.add(e.key(), e.value());
+        self.meta.add_entry(key.len(), val.len());
+        self.block.add(key, val);
         self.last_key.clear();
-        self.last_key.extend_from_slice(e.key());
+        self.last_key.extend_from_slice(key);
         Ok(())
     }
 
@@ -186,17 +186,16 @@ impl<W: std::io::Write> Drop for Writer<W> {
 #[cfg(test)]
 mod test {
     use super::Writer;
-    use crate::Entry;
 
     #[test]
     fn test_writer() {
         let mut out = Vec::<u8>::new();
         {
             let mut w = Writer::new(&mut out);
-            w.add(Entry::new(vec![0], vec![1])).unwrap();
-            w.add(Entry::new(vec![0, 0], vec![1])).unwrap();
-            w.add(Entry::new(vec![0, 1], vec![1])).unwrap();
-            w.add(Entry::new(vec![1, 1], vec![1])).unwrap();
+            w.add(vec![0].as_ref(), vec![1].as_ref()).unwrap();
+            w.add(vec![0, 0].as_ref(), vec![1].as_ref()).unwrap();
+            w.add(vec![0, 1].as_ref(), vec![1].as_ref()).unwrap();
+            w.add(vec![1, 1].as_ref(), vec![1].as_ref()).unwrap();
             // drops w
         }
         assert!(out.len() > 512);
index d2c6328b9808afcb65654e38f69a38e6cd1b474f..32084eac17a246d3e6d056f9da9e0a127fdf3b60 100644 (file)
@@ -1,22 +1,29 @@
-use mtbl::{Entry, Reader, Source, Writer};
+use mtbl::{Reader, Source, Writer};
 
 #[test]
 fn test_write_readback() {
     let mut store = Vec::<u8>::new();
-    let mut reference = Vec::<Entry>::new();
+    let mut reference = Vec::<(Vec<u8>, Vec<u8>)>::new();
     {
+        println!("writer starting");
         let mut w = Writer::new(&mut store).blocksize(256);
         for i in 1..1024 {
-            let e = Entry::new(u32::to_be_bytes(i), u32::to_be_bytes(i * 1024));
-            w.add(e.clone()).expect("add failed");
+            let e = (
+                Vec::from(u32::to_be_bytes(i)),
+                Vec::from(u32::to_be_bytes(i * 1024)),
+            );
+            w.add(e.0.as_ref(), e.1.as_ref()).expect("add failed");
             reference.push(e);
         }
+        println!("writer finished");
     }
 
     assert!(store.len() > 512);
+    println!("reader start");
     let r = Reader::new(store);
     let ri = r.iter();
     assert_eq!(ri.into_iter().collect::<Vec<_>>(), reference);
+    println!("reader finish");
 
     // test range
     let start = u32::to_be_bytes(192);
@@ -26,7 +33,8 @@ fn test_write_readback() {
         rangei.into_iter().collect::<Vec<_>>(),
         reference
             .into_iter()
-            .filter(|e| e.key() >= &u32::to_be_bytes(192) && e.key() <= &u32::to_be_bytes(256))
+            .filter(|(k, _)| k.as_slice() >= &u32::to_be_bytes(192)
+                && k.as_slice() <= &u32::to_be_bytes(256))
             .collect::<Vec<_>>()
     )
 }