@@ -61,6 +61,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
61
61
// storage
62
62
properties.set_partition_num (partitionNum);
63
63
}
64
+
64
65
if (replicaFactor == 0 ) {
65
66
replicaFactor = FLAGS_default_replica_factor;
66
67
if (replicaFactor <= 0 ) {
@@ -73,12 +74,14 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
73
74
// storage
74
75
properties.set_replica_factor (replicaFactor);
75
76
}
77
+
76
78
if (vidSize == 0 ) {
77
79
LOG (ERROR) << " Create Space Failed : vid_size is illegal: " << vidSize;
78
80
handleErrorCode (nebula::cpp2::ErrorCode::E_INVALID_PARM);
79
81
onFinished ();
80
82
return ;
81
83
}
84
+
82
85
if (vidType != nebula::cpp2::PropertyType::INT64 &&
83
86
vidType != nebula::cpp2::PropertyType::FIXED_STRING) {
84
87
LOG (ERROR) << " Create Space Failed : vid_type is illegal: "
@@ -87,6 +90,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
87
90
onFinished ();
88
91
return ;
89
92
}
93
+
90
94
if (vidType == nebula::cpp2::PropertyType::INT64 && vidSize != 8 ) {
91
95
LOG (ERROR) << " Create Space Failed : vid_size should be 8 if vid type is integer: " << vidSize;
92
96
handleErrorCode (nebula::cpp2::ErrorCode::E_INVALID_PARM);
@@ -105,148 +109,139 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
105
109
106
110
auto spaceId = nebula::value (idRet);
107
111
std::vector<kvstore::KV> data;
108
- data.emplace_back (MetaKeyUtils::indexSpaceKey (spaceName),
109
- std::string (reinterpret_cast <const char *>(&spaceId), sizeof (spaceId)));
110
- data.emplace_back (MetaKeyUtils::spaceKey (spaceId), MetaKeyUtils::spaceVal (properties));
111
-
112
+ std::vector<::std::string> zones;
112
113
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
113
- if (!properties.get_zone_names ().empty ()) {
114
- auto zones = properties.get_zone_names ();
115
- for (auto & zone : zones) {
116
- auto zoneKey = MetaKeyUtils::zoneKey (zone);
117
- auto ret = doGet (zoneKey);
118
- if (!nebula::ok (ret)) {
119
- auto retCode = nebula::error (ret);
120
- if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
121
- code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
122
- }
123
- LOG (ERROR) << " Get Zone Name: " << zone << " failed." ;
124
- break ;
125
- }
126
- }
127
-
128
- if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
129
- LOG (ERROR) << " Create space failed" ;
114
+ if (properties.get_zone_names ().empty ()) {
115
+ const auto & zonePrefix = MetaKeyUtils::zonePrefix ();
116
+ auto zoneIterRet = doPrefix (zonePrefix);
117
+ if (!nebula::ok (zoneIterRet)) {
118
+ code = nebula::error (zoneIterRet);
119
+ LOG (ERROR) << " Get zones failed, error: " << apache::thrift::util::enumNameSafe (code);
130
120
handleErrorCode (code);
131
121
onFinished ();
132
122
return ;
133
123
}
134
124
135
- int32_t zoneNum = zones.size ();
136
- if (replicaFactor > zoneNum) {
137
- LOG (ERROR) << " Replication number should less than or equal to zone number." ;
138
- handleErrorCode (nebula::cpp2::ErrorCode::E_INVALID_PARM);
139
- onFinished ();
140
- return ;
125
+ auto zoneIter = nebula::value (zoneIterRet).get ();
126
+ while (zoneIter->valid ()) {
127
+ auto zoneName = MetaKeyUtils::parseZoneName (zoneIter->key ());
128
+ zones.emplace_back (std::move (zoneName));
129
+ zoneIter->next ();
141
130
}
142
131
143
- auto hostLoadingRet = getHostLoading ();
144
- if (!nebula::ok (hostLoadingRet)) {
145
- LOG (ERROR) << " Get host loading failed." ;
146
- auto retCode = nebula::error (hostLoadingRet);
147
- if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
148
- retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
132
+ properties.set_zone_names (zones);
133
+ } else {
134
+ zones = properties.get_zone_names ();
135
+ }
136
+
137
+ data.emplace_back (MetaKeyUtils::indexSpaceKey (spaceName),
138
+ std::string (reinterpret_cast <const char *>(&spaceId), sizeof (spaceId)));
139
+ data.emplace_back (MetaKeyUtils::spaceKey (spaceId), MetaKeyUtils::spaceVal (properties));
140
+ for (auto & zone : zones) {
141
+ auto zoneKey = MetaKeyUtils::zoneKey (zone);
142
+ auto ret = doGet (zoneKey);
143
+ if (!nebula::ok (ret)) {
144
+ auto retCode = nebula::error (ret);
145
+ if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
146
+ code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
149
147
}
150
- handleErrorCode (retCode);
151
- onFinished ();
152
- return ;
148
+ LOG (ERROR) << " Get Zone Name: " << zone << " failed." ;
149
+ break ;
153
150
}
151
+ }
154
152
155
- hostLoading_ = std::move (nebula::value (hostLoadingRet));
156
- std::unordered_map<std::string, Hosts> zoneHosts;
157
- for (auto & zone : zones) {
158
- auto zoneKey = MetaKeyUtils::zoneKey (zone);
159
- auto zoneValueRet = doGet (std::move (zoneKey));
160
- if (!nebula::ok (zoneValueRet)) {
161
- code = nebula::error (zoneValueRet);
162
- if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
163
- code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
164
- }
165
- LOG (ERROR) << " Get zone " << zone << " failed." ;
166
- break ;
167
- }
153
+ if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
154
+ LOG (ERROR) << " Create space failed" ;
155
+ handleErrorCode (code);
156
+ onFinished ();
157
+ return ;
158
+ }
168
159
169
- auto hosts = MetaKeyUtils::parseZoneHosts (std::move (nebula::value (zoneValueRet)));
170
- for (auto & host : hosts) {
171
- auto hostIter = hostLoading_.find (host);
172
- if (hostIter == hostLoading_.end ()) {
173
- hostLoading_[host] = 0 ;
174
- zoneLoading_[zone] += 0 ;
175
- } else {
176
- zoneLoading_[zone] += hostIter->second ;
177
- }
178
- }
179
- zoneHosts[zone] = std::move (hosts);
180
- }
160
+ int32_t zoneNum = zones.size ();
161
+ if (replicaFactor > zoneNum) {
162
+ LOG (ERROR) << " Replication number should less than or equal to zone number." ;
163
+ LOG (ERROR) << " Replication number: " << replicaFactor << " , Zones size: " << zones.size ();
164
+ handleErrorCode (nebula::cpp2::ErrorCode::E_INVALID_PARM);
165
+ onFinished ();
166
+ return ;
167
+ }
181
168
182
- if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
183
- LOG (ERROR) << " Create space failed" ;
184
- handleErrorCode (code);
185
- onFinished ();
186
- return ;
169
+ auto hostLoadingRet = getHostLoading ();
170
+ if (!nebula::ok (hostLoadingRet)) {
171
+ LOG (ERROR) << " Get host loading failed." ;
172
+ auto retCode = nebula::error (hostLoadingRet);
173
+ if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
174
+ retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
187
175
}
176
+ handleErrorCode (retCode);
177
+ onFinished ();
178
+ return ;
179
+ }
188
180
189
- for (auto partId = 1 ; partId <= partitionNum; partId++) {
190
- auto pickedZonesRet = pickLightLoadZones (replicaFactor);
191
- if (!pickedZonesRet.ok ()) {
192
- LOG (ERROR) << " Pick zone failed." ;
193
- code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
194
- break ;
195
- }
196
-
197
- auto pickedZones = std::move (pickedZonesRet).value ();
198
- auto partHostsRet = pickHostsWithZone (pickedZones, zoneHosts);
199
- if (!partHostsRet.ok ()) {
200
- LOG (ERROR) << " Pick hosts with zone failed." ;
201
- code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
202
- break ;
181
+ hostLoading_ = std::move (nebula::value (hostLoadingRet));
182
+ std::unordered_map<std::string, Hosts> zoneHosts;
183
+ for (auto & zone : zones) {
184
+ auto zoneKey = MetaKeyUtils::zoneKey (zone);
185
+ auto zoneValueRet = doGet (std::move (zoneKey));
186
+ if (!nebula::ok (zoneValueRet)) {
187
+ code = nebula::error (zoneValueRet);
188
+ if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
189
+ code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
203
190
}
191
+ LOG (ERROR) << " Get zone " << zone << " failed." ;
192
+ break ;
193
+ }
204
194
205
- auto partHosts = std::move (partHostsRet).value ();
206
- if (partHosts.empty ()) {
207
- LOG (ERROR) << " Pick hosts is empty." ;
208
- code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
209
- break ;
195
+ auto hosts = MetaKeyUtils::parseZoneHosts (std::move (nebula::value (zoneValueRet)));
196
+ for (auto & host : hosts) {
197
+ auto hostIter = hostLoading_.find (host);
198
+ if (hostIter == hostLoading_.end ()) {
199
+ hostLoading_[host] = 0 ;
200
+ zoneLoading_[zone] += 0 ;
201
+ } else {
202
+ zoneLoading_[zone] += hostIter->second ;
210
203
}
204
+ }
205
+ zoneHosts[zone] = std::move (hosts);
206
+ }
211
207
212
- std::stringstream ss;
213
- for (const auto & host : partHosts) {
214
- ss << host << " , " ;
215
- }
208
+ if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
209
+ LOG (ERROR) << " Create space failed" ;
210
+ handleErrorCode (code);
211
+ onFinished ();
212
+ return ;
213
+ }
216
214
217
- VLOG (3 ) << " Space " << spaceId << " part " << partId << " hosts " << ss.str ();
218
- data.emplace_back (MetaKeyUtils::partKey (spaceId, partId), MetaKeyUtils::partVal (partHosts));
219
- }
220
- } else {
221
- auto hostsRet = ActiveHostsMan::getActiveHosts (kvstore_);
222
- if (!nebula::ok (hostsRet)) {
223
- auto retCode = nebula::error (hostsRet);
224
- LOG (ERROR) << " Create Space Failed when get active host, error "
225
- << apache::thrift::util::enumNameSafe (retCode);
226
- handleErrorCode (retCode);
227
- onFinished ();
228
- return ;
215
+ for (auto partId = 1 ; partId <= partitionNum; partId++) {
216
+ auto pickedZonesRet = pickLightLoadZones (replicaFactor);
217
+ if (!pickedZonesRet.ok ()) {
218
+ LOG (ERROR) << " Pick zone failed." ;
219
+ code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
220
+ break ;
229
221
}
230
- auto hosts = std::move (nebula::value (hostsRet));
231
- if (hosts.empty ()) {
232
- LOG (ERROR) << " Create Space Failed : No Hosts!" ;
233
- handleErrorCode (nebula::cpp2::ErrorCode::E_NO_HOSTS);
234
- onFinished ();
235
- return ;
222
+
223
+ auto pickedZones = std::move (pickedZonesRet).value ();
224
+ auto partHostsRet = pickHostsWithZone (pickedZones, zoneHosts);
225
+ if (!partHostsRet.ok ()) {
226
+ LOG (ERROR) << " Pick hosts with zone failed." ;
227
+ code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
228
+ break ;
236
229
}
237
230
238
- if ((int32_t )hosts.size () < replicaFactor) {
239
- LOG (ERROR) << " Not enough hosts existed for replica " << replicaFactor << " , hosts num "
240
- << hosts.size ();
241
- handleErrorCode (nebula::cpp2::ErrorCode::E_INVALID_PARM);
242
- onFinished ();
243
- return ;
231
+ auto partHosts = std::move (partHostsRet).value ();
232
+ if (partHosts.empty ()) {
233
+ LOG (ERROR) << " Pick hosts is empty." ;
234
+ code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
235
+ break ;
244
236
}
245
237
246
- for ( auto partId = 1 ; partId <= partitionNum; partId++) {
247
- auto partHosts = pickHosts (partId, hosts, replicaFactor);
248
- data. emplace_back ( MetaKeyUtils::partKey (spaceId, partId), MetaKeyUtils::partVal (partHosts)) ;
238
+ std::stringstream ss;
239
+ for ( const auto & host : partHosts) {
240
+ ss << host << " , " ;
249
241
}
242
+
243
+ VLOG (3 ) << " Space " << spaceId << " part " << partId << " hosts " << ss.str ();
244
+ data.emplace_back (MetaKeyUtils::partKey (spaceId, partId), MetaKeyUtils::partVal (partHosts));
250
245
}
251
246
252
247
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
@@ -261,17 +256,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
261
256
LOG (INFO) << " Create space " << spaceName;
262
257
}
263
258
264
- Hosts CreateSpaceProcessor::pickHosts (PartitionID partId,
265
- const Hosts& hosts,
266
- int32_t replicaFactor) {
267
- auto startIndex = partId;
268
- Hosts pickedHosts;
269
- for (int32_t i = 0 ; i < replicaFactor; i++) {
270
- pickedHosts.emplace_back (toThriftHost (hosts[startIndex++ % hosts.size ()]));
271
- }
272
- return pickedHosts;
273
- }
274
-
275
259
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t >>
276
260
CreateSpaceProcessor::getHostLoading () {
277
261
const auto & prefix = MetaKeyUtils::partPrefix ();
0 commit comments