@@ -50,20 +50,16 @@ using namespace ErrorCode;
50
50
51
51
MemTable::MemTable (int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
52
52
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
53
- bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
54
- const std::shared_ptr<MemTracker>& insert_mem_tracker,
55
- const std::shared_ptr<MemTracker>& flush_mem_tracker)
56
- : _tablet_id(tablet_id),
53
+ bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info)
54
+ : _mem_type(MemType::ACTIVE),
55
+ _tablet_id (tablet_id),
57
56
_enable_unique_key_mow(enable_unique_key_mow),
58
57
_keys_type(tablet_schema->keys_type ()),
59
58
_tablet_schema(tablet_schema),
60
- _insert_mem_tracker(insert_mem_tracker),
61
- _flush_mem_tracker(flush_mem_tracker),
62
59
_is_first_insertion(true ),
63
60
_agg_functions(tablet_schema->num_columns ()),
64
61
_offsets_of_aggregate_states(tablet_schema->num_columns ()),
65
- _total_size_of_aggregate_states(0 ),
66
- _mem_usage(0 ) {
62
+ _total_size_of_aggregate_states(0 ) {
67
63
g_memtable_cnt << 1 ;
68
64
_query_thread_context.init_unlocked ();
69
65
_arena = std::make_unique<vectorized::Arena>();
@@ -82,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
82
78
}
83
79
// TODO: Support ZOrderComparator in the future
84
80
_init_columns_offset_by_slot_descs (slot_descs, tuple_desc);
81
+ _mem_tracker = std::make_shared<MemTracker>();
85
82
}
86
83
87
84
void MemTable::_init_columns_offset_by_slot_descs (const std::vector<SlotDescriptor*>* slot_descs,
@@ -142,6 +139,13 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
142
139
143
140
MemTable::~MemTable () {
144
141
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER (_query_thread_context.query_mem_tracker );
142
+ if (_is_flush_success) {
143
+ // If the memtable is flush success, then its memtracker's consumption should be 0
144
+ if (_mem_tracker->consumption () != 0 && config::crash_in_memory_tracker_inaccurate) {
145
+ LOG (FATAL) << " memtable flush success but cosumption is not 0, it is "
146
+ << _mem_tracker->consumption ();
147
+ }
148
+ }
145
149
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes ();
146
150
g_memtable_cnt << -1 ;
147
151
if (_keys_type != KeysType::DUP_KEYS) {
@@ -159,13 +163,7 @@ MemTable::~MemTable() {
159
163
}
160
164
}
161
165
std::for_each (_row_in_blocks.begin (), _row_in_blocks.end (), std::default_delete<RowInBlock>());
162
- _insert_mem_tracker->release (_mem_usage);
163
- _flush_mem_tracker->set_consumption (0 );
164
- DCHECK_EQ (_insert_mem_tracker->consumption (), 0 ) << std::endl
165
- << _insert_mem_tracker->log_usage ();
166
- DCHECK_EQ (_flush_mem_tracker->consumption (), 0 );
167
166
_arena.reset ();
168
- _agg_buffer_pool.clear ();
169
167
_vec_row_comparator.reset ();
170
168
_row_in_blocks.clear ();
171
169
_agg_functions.clear ();
@@ -180,6 +178,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
180
178
181
179
Status MemTable::insert (const vectorized::Block* input_block,
182
180
const std::vector<uint32_t >& row_idxs) {
181
+ SCOPED_CONSUME_MEM_TRACKER (_mem_tracker);
183
182
if (_is_first_insertion) {
184
183
_is_first_insertion = false ;
185
184
auto clone_block = input_block->clone_without_columns (&_column_offset);
@@ -214,10 +213,6 @@ Status MemTable::insert(const vectorized::Block* input_block,
214
213
row_idxs.data () + num_rows, &_column_offset));
215
214
auto block_size1 = _input_mutable_block.allocated_bytes ();
216
215
g_memtable_input_block_allocated_size << block_size1 - block_size0;
217
- auto input_size = size_t (input_block->bytes () * num_rows / input_block->rows () *
218
- config::memtable_insert_memory_ratio);
219
- _mem_usage += input_size;
220
- _insert_mem_tracker->consume (input_size);
221
216
for (int i = 0 ; i < num_rows; i++) {
222
217
_row_in_blocks.emplace_back (new RowInBlock {cursor_in_mutableblock + i});
223
218
}
@@ -467,10 +462,6 @@ void MemTable::_aggregate() {
467
462
}
468
463
if constexpr (!is_final) {
469
464
// if is not final, we collect the agg results to input_block and then continue to insert
470
- size_t shrunked_after_agg = _output_mutable_block.allocated_bytes ();
471
- // flush will not run here, so will not duplicate `_flush_mem_tracker`
472
- _insert_mem_tracker->consume (shrunked_after_agg - _mem_usage);
473
- _mem_usage = shrunked_after_agg;
474
465
_input_mutable_block.swap (_output_mutable_block);
475
466
// TODO(weixang):opt here.
476
467
std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block (0 );
@@ -483,6 +474,7 @@ void MemTable::_aggregate() {
483
474
}
484
475
485
476
void MemTable::shrink_memtable_by_agg () {
477
+ SCOPED_CONSUME_MEM_TRACKER (_mem_tracker);
486
478
if (_keys_type == KeysType::DUP_KEYS) {
487
479
return ;
488
480
}
@@ -528,8 +520,8 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
528
520
}
529
521
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes ();
530
522
_input_mutable_block.clear ();
531
- _insert_mem_tracker-> release (_mem_usage);
532
- _mem_usage = 0 ;
523
+ // After to block, all data in arena is saved in the block
524
+ _arena. reset () ;
533
525
*res = vectorized::Block::create_unique (_output_mutable_block.to_block ());
534
526
return Status::OK ();
535
527
}
0 commit comments