|
1 | 1 | package com.redis.lettucemod;
|
2 | 2 |
|
3 |
| -import static org.junit.jupiter.api.Assertions.assertEquals; |
4 |
| -import static org.junit.jupiter.api.Assertions.assertFalse; |
5 |
| -import static org.junit.jupiter.api.Assertions.assertTrue; |
6 |
| - |
7 |
| -import java.util.List; |
8 |
| -import java.util.Map; |
9 |
| - |
10 |
| -import org.awaitility.Awaitility; |
11 |
| -import org.junit.jupiter.api.Assertions; |
12 |
| -import org.junit.jupiter.api.Disabled; |
13 | 3 | import org.junit.jupiter.api.Test;
|
14 | 4 | import org.junit.jupiter.api.condition.EnabledOnOs;
|
15 | 5 | import org.junit.jupiter.api.condition.OS;
|
16 |
| -import org.slf4j.Logger; |
17 |
| -import org.slf4j.LoggerFactory; |
18 | 6 | import org.springframework.util.unit.DataSize;
|
19 | 7 |
|
20 | 8 | import com.redis.enterprise.Database;
|
21 | 9 | import com.redis.enterprise.RedisModule;
|
22 |
| -import com.redis.lettucemod.api.sync.RedisGearsCommands; |
23 |
| -import com.redis.lettucemod.api.sync.RedisModulesCommands; |
24 | 10 | import com.redis.lettucemod.cluster.RedisModulesClusterClient;
|
25 | 11 | import com.redis.lettucemod.cluster.api.StatefulRedisModulesClusterConnection;
|
26 |
| -import com.redis.lettucemod.gears.Execution; |
27 |
| -import com.redis.lettucemod.gears.ExecutionDetails; |
28 |
| -import com.redis.lettucemod.gears.Registration; |
29 |
| -import com.redis.lettucemod.output.ExecutionResults; |
30 |
| -import com.redis.lettucemod.util.RedisModulesUtils; |
| 12 | +import com.redis.testcontainers.AbstractRedisContainer; |
31 | 13 | import com.redis.testcontainers.RedisEnterpriseContainer;
|
32 |
| -import com.redis.testcontainers.RedisServer; |
33 | 14 |
|
34 |
| -import io.lettuce.core.RedisCommandExecutionException; |
35 | 15 | import io.lettuce.core.RedisURI;
|
36 | 16 | import io.lettuce.core.codec.StringCodec;
|
37 | 17 | import io.lettuce.core.resource.DefaultClientResources;
|
38 | 18 |
|
39 | 19 | @EnabledOnOs(OS.LINUX)
|
40 | 20 | class EnterpriseTests extends ModulesTests {
|
41 | 21 |
|
42 |
| - private static final Logger log = LoggerFactory.getLogger(EnterpriseTests.class); |
43 |
| - |
44 |
| - private final RedisEnterpriseContainer container = new RedisEnterpriseContainer( |
45 |
| - RedisEnterpriseContainer.DEFAULT_IMAGE_NAME.withTag("latest")) |
46 |
| - .withDatabase(Database.name("ModulesTests").memory(DataSize.ofMegabytes(110)).ossCluster(true) |
47 |
| - .modules(RedisModule.SEARCH, RedisModule.JSON, RedisModule.TIMESERIES).build()); |
48 |
| - |
49 |
| - @Override |
50 |
| - protected RedisServer getRedisServer() { |
51 |
| - return container; |
52 |
| - } |
53 |
| - |
54 |
| - @Test |
55 |
| - void client() { |
56 |
| - try (RedisModulesClusterClient client = RedisModulesClusterClient.create(container.getRedisURI()); |
57 |
| - StatefulRedisModulesClusterConnection<String, String> connection = client.connect();) { |
58 |
| - assertPing(connection); |
59 |
| - } |
60 |
| - DefaultClientResources resources = DefaultClientResources.create(); |
61 |
| - try (RedisModulesClusterClient client = RedisModulesClusterClient.create(resources, |
62 |
| - RedisURI.create(container.getRedisURI())); |
63 |
| - StatefulRedisModulesClusterConnection<String, String> connection = client.connect();) { |
64 |
| - assertPing(connection); |
65 |
| - } |
66 |
| - resources.shutdown(); |
67 |
| - try (RedisModulesClusterClient client = RedisModulesClusterClient.create(container.getRedisURI()); |
68 |
| - StatefulRedisModulesClusterConnection<String, String> connection = client.connect(StringCodec.UTF8);) { |
69 |
| - assertPing(connection); |
70 |
| - } |
71 |
| - } |
72 |
| - |
73 |
| - @Test |
74 |
| - @Disabled("Gears") |
75 |
| - void rgPyExecute() { |
76 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
77 |
| - sync.set("foo", "bar"); |
78 |
| - ExecutionResults results = pyExecute(sync, "sleep.py"); |
79 |
| - assertEquals("1", results.getResults().get(0)); |
80 |
| - } |
81 |
| - |
82 |
| - @Test |
83 |
| - @Disabled("Gears") |
84 |
| - void rgPyExecuteUnblocking() { |
85 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
86 |
| - sync.set("foo", "bar"); |
87 |
| - String executionId = pyExecuteUnblocking(sync, "sleep.py"); |
88 |
| - String[] array = executionId.split("-"); |
89 |
| - assertEquals(2, array.length); |
90 |
| - assertEquals(40, array[0].length()); |
91 |
| - Assertions.assertTrue(Integer.parseInt(array[1]) >= 0); |
92 |
| - } |
93 |
| - |
94 |
| - private ExecutionResults pyExecute(RedisGearsCommands<String, String> sync, String resourceName) { |
95 |
| - return sync.rgPyexecute(load(resourceName)); |
96 |
| - } |
97 |
| - |
98 |
| - private String pyExecuteUnblocking(RedisGearsCommands<String, String> sync, String resourceName) { |
99 |
| - return sync.rgPyexecuteUnblocking(load(resourceName)); |
100 |
| - } |
101 |
| - |
102 |
| - private String load(String resourceName) { |
103 |
| - return RedisModulesUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName)); |
104 |
| - } |
105 |
| - |
106 |
| - private void rgClear() throws InterruptedException { |
107 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
108 |
| - // Unregister all registrations |
109 |
| - for (Registration registration : sync.rgDumpregistrations()) { |
110 |
| - log.info("Unregistering {}", registration.getId()); |
111 |
| - sync.rgUnregister(registration.getId()); |
112 |
| - } |
113 |
| - // Drop all executions |
114 |
| - for (Execution execution : sync.rgDumpexecutions()) { |
115 |
| - if (execution.getStatus().matches("running|created")) { |
116 |
| - log.info("Aborting execution {} with status {}", execution.getId(), execution.getStatus()); |
117 |
| - sync.rgAbortexecution(execution.getId()); |
118 |
| - } |
119 |
| - try { |
120 |
| - sync.rgDropexecution(execution.getId()); |
121 |
| - } catch (RedisCommandExecutionException e) { |
122 |
| - log.info("Execution status: {}", execution.getStatus()); |
123 |
| - throw e; |
124 |
| - } |
125 |
| - } |
126 |
| - } |
127 |
| - |
128 |
| - @Test |
129 |
| - @Disabled("This test is not passing at the moment") |
130 |
| - void rgDumpRegistrations() throws InterruptedException { |
131 |
| - rgClear(); |
132 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
133 |
| - // Single registration |
134 |
| - assertEquals(0, sync.rgDumpregistrations().size()); |
135 |
| - ExecutionResults results = pyExecute(sync, "streamreader.py"); |
136 |
| - Assertions.assertFalse(results.isError()); |
137 |
| - Awaitility.await().until(() -> sync.rgDumpregistrations().size() == 1); |
138 |
| - Registration registration = sync.rgDumpregistrations().get(0); |
139 |
| - assertEquals("StreamReader", registration.getReader()); |
140 |
| - assertEquals("MyStreamReader", registration.getDescription()); |
141 |
| - assertEquals("async", registration.getData().getMode()); |
142 |
| - Map<String, Object> args = registration.getData().getArgs(); |
143 |
| - assertTrue(args.size() >= 3); |
144 |
| - assertEquals(1L, args.get("batchSize")); |
145 |
| - assertEquals("mystream", args.get("stream")); |
146 |
| - assertEquals("OK", registration.getData().getStatus()); |
147 |
| - |
148 |
| - // Multiple registrations |
149 |
| - sync.rgDumpregistrations().forEach(r -> sync.rgUnregister(r.getId())); |
150 |
| - String function = "GB('KeysReader').register('*', keyTypes=['hash'])"; |
151 |
| - Assertions.assertTrue(sync.rgPyexecute(function).isOk()); |
152 |
| - Assertions.assertEquals(1, sync.rgDumpregistrations().size()); |
153 |
| - } |
154 |
| - |
155 |
| - @Test |
156 |
| - @Disabled("Gears") |
157 |
| - void rgPyExecuteResults() { |
158 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
159 |
| - sync.set("foo", "bar"); |
160 |
| - ExecutionResults results = sync.rgPyexecute("GB().foreach(lambda x: log('test')).register()"); |
161 |
| - Assertions.assertTrue(results.isOk()); |
162 |
| - Assertions.assertFalse(results.isError()); |
163 |
| - } |
164 |
| - |
165 |
| - private void rgExecutions() { |
166 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
167 |
| - sync.set("foo", "bar"); |
168 |
| - pyExecuteUnblocking(sync, "sleep.py"); |
169 |
| - } |
170 |
| - |
171 |
| - @Test |
172 |
| - @Disabled("Gears") |
173 |
| - void rgDumpExecutions() throws InterruptedException { |
174 |
| - rgClear(); |
175 |
| - rgExecutions(); |
176 |
| - assertFalse(connection.sync().rgDumpexecutions().isEmpty()); |
177 |
| - } |
178 |
| - |
179 |
| - @Test |
180 |
| - @Disabled("Flaky test") |
181 |
| - void rgDropExecution() throws InterruptedException { |
182 |
| - rgClear(); |
183 |
| - rgExecutions(); |
184 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
185 |
| - List<Execution> executions = sync.rgDumpexecutions(); |
186 |
| - executions.forEach(e -> sync.rgAbortexecution(e.getId())); |
187 |
| - executions.forEach(e -> sync.rgDropexecution(e.getId())); |
188 |
| - assertEquals(0, sync.rgDumpexecutions().size()); |
189 |
| - } |
190 |
| - |
191 |
| - @Test |
192 |
| - @Disabled("Gears") |
193 |
| - void rgAbortExecution() throws InterruptedException { |
194 |
| - rgClear(); |
195 |
| - rgExecutions(); |
196 |
| - RedisModulesCommands<String, String> sync = connection.sync(); |
197 |
| - for (Execution execution : sync.rgDumpexecutions()) { |
198 |
| - sync.rgAbortexecution(execution.getId()); |
199 |
| - ExecutionDetails details = sync.rgGetexecution(execution.getId()); |
200 |
| - Assertions.assertTrue(details.getPlan().getStatus().matches("done|aborted")); |
201 |
| - } |
202 |
| - } |
| 22 | + private final RedisEnterpriseContainer container = new RedisEnterpriseContainer( |
| 23 | + RedisEnterpriseContainer.DEFAULT_IMAGE_NAME.withTag("latest")) |
| 24 | + .withDatabase(Database.name("ModulesTests").memory(DataSize.ofMegabytes(110)).ossCluster(true) |
| 25 | + .modules(RedisModule.SEARCH, RedisModule.JSON, RedisModule.TIMESERIES).build()); |
| 26 | + |
| 27 | + @Override |
| 28 | + protected AbstractRedisContainer<?> getRedisContainer() { |
| 29 | + return container; |
| 30 | + } |
| 31 | + |
| 32 | + @Test |
| 33 | + void client() { |
| 34 | + try (RedisModulesClusterClient client = RedisModulesClusterClient.create(container.getRedisURI()); |
| 35 | + StatefulRedisModulesClusterConnection<String, String> connection = client.connect();) { |
| 36 | + assertPing(connection); |
| 37 | + } |
| 38 | + DefaultClientResources resources = DefaultClientResources.create(); |
| 39 | + try (RedisModulesClusterClient client = RedisModulesClusterClient.create(resources, |
| 40 | + RedisURI.create(container.getRedisURI())); |
| 41 | + StatefulRedisModulesClusterConnection<String, String> connection = client.connect();) { |
| 42 | + assertPing(connection); |
| 43 | + } |
| 44 | + resources.shutdown(); |
| 45 | + try (RedisModulesClusterClient client = RedisModulesClusterClient.create(container.getRedisURI()); |
| 46 | + StatefulRedisModulesClusterConnection<String, String> connection = client.connect(StringCodec.UTF8);) { |
| 47 | + assertPing(connection); |
| 48 | + } |
| 49 | + } |
203 | 50 |
|
204 | 51 | }
|
0 commit comments