git.mikk.net Git - mtbl-rs/commitdiff
Flesh out writer, block builder, requirements
author Chris Mikkelson <cmikk@fsi.io>
Mon, 5 Aug 2024 05:14:49 +0000 (23:14 -0600)
committer Chris Mikkelson <cmikk@fsi.io>
Mon, 5 Aug 2024 06:03:51 +0000 (00:03 -0600)
Cargo.lock
Cargo.toml
src/lib.rs
src/writer/block_builder.rs
src/writer/mod.rs

index b55bdc796278b0674ae82c42c02738b6dfac24d9..c366fea9e85a95ec823c1c188e52f0aaf6ec336d 100644 (file)
@@ -2,6 +2,169 @@
 # It is not intended for manual editing.
 version = 3
 
+[[package]]
+name = "adler"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
+
+[[package]]
+name = "cc"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc"
+dependencies = [
+ "jobserver",
+ "libc",
+]
+
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
+[[package]]
+name = "crc32c"
+version = "0.6.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
+dependencies = [
+ "rustc_version",
+]
+
+[[package]]
+name = "crc32fast"
+version = "1.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
+name = "flate2"
+version = "1.0.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920"
+dependencies = [
+ "crc32fast",
+ "miniz_oxide",
+]
+
+[[package]]
+name = "integer-encoding"
+version = "4.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a"
+
+[[package]]
+name = "jobserver"
+version = "0.1.32"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "libc"
+version = "0.2.155"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
+
+[[package]]
+name = "lz4"
+version = "1.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "958b4caa893816eea05507c20cfe47574a43d9a697138a7872990bba8a0ece68"
+dependencies = [
+ "libc",
+ "lz4-sys",
+]
+
+[[package]]
+name = "lz4-sys"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "miniz_oxide"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
+dependencies = [
+ "adler",
+]
+
 [[package]]
 name = "mtbl"
 version = "0.1.0"
