git.mikk.net Git - mtbl-rs/commitdiff
Refactor reader code around Arc<impl AsRef<[u8]>>
author Chris Mikkelson <cmikk@fsi.io>
Wed, 7 Aug 2024 04:50:22 +0000 (22:50 -0600)
committer Chris Mikkelson <cmikk@fsi.io>
Wed, 7 Aug 2024 04:50:22 +0000 (22:50 -0600)
API currently named 'DataSlice' provides convenient views of
ranges / slices of the underlying buffer, avoiding lifetime
entanglements.

src/entry.rs
src/reader/block.rs
src/reader/mod.rs
src/writer/block_builder.rs

index 5a4cf4af63ce5d2a53628505b0d3f67163d458fd..06d7707dc17736a81a8735282708bc224fbcafcc 100644 (file)
@@ -22,4 +22,9 @@ impl Entry {
         value.clear();
         value.extend_from_slice(e.value.as_slice());
     }
+
+    pub fn clear(&mut self) {
+        Arc::make_mut(&mut self.key).clear();
+        Arc::make_mut(&mut self.value).clear();
+    }
 }
index d1f1cc7e91273fe1c038fef218462725facfd8a9..9859047d786a5452450bd97693ea0a51b4a39739 100644 (file)
@@ -1,34 +1,34 @@
+use crate::reader::DataSlice;
 use crate::{Entry, Iter, Result};
+use integer_encoding::VarInt;
 use std::mem::size_of;
 use std::sync::Arc;
 
 #[derive(Debug)]
