forked from scylladb/scylla-rust-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschema_agreement.rs
276 lines (246 loc) · 8.42 KB
/
schema_agreement.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
use std::sync::Arc;
use std::time::Duration;
use crate::ccm::cluster::{Cluster, ClusterOptions};
use crate::ccm::{run_ccm_test, CLUSTER_VERSION};
use crate::common::utils::{setup_tracing, unique_keyspace_name};
use scylla::client::execution_profile::ExecutionProfile;
use scylla::client::session::Session;
use scylla::cluster::{ClusterState, Node, NodeRef};
use scylla::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
use scylla::query::Query;
use tokio::sync::Mutex;
use tracing::info;
use uuid::Uuid;
/// Builds the CCM cluster options shared by all schema agreement tests:
/// a single-DC cluster with three nodes at the configured version.
fn cluster_3_nodes() -> ClusterOptions {
    ClusterOptions {
        nodes: vec![3],
        version: CLUSTER_VERSION.clone(),
        name: String::from("schema_agreement_test"),
        ..ClusterOptions::default()
    }
}
/// A load balancing policy that always routes requests to exactly one node
/// (optionally pinned to a specific shard), with no fallback targets.
#[derive(Debug)]
struct SingleTargetLBP {
    // The node (and optional shard) every plan resolves to.
    target: (Arc<Node>, Option<u32>),
}

