Skip to content

Commit ce702f3

Browse files
use shared config with producer
1 parent 7da4901 commit ce702f3

12 files changed

+415
-207
lines changed

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

+3-85
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ public struct KafkaProducerConfiguration {
9797
/// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
9898
/// Default: `.plaintext`
9999
public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
100-
101-
internal var transactionalId: String?
102100

103101
public init() {}
104102
}
@@ -107,54 +105,7 @@ public struct KafkaProducerConfiguration {
107105

108106
extension KafkaProducerConfiguration {
109107
internal var dictionary: [String: String] {
110-
var resultDict: [String: String] = [:]
111-
112-
resultDict["enable.idempotence"] = String(self.enableIdempotence)
113-
if let transactionalId {
114-
resultDict["transactional.id"] = transactionalId
115-
resultDict["transaction.timeout.ms"] = "60000"
116-
resultDict["message.timeout.ms"] = "60000"
117-
118-
}
119-
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
120-
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
121-
resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
122-
resultDict["message.send.max.retries"] = String(self.messageSendMaxRetries)
123-
resultDict["allow.auto.create.topics"] = String(self.allowAutoCreateTopics)
124-
125-
resultDict["client.id"] = self.clientID
126-
resultDict["bootstrap.servers"] = self.bootstrapServers.map(\.description).joined(separator: ",")
127-
resultDict["message.max.bytes"] = String(self.message.maxBytes)
128-
resultDict["message.copy.max.bytes"] = String(self.message.copyMaxBytes)
129-
resultDict["receive.message.max.bytes"] = String(self.receiveMessageMaxBytes)
130-
resultDict["max.in.flight.requests.per.connection"] = String(self.maxInFlightRequestsPerConnection)
131-
resultDict["metadata.max.age.ms"] = String(self.metadataMaxAgeMilliseconds)
132-
resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshIntervalMilliseconds)
133-
resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastIntervalMilliseconds)
134-
resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.refreshSparse)
135-
resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.propagationMaxMilliseconds)
136-
resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",")
137-
if !self.debug.isEmpty {
138-
resultDict["debug"] = self.debug.map(\.description).joined(separator: ",")
139-
}
140-
resultDict["socket.timeout.ms"] = String(self.socket.timeoutMilliseconds)
141-
resultDict["socket.send.buffer.bytes"] = String(self.socket.sendBufferBytes)
142-
resultDict["socket.receive.buffer.bytes"] = String(self.socket.receiveBufferBytes)
143-
resultDict["socket.keepalive.enable"] = String(self.socket.keepaliveEnable)
144-
resultDict["socket.nagle.disable"] = String(self.socket.nagleDisable)
145-
resultDict["socket.max.fails"] = String(self.socket.maxFails)
146-
resultDict["socket.connection.setup.timeout.ms"] = String(self.socket.connectionSetupTimeoutMilliseconds)
147-
resultDict["broker.address.ttl"] = String(self.broker.addressTTL)
148-
resultDict["broker.address.family"] = self.broker.addressFamily.description
149-
resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoffMilliseconds)
150-
resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.backoffMaxMilliseconds)
151-
152-
// Merge with SecurityProtocol configuration dictionary
153-
resultDict.merge(self.securityProtocol.dictionary) { _, _ in
154-
fatalError("securityProtocol and \(#file) should not have duplicate keys")
155-
}
156-
157-
return resultDict
108+
sharedPropsDictionary
158109
}
159110
}
160111

@@ -166,6 +117,8 @@ extension KafkaProducerConfiguration: Hashable {}
166117

167118
extension KafkaProducerConfiguration: Sendable {}
168119

120+
extension KafkaProducerConfiguration: KafkaProducerSharedProperties {}
121+
169122
// MARK: - KafkaConfiguration + Producer Additions
170123