-enum Restarts<'b> {
-    Restart32(&'b [u8]),
-    Restart64(&'b [u8]),
+enum RestartType {
+    U32,
+    U64,
 }
 
-impl<'b> Restarts<'b> {
-    fn decode(&self, ridx: usize) -> Result<usize> {
+impl RestartType {
+    fn decode(&self, restarts: &[u8], idx: usize) -> Result<usize> {
         match self {
-            Restarts::Restart32(r) => {
-                let len = size_of::<u32>();
-                let start = ridx * len;
-                let end = start + len;
-                if end > r.len() {
-                    Err("todo-32".into())
+            RestartType::U32 => {
+                if idx * size_of::<u32>() > restarts.len() {
+                    Err("restart bound check".into())
                 } else {
-                    Ok(u32::from_be_bytes(r[start..end].try_into()?) as usize)
+                    let start = idx * size_of::<u32>();
+                    let end = start + size_of::<u32>();
+                    Ok(u32::from_be_bytes(restarts[start..end].try_into()?) as usize)
                 }
             }
-            Restarts::Restart64(r) => {
-                let len = size_of::<u64>();
-                let start = ridx * len;
-                let end = start + len;
-                if end > r.len() {
-                    Err("todo-64".into())
+            RestartType::U64 => {
+                if idx * size_of::<u64>() > restarts.len() {
+                    Err("restart bound check".into())
                 } else {
-                    Ok(u64::from_be_bytes(r[start..end].try_into()?) as usize)
+                    let start = idx * size_of::<u64>();
+                    let end = start + size_of::<u64>();
+                    Ok(u64::from_be_bytes(restarts[start..end].try_into()?) as usize)
                 }
             }
         }
@@ -36,58 +36,66 @@ impl<'b> Restarts<'b> {
 }
 
 #[derive(Debug)]
-pub(crate) struct Block<'b> {
-    data: &'b [u8],
-    restarts: Restarts<'b>,
+pub(crate) struct Block<D: AsRef<[u8]>> {
+    data: DataSlice<D>,
+    restart_off: usize,
+    restart_type: RestartType,
 }
 
-impl<'b> Block<'b> {
-    pub(crate) fn new(data: &'b [u8]) -> Result<Self> {
-        if data.len() < size_of::<u32>() {
+impl<D: AsRef<[u8]>> Block<D> {
+    pub(crate) fn new(data: DataSlice<D>) -> Result<Self> {
+        if data.as_ref().len() < size_of::<u32>() {
             return Err("block data too short".into());
         }
-        let rc_off = data.len() - size_of::<u32>();
+        let rc_off = data.as_ref().len() - size_of::<u32>();
 
-        let nrestarts = u32::from_be_bytes(data[rc_off..].try_into()?) as usize;
+        let nrestarts = u32::from_be_bytes(data.as_ref()[rc_off..].try_into()?) as usize;
 
         // try 32-bit restarts
         if (nrestarts * size_of::<u32>()) > rc_off {
             return Err("block data too short 2".into());
         }
-        let mut r_off = rc_off - (nrestarts * size_of::<u32>());
+        let r_off = rc_off - (nrestarts * size_of::<u32>());
+        let restart_type: RestartType;
+        let restart_off: usize;
 
-        let restarts = if r_off <= u32::MAX as usize {
-            Restarts::Restart32(&data[r_off..rc_off])
+        if r_off <= u32::MAX as usize {
+            restart_type = RestartType::U32;
+            restart_off = r_off;
         } else {
             // try 64-bit restarts
-            if (nrestarts * size_of::<u64>()) > rc_off {
-                return Err("block data too short 3".into());
-            }
-            r_off = rc_off - (nrestarts * size_of::<u64>());
-            Restarts::Restart64(&data[r_off..rc_off])
+            restart_type = RestartType::U64;
+            restart_off = rc_off - nrestarts * size_of::<u64>();
         };
-        println!("Block::new() -- r_off = {}", r_off);
         Ok(Block {
-            data: &data[..r_off],
-            restarts,
+            data,
+            restart_type,
+            restart_off,
         })
     }
 
-    pub(crate) fn iter(&self) -> BlockIter<'_> {
-        BlockIter {
+    pub(crate) fn restart(&self, idx: usize) -> Result<usize> {
+        self.restart_type
+            .decode(&self.data.as_ref()[self.restart_off..], idx)
+    }
+}
+impl<D: AsRef<[u8]>> IntoIterator for Block<D> {
+    type Item = Entry;
+    type IntoIter = BlockIter<D>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        Self::IntoIter {
             block: self,
-            cur_key: Arc::new(Vec::new()),
-            cur_val: Arc::new(Vec::new()),
+            cur_ent: None,
             off: 0,
         }
     }
 }
 
 #[derive(Debug)]
-pub(crate) struct BlockIter<'b> {
-    block: &'b Block<'b>,
-    cur_key: Arc<Vec<u8>>,
-    cur_val: Arc<Vec<u8>>,
+pub(crate) struct BlockIter<D: AsRef<[u8]>> {
+    block: Block<D>,
+    cur_ent: Option<Entry>,
     off: usize,
 }
 
@@ -113,60 +121,64 @@ fn decode_varint(b: &[u8]) -> Result<(usize, usize)> {
     Err("varint-2".into())
 }
 
-impl<'b> BlockIter<'b> {
-    fn seek_restart(&mut self, ridx: usize) -> Result<()> {
-        self.off = self.block.restarts.decode(ridx)?;
-        let key = Arc::make_mut(&mut self.cur_key);
-        let val = Arc::make_mut(&mut self.cur_val);
-        key.clear();
-        val.clear();
-        self.decode()?;
-        Ok(())
+impl<D: AsRef<[u8]>> BlockIter<D> {
+    fn seek_restart(&mut self, ridx: usize) -> Option<()> {
+        self.off = self.block.restart(ridx).ok()?;
+        self.decode()
     }
 
-    fn decode(&mut self) -> Result<()> {
+    fn decode(&mut self) -> Option<()> {
         let mut idx = self.off;
-        let (shared_key, len) = decode_varint(&self.block.data[idx..])?;
+        if idx >= self.block.restart_off {
+            self.cur_ent.take();
+            return None;
+        }
+
+        let data = self.block.data.as_ref();
+        let entry = self.cur_ent.get_or_insert(Entry {
+            key: Arc::new(Vec::new()),
+            value: Arc::new(Vec::new()),
+        });
+
+        let (shared_key, len) = usize::decode_var(&data[idx..])?;
         idx += len;
-        let (unshared_key, len) = decode_varint(&self.block.data[idx..])?;
+        let (unshared_key, len) = usize::decode_var(&data[idx..])?;
         idx += len;
-        let (len_val, len) = decode_varint(&self.block.data[idx..])?;
+        let (len_val, len) = usize::decode_var(&data[idx..])?;
         idx += len;
 
-        if shared_key > self.cur_key.len() {
-            return Err("shared_key too long".into());
+        if shared_key > entry.key.len() {
+            // return Err("shared_key too long".into());
+            return None;
         }
 
-        let key = Arc::make_mut(&mut self.cur_key);
+        let key = Arc::make_mut(&mut entry.key);
         key.truncate(shared_key);
-        key.extend_from_slice(get_bytes(&self.block.data[idx..], unshared_key)?);
+        key.extend_from_slice(get_bytes(&data[idx..], unshared_key).ok()?);
         idx += unshared_key;
 
-        let val = Arc::make_mut(&mut self.cur_val);
+        let val = Arc::make_mut(&mut entry.value);
         val.clear();
-        val.extend_from_slice(get_bytes(&self.block.data[idx..], len_val)?);
+        val.extend_from_slice(get_bytes(&data[idx..], len_val).ok()?);
         idx += len_val;
         self.off = idx;
-        Ok(())
+        Some(())
     }
 }
 
-impl<'b> Iterator for BlockIter<'b> {
+impl<D: AsRef<[u8]>> Iterator for BlockIter<D> {
     type Item = Entry;
 
     fn next(&mut self) -> Option<Self::Item> {
-        println!("next: {:?}", self);
-        if self.off >= self.block.data.len() {
-            return None;
+        if self.cur_ent.is_none() {
+            self.decode()?;
         }
-        self.decode().ok()?;
-        Some(Entry {
-            key: self.cur_key.clone(),
-            value: self.cur_val.clone(),
-        })
+        let res = self.cur_ent.clone();
+        _ = self.decode();
+        res
     }
 }
 
-impl<'b> Iter for BlockIter<'b> {
+impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
     fn seek(&mut self, _key: &[u8]) {}
 }
index 2496992e4dea7bd25b0e8ef8b1da7ad1d74f1df3..75200980b87f751736d1a59d4ff1feb0b8f7a517 100644 (file)
 use crate::metadata::Metadata;
 use crate::Entry;
-use crate::Result;
+use crate::Iter;
 use integer_encoding::VarInt;
 pub(crate) mod block;
+use std::sync::Arc;
 
-struct Reader<'r> {
-    data: &'r [u8],
-    index: block::Block<'r>,
+// interface for a range
+#[derive(Debug)]
+pub(crate) struct DataSlice<D: AsRef<[u8]>> {
+    data: Arc<D>,
+    off: usize,
+    len: usize,
+}
+
+impl<D: AsRef<[u8]>> AsRef<[u8]> for DataSlice<D> {
+    fn as_ref(&self) -> &[u8] {
+        &((*self.data).as_ref())[self.off..self.off + self.len]
+    }
+}
+
+impl<D: AsRef<[u8]>> DataSlice<D> {
+    pub(crate) fn new(data: D) -> Self {
+        let len = data.as_ref().len();
+        Self {
+            data: Arc::new(data),
+            off: 0,
+            len,
+        }
+    }
+
+    fn clone_range(&self, off: usize, len: usize) -> Self {
+        assert!(self.off + off + len <= self.len);
+        Self {
+            data: self.data.clone(),
+            off: self.off + off,
+            len,
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.len
+    }
+}
+
+#[test]
+fn test_data_slice() {
+    let v: Vec<u8> = vec![1u8, 2, 3];
+    let ds = DataSlice::new(v.clone());
+    assert_eq!(ds.as_ref(), v.as_slice());
+    let cr = ds.clone_range(1, 2);
+    assert_eq!(cr.as_ref(), vec![2u8, 3]);
+}
+
+struct Reader<D: AsRef<[u8]>> {
+    data: DataSlice<D>,
     metadata: Metadata,
 }
 
-struct ReaderIter<'r> {
-    r: &'r Reader<'r>,
-    index_iter: block::BlockIter<'r>,
-    cur_data: Option<block::Block<'r>>,
+impl<D: AsRef<[u8]>> Reader<D> {
+    fn index_iter(&self) -> block::BlockIter<D> {
+        block::Block::new(self.data.clone_range(
+            self.metadata.index_block_offset,
+            self.metadata.bytes_index_block,
+        ))
+        .expect("bad block")
+        .into_iter()
+    }
+}
+
+struct ReaderIter<'r, D: AsRef<[u8]>> {
+    reader: &'r Reader<D>,
     next_offset: usize,
-    data_iter: Option<block::BlockIter<'r>>,
+    index_iter: block::BlockIter<D>,
+    data_iter: Option<block::BlockIter<D>>,
 }
 
-impl<'r> Reader<'r> {
-    pub fn iter(&'r self) -> ReaderIter<'r> {
-        let index_iter = self.index.iter();
+impl<D: AsRef<[u8]>> Reader<D> {
+    pub fn iter(&self) -> ReaderIter<'_, D> {
         ReaderIter {
-            r: self,
-            index_iter,
+            reader: self,
             next_offset: 0,
-            cur_data: None,
+            index_iter: self.index_iter(),
             data_iter: None,
         }
     }
 }
 
-impl<'r> ReaderIter<'r> {
-    fn load_block(&'r mut self) -> Result<()> {
-        let (size, len_size) = usize::decode_var(&self.r.data[self.next_offset..]).ok_or("")?;
+impl<'r, D: AsRef<[u8]>> ReaderIter<'r, D> {
+    fn next_block(&mut self) -> Option<()> {
+        if self.next_offset >= self.reader.data.len() {
+            return None;
+        }
+        let (size, len_size) = usize::decode_var(&self.reader.data.as_ref()[self.next_offset..])?;
         let crc_off = self.next_offset + len_size;
-        let (_crc, len_crc) = u32::decode_var(&self.r.data[crc_off..]).ok_or("")?;
+        let (_crc, len_crc) = u32::decode_var(&self.reader.data.as_ref()[crc_off..])?;
         let data_off = crc_off + len_crc;
         self.next_offset += data_off + size;
-        self.cur_data
-            .replace(block::Block::new(&self.r.data[data_off..data_off + size])?);
-        for b in self.cur_data.iter() {
-            self.data_iter.replace(b.iter());
-        }
-        Ok(())
+        self.data_iter.replace(
+            block::Block::new(self.reader.data.clone_range(data_off, size))
+                .ok()?
+                .into_iter(),
+        );
+        Some(())
     }
 }
 
-impl<'r> Iterator for &'r mut ReaderIter<'r> {
+impl<'r, D: AsRef<[u8]>> Iterator for ReaderIter<'r, D> {
     type Item = Entry;
     fn next(&mut self) -> Option<Self::Item> {
-        if self.cur_data.is_none() {
-            self.load_block().ok()?;
-        }
-        if let Some(e) = self.data_iter.as_mut().unwrap().next() {
-            Some(e)
-        } else if self.next_offset == self.r.data.len() {
-            None
-        } else {
-            self.load_block().ok()?;
+        if self.data_iter.is_none() {
+            self.next_block();
             self.data_iter.as_mut().unwrap().next()
+        } else {
+            match self.data_iter.as_mut().unwrap().next() {
+                Some(e) => Some(e),
+                None => {
+                    self.next_block()?;
+                    self.data_iter.as_mut().unwrap().next()
+                }
+            }
         }
     }
 }
+
+impl<'r, D: AsRef<[u8]>> Iter for ReaderIter<'r, D> {
+    fn seek(&mut self, key: &[u8]) {
+        // TODO: detect and skip unneeded seek in iter.
+        self.index_iter.seek(key);
+        self.index_iter.next().and_then(|e| {
+            self.next_offset = usize::decode_var(e.value.as_slice())?.0;
+            self.next_block().and_then(|_| {
+                self.data_iter.as_mut().unwrap().seek(key);
+                Some(())
+            })
+        });
+    }
+}
index 3541fec5a98ff431a5290f43a3e7d8f21a6076c0..2f6c0efe6eb2ab136850f1ece1688dd47f2cffce 100644 (file)
@@ -118,6 +118,7 @@ impl BlockBuilder {
 mod test {
     use super::BlockBuilder;
     use crate::reader::block::Block;
+    use crate::reader::DataSlice;
 
     #[test]
     fn test_block_builder() {
@@ -133,11 +134,11 @@ mod test {
         let block_data = bb.as_slice();
         assert_eq!(block_data.len(), block_len);
         println!("Block: {:?}", block_data);
-        let block = Block::new(bb.as_slice()).unwrap();
+        let bi = Block::new(DataSlice::new(bb.as_slice()))
+            .unwrap()
+            .into_iter();
         let vcmp = Vec::from_iter(
-            block
-                .iter()
-                .map(|e| (Vec::from(e.key.as_slice()), Vec::from(e.value.as_slice()))),
+            bi.map(|e| (Vec::from(e.key.as_slice()), Vec::from(e.value.as_slice()))),
         );
         assert_eq!(v, vcmp);
     }