From 43d261378a91fb643b8d7201b78ce1d171d87b02 Mon Sep 17 00:00:00 2001 From: JuanDGiraldoM Date: Sat, 13 Jul 2024 20:37:01 -0500 Subject: [PATCH] feat: create report tables and save responses --- config/config.exs | 3 +- lib/application.ex | 7 +- lib/domain/model/user/user.ex | 1 + .../use_cases/reports/report_use_case.ex | 70 ++++- lib/domain/use_cases/step/step_use_case.ex | 8 +- lib/domain/use_cases/user/user_use_case.ex | 26 +- .../driven_adapters/http/http_client.ex | 3 +- .../driven_adapters/mnesia/mnesia.ex | 296 ++++++++++-------- 8 files changed, 265 insertions(+), 149 deletions(-) diff --git a/config/config.exs b/config/config.exs index 774a812..918f9df 100644 --- a/config/config.exs +++ b/config/config.exs @@ -12,7 +12,8 @@ config :distributed_performance_analyzer, dataset_parser: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Csv, file_system: DistributedPerformanceAnalyzer.Infrastructure.Adapters.FileSystem, report_exporter: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Csv, - http_client: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Http.HttpClient + http_client: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Http.HttpClient, + report_repository: DistributedPerformanceAnalyzer.Infrastructure.Adapters.Mnesia if Mix.env() == :dev do config :git_hooks, diff --git a/lib/application.ex b/lib/application.ex index 332a37f..b4f6922 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -5,7 +5,8 @@ defmodule DistributedPerformanceAnalyzer.Application do alias DistributedPerformanceAnalyzer.Domain.UseCase.{ Execution.ExecutionUseCase, Config.ConfigUseCase, - Dataset.DatasetUseCase + Dataset.DatasetUseCase, + Reports.ReportUseCase } use Application @@ -24,7 +25,7 @@ defmodule DistributedPerformanceAnalyzer.Application do # CustomTelemetry.custom_telemetry_events() opts = [strategy: :one_for_one, name: DistributedPerformanceAnalyzer.Supervisor] pid = Supervisor.start_link(children, opts) - # Process.sleep(100_000) + if env == :dev, do: to_timeout(minute: 3) |> Process.sleep() pid end @@ -38,7 +39,7 @@ defmodule DistributedPerformanceAnalyzer.Application do def env_children(:test, _distributed), do: [] def env_children(_other_env, distributed) do - children = [DatasetUseCase] + children = [DatasetUseCase, ReportUseCase] master_children = [ExecutionUseCase] if distributed == :none || distributed == :master do diff --git a/lib/domain/model/user/user.ex b/lib/domain/model/user/user.ex index e776f0f..e15f011 100644 --- a/lib/domain/model/user/user.ex +++ b/lib/domain/model/user/user.ex @@ -7,6 +7,7 @@ defmodule DistributedPerformanceAnalyzer.Domain.Model.User do """ constructor do + field(:step_name, :atomics, constructor: &is_atom/1) field(:request, Request.t(), constructor: &Request.new/1) field(:dataset_name, String.t(), diff --git a/lib/domain/use_cases/reports/report_use_case.ex b/lib/domain/use_cases/reports/report_use_case.ex index 81ecbb0..3eafed2 100644 --- a/lib/domain/use_cases/reports/report_use_case.ex +++ b/lib/domain/use_cases/reports/report_use_case.ex @@ -2,14 +2,76 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Reports.ReportUseCase do @moduledoc """ Provides functions for generating reports, based on the results of the step """ + use GenServer alias :mnesia, as: Mnesia + require Logger - # TODO: init, create tables (step results) + alias DistributedPerformanceAnalyzer.Config.AppConfig + alias DistributedPerformanceAnalyzer.Domain.Model.Config.Response alias DistributedPerformanceAnalyzer.Domain.UseCase.CollectorUseCase + alias DistributedPerformanceAnalyzer.Utils.DataTypeUtils - def init do - Mnesia.start() - Mnesia.create_schema([node()]) + @repository Application.compile_env!(AppConfig.get_app_name(), :report_repository) + @attributes [:key, :value] + @responses :responses + @errors :errors + + def start_link(_) do + Logger.debug("Starting report server...") + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl true + def init(_) do + case @repository.start() do + :ok -> {:ok, nil} + error -> error + end + end + + def create_step_report(step_name) do + [ + get_table_name(step_name, @responses), + get_table_name(step_name, @errors) + ] + |> Enum.each(fn table -> GenServer.call(__MODULE__, {:create_table, table, @attributes}) end) + end + + def save_response(step_name, index, %Response{} = response), + do: + GenServer.cast( + __MODULE__, + {:save_item, get_table_name(step_name, @responses), index, response} + ) + + def save_error(step_name, index, reason), + do: + GenServer.cast(__MODULE__, {:save_item, get_table_name(step_name, @errors), index, reason}) + + @impl true + def handle_call({:create_table, table_name, attributes}, _, state) do + Task.async(fn -> @repository.create_table(table_name, attributes) end) |> Task.await() + {:reply, :ok, state} + end + + @impl true + def handle_cast({:save_item, table, index, item}, state) do + Task.async(fn -> @repository.save_item(table, index, item) end) |> Task.await() + {:noreply, state} + end + + defp get_table_name(step_name, type), do: DataTypeUtils.parse_to_atom("#{step_name}_#{type}") + + def consolidate_step(step_name) do + end + + def save_step_result(scenario_name, step_result) do + end + + def consolidate_scenario(scenario_name) do + end + + def create_step_report(step_name) when is_atom(step_name) do end def start_step_collector(id) do diff --git a/lib/domain/use_cases/step/step_use_case.ex b/lib/domain/use_cases/step/step_use_case.ex index 889f84e..14618a9 100644 --- a/lib/domain/use_cases/step/step_use_case.ex +++ b/lib/domain/use_cases/step/step_use_case.ex @@ -35,7 +35,7 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Step.StepUseCase do defp start_step(_, 0), do: {:error, :invalid_step_number} defp start_step(%Scenario{strategy: strategy} = scenario, step_number) when step_number > 0 do - table_name = get_process_name(scenario.name, step_number) + step_name = get_process_name(scenario.name, step_number) concurrency = get_concurrency(strategy, step_number) Logger.info( @@ -47,14 +47,14 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Step.StepUseCase do :poolboy.child_spec( :worker, get_pool_config(scenario.name, step_number, concurrency), - get_user_config(scenario, table_name) + get_user_config(scenario, step_name) ) ] opts = [strategy: :one_for_one, name: get_pool_name(scenario.name, step_number)] - ReportUseCase.create_step_report(table_name) + ReportUseCase.create_step_report(step_name) Supervisor.start_link(children, opts) end @@ -72,7 +72,7 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.Step.StepUseCase do defp get_user_config(%Scenario{} = scenario, table_name) do {:ok, user} = User.new( - table_name: table_name, + step_name: table_name, request: scenario.request, dataset_name: scenario.dataset_name ) diff --git a/lib/domain/use_cases/user/user_use_case.ex b/lib/domain/use_cases/user/user_use_case.ex index 2207e33..dbb05c5 100644 --- a/lib/domain/use_cases/user/user_use_case.ex +++ b/lib/domain/use_cases/user/user_use_case.ex @@ -6,15 +6,20 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.User.UserUseCase do use GenServer require Logger alias DistributedPerformanceAnalyzer.Config.AppConfig - alias DistributedPerformanceAnalyzer.Domain.Model.{User, Config.Request} - alias DistributedPerformanceAnalyzer.Domain.UseCase.Dataset.DatasetUseCase + alias DistributedPerformanceAnalyzer.Utils.DataTypeUtils + alias DistributedPerformanceAnalyzer.Domain.Model.{User, Config.Request, Config.Response} - defstruct [:connection, :config] + alias DistributedPerformanceAnalyzer.Domain.UseCase.{ + Dataset.DatasetUseCase, + Reports.ReportUseCase + } + + defstruct [:connection, :config, :step_name] @http_client Application.compile_env!(AppConfig.get_app_name(), :http_client) def start_link(%User{} = config) do - Logger.debug("Starting user for #{config.request.url}}...") + Logger.debug("Starting user for #{config.request.url}...") GenServer.start_link(__MODULE__, config) end @@ -46,9 +51,9 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.User.UserUseCase do {:ok, %{response: response, connection: connection}} = send_request(connection, request) Logger.debug(inspect(response)) + save_response(config.step_name, response) loop() - # TODO: send metrics {:noreply, %{state | connection: connection}} end @@ -69,6 +74,17 @@ defmodule DistributedPerformanceAnalyzer.Domain.UseCase.User.UserUseCase do %{request | body: replace_value.(request.body), headers: replace_value.(request.headers)} end + defp save_response(step_name, {:ok, %Response{} = response}), + do: ReportUseCase.save_response(step_name, get_response_id(response), response) + + defp save_response(step_name, {:error, reason}) do + Logger.warning("Request failed: #{inspect(reason)}") + ReportUseCase.save_error(step_name, get_response_id(reason), reason) + end + + defp get_response_id(%Response{timestamp: timestamp}), do: "#{timestamp}-#{inspect(self())}" + defp get_response_id(_), do: "#{DataTypeUtils.timestamp()}-#{inspect(self())}" + @impl true def terminate(reason, state) do Logger.debug("Terminating user due to #{inspect(reason)}") diff --git a/lib/infrastructure/driven_adapters/http/http_client.ex b/lib/infrastructure/driven_adapters/http/http_client.ex index 2aa75f9..0ccb868 100644 --- a/lib/infrastructure/driven_adapters/http/http_client.ex +++ b/lib/infrastructure/driven_adapters/http/http_client.ex @@ -107,8 +107,7 @@ defmodule DistributedPerformanceAnalyzer.Infrastructure.Adapters.Http.HttpClient }) end - # TODO: Send error reason - defp fail_response(res), do: Logger.warning("Request error: #{res}") + defp fail_response(reason), do: {:error, reason} defp mint_opts(scheme, ssl_validation, timeout) do # TODO: enable cacerts https://hexdocs.pm/mint/Mint.HTTP.html#module-ssl-certificates diff --git a/lib/infrastructure/driven_adapters/mnesia/mnesia.ex b/lib/infrastructure/driven_adapters/mnesia/mnesia.ex index d4966bb..05551b7 100644 --- a/lib/infrastructure/driven_adapters/mnesia/mnesia.ex +++ b/lib/infrastructure/driven_adapters/mnesia/mnesia.ex @@ -1,137 +1,173 @@ -defmodule MnesiaAdapter do - @moduledoc """ - Provides a high-level interface for interacting with the Mnesia database. - - This module encapsulates common Mnesia operations such as starting the database, - creating tables, writing records, and reading data. It also includes logging - for better observability and wraps asynchronous operations in Tasks. - """ - - require Logger - alias :mnesia, as: Mnesia - - @type table_name :: atom() - @type table_attributes :: [{atom(), atom()}] - @type record :: tuple() - @type result :: :ok | {:error, term()} - - @doc """ - Starts the Mnesia database. - - This function attempts to start Mnesia and logs the result. If Mnesia - is already running, it returns :ok. - - Returns: - * `:ok` if Mnesia started successfully or was already running - * `{:error, reason}` if there was an error starting Mnesia - """ - @spec start() :: result() - def start do - case Mnesia.start() do +defmodule DistributedPerformanceAnalyzer.Infrastructure.Adapters.Mnesia do + @moduledoc """ + Provides a high-level interface for interacting with the Mnesia database. + + This module encapsulates common Mnesia operations such as starting the database, + creating tables, writing records, and reading data. It also includes logging + for better observability and wraps asynchronous operations in Tasks. + """ + + alias DistributedPerformanceAnalyzer.Domain.Behaviours.Reports.Repository + @behaviour Repository + + require Logger + alias :mnesia, as: Mnesia + + @type table_name :: atom() + @type table_attributes :: [{atom(), atom()}] + @type record :: tuple() + @type result :: :ok | {:error, term()} + + @impl Repository + def start() do + stop_mnesia() + start_mnesia() + end + + @impl Repository + def create_table(table_name, fields), do: create_mnesia_table(table_name, fields) + + @impl Repository + def save_item(table_name, index, item), do: write({table_name, index, item}) + + # TODO: read hole table + @impl Repository + def get_items(table_name), do: read(table_name) + + @impl Repository + def get_item(table_name, index), do: read({table_name, index}) + + @doc """ + Starts the Mnesia database. + + This function attempts to start Mnesia and logs the result. If Mnesia + is already running, it returns :ok. + + Returns: + * `:ok` if Mnesia started successfully or was already running + * `{:error, reason}` if there was an error starting Mnesia + """ + @spec start_mnesia() :: result() + def start_mnesia do + case Mnesia.start() do + :ok -> + Logger.debug("#{__MODULE__}: Mnesia started successfully") + :ok + + {:error, {:already_started, _node}} -> + Logger.warning("#{__MODULE__}: Mnesia already started") + :ok + + {:error, reason} -> + Logger.error("#{__MODULE__}: Error starting Mnesia: #{inspect(reason)}") + {:error, reason} + end + end + + @doc """ + Stops the Mnesia database. + + This function stops Mnesia and deletes the schema for the current node. + + Returns: + * `:ok` if Mnesia was stopped successfully + * `{:error, reason}` if there was an error stopping Mnesia + """ + @spec stop_mnesia() :: result() + def stop_mnesia do + Mnesia.stop() + end + + @doc """ + Creates a new Mnesia table. + + This function creates a new table in Mnesia with the given name and attributes. + It logs the result of the operation. + + Parameters: + * `table`: The name of the table to create (atom) + * `attributes`: A list of attribute definitions for the table + + Returns: + * `:ok` if the table was created successfully + * `{:error, reason}` if there was an error creating the table + """ + @spec create_table(table_name(), table_attributes()) :: result() + def create_mnesia_table(table, attributes) when is_atom(table) and is_list(attributes) do + Logger.debug("#{__MODULE__}: Creating Mnesia table: #{table}") + + case Mnesia.create_table(table, attributes: attributes) do + {:atomic, :ok} -> + Logger.debug("#{__MODULE__}: Mnesia table #{table} created successfully") + {:ok, table} + + {:aborted, reason} -> + Logger.debug("#{__MODULE__}: Error creating Mnesia table #{table}: #{inspect(reason)}") + {:error, reason} + end + end + + @doc """ + Writes a record to Mnesia asynchronously. + + This function writes the given record to Mnesia using a dirty write operation. + The operation is wrapped in a Task for asynchronous execution. + + Parameters: + * `record`: The record to write to Mnesia (tuple) + + Returns: + * A `Task` that will resolve to `:ok` if the writing was successful, + or `{:error, reason}` if there was an error + """ + @spec write(record()) :: Task.t() + def write(record) do + Task.async(fn -> + Logger.debug("#{__MODULE__}: Writing data to Mnesia: #{inspect(record)}") + + case Mnesia.dirty_write(record) do :ok -> - Logger.info("#{__MODULE__}: Mnesia started successfully") - :ok - - {:error, {:already_started, _node}} -> - Logger.info("#{__MODULE__}: Mnesia already started") + Logger.debug("#{__MODULE__}: Data written to Mnesia successfully") :ok - + {:error, reason} -> - Logger.error("#{__MODULE__}: Error starting Mnesia: #{inspect(reason)}") + Logger.debug("#{__MODULE__}: Error writing data to Mnesia: #{inspect(reason)}") {:error, reason} end - end - - @doc """ - Creates a new Mnesia table. - - This function creates a new table in Mnesia with the given name and attributes. - It logs the result of the operation. - - Parameters: - * `table`: The name of the table to create (atom) - * `attributes`: A list of attribute definitions for the table - - Returns: - * `:ok` if the table was created successfully - * `{:error, reason}` if there was an error creating the table - """ - @spec create_table(table_name(), table_attributes()) :: result() - def create_table(table, attributes) when is_atom(table) and is_list(attributes) do - Logger.info("#{__MODULE__}: Creating Mnesia table: #{table}") - - case Mnesia.create_table(table, attributes: attributes) do - {:atomic, :ok} -> - Logger.info("#{__MODULE__}: Mnesia table #{table} created successfully") - :ok - - {:aborted, reason} -> - Logger.error("#{__MODULE__}: Error creating Mnesia table #{table}: #{inspect(reason)}") + end) + end + + @doc """ + Reads a record from Mnesia asynchronously. + + This function reads a record from Mnesia using a dirty read operation. + The operation is wrapped in a Task for asynchronous execution. + + Parameters: + * `key`: The key to read from Mnesia (can be a tuple for composite keys) + + Returns: + * A `Task` that will resolve to `{:ok, [record]}` if the read was successful, + `{:ok, []}` if no record was found, or `{:error, reason}` if there was an error + """ + @spec read(record()) :: Task.t() + def read(key) do + Task.async(fn -> + Logger.debug("#{__MODULE__}: Reading data from Mnesia: #{inspect(key)}") + + case Mnesia.dirty_read(key) do + [] -> + Logger.debug("#{__MODULE__}: No data found in Mnesia") + {:ok, []} + + result when is_list(result) -> + Logger.debug("#{__MODULE__}: Data read from Mnesia successfully: #{inspect(result)}") + {:ok, result} + + {:error, reason} -> + Logger.debug("#{__MODULE__}: Error reading data from Mnesia: #{inspect(reason)}") {:error, reason} end - end - - @doc """ - Writes a record to Mnesia asynchronously. - - This function writes the given record to Mnesia using a dirty write operation. - The operation is wrapped in a Task for asynchronous execution. - - Parameters: - * `record`: The record to write to Mnesia (tuple) - - Returns: - * A `Task` that will resolve to `:ok` if the write was successful, - or `{:error, reason}` if there was an error - """ - @spec write(record()) :: Task.t() - def write(record) do - Task.async(fn -> - Logger.info("#{__MODULE__}: Writing data to Mnesia: #{inspect(record)}") - - case Mnesia.dirty_write(record) do - :ok -> - Logger.info("#{__MODULE__}: Data written to Mnesia successfully") - :ok - - {:error, reason} -> - Logger.error("#{__MODULE__}: Error writing data to Mnesia: #{inspect(reason)}") - {:error, reason} - end - end) - end - - @doc """ - Reads a record from Mnesia asynchronously. - - This function reads a record from Mnesia using a dirty read operation. - The operation is wrapped in a Task for asynchronous execution. - - Parameters: - * `key`: The key to read from Mnesia (can be a tuple for composite keys) - - Returns: - * A `Task` that will resolve to `{:ok, [record]}` if the read was successful, - `{:ok, []}` if no record was found, or `{:error, reason}` if there was an error - """ - @spec read(record()) :: Task.t() - def read(key) do - Task.async(fn -> - Logger.info("#{__MODULE__}: Reading data from Mnesia: #{inspect(key)}") - - case Mnesia.dirty_read(key) do - [] -> - Logger.info("#{__MODULE__}: No data found in Mnesia") - {:ok, []} - - result when is_list(result) -> - Logger.info("#{__MODULE__}: Data read from Mnesia successfully: #{inspect(result)}") - {:ok, result} - - {:error, reason} -> - Logger.error("#{__MODULE__}: Error reading data from Mnesia: #{inspect(reason)}") - {:error, reason} - end - end) - end - end \ No newline at end of file + end) + end +end