Skip to content

Commit 94edf37

Browse files
himshikhaBukhtawar
authored andcommitted
Add cluster state checksum in manifest (opensearch-project#15218)
* Add cluster state checksum in manifest for remote state and routing table publication Signed-off-by: Himshikha Gupta <[email protected]> Co-authored-by: Bukhtawar Khan <[email protected]>
1 parent a21d6f9 commit 94edf37

38 files changed

+2085
-61
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4646
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
4747
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
4848
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
49+
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
4950

5051
### Dependencies
5152
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamOutput.java

+80
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,18 @@
3333
package org.opensearch.core.common.io.stream;
3434

3535
import org.apache.lucene.store.BufferedChecksum;
36+
import org.opensearch.common.Nullable;
3637
import org.opensearch.common.annotation.PublicApi;
3738

3839
import java.io.IOException;
40+
import java.util.ArrayList;
41+
import java.util.Arrays;
42+
import java.util.Collection;
43+
import java.util.Collections;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.TreeMap;
47+
import java.util.stream.Collectors;
3948
import java.util.zip.CRC32;
4049
import java.util.zip.Checksum;
4150

@@ -90,4 +99,75 @@ public void reset() throws IOException {
9099
public void resetDigest() {
91100
digest.reset();
92101
}
102+
103+
@Override
104+
public void writeMap(@Nullable Map<String, Object> map) throws IOException {
105+
Map<String, Object> newMap = new TreeMap<>(map);
106+
writeGenericValue(newMap);
107+
}
108+
109+
@Override
110+
public <K, V> void writeMap(Map<K, V> map, final Writeable.Writer<K> keyWriter, final Writeable.Writer<V> valueWriter)
111+
throws IOException {
112+
writeVInt(map.size());
113+
map.keySet().stream().sorted().forEachOrdered(key -> {
114+
try {
115+
keyWriter.write(this, key);
116+
valueWriter.write(this, map.get(key));
117+
} catch (IOException e) {
118+
throw new RuntimeException("Failed to write map values.", e);
119+
}
120+
});
121+
}
122+
123+
public <K, V> void writeMapValues(Map<K, V> map, final Writeable.Writer<V> valueWriter) throws IOException {
124+
writeVInt(map.size());
125+
map.keySet().stream().sorted().forEachOrdered(key -> {
126+
try {
127+
valueWriter.write(this, map.get(key));
128+
} catch (IOException e) {
129+
throw new RuntimeException("Failed to write map values.", e);
130+
}
131+
});
132+
}
133+
134+
@Override
135+
public void writeStringArray(String[] array) throws IOException {
136+
String[] copyArray = Arrays.copyOf(array, array.length);
137+
Arrays.sort(copyArray);
138+
super.writeStringArray(copyArray);
139+
}
140+
141+
@Override
142+
public void writeVLongArray(long[] values) throws IOException {
143+
long[] copyValues = Arrays.copyOf(values, values.length);
144+
Arrays.sort(copyValues);
145+
super.writeVLongArray(copyValues);
146+
}
147+
148+
@Override
149+
public void writeCollection(final Collection<? extends Writeable> collection) throws IOException {
150+
List<? extends Writeable> sortedList = collection.stream().sorted().collect(Collectors.toList());
151+
super.writeCollection(sortedList, (o, v) -> v.writeTo(o));
152+
}
153+
154+
@Override
155+
public void writeStringCollection(final Collection<String> collection) throws IOException {
156+
List<String> listCollection = new ArrayList<>(collection);
157+
Collections.sort(listCollection);
158+
writeCollection(listCollection, StreamOutput::writeString);
159+
}
160+
161+
@Override
162+
public void writeOptionalStringCollection(final Collection<String> collection) throws IOException {
163+
if (collection != null) {
164+
List<String> listCollection = new ArrayList<>(collection);
165+
Collections.sort(listCollection);
166+
writeBoolean(true);
167+
writeCollection(listCollection, StreamOutput::writeString);
168+
} else {
169+
writeBoolean(false);
170+
}
171+
}
172+
93173
}

libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ public final <K, V> void writeMapOfLists(final Map<K, List<V>> map, final Writer
633633
* @param keyWriter The key writer
634634
* @param valueWriter The value writer
635635
*/
636-
public final <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
636+
public <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
637637
writeVInt(map.size());
638638
for (final Map.Entry<K, V> entry : map.entrySet()) {
639639
keyWriter.write(this, entry.getKey());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.common.io.stream;
10+
11+
import java.io.IOException;
12+
13+
/**
14+
* Provides a method for serialization which will give ordered stream, creating same byte array on every invocation.
15+
* This should be invoked with a stream that provides ordered serialization.
16+
*/
17+
public interface VerifiableWriteable extends Writeable {
18+
19+
void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException;
20+
}

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
6767
)
6868
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
6969
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
70+
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
7071
.build();
7172
}
7273

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
9090
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
9191
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
9292
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
93+
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
9394
.build();
9495
}
9596

