Skip to content

Commit

Permalink
feat: create report tables and save responses
Browse files Browse the repository at this point in the history
  • Loading branch information
JuanDGiraldoM committed Jul 14, 2024
1 parent c02e70f commit 43d2613
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 149 deletions.
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ config :distributed_performance_analyzer,
dataset_parser: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Csv,
file_system: DistributedPerformanceAnalyzer.Infrastructure.Adapters.FileSystem,
report_exporter: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Csv,
http_client: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Http.HttpClient
http_client: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Http.HttpClient,
report_repository: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Mnesia

if Mix.env() == :dev do
config :git_hooks,
Expand Down
7 changes: 4 additions & 3 deletions lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ defmodule DistributedPerformanceAnalyzer.Application do
alias DistributedPerformanceAnalyzer.Domain.UseCase.{
Execution.ExecutionUseCase,
Config.ConfigUseCase,
Dataset.DatasetUseCase
Dataset.DatasetUseCase,
Reports.ReportUseCase
}

use Application
Expand All @@ -24,7 +25,7 @@ defmodule DistributedPerformanceAnalyzer.Application do
# CustomTelemetry.custom_telemetry_events()
opts = [strategy: :one_for_one, name: DistributedPerformanceAnalyzer.Supervisor]
pid = Supervisor.start_link(children, opts)
# Process.sleep(100_000)
if env == :dev, do: to_timeout(minute: 3) |> Process.sleep()
pid
end

Expand All @@ -38,7 +39,7 @@ defmodule DistributedPerformanceAnalyzer.Application do
def env_children(:test, _distributed), do: []

def env_children(_other_env, distributed) do
children = [DatasetUseCase]
children = [DatasetUseCase, ReportUseCase]
master_children = [ExecutionUseCase]

