Skip to content

refactor: Connector refactor #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Prev Previous commit
Next Next commit
[11067] Support scan filter for decimal in ORC
  • Loading branch information
rui-mo authored and GlutenPerfBot committed May 15, 2025
commit 4331529c320654425fd7972c0338025358b8deeb
14 changes: 10 additions & 4 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,19 @@ std::shared_ptr<const Type> ReaderBase::convertType(
return SMALLINT();
case TypeKind::INTEGER:
return INTEGER();
case TypeKind::BIGINT:
case TypeKind::BIGINT: {
TypePtr converted;
if (type.format() == DwrfFormat::kOrc &&
type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) {
return DECIMAL(
type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
converted =
DECIMAL(type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
} else {
converted = BIGINT();
common::testutil::TestValue::adjust(
"facebook::velox::dwrf::ReaderBase::convertType", &converted);
}
return BIGINT();
return converted;
}
case TypeKind::HUGEINT:
if (type.format() == DwrfFormat::kOrc &&
type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) {
Expand Down
180 changes: 168 additions & 12 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {

template <typename DataT>
template <bool kDense>
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
vector_size_t numRows = rows.back() + 1;
void SelectiveDecimalColumnReader<DataT>::readHelper(
common::Filter* filter,
RowSet rows) {
ExtractToReader extractValues(this);
common::AlwaysTrue filter;
common::AlwaysTrue alwaysTrue;
DirectRleColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
kDense>
visitor(filter, this, rows, extractValues);
visitor(alwaysTrue, this, rows, extractValues);

// decode scale stream
if (version_ == velox::dwrf::RleVersion_1) {
Expand All @@ -104,46 +105,201 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
// reset numValues_ before reading values
numValues_ = 0;
valueSize_ = sizeof(DataT);
vector_size_t numRows = rows.back() + 1;
ensureValuesCapacity<DataT>(numRows);

// decode value stream
facebook::velox::dwio::common::
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
valueVisitor(filter, this, rows, extractValues);
valueVisitor(alwaysTrue, this, rows, extractValues);
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
readOffset_ += numRows;

// Fill decimals before applying filter.
fillDecimals();

const auto rawNulls = nullsInReadRange_
? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
: nullptr;
// Process filter.
process(filter, rows, rawNulls);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processNulls(
bool isNull,
const RowSet& rows,
const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
returnReaderNulls_ = false;
anyNulls_ = !isNull;
allNull_ = isNull;

auto rawDecimal = values_->asMutable<DataT>();
auto rawScale = scaleBuffer_->asMutable<int64_t>();

vector_size_t idx = 0;
if (isNull) {
for (vector_size_t i = 0; i < numValues_; i++) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
}
} else {
for (vector_size_t i = 0; i < numValues_; i++) {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
rawDecimal[idx] = rawDecimal[i];
rawScale[idx] = rawScale[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
VELOX_CHECK_NOT_NULL(filter, "Filter must not be null.");
returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;

vector_size_t idx = 0;
auto rawDecimal = values_->asMutable<DataT>();
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
bool tested;
if constexpr (std::is_same_v<DataT, int64_t>) {
tested = filter->testInt64(rawDecimal[i]);
} else {
tested = filter->testInt128(rawDecimal[i]);
}

if (tested) {
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawDecimal[idx] = rawDecimal[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
// Treat the filter as kAlwaysTrue if any of the following conditions are met:
// 1) No filter found;
// 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
auto filterKind =
!filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind();
switch (filterKind) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kBigintRange:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kNegatedBigintRange:
case common::FilterKind::kNegatedBigintValuesUsingHashTable:
case common::FilterKind::kNegatedBigintValuesUsingBitmask:
case common::FilterKind::kBigintMultiRange: {
if constexpr (std::is_same_v<DataT, int64_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type BIGINT, but found file type {}.",
actualType->toString());
}
break;
}
case common::FilterKind::kHugeintValuesUsingHashTable:
case common::FilterKind::kHugeintRange: {
if constexpr (std::is_same_v<DataT, int128_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type HUGEINT, but found file type {}.",
actualType->toString());
}
break;
}
default:
VELOX_NYI("Unsupported filter: {}.", static_cast<int>(filterKind));
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
if (!resultNulls_ || !resultNulls_->unique() ||
resultNulls_->capacity() * 8 < rows.size()) {
// Make sure a dedicated resultNulls_ is allocated with enough capacity as
// RleDecoder always assumes it is available.
resultNulls_ = AlignedBuffer::allocate<bool>(rows.size(), memoryPool_);
rawResultNulls_ = resultNulls_->asMutable<uint64_t>();
}
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(rows);
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(rows);
readHelper<false>(scanSpec_->filter(), rows);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::getValues(
const RowSet& rows,
VectorPtr* result) {
rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
auto nullsPtr =
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
auto scales = scaleBuffer_->as<int64_t>();
auto values = values_->asMutable<DataT>();

DecimalUtil::fillDecimals<DataT>(
values, nullsPtr, values, scales, numValues_, scale_);

rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template class SelectiveDecimalColumnReader<int64_t>;
Expand Down
19 changes: 18 additions & 1 deletion velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {

private:
template <bool kDense>
void readHelper(RowSet rows);
void readHelper(common::Filter* filter, RowSet rows);

// Process IsNull and IsNotNull filters.
void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls);

// Process filters on decimal values.
void processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

// Dispatch to the respective filter processing based on the filter type.
void process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

void fillDecimals();

std::unique_ptr<IntDecoder<true>> valueDecoder_;
std::unique_ptr<IntDecoder<true>> scaleDecoder_;
Expand Down
Loading