Skip to content

Commit 0e2bd4d

Browse files
committed
[Fix](parquet-reader) Fix definition level rle decode dead loop in parquet-reader.
1 parent 9d87bbf commit 0e2bd4d

File tree

4 files changed

+28
-3
lines changed

4 files changed

+28
-3
lines changed

be/src/util/bit_stream_utils.h

+4
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ class BitReader {
145145

146146
// Reports whether this reader currently has a buffer attached: true exactly
// when buffer_ is non-null.
bool is_initialized() const { return buffer_ != nullptr; }
147147

148+
// Exposes the raw buffer_ pointer for read-only inspection (e.g. diagnostics).
// The pointer is null whenever is_initialized() is false, so callers must
// check before dereferencing.
// NOTE(review): presumably spans max_bytes() bytes — confirm against the code
// that assigns buffer_.
const uint8_t* buffer() const { return buffer_; }
149+
150+
// Returns max_bytes_ unchanged.
// NOTE(review): presumably the byte length of the region behind buffer_ —
// confirm. The result is a signed int, so cast explicitly before comparing it
// with an unsigned loop index.
int max_bytes() const { return max_bytes_; }
151+
148152
private:
149153
// Used by SeekToBit() and GetValue() to fetch the
150154
// next word into buffer_.

be/src/util/rle_encoding.h

+2
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ class RleDecoder {
120120
// Get current repeated value, make sure that count equals repeated_count()
121121
T get_repeated_value(size_t count);
122122

123+
// Read-only access to the decoder's internal bit_reader_. The returned
// reference is const, so callers can inspect the reader but cannot advance or
// modify it through this accessor.
const BitReader& bit_reader() const { return bit_reader_; }
124+
123125
private:
124126
bool ReadHeader();
125127

be/src/vec/exec/format/parquet/level_decoder.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class LevelDecoder {
5656

5757
// Forwards to RewindOne() on the internal _rle_decoder.
// NOTE(review): presumably steps the RLE decoder back by one level value —
// confirm against RleDecoder::RewindOne().
inline void rewind_one() { _rle_decoder.RewindOne(); }
5858

59+
// Read-only access to the internal _rle_decoder. The returned reference is
// const, so it is suitable for inspecting decoder state without changing it.
const RleDecoder<level_t>& rle_decoder() const { return _rle_decoder; }
60+
5961
private:
6062
tparquet::Encoding::type _encoding;
6163
level_t _bit_width = 0;
@@ -65,4 +67,4 @@ class LevelDecoder {
6567
BitReader _bit_packed_decoder;
6668
};
6769

68-
} // namespace doris::vectorized
70+
} // namespace doris::vectorized

be/src/vec/exec/format/parquet/vparquet_column_reader.cpp

+19-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Licensed to the Apache Software Foundation (ASF) under one
2+
23
// or more contributor license agreements. See the NOTICE file
34
// distributed with this work for additional information
45
// regarding copyright ownership. The ASF licenses this file
@@ -209,7 +210,15 @@ Status ScalarColumnReader::_skip_values(size_t num_values) {
209210
level_t def_level = -1;
210211
size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
211212
if (loop_skip == 0) {
212-
continue;
213+
std::stringstream ss;
214+
auto& bit_reader = def_decoder.rle_decoder().bit_reader();
215+
ss << "def_decoder buffer (hex): ";
216+
for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
217+
ss << std::hex << std::setw(2) << std::setfill('0')
218+
<< static_cast<int>(bit_reader.buffer()[i]) << " ";
219+
}
220+
LOG(WARNING) << ss.str();
221+
return Status::InternalError("Failed to decode definition level.");
213222
}
214223
if (def_level == 0) {
215224
null_size += loop_skip;
@@ -254,7 +263,15 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
254263
level_t def_level;
255264
size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read);
256265
if (loop_read == 0) {
257-
continue;
266+
std::stringstream ss;
267+
auto& bit_reader = def_decoder.rle_decoder().bit_reader();
268+
ss << "def_decoder buffer (hex): ";
269+
for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
270+
ss << std::hex << std::setw(2) << std::setfill('0')
271+
<< static_cast<int>(bit_reader.buffer()[i]) << " ";
272+
}
273+
LOG(WARNING) << ss.str();
274+
return Status::InternalError("Failed to decode definition level.");
258275
}
259276
bool is_null = def_level == 0;
260277
if (!(prev_is_null ^ is_null)) {

0 commit comments

Comments
 (0)