From e522a85f65adb8c23c21d4cdfc4b62548fcea867 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Fri, 28 Feb 2025 00:21:13 +0500 Subject: [PATCH 1/2] BE: Allow overriding producer and consumer properties --- .../kafbat/ui/config/ClustersProperties.java | 4 ++++ .../java/io/kafbat/ui/model/KafkaCluster.java | 2 ++ .../ui/service/ConsumerGroupService.java | 2 +- .../ui/service/KafkaClusterFactory.java | 2 ++ .../io/kafbat/ui/service/MessagesService.java | 2 +- .../io/kafbat/ui/AbstractIntegrationTest.java | 7 +++++++ .../java/io/kafbat/ui/service/ConfigTest.java | 21 +++++++++++++++++++ 7 files changed, 38 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 5931602b2..81f71ddfc 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -54,6 +54,8 @@ public static class Cluster { MetricsConfigData metrics; Map properties; + Map consumerProperties; + Map producerProperties; boolean readOnly = false; Long pollingThrottleRate; @@ -189,6 +191,8 @@ private void setMetricsDefaults() { private void flattenClusterProperties() { for (Cluster cluster : clusters) { cluster.setProperties(flattenClusterProperties(null, cluster.getProperties())); + cluster.setConsumerProperties(flattenClusterProperties(null, cluster.getConsumerProperties())); + cluster.setProducerProperties(flattenClusterProperties(null, cluster.getProducerProperties())); } } diff --git a/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java b/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java index 94e097373..6e2a00988 100644 --- a/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java +++ b/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java @@ -24,6 +24,8 @@ public class KafkaCluster { private final String version; private final String bootstrapServers; private final Properties properties; + private final Properties consumerProperties; + private final Properties producerProperties; private final boolean readOnly; private final MetricsConfig metricsConfig; private final DataMasking masking; diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 282bdc5b6..5fa3916c8 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -265,7 +265,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster, Map properties) { Properties props = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); - props.putAll(cluster.getProperties()); + props.putAll(cluster.getConsumerProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index 16e41ff84..a74a33ce4 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -53,6 +53,8 @@ public KafkaCluster create(ClustersProperties properties, builder.name(clusterProperties.getName()); builder.bootstrapServers(clusterProperties.getBootstrapServers()); builder.properties(convertProperties(clusterProperties.getProperties())); + builder.consumerProperties(convertProperties(clusterProperties.getConsumerProperties())); + builder.producerProperties(convertProperties(clusterProperties.getProducerProperties())); builder.readOnly(clusterProperties.isReadOnly()); builder.masking(DataMasking.create(clusterProperties.getMasking())); builder.pollingSettings(PollingSettings.create(clusterProperties, properties)); diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index c94472d56..e3ffb3e06 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -200,7 +200,7 @@ public static KafkaProducer createProducer(KafkaCluster cluster, Map additionalProps) { Properties properties = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); - properties.putAll(cluster.getProperties()); + properties.putAll(cluster.getProducerProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); diff --git a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java index 9e9e903cc..6c6c73113 100644 --- a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.IsolationLevel; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.function.ThrowingConsumer; import org.junit.jupiter.api.io.TempDir; @@ -102,6 +103,12 @@ public void initialize(@NotNull ConfigurableApplicationContext context) { System.setProperty("kafka.clusters.0.audit.topicAuditEnabled", "true"); System.setProperty("kafka.clusters.0.audit.consoleAuditEnabled", "true"); + System.setProperty("kafka.clusters.0.consumerProperties.request.timeout.ms", "60000"); + System.setProperty("kafka.clusters.0.consumerProperties.isolation.level", + IsolationLevel.READ_COMMITTED.toString()); + System.setProperty("kafka.clusters.0.producerProperties.request.timeout.ms", "45000"); + System.setProperty("kafka.clusters.0.producerProperties.max.block.ms", "80000"); + System.setProperty("kafka.clusters.1.name", SECOND_LOCAL); System.setProperty("kafka.clusters.1.readOnly", "true"); System.setProperty("kafka.clusters.1.bootstrapServers", kafka.getBootstrapServers()); diff --git a/api/src/test/java/io/kafbat/ui/service/ConfigTest.java b/api/src/test/java/io/kafbat/ui/service/ConfigTest.java index 756bd5c91..c436869a2 100644 --- a/api/src/test/java/io/kafbat/ui/service/ConfigTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ConfigTest.java @@ -1,6 +1,7 @@ package io.kafbat.ui.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.kafbat.ui.AbstractIntegrationTest; import io.kafbat.ui.model.BrokerConfigDTO; @@ -10,6 +11,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.IsolationLevel; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -79,6 +84,22 @@ void testAlterReadonlyConfig() { .expectStatus().isBadRequest(); } + @Test + void testKafkaClientCustomProperties() { + KafkaCluster cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).orElseThrow(); + + Properties consumerProps = cluster.getConsumerProperties(); + + assertEquals("60000", consumerProps.getProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); + assertEquals(IsolationLevel.READ_COMMITTED.toString(), + consumerProps.getProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG)); + + Properties producerProps = cluster.getProducerProperties(); + + assertEquals("45000", producerProps.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)); + assertEquals("80000", producerProps.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG)); + } + private Optional getConfig(String name) { List configs = webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/{id}/configs", LOCAL, 1) From 68e0efd3638594fc6779a2c299e481778f81e3f9 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Fri, 28 Feb 2025 11:47:38 +0500 Subject: [PATCH 2/2] BE: Allow overriding producer and consumer properties address review comments --- api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java | 1 + api/src/main/java/io/kafbat/ui/service/MessagesService.java | 1 + 2 files changed, 2 insertions(+) diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 5fa3916c8..f1256cf00 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -265,6 +265,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster, Map properties) { Properties props = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); + props.putAll(cluster.getProperties()); props.putAll(cluster.getConsumerProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index e3ffb3e06..b33be8b76 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -200,6 +200,7 @@ public static KafkaProducer createProducer(KafkaCluster cluster, Map additionalProps) { Properties properties = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + properties.putAll(cluster.getProperties()); properties.putAll(cluster.getProducerProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);