Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ struct OlapReaderStatistics {
int64_t lazy_read_ns = 0;
int64_t block_lazy_read_seek_num = 0;
int64_t block_lazy_read_seek_ns = 0;
int64_t lazy_read_pruned_ns = 0;

int64_t raw_rows_read = 0;

Expand Down
360 changes: 262 additions & 98 deletions be/src/olap/rowset/segment_v2/column_reader.cpp

Large diffs are not rendered by default.

137 changes: 136 additions & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/segment_v2.pb.h>
#include <glog/logging.h>
#include <sys/types.h>

#include <cstddef> // for size_t
Expand Down Expand Up @@ -403,12 +404,65 @@ class ColumnIterator {

virtual void remove_pruned_sub_iterators() {};

enum class ReadingMode : int {
NORMAL, // default mode
PREDICATE,
LAZY
};

virtual void set_reading_mode(ReadingMode mode) { _reading_mode = mode; }

virtual bool need_to_read() const {
switch (_reading_mode) {
case ReadingMode::NORMAL:
return _reading_flag != ReadingFlag::SKIP_READING;
case ReadingMode::PREDICATE:
return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
case ReadingMode::LAZY:
return _reading_flag == ReadingFlag::NEED_TO_READ;
default:
return false;
}
}

// Whether need to read meta columns, such as null map column, offset column.
bool need_to_read_meta_columns() const {
if (_reading_flag == ReadingFlag::SKIP_READING) {
return false;
}
switch (_reading_mode) {
case ReadingMode::NORMAL:
case ReadingMode::PREDICATE:
return true;
case ReadingMode::LAZY:
return _reading_flag != ReadingFlag::READING_FOR_PREDICATE;
}
return false;
}

virtual void finalize_lazy_mode(vectorized::MutableColumnPtr& dst) {
_recovery_from_place_holder_column(dst);
}

virtual void set_reading_flag_recursively(ReadingFlag flag) { set_reading_flag(flag); }

bool is_pruned() const { return _pruned; }

protected:
Result<TColumnAccessPaths> _get_sub_access_paths(const TColumnAccessPaths& access_paths);
void _convert_to_place_holder_column(vectorized::MutableColumnPtr& dst, size_t count);

void _recovery_from_place_holder_column(vectorized::MutableColumnPtr& dst);

Result<TColumnAccessPaths> _process_sub_access_paths(const TColumnAccessPaths& access_paths,
const bool is_predicate);
ColumnIteratorOptions _opts;

ReadingFlag _reading_flag {ReadingFlag::NORMAL_READING};
ReadingMode _reading_mode {ReadingMode::NORMAL};
std::string _column_name;
bool _pruned {false};

std::set<vectorized::IColumn*> _place_holder_columns;
};

// This iterator is used to read column data from file
Expand Down Expand Up @@ -560,6 +614,33 @@ class MapFileColumnIterator final : public ColumnIterator {

void remove_pruned_sub_iterators() override;

void set_reading_mode(ReadingMode mode) override;

bool need_to_read() const override {
switch (_reading_mode) {
case ReadingMode::NORMAL:
return _reading_flag != ReadingFlag::SKIP_READING;
case ReadingMode::PREDICATE:
return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
case ReadingMode::LAZY:
// For lazy mode, maybe some of key/value columns are needed to be read.
// For example:
// Map<Key, Value> the reading flags are:
// - Key: NEED_TO_READ
// - Value: READING_FOR_PREDICATE
// So the reading flag of the map column should be READING_FOR_PREDICATE.
// Thus when the reading mode is LAZY, we need to read the Key.
return _reading_flag == ReadingFlag::NEED_TO_READ ||
_reading_flag == ReadingFlag::READING_FOR_PREDICATE;
default:
return false;
}
}

void finalize_lazy_mode(vectorized::MutableColumnPtr& dst) override;

void set_reading_flag_recursively(ReadingFlag flag) override;

private:
std::shared_ptr<ColumnReader> _map_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
Expand Down Expand Up @@ -596,6 +677,33 @@ class StructFileColumnIterator final : public ColumnIterator {

void remove_pruned_sub_iterators() override;

void set_reading_mode(ReadingMode mode) override;

bool need_to_read() const override {
switch (_reading_mode) {
case ReadingMode::NORMAL:
return _reading_flag != ReadingFlag::SKIP_READING;
case ReadingMode::PREDICATE:
return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
case ReadingMode::LAZY:
// For lazy mode, maybe some of sub-columns are needed to be read.
// For example:
// struct<col1, col2, col3> the reading flags are:
// - col1: NEED_TO_READ
// - col2: SKIP_READING
// - col3: READING_FOR_PREDICATE
// So the reading flag of the struct column should be READING_FOR_PREDICATE.
// Thus when the reading mode is LAZY, we need to read the col1.
return _reading_flag == ReadingFlag::NEED_TO_READ ||
_reading_flag == ReadingFlag::READING_FOR_PREDICATE;
default:
return false;
}
}

void finalize_lazy_mode(vectorized::MutableColumnPtr& dst) override;
void set_reading_flag_recursively(ReadingFlag flag) override;

private:
std::shared_ptr<ColumnReader> _struct_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
Expand Down Expand Up @@ -630,6 +738,33 @@ class ArrayFileColumnIterator final : public ColumnIterator {

void remove_pruned_sub_iterators() override;

void set_reading_mode(ReadingMode mode) override;

bool need_to_read() const override {
switch (_reading_mode) {
case ReadingMode::NORMAL:
return _reading_flag != ReadingFlag::SKIP_READING;
case ReadingMode::PREDICATE:
return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
case ReadingMode::LAZY:
// For lazy mode, maybe some of key/value columns are needed to be read.
// For example:
// Map<Key, Value> the reading flags are:
// - Key: NEED_TO_READ
// - Value: READING_FOR_PREDICATE
// So the reading flag of the map column should be READING_FOR_PREDICATE.
// Thus when the reading mode is LAZY, we need to read the Key.
return _reading_flag == ReadingFlag::NEED_TO_READ ||
_reading_flag == ReadingFlag::READING_FOR_PREDICATE;
default:
return false;
}
}

void finalize_lazy_mode(vectorized::MutableColumnPtr& dst) override;

void set_reading_flag_recursively(ReadingFlag flag) override;

private:
std::shared_ptr<ColumnReader> _array_reader = nullptr;
std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
Expand Down
60 changes: 57 additions & 3 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
Expand Down Expand Up @@ -374,6 +375,10 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
_score_runtime = _opts.score_runtime;
_ann_topn_runtime = _opts.ann_topn_runtime;

_enable_prune_nested_column = _opts.io_ctx.reader_type == ReaderType::READER_QUERY &&
_opts.runtime_state &&
_opts.runtime_state->enable_prune_nested_column();

if (opts.output_columns != nullptr) {
_output_columns = *(opts.output_columns);
}
Expand Down Expand Up @@ -1736,6 +1741,17 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
auto loc = _schema_block_id_map[cid];
_columns_to_filter.push_back(loc);

const auto field_type = _schema->column(cid)->type();
if (_is_common_expr_column[cid] && _enable_prune_nested_column &&
(field_type == FieldType::OLAP_FIELD_TYPE_STRUCT ||
field_type == FieldType::OLAP_FIELD_TYPE_ARRAY ||
field_type == FieldType::OLAP_FIELD_TYPE_MAP)) {
DCHECK(_column_iterators[cid]);
if (_column_iterators[cid]->is_pruned()) {
_support_lazy_read_pruned_columns.emplace(cid);
}
}
}
}

Expand Down Expand Up @@ -2080,6 +2096,13 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16
})
}

