Skip to content

Commit 79cf24f

Browse files
dcotfrgsmet
authored andcommitted
Resolves 29411 & 37691 issues
(cherry picked from commit 7b7b8d9)
1 parent a2641e8 commit 79cf24f

File tree

2 files changed

+72
-62
lines changed

2 files changed

+72
-62
lines changed

extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java

+34-24
Original file line numberDiff line numberDiff line change
@@ -18,59 +18,69 @@ final class TopologyParserContext {
1818
final Mermaid mermaid = new Mermaid();
1919

2020
void addSubTopology(String subTopology) {
21-
subTopologies.add(subTopology);
22-
graphviz.addSubTopology(subTopology);
23-
mermaid.addSubTopology(subTopology);
21+
final var sanitizedSubTopology = sanitize(subTopology);
22+
subTopologies.add(sanitizedSubTopology);
23+
graphviz.addSubTopology(sanitizedSubTopology);
24+
mermaid.addSubTopology(sanitizedSubTopology);
2425
}
2526

2627
void addSink(String sink, String topic) {
27-
sinks.add(topic);
28-
currentNode = sink;
29-
graphviz.addSink(sink, topic);
30-
mermaid.addSink(sink, topic);
28+
final var sanitizedTopic = sanitize(topic);
29+
sinks.add(sanitizedTopic);
30+
final var sanitizedSink = sanitize(sink);
31+
currentNode = sanitize(sanitizedSink);
32+
graphviz.addSink(sanitizedSink, sanitizedTopic);
33+
mermaid.addSink(sanitizedSink, sanitizedTopic);
3134
}
3235

3336
void addSources(String source, String[] topics) {
34-
currentNode = source;
37+
currentNode = sanitize(source);
3538
Arrays.stream(topics)
3639
.map(String::trim).filter(topic -> !topic.isEmpty())
3740
.forEachOrdered(topic -> {
38-
sources.add(topic);
39-
graphviz.addSource(source, topic);
40-
mermaid.addSource(source, topic);
41+
final var sanitizedTopic = sanitize(topic);
42+
sources.add(sanitizedTopic);
43+
graphviz.addSource(currentNode, sanitizedTopic);
44+
mermaid.addSource(currentNode, sanitizedTopic);
4145
});
4246
}
4347

4448
void addRegexSource(String source, String regex) {
45-
currentNode = source;
46-
final var cleanRegex = regex.trim();
47-
if (!cleanRegex.isEmpty()) {
48-
sources.add(cleanRegex);
49-
graphviz.addRegexSource(source, cleanRegex);
50-
mermaid.addRegexSource(source, cleanRegex);
49+
currentNode = sanitize(source);
50+
final var sanitizedRegex = sanitize(regex);
51+
if (!sanitizedRegex.isEmpty()) {
52+
sources.add(sanitizedRegex);
53+
graphviz.addRegexSource(currentNode, sanitizedRegex);
54+
mermaid.addRegexSource(currentNode, sanitizedRegex);
5155
}
5256
}
5357

5458
void addStores(String[] stores, String processor, boolean join) {
55-
currentNode = processor;
59+
currentNode = sanitize(processor);
5660
Arrays.stream(stores)
5761
.map(String::trim).filter(store -> !store.isEmpty())
5862
.forEachOrdered(store -> {
59-
this.stores.add(store);
60-
graphviz.addStore(store, currentNode, join);
61-
mermaid.addStore(store, currentNode, join);
63+
final var sanitizedStore = sanitize(store);
64+
this.stores.add(sanitizedStore);
65+
graphviz.addStore(sanitizedStore, currentNode, join);
66+
mermaid.addStore(sanitizedStore, currentNode, join);
6267
});
6368
}
6469

6570
void addTargets(String[] targets) {
6671
Arrays.stream(targets)
6772
.map(String::trim).filter(target -> !("none".equals(target) || target.isEmpty()))
6873
.forEachOrdered(target -> {
69-
graphviz.addTarget(target, currentNode);
70-
mermaid.addTarget(target, currentNode);
74+
final var sanitizedTarget = sanitize(target);
75+
graphviz.addTarget(sanitizedTarget, currentNode);
76+
mermaid.addTarget(sanitizedTarget, currentNode);
7177
});
7278
}
7379

80+
private static String sanitize(String name) {
81+
return name != null ? name.trim().replaceAll("\"", "") : null;
82+
}
83+
7484
static final class Graphviz {
7585
String currentGraph = "";
7686
final List<String> nodes = new ArrayList<>();
@@ -138,7 +148,7 @@ private void addStore(String store, String node, boolean join) {
138148
}
139149

140150
private static String toId(String name) {
141-
return name.replaceAll("-", "_");
151+
return '\"' + name + '\"';
142152
}
143153

144154
private static String toLabel(String name) {

extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java

+38-38
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public void shouldParsingStayConstant() {
1919
+ " --> none\n"
2020
+ " <-- KSTREAM-SOURCE-0000000001\n"
2121
+ "Sub-topology: 1\n"
22-
+ " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature-values])\n"
22+
+ " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature.values])\n"
2323
+ " --> KSTREAM-LEFTJOIN-0000000004\n"
2424
+ " Processor: KSTREAM-LEFTJOIN-0000000004 (stores: [])\n"
2525
+ " --> KSTREAM-AGGREGATE-0000000005\n"
@@ -43,62 +43,62 @@ public void shouldParsingStayConstant() {
4343

4444
assertEquals(expectedDescribe, actual.getString("describe"));
4545
assertEquals("[0, 1, 2]", actual.getString("subTopologies"));
46-
assertEquals("[notification\\..+, temperature-values, weather-stations]", actual.getString("sources"));
46+
assertEquals("[notification\\..+, temperature.values, weather-stations]", actual.getString("sources"));
4747
assertEquals("[temperatures-aggregated]", actual.getString("sinks"));
4848
assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores"));
4949
assertEquals("digraph {\n"
5050
+ " fontname=Helvetica; fontsize=10;\n"
5151
+ " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];\n"
52-
+ " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
53-
+ " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
54-
+ " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
55-
+ " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
56-
+ " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n"
57-
+ " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
58-
+ " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
59-
+ " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
60-
+ " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
61-
+ " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
62-
+ " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
63-
+ " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
52+
+ " \"weather-stations\" [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
53+
+ " \"KSTREAM-SOURCE-0000000001\" [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
54+
+ " \"KTABLE-SOURCE-0000000002\" [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
55+
+ " \"weather-stations-STATE-STORE-0000000000\" [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
56+
+ " \"temperature.values\" [label=\"temperature.values\" shape=invhouse margin=\"0,0\"];\n"
57+
+ " \"KSTREAM-SOURCE-0000000003\" [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
58+
+ " \"KSTREAM-LEFTJOIN-0000000004\" [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
59+
+ " \"KSTREAM-AGGREGATE-0000000005\" [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
60+
+ " \"weather-stations-store\" [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
61+
+ " \"KTABLE-TOSTREAM-0000000006\" [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
62+
+ " \"KSTREAM-SINK-0000000007\" [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
63+
+ " \"temperatures-aggregated\" [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
6464
+ " REGEX_12 [label=\"notification\\\\..+\" shape=invhouse style=dashed margin=\"0,0\"];\n"
65-
+ " KSTREAM_SOURCE_0000000008 [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
66-
+ " KSTREAM_FOREACH_0000000009 [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
65+
+ " \"KSTREAM-SOURCE-0000000008\" [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
66+
+ " \"KSTREAM-FOREACH-0000000009\" [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
6767
+ " subgraph cluster0 {\n"
6868
+ " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
69-
+ " KSTREAM_SOURCE_0000000001;\n"
70-
+ " KTABLE_SOURCE_0000000002;\n"
69+
+ " \"KSTREAM-SOURCE-0000000001\";\n"
70+
+ " \"KTABLE-SOURCE-0000000002\";\n"
7171
+ " }\n"
7272
+ " subgraph cluster1 {\n"
7373
+ " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
74-
+ " KSTREAM_SOURCE_0000000003;\n"
75-
+ " KSTREAM_LEFTJOIN_0000000004;\n"
76-
+ " KSTREAM_AGGREGATE_0000000005;\n"
77-
+ " KTABLE_TOSTREAM_0000000006;\n"
78-
+ " KSTREAM_SINK_0000000007;\n"
74+
+ " \"KSTREAM-SOURCE-0000000003\";\n"
75+
+ " \"KSTREAM-LEFTJOIN-0000000004\";\n"
76+
+ " \"KSTREAM-AGGREGATE-0000000005\";\n"
77+
+ " \"KTABLE-TOSTREAM-0000000006\";\n"
78+
+ " \"KSTREAM-SINK-0000000007\";\n"
7979
+ " }\n"
8080
+ " subgraph cluster2 {\n"
8181
+ " label=\"Sub-Topology: 2\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
82-
+ " KSTREAM_SOURCE_0000000008;\n"
83-
+ " KSTREAM_FOREACH_0000000009;\n"
82+
+ " \"KSTREAM-SOURCE-0000000008\";\n"
83+
+ " \"KSTREAM-FOREACH-0000000009\";\n"
8484
+ " }\n"
85-
+ " weather_stations -> KSTREAM_SOURCE_0000000001;\n"
86-
+ " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n"
87-
+ " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n"
88-
+ " temperature_values -> KSTREAM_SOURCE_0000000003;\n"
89-
+ " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n"
90-
+ " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n"
91-
+ " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n"
92-
+ " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n"
93-
+ " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n"
94-
+ " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n"
95-
+ " REGEX_12 -> KSTREAM_SOURCE_0000000008;\n"
96-
+ " KSTREAM_SOURCE_0000000008 -> KSTREAM_FOREACH_0000000009;\n"
85+
+ " \"weather-stations\" -> \"KSTREAM-SOURCE-0000000001\";\n"
86+
+ " \"KSTREAM-SOURCE-0000000001\" -> \"KTABLE-SOURCE-0000000002\";\n"
87+
+ " \"KTABLE-SOURCE-0000000002\" -> \"weather-stations-STATE-STORE-0000000000\";\n"
88+
+ " \"temperature.values\" -> \"KSTREAM-SOURCE-0000000003\";\n"
89+
+ " \"KSTREAM-SOURCE-0000000003\" -> \"KSTREAM-LEFTJOIN-0000000004\";\n"
90+
+ " \"KSTREAM-LEFTJOIN-0000000004\" -> \"KSTREAM-AGGREGATE-0000000005\";\n"
91+
+ " \"KSTREAM-AGGREGATE-0000000005\" -> \"weather-stations-store\";\n"
92+
+ " \"KSTREAM-AGGREGATE-0000000005\" -> \"KTABLE-TOSTREAM-0000000006\";\n"
93+
+ " \"KTABLE-TOSTREAM-0000000006\" -> \"KSTREAM-SINK-0000000007\";\n"
94+
+ " \"KSTREAM-SINK-0000000007\" -> \"temperatures-aggregated\";\n"
95+
+ " REGEX_12 -> \"KSTREAM-SOURCE-0000000008\";\n"
96+
+ " \"KSTREAM-SOURCE-0000000008\" -> \"KSTREAM-FOREACH-0000000009\";\n"
9797
+ "}", actual.getString("graphviz"));
9898
assertEquals("graph TD\n"
9999
+ " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-<br>SOURCE-<br>0000000001)\n"
100100
+ " KTABLE-SOURCE-0000000002[KTABLE-<br>SOURCE-<br>0000000002] --> weather-stations-STATE-STORE-0000000000(weather-<br>stations-<br>STATE-<br>STORE-<br>0000000000)\n"
101-
+ " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
101+
+ " temperature.values[temperature.values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
102102
+ " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> weather-stations-store(weather-<br>stations-<br>store)\n"
103103
+ " KSTREAM-SINK-0000000007[KSTREAM-<br>SINK-<br>0000000007] --> temperatures-aggregated(temperatures-aggregated)\n"
104104
+ " REGEX_5[notification\\..+] --> KSTREAM-SOURCE-0000000008(KSTREAM-<br>SOURCE-<br>0000000008)\n"

0 commit comments

Comments
 (0)