52
52
#include " exec/schema_scanner/schema_workload_groups_scanner.h"
53
53
#include " exec/schema_scanner/schema_workload_sched_policy_scanner.h"
54
54
#include " olap/hll.h"
55
+ #include " pipeline/dependency.h"
55
56
#include " runtime/define_primitive_type.h"
57
+ #include " runtime/fragment_mgr.h"
58
+ #include " runtime/types.h"
56
59
#include " util/string_util.h"
57
60
#include " util/types.h"
58
61
#include " vec/columns/column.h"
66
69
#include " vec/core/column_with_type_and_name.h"
67
70
#include " vec/core/types.h"
68
71
#include " vec/data_types/data_type.h"
72
+ #include " vec/data_types/data_type_factory.hpp"
69
73
70
74
namespace doris {
71
75
class ObjectPool ;
@@ -86,7 +90,60 @@ Status SchemaScanner::start(RuntimeState* state) {
86
90
return Status::OK ();
87
91
}
88
92
89
- Status SchemaScanner::get_next_block (vectorized::Block* block, bool * eos) {
93
+ Status SchemaScanner::get_next_block (RuntimeState* state, vectorized::Block* block, bool * eos) {
94
+ if (_data_block == nullptr ) {
95
+ return Status::InternalError (" No data left!" );
96
+ }
97
+ DCHECK (_async_thread_running == false );
98
+ RETURN_IF_ERROR (_scanner_status.status ());
99
+ for (size_t i = 0 ; i < block->columns (); i++) {
100
+ std::move (*block->get_by_position (i).column )
101
+ .mutate ()
102
+ ->insert_range_from (*_data_block->get_by_position (i).column , 0 ,
103
+ _data_block->rows ());
104
+ }
105
+ _data_block->clear_column_data ();
106
+ *eos = _eos;
107
+ if (!*eos) {
108
+ RETURN_IF_ERROR (get_next_block_async (state));
109
+ }
110
+ return Status::OK ();
111
+ }
112
+
113
+ Status SchemaScanner::get_next_block_async (RuntimeState* state) {
114
+ _dependency->block ();
115
+ auto task_ctx = state->get_task_execution_context ();
116
+ RETURN_IF_ERROR (ExecEnv::GetInstance ()->fragment_mgr ()->get_thread_pool ()->submit_func (
117
+ [this , task_ctx, state]() {
118
+ DCHECK (_async_thread_running == false );
119
+ auto task_lock = task_ctx.lock ();
120
+ if (task_lock == nullptr ) {
121
+ _scanner_status.update (Status::InternalError (" Task context not exists!" ));
122
+ return ;
123
+ }
124
+ SCOPED_ATTACH_TASK (state);
125
+ _dependency->block ();
126
+ _async_thread_running = true ;
127
+ _finish_dependency->block ();
128
+ if (!_opened) {
129
+ _data_block = vectorized::Block::create_unique ();
130
+ _init_block (_data_block.get ());
131
+ _scanner_status.update (start (state));
132
+ _opened = true ;
133
+ }
134
+ bool eos = false ;
135
+ _scanner_status.update (get_next_block_internal (_data_block.get (), &eos));
136
+ _eos = eos;
137
+ _async_thread_running = false ;
138
+ _dependency->set_ready ();
139
+ if (eos) {
140
+ _finish_dependency->set_ready ();
141
+ }
142
+ }));
143
+ return Status::OK ();
144
+ }
145
+
146
+ Status SchemaScanner::get_next_block_internal (vectorized::Block* block, bool * eos) {
90
147
if (!_is_init) {
91
148
return Status::InternalError (" used before initialized." );
92
149
}
@@ -179,6 +236,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
179
236
}
180
237
}
181
238
239
+ void SchemaScanner::_init_block (vectorized::Block* src_block) {
240
+ const std::vector<SchemaScanner::ColumnDesc>& columns_desc (get_column_desc ());
241
+ for (int i = 0 ; i < columns_desc.size (); ++i) {
242
+ TypeDescriptor descriptor (columns_desc[i].type );
243
+ auto data_type = vectorized::DataTypeFactory::instance ().create_data_type (descriptor, true );
244
+ src_block->insert (vectorized::ColumnWithTypeAndName (data_type->create_column (), data_type,
245
+ columns_desc[i].name ));
246
+ }
247
+ }
248
+
182
249
Status SchemaScanner::fill_dest_column_for_range (vectorized::Block* block, size_t pos,
183
250
const std::vector<void *>& datas) {
184
251
const ColumnDesc& col_desc = _columns[pos];
0 commit comments