const bool read_for_predicate = _support_lazy_read_pruned_columns.contains(cid);
if (read_for_predicate) {
_column_iterators[cid]->set_reading_mode(ColumnIterator::ReadingMode::PREDICATE);
} else {
_column_iterators[cid]->set_reading_mode(ColumnIterator::ReadingMode::NORMAL);
}

if (is_continuous) {
size_t rows_read = nrows_read;
_opts.stats->predicate_column_read_seek_num += 1;
Expand Down Expand Up @@ -2262,7 +2285,8 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx, size_t select_size,
vectorized::MutableColumns* mutable_columns,
bool init_condition_cache) {
bool init_condition_cache,
bool read_for_predicate) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
std::vector<rowid_t> rowids(select_size);

Expand Down Expand Up @@ -2306,6 +2330,15 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
"SegmentIterator meet invalid column, return columns size {}, cid {}",
_current_return_columns.size(), cid);
}

const bool should_read_for_predicate =
read_for_predicate && _support_lazy_read_pruned_columns.contains(cid);
if (should_read_for_predicate) {
_column_iterators[cid]->set_reading_mode(ColumnIterator::ReadingMode::PREDICATE);
} else {
_column_iterators[cid]->set_reading_mode(ColumnIterator::ReadingMode::NORMAL);
}

RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
_current_return_columns[cid]));
}
Expand Down Expand Up @@ -2483,7 +2516,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
RETURN_IF_ERROR(_read_columns_by_rowids(
_common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(),
_selected_size, &_current_return_columns));
_selected_size, &_current_return_columns, false, true));
_replace_version_col_if_needed(_common_expr_column_ids, _selected_size);
RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
}
Expand Down Expand Up @@ -2514,7 +2547,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
RETURN_IF_ERROR(_read_columns_by_rowids(
_non_predicate_columns, _block_rowids, _sel_rowid_idx.data(),
_selected_size, &_current_return_columns,
_opts.condition_cache_digest && !_find_condition_cache));
_opts.condition_cache_digest && !_find_condition_cache, false));
_replace_version_col_if_needed(_non_predicate_columns, _selected_size);
} else {
if (_opts.condition_cache_digest && !_find_condition_cache) {
Expand All @@ -2526,6 +2559,27 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
}
}
}

if (!_support_lazy_read_pruned_columns.empty()) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_pruned_ns);
DorisVector<rowid_t> rowids(_selected_size);
for (size_t i = 0; i < _selected_size; ++i) {
rowids[i] = _block_rowids[_sel_rowid_idx[i]];
}

for (auto cid : _support_lazy_read_pruned_columns) {
auto loc = _schema_block_id_map[cid];
auto column = block->get_by_position(loc).column->assume_mutable();
_column_iterators[cid]->set_reading_mode(ColumnIterator::ReadingMode::LAZY);
if (_selected_size > 0) {
RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(),
_selected_size, column));
}
_column_iterators[cid]->finalize_lazy_mode(column);
_column_iterators[cid]->set_reading_mode(ColumnIterator::ReadingMode::NORMAL);
block->get_by_position(loc).column = std::move(column);
}
}
}

// step5: output columns
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ class SegmentIterator : public RowwiseIterator {
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx, size_t select_size,
vectorized::MutableColumns* mutable_columns,
bool init_condition_cache = false);
bool init_condition_cache = false,
bool read_for_predicate = false);

Status copy_column_data_by_selector(vectorized::IColumn* input_col_ptr,
vectorized::MutableColumnPtr& output_col,
Expand Down Expand Up @@ -417,6 +418,9 @@ class SegmentIterator : public RowwiseIterator {
bool _is_need_short_eval = false;
bool _is_need_expr_eval = false;

std::set<ColumnId> _support_lazy_read_pruned_columns;
bool _enable_prune_nested_column = false;

// fields for vectorization execution
std::vector<ColumnId>
_vec_pred_column_ids; // keep columnId of columns for vectorized predicate evaluation
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ Status OlapScanLocalState::_init_profile() {
_lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime");
_lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT);

_lazy_read_pruned_timer = ADD_TIMER(_segment_profile, "LazyReadPrunedTime");

_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");

_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
RuntimeProfile::Counter* _lazy_read_timer = nullptr;
RuntimeProfile::Counter* _lazy_read_seek_timer = nullptr;
RuntimeProfile::Counter* _lazy_read_seek_counter = nullptr;
RuntimeProfile::Counter* _lazy_read_pruned_timer = nullptr;

// total pages read
// used by segment v2
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ class RuntimeState {
return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan;
}

bool enable_prune_nested_column() const {
return _query_options.__isset.enable_prune_nested_column &&
_query_options.enable_prune_nested_column;
}

bool is_read_csv_empty_line_as_null() const {
return _query_options.__isset.read_csv_empty_line_as_null &&
_query_options.read_csv_empty_line_as_null;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ void OlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter,
stats.predicate_column_read_seek_num);
COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns);
COUNTER_UPDATE(local_state->_lazy_read_pruned_timer, stats.lazy_read_pruned_ns);
COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns);
COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num);
COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4927,6 +4927,8 @@ public TQueryOptions toThrift() {
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());

tResult.setEnablePruneNestedColumn(enablePruneNestedColumns);

tResult.setEnableMatchWithoutInvertedIndex(enableMatchWithoutInvertedIndex);
tResult.setEnableFallbackOnMissingInvertedIndex(enableFallbackOnMissingInvertedIndex);
tResult.setEnableInvertedIndexSearcherCache(enableInvertedIndexSearcherCache);
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ struct TQueryOptions {

182: optional i32 ivf_nprobe = 1;

183: optional bool enable_prune_nested_column = false;

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down
Loading
Loading