server/src/main/java/org/opensearch/cluster/AbstractDiffable.java

+5
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ private static class CompleteDiff<T extends Diffable<T>> implements Diff<T> {
8383
this.part = part;
8484
}
8585

86+
@Override
87+
public String toString() {
88+
return "CompleteDiff{" + "part=" + part + '}';
89+
}
90+
8691
/**
8792
* Creates simple diff without changes
8893
*/

server/src/main/java/org/opensearch/cluster/ClusterState.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ default boolean isPrivate() {
156156

157157
}
158158

159-
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
159+
public static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
160160

161161
public static final String UNKNOWN_UUID = "_na_";
162162

@@ -839,6 +839,34 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
839839
minimumClusterManagerNodesOnPublishingClusterManager = after.minimumClusterManagerNodesOnPublishingClusterManager;
840840
}
841841

842+
@Override
843+
public String toString() {
844+
return new StringBuilder().append("ClusterStateDiff{toVersion=")
845+
.append(toVersion)
846+
.append(", fromUuid='")
847+
.append(fromUuid)
848+
.append('\'')
849+
.append(", toUuid='")
850+
.append(toUuid)
851+
.append('\'')
852+
.append(", clusterName=")
853+
.append(clusterName)
854+
.append(", routingTable=")
855+
.append(routingTable)
856+
.append(", nodes=")
857+
.append(nodes)
858+
.append(", metadata=")
859+
.append(metadata)
860+
.append(", blocks=")
861+
.append(blocks)
862+
.append(", customs=")
863+
.append(customs)
864+
.append(", minimumClusterManagerNodesOnPublishingClusterManager=")
865+
.append(minimumClusterManagerNodesOnPublishingClusterManager)
866+
.append("}")
867+
.toString();
868+
}
869+
842870
ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
843871
clusterName = new ClusterName(in);
844872
fromUuid = in.readString();

server/src/main/java/org/opensearch/cluster/DiffableUtils.java

+12
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,18 @@ public Map<K, T> getUpserts() {
271271
return upserts;
272272
}
273273

