Skip to content

Commit eaeb4a4

Browse files
authored
BE: Chore: Cleanup api module (#815)
1 parent 844fbc9 commit eaeb4a4

30 files changed

+89
-277
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import io.kafbat.ui.connect.model.ConnectorTopics;
1313
import io.kafbat.ui.connect.model.NewConnector;
1414
import io.kafbat.ui.connect.model.TaskStatus;
15-
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
15+
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
1616
import io.kafbat.ui.exception.ValidationException;
1717
import io.kafbat.ui.util.WebClientConfigurator;
1818
import jakarta.validation.constraints.NotNull;
@@ -48,7 +48,7 @@ private static Retry conflictCodeRetry() {
4848
.fixedDelay(MAX_RETRIES, RETRIES_DELAY)
4949
.filter(e -> e instanceof WebClientResponseException.Conflict)
5050
.onRetryExhaustedThrow((spec, signal) ->
51-
new KafkaConnectConflictReponseException(
51+
new KafkaConnectConflictResponseException(
5252
(WebClientResponseException.Conflict) signal.failure()));
5353
}
5454

api/src/main/java/io/kafbat/ui/config/auth/BasicAuthSecurityConfig.java

-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package io.kafbat.ui.config.auth;
22

3-
import io.kafbat.ui.util.EmptyRedirectStrategy;
43
import io.kafbat.ui.util.StaticFileWebFilter;
5-
import java.net.URI;
64
import lombok.extern.slf4j.Slf4j;
75
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
86
import org.springframework.context.annotation.Bean;
@@ -12,8 +10,6 @@
1210
import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
1311
import org.springframework.security.config.web.server.ServerHttpSecurity;
1412
import org.springframework.security.web.server.SecurityWebFilterChain;
15-
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
16-
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
1713
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;
1814

1915
@Configuration

api/src/main/java/io/kafbat/ui/config/auth/logout/CognitoLogoutSuccessHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public Mono<Void> handle(WebFilterExchange exchange, Authentication authenticati
4040
requestUri.getPath(), requestUri.getQuery());
4141

4242
final UriComponents baseUrl = UriComponentsBuilder
43-
.fromHttpUrl(fullUrl)
43+
.fromUriString(fullUrl)
4444
.replacePath("/")
4545
.replaceQuery(null)
4646
.fragment(null)

api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer
5353
Collection<TopicPartition> partitions) {
5454
try {
5555
// we try to use offsetsForTimes() to find earliest offsets, since for
56-
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
56+
// some topics (like compacted) beginningOffsets() returning 0 offsets
5757
// even when effectively first offset can be very high
5858
var offsets = consumer.offsetsForTimes(
5959
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))

api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java

-23
This file was deleted.

api/src/main/java/io/kafbat/ui/exception/ConnectNotFoundException.java

-13
This file was deleted.

api/src/main/java/io/kafbat/ui/exception/DuplicateEntityException.java

-13
This file was deleted.

api/src/main/java/io/kafbat/ui/exception/ErrorCode.java

-6
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,19 @@
44
import org.slf4j.LoggerFactory;
55
import org.springframework.http.HttpStatus;
66

7-
87
public enum ErrorCode {
98

10-
FORBIDDEN(403, HttpStatus.FORBIDDEN),
11-
129
UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
1310
KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR),
1411
BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
1512
NOT_FOUND(404, HttpStatus.NOT_FOUND),
1613
VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST),
1714
READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED),
1815
CONNECT_CONFLICT_RESPONSE(4004, HttpStatus.CONFLICT),
19-
DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT),
2016
UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY),
2117
CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),
2218
TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
2319
SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
24-
CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
25-
KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
2620
DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
2721
TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
2822
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),

api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ private Mono<ServerResponse> render(CustomBaseException baseException, ServerReq
102102

103103
private Mono<ServerResponse> render(WebExchangeBindException exception, ServerRequest request) {
104104
Map<String, Set<String>> fieldErrorsMap = exception.getFieldErrors().stream()
105-
.collect(Collectors
106-
.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union));
105+
.collect(Collectors.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union));
107106

