From ba65802f5c5d25259874654e6d5423db0bc140f1 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Sun, 4 Aug 2024 23:14:49 -0600 Subject: [PATCH] Flesh out writer, block builder, requirements --- Cargo.lock | 163 ++++++++++++++++++++++++++++++ Cargo.toml | 8 ++ src/lib.rs | 1 + src/writer/block_builder.rs | 16 +-- src/writer/mod.rs | 191 ++++++++++++++++++++++++++++++++++++ 5 files changed, 371 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b55bdc7..c366fea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,169 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "cc" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +dependencies = [ + "jobserver", + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "flate2" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + +[[package]] +name = "integer-encoding" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a" + +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + +[[package]] +name = "libc" +version = "0.2.155" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + +[[package]] +name = "lz4" +version = "1.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958b4caa893816eea05507c20cfe47574a43d9a697138a7872990bba8a0ece68" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = "mtbl" version = "0.1.0" +dependencies = [ + "crc32c", + "flate2", + "integer-encoding", + "lz4", + "snap", + "zstd", +] + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 41a39e4..0a56bd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,12 @@ name = "mtbl" version = "0.1.0" edition = "2021" +[dependencies] +crc32c = "0.6.8" +flate2 = "1.0.31" +integer-encoding = "4.0.2" +lz4 = "1.26.0" +snap = "1.1.1" +zstd = "0.13.2" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/lib.rs b/src/lib.rs index afcb628..66d1355 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] pub mod entry; use entry::Entry; diff --git a/src/writer/block_builder.rs b/src/writer/block_builder.rs index 41f7d7d..8efad59 100644 --- a/src/writer/block_builder.rs +++ b/src/writer/block_builder.rs @@ -5,14 +5,14 @@ pub struct BlockBuilder { data: Vec, restarts: Vec, count: usize, - restart_interval: usize, + pub(crate) restart_interval: usize, finished: bool, } impl Default for BlockBuilder { fn default() -> Self { BlockBuilder { - prev_key: Vec::new(), + prev_key: Vec::default(), data: Vec::new(), restarts: Vec::new(), count: 0, @@ -36,7 +36,7 @@ fn varint_append(v: &mut Vec, mut i: usize) { } impl BlockBuilder { - fn add(&mut self, key: &[u8], val: &[u8]) { + pub(crate) fn add(&mut self, key: &[u8], val: &[u8]) { if self.count > 0 && self.count % self.restart_interval == 0 { self.restarts.push(self.data.len()); self.prev_key.clear(); @@ -72,7 +72,7 @@ impl BlockBuilder { } } - fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { if self.finished { self.data.len() } else { @@ -80,7 +80,7 @@ impl BlockBuilder { } } - fn finish(&mut self) -> &[u8] { + pub(crate) fn as_slice(&mut self) -> &[u8] { if self.finished { return self.data.as_slice(); } @@ -109,7 +109,7 @@ impl BlockBuilder { self.data.as_slice() } - fn reset(&mut self) { + pub(crate) fn reset(&mut self) { self.count = 0; self.restarts.clear(); self.prev_key.clear(); @@ -134,10 +134,10 @@ mod test { })); v.iter().for_each(|(k, v)| bb.add(k, v)); let block_len = bb.len(); - let block_data = bb.finish(); + let block_data = bb.as_slice(); assert_eq!(block_data.len(), block_len); println!("Block: {:?}", block_data); - let block = Block::new(bb.finish()).unwrap(); + let block = Block::new(bb.as_slice()).unwrap(); let vcmp = Vec::from_iter( block .iter() diff --git a/src/writer/mod.rs b/src/writer/mod.rs index 513cecd..6068412 100644 --- a/src/writer/mod.rs +++ b/src/writer/mod.rs @@ -1 +1,192 @@ +use crate::compression::Compression; +use crate::{Entry, Result}; +use crc32c::crc32c; +use integer_encoding::{FixedIntWriter, VarInt, VarIntWriter}; pub(crate) mod block_builder; +use crate::metadata::Metadata; +use block_builder::BlockBuilder; +use std::cmp::min; +use std::mem::size_of; + +pub struct Writer { + out: W, + blocksize: usize, + comp: Compression, + meta: Metadata, + block: BlockBuilder, + index: BlockBuilder, + written: usize, + last_key: Vec, +} + +// This is more or less a straight port of `bytes_shortest_separator` +// from mtbl. I believe the logic, particularly the big-endian 16-bit +// case, could use a bit of review. +fn bytesep(last: &mut Vec, next: &[u8]) { + let min_len = min(last.len(), next.len()); + // Find common prefix length, if any + let shared_len = last + .iter() + .zip(next.iter()) + .take_while(|(a, b)| a == b) + .count(); + + // Case 1: shared prefix is all of the last key: use last key as-is + if shared_len == min_len { + return; + } + + // Case 2: shared prefix is a proper prefix of the previous + // key. + // + // Because a prefix of a key sorts before a key, next cannot + // be a prefix of last, and thus the shared prefix length must + // be less than next. + debug_assert!(shared_len < next.len()); + let lb = last[shared_len]; + let nb = next[shared_len]; + if nb > lb + 1 { + // removed unneeded lb < 255 check; nb > lb by definition of shared_len, implying lb < 255. + // 2.1 next octets differ by greater than 1 + last.truncate(shared_len); + last.push(lb + 1); + } else if shared_len + size_of::() < min_len { + // 2.2 next octets differ by exactly 1 -- go 16-bit + let lbb: u16 = (lb as u16) << 8 + | if shared_len + 1 < last.len() { + last[shared_len + 1] as u16 + } else { + 0 + }; + let nbb: u16 = (nb as u16) << 8 + | if shared_len + 1 < next.len() { + next[shared_len + 1] as u16 + } else { + 0 + }; + if lbb + 1 < nbb { + last.truncate(shared_len); + last.extend_from_slice(&u16::to_be_bytes(lbb + 1)); + } + } +} + +#[test] +fn test_bytes_sep() { + let cases = [ + (vec![1], vec![3], vec![2]), + (vec![0, 1], vec![0, 3], vec![0, 2]), + (vec![0, 255], vec![0, 255, 1], vec![0, 255]), + (vec![0, 255, 1, 0], vec![0, 255, 2], vec![0, 255, 1, 0]), + (vec![0, 255, 1, 0], vec![0, 255, 2, 0], vec![0, 255, 1, 0]), + ( + vec![0, 255, 1, 255], + vec![0, 255, 2, 0], + vec![0, 255, 1, 255], + ), + ( + vec![0, 255, 1, 0, 0], + vec![0, 255, 2, 0, 0], + vec![0, 255, 1, 1], + ), + ]; + for (mut vl, vn, vs) in cases { + bytesep(&mut vl, vn.as_slice()); + assert_eq!(vl, vs); + } +} + +impl Writer { + pub fn new(out: W) -> Self { + Self { + out, + blocksize: 8192, + comp: Default::default(), + meta: Default::default(), + block: BlockBuilder::default(), + index: BlockBuilder::default(), + written: 0, + last_key: Vec::new(), + } + } + + pub fn add(&mut self, e: Entry) -> Result<()> { + let est = e.key.len() + e.value.len() + 15; + if self.block.len() + est >= self.blocksize { + let mut off_buf: [u8; 10] = Default::default(); + let offlen = self.written.encode_var(&mut off_buf); + bytesep(&mut self.last_key, e.key.as_slice()); + self.index.add(self.last_key.as_slice(), &off_buf[..offlen]); + self.write_block()?; + } + self.meta.add_entry(e.key.len(), e.value.len()); + self.block.add(e.key.as_slice(), e.value.as_slice()); + self.last_key.clear(); + self.last_key.extend_from_slice(e.key.as_slice()); + Ok(()) + } + + pub fn blocksize(mut self, size: usize) -> Self { + self.blocksize = size; + self.meta.data_block_size = size; + self + } + + pub fn compression(mut self, comp: Compression) -> Self { + self.comp = comp; + self.meta.compression_algorithm = comp.into(); + self + } + + pub fn restart_interval(mut self, r: usize) -> Self { + self.block.restart_interval = r; + self.index.restart_interval = r; + self + } + + fn write_block(&mut self) -> Result<()> { + let b = self.block.as_slice(); + let cb = self.comp.compress(b)?; + self.written += self.out.write_varint(cb.len())?; + self.written += self.out.write_fixedint(crc32c(&*cb))?; + self.out.write_all(&*cb)?; + self.written += cb.len(); + self.block.reset(); + Ok(()) + } + + // private function, called by Drop when writer goes out of scope. + fn finish(&mut self) -> Result<()> { + let b = self.index.as_slice(); + self.meta.index_block_offset = self.written; + self.meta.bytes_index_block = b.len(); + self.out.write_all(b)?; + self.meta.write_to(&mut self.out) + } +} + +impl Drop for Writer { + fn drop(&mut self) { + self.finish().unwrap(); + } +} + +#[cfg(test)] +mod test { + use super::Writer; + use crate::Entry; + + #[test] + fn test_writer() { + let mut out = Vec::::new(); + { + let mut w = Writer::new(&mut out); + w.add(Entry::new(vec![0], vec![1])).unwrap(); + w.add(Entry::new(vec![0, 0], vec![1])).unwrap(); + w.add(Entry::new(vec![0, 1], vec![1])).unwrap(); + w.add(Entry::new(vec![1, 1], vec![1])).unwrap(); + // drops w + } + assert!(out.len() > 512); + } +} -- 2.50.1