impl LoadBalancingPolicy for SingleTargetLBP {
    fn pick<'a>(
        &'a self,
        _query: &'a RoutingInfo,
        _cluster: &'a ClusterState,
    ) -> Option<(NodeRef<'a>, Option<u32>)> {
        let (node, shard) = &self.target;
        Some((node, *shard))
    }

    fn fallback<'a>(
        &'a self,
        _query: &'a RoutingInfo,
        _cluster: &'a ClusterState,
    ) -> FallbackPlan<'a> {
        // Deliberately empty: if the single target is unusable, the plan
        // is exhausted rather than spilling over to other nodes.
        Box::new(std::iter::empty())
    }

    fn name(&self) -> String {
        String::from("SingleTargetLBP")
    }
}
/// Polls `Session::check_schema_agreement` until agreement is reached,
/// a check fails, the retry budget is exhausted, or the overall timeout
/// elapses — whichever comes first.
///
/// Returns the agreed schema version on success. Between unsuccessful
/// polls it sleeps for a fixed 500 ms interval.
async fn wait_for_schema_agreement(
    session: &Session,
    timeout: Duration,
    retries: u32,
) -> Result<Option<Uuid>, anyhow::Error> {
    const RETRY_INTERVAL: Duration = Duration::from_millis(500);

    let polling = async {
        let mut attempts: u32 = 0;
        loop {
            match session.check_schema_agreement().await {
                Ok(Some(agreement)) => break Ok(Some(agreement)),
                Err(e) => break Err(anyhow::anyhow!("Failed to check schema agreement: {}", e)),
                Ok(None) => {
                    attempts += 1;
                    if attempts > retries {
                        break Err(anyhow::anyhow!(
                            "Schema agreement not reached after {} retries",
                            retries
                        ));
                    }
                    info!(
                        "Schema agreement not yet reached, retrying ({}/{})",
                        attempts, retries
                    );
                    tokio::time::sleep(RETRY_INTERVAL).await;
                }
            }
        }
    };

    // The outer timeout bounds the whole retry loop; on expiry the
    // `Elapsed` error is mapped into a descriptive anyhow error.
    tokio::time::timeout(timeout, polling)
        .await
        .map_err(|_| anyhow::anyhow!("Schema agreement timed out after {:?}", timeout))?
}
/// Creates a keyspace with `NetworkTopologyStrategy` at the requested
/// replication factor, then switches the session to it.
///
/// The keyspace name is interpolated directly into the CQL text; callers
/// pass generated, trusted names (see `unique_keyspace_name`).
async fn setup_keyspace(
    session: &Session,
    keyspace: &str,
    replication_factor: u32,
) -> Result<(), anyhow::Error> {
    let create_ks = format!(
        "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : {}}}",
        keyspace, replication_factor
    );
    session.query_unpaged(create_ks, &[]).await?;
    // `true` here is the driver's case-sensitivity flag for the keyspace name.
    session.use_keyspace(keyspace, true).await?;
    Ok(())
}
// Entry point for the CCM schema agreement suite. Only compiled/run when
// the `ccm_tests` cfg is set; otherwise the test is marked ignored.
#[tokio::test]
#[cfg_attr(not(ccm_tests), ignore)]
async fn test_schema_agreement() {
    setup_tracing();
    // Each case runs via `run_ccm_test` against a 3-node cluster described
    // by `cluster_3_nodes`; the cases run sequentially, one after another.
    run_ccm_test(cluster_3_nodes, test_schema_agreement_all_nodes).await;
    run_ccm_test(cluster_3_nodes, test_schema_agreement_with_stopped_node).await;
    run_ccm_test(cluster_3_nodes, test_schema_agreement_with_paused_node).await;
    // TODO - multidc cases
}
/// Happy path: with all three nodes up, a DDL statement should reach
/// schema agreement within the polling window.
async fn test_schema_agreement_all_nodes(cluster: Arc<Mutex<Cluster>>) {
    let cluster = cluster.lock().await;
    let session = cluster
        .make_session_builder()
        .await
        .build()
        .await
        .expect("Failed to create session");

    let keyspace = unique_keyspace_name();
    setup_keyspace(&session, &keyspace, 3)
        .await
        .expect("Failed to setup keyspace");

    info!("Creating table in test_schema_agreement_all_nodes");
    session
        .query_unpaged("CREATE TABLE test_table (k int primary key, v int)", &[])
        .await
        .expect("Failed to create table");

    // Up to 20 polls within a 10 s window.
    let schema_version = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
        .await
        .expect("Schema agreement failed");
    assert!(
        schema_version.is_some(),
        "Schema agreement should be reached"
    );
    info!("Schema agreement achieved with all nodes");
}
/// Stops one node, verifies DDL still reaches agreement among the
/// remaining nodes, then restarts the node and verifies agreement again.
async fn test_schema_agreement_with_stopped_node(cluster: Arc<Mutex<Cluster>>) {
    let cluster = cluster.lock().await;
    let session = cluster
        .make_session_builder()
        .await
        .build()
        .await
        .expect("Failed to create session");

    let keyspace = unique_keyspace_name();
    setup_keyspace(&session, &keyspace, 3)
        .await
        .expect("Failed to setup keyspace");

    let stopped_node = cluster
        .nodes()
        .get_by_id(2)
        .await
        .expect("Failed to get node 2");

    info!("Stopping node 2");
    stopped_node
        .write()
        .await
        .stop(None)
        .await
        .expect("Failed to stop node");

    info!("Creating table with one node stopped");
    session
        .query_unpaged("CREATE TABLE test_table (k int primary key, v int)", &[])
        .await
        .expect("Failed to create table");

    // Agreement must be reachable with only the live nodes participating.
    let schema_version = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
        .await
        .expect("Schema agreement failed with stopped node");
    assert!(
        schema_version.is_some(),
        "Schema agreement should be reached with remaining nodes"
    );

    info!("Restarting node 2");
    stopped_node
        .write()
        .await
        .start(None)
        .await
        .expect("Failed to restart node");

    // After the restart the rejoined node must converge as well.
    let schema_version = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
        .await
        .expect("Schema agreement failed after restart");
    assert!(
        schema_version.is_some(),
        "Schema agreement should be reached after node restart"
    );
    info!("Schema agreement achieved after node restart");
}
/// Pauses one node, pins the DDL statement to a still-responsive node via
/// a single-target load balancing policy, verifies agreement is reached
/// without the paused node, then resumes it and verifies agreement again.
async fn test_schema_agreement_with_paused_node(cluster: Arc<Mutex<Cluster>>) {
    let cluster = cluster.lock().await;
    let session = cluster
        .make_session_builder()
        .await
        .build()
        .await
        .expect("Failed to create session");

    let keyspace = unique_keyspace_name();
    setup_keyspace(&session, &keyspace, 3)
        .await
        .expect("Failed to setup keyspace");

    let node_id = 2;
    let ccm_node = cluster
        .nodes()
        .get_by_id(node_id)
        .await
        .expect("Failed to get node 2");
    // Remember the paused node's address so it can be excluded below.
    let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone();

    info!("Pausing node 2");
    ccm_node
        .write()
        .await
        .pause()
        .await
        .expect("Failed to pause node");

    // Pick any driver-side node other than the paused one and pin all
    // requests for the DDL statement to it.
    let cluster_state = session.get_cluster_state();
    let live_node = cluster_state
        .get_nodes_info()
        .iter()
        .find(|node| node.address.ip() != ccm_node_addr)
        .expect("Could not find unpaused Scylla node");
    let lbp = SingleTargetLBP {
        target: (live_node.clone(), Some(0)),
    };
    let profile = ExecutionProfile::builder()
        .load_balancing_policy(Arc::new(lbp))
        .build();
    let mut create_table = Query::new("CREATE TABLE test_table (k int primary key, v int)");
    create_table.set_execution_profile_handle(Some(profile.into_handle()));

    info!("Creating table with one node paused");
    session
        .query_unpaged(create_table, &[])
        .await
        .expect("Failed to create table");

    let schema_version = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
        .await
        .expect("Schema agreement failed with paused node");
    assert!(
        schema_version.is_some(),
        "Schema agreement should be reached with remaining nodes"
    );

    info!("Resuming node 2");
    ccm_node
        .write()
        .await
        .resume()
        .await
        .expect("Failed to resume node");

    // The resumed node must catch up and converge on the schema version.
    let schema_version = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
        .await
        .expect("Schema agreement failed after resume");
    assert!(
        schema_version.is_some(),
        "Schema agreement should be reached after node resume"
    );
    info!("Schema agreement achieved after node resume");
}