From: Chris Mikkelson Date: Wed, 7 Aug 2024 04:50:22 +0000 (-0600) Subject: Refactor reader code around Arc> X-Git-Url: https://git.mikk.net/?a=commitdiff_plain;h=990741d706ecf246465bd5541d306ba458f9a629;p=mtbl-rs Refactor reader code around Arc> API currently named 'DataSlice' provides convenient views of ranges / slices of the underlying buffer, avoiding lifetime entaglements. --- diff --git a/src/entry.rs b/src/entry.rs index 5a4cf4a..06d7707 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -22,4 +22,9 @@ impl Entry { value.clear(); value.extend_from_slice(e.value.as_slice()); } + + pub fn clear(&mut self) { + Arc::make_mut(&mut self.key).clear(); + Arc::make_mut(&mut self.value).clear(); + } } diff --git a/src/reader/block.rs b/src/reader/block.rs index d1f1cc7..9859047 100644 --- a/src/reader/block.rs +++ b/src/reader/block.rs @@ -1,34 +1,34 @@ +use crate::reader::DataSlice; use crate::{Entry, Iter, Result}; +use integer_encoding::VarInt; use std::mem::size_of; use std::sync::Arc; #[derive(Debug)] -enum Restarts<'b> { - Restart32(&'b [u8]), - Restart64(&'b [u8]), +enum RestartType { + U32, + U64, } -impl<'b> Restarts<'b> { - fn decode(&self, ridx: usize) -> Result { +impl RestartType { + fn decode(&self, restarts: &[u8], idx: usize) -> Result { match self { - Restarts::Restart32(r) => { - let len = size_of::(); - let start = ridx * len; - let end = start + len; - if end > r.len() { - Err("todo-32".into()) + RestartType::U32 => { + if idx * size_of::() > restarts.len() { + Err("restart bound check".into()) } else { - Ok(u32::from_be_bytes(r[start..end].try_into()?) as usize) + let start = idx * size_of::(); + let end = start + size_of::(); + Ok(u32::from_be_bytes(restarts[start..end].try_into()?) as usize) } } - Restarts::Restart64(r) => { - let len = size_of::(); - let start = ridx * len; - let end = start + len; - if end > r.len() { - Err("todo-64".into()) + RestartType::U64 => { + if idx * size_of::() > restarts.len() { + Err("restart bound check".into()) } else { - Ok(u64::from_be_bytes(r[start..end].try_into()?) as usize) + let start = idx * size_of::(); + let end = start + size_of::(); + Ok(u64::from_be_bytes(restarts[start..end].try_into()?) as usize) } } } @@ -36,58 +36,66 @@ impl<'b> Restarts<'b> { } #[derive(Debug)] -pub(crate) struct Block<'b> { - data: &'b [u8], - restarts: Restarts<'b>, +pub(crate) struct Block> { + data: DataSlice, + restart_off: usize, + restart_type: RestartType, } -impl<'b> Block<'b> { - pub(crate) fn new(data: &'b [u8]) -> Result { - if data.len() < size_of::() { +impl> Block { + pub(crate) fn new(data: DataSlice) -> Result { + if data.as_ref().len() < size_of::() { return Err("block data too short".into()); } - let rc_off = data.len() - size_of::(); + let rc_off = data.as_ref().len() - size_of::(); - let nrestarts = u32::from_be_bytes(data[rc_off..].try_into()?) as usize; + let nrestarts = u32::from_be_bytes(data.as_ref()[rc_off..].try_into()?) as usize; // try 32-bit restarts if (nrestarts * size_of::()) > rc_off { return Err("block data too short 2".into()); } - let mut r_off = rc_off - (nrestarts * size_of::()); + let r_off = rc_off - (nrestarts * size_of::()); + let restart_type: RestartType; + let restart_off: usize; - let restarts = if r_off <= u32::MAX as usize { - Restarts::Restart32(&data[r_off..rc_off]) + if r_off <= u32::MAX as usize { + restart_type = RestartType::U32; + restart_off = r_off; } else { // try 64-bit restarts - if (nrestarts * size_of::()) > rc_off { - return Err("block data too short 3".into()); - } - r_off = rc_off - (nrestarts * size_of::()); - Restarts::Restart64(&data[r_off..rc_off]) + restart_type = RestartType::U64; + restart_off = rc_off - nrestarts * size_of::(); }; - println!("Block::new() -- r_off = {}", r_off); Ok(Block { - data: &data[..r_off], - restarts, + data, + restart_type, + restart_off, }) } - pub(crate) fn iter(&self) -> BlockIter<'_> { - BlockIter { + pub(crate) fn restart(&self, idx: usize) -> Result { + self.restart_type + .decode(&self.data.as_ref()[self.restart_off..], idx) + } +} +impl> IntoIterator for Block { + type Item = Entry; + type IntoIter = BlockIter; + + fn into_iter(self) -> Self::IntoIter { + Self::IntoIter { block: self, - cur_key: Arc::new(Vec::new()), - cur_val: Arc::new(Vec::new()), + cur_ent: None, off: 0, } } } #[derive(Debug)] -pub(crate) struct BlockIter<'b> { - block: &'b Block<'b>, - cur_key: Arc>, - cur_val: Arc>, +pub(crate) struct BlockIter> { + block: Block, + cur_ent: Option, off: usize, } @@ -113,60 +121,64 @@ fn decode_varint(b: &[u8]) -> Result<(usize, usize)> { Err("varint-2".into()) } -impl<'b> BlockIter<'b> { - fn seek_restart(&mut self, ridx: usize) -> Result<()> { - self.off = self.block.restarts.decode(ridx)?; - let key = Arc::make_mut(&mut self.cur_key); - let val = Arc::make_mut(&mut self.cur_val); - key.clear(); - val.clear(); - self.decode()?; - Ok(()) +impl> BlockIter { + fn seek_restart(&mut self, ridx: usize) -> Option<()> { + self.off = self.block.restart(ridx).ok()?; + self.decode() } - fn decode(&mut self) -> Result<()> { + fn decode(&mut self) -> Option<()> { let mut idx = self.off; - let (shared_key, len) = decode_varint(&self.block.data[idx..])?; + if idx >= self.block.restart_off { + self.cur_ent.take(); + return None; + } + + let data = self.block.data.as_ref(); + let entry = self.cur_ent.get_or_insert(Entry { + key: Arc::new(Vec::new()), + value: Arc::new(Vec::new()), + }); + + let (shared_key, len) = usize::decode_var(&data[idx..])?; idx += len; - let (unshared_key, len) = decode_varint(&self.block.data[idx..])?; + let (unshared_key, len) = usize::decode_var(&data[idx..])?; idx += len; - let (len_val, len) = decode_varint(&self.block.data[idx..])?; + let (len_val, len) = usize::decode_var(&data[idx..])?; idx += len; - if shared_key > self.cur_key.len() { - return Err("shared_key too long".into()); + if shared_key > entry.key.len() { + // return Err("shared_key too long".into()); + return None; } - let key = Arc::make_mut(&mut self.cur_key); + let key = Arc::make_mut(&mut entry.key); key.truncate(shared_key); - key.extend_from_slice(get_bytes(&self.block.data[idx..], unshared_key)?); + key.extend_from_slice(get_bytes(&data[idx..], unshared_key).ok()?); idx += unshared_key; - let val = Arc::make_mut(&mut self.cur_val); + let val = Arc::make_mut(&mut entry.value); val.clear(); - val.extend_from_slice(get_bytes(&self.block.data[idx..], len_val)?); + val.extend_from_slice(get_bytes(&data[idx..], len_val).ok()?); idx += len_val; self.off = idx; - Ok(()) + Some(()) } } -impl<'b> Iterator for BlockIter<'b> { +impl> Iterator for BlockIter { type Item = Entry; fn next(&mut self) -> Option { - println!("next: {:?}", self); - if self.off >= self.block.data.len() { - return None; + if self.cur_ent.is_none() { + self.decode()?; } - self.decode().ok()?; - Some(Entry { - key: self.cur_key.clone(), - value: self.cur_val.clone(), - }) + let res = self.cur_ent.clone(); + _ = self.decode(); + res } } -impl<'b> Iter for BlockIter<'b> { +impl> Iter for BlockIter { fn seek(&mut self, _key: &[u8]) {} } diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 2496992..7520098 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -1,65 +1,138 @@ use crate::metadata::Metadata; use crate::Entry; -use crate::Result; +use crate::Iter; use integer_encoding::VarInt; pub(crate) mod block; +use std::sync::Arc; -struct Reader<'r> { - data: &'r [u8], - index: block::Block<'r>, +// interface for a range +#[derive(Debug)] +pub(crate) struct DataSlice> { + data: Arc, + off: usize, + len: usize, +} + +impl> AsRef<[u8]> for DataSlice { + fn as_ref(&self) -> &[u8] { + &((*self.data).as_ref())[self.off..self.off + self.len] + } +} + +impl> DataSlice { + pub(crate) fn new(data: D) -> Self { + let len = data.as_ref().len(); + Self { + data: Arc::new(data), + off: 0, + len, + } + } + + fn clone_range(&self, off: usize, len: usize) -> Self { + assert!(self.off + off + len <= self.len); + Self { + data: self.data.clone(), + off: self.off + off, + len, + } + } + + fn len(&self) -> usize { + self.len + } +} + +#[test] +fn test_data_slice() { + let v: Vec = vec![1u8, 2, 3]; + let ds = DataSlice::new(v.clone()); + assert_eq!(ds.as_ref(), v.as_slice()); + let cr = ds.clone_range(1, 2); + assert_eq!(cr.as_ref(), vec![2u8, 3]); +} + +struct Reader> { + data: DataSlice, metadata: Metadata, } -struct ReaderIter<'r> { - r: &'r Reader<'r>, - index_iter: block::BlockIter<'r>, - cur_data: Option>, +impl> Reader { + fn index_iter(&self) -> block::BlockIter { + block::Block::new(self.data.clone_range( + self.metadata.index_block_offset, + self.metadata.bytes_index_block, + )) + .expect("bad block") + .into_iter() + } +} + +struct ReaderIter<'r, D: AsRef<[u8]>> { + reader: &'r Reader, next_offset: usize, - data_iter: Option>, + index_iter: block::BlockIter, + data_iter: Option>, } -impl<'r> Reader<'r> { - pub fn iter(&'r self) -> ReaderIter<'r> { - let index_iter = self.index.iter(); +impl> Reader { + pub fn iter(&self) -> ReaderIter<'_, D> { ReaderIter { - r: self, - index_iter, + reader: self, next_offset: 0, - cur_data: None, + index_iter: self.index_iter(), data_iter: None, } } } -impl<'r> ReaderIter<'r> { - fn load_block(&'r mut self) -> Result<()> { - let (size, len_size) = usize::decode_var(&self.r.data[self.next_offset..]).ok_or("")?; +impl<'r, D: AsRef<[u8]>> ReaderIter<'r, D> { + fn next_block(&mut self) -> Option<()> { + if self.next_offset >= self.reader.data.len() { + return None; + } + let (size, len_size) = usize::decode_var(&self.reader.data.as_ref()[self.next_offset..])?; let crc_off = self.next_offset + len_size; - let (_crc, len_crc) = u32::decode_var(&self.r.data[crc_off..]).ok_or("")?; + let (_crc, len_crc) = u32::decode_var(&self.reader.data.as_ref()[crc_off..])?; let data_off = crc_off + len_crc; self.next_offset += data_off + size; - self.cur_data - .replace(block::Block::new(&self.r.data[data_off..data_off + size])?); - for b in self.cur_data.iter() { - self.data_iter.replace(b.iter()); - } - Ok(()) + self.data_iter.replace( + block::Block::new(self.reader.data.clone_range(data_off, size)) + .ok()? + .into_iter(), + ); + Some(()) } } -impl<'r> Iterator for &'r mut ReaderIter<'r> { +impl<'r, D: AsRef<[u8]>> Iterator for ReaderIter<'r, D> { type Item = Entry; fn next(&mut self) -> Option { - if self.cur_data.is_none() { - self.load_block().ok()?; - } - if let Some(e) = self.data_iter.as_mut().unwrap().next() { - Some(e) - } else if self.next_offset == self.r.data.len() { - None - } else { - self.load_block().ok()?; + if self.data_iter.is_none() { + self.next_block(); self.data_iter.as_mut().unwrap().next() + } else { + match self.data_iter.as_mut().unwrap().next() { + Some(e) => Some(e), + None => { + self.next_block()?; + self.data_iter.as_mut().unwrap().next() + } + } } } } + +impl<'r, D: AsRef<[u8]>> Iter for ReaderIter<'r, D> { + fn seek(&mut self, key: &[u8]) { + // TODO: detect and skip unneeded seek in iter. + self.index_iter.seek(key); + self.index_iter.next().and_then(|e| { + self.next_offset = usize::decode_var(e.value.as_slice())?.0; + self.next_block().and_then(|_| { + self.data_iter.as_mut().unwrap().seek(key); + Some(()) + }) + }); + } +} diff --git a/src/writer/block_builder.rs b/src/writer/block_builder.rs index 3541fec..2f6c0ef 100644 --- a/src/writer/block_builder.rs +++ b/src/writer/block_builder.rs @@ -118,6 +118,7 @@ impl BlockBuilder { mod test { use super::BlockBuilder; use crate::reader::block::Block; + use crate::reader::DataSlice; #[test] fn test_block_builder() { @@ -133,11 +134,11 @@ mod test { let block_data = bb.as_slice(); assert_eq!(block_data.len(), block_len); println!("Block: {:?}", block_data); - let block = Block::new(bb.as_slice()).unwrap(); + let bi = Block::new(DataSlice::new(bb.as_slice())) + .unwrap() + .into_iter(); let vcmp = Vec::from_iter( - block - .iter() - .map(|e| (Vec::from(e.key.as_slice()), Vec::from(e.value.as_slice()))), + bi.map(|e| (Vec::from(e.key.as_slice()), Vec::from(e.value.as_slice()))), ); assert_eq!(v, vcmp); }