if distributed == :none || distributed == :master do
Expand Down
1 change: 1 addition & 0 deletions lib/domain/model/user/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule DistributedPerformanceAnalyzer.Domain.Model.User do
"""

constructor do
field(:step_name, :atomics, constructor: &is_atom/1)
field(:request, Request.t(), constructor: &Request.new/1)

field(:dataset_name, String.t(),
Expand Down
70 changes: 66 additions & 4 deletions lib/domain/use_cases/reports/report_use_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,76 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Reports.ReportUseCase do
@moduledoc """
Provides functions for generating reports, based on the results of the step
"""
use GenServer
alias :mnesia, as: Mnesia
require Logger

# TODO: init, create tables (step results)
alias DistributedPerformanceAnalyzer.Config.AppConfig
alias DistributedPerformanceAnalyzer.Domain.Model.Config.Response
alias DistributedPerformanceAnalyzer.Domain.UseCase.CollectorUseCase
alias DistributedPerformanceAnalyzer.Utils.DataTypeUtils

def init do
Mnesia.start()
Mnesia.create_schema([node()])
@repository Application.compile_env!(AppConfig.get_app_name(), :report_repository)
@attributes [:key, :value]
@responses :responses
@errors :errors

def start_link(_) do
Logger.debug("Starting report server...")
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

@impl true
def init(_) do
case @repository.start() do
:ok -> {:ok, nil}
error -> error
end
end

def create_step_report(step_name) do
[
get_table_name(step_name, @responses),
get_table_name(step_name, @errors)
]
|> Enum.each(fn table -> GenServer.call(__MODULE__, {:create_table, table, @attributes}) end)
end

def save_response(step_name, index, %Response{} = response),
do:
GenServer.cast(
__MODULE__,
{:save_item, get_table_name(step_name, @responses), index, response}
)

def save_error(step_name, index, reason),
do:
GenServer.cast(__MODULE__, {:save_item, get_table_name(step_name, @errors), index, reason})

@impl true
def handle_call({:create_table, table_name, attributes}, _, state) do
Task.async(fn -> @repository.create_table(table_name, attributes) end) |> Task.await()
{:reply, :ok, state}
end

@impl true
def handle_cast({:save_item, table, index, item}, state) do
Task.async(fn -> @repository.save_item(table, index, item) end) |> Task.await()
{:noreply, state}
end

defp get_table_name(step_name, type), do: DataTypeUtils.parse_to_atom("#{step_name}_#{type}")

def consolidate_step(step_name) do
end

def save_step_result(scenario_name, step_result) do
end

def consolidate_scenario(scenario_name) do
end

def create_step_report(step_name) when is_atom(step_name) do
end

def start_step_collector(id) do
Expand Down
8 changes: 4 additions & 4 deletions lib/domain/use_cases/step/step_use_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Step.StepUseCase do
defp start_step(_, 0), do: {:error, :invalid_step_number}

defp start_step(%Scenario{strategy: strategy} = scenario, step_number) when step_number > 0 do
table_name = get_process_name(scenario.name, step_number)
step_name = get_process_name(scenario.name, step_number)
concurrency = get_concurrency(strategy, step_number)

Logger.info(
Expand All @@ -47,14 +47,14 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Step.StepUseCase do
:poolboy.child_spec(
:worker,
get_pool_config(scenario.name, step_number, concurrency),
get_user_config(scenario, table_name)
get_user_config(scenario, step_name)
)
]

opts =
[strategy: :one_for_one, name: get_pool_name(scenario.name, step_number)]

ReportUseCase.create_step_report(table_name)
ReportUseCase.create_step_report(step_name)
Supervisor.start_link(children, opts)
end

Expand All @@ -72,7 +72,7 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Step.StepUseCase do
defp get_user_config(%Scenario{} = scenario, table_name) do
{:ok, user} =
User.new(
table_name: table_name,
step_name: table_name,
request: scenario.request,
dataset_name: scenario.dataset_name
)
Expand Down
26 changes: 21 additions & 5 deletions lib/domain/use_cases/user/user_use_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.User.UserUseCase do
use GenServer
require Logger
alias DistributedPerformanceAnalyzer.Config.AppConfig
alias DistributedPerformanceAnalyzer.Domain.Model.{User, Config.Request}
alias DistributedPerformanceAnalyzer.Domain.UseCase.Dataset.DatasetUseCase
alias DistributedPerformanceAnalyzer.Utils.DataTypeUtils
alias DistributedPerformanceAnalyzer.Domain.Model.{User, Config.Request, Config.Response}

defstruct [:connection, :config]
alias DistributedPerformanceAnalyzer.Domain.UseCase.{
Dataset.DatasetUseCase,
Reports.ReportUseCase
}

defstruct [:connection, :config, :step_name]

@http_client Application.compile_env!(AppConfig.get_app_name(), :http_client)

def start_link(%User{} = config) do
Logger.debug("Starting user for #{config.request.url}}...")
Logger.debug("Starting user for #{config.request.url}...")
GenServer.start_link(__MODULE__, config)
end

Expand Down Expand Up @@ -46,9 +51,9 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.User.UserUseCase do

{:ok, %{response: response, connection: connection}} = send_request(connection, request)
Logger.debug(inspect(response))
save_response(config.step_name, response)

loop()
# TODO: send metrics
{:noreply, %{state | connection: connection}}
end

Expand All @@ -69,6 +74,17 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.User.UserUseCase do
%{request | body: replace_value.(request.body), headers: replace_value.(request.headers)}
end

defp save_response(step_name, {:ok, %Response{} = response}),
do: ReportUseCase.save_response(step_name, get_response_id(response), response)

defp save_response(step_name, {:error, reason}) do
Logger.warning("Request failed: #{inspect(reason)}")
ReportUseCase.save_error(step_name, get_response_id(reason), reason)
end

defp get_response_id(%Response{timestamp: timestamp}), do: "#{timestamp}-#{inspect(self())}"
defp get_response_id(_), do: "#{DataTypeUtils.timestamp()}-#{inspect(self())}"

@impl true
def terminate(reason, state) do
Logger.debug("Terminating user due to #{inspect(reason)}")
Expand Down
3 changes: 1 addition & 2 deletions lib/infrastructure/driven_adapters/http/http_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ defmodule DistributedPerformanceAnalyzer.Infrastructure.Adapters.Http.HttpClient
})
end

# TODO: Send error reason
defp fail_response(res), do: Logger.warning("Request error: #{res}")
defp fail_response(reason), do: {:error, reason}

defp mint_opts(scheme, ssl_validation, timeout) do
# TODO: enable cacerts https://hexdocs.pm/mint/Mint.HTTP.html#module-ssl-certificates
Expand Down
Loading

0 comments on commit 43d2613

Please sign in to comment.