.nth(1)
.expect("Usage: mtbl_dump <filename>");
let reader = mtbl::reader::from_file(fname);
- for e in reader.iter() {
- println!("{:?}: {:?}", e.key(), e.value());
+ for (k, v) in reader.iter() {
+ println!("{:?}: {:?}", k, v);
}
}
+use crate::Result;
use std::{
io::{Read, Write},
ops::Deref,
}
impl Compression {
- pub(crate) fn compress<B: AsRef<[u8]>>(
- &self,
- buf: B,
- ) -> Result<CBuf<B>, Box<dyn std::error::Error>> {
+ pub(crate) fn compress<B: AsRef<[u8]>>(&self, buf: B) -> Result<CBuf<B>> {
match self {
Compression::None => Ok(CBuf::Buf(buf)),
Compression::Snappy => Ok(CBuf::Vec(
}
}
- pub(crate) fn uncompress<B: AsRef<[u8]>>(&self, buf: B) -> Option<CBuf<B>> {
+ pub(crate) fn uncompress<B: AsRef<[u8]>>(&self, buf: B) -> Result<CBuf<B>> {
match self {
- Compression::None => Some(CBuf::Buf(buf)),
- Compression::Snappy => Some(CBuf::Vec(
- snap::raw::Decoder::new()
- .decompress_vec(buf.as_ref())
- .ok()?,
+ Compression::None => Ok(CBuf::Buf(buf)),
+ Compression::Snappy => Ok(CBuf::Vec(
+ snap::raw::Decoder::new().decompress_vec(buf.as_ref())?,
)),
Compression::Zlib(_) => {
let mut v = Vec::<u8>::new();
{
let mut dec = flate2::read::ZlibDecoder::new(buf.as_ref());
- dec.read_to_end(&mut v).ok()?;
+ dec.read_to_end(&mut v)?;
}
- Some(CBuf::Vec(v))
+ Ok(CBuf::Vec(v))
}
Compression::Zstd(_) => {
- let mut dec = zstd::bulk::Decompressor::new().ok()?;
- Some(CBuf::Vec(
- dec.decompress(buf.as_ref(), 100 * buf.as_ref().len())
- .ok()?,
+ let mut dec = zstd::bulk::Decompressor::new()?;
+ Ok(CBuf::Vec(
+ dec.decompress(buf.as_ref(), 100 * buf.as_ref().len())?,
))
}
}
impl<C, F> Filter<C, F>
where
C: Cursor,
+ C::Value: std::fmt::Debug,
F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
{
fn find_match(&mut self) {
while let Some((k, v)) = self.cursor.get() {
- self.seekto.clear();
if (self.filter)((k, v), &mut self.seekto) {
break;
}
self.cursor.advance();
} else {
self.cursor.seek(self.seekto.as_slice());
+ self.seekto.clear();
}
}
}
impl<C, F> Cursor for Filter<C, F>
where
C: Cursor,
+ C::Value: std::fmt::Debug,
F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
{
type Value = C::Value;
}
fn advance(&mut self) {
- self.cursor.advance();
+ if self.seekto.is_empty() {
+ self.cursor.advance();
+ } else {
+ // A previous filter passed its item but left
+ // a seek key for the next potential match.
+ self.cursor.seek(self.seekto.as_slice());
+ self.seekto.clear();
+ }
self.find_match();
}
}
impl<C, F> IntoIterator for Filter<C, F>
where
C: Cursor,
- C::Value: Clone,
+ C::Value: Clone + std::fmt::Debug,
F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> bool,
{
type Item = (Vec<u8>, C::Value);
#[cfg(test)]
mod test {
- use super::{Cursor, Filter};
+ use super::Cursor;
struct TestSource<T>(Vec<(Vec<u8>, T)>, Option<usize>);
fn test_filter() {
let range = 1u64..64;
let ts = TestSource::<u64>::from(range.clone());
- let filter = Filter::new(ts, |(_, v), _| v % 3 == 0);
+ let fiter = ts.filter(|(_, v), _| v % 3 == 0).into_iter();
+ let tv = Iterator::map(fiter, |(_, v)| v).collect::<Vec<_>>();
let check = range.into_iter().filter(|i| i % 3 == 0).collect::<Vec<_>>();
- assert_eq!(
- filter.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
- check
- )
+ assert_eq!(tv, check)
}
}
{
fn find_match(&mut self) {
while let Some((k, v)) = self.cursor.get() {
- self.seekto.clear();
match (self.filtermap)((k, v), &mut self.seekto) {
Some(val) => {
self.value.replace(val);
self.cursor.advance();
} else {
self.cursor.seek(self.seekto.as_slice());
+ self.seekto.clear();
}
}
}
}
fn advance(&mut self) {
- self.cursor.advance();
+ if self.seekto.is_empty() {
+ self.cursor.advance();
+ } else {
+ self.cursor.seek(self.seekto.as_slice());
+ self.seekto.clear();
+ }
self.find_match();
}
}
if self.get().is_none() {
self.advance();
}
- FilterMapIter {
- cursor: self,
- first: true,
- }
+ FilterMapIter { cursor: self }
}
}
-struct FilterMapIter<C, F, O>
+pub struct FilterMapIter<C, F, O>
where
C: Cursor,
O: Clone,
F: FnMut((&[u8], &C::Value), &mut Vec<u8>) -> Option<O>,
{
cursor: FilterMap<C, F, O>,
- first: bool,
}
impl<C, F, O> Iterator for FilterMapIter<C, F, O>
type Item = (Vec<u8>, O);
fn next(&mut self) -> Option<Self::Item> {
- if !self.first {
+ let (k, _) = self.cursor.get()?;
+ let k = Vec::from(k);
+ let res = self.cursor.value.take().map(|v| (k, v));
+ if res.is_some() {
self.cursor.advance();
}
- self.first = false;
- let (k, _) = self.cursor.get()?;
- self.cursor.value.take().map(|v| (Vec::from(k), v))
+ res
}
}
value: Option<O>,
}
+impl<C, F, O> Map<C, F, O>
+where
+ C: Cursor,
+ F: FnMut((&[u8], &C::Value)) -> O,
+{
+ pub fn new(cursor: C, map: F) -> Self {
+ Self {
+ cursor,
+ map,
+ value: None,
+ }
+ }
+
+ fn update(&mut self) {
+ self.cursor
+ .get()
+ .and_then(|(k, v)| self.value.replace((self.map)((k, v))))
+ .or_else(|| self.value.take());
+ }
+}
+
impl<C, F, O> Cursor for Map<C, F, O>
where
C: Cursor,
fn advance(&mut self) {
self.cursor.advance();
+ self.update();
}
fn seek(&mut self, key: &[u8]) {
self.cursor.seek(key);
+ self.update();
}
}
if self.get().is_none() {
self.advance();
}
- MapIter {
- cursor: self,
- first: true,
- }
+ MapIter { cursor: self }
}
}
-struct MapIter<C, F, O>
+pub struct MapIter<C, F, O>
where
C: Cursor,
F: FnMut((&[u8], &C::Value)) -> O,
{
cursor: Map<C, F, O>,
- first: bool,
}
impl<C, F, O> Iterator for MapIter<C, F, O>
{
type Item = (Vec<u8>, O);
fn next(&mut self) -> Option<Self::Item> {
- if !self.first {
+ let (k, _) = self.cursor.get()?;
+ let k = Vec::from(k); // needs to be here to drop borrowed k from self.cursor
+ let res = self.cursor.value.take().map(|v| (k, v));
+ if res.is_some() {
self.cursor.advance();
}
- self.first = false;
- let (k, _) = self.cursor.get()?;
- self.cursor.value.take().map(|v| (Vec::from(k), v))
+ res
}
}
use super::Cursor;
use std::borrow::Borrow;
-struct Merge<C, F>
+pub struct Merge<C, F>
where
C: Cursor,
C::Value: ToOwned,
- F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+ F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
{
cursor: C,
merge: F,
- needs_advance: bool,
merge_key: Vec<u8>,
merge_val: Option<<C::Value as ToOwned>::Owned>,
}
where
C: Cursor,
C::Value: ToOwned,
- F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+ F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
{
pub fn new(cursor: C, merge: F) -> Self {
Self {
cursor,
merge,
- needs_advance: true,
merge_key: Vec::new(),
merge_val: None,
}
.as_mut()
.map(|mv| v.clone_into(mv))
.or_else(|| {
- self.merge_val.insert(v.to_owned());
+ self.merge_val = Some(v.to_owned());
None
});
}
}
+ self.cursor.advance();
while let Some((k, v)) = self.cursor.get() {
if k != self.merge_key.as_slice() {
break;
}
- self.merge_val.as_mut().map(|mv| (self.merge)(mv, v));
+ if let Some(mv) = self.merge_val.as_mut() {
+ (self.merge)(k, v, mv);
+ }
+ self.cursor.advance();
}
}
}
where
C: Cursor,
C::Value: ToOwned,
- F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+ F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
{
type Value = C::Value;
where
C: Cursor,
C::Value: ToOwned,
- F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+ F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
{
type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
type IntoIter = MergeIter<C, F>;
if self.get().is_none() {
self.advance()
}
- MergeIter {
- cursor: self,
- first: true,
- }
+ MergeIter { cursor: self }
}
}
where
C: Cursor,
C::Value: ToOwned,
- F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+ F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
{
cursor: Merge<C, F>,
- first: bool,
}
impl<C, F> Iterator for MergeIter<C, F>
where
C: Cursor,
C::Value: ToOwned,
- F: FnMut(&mut <C::Value as ToOwned>::Owned, &C::Value),
+ F: FnMut(&[u8], &C::Value, &mut <C::Value as ToOwned>::Owned),
{
type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
fn next(&mut self) -> Option<Self::Item> {
- if !self.first {
- self.cursor.advance();
- }
- self.first = false;
- self.cursor
+ let res = self
+ .cursor
.merge_val
.take()
- .map(|mv| (self.cursor.merge_key.clone(), mv))
+ .map(|mv| (self.cursor.merge_key.clone(), mv));
+ if res.is_some() {
+ self.cursor.advance();
+ }
+ res
}
}
last: Vec<u8>,
}
+impl<I, C: Cursor + Ord> From<I> for MergeCursor<C>
+where
+ I: IntoIterator<Item = C>,
+{
+ fn from(iter: I) -> Self {
+ Self {
+ pending: Vec::from_iter(iter),
+ active: BinaryHeap::new(),
+ last: Vec::new(),
+ }
+ }
+}
+/*
impl<C: Cursor> MergeCursor<KeyOrd<C>> {
fn from<I>(iter: I) -> Self
where
}
}
}
-
+*/
impl<C: Cursor + Ord> Cursor for MergeCursor<C> {
type Value = C::Value;
fn get(&self) -> Option<(&[u8], &Self::Value)> {
- self.active.peek().map(|Reverse(c)| c.get()).flatten()
+ self.active.peek().and_then(|Reverse(c)| c.get())
}
fn advance(&mut self) {
.peek()
.map(|Reverse(c)| c.get().map(|(k, _)| k.clone_into(&mut self.last)));
} else if let Some(mut rcur) = self.active.peek_mut() {
- rcur.0.get().map(|(k, _)| k.clone_into(&mut self.last));
- rcur.0.advance();
+ if let Some((k, _)) = rcur.0.get() {
+ k.clone_into(&mut self.last);
+ rcur.0.advance();
+ }
// XXX cannot remove a cursor when `get()` return None after an advance.
// Finished cursors will remain at the bottom of the heap.
}
impl<C> IntoIterator for MergeCursor<C>
where
C: Cursor + Ord,
- C::Value: Clone,
+ C::Value: ToOwned,
{
- type Item = (Vec<u8>, C::Value);
+ type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
type IntoIter = KVIter<Self>;
fn into_iter(self) -> Self::IntoIter {
Cursor::to_iter(self)
}
}
-fn test_merger_construction<C: Cursor>(i: impl Iterator<Item = C>)
-where
- C::Value: Ord,
-{
- let _ = MergeCursor::<KeyOrd<_>>::from(i);
-}
-
-pub struct KeyOrd<C: Cursor>(C);
-pub struct KeyValOrd<C: Cursor>(C);
+pub struct KeyOrd<C: Cursor>(pub C);
+pub struct KeyValOrd<C: Cursor>(pub C);
// Ordering implementation for KeyOrd, KeyValOrd
impl<C: Cursor> Cursor for KeyOrd<C> {
impl<C: Cursor> PartialOrd for KeyOrd<C> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- let ks = self.0.get().map(|(k, _)| k);
- let ko = other.0.get().map(|(k, _)| k);
- Some(ks.cmp(&ko))
+ Some(self.cmp(other))
}
}
C::Value: Ord,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- let sp = self.0.get();
- let op = other.0.get();
- Some(sp.cmp(&op))
+ Some(self.cmp(other))
}
}
pub mod range;
pub trait Cursor {
- type Value;
+ type Value: ?Sized;
// required methods
fn get(&self) -> Option<(&[u8], &Self::Value)>;
fn to_iter(mut self) -> KVIter<Self>
where
Self: Sized,
- Self::Value: Clone,
+ Self::Value: ToOwned,
{
if self.get().is_none() {
self.advance();
}
- KVIter(self, true)
+ KVIter { cursor: self }
}
fn filter<F>(self, filter: F) -> filter::Filter<Self, F>
where
Self: Sized,
+ Self::Value: std::fmt::Debug,
F: FnMut((&[u8], &Self::Value), &mut Vec<u8>) -> bool,
{
filter::Filter::new(self, filter)
}
+ fn map<F, O>(self, map: F) -> map::Map<Self, F, O>
+ where
+ Self: Sized,
+ F: FnMut((&[u8], &Self::Value)) -> O,
+ {
+ map::Map::new(self, map)
+ }
+
fn filtermap<F, O>(self, filtermap: F) -> filtermap::FilterMap<Self, F, O>
where
Self: Sized,
{
filtermap::FilterMap::new(self, filtermap)
}
+
+ fn dup_merge<F>(self, merge: F) -> merge::Merge<Self, F>
+ where
+ Self: Sized,
+ Self::Value: ToOwned,
+ F: FnMut(&[u8], &Self::Value, &mut <Self::Value as ToOwned>::Owned),
+ {
+ merge::Merge::new(self, merge)
+ }
}
-pub struct KVIter<T>(T, bool);
+pub struct KVIter<C: Cursor> {
+ cursor: C,
+}
impl<C> Iterator for KVIter<C>
where
C: Cursor,
- C::Value: Clone,
+ C::Value: ToOwned,
{
- type Item = (Vec<u8>, C::Value);
+ type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
fn next(&mut self) -> Option<Self::Item> {
- let first = self.1;
- if !first {
- self.0.advance();
+ let ires = self.cursor.get();
+ let res = ires.map(|(k, v)| (Vec::from(k), v.to_owned()));
+ if res.is_some() {
+ self.cursor.advance();
}
- self.1 = false;
- self.0.get().map(|(k, v)| (Vec::from(k), v.clone()))
+ res
}
}
type Value = C::Value;
fn get(&self) -> Option<(&[u8], &Self::Value)> {
- self.0.get()
+ self.cursor.get()
}
fn advance(&mut self) {
- self.0.advance();
+ self.cursor.advance();
}
fn seek(&mut self, key: &[u8]) {
- self.0.seek(key);
+ self.cursor.seek(key);
}
}
-impl<C: Cursor> Cursor for Box<C> {
+impl<C: Cursor + ?Sized> Cursor for Box<C> {
type Value = C::Value;
fn get(&self) -> Option<(&[u8], &Self::Value)> {
self.as_mut().seek(key);
}
}
+
+pub(crate) struct VecCursor {
+ data: Vec<(Vec<u8>, Vec<u8>)>,
+ idx: Option<usize>,
+}
+
+impl From<Vec<(Vec<u8>, Vec<u8>)>> for VecCursor {
+ fn from(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
+ Self { data, idx: None }
+ }
+}
+
+impl Cursor for VecCursor {
+ type Value = [u8];
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.idx.as_ref().map(|idx| {
+ let pair = &self.data[*idx];
+ (pair.0.as_slice(), pair.1.as_slice())
+ })
+ }
+
+ fn advance(&mut self) {
+ match self.idx.take() {
+ Some(idx) => self.idx.replace(idx + 1),
+ None => self.idx.replace(0),
+ };
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ let idx = self
+ .data
+ .binary_search_by_key(&Vec::from(key), |(k, _)| k.clone())
+ .unwrap_or(self.data.len());
+ self.idx.replace(idx);
+ }
+}
+
+impl IntoIterator for VecCursor {
+ type Item = (Vec<u8>, Vec<u8>);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ self.to_iter()
+ }
+}
+
+struct RangeCursor(Option<(Vec<u8>, usize)>, usize);
+
+impl Cursor for RangeCursor {
+ type Value = usize;
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ self.0.as_ref().map(|(k, v)| (k.as_slice(), v))
+ }
+
+ fn advance(&mut self) {
+ match self.0.take() {
+ Some((_, mut v)) => {
+ if v < self.1 {
+ v += 1;
+ self.0.replace((v.to_be_bytes().into(), v));
+ }
+ }
+ None => {
+ self.0.replace((0usize.to_be_bytes().into(), 0));
+ }
+ }
+ }
+
+ fn seek(&mut self, key: &[u8]) {
+ let idx = usize::from_be_bytes(key.try_into().unwrap_or([255; (usize::BITS / 8) as usize]));
+ if idx < self.1 {
+ self.0.replace((idx.to_be_bytes().into(), idx));
+ } else {
+ self.0.take();
+ }
+ }
+}
+
+impl IntoIterator for RangeCursor {
+ type Item = (Vec<u8>, usize);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ self.to_iter()
+ }
+}
+
+#[test]
+fn test_range_cursor() {
+ let rc = RangeCursor(None, 10);
+ let rcv = rc.to_iter().collect::<Vec<_>>();
+ let check = (0usize..11)
+ .map(|i| (Vec::from(i.to_be_bytes()), i))
+ .collect::<Vec<_>>();
+ assert_eq!(rcv, check)
+}
+use crate::cursor::KVIter;
+
use super::Cursor;
pub struct RangeCursor<C: Cursor> {
cursor: C,
begin: Vec<u8>,
end: Vec<u8>,
+ first: bool,
}
impl<C: Cursor> RangeCursor<C> {
pub fn new(cursor: C, begin: &[u8], end: &[u8]) -> Self {
- let begin = Vec::from(begin.as_ref());
+ let begin = Vec::from(begin);
let end = Vec::from(end);
- Self { cursor, begin, end }
+ Self {
+ cursor,
+ begin,
+ end,
+ first: true,
+ }
}
}
}
fn advance(&mut self) {
+ if self.first && self.cursor.get().is_none() {
+ self.first = false;
+ self.cursor.seek(self.begin.as_slice());
+ return;
+ }
self.cursor.advance();
}
}
}
+impl<C: Cursor> IntoIterator for RangeCursor<C>
+where
+ C::Value: ToOwned,
+{
+ type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ Cursor::to_iter(self)
+ }
+}
+
pub struct PrefixCursor<C: Cursor> {
cursor: C,
prefix: Vec<u8>,
+ first: bool,
}
impl<C: Cursor> PrefixCursor<C> {
pub fn new(cursor: C, prefix: &[u8]) -> Self {
let prefix = Vec::from(prefix);
- Self { cursor, prefix }
+ Self {
+ cursor,
+ prefix,
+ first: true,
+ }
}
}
}
fn advance(&mut self) {
+ if self.first && self.cursor.get().is_none() {
+ self.first = false;
+ self.cursor.seek(self.prefix.as_slice());
+ return;
+ }
self.cursor.advance();
}
self.cursor.seek(key);
}
}
+
+impl<C: Cursor> IntoIterator for PrefixCursor<C>
+where
+ C::Value: ToOwned,
+{
+ type Item = (Vec<u8>, <C::Value as ToOwned>::Owned);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ Cursor::to_iter(self)
+ }
+}
+++ /dev/null
-use crate::iter::merger::Mergeable;
-use std::cmp::Ordering;
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub struct Entry {
- key: Arc<Vec<u8>>,
- value: Arc<Vec<u8>>,
-}
-
-impl Entry {
- pub fn new(key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
- Entry {
- key: Arc::new(Vec::from(key.as_ref())),
- value: Arc::new(Vec::from(value.as_ref())),
- }
- }
-
- pub fn replace(&mut self, e: &Entry) {
- let key = Arc::make_mut(&mut self.key);
- let value = Arc::make_mut(&mut self.value);
- key.clear();
- key.extend_from_slice(e.key.as_slice());
- value.clear();
- value.extend_from_slice(e.value.as_slice());
- }
-
- pub fn clear(&mut self) {
- Arc::make_mut(&mut self.key).clear();
- Arc::make_mut(&mut self.value).clear();
- }
-
- pub fn key(&self) -> &[u8] {
- self.key.as_slice()
- }
-
- pub fn key_mut(&mut self) -> &mut Vec<u8> {
- Arc::make_mut(&mut self.key)
- }
-
- pub fn value(&self) -> &[u8] {
- self.value.as_slice()
- }
-
- pub fn value_mut(&mut self) -> &mut Vec<u8> {
- Arc::make_mut(&mut self.value)
- }
-}
-
-impl Mergeable for Entry {}
-
-impl PartialOrd<[u8]> for Entry {
- fn partial_cmp(&self, other: &[u8]) -> Option<Ordering> {
- Some(self.key().cmp(other))
- }
-}
-
-impl PartialEq<[u8]> for Entry {
- fn eq(&self, other: &[u8]) -> bool {
- self.key() == other
- }
-}
-
-impl AsRef<[u8]> for Entry {
- fn as_ref(&self) -> &[u8] {
- self.key()
- }
-}
-
-impl PartialEq for Entry {
- fn eq(&self, other: &Entry) -> bool {
- self.key() == other.key()
- }
-}
-impl Eq for Entry {}
-
-impl PartialOrd for Entry {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- Some(self.key().cmp(other.key()))
- }
-}
-
-impl Ord for Entry {
- fn cmp(&self, other: &Self) -> Ordering {
- self.key().cmp(other.key())
- }
-}
#![allow(dead_code)]
//use memmap::Mmap;
-use crate::merger::Merger;
use crate::reader::FileReader;
+use crate::source::Merger;
use crate::Result;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-
-pub struct MergeIter<I, F>
-where
- I: Iter,
- I::Item: PartialEq,
- F: FnMut(&mut I::Item, &I::Item),
-{
- prev: Option<I::Item>,
- iter: I,
- merge_func: F,
-}
-
-impl<I, F> MergeIter<I, F>
-where
- I: Iter,
- I::Item: PartialEq,
- F: FnMut(&mut I::Item, &I::Item),
-{
- pub fn new(iter: I, merge_func: F) -> Self {
- Self {
- prev: None,
- iter,
- merge_func,
- }
- }
-}
-
-impl<I, F> Iter for MergeIter<I, F>
-where
- I: Iter,
- I::Item: PartialEq,
- F: FnMut(&mut I::Item, &I::Item),
-{
- type Item = I::Item;
-
- fn next(&mut self) -> Option<Self::Item> {
- let mut cur = self.prev.take().or_else(|| self.iter.next())?;
- while let Some(e) = self.iter.next() {
- if cur == e {
- (self.merge_func)(&mut cur, &e);
- } else {
- self.prev.replace(e);
- break;
- }
- }
- Some(cur)
- }
-
- fn seek(&mut self, key: &[u8]) {
- self.prev.take();
- self.iter.seek(key);
- }
-}
-
-impl<I, F> IntoIterator for MergeIter<I, F>
-where
- I: Iter,
- I::Item: PartialEq,
- F: FnMut(&mut I::Item, &I::Item),
-{
- type Item = I::Item;
- type IntoIter = IntoIter<Self>;
-
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-#[test]
-fn test_merge_func() {
- use crate::source::test_source::TestSource;
- use crate::Entry;
- use crate::Source;
-
- let ts = TestSource(
- (1u8..8)
- .flat_map(|n| vec![n; n as usize])
- .map(|n| Entry::new(vec![n], vec![1]))
- .collect(),
- );
-
- let s = ts.dup_merge(|v, e| {
- v.value_mut()[0] += e.value()[0];
- });
-
- assert_eq!(
- Vec::from_iter(s.iter()),
- (1u8..8)
- .map(|n| Entry::new(vec![n], vec![n]))
- .collect::<Vec<_>>()
- )
-}
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-use std::cmp::Ordering;
-
-#[derive(Debug)]
-pub struct DupsortIter<I, F>
-where
- I: Iter,
- F: FnMut(&I::Item, &I::Item) -> Ordering,
-{
- run: Vec<I::Item>,
- next: Option<I::Item>,
- iter: I,
- dupsort_func: F,
-}
-
-impl<I, F> DupsortIter<I, F>
-where
- I: Iter,
- F: FnMut(&I::Item, &I::Item) -> Ordering,
-{
- pub fn new(iter: I, dupsort_func: F) -> Self {
- Self {
- run: Vec::new(),
- next: None,
- iter,
- dupsort_func,
- }
- }
-}
-
-impl<I, F> IntoIterator for DupsortIter<I, F>
-where
- I: Iter,
- I::Item: PartialEq,
- F: Fn(&I::Item, &I::Item) -> Ordering,
-{
- type Item = I::Item;
- type IntoIter = IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-impl<I, F> Iter for DupsortIter<I, F>
-where
- I: Iter,
- I::Item: PartialEq,
- F: Fn(&I::Item, &I::Item) -> Ordering,
-{
- type Item = I::Item;
-
- fn seek(&mut self, key: &[u8]) {
- self.run.clear();
- self.next.take();
- self.iter.seek(key);
- }
-
- fn next(&mut self) -> Option<Self::Item> {
- self.run.pop().or_else(|| {
- self.run
- .push(self.next.take().or_else(|| self.iter.next())?);
-
- while let Some(e) = self.iter.next() {
- if e == self.run[0] {
- self.run.push(e);
- continue;
- }
- self.next.replace(e);
- break;
- }
- // sort in reverse order, so self.run.pop() can be the usual
- // return value.
- self.run.sort_unstable_by(|a, b| (self.dupsort_func)(b, a));
- self.run.pop()
- })
- }
-}
-
-#[test]
-fn test_dupsort() {
- use crate::source::test_source::TestSource;
- use crate::Entry;
- use crate::Source;
-
- let ts = TestSource(
- (1u8..10)
- .flat_map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
- .collect(),
- );
-
- let s = ts.dupsort(|a, b| a.value()[0].cmp(&b.value()[0]));
-
- assert_eq!(
- Vec::from_iter(s.iter()),
- (1u8..10)
- .flat_map(|i| (i..1).map(move |j| Entry::new(vec![i], vec![j])))
- .collect::<Vec<_>>()
- );
-}
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-
-pub struct FilterIter<I, F> {
- iter: I,
- filter: F,
- seek_key: Vec<u8>,
-}
-
-impl<I, F> FilterIter<I, F>
-where
- I: Iter,
- F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
- pub fn new(iter: I, filter: F) -> Self {
- let seek_key = Vec::new();
- Self {
- iter,
- filter,
- seek_key,
- }
- }
-}
-
-impl<I, F> Iter for FilterIter<I, F>
-where
- I: Iter,
- F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
- type Item = I::Item;
-
- fn next(&mut self) -> Option<Self::Item> {
- loop {
- let item = self.iter.next()?;
- self.seek_key.clear();
- if (self.filter)(&item, &mut self.seek_key) {
- return Some(item);
- }
- if !self.seek_key.is_empty() {
- self.iter.seek(self.seek_key.as_slice());
- }
- }
- }
-
- fn seek(&mut self, key: &[u8]) {
- self.iter.seek(key);
- }
-}
-
-impl<I, F> IntoIterator for FilterIter<I, F>
-where
- I: Iter,
- F: FnMut(&I::Item, &mut Vec<u8>) -> bool,
-{
- type Item = I::Item;
- type IntoIter = IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-#[test]
-fn test_filter() {
- use crate::source::test_source::TestSource;
- use crate::source::Source;
- use crate::Entry;
- use std::iter::IntoIterator;
-
- let ts =
- TestSource((0u8..10).map(|n| Entry::new(vec![n], vec![])).collect()).filter(|e, sv| {
- // pass only even values
- if e.key()[0] % 2 != 0 {
- return false;
- } else if e.key()[0] == 4 {
- // at key 4, seek to 8
- sv.push(8);
- return false;
- }
- true
- });
-
- assert_eq!(
- vec![0, 2, 8],
- ts.map(|e| e.key()[0])
- .iter()
- .into_iter()
- .collect::<Vec<u8>>()
- );
-}
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-
-pub struct MapIter<I, F, O>
-where
- I: Iter,
- F: FnMut(I::Item) -> O,
-{
- iter: I,
- mapf: F,
-}
-
-impl<I, F, O> MapIter<I, F, O>
-where
- I: Iter,
- F: FnMut(I::Item) -> O,
-{
- pub fn new(iter: I, mapf: F) -> Self {
- Self { iter, mapf }
- }
-}
-
-impl<I, F, O> Iter for MapIter<I, F, O>
-where
- I: Iter,
- F: FnMut(I::Item) -> O,
-{
- type Item = O;
-
- fn next(&mut self) -> Option<Self::Item> {
- let item = self.iter.next()?;
- Some((self.mapf)(item))
- }
-
- fn seek(&mut self, key: &[u8]) {
- self.iter.seek(key);
- }
-}
-
-impl<I, F, O> IntoIterator for MapIter<I, F, O>
-where
- I: Iter,
- F: FnMut(I::Item) -> O,
-{
- type Item = O;
- type IntoIter = IntoIter<Self>;
-
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-#[cfg(test)]
-mod test {
- use crate::source::test_source::TestSource;
- use crate::{Entry, Iter, Source};
-
- struct CompVec(Vec<u8>);
- impl PartialEq<[u8]> for CompVec {
- fn eq(&self, other: &[u8]) -> bool {
- self.0.as_slice().eq(other)
- }
- }
- impl PartialOrd<[u8]> for CompVec {
- fn partial_cmp(&self, other: &[u8]) -> Option<std::cmp::Ordering> {
- self.0.as_slice().partial_cmp(other)
- }
- }
-
- #[test]
- fn test_mapsource() {
- let ts = TestSource((0u8..10).map(|i| Entry::new(vec![i], vec![])).collect())
- .map(|e| CompVec(Vec::from(e.key())));
- assert_eq!(
- ts.iter().map(|cv| cv.0).into_iter().collect::<Vec<_>>(),
- (0u8..10).map(|i| vec![i]).collect::<Vec<_>>()
- );
- assert_eq!(
- ts.get_range(&[2u8], &[7u8])
- .map(|cv| cv.0)
- .into_iter()
- .collect::<Vec<_>>(),
- (2u8..8)
- .map(|i| {
- println!("{i}");
- vec![i]
- })
- .collect::<Vec<_>>()
- )
- }
-}
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-
-pub trait Mergeable: Ord + PartialOrd<[u8]> + AsRef<[u8]> + Clone {}
-
-struct MergeEntry<I: Iter> {
- e: I::Item,
- it: I,
-}
-
-impl<I: Iter> PartialEq for MergeEntry<I>
-where
- I::Item: PartialEq,
-{
- fn eq(&self, other: &Self) -> bool {
- self.e == other.e
- }
-}
-
-impl<I: Iter> Eq for MergeEntry<I> where I::Item: PartialEq {}
-
-// Note: MergEntry Ord, PartialOrd is reversed to provide "min heap" semantics
-impl<I: Iter> PartialOrd for MergeEntry<I>
-where
- I::Item: PartialOrd,
-{
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- other.e.partial_cmp(&self.e)
- }
-}
-
-impl<I: Iter> Ord for MergeEntry<I>
-where
- I::Item: Ord,
-{
- fn cmp(&self, other: &Self) -> Ordering {
- other.e.cmp(&self.e)
- }
-}
-
-pub struct MergeIter<I: Iter> {
- heap: BinaryHeap<MergeEntry<I>>,
- finished: Vec<I>,
- last_key: Vec<u8>,
-}
-
-impl<I> From<I> for MergeIter<I::Item>
-where
- I: Iterator,
- I::Item: Iter,
- <I::Item as Iter>::Item: Mergeable,
-{
- fn from(iters: I) -> Self {
- let mut v: Vec<I::Item> = Vec::new();
- let h = BinaryHeap::from_iter(iters.filter_map(|mut it| match it.next() {
- Some(e) => Some(MergeEntry { e, it }),
- None => {
- v.push(it);
- None
- }
- }));
- MergeIter {
- finished: v,
- heap: h,
- last_key: Vec::new(),
- }
- }
-}
-
-impl<I: Iter> Iter for MergeIter<I>
-where
- I::Item: Mergeable,
-{
- type Item = I::Item;
-
- fn next(&mut self) -> Option<Self::Item> {
- let cur;
- {
- let mut next = self.heap.peek_mut()?;
-
- cur = next.e.clone();
- self.last_key.clear();
- self.last_key.extend(cur.as_ref());
-
- if let Some(e) = next.it.next() {
- next.e = e;
- return Some(cur);
- }
- }
- self.heap.pop();
- Some(cur)
- }
-
- fn seek(&mut self, key: &[u8]) {
- if key >= self.last_key.as_ref() {
- loop {
- match self.heap.peek_mut() {
- None => return,
- Some(mut head) => {
- if &head.e >= key {
- return;
- }
- head.it.seek(key);
- if let Some(e) = head.it.next() {
- head.e = e;
- continue;
- }
- }
- }
- self.heap.pop();
- }
- }
-
- // backwards seek; reset heap
- let mut finished: Vec<I> = Vec::new();
- let mut heap_entries: Vec<MergeEntry<I>> = Vec::new();
- for mut it in self
- .heap
- .drain()
- .map(|me| me.it)
- .chain(self.finished.drain(..))
- {
- it.seek(key);
- match it.next() {
- Some(e) => heap_entries.push(MergeEntry { e, it }),
- None => finished.push(it),
- }
- }
- self.heap = BinaryHeap::from_iter(heap_entries);
- self.finished = finished;
- self.last_key.clear();
- self.last_key.extend(key);
- }
-}
-
-impl<I: Iter> IntoIterator for MergeIter<I>
-where
- I::Item: Mergeable,
-{
- type Item = I::Item;
- type IntoIter = IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-#[cfg(test)]
-mod test {
- use crate::source::test_source::TestSource;
- use crate::Entry;
- use crate::Merger;
- use crate::{Iter, Source};
-
- fn tnum(m: u8) -> Vec<u8> {
- Vec::from_iter((1u8..255).filter(|i| i % m == 0))
- }
-
- fn test_source(m: u8) -> TestSource {
- TestSource(Vec::from_iter(
- tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
- ))
- }
-
- #[test]
- fn test_merge() {
- let range = 1..8;
- let iters: Vec<_> = range
- .clone()
- .map(test_source)
- .map(|s| s.into_boxed())
- .collect();
- let s = Merger::from(iters);
- let mut v = Vec::<u8>::new();
- for i in range {
- v.extend(tnum(i))
- }
- v.sort();
- let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
- assert_eq!(v2, v);
- }
-
- #[test]
- fn test_binheap() {
- use std::collections::BinaryHeap;
-
- let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
- let vs = v.as_slice();
- let h = BinaryHeap::from_iter(vs.iter().copied());
- assert_ne!(h.into_vec(), v);
- }
-}
+++ /dev/null
-use std::cmp::Ordering;
-pub mod dupmerge;
-pub mod dupsort;
-pub mod filter;
-pub mod map;
-pub mod merger;
-pub mod prefix;
-pub mod range;
-
-use dupmerge::MergeIter;
-use dupsort::DupsortIter;
-use filter::FilterIter;
-use map::MapIter;
-
-pub trait Iter {
- type Item;
-
- fn seek(&mut self, key: &[u8]);
-
- fn next(&mut self) -> Option<Self::Item>;
-
- fn seek_to(mut self, key: &[u8]) -> Self
- where
- Self: Sized,
- {
- self.seek(key);
- self
- }
-
- fn dup_merge<F>(self, merge_func: F) -> MergeIter<Self, F>
- where
- Self::Item: PartialEq + Clone,
- F: FnMut(&mut Self::Item, &Self::Item),
- Self: Sized,
- {
- MergeIter::new(self, merge_func)
- }
-
- fn dup_sort<F>(self, dupsort_func: F) -> DupsortIter<Self, F>
- where
- Self: Sized,
- F: FnMut(&Self::Item, &Self::Item) -> Ordering,
- {
- DupsortIter::new(self, dupsort_func)
- }
-
- fn filter<F>(self, filter_func: F) -> FilterIter<Self, F>
- where
- F: FnMut(&Self::Item, &mut Vec<u8>) -> bool,
- Self: Sized,
- {
- FilterIter::new(self, filter_func)
- }
-
- fn map<F, O>(self, map_func: F) -> MapIter<Self, F, O>
- where
- F: FnMut(Self::Item) -> O,
- Self: Sized,
- {
- MapIter::new(self, map_func)
- }
-}
-
-impl<I: Iter + ?Sized> Iter for Box<I> {
- type Item = I::Item;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.as_mut().next()
- }
-
- fn seek(&mut self, key: &[u8]) {
- self.as_mut().seek(key);
- }
-}
-
-pub struct BoxedIter<'a, T>(pub Box<dyn Iter<Item = T> + 'a>);
-impl<'a, T> Iter for BoxedIter<'a, T> {
- type Item = T;
- fn next(&mut self) -> Option<Self::Item> {
- self.0.as_mut().next()
- }
- fn seek(&mut self, key: &[u8]) {
- self.0.as_mut().seek(key)
- }
-}
-
-impl<'a, T> IntoIterator for BoxedIter<'a, T> {
- type Item = T;
- type IntoIter = IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-pub struct IntoIter<I: Iter>(pub I);
-
-impl<I> Iterator for IntoIter<I>
-where
- I: Iter,
-{
- type Item = I::Item;
- fn next(&mut self) -> Option<Self::Item> {
- self.0.next()
- }
-}
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-
-pub struct PrefixIter<I>
-where
- I: Iter,
- I::Item: AsRef<[u8]>,
-{
- iter: I,
- prefix: Vec<u8>,
-}
-
-impl<I> PrefixIter<I>
-where
- I: Iter,
- I::Item: AsRef<[u8]>,
-{
- pub fn new(mut iter: I, prefix: impl AsRef<[u8]>) -> Self {
- iter.seek(prefix.as_ref());
- Self {
- iter,
- prefix: Vec::from(prefix.as_ref()),
- }
- }
-}
-
-impl<I> Iter for PrefixIter<I>
-where
- I: Iter,
- I::Item: AsRef<[u8]>,
-{
- type Item = I::Item;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.iter
- .next()
- .filter(|e| e.as_ref().starts_with(self.prefix.as_slice()))
- }
-
- fn seek(&mut self, key: &[u8]) {
- self.iter.seek(key);
- }
-}
-
-impl<I> IntoIterator for PrefixIter<I>
-where
- I: Iter,
- I::Item: AsRef<[u8]>,
-{
- type Item = I::Item;
- type IntoIter = IntoIter<Self>;
-
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
+++ /dev/null
-use crate::iter::{IntoIter, Iter};
-
-pub struct RangeIter<I> {
- iter: I,
- start: Vec<u8>,
- end: Vec<u8>,
-}
-
-impl<I> RangeIter<I>
-where
- I: Iter,
-{
- pub fn new(mut iter: I, start: impl AsRef<[u8]>, end: impl AsRef<[u8]>) -> Self {
- iter.seek(start.as_ref());
- Self {
- iter,
- start: Vec::from(start.as_ref()),
- end: Vec::from(end.as_ref()),
- }
- }
-}
-
-impl<I> Iter for RangeIter<I>
-where
- I: Iter,
- I::Item: PartialOrd<[u8]>,
-{
- type Item = I::Item;
- fn next(&mut self) -> Option<Self::Item> {
- self.iter.next().filter(|i| i <= self.end.as_slice())
- }
-
- fn seek(&mut self, key: &[u8]) {
- if key <= self.start.as_slice() {
- self.iter.seek(self.start.as_slice());
- } else if key > self.end.as_slice() {
- self.iter.seek(self.end.as_slice());
- } else {
- self.iter.seek(key);
- }
- }
-}
-
-impl<I> IntoIterator for RangeIter<I>
-where
- I: Iter,
- I::Item: PartialOrd<[u8]>,
-{
- type Item = I::Item;
- type IntoIter = IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
mod compression;
mod cursor;
-mod entry;
-mod iter;
-mod merger;
+mod fileset;
+mod metadata;
pub mod reader;
pub mod sorter;
pub mod source;
mod writer;
pub use compression::Compression;
-pub use entry::Entry;
+pub use cursor::Cursor;
pub use fileset::Fileset;
-pub use iter::Iter;
-pub use merger::Merger;
pub use reader::Reader;
+pub use sorter::Sorter;
+pub use source::Merger;
pub use source::Source;
pub use writer::Writer;
-mod fileset;
-mod metadata;
-
type Error = Box<dyn std::error::Error>;
type Result<T> = std::result::Result<T, Error>;
+++ /dev/null
-use crate::iter::{
- merger::{MergeIter, Mergeable},
- Iter,
-};
-use crate::Source;
-
-pub struct Merger<S> {
- sources: Vec<S>,
-}
-
-impl<S> Default for Merger<S> {
- fn default() -> Self {
- Self {
- sources: Vec::new(),
- }
- }
-}
-
-impl<S> Merger<S> {
- pub fn new() -> Self {
- Self {
- sources: Vec::new(),
- }
- }
-
- pub fn add(&mut self, source: S) {
- self.sources.push(source);
- }
-}
-
-impl<S, I> From<I> for Merger<S>
-where
- I: IntoIterator<Item = S>,
- S: Source,
- S::Item: Mergeable,
-{
- fn from(i: I) -> Self {
- Merger {
- sources: Vec::from_iter(i),
- }
- }
-}
-
-impl<S> Source for Merger<S>
-where
- S: Source,
- S::Item: Mergeable,
-{
- type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
- MergeIter::from(self.sources.iter().map(|s| s.iter()))
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::Merger;
- use crate::source::test_source::TestSource;
- use crate::Entry;
- use crate::{Iter, Source};
-
- fn tnum(m: u8) -> Vec<u8> {
- Vec::from_iter((1u8..255).filter(|i| i % m == 0))
- }
-
- fn test_source(m: u8) -> TestSource {
- TestSource(Vec::from_iter(
- tnum(m).into_iter().map(|n| Entry::new(vec![n], vec![0])),
- ))
- }
-
- #[test]
- fn test_merge() {
- let range = 1..8;
- let iters: Vec<_> = range
- .clone()
- .map(test_source)
- .map(|s| s.into_boxed())
- .collect();
- let s = Merger::from(iters);
- let mut v = Vec::<u8>::new();
- for i in range {
- v.extend(tnum(i))
- }
- v.sort();
- let v2 = Vec::from_iter(s.iter().map(|e| e.key()[0]));
- assert_eq!(v2, v);
- }
-
- #[test]
- fn test_binheap() {
- use std::collections::BinaryHeap;
-
- let v: Vec<u8> = vec![1, 8, 2, 9, 4, 7, 3];
- let vs = v.as_slice();
- let h = BinaryHeap::from_iter(vs.iter().copied());
- assert_ne!(h.into_vec(), v);
- }
-}
-use crate::iter::{IntoIter, Iter};
-use crate::{Entry, Result};
+use crate::cursor::Cursor;
+use crate::Result;
use integer_encoding::VarInt;
use std::mem::size_of;
+use std::sync::Arc;
#[derive(Debug)]
enum RestartType {
}
}
-impl<D: AsRef<[u8]>> From<Block<D>> for BlockIter<D> {
- fn from(b: Block<D>) -> BlockIter<D> {
- BlockIter {
- block: b,
- cur_ent: None,
- off: 0,
- }
- }
-}
-
-impl<D: AsRef<[u8]>> IntoIterator for Block<D> {
- type Item = Entry;
- type IntoIter = IntoIter<BlockIter<D>>;
-
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(BlockIter {
- block: self,
- cur_ent: None,
- off: 0,
- })
- }
-}
-
#[derive(Debug)]
-pub(crate) struct BlockIter<D: AsRef<[u8]>> {
+pub(crate) struct BlockCursor<D: AsRef<[u8]>> {
block: Block<D>,
- cur_ent: Option<Entry>,
+ cur_key: Option<Arc<Vec<u8>>>,
pub(super) off: usize,
+ pub(super) len_val: usize,
}
-fn get_bytes(b: &[u8], n: usize) -> Result<&[u8]> {
- if n > b.len() {
- return Err("too long".into());
- }
- Ok(&b[0..n])
-}
-
-impl<D: AsRef<[u8]>> BlockIter<D> {
- fn seek_restart(&mut self, ridx: usize) -> Option<&[u8]> {
- self.off = self.block.restart(ridx).ok()?;
- self.decode_restart_key()
- }
-
+impl<D: AsRef<[u8]>> BlockCursor<D> {
fn bsearch_restart(&mut self, key: &[u8], mut left: usize, mut right: usize) {
while left < right {
let mid = (left + right).div_ceil(2);
- self.seek_restart(mid)
+ self.get_restart_key(mid)
.map(|rk| {
if rk < key {
left = mid;
None
});
}
- self.seek_restart(left);
+ if let Ok(off) = self.block.restart(left) {
+ self.off = off;
+ self.decode();
+ }
}
- fn decode_restart_key(&self) -> Option<&[u8]> {
- let mut idx = self.off;
+ // returns key at restart `ridx` and its offset, or None on error.
+ fn get_restart_key(&self, ridx: usize) -> Option<&[u8]> {
let data = self.block.data.as_ref();
+ let mut idx = self.block.restart(ridx).ok()?;
let (shared_key, len) = usize::decode_var(&data[idx..])?;
debug_assert!(shared_key == 0);
idx += len;
let (unshared_key, len) = usize::decode_var(&data[idx..])?;
idx += len;
+ debug_assert!(unshared_key > 0);
let (_len_val, len) = usize::decode_var(&data[idx..])?;
idx += len;
Some(&data[idx..idx + unshared_key])
}
- fn decode(&mut self) -> Option<&Entry> {
- let mut idx = self.off;
- if idx >= self.block.restart_off {
- self.cur_ent.take();
+ fn decode(&mut self) -> Option<&Arc<Vec<u8>>> {
+ if self.off >= self.block.restart_off {
return None;
}
let data = self.block.data.as_ref();
- let entry = self.cur_ent.get_or_insert(Entry::new([], []));
-
- let (shared_key, len) = usize::decode_var(&data[idx..])?;
- idx += len;
- let (unshared_key, len) = usize::decode_var(&data[idx..])?;
- idx += len;
- let (len_val, len) = usize::decode_var(&data[idx..])?;
- idx += len;
- if shared_key > entry.key().len() {
- // return Err("shared_key too long".into());
- return None;
+ let (shared_key, len) = usize::decode_var(data.get(self.off..)?)?;
+ self.off += len;
+ let (unshared_key, len) = usize::decode_var(data.get(self.off..)?)?;
+ self.off += len;
+ let (len_val, len) = usize::decode_var(data.get(self.off..)?)?;
+ self.off += len;
+ debug_assert!(shared_key + unshared_key > 0);
+
+ match self.cur_key {
+ None => {
+ if shared_key != 0 {
+ return None;
+ }
+ self.cur_key.replace(Arc::new(Vec::from(
+ data.get(self.off..self.off + unshared_key)?,
+ )));
+ }
+ Some(ref mut ak) => {
+ let key = Arc::make_mut(ak);
+ key.truncate(shared_key);
+ key.extend_from_slice(data.get(self.off..self.off + unshared_key)?);
+ }
}
+ self.off += unshared_key + len_val;
+ self.len_val = len_val;
+ self.cur_key.as_ref()
+ }
+}
- let key = entry.key_mut();
- key.truncate(shared_key);
- key.extend_from_slice(get_bytes(&data[idx..], unshared_key).ok()?);
- idx += unshared_key;
-
- let val = entry.value_mut();
- val.clear();
- val.extend_from_slice(get_bytes(&data[idx..], len_val).ok()?);
- idx += len_val;
- self.off = idx;
- self.cur_ent.as_ref()
+impl<D: AsRef<[u8]>> From<Block<D>> for BlockCursor<D> {
+ fn from(block: Block<D>) -> Self {
+ Self {
+ block,
+ cur_key: None,
+ off: 0,
+ len_val: 0,
+ }
}
}
-impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
- type Item = Entry;
+impl<D: AsRef<[u8]>> Cursor for BlockCursor<D> {
+ type Value = [u8];
- fn next(&mut self) -> Option<Self::Item> {
- self.decode().cloned()
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ let key = self.cur_key.as_ref()?.as_slice();
+ let data = self.block.data.as_ref();
+ let val = data.get(self.off - self.len_val..self.off)?;
+ Some((key, val))
+ }
+
+ fn advance(&mut self) {
+ if self.decode().is_none() {
+ self.cur_key.take();
+ }
}
fn seek(&mut self, key: &[u8]) {
self.bsearch_restart(key, 0, self.block.restart_count - 1);
}
loop {
- let poff = self.off;
match self.decode() {
None => break,
- Some(e) => {
- if e.key() >= key {
- self.off = poff;
+ Some(k) => {
+ if k.as_slice() >= key {
return;
}
}
}
}
+pub struct BlockIter<D: AsRef<[u8]>> {
+ cur: BlockCursor<D>,
+}
+
+impl<D: AsRef<[u8]>> Iterator for BlockIter<D> {
+ type Item = (Arc<Vec<u8>>, Vec<u8>);
+ fn next(&mut self) -> Option<Self::Item> {
+ let key = self.cur.cur_key.as_ref().map(Arc::clone)?;
+ let res = self.cur.get().map(|(_, v)| (key, Vec::from(v)));
+ if res.is_some() {
+ self.cur.advance();
+ }
+ res
+ }
+}
+
+impl<D: AsRef<[u8]>> IntoIterator for BlockCursor<D> {
+ type Item = (Arc<Vec<u8>>, Vec<u8>);
+ type IntoIter = BlockIter<D>;
+ fn into_iter(mut self) -> Self::IntoIter {
+ if self.get().is_none() {
+ self.advance();
+ }
+ Self::IntoIter { cur: self }
+ }
+}
+
#[cfg(test)]
mod test {
- use crate::reader::block::{Block, BlockIter};
+ use crate::cursor::Cursor;
+ use crate::reader::block::{Block, BlockCursor};
use crate::writer::block_builder::BlockBuilder;
- use crate::Entry;
- use crate::Iter;
+ use std::sync::Arc;
fn build_block(n: u32, skip: u32, r: usize) -> Block<Vec<u8>> {
let mut bb = BlockBuilder::default();
Block::new(v).unwrap()
}
- fn build_ref(n: u32, skip: u32) -> Vec<Entry> {
- Vec::from_iter(
- (0..n)
- .map(|i| i * skip)
- .map(|i| Entry::new(u32::to_be_bytes(i), u32::to_be_bytes(i * 1024))),
- )
+ fn build_ref(n: u32, skip: u32) -> Vec<(Arc<Vec<u8>>, Vec<u8>)> {
+ Vec::from_iter((0..n).map(|i| i * skip).map(|i| {
+ (
+ Arc::new(Vec::from(u32::to_be_bytes(i))),
+ u32::to_be_bytes(i * 1024).into(),
+ )
+ }))
}
#[test]
fn test_block_iter() {
let n = 40;
let b = build_block(n, 1, 10);
- let bi = b.into_iter();
+ let bi = BlockCursor::from(b).into_iter();
assert_eq!(
- bi.map(|e| Vec::from(e.key())).collect::<Vec<_>>(),
+ bi.map(|(k, _)| Vec::from(k.as_slice())).collect::<Vec<_>>(),
build_ref(n, 1)
.into_iter()
- .map(|e| Vec::from(e.key()))
+ .map(|(k, _)| Vec::from(k.as_slice()))
.collect::<Vec<_>>()
);
}
fn test_block_seek() {
let n = 40;
let b = build_block(n, 10, 10);
- let mut bi = BlockIter::from(b);
+ let mut bi = BlockCursor::from(b);
bi.seek(&u32::to_be_bytes(40));
- assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40));
+ assert_eq!(bi.get().unwrap().0, &u32::to_be_bytes(40));
bi.seek(&u32::to_be_bytes(32));
- assert_eq!(bi.next().unwrap().key(), &u32::to_be_bytes(40));
+ assert_eq!(bi.get().unwrap().0, &u32::to_be_bytes(40));
}
}
+use crate::cursor::{Cursor, KVIter};
use crate::metadata::Metadata;
use crate::Source;
-use crate::{iter::IntoIter, Entry, Iter};
use integer_encoding::VarInt;
pub(crate) mod block;
use crate::compression::CBuf;
use memmap::{Mmap, MmapOptions};
use std::fs::File;
-use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;
impl<D: AsRef<[u8]>> Clone for DataSlice<D> {
fn clone(&self) -> Self {
Self {
- data: self.data.clone(),
+ data: Arc::clone(&self.data),
off: self.off,
len: self.len,
}
impl<D: AsRef<[u8]>> Reader<D> {
pub fn new(d: D) -> Self {
- let cur = Cursor::new(&d.as_ref()[d.as_ref().len() - 512..]);
+ let cur = std::io::Cursor::new(&d.as_ref()[d.as_ref().len() - 512..]);
let metadata = Metadata::read_from(cur).expect("bad meta");
Self {
data: DataSlice::new(d),
}
}
- fn index_iter(&self) -> block::BlockIter<CBuf<DataSlice<D>>> {
+ fn index_iter(&self) -> block::BlockCursor<CBuf<DataSlice<D>>> {
let mut off = self.metadata.index_block_offset;
let d = &self.data.as_ref()[off..];
let (size, len_size) = usize::decode_var(d).unwrap();
}
}
-pub struct ReaderIter<D: AsRef<[u8]>> {
+pub struct ReaderCursor<D: AsRef<[u8]>> {
reader: Reader<D>,
next_offset: usize,
- index_iter: block::BlockIter<CBuf<DataSlice<D>>>,
- data_iter: Option<block::BlockIter<CBuf<DataSlice<D>>>>,
+ index_iter: block::BlockCursor<CBuf<DataSlice<D>>>,
+ data_iter: Option<block::BlockCursor<CBuf<DataSlice<D>>>>,
}
-impl<D: AsRef<[u8]>> ReaderIter<D> {
- fn next_block(&mut self) -> Option<()> {
- if self.next_offset >= self.reader.metadata.index_block_offset {
- return None;
- }
- let (size, len_size) = usize::decode_var(&self.reader.data.as_ref()[self.next_offset..])
- .expect("bad block size");
- let crc_off = self.next_offset + len_size;
- // TODO: read crc, optionally verify
- let data_off = crc_off + std::mem::size_of::<u32>();
+impl<D: AsRef<[u8]>> ReaderCursor<D> {
+ fn get_block(&self, mut off: usize) -> Option<(block::BlockCursor<CBuf<DataSlice<D>>>, usize)> {
+ let (size, len_size) = usize::decode_var(self.reader.data.as_ref().get(off..)?)?;
+ // TODO: read, verify CRC
+ off += len_size + std::mem::size_of::<u32>();
let comp = self.reader.metadata.compression_algorithm;
- self.next_offset = data_off + size;
- self.data_iter.replace(
- block::Block::new(comp.uncompress(self.reader.data.clone_range(data_off, size))?)
- .expect("bad block")
- .into(),
- );
- Some(())
+ let data = comp
+ .uncompress(self.reader.data.clone_range(off, size))
+ .ok()?;
+ let block = block::Block::new(data).ok()?;
+ Some((block.into(), off + size))
}
}
-impl<D: AsRef<[u8]>> Iter for ReaderIter<D> {
- type Item = Entry;
+impl<D: AsRef<[u8]>> Cursor for ReaderCursor<D> {
+ type Value = [u8];
+
+ fn get(&self) -> Option<(&[u8], &Self::Value)> {
+ match self.data_iter {
+ Some(ref cur) => cur.get(),
+ None => None,
+ }
+ }
- fn next(&mut self) -> Option<Self::Item> {
- if self.data_iter.is_none() {
- self.next_block()
- .and_then(|_| self.data_iter.as_mut().unwrap().next())
- } else {
- match self.data_iter.as_mut().unwrap().next() {
- Some(e) => Some(e),
+ fn advance(&mut self) {
+ loop {
+ match &mut self.data_iter {
None => {
- self.next_block()?;
- self.data_iter.as_mut().unwrap().next()
+ if self.next_offset >= self.reader.metadata.index_block_offset {
+ return;
+ }
+ match self.get_block(self.next_offset) {
+ None => return,
+ Some((cur, next_off)) => {
+ self.data_iter.replace(cur);
+ self.next_offset = next_off;
+ }
+ }
+ }
+ Some(cur) => {
+ cur.advance();
+ match cur.get() {
+ Some(_) => return,
+ None => {
+ self.data_iter.take();
+ }
+ }
}
}
}
}
fn seek(&mut self, key: &[u8]) {
- // TODO: detect and skip unneeded seek in iter.
- self.index_iter.seek(key);
+ if let Some(cur) = self.data_iter.as_mut() {
+ if let Some((cur_key, _)) = cur.get() {
+ if key >= cur_key {
+ cur.seek(key);
+ if cur.get().is_some() {
+ return;
+ }
+ }
+ }
+ }
- self.index_iter
- .next()
- .and_then(|e| {
- self.next_offset = usize::decode_var(e.value())?.0;
- self.next_block().map(|_| {
- self.data_iter.as_mut().unwrap().seek(key);
- })
- })
- .or_else(|| {
- self.next_offset = usize::MAX;
- self.data_iter.take();
- None
- });
+ self.index_iter.seek(key);
+ self.data_iter.take();
+ if let Some((_, o)) = self.index_iter.get() {
+ if let Some((off, _)) = usize::decode_var(o) {
+ if let Some((mut cur, next)) = self.get_block(off) {
+ cur.seek(key);
+ self.data_iter.replace(cur);
+ self.next_offset = next;
+ }
+ }
+ }
}
}
+/*
impl<D: AsRef<[u8]>> IntoIterator for ReaderIter<D> {
- type Item = Entry;
+ type Item = crate::Result<Entry>;
type IntoIter = IntoIter<Self>;
fn into_iter(self) -> Self::IntoIter {
IntoIter(self)
}
}
+*/
+
+impl<D: AsRef<[u8]>> IntoIterator for ReaderCursor<D> {
+ type Item = (Vec<u8>, Vec<u8>);
+ type IntoIter = KVIter<Self>;
+ fn into_iter(self) -> Self::IntoIter {
+ ReaderCursor::to_iter(self)
+ }
+}
-impl<D: AsRef<[u8]>> Source for Reader<D> {
- type Item = Entry;
+impl<'a, D: AsRef<[u8]>> Source<'a> for Reader<D> {
+ type Cur = ReaderCursor<D>;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
- ReaderIter {
+ fn iter(&'a self) -> ReaderCursor<D> {
+ ReaderCursor {
reader: self.clone(),
next_offset: 0,
index_iter: self.index_iter(),
-use crate::source::DynSource;
-use crate::{Entry, Iter, Merger, Reader, Source, Writer};
+use crate::{
+ cursor::{Cursor, VecCursor},
+ Merger, Reader, Source, Writer,
+};
use memmap::Mmap;
use std::cell::Cell;
-pub struct Sorter<F: Fn(&mut Entry, &Entry)> {
- batch: Cell<Vec<Entry>>,
+pub struct Sorter<F: Fn(&[u8], &[u8], &mut Vec<u8>)> {
+ batch: Cell<Vec<(Vec<u8>, Vec<u8>)>>,
batch_size: usize,
max_size: usize,
merge_func: F,
impl<F> Sorter<F>
where
- F: Fn(&mut Entry, &Entry) + 'static,
+ F: Fn(&[u8], &[u8], &mut Vec<u8>) + 'static,
{
pub fn new(max_size: usize, merge_func: F) -> Self {
Self {
}
}
- pub fn add(&mut self, e: Entry) {
- let esize = e.key().len() + e.value().len();
+ pub fn add(&mut self, key: &[u8], val: &[u8]) {
+ let esize = key.len() + val.len();
if esize + self.batch_size > self.max_size {
self.write_chunk();
}
- self.batch.get_mut().push(e);
+ self.batch.get_mut().push((Vec::from(key), Vec::from(val)));
self.batch_size += esize;
}
- pub fn source(mut self) -> Box<dyn DynSource<Item = Entry>> {
- if !self.batch.get_mut().is_empty() {
- self.write_chunk();
- }
+ pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
Merger::from(self.readers)
.dup_merge(self.merge_func)
- .into_boxed()
- }
-
- pub fn write<W: std::io::Write>(self, mut w: Writer<W>) {
- self.source()
- .as_ref() // XXX - need to further wrap Box<dyn DynSource> in BoxedSource
.iter()
.into_iter()
- .for_each(|e| w.add(e).unwrap());
+ .for_each(|(k, v)| w.add(k.as_slice(), v.as_slice()).unwrap());
}
fn write_chunk(&mut self) {
let mut w = Writer::new(Vec::new());
- self.batch
- .get_mut()
- .sort_unstable_by(|a, b| a.key().cmp(b.key()));
- self.batch
- .take()
- .iter()
- .dup_merge(&self.merge_func)
- .into_iter()
- .for_each(|e| {
- w.add(e).unwrap();
- });
+ self.batch.get_mut().sort_unstable_by(|a, b| a.0.cmp(&b.0));
+ for (k, v) in VecCursor::from(self.batch.take()).dup_merge(&self.merge_func) {
+ w.add(k.as_slice(), v.as_slice()).unwrap();
+ }
}
}
-use crate::{Iter, Source};
-use std::cmp::Ordering;
+use crate::cursor::filter::Filter;
+use crate::cursor::map::Map;
+use crate::cursor::merge::Merge;
+use crate::{cursor::Cursor, Source};
-pub struct DupmergeSource<S, F> {
+pub struct MergeSource<S, F> {
pub(super) source: S,
pub(super) merge: F,
}
-impl<S, F> Source for DupmergeSource<S, F>
+impl<'a, S, F> Source<'a> for MergeSource<S, F>
where
- S: Source,
- S::Item: PartialEq + Clone,
- F: Fn(&mut S::Item, &S::Item),
+ S: Source<'a>,
+ <S::Cur as Cursor>::Value: ToOwned,
+ F: Fn(&[u8], &<S::Cur as Cursor>::Value, &mut <<S::Cur as Cursor>::Value as ToOwned>::Owned)
+ + 'a,
{
- type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
+ type Cur = Merge<S::Cur, &'a F>;
+ fn iter(&'a self) -> Merge<S::Cur, &'a F> {
self.source.iter().dup_merge(&self.merge)
}
}
-pub struct DupsortSource<S, F> {
- pub(super) source: S,
- pub(super) dupsort: F,
-}
-
-impl<S, F> Source for DupsortSource<S, F>
-where
- S: Source,
- S::Item: PartialEq,
- F: Fn(&S::Item, &S::Item) -> Ordering,
-{
- type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
- self.source.iter().dup_sort(&self.dupsort)
- }
-}
-
pub struct FilterSource<S, F> {
pub(super) source: S,
pub(super) filter: F,
}
-impl<S, F> Source for FilterSource<S, F>
+impl<'a, S, F> Source<'a> for FilterSource<S, F>
where
- S: Source,
- F: Fn(&S::Item, &mut Vec<u8>) -> bool,
+ S: Source<'a>,
+ <S::Cur as Cursor>::Value: Clone + std::fmt::Debug,
+ F: Fn((&[u8], &<S::Cur as Cursor>::Value), &mut Vec<u8>) -> bool + 'a,
{
- type Item = S::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
+ type Cur = Filter<S::Cur, &'a F>;
+ fn iter(&'a self) -> Self::Cur {
self.source.iter().filter(&self.filter)
}
}
-pub struct MapSource<S, F, O>
-where
- S: Source,
- F: Fn(S::Item) -> O,
-{
+pub struct MapSource<S, F> {
pub(super) source: S,
pub(super) map: F,
}
-impl<S, F, O> Source for MapSource<S, F, O>
+impl<'a, S, F, O> Source<'a> for MapSource<S, F>
where
- S: Source,
- F: Fn(S::Item) -> O,
+ S: Source<'a>,
+ F: Fn((&[u8], &<S::Cur as Cursor>::Value)) -> O + 'a,
{
- type Item = O;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
+ type Cur = Map<S::Cur, &'a F, O>;
+ fn iter(&'a self) -> Self::Cur {
self.source.iter().map(&self.map)
}
}
--- /dev/null
+use super::Source;
+use crate::cursor::{merger::KeyOrd, merger::MergeCursor, Cursor};
+
+pub struct Merger<S> {
+ sources: Vec<S>,
+}
+
+impl<I, S> From<I> for Merger<S>
+where
+ I: IntoIterator<Item = S>,
+{
+ fn from(iter: I) -> Self {
+ Self {
+ sources: Vec::from_iter(iter),
+ }
+ }
+}
+
+impl<'a, S> Source<'a> for Merger<S>
+where
+ S: Source<'a>,
+ //S::Cur: Ord,
+ <S::Cur as Cursor>::Value: ToOwned,
+{
+ type Cur = MergeCursor<KeyOrd<S::Cur>>;
+ fn iter(&'a self) -> Self::Cur {
+ MergeCursor::from(self.sources.iter().map(|s| KeyOrd(s.iter())))
+ }
+}
-use crate::iter::{prefix::PrefixIter, range::RangeIter, BoxedIter, IntoIter, Iter};
-use crate::Entry;
-use std::cmp::Ordering;
+use crate::cursor::{range::PrefixCursor, range::RangeCursor, Cursor};
mod adapters;
-pub use adapters::DupmergeSource;
-pub use adapters::DupsortSource;
pub use adapters::FilterSource;
pub use adapters::MapSource;
+pub use adapters::MergeSource;
+mod merger;
+pub use merger::Merger;
-pub trait Source {
- type Item;
+pub trait Source<'a> {
+ type Cur: Cursor + IntoIterator;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item>;
+ fn iter(&'a self) -> Self::Cur;
- fn get(&self, key: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
- where
- Self::Item: PartialOrd<[u8]>,
- {
- RangeIter::new(self.iter(), key, key)
+ fn get(&'a self, key: &[u8]) -> RangeCursor<Self::Cur> {
+ RangeCursor::new(self.iter(), key, key)
}
- fn get_prefix(&self, prefix: &[u8]) -> PrefixIter<impl Iter<Item = Self::Item>>
- where
- Self::Item: AsRef<[u8]>,
- {
- PrefixIter::new(self.iter(), prefix)
+ fn get_prefix(&'a self, prefix: &[u8]) -> PrefixCursor<Self::Cur> {
+ PrefixCursor::new(self.iter(), prefix)
}
- fn get_range(&self, start: &[u8], end: &[u8]) -> RangeIter<impl Iter<Item = Self::Item>>
- where
- Self::Item: PartialOrd<[u8]>,
- {
- RangeIter::new(self.iter(), start, end)
+ fn get_range(&'a self, start: &[u8], end: &[u8]) -> RangeCursor<Self::Cur> {
+ RangeCursor::new(self.iter(), start, end)
}
- fn dup_merge<F>(self, merge: F) -> DupmergeSource<Self, F>
+ fn dup_merge<F>(self, merge: F) -> MergeSource<Self, F>
where
Self: Sized,
- F: Fn(&mut Entry, &Entry) + 'static,
- {
- DupmergeSource {
+ <Self::Cur as Cursor>::Value: ToOwned,
+ F: Fn(
+ &[u8],
+ &<Self::Cur as Cursor>::Value,
+ &mut <<Self::Cur as Cursor>::Value as ToOwned>::Owned,
+ ) + 'static,
+ {
+ MergeSource {
source: self,
merge,
}
}
- fn dupsort<F>(self, dupsort: F) -> DupsortSource<Self, F>
- where
- Self: Sized,
- F: Fn(&Self::Item, &Self::Item) -> Ordering + 'static,
- {
- DupsortSource {
- source: self,
- dupsort,
- }
- }
-
fn filter<F>(self, filter: F) -> FilterSource<Self, F>
where
Self: Sized,
- F: Fn(&Self::Item, &mut Vec<u8>) -> bool + 'static,
+ F: Fn(&<Self::Cur as Cursor>::Value, &mut Vec<u8>) -> bool + 'static,
{
FilterSource {
source: self,
}
}
- fn map<F, O>(self, map: F) -> MapSource<Self, F, O>
+ fn map<F, O>(self, map: F) -> MapSource<Self, F>
where
Self: Sized,
- F: Fn(Self::Item) -> O + 'static,
+ F: Fn((&[u8], &<Self::Cur as Cursor>::Value)) -> O + 'static,
{
MapSource { source: self, map }
}
-
- fn into_boxed(self) -> Box<dyn DynSource<Item = Self::Item>>
- where
- Self: Sized + 'static,
- {
- // Inner Box satisfied DynSource, outer Box required for an owned DynSource trait object.
- Box::new(Box::new(self))
- }
-}
-
-// A dyn-compatible variant of Source generating boxed SeekableIters. Necessary to create
-// heterogeneous collections of sources.
-pub trait DynSource {
- type Item;
-
- fn iter(&self) -> BoxedIter<'_, Self::Item>;
-}
-
-impl<S: Source + ?Sized> DynSource for Box<S> {
- type Item = S::Item;
-
- fn iter(&self) -> BoxedIter<'_, Self::Item> {
- BoxedIter(Box::new(self.as_ref().iter()))
- }
-}
-
-impl<D: DynSource + ?Sized> Source for D {
- type Item = D::Item;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
- DynSource::iter(self)
- }
-}
-
-pub struct VecIter<'a> {
- index: usize,
- vec: &'a Vec<Entry>,
-}
-
-impl Iter for VecIter<'_> {
- type Item = Entry;
- fn next(&mut self) -> Option<Self::Item> {
- if self.index > self.vec.len() {
- None
- } else {
- let e = self.vec[self.index].clone();
- self.index += 1;
- Some(e)
- }
- }
-
- fn seek(&mut self, key: &[u8]) {
- let mut left = 0;
- let mut right = self.vec.len() - 1;
- while left < right {
- let mid = (left + right).div_ceil(2);
- if self.vec[mid].key() < key {
- left = mid;
- } else {
- right = mid - 1;
- }
- }
- self.index = left;
- }
-}
-
-impl<'a> IntoIterator for VecIter<'a> {
- type Item = Entry;
- type IntoIter = IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- IntoIter(self)
- }
-}
-
-impl Source for Vec<Entry> {
- type Item = Entry;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
- VecIter {
- index: 0,
- vec: self,
- }
- }
-}
-
-pub mod test_source {
- use crate::Entry;
- use crate::Iter;
- use crate::Source;
-
- pub struct TestSource(pub Vec<Entry>);
-
- pub struct TestIter<'a> {
- source: &'a TestSource,
- off: usize,
- }
-
- impl Iter for TestIter<'_> {
- type Item = Entry;
-
- fn next(&mut self) -> Option<Self::Item> {
- let off = self.off;
- if off >= self.source.0.len() {
- return None;
- }
- let item = &self.source.0[off];
- self.off += 1;
- Some(item.clone())
- }
-
- fn seek(&mut self, key: &[u8]) {
- self.off = 0;
- while self.off < self.source.0.len() && self.source.0[self.off].key() < key {
- self.off += 1;
- }
- }
- }
-
- impl IntoIterator for TestIter<'_> {
- type Item = Entry;
- type IntoIter = crate::iter::IntoIter<Self>;
- fn into_iter(self) -> Self::IntoIter {
- crate::iter::IntoIter(self)
- }
- }
-
- impl Source for TestSource {
- type Item = Entry;
- fn iter(&self) -> impl Iter<Item = Self::Item> + IntoIterator<Item = Self::Item> {
- TestIter {
- source: self,
- off: 0,
- }
- }
- }
-}
-
-pub mod test {
- use super::test_source::TestSource;
- #[allow(unused_imports)]
- use super::Source;
- #[allow(unused_imports)]
- use crate::iter::Iter;
- use crate::Entry;
-
- #[allow(dead_code)]
- fn test_source() -> TestSource {
- TestSource(vec![
- Entry::new(vec![0, 0, 0, 0], vec![0]),
- Entry::new(vec![0, 0, 0, 1], vec![1]),
- Entry::new(vec![0, 0, 1, 0], vec![2]),
- Entry::new(vec![0, 1, 0, 0], vec![3]),
- Entry::new(vec![1, 0, 0, 0], vec![4]),
- ])
- }
-
- #[test]
- fn test_source_iter() {
- let s = &test_source();
-
- assert_eq!(
- Vec::from_iter(s.iter().map(|e| e.value()[0])),
- vec![0, 1, 2, 3, 4]
- );
- assert_eq!(
- Vec::from_iter(s.get(vec![0, 0, 1, 0].as_slice()).map(|e| e.value()[0])),
- vec![2]
- );
- assert_eq!(
- Vec::from_iter(s.get_prefix(vec![0, 0].as_slice()).map(|e| e.value()[0])),
- vec![0, 1, 2]
- );
- assert_eq!(
- Vec::from_iter(
- s.get_range(vec![0, 0, 0, 1].as_slice(), vec![0, 1, 0, 0].as_slice())
- .map(|e| e.value()[0])
- ),
- vec![1, 2, 3]
- );
- }
}
#[cfg(test)]
mod test {
use super::BlockBuilder;
- use crate::reader::block::Block;
+ use crate::reader::block::{Block, BlockCursor};
use crate::reader::DataSlice;
+ use std::sync::Arc;
#[test]
fn test_block_builder() {
let mut bb = BlockBuilder::default();
let v = Vec::from_iter((0..16).map(|i| {
(
- Vec::from(u32::to_be_bytes(i).as_slice()),
+ Arc::new(Vec::from(u32::to_be_bytes(i).as_slice())),
Vec::from(u32::to_be_bytes(i * 2).as_slice()),
)
}));
let block_len = bb.len();
let block_data = bb.as_slice();
assert_eq!(block_data.len(), block_len);
- let bi = Block::new(DataSlice::new(bb.as_slice()))
- .unwrap()
- .into_iter();
- let vcmp = Vec::from_iter(bi.map(|e| (Vec::from(e.key()), Vec::from(e.value()))));
+ let bc = BlockCursor::from(Block::new(DataSlice::new(bb.as_slice())).unwrap());
+ let bi = bc.into_iter();
+ let vcmp = Vec::from_iter(bi);
assert_eq!(v, vcmp);
}
}
use crate::compression::Compression;
-use crate::{Entry, Result};
+use crate::Result;
use crc32c::crc32c;
use integer_encoding::{FixedIntWriter, VarInt, VarIntWriter};
pub(crate) mod block_builder;
}
}
- pub fn add(&mut self, e: Entry) -> Result<()> {
- let est = e.key().len() + e.value().len() + 15;
+ pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
+ let est = key.len() + val.len() + 15;
if self.block.len() + est >= self.blocksize {
- bytesep(&mut self.last_key, e.key());
+ bytesep(&mut self.last_key, key);
self.write_block()?;
}
- self.meta.add_entry(e.key().len(), e.value().len());
- self.block.add(e.key(), e.value());
+ self.meta.add_entry(key.len(), val.len());
+ self.block.add(key, val);
self.last_key.clear();
- self.last_key.extend_from_slice(e.key());
+ self.last_key.extend_from_slice(key);
Ok(())
}
#[cfg(test)]
mod test {
use super::Writer;
- use crate::Entry;
#[test]
fn test_writer() {
let mut out = Vec::<u8>::new();
{
let mut w = Writer::new(&mut out);
- w.add(Entry::new(vec![0], vec![1])).unwrap();
- w.add(Entry::new(vec![0, 0], vec![1])).unwrap();
- w.add(Entry::new(vec![0, 1], vec![1])).unwrap();
- w.add(Entry::new(vec![1, 1], vec![1])).unwrap();
+ w.add(vec![0].as_ref(), vec![1].as_ref()).unwrap();
+ w.add(vec![0, 0].as_ref(), vec![1].as_ref()).unwrap();
+ w.add(vec![0, 1].as_ref(), vec![1].as_ref()).unwrap();
+ w.add(vec![1, 1].as_ref(), vec![1].as_ref()).unwrap();
// drops w
}
assert!(out.len() > 512);
-use mtbl::{Entry, Reader, Source, Writer};
+use mtbl::{Reader, Source, Writer};
#[test]
fn test_write_readback() {
let mut store = Vec::<u8>::new();
- let mut reference = Vec::<Entry>::new();
+ let mut reference = Vec::<(Vec<u8>, Vec<u8>)>::new();
{
+ println!("writer starting");
let mut w = Writer::new(&mut store).blocksize(256);
for i in 1..1024 {
- let e = Entry::new(u32::to_be_bytes(i), u32::to_be_bytes(i * 1024));
- w.add(e.clone()).expect("add failed");
+ let e = (
+ Vec::from(u32::to_be_bytes(i)),
+ Vec::from(u32::to_be_bytes(i * 1024)),
+ );
+ w.add(e.0.as_ref(), e.1.as_ref()).expect("add failed");
reference.push(e);
}
+ println!("writer finished");
}
assert!(store.len() > 512);
+ println!("reader start");
let r = Reader::new(store);
let ri = r.iter();
assert_eq!(ri.into_iter().collect::<Vec<_>>(), reference);
+ println!("reader finish");
// test range
let start = u32::to_be_bytes(192);
rangei.into_iter().collect::<Vec<_>>(),
reference
.into_iter()
- .filter(|e| e.key() >= &u32::to_be_bytes(192) && e.key() <= &u32::to_be_bytes(256))
+ .filter(|(k, _)| k.as_slice() >= &u32::to_be_bytes(192)
+ && k.as_slice() <= &u32::to_be_bytes(256))
.collect::<Vec<_>>()
)
}