From ad4b6a3afebc9ea7aec54ca732858e3890743dc4 Mon Sep 17 00:00:00 2001 From: Chris Mikkelson Date: Sun, 4 Aug 2024 23:12:22 -0600 Subject: [PATCH] add metadata, compression --- src/compression.rs | 117 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 4 +- src/metadata.rs | 119 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 239 insertions(+), 1 deletion(-) create mode 100644 src/compression.rs create mode 100644 src/metadata.rs diff --git a/src/compression.rs b/src/compression.rs new file mode 100644 index 0000000..82164d5 --- /dev/null +++ b/src/compression.rs @@ -0,0 +1,117 @@ +use std::{ + io::{Read, Write}, + ops::Deref, +}; + +#[derive(Default, Clone, Copy)] +pub enum Compression { + #[default] + None, + Snappy, + Zlib(flate2::Compression), + //LZ4, + //LZ4HC, + Zstd(i32), +} + +impl TryFrom for Compression { + type Error = &'static str; + fn try_from(i: u64) -> std::result::Result { + match i { + 0 => Ok(Compression::None), + 1 => Ok(Compression::Snappy), + 2 => Ok(Compression::Zlib(flate2::Compression::default())), + //3 => Ok(Compression::LZ4), + //4 => Ok(Compression::LZ4HC), + 5 => Ok(Compression::Zstd(zstd::DEFAULT_COMPRESSION_LEVEL)), + _ => Err("invalid compression type"), + } + } +} + +impl From for u64 { + fn from(c: Compression) -> u64 { + match c { + Compression::None => 0, + Compression::Snappy => 1, + Compression::Zlib(_) => 2, + // Compression::LZ4 => 3, + //Compression::LZ4HC => 4, + Compression::Zstd(_) => 5, + } + } +} + +// Store compressed data as either a vec or (for uncompressed) +// +#[derive(Debug)] +pub(crate) enum CBuf<'a> { + Buf(&'a [u8]), + Vec(Vec), +} + +impl<'a> Deref for CBuf<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + CBuf::Buf(b) => b, + CBuf::Vec(v) => v.as_slice(), + } + } +} + +impl<'a> Compression { + pub(crate) fn compress(&'a self, buf: &'a [u8]) -> Result> { + match self { + Compression::None => Ok(CBuf::Buf(buf)), + Compression::Snappy => Ok(CBuf::Vec(snap::raw::Encoder::new().compress_vec(buf)?)), + Compression::Zlib(level) => { + let mut v = Vec::::new(); + { + let mut enc = flate2::write::ZlibEncoder::new(&mut v, *level); + enc.write_all(buf)?; + } + Ok(CBuf::Vec(v)) + } + // Compression::LZ4 => {} + // Compression::LZ4HC => {} + Compression::Zstd(level) => Ok(CBuf::Vec(zstd::bulk::compress(buf, *level)?)), + } + } + + pub(crate) fn uncompress(&'a self, buf: &'a [u8]) -> Option { + match self { + Compression::None => Some(CBuf::Buf(buf)), + Compression::Snappy => Some(CBuf::Vec( + snap::raw::Decoder::new().decompress_vec(buf).ok()?, + )), + Compression::Zlib(_) => { + let mut v = Vec::::new(); + { + let mut dec = flate2::read::ZlibDecoder::new(buf); + dec.read_to_end(&mut v).ok()?; + } + Some(CBuf::Vec(v)) + } + Compression::Zstd(_) => { + let mut dec = zstd::bulk::Decompressor::new().ok()?; + Some(CBuf::Vec(dec.decompress(buf, 100 * buf.len()).ok()?)) + } + } + } +} + +#[cfg(test)] +mod test { + use super::Compression; + + #[test] + fn test_compress() { + let v: Vec = (1..255).collect(); + let comp = Compression::Zstd(3); + let cdata = comp.compress(v.as_slice()).unwrap(); + let udata = comp.uncompress(&cdata).unwrap(); + assert_eq!(&*udata, v.as_slice()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 8304d77..2f2a944 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,9 @@ pub mod iter; use iter::Iter; pub mod merger; -//use merger::Merger; +pub mod compression; pub mod reader; pub mod writer; + +mod metadata; diff --git a/src/metadata.rs b/src/metadata.rs new file mode 100644 index 0000000..20485a8 --- /dev/null +++ b/src/metadata.rs @@ -0,0 +1,119 @@ +/* +struct mtbl_metadata { + mtbl_file_version file_version; + uint64_t index_block_offset; + uint64_t data_block_size; + uint64_t compression_algorithm; + uint64_t count_entries; + uint64_t count_data_blocks; + uint64_t bytes_data_blocks; + uint64_t bytes_index_block; + uint64_t bytes_keys; + uint64_t bytes_values; +}; +*/ +use crate::compression::Compression; +use crate::Result; +use std::mem::size_of; + +const MTBL_MAGIC: u32 = 0x4D54424C; + +#[derive(Default)] +pub(crate) struct Metadata { + pub(crate) index_block_offset: usize, + pub(crate) data_block_size: usize, + pub(crate) compression_algorithm: Compression, + pub(crate) count_entries: u64, + pub(crate) count_data_blocks: u64, + pub(crate) bytes_data_blocks: usize, + pub(crate) bytes_index_block: usize, + pub(crate) bytes_keys: usize, + pub(crate) bytes_values: usize, +} + +fn try_read64(b: &mut &[u8]) -> Result { + if b.len() < size_of::() { + Err("buffer too short".into()) + } else { + let (n, rest) = b.split_at(size_of::()); + *b = rest; + Ok(u64::from_be_bytes(n.try_into()?)) + } +} + +fn write64(n: u64, mut out: impl std::io::Write) -> std::io::Result<()> { + out.write_all(n.to_be_bytes().as_slice()) +} + +impl Metadata { + pub(crate) fn from_bytes(mut b: &[u8]) -> Result { + if b.len() < size_of::() { + return Err("metadata block too short".into()); + } else if u32::from_be_bytes(b[b.len() - size_of::()..].try_into()?) != MTBL_MAGIC { + return Err("bad magic".into()); + } + Ok(Self { + index_block_offset: try_read64(&mut b)? as usize, + data_block_size: try_read64(&mut b)? as usize, + compression_algorithm: try_read64(&mut b)?.try_into()?, + count_entries: try_read64(&mut b)?, + count_data_blocks: try_read64(&mut b)?, + bytes_data_blocks: try_read64(&mut b)? as usize, + bytes_index_block: try_read64(&mut b)? as usize, + bytes_keys: try_read64(&mut b)? as usize, + bytes_values: try_read64(&mut b)? as usize, + }) + } + + pub(crate) fn write_to(&self, mut out: impl std::io::Write) -> Result<()> { + write64(self.index_block_offset as u64, &mut out)?; + write64(self.data_block_size as u64, &mut out)?; + write64(self.compression_algorithm.into(), &mut out)?; + write64(self.count_entries, &mut out)?; + write64(self.count_data_blocks, &mut out)?; + write64(self.bytes_data_blocks as u64, &mut out)?; + write64(self.bytes_index_block as u64, &mut out)?; + write64(self.bytes_keys as u64, &mut out)?; + write64(self.bytes_values as u64, &mut out)?; + let bytes_written = 9 * 8; + out.write_all(vec![0; 512 - bytes_written - size_of::()].as_slice())?; + out.write_all(MTBL_MAGIC.to_be_bytes().as_slice())?; + Ok(()) + } + + pub(crate) fn add_entry(&mut self, len_key: usize, len_val: usize) { + self.count_entries += 1; + self.bytes_keys += len_key; + self.bytes_values += len_val; + } + + pub(crate) fn add_data_block(&mut self, size: usize) { + self.count_data_blocks += 1; + self.bytes_data_blocks += size; + } + + pub(crate) fn add_index_block(&mut self, offset: usize, bytes: usize) { + self.index_block_offset = offset; + self.bytes_index_block = bytes; + } +} + +#[cfg(test)] +mod test { + use super::Metadata; + + #[test] + fn test_metadata_default() { + let mut m = Metadata::default(); + m.add_entry(1, 2); + m.add_data_block(18); + m.add_index_block(18, 512); + let mut v = Vec::::new(); + m.write_to(&mut v).expect("write_to"); + assert_eq!(v.len(), 512); + let m2 = Metadata::from_bytes(v.as_slice()).unwrap(); + assert_eq!(m2.count_entries, 1); + assert_eq!(m2.bytes_keys, 1); + assert_eq!(m2.bytes_values, 2); + } +} -- 2.50.1