@@ -9,6 +9,8 @@ namespace nebula {
9
9
namespace meta {
10
10
11
11
void DropHostsProcessor::process (const cpp2::DropHostsReq& req) {
12
+ folly::SharedMutex::ReadHolder zHolder (LockUtils::zoneLock ());
13
+ folly::SharedMutex::ReadHolder mHolder (LockUtils::machineLock ());
12
14
auto hosts = req.get_hosts ();
13
15
if (std::unique (hosts.begin (), hosts.end ()) != hosts.end ()) {
14
16
LOG (ERROR) << " Hosts have duplicated element" ;
@@ -25,6 +27,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
25
27
}
26
28
27
29
std::vector<std::string> data;
30
+ std::vector<kvstore::KV> rewriteData;
28
31
// Check that partition is not held on the host
29
32
const auto & spacePrefix = MetaKeyUtils::spacePrefix ();
30
33
auto spaceIterRet = doPrefix (spacePrefix);
@@ -77,21 +80,49 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
77
80
78
81
auto iter = nebula::value (iterRet).get ();
79
82
while (iter->valid ()) {
83
+ auto zoneKey = iter->key ();
80
84
auto hs = MetaKeyUtils::parseZoneHosts (iter->val ());
85
+ // Delete all hosts in the zone
81
86
if (std::all_of (hs.begin (), hs.end (), [&hosts](auto & h) {
82
87
return std::find (hosts.begin (), hosts.end (), h) != hosts.end ();
83
88
})) {
84
- auto zoneName = MetaKeyUtils::parseZoneName (iter-> key () );
89
+ auto zoneName = MetaKeyUtils::parseZoneName (zoneKey );
85
90
LOG (INFO) << " Drop zone " << zoneName;
86
- code = checkRelatedSpace (zoneName);
87
- if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
91
+ auto result = checkRelatedSpaceAndCollect (zoneName);
92
+ if (!nebula::ok (result)) {
93
+ LOG (ERROR) << " Check related space failed" ;
94
+ code = nebula::error (result);
88
95
break ;
89
96
}
90
- data.emplace_back (iter->key ());
97
+
98
+ const auto & rewrites = nebula::value (result);
99
+ rewriteData.insert (rewriteData.end (), rewrites.begin (), rewrites.end ());
100
+ data.emplace_back (zoneKey);
101
+ } else {
102
+ // Delete some hosts in the zone
103
+ for (auto & h : hosts) {
104
+ auto it = std::find (hs.begin (), hs.end (), h);
105
+ if (it != hs.end ()) {
106
+ hs.erase (it);
107
+ }
108
+ }
109
+
110
+ auto zoneValue = MetaKeyUtils::zoneVal (hs);
111
+ LOG (INFO) << " Zone Value: " << zoneValue;
112
+ rewriteData.emplace_back (std::move (zoneKey), std::move (zoneValue));
113
+ }
114
+ if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
115
+ break ;
91
116
}
92
117
iter->next ();
93
118
}
94
119
120
+ if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
121
+ handleErrorCode (code);
122
+ onFinished ();
123
+ return ;
124
+ }
125
+
95
126
// Detach the machine from cluster
96
127
for (auto & host : hosts) {
97
128
auto machineKey = MetaKeyUtils::machineKey (host.host , host.port );
@@ -111,10 +142,21 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
111
142
}
112
143
113
144
resp_.set_code (nebula::cpp2::ErrorCode::SUCCEEDED);
114
- doMultiRemove (std::move (data));
145
+ folly::Baton<true , std::atomic> baton;
146
+ kvstore_->asyncMultiRemove (kDefaultSpaceId ,
147
+ kDefaultPartId ,
148
+ std::move (data),
149
+ [this , &baton](nebula::cpp2::ErrorCode result) {
150
+ this ->handleErrorCode (result);
151
+ baton.post ();
152
+ });
153
+ baton.wait ();
154
+ doSyncPutAndUpdate (std::move (rewriteData));
115
155
}
116
156
117
- nebula::cpp2::ErrorCode DropHostsProcessor::checkRelatedSpace (const std::string& zoneName) {
157
+ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>>
158
+ DropHostsProcessor::checkRelatedSpaceAndCollect (const std::string& zoneName) {
159
+ std::vector<kvstore::KV> data;
118
160
const auto & prefix = MetaKeyUtils::spacePrefix ();
119
161
auto ret = doPrefix (prefix);
120
162
if (!nebula::ok (ret)) {
@@ -137,19 +179,15 @@ nebula::cpp2::ErrorCode DropHostsProcessor::checkRelatedSpace(const std::string&
137
179
} else {
138
180
zones.erase (it);
139
181
properties.set_zone_names (zones);
140
- rewriteSpaceProperties (iter->key ().data (), properties);
182
+
183
+ auto spaceKey = iter->key ().data ();
184
+ auto spaceVal = MetaKeyUtils::spaceVal (properties);
185
+ data.emplace_back (std::move (spaceKey), std::move (spaceVal));
141
186
}
142
187
}
143
188
iter->next ();
144
189
}
145
- return nebula::cpp2::ErrorCode::SUCCEEDED;
146
- }
147
-
148
- void DropHostsProcessor::rewriteSpaceProperties (const std::string& spaceKey,
149
- const meta::cpp2::SpaceDesc& properties) {
150
- auto spaceVal = MetaKeyUtils::spaceVal (properties);
151
- std::vector<kvstore::KV> data = {{spaceKey, spaceVal}};
152
- doSyncPutAndUpdate (std::move (data));
190
+ return data;
153
191
}
154
192
155
193
} // namespace meta
0 commit comments