]> git.mikk.net Git - mtbl-rs/commitdiff
Beginnings of reader and writer
authorChris Mikkelson <cmikk@fsi.io>
Tue, 16 Jul 2024 06:14:44 +0000 (01:14 -0500)
committerChris Mikkelson <cmikk@fsi.io>
Tue, 16 Jul 2024 06:14:44 +0000 (01:14 -0500)
src/lib.rs
src/reader/block.rs [new file with mode: 0644]
src/reader/mod.rs [new file with mode: 0644]
src/writer/block_builder.rs [new file with mode: 0644]
src/writer/mod.rs [new file with mode: 0644]

index fe3618918eb0d892954ce5b8840e6430a5ceb5c7..d54ab8b9b13ede4b1a59a23454cbd7190237eb28 100644 (file)
@@ -6,3 +6,6 @@ use source::{Iter, Source};
 
 pub mod merger;
 //use merger::Merger;
+
+pub mod reader;
+pub mod writer;
diff --git a/src/reader/block.rs b/src/reader/block.rs
new file mode 100644 (file)
index 0000000..83e639f
--- /dev/null
@@ -0,0 +1,178 @@
+use crate::{Entry, Iter};
+use std::sync::Arc;
+
+type Error = Box<dyn std::error::Error>;
+type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Debug)]
+enum Restarts<'b> {
+    Restart32(&'b [u8]),
+    Restart64(&'b [u8]),
+}
+
+impl<'b> Restarts<'b> {
+    fn decode(&self, ridx: usize) -> Result<usize> {
+        match self {
+            Restarts::Restart32(r) => {
+                let len = 4;
+                let start = ridx * len;
+                let end = start + len;
+                if end > r.len() {
+                    Err("todo-32".into())
+                } else {
+                    Ok(u32::from_be_bytes(r[start..end].try_into()?) as usize)
+                }
+            }
+            Restarts::Restart64(r) => {
+                let len = 8;
+                let start = ridx * len;
+                let end = start + len;
+                if end > r.len() {
+                    Err("todo-64".into())
+                } else {
+                    Ok(u64::from_be_bytes(r[start..end].try_into()?) as usize)
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct Block<'b> {
+    data: &'b [u8],
+    restarts: Restarts<'b>,
+}
+
+impl<'b> Block<'b> {
+    pub(crate) fn new(data: &'b [u8]) -> Result<Self> {
+        let mut dlen = data.len();
+
+        if data.len() < 4 {
+            return Err("block data too short".into());
+        }
+        dlen -= 4;
+
+        let restarts: Restarts;
+        let nrestarts = u32::from_be_bytes(data[dlen..].try_into()?) as usize;
+
+        // try 32-bit restarts
+        let mut len_restarts = nrestarts * 4;
+        if len_restarts > dlen {
+            return Err("block data too short 2".into());
+        }
+        let mut dlen = dlen - len_restarts;
+
+        if dlen <= u32::max_value() as usize {
+            restarts = Restarts::Restart32(&data[dlen..data.len() - 4]);
+        } else {
+            // try 64-bit restarts
+            dlen -= len_restarts;
+            len_restarts *= 2;
+            if len_restarts > dlen {
+                return Err("block data too short 3".into());
+            }
+            restarts = Restarts::Restart64(&data[dlen..data.len() - 4]);
+        }
+        Ok(Block {
+            data: &data[..dlen - 4], // XXX - debugme -- off by 4
+            restarts,
+        })
+    }
+
+    pub(crate) fn iter(&self) -> BlockIter<'_> {
+        BlockIter {
+            block: &self,
+            cur_key: Arc::new(Vec::new()),
+            cur_val: Arc::new(Vec::new()),
+            off: 0,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct BlockIter<'b> {
+    block: &'b Block<'b>,
+    cur_key: Arc<Vec<u8>>,
+    cur_val: Arc<Vec<u8>>,
+    off: usize,
+}
+
+fn get_bytes(b: &[u8], n: usize) -> Result<&[u8]> {
+    if n > b.len() {
+        return Err("too long".into());
+    }
+    Ok(&b[0..n])
+}
+
+fn decode_varint(b: &[u8]) -> Result<(usize, usize)> {
+    let (mut i, mut shift) = (0, 0);
+    let mut ret: usize = 0;
+    while i < b.len() {
+        let v = (b[i] & 0x7f) as usize;
+        ret |= v.checked_shl(shift).ok_or("varint-1")?;
+        if b[i] & 0x80 == 0 {
+            return Ok((ret, i + 1));
+        }
+        i += 1;
+        shift += 7;
+    }
+    Err("varint-2".into())
+}
+
+impl<'b> BlockIter<'b> {
+    fn seek_restart(&mut self, ridx: usize) -> Result<()> {
+        self.off = self.block.restarts.decode(ridx)?;
+        let key = Arc::make_mut(&mut self.cur_key);
+        let val = Arc::make_mut(&mut self.cur_val);
+        key.clear();
+        val.clear();
+        self.decode()?;
+        Ok(())
+    }
+
+    fn decode(&mut self) -> Result<()> {
+        let mut idx = self.off;
+        let (shared_key, len) = decode_varint(&self.block.data[idx..])?;
+        idx += len;
+        let (unshared_key, len) = decode_varint(&self.block.data[idx..])?;
+        idx += len;
+        let (len_val, len) = decode_varint(&self.block.data[idx..])?;
+        idx += len;
+
+        if shared_key > self.cur_key.len() {
+            return Err("shared_key too long".into());
+        }
+
+        let key = Arc::make_mut(&mut self.cur_key);
+        key.truncate(shared_key);
+        key.extend_from_slice(get_bytes(&self.block.data[idx..], unshared_key)?);
+        idx += unshared_key;
+
+        let val = Arc::make_mut(&mut self.cur_val);
+        val.clear();
+        val.extend_from_slice(get_bytes(&self.block.data[idx..], len_val)?);
+        idx += len_val;
+        self.off = idx;
+        Ok(())
+    }
+}
+
+impl<'b> Iterator for BlockIter<'b> {
+    type Item = Entry;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        println!("next: {:?}", self);
+        if self.off >= self.block.data.len() {
+            return None;
+        }
+        self.decode().ok()?;
+        Some(Entry {
+            key: self.cur_key.clone(),
+            value: self.cur_val.clone(),
+        })
+    }
+}
+
+impl<'b> Iter for BlockIter<'b> {
+    fn seek(&mut self, _key: &[u8]) {}
+}
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
new file mode 100644 (file)
index 0000000..ee48211
--- /dev/null
@@ -0,0 +1 @@
+pub(crate) mod block;
diff --git a/src/writer/block_builder.rs b/src/writer/block_builder.rs
new file mode 100644 (file)
index 0000000..5ad380b
--- /dev/null
@@ -0,0 +1,144 @@
+pub struct BlockBuilder {
+    prev_key: Vec<u8>,
+    data: Vec<u8>,
+    restarts: Vec<usize>,
+    count: usize,
+    restart_interval: usize,
+    finished: bool,
+}
+
+impl Default for BlockBuilder {
+    fn default() -> Self {
+        BlockBuilder {
+            prev_key: Vec::new(),
+            data: Vec::new(),
+            restarts: Vec::new(),
+            count: 0,
+            restart_interval: 16,
+            finished: false,
+        }
+    }
+}
+
+fn varint_append(v: &mut Vec<u8>, mut i: usize) {
+    loop {
+        let b: u8 = i as u8 & 0x7f;
+        i >>= 7;
+        if i > 0 {
+            v.push(b | 0x80);
+        } else {
+            v.push(b);
+            break;
+        }
+    }
+}
+
+impl BlockBuilder {
+    fn add(&mut self, key: &[u8], val: &[u8]) {
+        if self.count > 0 && self.count % self.restart_interval == 0 {
+            self.restarts.push(self.data.len());
+            self.prev_key.clear();
+        }
+
+        let key_shared = self
+            .prev_key
+            .iter()
+            .zip(key.iter())
+            .take_while(|(a, b)| a == b)
+            .count();
+        println!(
+            "prev: {:?}\n key: {:?}\n  shared: {}\n",
+            self.prev_key, key, key_shared
+        );
+        let key_unshared = key.len() - key_shared;
+        self.data.reserve(3 * 5 + key_unshared + val.len());
+        varint_append(&mut self.data, key_shared);
+        varint_append(&mut self.data, key_unshared);
+        varint_append(&mut self.data, val.len());
+        self.data.extend_from_slice(&key[key_shared..]);
+        self.data.extend_from_slice(val);
+        self.count += 1;
+        self.prev_key.clear();
+        self.prev_key.extend_from_slice(key);
+    }
+
+    fn restart_size(&self) -> usize {
+        if self.data.len() > u32::max_value() as usize {
+            4
+        } else {
+            8
+        }
+    }
+
+    fn len(&self) -> usize {
+        if self.finished {
+            self.data.len()
+        } else {
+            self.data.len() + self.restarts.len() * self.restart_size() + 4
+        }
+    }
+
+    fn finish(&mut self) -> &[u8] {
+        if self.finished {
+            return self.data.as_slice();
+        }
+
+        let num_restarts = self.restarts.len();
+        assert!(num_restarts <= u32::max_value() as usize);
+        self.data.reserve(num_restarts * self.restart_size() + 4);
+        match self.restart_size() {
+            4 => {
+                for b in self.restarts.iter().map(|r| u32::to_be_bytes(*r as u32)) {
+                    self.data.extend_from_slice(&b[..]);
+                }
+            }
+            8 => {
+                for b in self.restarts.iter().map(|r| u64::to_be_bytes(*r as u64)) {
+                    self.data.extend_from_slice(&b[..]);
+                }
+            }
+            _ => unreachable!(),
+        };
+        self.data
+            .extend_from_slice(u32::to_be_bytes(num_restarts as u32).as_slice());
+
+        self.data.as_slice()
+    }
+
+    fn reset(&mut self) {
+        self.count = 0;
+        self.restarts.clear();
+        self.prev_key.clear();
+        self.data.clear();
+        self.finished = false;
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::BlockBuilder;
+    use crate::reader::block::Block;
+
+    #[test]
+    fn test_block_builder() {
+        let mut bb = BlockBuilder::default();
+        let v = Vec::from_iter((0..16).map(|i| {
+            (
+                Vec::from(u32::to_be_bytes(i).as_slice()),
+                Vec::from(u32::to_be_bytes(i * 2).as_slice()),
+            )
+        }));
+        v.iter().for_each(|(k, v)| bb.add(k, v));
+        let block_len = bb.len();
+        let block_data = bb.finish();
+        assert_eq!(block_data.len(), block_len);
+        println!("Block: {:?}", block_data);
+        let block = Block::new(bb.finish()).unwrap();
+        let vcmp = Vec::from_iter(
+            block
+                .iter()
+                .map(|e| (Vec::from(e.key.as_slice()), Vec::from(e.value.as_slice()))),
+        );
+        assert_eq!(v, vcmp);
+    }
+}
diff --git a/src/writer/mod.rs b/src/writer/mod.rs
new file mode 100644 (file)
index 0000000..513cecd
--- /dev/null
@@ -0,0 +1 @@
+pub(crate) mod block_builder;