1 change: 1 addition & 0 deletions parquet/src/arrow/buffer/bit_util.rs
@@ -18,6 +18,7 @@
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use std::ops::Range;

#[allow(unused)]
/// Counts the number of set bits in the provided range
pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - range.start);
57 changes: 41 additions & 16 deletions parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,7 +20,6 @@ use arrow_buffer::Buffer;
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use bytes::Bytes;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
@@ -169,10 +168,7 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
(BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
assert_eq!(self.max_level, 1);

let start = nulls.len();
let levels_read = decoder.read(nulls, num_levels)?;

let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
let (values_read, levels_read) = decoder.read(nulls, num_levels)?;
Ok((values_read, levels_read))
}
_ => unreachable!("inconsistent null mask"),
@@ -284,20 +280,31 @@ impl PackedDecoder {
self.data_offset = 0;
}

fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
let mut read = 0;
while read != len {
/// Reads up to `len` definition levels directly into a boolean bitmask.
///
/// Returns a tuple of `(values_read, levels_read)`, where `values_read` counts the
/// number of `true` bits appended to `buffer`.
fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<(usize, usize)> {
let mut levels_read = 0;
let mut values_read = 0;
while levels_read != len {
if self.rle_left != 0 {
let to_read = self.rle_left.min(len - read);
let to_read = self.rle_left.min(len - levels_read);
buffer.append_n(to_read, self.rle_value);
self.rle_left -= to_read;
read += to_read;
if self.rle_value {
values_read += to_read;
}
levels_read += to_read;
} else if self.packed_count != self.packed_offset {
let to_read = (self.packed_count - self.packed_offset).min(len - read);
let to_read = (self.packed_count - self.packed_offset).min(len - levels_read);
let offset = self.data_offset * 8 + self.packed_offset;
buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
// Packed runs already encode bits densely; count the ones we just appended.
values_read +=
UnalignedBitChunk::new(self.data.as_ref(), offset, to_read).count_ones();
self.packed_offset += to_read;
read += to_read;
levels_read += to_read;

if self.packed_offset == self.packed_count {
self.data_offset += self.packed_count / 8;
@@ -308,7 +315,7 @@ impl PackedDecoder {
self.next_rle_block()?
}
}
Ok(read)
Ok((values_read, levels_read))
}

/// Skips `level_num` definition levels
@@ -360,10 +367,14 @@ mod tests {

let mut expected = BooleanBufferBuilder::new(len);
let mut encoder = RleEncoder::new(1, 1024);
let mut expected_value_count = 0;
for _ in 0..len {
let bool = rng.random_bool(0.8);
encoder.put(bool as u64);
expected.append(bool);
if bool {
expected_value_count += 1;
}
}
assert_eq!(expected.len(), len);

@@ -373,18 +384,27 @@

// Decode data in random length intervals
let mut decoded = BooleanBufferBuilder::new(len);
// Track how many `true` bits we appended to validate the returned counts.
let mut decoded_value_count = 0;
loop {
let remaining = len - decoded.len();
if remaining == 0 {
break;
}

let to_read = rng.random_range(1..=remaining);
decoder.read(&mut decoded, to_read).unwrap();
let offset = decoded.len();
let (values_read, levels_read) = decoder.read(&mut decoded, to_read).unwrap();
assert_eq!(levels_read, to_read);
decoded_value_count += values_read;
let expected_chunk_ones =
UnalignedBitChunk::new(expected.as_slice(), offset, levels_read).count_ones();
assert_eq!(values_read, expected_chunk_ones);
}

assert_eq!(decoded.len(), len);
assert_eq!(decoded.as_slice(), expected.as_slice());
assert_eq!(decoded_value_count, expected_value_count);
}

#[test]
@@ -428,18 +448,23 @@ mod tests {
skip_level += skip_level_num
} else {
let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
let read_level_num = decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
let (read_value_num, read_level_num) =
decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
read_level += read_level_num;
read_value += read_value_num;
// Verify the per-chunk count matches the bits checked individually below.
let mut chunk_value_count = 0;
for i in 0..read_level_num {
assert!(!decoded.is_empty());
//check each read bit
let read_bit = decoded.get_bit(i);
if read_bit {
read_value += 1;
chunk_value_count += 1;
}
let expect_bit = expected.get_bit(i + offset);
assert_eq!(read_bit, expect_bit);
}
assert_eq!(chunk_value_count, read_value_num);
}
}
assert_eq!(read_level + skip_level, len);
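For context, a minimal, self-contained sketch of the `(values_read, levels_read)` bookkeeping this change introduces: the RLE branch counts a whole run at once, while the bit-packed branch counts the set bits of exactly the range it appended. The helper names below are illustrative and not part of this PR; only the `arrow_buffer` calls mirror those used in `PackedDecoder::read`, assuming they are importable at these paths.

```rust
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use arrow_buffer::builder::BooleanBufferBuilder;

/// Appends an RLE run of `len` copies of `value`, returning (values, levels).
fn append_rle(buffer: &mut BooleanBufferBuilder, value: bool, len: usize) -> (usize, usize) {
    buffer.append_n(len, value);
    // For an RLE run, every appended level is set iff the run value is true.
    (if value { len } else { 0 }, len)
}

/// Appends `len` bits starting at `offset` from a packed byte slice,
/// returning (values, levels).
fn append_packed(
    buffer: &mut BooleanBufferBuilder,
    data: &[u8],
    offset: usize,
    len: usize,
) -> (usize, usize) {
    buffer.append_packed_range(offset..offset + len, data);
    // Count the set bits in exactly the range that was appended.
    let values = UnalignedBitChunk::new(data, offset, len).count_ones();
    (values, len)
}

fn main() {
    let mut buffer = BooleanBufferBuilder::new(16);
    let (v1, l1) = append_rle(&mut buffer, true, 5);
    // 0b1011_0001: the low 6 bits (LSB first) are 1,0,0,0,1,1 => 3 set bits.
    let (v2, l2) = append_packed(&mut buffer, &[0b1011_0001], 0, 6);
    assert_eq!((v1 + v2, l1 + l2), (8, 11));
    assert_eq!(buffer.len(), 11);
}
```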
102 changes: 79 additions & 23 deletions parquet/src/column/reader/decoder.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use bytes::Bytes;

use crate::basic::Encoding;
@@ -68,9 +66,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
}

pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
/// Read up to `num_levels` definition levels into `out`
/// Read up to `num_levels` definition levels into `out`.
///
/// Returns the number of values skipped, and the number of levels skipped
/// Returns the number of values read, and the number of levels read.
///
/// # Panics
///
@@ -81,9 +79,9 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
num_levels: usize,
) -> Result<(usize, usize)>;

/// Skips over `num_levels` definition levels
/// Skips over `num_levels` definition levels.
///
/// Returns the number of values skipped, and the number of levels skipped
/// Returns the number of values skipped, and the number of levels skipped.
fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}

@@ -136,14 +134,75 @@ pub trait ColumnValueDecoder {
fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}

/// Bucket-based storage for decoder instances keyed by `Encoding`.
///
/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the
/// hot decoding paths.
const ENCODING_SLOTS: usize = 10; // covers the encodings handled in `enc_slot`

#[inline]
fn enc_slot(e: Encoding) -> usize {
match e {
Encoding::PLAIN => 0,
Encoding::PLAIN_DICTIONARY => 2,
Encoding::RLE => 3,
#[allow(deprecated)]
Encoding::BIT_PACKED => 4,
Encoding::DELTA_BINARY_PACKED => 5,
Encoding::DELTA_LENGTH_BYTE_ARRAY => 6,
Encoding::DELTA_BYTE_ARRAY => 7,
Encoding::RLE_DICTIONARY => 8,
Encoding::BYTE_STREAM_SPLIT => 9,
}
}

/// Fixed-capacity storage for decoder instances keyed by Parquet encoding.
struct DecoderBuckets<V> {
inner: [Option<V>; ENCODING_SLOTS],
}

impl<V> DecoderBuckets<V> {
#[inline]
fn new() -> Self {
Self {
inner: std::array::from_fn(|_| None),
}
}

#[inline]
fn contains_key(&self, e: Encoding) -> bool {
self.inner[enc_slot(e)].is_some()
}

#[inline]
fn get_mut(&mut self, e: Encoding) -> Option<&mut V> {
self.inner[enc_slot(e)].as_mut()
}

#[inline]
fn insert_and_get_mut(&mut self, e: Encoding, v: V) -> &mut V {
let slot = &mut self.inner[enc_slot(e)];
debug_assert!(slot.is_none());
*slot = Some(v);
slot.as_mut().unwrap()
}
}

impl<V> Default for DecoderBuckets<V> {
fn default() -> Self {
Self::new()
}
}

/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
pub struct ColumnValueDecoderImpl<T: DataType> {
descr: ColumnDescPtr,

current_encoding: Option<Encoding>,

// Cache of decoders for existing encodings
decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
/// Cache of decoders for existing encodings.
/// Uses `DecoderBuckets` instead of `HashMap` for predictable indexing.
decoders: DecoderBuckets<Box<dyn Decoder<T>>>,
}

impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
@@ -153,7 +212,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
Self {
descr: descr.clone(),
current_encoding: None,
decoders: Default::default(),
decoders: DecoderBuckets::new(),
}
}

@@ -168,7 +227,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
encoding = Encoding::RLE_DICTIONARY
}

if self.decoders.contains_key(&encoding) {
if self.decoders.contains_key(encoding) {
return Err(general_err!("Column cannot have more than one dictionary"));
}

@@ -178,7 +237,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let mut decoder = DictDecoder::new();
decoder.set_dict(Box::new(dictionary))?;
self.decoders.insert(encoding, Box::new(decoder));
self.decoders
.insert_and_get_mut(encoding, Box::new(decoder));
Ok(())
} else {
Err(nyi_err!(
@@ -195,24 +255,20 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
num_levels: usize,
num_values: Option<usize>,
) -> Result<()> {
use std::collections::hash_map::Entry;

if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}

let decoder = if encoding == Encoding::RLE_DICTIONARY {
self.decoders
.get_mut(&encoding)
.get_mut(encoding)
.expect("Decoder for dict should have been set")
} else {
// Search cache for data page decoder
match self.decoders.entry(encoding) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(v) => {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
v.insert(data_decoder)
}
if let Some(decoder) = self.decoders.get_mut(encoding) {
decoder
} else {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
self.decoders.insert_and_get_mut(encoding, data_decoder)
}
};

@@ -228,7 +284,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let current_decoder = self
.decoders
.get_mut(&encoding)
.get_mut(encoding)
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));

// TODO: Push vec into decoder (#5177)
@@ -246,7 +302,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let current_decoder = self
.decoders
.get_mut(&encoding)
.get_mut(encoding)
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));

current_decoder.skip(num_values)
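For context, the `DecoderBuckets` change swaps hash lookups for direct array indexing keyed by encoding (the slot numbers appear to mirror the Parquet encoding IDs, which would explain the unused slot 1). Below is a self-contained sketch of the same fixed-slot pattern; the enum and `String` values are stand-ins for the real `Encoding` and boxed decoders, and `get_or_insert_with` stands in for the `get_mut`/`insert_and_get_mut` pair above.

```rust
// Stand-in encoding enum; the real code keys on parquet's Encoding.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Enc {
    Plain,
    Rle,
    RleDictionary,
}

const SLOTS: usize = 3;

#[inline]
fn slot(e: Enc) -> usize {
    // Mirrors `enc_slot`: a dense index per supported encoding, no hashing.
    match e {
        Enc::Plain => 0,
        Enc::Rle => 1,
        Enc::RleDictionary => 2,
    }
}

struct Buckets<V> {
    inner: [Option<V>; SLOTS],
}

impl<V> Buckets<V> {
    fn new() -> Self {
        Self {
            inner: std::array::from_fn(|_| None),
        }
    }

    /// Equivalent of the old `HashMap::entry(..)` lookup: return the cached
    /// value for this encoding, creating it on first use.
    fn get_or_insert_with(&mut self, e: Enc, create: impl FnOnce() -> V) -> &mut V {
        self.inner[slot(e)].get_or_insert_with(create)
    }
}

fn main() {
    let mut buckets: Buckets<String> = Buckets::new();
    let d = buckets.get_or_insert_with(Enc::Rle, || "rle decoder".to_string());
    assert_eq!(d.as_str(), "rle decoder");
    // Second lookup hits the cached slot; the closure is not run again.
    let d = buckets.get_or_insert_with(Enc::Rle, || unreachable!());
    assert_eq!(d.as_str(), "rle decoder");
}
```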