1
+ // Licensed to the Apache Software Foundation (ASF) under one
2
+ // or more contributor license agreements. See the NOTICE file
3
+ // distributed with this work for additional information
4
+ // regarding copyright ownership. The ASF licenses this file
5
+ // to you under the Apache License, Version 2.0 (the
6
+ // "License"); you may not use this file except in compliance
7
+ // with the License. You may obtain a copy of the License at
8
+ //
9
+ // http://www.apache.org/licenses/LICENSE-2.0
10
+ //
11
+ // Unless required by applicable law or agreed to in writing,
12
+ // software distributed under the License is distributed on an
13
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
+ // KIND, either express or implied. See the License for the
15
+ // specific language governing permissions and limitations
16
+ // under the License.
17
+
18
+ #include " exec/schema_scanner/schema_workload_group_privileges.h"
19
+
20
+ #include " runtime/client_cache.h"
21
+ #include " runtime/exec_env.h"
22
+ #include " runtime/runtime_state.h"
23
+ #include " util/thrift_rpc_helper.h"
24
+ #include " vec/common/string_ref.h"
25
+ #include " vec/core/block.h"
26
+ #include " vec/data_types/data_type_factory.hpp"
27
+
28
+ namespace doris {
29
+ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupPrivilegesScanner::_s_tbls_columns = {
30
+ {" GRANTEE" , TYPE_VARCHAR, sizeof (StringRef), true },
31
+ {" WORKLOAD_GROUP_NAME" , TYPE_VARCHAR, sizeof (StringRef), true },
32
+ {" PRIVILEGE_TYPE" , TYPE_VARCHAR, sizeof (StringRef), true },
33
+ {" IS_GRANTABLE" , TYPE_VARCHAR, sizeof (StringRef), true },
34
+ };
35
+
36
+ SchemaWorkloadGroupPrivilegesScanner::SchemaWorkloadGroupPrivilegesScanner ()
37
+ : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_GROUPS) {}
38
+
39
+ SchemaWorkloadGroupPrivilegesScanner::~SchemaWorkloadGroupPrivilegesScanner () {}
40
+
41
+ Status SchemaWorkloadGroupPrivilegesScanner::start (RuntimeState* state) {
42
+ _block_rows_limit = state->batch_size ();
43
+ _rpc_timeout = state->execution_timeout () * 1000 ;
44
+ return Status::OK ();
45
+ }
46
+
47
+ Status SchemaWorkloadGroupPrivilegesScanner::_get_workload_group_privs_block_from_fe () {
48
+ TNetworkAddress master_addr = ExecEnv::GetInstance ()->master_info ()->network_address ;
49
+
50
+ TSchemaTableRequestParams schema_table_request_params;
51
+ for (int i = 0 ; i < _s_tbls_columns.size (); i++) {
52
+ schema_table_request_params.__isset .columns_name = true ;
53
+ schema_table_request_params.columns_name .emplace_back (_s_tbls_columns[i].name );
54
+ }
55
+ schema_table_request_params.__set_current_user_ident (*_param->common_param ->current_user_ident );
56
+
57
+ TFetchSchemaTableDataRequest request;
58
+ request.__set_schema_table_name (TSchemaTableName::WORKLOAD_GROUP_PRIVILEGES);
59
+ request.__set_schema_table_params (schema_table_request_params);
60
+
61
+ TFetchSchemaTableDataResult result;
62
+
63
+ RETURN_IF_ERROR (ThriftRpcHelper::rpc<FrontendServiceClient>(
64
+ master_addr.hostname , master_addr.port ,
65
+ [&request, &result](FrontendServiceConnection& client) {
66
+ client->fetchSchemaTableData (result, request);
67
+ },
68
+ _rpc_timeout));
69
+
70
+ Status status (Status::create (result.status ));
71
+ if (!status.ok ()) {
72
+ LOG (WARNING) << " fetch workload group privileges from FE failed, errmsg=" << status;
73
+ return status;
74
+ }
75
+ std::vector<TRow> result_data = result.data_batch ;
76
+
77
+ _workload_groups_privs_block = vectorized::Block::create_unique ();
78
+ for (int i = 0 ; i < _s_tbls_columns.size (); ++i) {
79
+ TypeDescriptor descriptor (_s_tbls_columns[i].type );
80
+ auto data_type = vectorized::DataTypeFactory::instance ().create_data_type (descriptor, true );
81
+ _workload_groups_privs_block->insert (vectorized::ColumnWithTypeAndName (
82
+ data_type->create_column (), data_type, _s_tbls_columns[i].name ));
83
+ }
84
+
85
+ if (result_data.size () > 0 ) {
86
+ int col_size = result_data[0 ].column_value .size ();
87
+ if (col_size != _s_tbls_columns.size ()) {
88
+ return Status::InternalError<false >(
89
+ " workload group privileges schema is not match for FE and BE" );
90
+ }
91
+ }
92
+
93
+ _workload_groups_privs_block->reserve (result_data.size ());
94
+
95
+ for (int i = 0 ; i < result_data.size (); i++) {
96
+ TRow row = result_data[i];
97
+
98
+ for (int j = 0 ; j < _s_tbls_columns.size (); j++) {
99
+ RETURN_IF_ERROR (insert_block_column (row.column_value [j], j,
100
+ _workload_groups_privs_block.get (),
101
+ _s_tbls_columns[j].type ));
102
+ }
103
+ }
104
+ return Status::OK ();
105
+ }
106
+
107
+ Status SchemaWorkloadGroupPrivilegesScanner::get_next_block (vectorized::Block* block, bool * eos) {
108
+ if (!_is_init) {
109
+ return Status::InternalError (" Used before initialized." );
110
+ }
111
+
112
+ if (nullptr == block || nullptr == eos) {
113
+ return Status::InternalError (" input pointer is nullptr." );
114
+ }
115
+
116
+ if (_workload_groups_privs_block == nullptr ) {
117
+ RETURN_IF_ERROR (_get_workload_group_privs_block_from_fe ());
118
+ _total_rows = _workload_groups_privs_block->rows ();
119
+ }
120
+
121
+ if (_row_idx == _total_rows) {
122
+ *eos = true ;
123
+ return Status::OK ();
124
+ }
125
+
126
+ int current_batch_rows = std::min (_block_rows_limit, _total_rows - _row_idx);
127
+ vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block (block);
128
+ RETURN_IF_ERROR (
129
+ mblock.add_rows (_workload_groups_privs_block.get (), _row_idx, current_batch_rows));
130
+ _row_idx += current_batch_rows;
131
+
132
+ *eos = _row_idx == _total_rows;
133
+ return Status::OK ();
134
+ }
135
+
136
+ } // namespace doris
0 commit comments