Skip to content

Commit d91483b

Browse files
authored
BE: Allow overriding producer and consumer properties (#874)
1 parent 3121341 commit d91483b

File tree

7 files changed

+38
-0
lines changed

7 files changed

+38
-0
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

+4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public static class Cluster {
6161

6262
MetricsConfigData metrics;
6363
Map<String, Object> properties;
64+
Map<String, Object> consumerProperties;
65+
Map<String, Object> producerProperties;
6466
boolean readOnly = false;
6567

6668
Long pollingThrottleRate;
@@ -200,6 +202,8 @@ private void setMetricsDefaults() {
200202
private void flattenClusterProperties() {
201203
for (Cluster cluster : clusters) {
202204
cluster.setProperties(flattenClusterProperties(null, cluster.getProperties()));
205+
cluster.setConsumerProperties(flattenClusterProperties(null, cluster.getConsumerProperties()));
206+
cluster.setProducerProperties(flattenClusterProperties(null, cluster.getProducerProperties()));
203207
}
204208
}
205209

api/src/main/java/io/kafbat/ui/model/KafkaCluster.java

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class KafkaCluster {
2424
private final String version;
2525
private final String bootstrapServers;
2626
private final Properties properties;
27+
private final Properties consumerProperties;
28+
private final Properties producerProperties;
2729
private final boolean readOnly;
2830
private final MetricsConfig metricsConfig;
2931
private final DataMasking masking;

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

+1
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster,
266266
Properties props = new Properties();
267267
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
268268
props.putAll(cluster.getProperties());
269+
props.putAll(cluster.getConsumerProperties());
269270
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis());
270271
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
271272
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public KafkaCluster create(ClustersProperties properties,
5353
builder.name(clusterProperties.getName());
5454
builder.bootstrapServers(clusterProperties.getBootstrapServers());
5555
builder.properties(convertProperties(clusterProperties.getProperties()));
56+
builder.consumerProperties(convertProperties(clusterProperties.getConsumerProperties()));
57+
builder.producerProperties(convertProperties(clusterProperties.getProducerProperties()));
5658
builder.readOnly(clusterProperties.isReadOnly());
5759
builder.masking(DataMasking.create(clusterProperties.getMasking()));
5860
builder.pollingSettings(PollingSettings.create(clusterProperties, properties));

api/src/main/java/io/kafbat/ui/service/MessagesService.java

+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
201201
Properties properties = new Properties();
202202
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
203203
properties.putAll(cluster.getProperties());
204+
properties.putAll(cluster.getProducerProperties());
204205
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
205206
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
206207
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.kafka.clients.admin.AdminClient;
1313
import org.apache.kafka.clients.admin.AdminClientConfig;
1414
import org.apache.kafka.clients.admin.NewTopic;
15+
import org.apache.kafka.common.IsolationLevel;
1516
import org.jetbrains.annotations.NotNull;
1617
import org.junit.jupiter.api.function.ThrowingConsumer;
1718
import org.junit.jupiter.api.io.TempDir;
@@ -102,6 +103,12 @@ public void initialize(@NotNull ConfigurableApplicationContext context) {
102103
System.setProperty("kafka.clusters.0.audit.topicAuditEnabled", "true");
103104
System.setProperty("kafka.clusters.0.audit.consoleAuditEnabled", "true");
104105

106+
System.setProperty("kafka.clusters.0.consumerProperties.request.timeout.ms", "60000");
107+
System.setProperty("kafka.clusters.0.consumerProperties.isolation.level",
108+
IsolationLevel.READ_COMMITTED.toString());
109+
System.setProperty("kafka.clusters.0.producerProperties.request.timeout.ms", "45000");
110+
System.setProperty("kafka.clusters.0.producerProperties.max.block.ms", "80000");
111+
105112
System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
106113
System.setProperty("kafka.clusters.1.readOnly", "true");
107114
System.setProperty("kafka.clusters.1.bootstrapServers", kafka.getBootstrapServers());

api/src/test/java/io/kafbat/ui/service/ConfigTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafbat.ui.service;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
45

56
import io.kafbat.ui.AbstractIntegrationTest;
67
import io.kafbat.ui.model.BrokerConfigDTO;
@@ -10,6 +11,10 @@
1011
import java.util.List;
1112
import java.util.Map;
1213
import java.util.Optional;
14+
import java.util.Properties;
15+
import org.apache.kafka.clients.consumer.ConsumerConfig;
16+
import org.apache.kafka.clients.producer.ProducerConfig;
17+
import org.apache.kafka.common.IsolationLevel;
1318
import org.junit.jupiter.api.BeforeEach;
1419
import org.junit.jupiter.api.Test;
1520
import org.springframework.beans.factory.annotation.Autowired;
@@ -79,6 +84,22 @@ void testAlterReadonlyConfig() {
7984
.expectStatus().isBadRequest();
8085
}
8186

87+
@Test
88+
void testKafkaClientCustomProperties() {
89+
KafkaCluster cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).orElseThrow();
90+
91+
Properties consumerProps = cluster.getConsumerProperties();
92+
93+
assertEquals("60000", consumerProps.getProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
94+
assertEquals(IsolationLevel.READ_COMMITTED.toString(),
95+
consumerProps.getProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
96+
97+
Properties producerProps = cluster.getProducerProperties();
98+
99+
assertEquals("45000", producerProps.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG));
100+
assertEquals("80000", producerProps.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG));
101+
}
102+
82103
private Optional<BrokerConfigDTO> getConfig(String name) {
83104
List<BrokerConfigDTO> configs = webTestClient.get()
84105
.uri("/api/clusters/{clusterName}/brokers/{id}/configs", LOCAL, 1)

0 commit comments

Comments
 (0)