+dependencies = [
+ "crc32c",
+ "flate2",
+ "integer-encoding",
+ "lz4",
+ "snap",
+ "zstd",
+]
+
+[[package]]
+name = "pkg-config"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
+
+[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
+[[package]]
+name = "semver"
+version = "1.0.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
+
+[[package]]
+name = "snap"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
+
+[[package]]
+name = "zstd"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9"
+dependencies = [
+ "zstd-safe",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "7.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059"
+dependencies = [
+ "zstd-sys",
+]
+
+[[package]]
+name = "zstd-sys"
+version = "2.0.13+zstd.1.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa"
+dependencies = [
+ "cc",
+ "pkg-config",
+]
index 41a39e44c17f6497d25bc637e6cd8b3cc74795dd..0a56bd6dfe07ee67712a15673dbf6625eec6a4f6 100644 (file)
@@ -3,4 +3,12 @@ name = "mtbl"
 version = "0.1.0"
 edition = "2021"
 
+[dependencies]
+crc32c = "0.6.8"
+flate2 = "1.0.31"
+integer-encoding = "4.0.2"
+lz4 = "1.26.0"
+snap = "1.1.1"
+zstd = "0.13.2"
+
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
index afcb62814fa0e4bc66436597581e79d55ed21ed6..66d13553b8761e5fa3b2a34dc699ab1a222c9896 100644 (file)
@@ -1,3 +1,4 @@
+#![allow(dead_code)]
 pub mod entry;
 use entry::Entry;
 
index 41f7d7d10f6960ca160f8951555943d444c59668..8efad59d8092b116f8845e2d1837696398074c32 100644 (file)
@@ -5,14 +5,14 @@ pub struct BlockBuilder {
     data: Vec<u8>,
     restarts: Vec<usize>,
     count: usize,
-    restart_interval: usize,
+    pub(crate) restart_interval: usize,
     finished: bool,
 }
 
 impl Default for BlockBuilder {
     fn default() -> Self {
         BlockBuilder {
-            prev_key: Vec::new(),
+            prev_key: Vec::default(),
             data: Vec::new(),
             restarts: Vec::new(),
             count: 0,
@@ -36,7 +36,7 @@ fn varint_append(v: &mut Vec<u8>, mut i: usize) {
 }
 
 impl BlockBuilder {
-    fn add(&mut self, key: &[u8], val: &[u8]) {
+    pub(crate) fn add(&mut self, key: &[u8], val: &[u8]) {
         if self.count > 0 && self.count % self.restart_interval == 0 {
             self.restarts.push(self.data.len());
             self.prev_key.clear();
@@ -72,7 +72,7 @@ impl BlockBuilder {
         }
     }
 
-    fn len(&self) -> usize {
+    pub(crate) fn len(&self) -> usize {
         if self.finished {
             self.data.len()
         } else {
@@ -80,7 +80,7 @@ impl BlockBuilder {
         }
     }
 
-    fn finish(&mut self) -> &[u8] {
+    pub(crate) fn as_slice(&mut self) -> &[u8] {
         if self.finished {
             return self.data.as_slice();
         }
@@ -109,7 +109,7 @@ impl BlockBuilder {
         self.data.as_slice()
     }
 
-    fn reset(&mut self) {
+    pub(crate) fn reset(&mut self) {
         self.count = 0;
         self.restarts.clear();
         self.prev_key.clear();
@@ -134,10 +134,10 @@ mod test {
         }));
         v.iter().for_each(|(k, v)| bb.add(k, v));
         let block_len = bb.len();
-        let block_data = bb.finish();
+        let block_data = bb.as_slice();
         assert_eq!(block_data.len(), block_len);
         println!("Block: {:?}", block_data);
-        let block = Block::new(bb.finish()).unwrap();
+        let block = Block::new(bb.as_slice()).unwrap();
         let vcmp = Vec::from_iter(
             block
                 .iter()
index 513cecdc1f1329b9de6126d4e3cd87aad9d81fa2..6068412d1bed7e5510262e3e0b4fb9a486da1747 100644 (file)
@@ -1 +1,192 @@
+use crate::compression::Compression;
+use crate::{Entry, Result};
+use crc32c::crc32c;
+use integer_encoding::{FixedIntWriter, VarInt, VarIntWriter};
 pub(crate) mod block_builder;
+use crate::metadata::Metadata;
+use block_builder::BlockBuilder;
+use std::cmp::min;
+use std::mem::size_of;
+
+pub struct Writer<W: std::io::Write> {
+    out: W,
+    blocksize: usize,
+    comp: Compression,
+    meta: Metadata,
+    block: BlockBuilder,
+    index: BlockBuilder,
+    written: usize,
+    last_key: Vec<u8>,
+}
+
+// This is more or less a straight port of `bytes_shortest_separator`
+// from mtbl. I believe the logic, particularly the big-endian 16-bit
+// case, could use a bit of review.
+fn bytesep(last: &mut Vec<u8>, next: &[u8]) {
+    let min_len = min(last.len(), next.len());
+    // Find common prefix length, if any
+    let shared_len = last
+        .iter()
+        .zip(next.iter())
+        .take_while(|(a, b)| a == b)
+        .count();
+
+    // Case 1: shared prefix is all of the shorter key (normally the last key): use last key as-is
+    if shared_len == min_len {
+        return;
+    }
+
+    // Case 2: shared prefix is a proper prefix of the previous
+    // key.
+    //
+    // Because a prefix of a key sorts before that key, next cannot
+    // be a prefix of last, and thus the shared prefix length must
+    // be less than the length of next.
+    debug_assert!(shared_len < next.len());
+    let lb = last[shared_len];
+    let nb = next[shared_len];
+    if nb > lb + 1 {
+        // removed unneeded lb < 255 check; nb != lb by definition of shared_len, and nb > lb assuming keys arrive in sorted order, implying lb < 255.
+        // 2.1 next octets differ by greater than 1
+        last.truncate(shared_len);
+        last.push(lb + 1);
+    } else if shared_len + size_of::<u16>() < min_len {
+        // 2.2 next octets differ by exactly 1 -- go 16-bit
+        let lbb: u16 = (lb as u16) << 8
+            | if shared_len + 1 < last.len() {
+                last[shared_len + 1] as u16
+            } else {
+                0
+            };
+        let nbb: u16 = (nb as u16) << 8
+            | if shared_len + 1 < next.len() {
+                next[shared_len + 1] as u16
+            } else {
+                0
+            };
+        if lbb + 1 < nbb {
+            last.truncate(shared_len);
+            last.extend_from_slice(&u16::to_be_bytes(lbb + 1));
+        }
+    }
+}
+
+#[test]
+fn test_bytes_sep() {
+    let cases = [
+        (vec![1], vec![3], vec![2]),
+        (vec![0, 1], vec![0, 3], vec![0, 2]),
+        (vec![0, 255], vec![0, 255, 1], vec![0, 255]),
+        (vec![0, 255, 1, 0], vec![0, 255, 2], vec![0, 255, 1, 0]),
+        (vec![0, 255, 1, 0], vec![0, 255, 2, 0], vec![0, 255, 1, 0]),
+        (
+            vec![0, 255, 1, 255],
+            vec![0, 255, 2, 0],
+            vec![0, 255, 1, 255],
+        ),
+        (
+            vec![0, 255, 1, 0, 0],
+            vec![0, 255, 2, 0, 0],
+            vec![0, 255, 1, 1],
+        ),
+    ];
+    for (mut vl, vn, vs) in cases {
+        bytesep(&mut vl, vn.as_slice());
+        assert_eq!(vl, vs);
+    }
+}
+
+impl<W: std::io::Write> Writer<W> {
+    pub fn new(out: W) -> Self {
+        Self {
+            out,
+            blocksize: 8192,
+            comp: Default::default(),
+            meta: Default::default(),
+            block: BlockBuilder::default(),
+            index: BlockBuilder::default(),
+            written: 0,
+            last_key: Vec::new(),
+        }
+    }
+
+    pub fn add(&mut self, e: Entry) -> Result<()> {
+        let est = e.key.len() + e.value.len() + 15;
+        if self.block.len() + est >= self.blocksize {
+            let mut off_buf: [u8; 10] = Default::default();
+            let offlen = self.written.encode_var(&mut off_buf);
+            bytesep(&mut self.last_key, e.key.as_slice());
+            self.index.add(self.last_key.as_slice(), &off_buf[..offlen]);
+            self.write_block()?;
+        }
+        self.meta.add_entry(e.key.len(), e.value.len());
+        self.block.add(e.key.as_slice(), e.value.as_slice());
+        self.last_key.clear();
+        self.last_key.extend_from_slice(e.key.as_slice());
+        Ok(())
+    }
+
+    pub fn blocksize(mut self, size: usize) -> Self {
+        self.blocksize = size;
+        self.meta.data_block_size = size;
+        self
+    }
+
+    pub fn compression(mut self, comp: Compression) -> Self {
+        self.comp = comp;
+        self.meta.compression_algorithm = comp.into();
+        self
+    }
+
+    pub fn restart_interval(mut self, r: usize) -> Self {
+        self.block.restart_interval = r;
+        self.index.restart_interval = r;
+        self
+    }
+
+    fn write_block(&mut self) -> Result<()> {
+        let b = self.block.as_slice();
+        let cb = self.comp.compress(b)?;
+        self.written += self.out.write_varint(cb.len())?;
+        self.written += self.out.write_fixedint(crc32c(&*cb))?;
+        self.out.write_all(&*cb)?;
+        self.written += cb.len();
+        self.block.reset();
+        Ok(())
+    }
+
+    // private function, called by Drop when writer goes out of scope.
+    fn finish(&mut self) -> Result<()> {
+        let b = self.index.as_slice();
+        self.meta.index_block_offset = self.written;
+        self.meta.bytes_index_block = b.len();
+        self.out.write_all(b)?;
+        self.meta.write_to(&mut self.out)
+    }
+}
+
+impl<W: std::io::Write> Drop for Writer<W> {
+    fn drop(&mut self) {
+        self.finish().unwrap();
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::Writer;
+    use crate::Entry;
+
+    #[test]
+    fn test_writer() {
+        let mut out = Vec::<u8>::new();
+        {
+            let mut w = Writer::new(&mut out);
+            w.add(Entry::new(vec![0], vec![1])).unwrap();
+            w.add(Entry::new(vec![0, 0], vec![1])).unwrap();
+            w.add(Entry::new(vec![0, 1], vec![1])).unwrap();
+            w.add(Entry::new(vec![1, 1], vec![1])).unwrap();
+            // drops w
+        }
+        assert!(out.len() > 512);
+    }
+}