171124
extension KafkaConfiguration {
@@ -194,38 +147,3 @@ extension KafkaConfiguration {
194147
}
195148
}
196149
}
197-
198-
// FIXME: should we really duplicate `KafkaProducerConfiguration`
199-
// FIXME: after public api updated?
200-
public struct KafkaTransactionalProducerConfiguration {
201-
var transactionalId: String
202-
var transactionsTimeout: Duration
203-
204-
var producerConfiguration: KafkaProducerConfiguration {
205-
set {
206-
self.producerConfiguration_ = newValue
207-
}
208-
get {
209-
var conf = self.producerConfiguration_
210-
conf.transactionalId = self.transactionalId
211-
conf.enableIdempotence = true
212-
conf.maxInFlightRequestsPerConnection = min(conf.maxInFlightRequestsPerConnection, 5)
213-
return conf
214-
}
215-
}
216-
217-
private var producerConfiguration_: KafkaProducerConfiguration = .init()
218-
219-
public init(transactionalId: String, transactionsTimeout: Duration = .kafkaUntilEndOfTransactionTimeout, producerConfiguration: KafkaProducerConfiguration = .init()) {
220-
self.transactionalId = transactionalId
221-
self.transactionsTimeout = transactionsTimeout
222-
self.producerConfiguration = producerConfiguration
223-
}
224-
}
225-
// MARK: - KafkaProducerConfiguration + Hashable
226-
227-
extension KafkaTransactionalProducerConfiguration: Hashable {}
228-
229-
// MARK: - KafkaProducerConfiguration + Sendable
230-
231-
extension KafkaTransactionalProducerConfiguration: Sendable {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
internal protocol KafkaProducerSharedProperties: Sendable, Hashable {
16+
// MARK: - SwiftKafka-specific Config properties
17+
18+
/// The time between two consecutive polls.
19+
/// Effectively controls the rate at which incoming events are consumed.
20+
/// Default: `.milliseconds(100)`
21+
var pollInterval: Duration { get }
22+
23+
/// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
24+
/// Default: `10000`
25+
var flushTimeoutMilliseconds: Int { get }
26+
27+
// MARK: - Producer-specific Config Properties
28+
29+
/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
30+
/// Default: `false`
31+
var enableIdempotence: Bool { get }
32+
33+
/// Producer queue options.
34+
var queue: KafkaConfiguration.QueueOptions { get }
35+
36+
/// How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true.
37+
/// Default: `2_147_483_647`
38+
var messageSendMaxRetries: Int { get }
39+
40+
/// Allow automatic topic creation on the broker when producing to non-existent topics.
41+
/// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
42+
/// Default: `true`
43+
var allowAutoCreateTopics: Bool { get }
44+
45+
// MARK: - Common Client Config Properties
46+
47+
/// Client identifier.
48+
/// Default: `"rdkafka"`
49+
var clientID: String { get }
50+
51+
/// Initial list of brokers.
52+
/// Default: `[]`
53+
var bootstrapServers: [KafkaConfiguration.Broker] { get }
54+
55+
/// Message options.
56+
var message: KafkaConfiguration.MessageOptions { get }
57+
58+
/// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
59+
/// Default: `100_000_000`
60+
var receiveMessageMaxBytes: Int { get }
61+
62+
/// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
63+
/// Default: `1_000_000`
64+
var maxInFlightRequestsPerConnection: Int { get }
65+
66+
/// Metadata cache max age.
67+
/// Default: `900_000`
68+
var metadataMaxAgeMilliseconds: Int { get }
69+
70+
/// Topic metadata options.
71+
var topicMetadata: KafkaConfiguration.TopicMetadataOptions { get }
72+
73+
/// Topic denylist.
74+
/// Default: `[]`
75+
var topicDenylist: [String] { get }
76+
77+
/// Debug options.
78+
/// Default: `[]`
79+
var debug: [KafkaConfiguration.DebugOption] { get }
80+
81+
/// Socket options.
82+
var socket: KafkaConfiguration.SocketOptions { get }
83+
84+
/// Broker options.
85+
var broker: KafkaConfiguration.BrokerOptions { get }
86+
87+
/// Reconnect options.
88+
var reconnect: KafkaConfiguration.ReconnectOptions { get }
89+
90+
/// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
91+
/// Default: `.plaintext`
92+
var securityProtocol: KafkaConfiguration.SecurityProtocol { get }
93+
94+
var dictionary: [String: String] { get }
95+
}
96+
97+
extension KafkaProducerSharedProperties {
98+
internal var sharedPropsDictionary: [String: String] {
99+
var resultDict: [String: String] = [:]
100+
101+
resultDict["enable.idempotence"] = String(self.enableIdempotence)
102+
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
103+
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
104+
resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
105+
resultDict["message.send.max.retries"] = String(self.messageSendMaxRetries)
106+
resultDict["allow.auto.create.topics"] = String(self.allowAutoCreateTopics)
107+
108+
resultDict["client.id"] = self.clientID
109+
resultDict["bootstrap.servers"] = self.bootstrapServers.map(\.description).joined(separator: ",")
110+
resultDict["message.max.bytes"] = String(self.message.maxBytes)
111+
resultDict["message.copy.max.bytes"] = String(self.message.copyMaxBytes)
112+
resultDict["receive.message.max.bytes"] = String(self.receiveMessageMaxBytes)
113+
resultDict["max.in.flight.requests.per.connection"] = String(self.maxInFlightRequestsPerConnection)
114+
resultDict["metadata.max.age.ms"] = String(self.metadataMaxAgeMilliseconds)
115+
resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshIntervalMilliseconds)
116+
resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastIntervalMilliseconds)
117+
resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.refreshSparse)
118+
resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.propagationMaxMilliseconds)
119+
resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",")
120+
if !self.debug.isEmpty {
121+
resultDict["debug"] = self.debug.map(\.description).joined(separator: ",")
122+
}
123+
resultDict["socket.timeout.ms"] = String(self.socket.timeoutMilliseconds)
124+
resultDict["socket.send.buffer.bytes"] = String(self.socket.sendBufferBytes)
125+
resultDict["socket.receive.buffer.bytes"] = String(self.socket.receiveBufferBytes)
126+
resultDict["socket.keepalive.enable"] = String(self.socket.keepaliveEnable)
127+
resultDict["socket.nagle.disable"] = String(self.socket.nagleDisable)
128+
resultDict["socket.max.fails"] = String(self.socket.maxFails)
129+
resultDict["socket.connection.setup.timeout.ms"] = String(self.socket.connectionSetupTimeoutMilliseconds)
130+
resultDict["broker.address.ttl"] = String(self.broker.addressTTL)
131+
resultDict["broker.address.family"] = self.broker.addressFamily.description
132+
resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoffMilliseconds)
133+
resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.backoffMaxMilliseconds)
134+
135+
// Merge with SecurityProtocol configuration dictionary
136+
resultDict.merge(self.securityProtocol.dictionary) { _, _ in
137+
fatalError("securityProtocol and \(#file) should not have duplicate keys")
138+
}
139+
140+
return resultDict
141+
}
142+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
// FIXME: should we really duplicate `KafkaProducerConfiguration`
16+
// FIXME: after public api updated?
17+
public struct KafkaTransactionalProducerConfiguration {
18+
// MARK: - SwiftKafka-specific Config properties
19+
20+
/// The time between two consecutive polls.
21+
/// Effectively controls the rate at which incoming events are consumed.
22+
/// Default: `.milliseconds(100)`
23+
public var pollInterval: Duration = .milliseconds(100)
24+
25+
/// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
26+
/// Default: `10000`
27+
public var flushTimeoutMilliseconds: Int = 10000 {
28+
didSet {
29+
precondition(
30+
0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
31+
"Flush timeout outside of valid range \(0...Int32.max)"
32+
)
33+
}
34+
}
35+
36+
// MARK: - Producer-specific Config Properties
37+
38+
/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
39+
/// Default: `false`
40+
internal let enableIdempotence: Bool = true
41+
42+
/// Producer queue options.
43+
public var queue: KafkaConfiguration.QueueOptions = .init()
44+
45+
/// How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true.
46+
/// Default: `2_147_483_647`
47+
public var messageSendMaxRetries: Int = 2_147_483_647
48+
49+
/// Allow automatic topic creation on the broker when producing to non-existent topics.
50+
/// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
51+
/// Default: `true`
52+
public var allowAutoCreateTopics: Bool = true
53+
54+
// MARK: - Common Client Config Properties
55+
56+
/// Client identifier.
57+
/// Default: `"rdkafka"`
58+
public var clientID: String = "rdkafka"
59+
60+
/// Initial list of brokers.
61+
/// Default: `[]`
62+
public var bootstrapServers: [KafkaConfiguration.Broker] = []
63+
64+
/// Message options.
65+
public var message: KafkaConfiguration.MessageOptions = .init()
66+
67+
/// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
68+
/// Default: `100_000_000`
69+
public var receiveMessageMaxBytes: Int = 100_000_000
70+
71+
/// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
72+
/// Default: `1_000_000`
73+
public var maxInFlightRequestsPerConnection: Int = 5 {
74+
didSet {
75+
precondition(
76+
0...5 ~= self.maxInFlightRequestsPerConnection,
77+
"Transactional producer can have no more than 5 in flight requests"
78+
)
79+
}
80+
}
81+
82+
/// Metadata cache max age.
83+
/// Default: `900_000`
84+
public var metadataMaxAgeMilliseconds: Int = 900_000
85+
86+
/// Topic metadata options.
87+
public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init()
88+
89+
/// Topic denylist.
90+
/// Default: `[]`
91+
public var topicDenylist: [String] = []
92+
93+
/// Debug options.
94+
/// Default: `[]`
95+
public var debug: [KafkaConfiguration.DebugOption] = []
96+
97+
/// Socket options.
98+
public var socket: KafkaConfiguration.SocketOptions = .init()
99+
100+
/// Broker options.
101+
public var broker: KafkaConfiguration.BrokerOptions = .init()
102+
103+
/// Reconnect options.
104+
public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
105+
106+
/// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
107+
/// Default: `.plaintext`
108+
public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
109+
110+
// TODO: add Docc
111+
var transactionalId: String
112+
var transactionsTimeout: Duration = .seconds(60) // equal to socket TODO: add didSet
113+
114+
public init(transactionalId: String) {
115+
self.transactionalId = transactionalId
116+
}
117+
}
118+
119+
// MARK: - KafkaProducerConfiguration + Hashable
120+
121+
extension KafkaTransactionalProducerConfiguration: Hashable {}
122+
123+
// MARK: - KafkaProducerConfiguration + Sendable
124+
125+
extension KafkaTransactionalProducerConfiguration: Sendable {}
126+
127+
extension KafkaTransactionalProducerConfiguration: KafkaProducerSharedProperties {
128+
internal var dictionary: [String: String] {
129+
var resultDict: [String: String] = sharedPropsDictionary
130+
resultDict["transactional.id"] = self.transactionalId
131+
resultDict["transaction.timeout.ms"] = String(self.transactionsTimeout.totalMilliseconds)
132+
return resultDict
133+
}
134+
}

0 commit comments

Comments
 (0)