108107
var fieldsErrors = fieldErrorsMap.entrySet().stream()
109108
.map(e -> {

api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictReponseException.java api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictResponseException.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package io.kafbat.ui.exception;
22

3-
43
import org.springframework.web.reactive.function.client.WebClientResponseException;
54

6-
public class KafkaConnectConflictReponseException extends CustomBaseException {
5+
public class KafkaConnectConflictResponseException extends CustomBaseException {
76

8-
public KafkaConnectConflictReponseException(WebClientResponseException.Conflict e) {
7+
public KafkaConnectConflictResponseException(WebClientResponseException.Conflict e) {
98
super("Kafka Connect responded with 409 (Conflict) code. Response body: "
109
+ e.getResponseBodyAsString());
1110
}

api/src/main/java/io/kafbat/ui/exception/KsqlDbNotFoundException.java

-13
This file was deleted.

api/src/main/java/io/kafbat/ui/exception/SchemaFailedToDeleteException.java

-13
This file was deleted.

api/src/main/java/io/kafbat/ui/exception/UnprocessableEntityException.java

-14
This file was deleted.

api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java

+9-17
Original file line numberDiff line numberDiff line change
@@ -97,23 +97,15 @@ private static BrokerDTO mapCoordinator(Node node) {
9797
return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
9898
}
9999

100-
private static ConsumerGroupStateDTO mapConsumerGroupState(
101-
org.apache.kafka.common.ConsumerGroupState state) {
102-
switch (state) {
103-
case DEAD:
104-
return ConsumerGroupStateDTO.DEAD;
105-
case EMPTY:
106-
return ConsumerGroupStateDTO.EMPTY;
107-
case STABLE:
108-
return ConsumerGroupStateDTO.STABLE;
109-
case PREPARING_REBALANCE:
110-
return ConsumerGroupStateDTO.PREPARING_REBALANCE;
111-
case COMPLETING_REBALANCE:
112-
return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
113-
default:
114-
return ConsumerGroupStateDTO.UNKNOWN;
115-
}
100+
private static ConsumerGroupStateDTO mapConsumerGroupState(org.apache.kafka.common.ConsumerGroupState state) {
101+
return switch (state) {
102+
case DEAD -> ConsumerGroupStateDTO.DEAD;
103+
case EMPTY -> ConsumerGroupStateDTO.EMPTY;
104+
case STABLE -> ConsumerGroupStateDTO.STABLE;
105+
case PREPARING_REBALANCE -> ConsumerGroupStateDTO.PREPARING_REBALANCE;
106+
case COMPLETING_REBALANCE -> ConsumerGroupStateDTO.COMPLETING_REBALANCE;
107+
default -> ConsumerGroupStateDTO.UNKNOWN;
108+
};
116109
}
117110

118-
119111
}

api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
4242

4343
private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
4444
List<Map.Entry<TopicPartition,
45-
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
45+
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
4646
BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
4747
topic.setName(name);
4848
topic.setPartitions(
@@ -54,8 +54,7 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
5454
}
5555

5656
private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
57-
DescribeLogDirsResponse.ReplicaInfo
58-
replicaInfo) {
57+
DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
5958
BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
6059
logDir.setBroker(broker);
6160
logDir.setPartition(partition);

api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void close() {
9797
try {
9898
serde.close();
9999
} catch (Exception e) {
100-
log.error("Error closing serde " + name, e);
100+
log.error("Error closing serde {}", name, e);
101101
}
102102
return null;
103103
});

api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ private Map<String, ProtoFile> knownProtoFiles() {
380380
}
381381

382382
private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fileDescriptor) {
383-
String protoFileString = null;
383+
String protoFileString;
384384
// know type file contains either message or enum
385385
if (!fileDescriptor.getMessageTypes().isEmpty()) {
386386
protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().getFirst()).canonicalString();

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.kafbat.ui.service;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
43
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
54
import io.kafbat.ui.connect.model.ConnectorStatus;
65
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
@@ -44,7 +43,6 @@
4443
public class KafkaConnectService {
4544
private final ClusterMapper clusterMapper;
4645
private final KafkaConnectMapper kafkaConnectMapper;
47-
private final ObjectMapper objectMapper;
4846
private final KafkaConfigSanitizer kafkaConfigSanitizer;
4947

5048
public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

-6
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,6 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
389389
);
390390
}
391391

392-
public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
393-
return describeCluster()
394-
.map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
395-
.flatMap(this::describeLogDirs);
396-
}
397-
398392
public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
399393
Collection<Integer> brokerIds) {
400394
return toMono(client.describeLogDirs(brokerIds).all())

api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public Mono<List<String>> getAllSubjectNames(KafkaCluster cluster) {
6363
@SneakyThrows
6464
private List<String> parseSubjectListString(String subjectNamesStr) {
6565
//workaround for https://github.com/spring-projects/spring-framework/issues/24734
66-
return new JsonMapper().readValue(subjectNamesStr, new TypeReference<List<String>>() {
66+
return new JsonMapper().readValue(subjectNamesStr, new TypeReference<>() {
6767
});
6868
}
6969

api/src/main/java/io/kafbat/ui/service/TopicsService.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
9797

9898
/**
9999
* After creation topic can be invisible via API for some time.
100-
* To workaround this, we retyring topic loading until it becomes visible.
100+
* To workaround this, we're retrying topic loading until it becomes visible.
101101
*/
102102
private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
103103
return loadTopic(c, topicName)
@@ -137,8 +137,7 @@ private List<InternalTopic> createList(List<String> orderedNames,
137137
.collect(toList());
138138
}
139139

140-
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
141-
descriptionsMap,
140+
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription> descriptionsMap,
142141
ReactiveAdminClient ac) {
143142
var descriptions = descriptionsMap.values();
144143
return ac.listOffsets(descriptions, OffsetSpec.earliest())
@@ -225,8 +224,7 @@ private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
225224
.then(loadTopic(cluster, topicName)));
226225
}
227226

228-
public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
229-
Mono<TopicUpdateDTO> topicUpdate) {
227+
public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName, Mono<TopicUpdateDTO> topicUpdate) {
230228
return topicUpdate
231229
.flatMap(t -> updateTopic(cl, topicName, t));
232230
}
@@ -298,7 +296,7 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
298296
var brokers = brokersUsage.entrySet().stream()
299297
.sorted(Map.Entry.comparingByValue())
300298
.map(Map.Entry::getKey)
301-
.collect(toList());
299+
.toList();
302300

303301
// Iterate brokers and try to add them in assignment
304302
// while partition replicas count != requested replication factor
@@ -326,7 +324,7 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
326324
var brokersUsageList = brokersUsage.entrySet().stream()
327325
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
328326
.map(Map.Entry::getKey)
329-
.collect(toList());
327+
.toList();
330328

331329
// Iterate brokers and try to remove them from assignment
332330
// while partition replicas count != requested replication factor

api/src/main/java/io/kafbat/ui/service/acl/AclsService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set
112112
if (!toBeAdded.isEmpty()) {
113113
log.info("ACLs to be added ({}): ", toBeAdded.size());
114114
for (AclBinding aclBinding : toBeAdded) {
115-
log.info(" " + AclCsv.createAclString(aclBinding));
115+
log.info(" {}", AclCsv.createAclString(aclBinding));
116116
}
117117
}
118118
if (!toBeDeleted.isEmpty()) {
119119
log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
120120
for (AclBinding aclBinding : toBeDeleted) {
121-
log.info(" " + AclCsv.createAclString(aclBinding));
121+
log.info(" {}", AclCsv.createAclString(aclBinding));
122122
}
123123
}
124124
}

api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamPr
176176
if (statements.size() > 1) {
177177
return errorTableFlux("Only single statement supported now");
178178
}
179-
if (statements.size() == 0) {
179+
if (statements.isEmpty()) {
180180
return errorTableFlux("No valid ksql statement found");
181181
}
182182
if (isUnsupportedStatementType(statements.get(0))) {

0 commit comments

Comments
 (0)