274+
@Override
275+
public String toString() {
276+
return new StringBuilder().append("MapDiff{deletes=")
277+
.append(deletes)
278+
.append(", diffs=")
279+
.append(diffs)
280+
.append(", upserts=")
281+
.append(upserts)
282+
.append("}")
283+
.toString();
284+
}
285+
274286
@Override
275287
public void writeTo(StreamOutput out) throws IOException {
276288
out.writeCollection(deletes, (o, v) -> keySerializer.writeKey(v, o));

server/src/main/java/org/opensearch/cluster/block/ClusterBlock.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
* @opensearch.api
5353
*/
5454
@PublicApi(since = "1.0.0")
55-
public class ClusterBlock implements Writeable, ToXContentFragment {
55+
public class ClusterBlock implements Writeable, ToXContentFragment, Comparable<ClusterBlock> {
5656

5757
private final int id;
5858
@Nullable
@@ -217,7 +217,13 @@ public int hashCode() {
217217
return Objects.hash(id, uuid);
218218
}
219219

220+
@Override
221+
public int compareTo(ClusterBlock block) {
222+
return Integer.compare(block.id(), this.id());
223+
}
224+
220225
public boolean isAllowReleaseResources() {
221226
return allowReleaseResources;
222227
}
228+
223229
}

server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@
3939
import org.opensearch.common.Nullable;
4040
import org.opensearch.common.annotation.PublicApi;
4141
import org.opensearch.common.util.set.Sets;
42+
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
4243
import org.opensearch.core.common.io.stream.StreamInput;
4344
import org.opensearch.core.common.io.stream.StreamOutput;
45+
import org.opensearch.core.common.io.stream.VerifiableWriteable;
4446
import org.opensearch.core.rest.RestStatus;
4547

4648
import java.io.IOException;
@@ -62,7 +64,7 @@
6264
* @opensearch.api
6365
*/
6466
@PublicApi(since = "1.0.0")
65-
public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
67+
public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> implements VerifiableWriteable {
6668
public static final ClusterBlocks EMPTY_CLUSTER_BLOCK = new ClusterBlocks(emptySet(), Map.of());
6769

6870
private final Set<ClusterBlock> global;
@@ -303,6 +305,11 @@ public void writeTo(StreamOutput out) throws IOException {
303305
out.writeMap(indicesBlocks, StreamOutput::writeString, (o, s) -> writeBlockSet(s, o));
304306
}
305307

308+
@Override
309+
public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
310+
writeTo(out);
311+
}
312+
306313
private static void writeBlockSet(Set<ClusterBlock> blocks, StreamOutput out) throws IOException {
307314
out.writeCollection(blocks);
308315
}

server/src/main/java/org/opensearch/cluster/coordination/CoordinationMetadata.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.opensearch.common.annotation.PublicApi;
3636
import org.opensearch.common.util.set.Sets;
3737
import org.opensearch.core.ParseField;
38+
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
3839
import org.opensearch.core.common.io.stream.StreamInput;
3940
import org.opensearch.core.common.io.stream.StreamOutput;
41+
import org.opensearch.core.common.io.stream.VerifiableWriteable;
4042
import org.opensearch.core.common.io.stream.Writeable;
4143
import org.opensearch.core.xcontent.ConstructingObjectParser;
4244
import org.opensearch.core.xcontent.ToXContentFragment;
@@ -59,7 +61,7 @@
5961
* @opensearch.api
6062
*/
6163
@PublicApi(since = "1.0.0")
62-
public class CoordinationMetadata implements Writeable, ToXContentFragment {
64+
public class CoordinationMetadata implements VerifiableWriteable, ToXContentFragment {
6365

6466
public static final CoordinationMetadata EMPTY_METADATA = builder().build();
6567

@@ -149,6 +151,11 @@ public void writeTo(StreamOutput out) throws IOException {
149151
out.writeCollection(votingConfigExclusions);
150152
}
151153

154+
@Override
155+
public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
156+
writeTo(out);
157+
}
158+
152159
@Override
153160
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
154161
return builder.field(TERM_PARSE_FIELD.getPreferredName(), term)
@@ -272,7 +279,7 @@ public CoordinationMetadata build() {
272279
* @opensearch.api
273280
*/
274281
@PublicApi(since = "1.0.0")
275-
public static class VotingConfigExclusion implements Writeable, ToXContentFragment {
282+
public static class VotingConfigExclusion implements Writeable, ToXContentFragment, Comparable<VotingConfigExclusion> {
276283
public static final String MISSING_VALUE_MARKER = "_absent_";
277284
private final String nodeId;
278285
private final String nodeName;
@@ -361,6 +368,10 @@ public String toString() {
361368
return sb.toString();
362369
}
363370

371+
@Override
372+
public int compareTo(VotingConfigExclusion votingConfigExclusion) {
373+
return votingConfigExclusion.getNodeId().compareTo(this.getNodeId());
374+
}
364375
}
365376

366377
/**

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@
5454
import org.opensearch.common.xcontent.XContentHelper;
5555
import org.opensearch.core.Assertions;
5656
import org.opensearch.core.common.Strings;
57+
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
5758
import org.opensearch.core.common.io.stream.StreamInput;
5859
import org.opensearch.core.common.io.stream.StreamOutput;
60+
import org.opensearch.core.common.io.stream.VerifiableWriteable;
5961
import org.opensearch.core.common.io.stream.Writeable;
6062
import org.opensearch.core.index.Index;
6163
import org.opensearch.core.index.shard.ShardId;
@@ -88,6 +90,7 @@
8890
import java.util.Map;
8991
import java.util.Objects;
9092
import java.util.Set;
93+
import java.util.TreeSet;
9194
import java.util.function.Function;
9295

9396
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
@@ -103,7 +106,7 @@
103106
* @opensearch.api
104107
*/
105108
@PublicApi(since = "1.0.0")
106-
public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragment {
109+
public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragment, VerifiableWriteable {
107110

108111
public static final ClusterBlock INDEX_READ_ONLY_BLOCK = new ClusterBlock(
109112
5,
@@ -1264,6 +1267,32 @@ public void writeTo(StreamOutput out) throws IOException {
12641267
}
12651268
}
12661269

1270+
@Override
1271+
public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
1272+
out.writeString(index.getName()); // uuid will come as part of settings
1273+
out.writeLong(version);
1274+
out.writeVLong(mappingVersion);
1275+
out.writeVLong(settingsVersion);
1276+
out.writeVLong(aliasesVersion);
1277+
out.writeInt(routingNumShards);
1278+
out.writeByte(state.id());
1279+
writeSettingsToStream(settings, out);
1280+
out.writeVLongArray(primaryTerms);
1281+
out.writeMapValues(mappings, (stream, val) -> val.writeTo(stream));
1282+
out.writeMapValues(aliases, (stream, val) -> val.writeTo(stream));
1283+
out.writeMap(customData, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
1284+
out.writeMap(
1285+
inSyncAllocationIds,
1286+
StreamOutput::writeVInt,
1287+
(stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream)
1288+
);
1289+
out.writeMapValues(rolloverInfos, (stream, val) -> val.writeTo(stream));
1290+
out.writeBoolean(isSystem);
1291+
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
1292+
out.writeOptionalWriteable(context);
1293+
}
1294+
}
1295+
12671296
public boolean isSystem() {
12681297
return isSystem;
12691298
}

0 commit comments

Comments
 (0)