
Comparing changes

base repository: spreedly/kaffe (base: master)
head repository: santiment/kaffe (compare: master)

Commits on May 13, 2019

  1. Make changes to Kaffe and Kaffe.Producer so the producer does not depend on the config lib

    With these changes it will be much easier to implement a library on top
    of Kaffe that can be started directly in the supervision tree instead of
    depending on shared config.
    IvanIvanoff committed May 13, 2019 (d915fc3)
  2. Merge pull request #1 from IvanIvanoff/configless-producer

    Kaffe and Kaffe.Producer do not depend on config lib
    IvanIvanoff authored May 13, 2019 (8a5c565)
  3. Configless producer fixes

    IvanIvanoff committed May 13, 2019 (33eb7e6)
  4. Merge pull request #2 from IvanIvanoff/configless-producer

    Configless producer fixes
    IvanIvanoff authored May 13, 2019 (d00cf9c)

Commits on Jun 10, 2019

  1. Remove unused deps

    IvanIvanoff committed Jun 10, 2019 (4dfc868)
  2. Merge pull request #3 from IvanIvanoff/remove-unused

    Remove unused deps
    IvanIvanoff authored Jun 10, 2019 (9b3d4c9)
  3. f8594be
  4. Update brod

    IvanIvanoff committed Jun 10, 2019 (c541431)
  5. Merge pull request #4 from IvanIvanoff/updates

    Run formatter, remove unused, sync with upstream
    IvanIvanoff authored Jun 10, 2019 (804863d)
  6. 4b683a8
  7. Merge pull request #5 from IvanIvanoff/parse-endpoints

    Add parsing of endpoints to the producer start link function
    IvanIvanoff authored Jun 10, 2019 (7ecb7ad)

Commits on Oct 25, 2019

  1. Allow :already_present as a valid result from brod.create_client

    Version is now 1.15.0.
    
    Closes #94
    David Sere authored and tspenov committed Oct 25, 2019 (f58dd34)
  2. Merge pull request #6 from santiment/last-changes

    Last changes
    tspenov authored Oct 25, 2019 (b46f7c5)

Commits on Nov 14, 2019

  1. 6983b28
  2. Merge pull request #7 from santiment/sleep-group-member-supervisor

    Sleep group member supervisor
    tspenov authored Nov 14, 2019 (2f158c6)

Commits on Jun 17, 2020

  1. Handle client down error

    IvanIvanoff committed Jun 17, 2020 (85a7d07)
  2. Merge pull request #9 from santiment/apply-fixes-from-upstream

    Handle client down error
    IvanIvanoff authored Jun 17, 2020 (ffe4a70)

Commits on Sep 8, 2022

  1. cd57c3f

Commits on Sep 28, 2022

  1. Merge pull request #10 from santiment/remove-brod-as-applications

    Remove :brod from :applications
    IvanIvanoff authored Sep 28, 2022 (4f31d79)

Commits on Dec 4, 2023

  1. 67da166
  2. Merge pull request #12 from santiment/update-mix.exs-file

    Update mix.exs applications list
    IvanIvanoff authored Dec 4, 2023 (330657f)
  3. ab6c493
  4. Merge pull request #13 from santiment/remove-retry-from-applications

    Remove retry from :applications
    IvanIvanoff authored Dec 4, 2023 (089b319)

Commits on Dec 5, 2023

  1. e0eeaf5
  2. Merge pull request #14 from santiment/move-brod-to-included-applications

    Move :brod to included_applications
    IvanIvanoff authored Dec 5, 2023 (f446352)
1 change: 1 addition & 0 deletions .formatter.exs
@@ -1,3 +1,4 @@
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 120
]
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,6 +10,8 @@
# Where 3rd-party dependencies like ExDoc output generated docs.
/doc

/.elixir_ls

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

9 changes: 6 additions & 3 deletions README.md
@@ -131,7 +131,7 @@ There is also legacy support for single message consumers, which process one mes
}
]
opts = [strategy: :one_for_one, name: Sample.Supervisor]
opts = [strategy: :one_for_one, name: MyApp.Application.Supervisor]
Supervisor.start_link(children, opts)
end
end
@@ -148,7 +148,7 @@ In some cases you may not want to commit back the most recent offset after proce
Example:

```elixir
defmodule MessageProcessor
defmodule MessageProcessor do
def handle_messages(messages) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
@@ -289,6 +289,7 @@ config :kaffe,
# optional
partition_strategy: :md5,
ssl: true,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
@@ -305,7 +306,9 @@ The `partition_strategy` setting can be one of:
You can also set any of the Brod producer configuration options in the `producer` section - see [the Brod sources](https://github.com/klarna/brod/blob/master/src/brod_producer.erl#L90) for a list of keys and their meaning.
If kafka broker configured with `SASL_PLAINTEXT` auth, `sasl` option can be added
If the Kafka broker is configured with `SASL_PLAINTEXT` auth, the `sasl` option can be added.
If using Confluent Hosted Kafka, also add `ssl: true` as shown above.
## Heroku Configuration
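Pulling the README snippet above together, here is a minimal sketch of a producer configuration with both `ssl` and `sasl` enabled. The broker address and topic are placeholders, and the password env var name is assumed to mirror the login one:

```elixir
# Hypothetical host-app config; hostname, topic, and env var names are placeholders.
config :kaffe,
  producer: [
    endpoints: [{"kafka", 9092}],
    topics: ["my-topic"],
    partition_strategy: :md5,
    ssl: true,
    sasl: %{
      mechanism: :plain,
      login: System.get_env("KAFFE_PRODUCER_USER"),
      password: System.get_env("KAFFE_PRODUCER_PASSWORD")
    }
  ]
```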
10 changes: 5 additions & 5 deletions lib/kaffe.ex
@@ -9,14 +9,14 @@ defmodule Kaffe do

require Logger

def start(_type, _args) do
import Supervisor.Spec, warn: false

def start(_type, args) do
Logger.debug("event#start=#{__MODULE__}")

if Application.get_env(:kaffe, :producer) do
start_producer? = Keyword.get(args, :start_producer?) || Application.get_env(:kaffe, :producer)

if start_producer? do
Logger.debug("event#start_producer_client=#{__MODULE__}")
Kaffe.Producer.start_producer_client()
Kaffe.Producer.start_producer_client(args)
end

children = []
22 changes: 17 additions & 5 deletions lib/kaffe/config.ex
@@ -5,8 +5,19 @@ defmodule Kaffe.Config do
|> parse_endpoints()
end

def parse_endpoints(endpoints) when is_list(endpoints), do: endpoints
@doc """
Transform the list of endpoints into a list of `{charlist, port}` tuples.
"""
def parse_endpoints(endpoints) when is_list(endpoints) do
endpoints
|> Enum.map(fn {host, port} ->
{to_charlist(host), port}
end)
end

@doc """
Transform the encoded string into a list of `{charlist, port}` tuples.
"""
def parse_endpoints(url) when is_binary(url) do
url
|> String.replace("kafka+ssl://", "")
@@ -17,7 +28,7 @@ defmodule Kaffe.Config do

def url_endpoint_to_tuple(endpoint) do
[ip, port] = endpoint |> String.split(":")
{ip |> String.to_atom(), port |> String.to_integer()}
{ip |> String.to_charlist(), port |> String.to_integer()}
end

def sasl_config(%{mechanism: :plain, login: login, password: password})
@@ -30,9 +41,10 @@ defmodule Kaffe.Config do
ssl_config(client_cert(), client_cert_key())
end

def ssl_config(_client_cert = nil, _client_cert_key = nil) do
[]
end
def ssl_config(true), do: [ssl: true]
def ssl_config(_), do: []

def ssl_config(_client_cert = nil, _client_cert_key = nil), do: []

def ssl_config(client_cert, client_cert_key) do
[
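The practical effect of the endpoint changes above: both accepted input shapes now normalize to `{charlist, port}` tuples. A short illustration (output shapes match the updated config tests further down):

```elixir
# List form: {host, port} pairs are converted host-by-host with to_charlist/1.
Kaffe.Config.parse_endpoints([{"kafka", 9092}])
#=> [{'kafka', 9092}]

# String form: the "kafka+ssl://" prefixes are stripped and each host:port is split.
Kaffe.Config.parse_endpoints("kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096")
#=> [{'192.168.1.100', 9096}, {'192.168.1.101', 9096}]
```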
9 changes: 8 additions & 1 deletion lib/kaffe/config/consumer.ex
@@ -72,7 +72,8 @@ defmodule Kaffe.Config.Consumer do
end

def client_consumer_config do
default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options()
default_client_consumer_config() ++
maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
end

def sasl_options do
@@ -81,6 +82,12 @@ defmodule Kaffe.Config.Consumer do
|> Kaffe.Config.sasl_config()
end

def ssl_options do
:ssl
|> config_get(false)
|> Kaffe.Config.ssl_config()
end

def default_client_consumer_config do
[
auto_start_producers: false,
20 changes: 14 additions & 6 deletions lib/kaffe/config/producer.ex
@@ -6,23 +6,22 @@ defmodule Kaffe.Config.Producer do
endpoints: endpoints(),
producer_config: client_producer_config(),
client_name: config_get(:client_name, :kaffe_producer_client),
topics: producer_topics(),
topics: config_get(:topics, []),
partition_strategy: config_get(:partition_strategy, :md5)
}
end

def producer_topics, do: config_get!(:topics)

def endpoints do
if heroku_kafka?() do
heroku_kafka_endpoints()
else
parse_endpoints(config_get!(:endpoints))
parse_endpoints(config_get(:endpoints, []))
end
end

def client_producer_config do
default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options()
default_client_producer_config() ++
maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
end

def sasl_options do
@@ -38,6 +37,12 @@ defmodule Kaffe.Config.Producer do
end
end

def ssl_options do
:ssl
|> config_get(false)
|> Kaffe.Config.ssl_config()
end

def default_client_producer_config do
[
auto_start_producers: true,
@@ -67,6 +72,9 @@ defmodule Kaffe.Config.Producer do

def config_get(key, default) do
Application.get_env(:kaffe, :producer)
|> Keyword.get(key, default)
|> case do
nil -> default
config -> Keyword.get(config, key, default)
end
end
end
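A side effect of the reworked `config_get/2` above: a missing `:kaffe, :producer` environment no longer raises, it simply falls through to the default. A tiny sketch:

```elixir
# With no producer config set at all, the default comes back instead of a crash.
Application.delete_env(:kaffe, :producer)
Kaffe.Config.Producer.config_get(:partition_strategy, :md5)
#=> :md5
```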
7 changes: 6 additions & 1 deletion lib/kaffe/consumer.ex
@@ -99,7 +99,12 @@ defmodule Kaffe.Consumer do
"""
def init(_consumer_group, [config]) do
start_consumer_client(config)
{:ok, %Kaffe.Consumer.State{message_handler: config.message_handler, async: config.async_message_ack}}

{:ok,
%Kaffe.Consumer.State{
message_handler: config.message_handler,
async: config.async_message_ack
}}
end

@doc """
15 changes: 13 additions & 2 deletions lib/kaffe/consumer_group/group_manager.ex
@@ -57,7 +57,13 @@ defmodule Kaffe.GroupManager do
Logger.info("event#startup=#{__MODULE__} name=#{name()}")

config = Kaffe.Config.Consumer.configuration()
:ok = kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config)
case kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config) do
:ok ->
:ok

{_, :already_present} ->
Logger.info("The brod client is already present, continuing.")
end

GenServer.cast(self(), {:start_group_members})

@@ -79,7 +85,12 @@ defmodule Kaffe.GroupManager do
def handle_cast({:start_group_members}, state) do
Logger.debug("Starting worker supervisors for group manager: #{inspect(self())}")

{:ok, worker_supervisor_pid} = group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name)
{:ok, worker_supervisor_pid} =
group_member_supervisor().start_worker_supervisor(
state.supervisor_pid,
state.subscriber_name
)

{:ok, worker_manager_pid} = worker_supervisor().start_worker_manager(worker_supervisor_pid, state.subscriber_name)

state = %State{state | worker_manager_pid: worker_manager_pid}
3 changes: 3 additions & 0 deletions lib/kaffe/consumer_group/group_member_supervisor.ex
@@ -54,6 +54,9 @@ defmodule Kaffe.GroupMemberSupervisor do

def init(:ok) do
Logger.info("event#starting=#{__MODULE__}")
# Added sleep between GroupMemberSupervisor restarts so it doesn't
# reach max attempts and kill parent supervisor.
Process.sleep(5000)

children = [
worker(Kaffe.GroupManager, [])
13 changes: 8 additions & 5 deletions lib/kaffe/consumer_group/subscriber/group_member.ex
@@ -89,12 +89,10 @@ defmodule Kaffe.GroupMember do
self()
)

Logger.info(
"event#init=#{__MODULE__}
Logger.info("event#init=#{__MODULE__}
group_coordinator=#{inspect(pid)}
subscriber_name=#{subscriber_name}
consumer_group=#{consumer_group}"
)
consumer_group=#{consumer_group}")

{:ok,
%State{
@@ -124,7 +122,11 @@ defmodule Kaffe.GroupMember do
end

# If we're not at the latest generation, discard the assignment for whatever is next.
def handle_info({:allocate_subscribers, gen_id, _assignments}, %{current_gen_id: current_gen_id} = state) when gen_id < current_gen_id do
def handle_info(
{:allocate_subscribers, gen_id, _assignments},
%{current_gen_id: current_gen_id} = state
)
when gen_id < current_gen_id do
Logger.debug("Discarding old generation #{gen_id} for current generation: #{current_gen_id}")
{:noreply, state}
end
@@ -175,6 +177,7 @@ defmodule Kaffe.GroupMember do
defp compute_offset(:undefined, configured_offset) do
[begin_offset: configured_offset]
end

defp compute_offset(offset, _configured_offset) do
[begin_offset: offset]
end
9 changes: 7 additions & 2 deletions lib/kaffe/consumer_group/subscriber/subscriber.ex
@@ -154,7 +154,11 @@ defmodule Kaffe.Subscriber do
end

def handle_cast({:commit_offsets, topic, partition, generation_id, offset}, state) do
Logger.debug("event#commit_offsets topic=#{state.topic} partition=#{state.partition} offset=#{offset} generation=#{generation_id}")
Logger.debug(
"event#commit_offsets topic=#{state.topic} partition=#{state.partition} offset=#{offset} generation=#{
generation_id
}"
)

# Is this the ack we're looking for?
^topic = state.topic
@@ -188,7 +192,8 @@ defmodule Kaffe.Subscriber do
{:noreply, %{state | subscriber_pid: subscriber_pid}}
end

defp handle_subscribe({:error, reason}, %{retries_remaining: retries_remaining} = state) when retries_remaining > 0 do
defp handle_subscribe({:error, reason}, %{retries_remaining: retries_remaining} = state)
when retries_remaining > 0 do
Logger.debug("Failed to subscribe with reason: #{inspect(reason)}, #{retries_remaining} retries remaining")

Process.send_after(self(), {:subscribe_to_topic_partition}, retry_delay())
5 changes: 4 additions & 1 deletion lib/kaffe/consumer_group/worker/worker.ex
@@ -21,7 +21,10 @@ defmodule Kaffe.Worker do
end

def process_messages(pid, subscriber_pid, topic, partition, generation_id, messages) do
GenServer.cast(pid, {:process_messages, subscriber_pid, topic, partition, generation_id, messages})
GenServer.cast(
pid,
{:process_messages, subscriber_pid, topic, partition, generation_id, messages}
)
end

## ==========================================================================
35 changes: 23 additions & 12 deletions lib/kaffe/producer.ex
@@ -21,14 +21,19 @@ defmodule Kaffe.Producer do

@kafka Application.get_env(:kaffe, :kafka_mod, :brod)

import Kaffe.Config, only: [parse_endpoints: 1]
require Logger

## -------------------------------------------------------------------------
## public api
## -------------------------------------------------------------------------

def start_producer_client do
@kafka.start_client(config().endpoints, client_name(), config().producer_config)
def start_producer_client(opts \\ []) do
client_name = Keyword.get(opts, :client_name) || client_name()
endpoints = Keyword.get(opts, :endpoints) || config().endpoints
producer_config = Keyword.get(opts, :producer_config) || config().producer_config

@kafka.start_client(parse_endpoints(endpoints), client_name, producer_config)
end

@doc """
@@ -119,7 +124,14 @@ defmodule Kaffe.Producer do
message_list
|> add_timestamp
|> group_by_partition(topic, partition_strategy)
|> produce_list_to_topic(topic)
|> case do
messages = %{} ->
produce_list_to_topic(messages, topic)

{:error, reason} ->
Logger.warn("Error while grouping by partition #{inspect(reason)}")
{:error, reason}
end
end

defp produce_value(topic, key, value) do
@@ -132,10 +144,9 @@ defmodule Kaffe.Producer do
)

@kafka.produce_sync(client_name(), topic, partition, key, value)

error ->
Logger.warn(
"event#produce topic=#{topic} key=#{key} error=#{inspect(error)}"
)
Logger.warn("event#produce topic=#{topic} key=#{key} error=#{inspect(error)}")

error
end
@@ -149,12 +160,12 @@ defmodule Kaffe.Producer do
end

defp group_by_partition(messages, topic, partition_strategy) do
{:ok, partitions_count} = @kafka.get_partitions_count(client_name(), topic)

messages
|> Enum.group_by(fn {_timestamp, key, message} ->
choose_partition(topic, partitions_count, key, message, partition_strategy)
end)
with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name(), topic) do
messages
|> Enum.group_by(fn {_timestamp, key, message} ->
choose_partition(topic, partitions_count, key, message, partition_strategy)
end)
end
end

defp produce_list_to_topic(message_list, topic) do
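Taken together, the producer changes above let a caller start the client with explicit options instead of shared application config. A rough sketch, with placeholder values (any omitted option falls back to `config()`):

```elixir
# Opts-based startup; endpoints may be a list of tuples or a "host:port,host:port"
# string, since parse_endpoints/1 handles both shapes.
Kaffe.Producer.start_producer_client(
  client_name: :my_producer_client,
  endpoints: "kafka:9092,localhost:9092",
  producer_config: [auto_start_producers: true, allow_topic_auto_creation: false]
)
```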
12 changes: 8 additions & 4 deletions mix.exs
@@ -4,7 +4,7 @@ defmodule Kaffe.Mixfile do
def project do
[
app: :kaffe,
version: "1.12.0",
version: "1.15.0",
description:
"An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.",
name: "Kaffe",
@@ -19,16 +19,20 @@ defmodule Kaffe.Mixfile do
end

def application do
[applications: [:logger, :brod], mod: {Kaffe, []}]
[
applications: [:logger],
mod: {Kaffe, []},
included_applications: [:brod]
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_env), do: ["lib"]

defp deps do
[
{:brod, ">= 3.0.0 and < 3.5.0"},
{:ex_doc, "~> 0.14", only: :dev, runtime: false},
{:brod, "~> 3.0"},
{:ex_doc, "~> 0.14", only: :dev, runtime: false}
]
end

20 changes: 12 additions & 8 deletions mix.lock
@@ -1,8 +1,12 @@
%{"brod": {:hex, :brod, "3.2.0", "64f0778a7a32ec0a39cec9a564f4686bdfe72b147b48076e114a156fd0a30222", [:make, :rebar, :rebar3], [{:kafka_protocol, "1.1.0", [repo: "hexpm", hex: :kafka_protocol, optional: false]}, {:supervisor3, "1.1.5", [repo: "hexpm", hex: :supervisor3, optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]},
"kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [repo: "hexpm", hex: :snappyer, optional: false]}], "hexpm"},
"logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []},
"metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]},
"snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm"},
"supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}}
%{
"brod": {:hex, :brod, "3.7.10", "d1d3845dbc1e663eb65b9166140504cb6024531cae47d0b09acdaae46b85d305", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.8", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "a719f80dcd4eb5ba77072e8265152d0c5dd4b52c9937e041fe155f17582c93fe"},
"crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm", "e35840bfd312192748bf177e92e85270e2bf0bbc01462da1f7afd4298edae4a7"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"},
"ex_doc": {:hex, :ex_doc, "0.20.2", "1bd0dfb0304bade58beb77f20f21ee3558cc3c753743ae0ddbb0fd7ba2912331", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "8e24fc8ff9a50b9f557ff020d6c91a03cded7e59ac3e0eec8a27e771430c7d27"},
"kafka_protocol": {:hex, :kafka_protocol, "2.2.8", "f21f0f97529a106ad48eb545b388f58a3536b82ca5e935fa7458008e4b701f4a", [:rebar, :rebar3], [{:crc32cer, "0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.4", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "377fe1359965e1966f30a7050f8b23b9545cee40539e38b719797650c9ebd710"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5fbc8e549aa9afeea2847c0769e3970537ed302f93a23ac612602e805d9d1e7f"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "adf0218695e22caeda2820eaba703fa46c91820d53813a2223413da3ef4ba515"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"},
"snappyer": {:hex, :snappyer, "1.2.4", "6d739c534cd2339633127a2b40279be71f149e5842c5363a4d88e66efb7c1fec", [:make, :rebar, :rebar3], [], "hexpm", "76abb8ed503722e10ee0f587956662881af9009067efdec87798a03db3527184"},
"supervisor3": {:hex, :supervisor3, "1.1.8", "5cf95c95342b589ec8d74689eea0646c0a3eb92820241e0c2d0ca4c104df92bc", [:make, :rebar, :rebar3], [], "hexpm", "4814b4d4343e777cc724312a588061828703f05149129cda2cb30d14105b1128"},
}
56 changes: 52 additions & 4 deletions test/kaffe/config/consumer_test.exs
@@ -6,6 +6,7 @@ defmodule Kaffe.Config.ConsumerTest do
consumer_config =
Application.get_env(:kaffe, :consumer)
|> Keyword.delete(:offset_reset_policy)
|> Keyword.delete(:ssl)
|> Keyword.put(:start_with_earliest_message, true)

Application.put_env(:kaffe, :consumer, consumer_config)
@@ -20,7 +21,7 @@ defmodule Kaffe.Config.ConsumerTest do
Application.put_env(:kaffe, :consumer, no_sasl_config)

expected = %{
endpoints: [kafka: 9092],
endpoints: [{'kafka', 9092}],
subscriber_name: :"kaffe-test-group",
consumer_group: "kaffe-test-group",
topics: ["kaffe-test"],
@@ -51,10 +52,15 @@ defmodule Kaffe.Config.ConsumerTest do
test "string endpoints parsed correctly" do
config = Application.get_env(:kaffe, :consumer)
endpoints = Keyword.get(config, :endpoints)
Application.put_env(:kaffe, :consumer, Keyword.put(config, :endpoints, "kafka:9092,localhost:9092"))

Application.put_env(
:kaffe,
:consumer,
Keyword.put(config, :endpoints, "kafka:9092,localhost:9092")
)

expected = %{
endpoints: [kafka: 9092, localhost: 9092],
endpoints: [{'kafka', 9092}, {'localhost', 9092}],
subscriber_name: :"kaffe-test-group",
consumer_group: "kaffe-test-group",
topics: ["kaffe-test"],
@@ -90,12 +96,13 @@ defmodule Kaffe.Config.ConsumerTest do
test "correct settings with sasl plain are extracted" do
config = Application.get_env(:kaffe, :consumer)
sasl = Keyword.get(config, :sasl)

sasl_config = Keyword.put(config, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"})

Application.put_env(:kaffe, :consumer, sasl_config)

expected = %{
endpoints: [kafka: 9092],
endpoints: [{'kafka', 9092}],
subscriber_name: :"kaffe-test-group",
consumer_group: "kaffe-test-group",
topics: ["kaffe-test"],
@@ -128,6 +135,47 @@ defmodule Kaffe.Config.ConsumerTest do
assert Kaffe.Config.Consumer.configuration() == expected
end

test "correct settings with ssl are extracted" do
config = Application.get_env(:kaffe, :consumer)
ssl = Keyword.get(config, :ssl)
ssl_config = Keyword.put(config, :ssl, true)

Application.put_env(:kaffe, :consumer, ssl_config)

expected = %{
endpoints: [{'kafka', 9092}],
subscriber_name: :"kaffe-test-group",
consumer_group: "kaffe-test-group",
topics: ["kaffe-test"],
group_config: [
offset_commit_policy: :commit_to_kafka_v2,
offset_commit_interval_seconds: 10
],
consumer_config: [
auto_start_producers: false,
allow_topic_auto_creation: false,
begin_offset: :earliest,
ssl: true
],
message_handler: SilentMessage,
async_message_ack: false,
rebalance_delay_ms: 100,
max_bytes: 10_000,
min_bytes: 0,
max_wait_time: 10_000,
subscriber_retries: 1,
subscriber_retry_delay_ms: 5,
offset_reset_policy: :reset_by_subscriber,
worker_allocation_strategy: :worker_per_partition
}

on_exit(fn ->
Application.put_env(:kaffe, :consumer, Keyword.put(config, :ssl, ssl))
end)

assert Kaffe.Config.Consumer.configuration() == expected
end

describe "offset_reset_policy" do
test "computes correctly from start_with_earliest_message == true" do
consumer_config =
49 changes: 45 additions & 4 deletions test/kaffe/config/producer_test.exs
@@ -11,7 +11,7 @@ defmodule Kaffe.Config.ProducerTest do
Application.put_env(:kaffe, :producer, no_sasl_config)

expected = %{
endpoints: [kafka: 9092],
endpoints: [{'kafka', 9092}],
producer_config: [
auto_start_producers: true,
allow_topic_auto_creation: false,
@@ -38,12 +38,13 @@ defmodule Kaffe.Config.ProducerTest do
test "correct settings with sasl plain are extracted" do
config = Application.get_env(:kaffe, :producer)
sasl = Keyword.get(config, :sasl)

sasl_config = Keyword.put(config, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"})

Application.put_env(:kaffe, :producer, sasl_config)

expected = %{
endpoints: [kafka: 9092],
endpoints: [{'kafka', 9092}],
producer_config: [
auto_start_producers: true,
allow_topic_auto_creation: false,
@@ -76,10 +77,15 @@ defmodule Kaffe.Config.ProducerTest do
test "string endpoints parsed correctly" do
config = Application.get_env(:kaffe, :producer)
endpoints = Keyword.get(config, :endpoints)
Application.put_env(:kaffe, :producer, Keyword.put(config, :endpoints, "kafka:9092,localhost:9092"))

Application.put_env(
:kaffe,
:producer,
Keyword.put(config, :endpoints, "kafka:9092,localhost:9092")
)

expected = %{
endpoints: [kafka: 9092, localhost: 9092],
endpoints: [{'kafka', 9092}, {'localhost', 9092}],
producer_config: [
auto_start_producers: true,
allow_topic_auto_creation: false,
@@ -106,4 +112,39 @@ defmodule Kaffe.Config.ProducerTest do

assert Kaffe.Config.Producer.configuration() == expected
end

test "adds ssl when true" do
config = Application.get_env(:kaffe, :producer)
ssl = Keyword.get(config, :ssl)
Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, true))

expected = %{
endpoints: [{'kafka', 9092}],
producer_config: [
auto_start_producers: true,
allow_topic_auto_creation: false,
default_producer_config: [
required_acks: -1,
ack_timeout: 1000,
partition_buffer_limit: 512,
partition_onwire_limit: 1,
max_batch_size: 1_048_576,
max_retries: 3,
retry_backoff_ms: 500,
compression: :no_compression,
min_compression_batch_size: 1024
],
ssl: true
],
topics: ["kaffe-test"],
client_name: :kaffe_producer_client,
partition_strategy: :md5
}

on_exit(fn ->
Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, ssl))
end)

assert Kaffe.Config.Producer.configuration() == expected
end
end
5 changes: 3 additions & 2 deletions test/kaffe/config_test.exs
@@ -8,7 +8,7 @@ defmodule Kaffe.ConfigTest do
"kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096"
)

expected = [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}]
expected = [{'192.168.1.100', 9096}, {'192.168.1.101', 9096}, {'192.168.1.102', 9096}]

on_exit(fn ->
System.delete_env("KAFKA_URL")
@@ -19,7 +19,8 @@ defmodule Kaffe.ConfigTest do

test "transforms endpoints into the correct format" do
kafka_url = "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096"
expected = [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}]

expected = [{'192.168.1.100', 9096}, {'192.168.1.101', 9096}, {'192.168.1.102', 9096}]

assert Kaffe.Config.parse_endpoints(kafka_url) == expected
end
16 changes: 13 additions & 3 deletions test/kaffe/consumer_group/group_manager_test.exs
@@ -16,7 +16,13 @@ defmodule Kaffe.GroupManagerTest do
{:ok, self()}
end

def start_group_member(_supervisor_pid, _subscriber_name, _consumer_group, _worker_manager_pid, topic) do
def start_group_member(
_supervisor_pid,
_subscriber_name,
_consumer_group,
_worker_manager_pid,
topic
) do
send(:test_case, {:start_group_member, topic})
{:ok, self()}
end
@@ -69,7 +75,9 @@ defmodule Kaffe.GroupManagerTest do
assert_receive {:start_group_member, "so-interesting"}
assert_receive {:start_group_member, "such-random"}

assert [] == GroupManager.list_subscribed_topics() -- ["kaffe-test", "so-interesting", "such-random"]
assert [] ==
GroupManager.list_subscribed_topics() --
["kaffe-test", "so-interesting", "such-random"]
end

test "duplicate topic subscription does nothing" do
@@ -95,6 +103,8 @@ defmodule Kaffe.GroupManagerTest do

refute_receive {:start_group_member, "so-interesting"}

assert [] == GroupManager.list_subscribed_topics() -- ["kaffe-test", "so-interesting", "such-random"]
assert [] ==
GroupManager.list_subscribed_topics() --
["kaffe-test", "so-interesting", "such-random"]
end
end
12 changes: 10 additions & 2 deletions test/kaffe/consumer_group/subscriber/group_member_startup_test.exs
@@ -8,7 +8,15 @@ defmodule Kaffe.GroupMemberStartupTest do
@moduletag :e2e

defmodule TestSubscriber do
def subscribe(subscriber_name, _group_coordinator_pid, _worker_pid, _gen_id, topic, partition, _ops) do
def subscribe(
subscriber_name,
_group_coordinator_pid,
_worker_pid,
_gen_id,
topic,
partition,
_ops
) do
send(:test_case, {:subscribe, subscriber_name, topic, partition})
{:ok, self()}
end
@@ -33,7 +41,7 @@ defmodule Kaffe.GroupMemberStartupTest do
Application.put_env(:kaffe, :consumer, Keyword.put(consumer_config, :subscriber_name, "s2"))
{:ok, _pid} = Kaffe.GroupMemberSupervisor.start_link()

:timer.sleep(Kaffe.Config.Consumer.configuration().rebalance_delay_ms + 100)
Process.sleep(Kaffe.Config.Consumer.configuration().rebalance_delay_ms + 100)

assignments =
Enum.reduce(0..31, %{}, fn _partition, map ->
10 changes: 9 additions & 1 deletion test/kaffe/consumer_group/subscriber/group_member_test.exs
Original file line number Diff line number Diff line change
@@ -25,7 +25,15 @@ defmodule Kaffe.GroupMemberTest do
end

defmodule TestSubscriber do
def subscribe(_subscriber_name, _group_coordinator_pid, _worker_pid, _gen_id, _topic, _partition, _ops) do
def subscribe(
_subscriber_name,
_group_coordinator_pid,
_worker_pid,
_gen_id,
_topic,
_partition,
_ops
) do
send(:test_case, {:subscriber, {:subscribe}})
{:ok, self()}
end
5 changes: 1 addition & 4 deletions test/kaffe/consumer_group/subscriber/subscriber_test.exs
@@ -121,11 +121,8 @@ defmodule Kaffe.SubscriberTest do
Enum.map(1..10, fn n ->
Subscriber.kafka_message(
offset: n,
magic_byte: 0,
attributes: 0,
key: "key-#{n}",
value: "#{n}",
crc: -1
value: "#{n}"
)
end)
end
7 changes: 6 additions & 1 deletion test/kaffe/consumer_group/worker/worker_manager_test.exs
@@ -38,6 +38,11 @@ defmodule Kaffe.WorkerManagerTest do

defp configure_strategy(strategy) do
consumer_config = Application.get_env(:kaffe, :consumer)
Application.put_env(:kaffe, :consumer, Keyword.put(consumer_config, :worker_allocation_strategy, strategy))

Application.put_env(
:kaffe,
:consumer,
Keyword.put(consumer_config, :worker_allocation_strategy, strategy)
)
end
end
15 changes: 12 additions & 3 deletions test/kaffe/consumer_group/worker/worker_test.exs
@@ -39,7 +39,10 @@ defmodule Kaffe.WorkerTest do
test "handle messages and commit back offset" do
{:ok, worker_pid} = Worker.start_link(TestHandler, "subscriber_name", 0)

Worker.process_messages(worker_pid, self(), "topic", 1, 2, [%{key: :one, offset: 100}, %{key: :two, offset: 101}])
Worker.process_messages(worker_pid, self(), "topic", 1, 2, [
%{key: :one, offset: 100},
%{key: :two, offset: 101}
])

assert_receive {:handle_messages, [%{key: :one, offset: 100}, %{key: :two, offset: 101}]}
assert_receive {:commit_offsets, {"topic", 1, 2, 101}}
@@ -49,7 +52,10 @@ defmodule Kaffe.WorkerTest do
test "handle messages and maintain offset" do
{:ok, worker_pid} = Worker.start_link(TestHandler, "subscriber_name", 0)

Worker.process_messages(worker_pid, self(), "topic", 1, 2, [%{key: :one, offset: 100}, %{key: :two, offset: 888}])
Worker.process_messages(worker_pid, self(), "topic", 1, 2, [
%{key: :one, offset: 100},
%{key: :two, offset: 888}
])

assert_receive {:handle_messages, [%{key: :one, offset: 100}, %{key: :two, offset: 888}]}
refute_received {:commit_offsets, {"topic", 1, 2, 888}}
@@ -59,7 +65,10 @@ defmodule Kaffe.WorkerTest do
test "handle messages and commit back specific offset" do
{:ok, worker_pid} = Worker.start_link(TestHandler, "subscriber_name", 0)

Worker.process_messages(worker_pid, self(), "topic", 1, 2, [%{key: :one, offset: 100}, %{key: :two, offset: 999}])
Worker.process_messages(worker_pid, self(), "topic", 1, 2, [
%{key: :one, offset: 100},
%{key: :two, offset: 999}
])

assert_receive {:handle_messages, [%{key: :one, offset: 100}, %{key: :two, offset: 999}]}
assert_receive {:commit_offsets, {"topic", 1, 2, 100}}
26 changes: 23 additions & 3 deletions test/kaffe/producer_test.exs
@@ -19,12 +19,26 @@ defmodule Kaffe.ProducerTest do

test "(topic, message_list) produces messages to the specific topic" do
:ok = Producer.produce_sync("topic2", [{"key8", "value1"}, {"key12", "value2"}])
assert_receive [:produce_sync, "topic2", 17, "ignored", [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]]

assert_receive [
:produce_sync,
"topic2",
17,
"ignored",
[{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]
]
end

test "(topic, message_list, partition_strategy) produces messages to the specific topic" do
:ok = Producer.produce("topic2", [{"key8", "value1"}, {"key12", "value2"}], partition_strategy: :md5)
assert_receive [:produce_sync, "topic2", 17, "ignored", [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]]

assert_receive [
:produce_sync,
"topic2",
17,
"ignored",
[{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]
]
end

test "(topic, message_list, partition_strategy) produces messages to the specific topic and partition" do
@@ -33,7 +47,13 @@ defmodule Kaffe.ProducerTest do
partition_strategy: fn _topic, _partitions_count, _key, _value -> 19 end
)

assert_receive [:produce_sync, "topic2", 19, "ignored", [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]]
assert_receive [
:produce_sync,
"topic2",
19,
"ignored",
[{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]
]
end

test "(topic, key, value) produces a message to the specific topic" do