pub(crate) struct BlockIter<D: AsRef<[u8]>> {
block: Block<D>,
cur_ent: Option<Entry>,
- off: usize,
+ pub(super) off: usize, // offset into the block data that decoding reads from; now visible to the parent module
}
fn get_bytes(b: &[u8], n: usize) -> Result<&[u8]> {
}
impl<D: AsRef<[u8]>> BlockIter<D> {
- fn seek_restart(&mut self, ridx: usize) -> Option<()> {
+ fn seek_restart(&mut self, ridx: usize) -> Option<&[u8]> { // sets `off` to restart `ridx`; `None` if the restart lookup fails
self.off = self.block.restart(ridx).ok()?;
- self.decode()
+ self.decode_restart_key() // yields the key stored at the new offset; does not modify `cur_ent`
}
- fn decode(&mut self) -> Option<()> {
+ /// Binary-searches the restart points in `left..=right` for the last one
+ /// whose key is strictly less than `key`, then seeks to it via
+ /// `seek_restart` (to `left` when no restart key in range is smaller).
+ ///
+ /// If a restart cannot be read during the search, the range is collapsed
+ /// onto its current upper bound and that restart is the one sought.
+ fn bsearch_restart(&mut self, key: &[u8], mut left: usize, mut right: usize) {
+     // Each step strictly shrinks `left..=right` (`mid > left` and
+     // `mid - 1 < right`), so the loop terminates without an iteration cap.
+     while left < right {
+         // Upper midpoint: `mid >= left + 1`, so `mid - 1` cannot underflow,
+         // and this form cannot overflow the way `left + right + 1` could.
+         let mid = left + (right - left + 1) / 2;
+         match self.seek_restart(mid) {
+             Some(rk) if rk < key => left = mid,
+             Some(_) => right = mid - 1,
+             // Unreadable restart: collapse the range to end the search.
+             None => left = right,
+         }
+     }
+     self.seek_restart(left);
+ }
+
+ /// Returns the key bytes of the entry at `self.off` without moving `off`
+ /// or modifying `cur_ent`.
+ ///
+ /// The entry header is read as three varints (shared key length, unshared
+ /// key length, value length) followed by the unshared key bytes. The shared
+ /// length is expected to be zero here (checked in debug builds), so the
+ /// unshared bytes are the whole key.
+ ///
+ /// Returns `None` if a varint cannot be decoded or the key would extend
+ /// past the end of the block data.
+ fn decode_restart_key(&self) -> Option<&[u8]> {
+     let data: &[u8] = self.block.data.as_ref();
+     let mut idx = self.off;
+
+     // Checked slicing throughout: a truncated or corrupt block yields
+     // `None` instead of an out-of-bounds panic.
+     let (shared_key, len) = usize::decode_var(data.get(idx..)?)?;
+     debug_assert!(shared_key == 0);
+     idx += len;
+     let (unshared_key, len) = usize::decode_var(data.get(idx..)?)?;
+     idx += len;
+     let (_len_val, len) = usize::decode_var(data.get(idx..)?)?;
+     idx += len;
+     data.get(idx..idx.checked_add(unshared_key)?)
+ }
+
+ fn decode(&mut self) -> Option<&Entry> {
let mut idx = self.off;
if idx >= self.block.restart_off {
self.cur_ent.take();
val.extend_from_slice(get_bytes(&data[idx..], len_val).ok()?);
idx += len_val;
self.off = idx;
- Some(())
+ self.cur_ent.as_ref()
}
}
type Item = Entry;
fn next(&mut self) -> Option<Self::Item> {
- if self.cur_ent.is_none() {
- self.decode()?;
- }
- let res = self.cur_ent.clone();
- _ = self.decode();
- res
+ self.decode().map(|e| e.clone())
}
}
impl<D: AsRef<[u8]>> Iter for BlockIter<D> {
fn seek(&mut self, key: &[u8]) {
// TODO: "galloping search"
- let mut left: usize = 0;
- let mut right: usize = self.block.restart_count;
- while left < right {
- let mid = left + (right - left) / 2;
- self.seek_restart(mid)
- .map(|()| {
- if self.cur_ent.as_ref().unwrap().key.as_slice() > key {
- right = mid;
- } else {
- left = mid + 1;
- }
- })
- .or_else(|| {
- left = right; // breaks loop
- Some(())
- });
- }
- if self.cur_ent.is_none() && self.decode().is_none() {
- // empty block?
- return;
+ if self.block.restart_count > 0 { // guard: `restart_count - 1` below would underflow when there are no restarts
+ self.bsearch_restart(key, 0, self.block.restart_count - 1); // jump to a restart point; the scan below finishes the search
}
- while self.cur_ent.as_ref().unwrap().key.as_slice() < key {
- if self.decode().is_none() {
- return;
+ loop {
+ let poff = self.off; // offset of the entry about to be decoded
+ match self.decode() {
+ None => break, // nothing more could be decoded; stop scanning
+ Some(e) => {
+ if e.key.as_slice() >= key { // first entry whose key is not below the target
+ self.off = poff; // rewind so the following decode starts at this entry again
+ return;
+ }
+ }
}
}
}
}
+
+#[cfg(test)]
+mod test {
+    use crate::reader::block::Block;
+    use crate::reader::DataSlice;
+    use crate::writer::block_builder::BlockBuilder;
+    use crate::Entry;
+    use crate::Iter;
+
+    /// Builds a block holding `n` entries with restart interval `r`.
+    /// Entry `i` has key `i * skip` and value `i * skip * 1024`, both encoded
+    /// as big-endian `u32`.
+    fn build_block(n: u32, skip: u32, r: usize) -> Block<Vec<u8>> {
+        let mut builder = BlockBuilder::default();
+        builder.restart_interval = r;
+        for key in (0..n).map(|i| i * skip) {
+            builder.add(&u32::to_be_bytes(key), &u32::to_be_bytes(key * 1024));
+        }
+        let bytes = builder.as_slice().to_vec();
+        Block::new(DataSlice::new(bytes)).unwrap()
+    }
+
+    /// Produces the entries `build_block(n, skip, _)` is fed, as a plain list.
+    fn build_ref(n: u32, skip: u32) -> Vec<Entry> {
+        (0..n)
+            .map(|i| {
+                let key = i * skip;
+                Entry::new(&u32::to_be_bytes(key), &u32::to_be_bytes(key * 1024))
+            })
+            .collect()
+    }
+
+    /// Iterating a block yields the same keys, in order, as the reference list.
+    #[test]
+    fn test_block_iter() {
+        let n = 40;
+        let expected: Vec<_> = build_ref(n, 1).into_iter().map(|e| e.key).collect();
+        let actual: Vec<_> = build_block(n, 1, 10).into_iter().map(|e| e.key).collect();
+        assert_eq!(actual, expected);
+    }
+
+    /// Seeking to a key that is present makes it the next entry returned.
+    #[test]
+    fn test_block_seek() {
+        let target = u32::to_be_bytes(40);
+        let mut it = build_block(40, 10, 10).into_iter();
+        it.seek(&target);
+        assert_eq!(it.next().unwrap().key.as_ref(), &target);
+    }
+}
BlockBuilder {
prev_key: Vec::default(),
data: Vec::new(),
- restarts: Vec::new(),
+ restarts: vec![0usize],
count: 0,
restart_interval: 16,
finished: false,
}
fn restart_size(&self) -> usize {
- if self.data.len() > u32::MAX as usize {
+ if self.data.len() <= u32::MAX as usize {
size_of::<u32>()
} else {
size_of::<u64>()
match self.restart_size() {
4 => {
for b in self.restarts.iter().map(|r| u32::to_be_bytes(*r as u32)) {
- self.data.extend_from_slice(&b[..]);
+ self.data.extend_from_slice(&b);
}
}
8 => {
for b in self.restarts.iter().map(|r| u64::to_be_bytes(*r as u64)) {
- self.data.extend_from_slice(&b[..]);
+ self.data.extend_from_slice(&b);
}
}
_ => unreachable!(),
};
self.data
- .extend_from_slice(u32::to_be_bytes(num_restarts as u32).as_slice());
+ .extend_from_slice(&u32::to_be_bytes(num_restarts as u32));
self.data.as_slice()
}