From 4aa7b67d84624a30f1fc7e6a41bfac2b0af5ee81 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Tue, 16 Jul 2024 01:14:44 -0500 Subject: [PATCH] Beginnings of reader and writer --- src/lib.rs | 3 + src/reader/block.rs | 178 ++++++++++++++++++++++++++++++++++++ src/reader/mod.rs | 1 + src/writer/block_builder.rs | 144 +++++++++++++++++++++++++++++ src/writer/mod.rs | 1 + 5 files changed, 327 insertions(+) create mode 100644 src/reader/block.rs create mode 100644 src/reader/mod.rs create mode 100644 src/writer/block_builder.rs create mode 100644 src/writer/mod.rs diff --git a/src/lib.rs b/src/lib.rs index fe36189..d54ab8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,3 +6,6 @@ use source::{Iter, Source}; pub mod merger; //use merger::Merger; + +pub mod reader; +pub mod writer; diff --git a/src/reader/block.rs b/src/reader/block.rs new file mode 100644 index 0000000..83e639f --- /dev/null +++ b/src/reader/block.rs @@ -0,0 +1,178 @@ +use crate::{Entry, Iter}; +use std::sync::Arc; + +type Error = Box; +type Result = std::result::Result; + +#[derive(Debug)] +enum Restarts<'b> { + Restart32(&'b [u8]), + Restart64(&'b [u8]), +} + +impl<'b> Restarts<'b> { + fn decode(&self, ridx: usize) -> Result { + match self { + Restarts::Restart32(r) => { + let len = 4; + let start = ridx * len; + let end = start + len; + if end > r.len() { + Err("todo-32".into()) + } else { + Ok(u32::from_be_bytes(r[start..end].try_into()?) as usize) + } + } + Restarts::Restart64(r) => { + let len = 8; + let start = ridx * len; + let end = start + len; + if end > r.len() { + Err("todo-64".into()) + } else { + Ok(u64::from_be_bytes(r[start..end].try_into()?) as usize) + } + } + } + } +} + +#[derive(Debug)] +pub(crate) struct Block<'b> { + data: &'b [u8], + restarts: Restarts<'b>, +} + +impl<'b> Block<'b> { + pub(crate) fn new(data: &'b [u8]) -> Result { + let mut dlen = data.len(); + + if data.len() < 4 { + return Err("block data too short".into()); + } + dlen -= 4; + + let restarts: Restarts; + let nrestarts = u32::from_be_bytes(data[dlen..].try_into()?) as usize; + + // try 32-bit restarts + let mut len_restarts = nrestarts * 4; + if len_restarts > dlen { + return Err("block data too short 2".into()); + } + let mut dlen = dlen - len_restarts; + + if dlen <= u32::max_value() as usize { + restarts = Restarts::Restart32(&data[dlen..data.len() - 4]); + } else { + // try 64-bit restarts + dlen -= len_restarts; + len_restarts *= 2; + if len_restarts > dlen { + return Err("block data too short 3".into()); + } + restarts = Restarts::Restart64(&data[dlen..data.len() - 4]); + } + Ok(Block { + data: &data[..dlen - 4], // XXX - debugme -- off by 4 + restarts, + }) + } + + pub(crate) fn iter(&self) -> BlockIter<'_> { + BlockIter { + block: &self, + cur_key: Arc::new(Vec::new()), + cur_val: Arc::new(Vec::new()), + off: 0, + } + } +} + +#[derive(Debug)] +pub(crate) struct BlockIter<'b> { + block: &'b Block<'b>, + cur_key: Arc>, + cur_val: Arc>, + off: usize, +} + +fn get_bytes(b: &[u8], n: usize) -> Result<&[u8]> { + if n > b.len() { + return Err("too long".into()); + } + Ok(&b[0..n]) +} + +fn decode_varint(b: &[u8]) -> Result<(usize, usize)> { + let (mut i, mut shift) = (0, 0); + let mut ret: usize = 0; + while i < b.len() { + let v = (b[i] & 0x7f) as usize; + ret |= v.checked_shl(shift).ok_or("varint-1")?; + if b[i] & 0x80 == 0 { + return Ok((ret, i + 1)); + } + i += 1; + shift += 7; + } + Err("varint-2".into()) +} + +impl<'b> BlockIter<'b> { + fn seek_restart(&mut self, ridx: usize) -> Result<()> { + self.off = self.block.restarts.decode(ridx)?; + let key = Arc::make_mut(&mut self.cur_key); + let val = Arc::make_mut(&mut self.cur_val); + key.clear(); + val.clear(); + self.decode()?; + Ok(()) + } + + fn decode(&mut self) -> Result<()> { + let mut idx = self.off; + let (shared_key, len) = decode_varint(&self.block.data[idx..])?; + idx += len; + let (unshared_key, len) = decode_varint(&self.block.data[idx..])?; + idx += len; + let (len_val, len) = decode_varint(&self.block.data[idx..])?; + idx += len; + + if shared_key > self.cur_key.len() { + return Err("shared_key too long".into()); + } + + let key = Arc::make_mut(&mut self.cur_key); + key.truncate(shared_key); + key.extend_from_slice(get_bytes(&self.block.data[idx..], unshared_key)?); + idx += unshared_key; + + let val = Arc::make_mut(&mut self.cur_val); + val.clear(); + val.extend_from_slice(get_bytes(&self.block.data[idx..], len_val)?); + idx += len_val; + self.off = idx; + Ok(()) + } +} + +impl<'b> Iterator for BlockIter<'b> { + type Item = Entry; + + fn next(&mut self) -> Option { + println!("next: {:?}", self); + if self.off >= self.block.data.len() { + return None; + } + self.decode().ok()?; + Some(Entry { + key: self.cur_key.clone(), + value: self.cur_val.clone(), + }) + } +} + +impl<'b> Iter for BlockIter<'b> { + fn seek(&mut self, _key: &[u8]) {} +} diff --git a/src/reader/mod.rs b/src/reader/mod.rs new file mode 100644 index 0000000..ee48211 --- /dev/null +++ b/src/reader/mod.rs @@ -0,0 +1 @@ +pub(crate) mod block; diff --git a/src/writer/block_builder.rs b/src/writer/block_builder.rs new file mode 100644 index 0000000..5ad380b --- /dev/null +++ b/src/writer/block_builder.rs @@ -0,0 +1,144 @@ +pub struct BlockBuilder { + prev_key: Vec, + data: Vec, + restarts: Vec, + count: usize, + restart_interval: usize, + finished: bool, +} + +impl Default for BlockBuilder { + fn default() -> Self { + BlockBuilder { + prev_key: Vec::new(), + data: Vec::new(), + restarts: Vec::new(), + count: 0, + restart_interval: 16, + finished: false, + } + } +} + +fn varint_append(v: &mut Vec, mut i: usize) { + loop { + let b: u8 = i as u8 & 0x7f; + i >>= 7; + if i > 0 { + v.push(b | 0x80); + } else { + v.push(b); + break; + } + } +} + +impl BlockBuilder { + fn add(&mut self, key: &[u8], val: &[u8]) { + if self.count > 0 && self.count % self.restart_interval == 0 { + self.restarts.push(self.data.len()); + self.prev_key.clear(); + } + + let key_shared = self + .prev_key + .iter() + .zip(key.iter()) + .take_while(|(a, b)| a == b) + .count(); + println!( + "prev: {:?}\n key: {:?}\n shared: {}\n", + self.prev_key, key, key_shared + ); + let key_unshared = key.len() - key_shared; + self.data.reserve(3 * 5 + key_unshared + val.len()); + varint_append(&mut self.data, key_shared); + varint_append(&mut self.data, key_unshared); + varint_append(&mut self.data, val.len()); + self.data.extend_from_slice(&key[key_shared..]); + self.data.extend_from_slice(val); + self.count += 1; + self.prev_key.clear(); + self.prev_key.extend_from_slice(key); + } + + fn restart_size(&self) -> usize { + if self.data.len() > u32::max_value() as usize { + 4 + } else { + 8 + } + } + + fn len(&self) -> usize { + if self.finished { + self.data.len() + } else { + self.data.len() + self.restarts.len() * self.restart_size() + 4 + } + } + + fn finish(&mut self) -> &[u8] { + if self.finished { + return self.data.as_slice(); + } + + let num_restarts = self.restarts.len(); + assert!(num_restarts <= u32::max_value() as usize); + self.data.reserve(num_restarts * self.restart_size() + 4); + match self.restart_size() { + 4 => { + for b in self.restarts.iter().map(|r| u32::to_be_bytes(*r as u32)) { + self.data.extend_from_slice(&b[..]); + } + } + 8 => { + for b in self.restarts.iter().map(|r| u64::to_be_bytes(*r as u64)) { + self.data.extend_from_slice(&b[..]); + } + } + _ => unreachable!(), + }; + self.data + .extend_from_slice(u32::to_be_bytes(num_restarts as u32).as_slice()); + + self.data.as_slice() + } + + fn reset(&mut self) { + self.count = 0; + self.restarts.clear(); + self.prev_key.clear(); + self.data.clear(); + self.finished = false; + } +} + +#[cfg(test)] +mod test { + use super::BlockBuilder; + use crate::reader::block::Block; + + #[test] + fn test_block_builder() { + let mut bb = BlockBuilder::default(); + let v = Vec::from_iter((0..16).map(|i| { + ( + Vec::from(u32::to_be_bytes(i).as_slice()), + Vec::from(u32::to_be_bytes(i * 2).as_slice()), + ) + })); + v.iter().for_each(|(k, v)| bb.add(k, v)); + let block_len = bb.len(); + let block_data = bb.finish(); + assert_eq!(block_data.len(), block_len); + println!("Block: {:?}", block_data); + let block = Block::new(bb.finish()).unwrap(); + let vcmp = Vec::from_iter( + block + .iter() + .map(|e| (Vec::from(e.key.as_slice()), Vec::from(e.value.as_slice()))), + ); + assert_eq!(v, vcmp); + } +} diff --git a/src/writer/mod.rs b/src/writer/mod.rs new file mode 100644 index 0000000..513cecd --- /dev/null +++ b/src/writer/mod.rs @@ -0,0 +1 @@ +pub(crate) mod block_builder; -- 2.50.1