Skip to content

Commit

Permalink
Persistence & Email ingest (#1)
Browse files Browse the repository at this point in the history
Adds [persistence](https://github.com/hlte-net/persistence/) and email ingest support.

Bumps app version to `0.2.0`. API version remains the same as the surface the extension uses has not changed (only `POST /sns` has been added).
  • Loading branch information
rpj authored Feb 11, 2022
1 parent f183219 commit 9d2c6e1
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 40 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,23 @@

Work-in-progress rewrite of the current `go` daemon in elixir, while also removing much of the cruft from an older, now-dead usage model.

Use [`tools/keygen`](https://github.com/hlte-net/tools/blob/main/keygen) to create an appropriate key file and get the hexadecimal representation needed for the extension's settings.
Use [`tools/keygen`](https://github.com/hlte-net/tools/blob/main/keygen) to create an appropriate key file and get the hexadecimal representation needed for the extension's settings.

## Building & Running

You'll need [Elixir 1.13]() or later.

### Runtime configuration

Via the following environment variables:

* `HLTE_REDIS_URL`: Redis server connection URL
* `HLTE_SNS_WHITELIST_JSON`: SNS ingest email whitelist as JSON
* `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`: AWS credentials for S3 email lookup

In this directory, run:

```shell
$ mix deps.gets # sources all the required dependencies
$ mix run --no-halt # runs the application without exiting immediately
```
10 changes: 8 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ import Config
config :hlte,
api_version: "20220126",
header: "x-hlte",
port: 31337,
port: 56555,
db_path: "./data.sqlite3",
key_path: "./.keyfile"
key_path: "./.keyfile",
redis_url: nil,
sns_whitelist: [],
delete_sns_s3_post_proc: true

import_config("#{config_env()}.config.exs")

config :ex_aws,
region: "us-east-1"

config :logger,
utc_log: true,
truncate: :infinity
Expand Down
3 changes: 0 additions & 3 deletions config/prod.config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@ config :logger,
compile_time_purge_matching: [
[level_lower_than: :info]
]

config :hlte,
port: 56555
5 changes: 5 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import Config

config :hlte,
redis_url: System.fetch_env!("HLTE_REDIS_URL"),
sns_whitelist: Jason.decode!(System.fetch_env!("HLTE_SNS_WHITELIST_JSON"))
5 changes: 4 additions & 1 deletion lib/hlte/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ defmodule HLTE.Application do
def start(_type, args) do
case load_key(args[:key_path]) do
{:ok, key} ->
:ok = :persistent_term.put(:key, key)
keyHash = :crypto.hash(:sha256, key) |> :binary.encode_hex() |> :string.lowercase()
:ok = :persistent_term.put(:key, key)
:ok = :persistent_term.put(:key_hash, keyHash)

Logger.notice("Loaded #{byte_size(key)}-byte key with SHA256 checksum of #{keyHash}")

start_link(args)
Expand Down Expand Up @@ -46,6 +48,7 @@ defmodule HLTE.Application do
def start_link(args) do
children = [
{Task.Supervisor, name: HLTE.AsyncSupervisor},
{HLTE.EmailProcessor, name: EmailProcessor},
{HLTE.HTTP, [args[:port], args[:header]]},
{HLTE.DB, [args[:db_path]]}
]
Expand Down
72 changes: 47 additions & 25 deletions lib/hlte/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,33 @@ defmodule HLTE.DB do
end

def persist(%{"uri" => uri, "secondaryURI" => suri, "data" => data, "annotation" => ann}, hmac) do
rxTime = System.os_time(:nanosecond)
persist_async(
uri,
suri,
data,
ann,
hmac
)
end

Task.Supervisor.async_nolink(HLTE.AsyncSupervisor, fn ->
persist_async(
uri,
suri,
data,
ann,
hmac,
rxTime
)
end)
def persist(%{"uri" => uri, "secondaryURI" => suri, "annotation" => ann}, hmac) do
persist_async(
uri,
suri,
nil,
ann,
hmac
)
end

rxTime
def persist(%{"uri" => uri, "data" => data}, hmac) do
persist_async(
uri,
nil,
data,
nil,
hmac
)
end

def search(query, limit, newestFirst) do
Expand All @@ -83,22 +96,31 @@ defmodule HLTE.DB do
suri,
data,
ann,
hmac,
rxTime
hmac
) do
{:ok, conn} = get_conn(:persistent_term.get(:db_path))
rxTime = System.os_time(:nanosecond)

{:ok, _, _, _} =
Basic.exec(conn, "insert into hlte values(?, ?, ?, ?, ?, ?)", [
hmac,
rxTime,
uri,
suri,
data,
ann
])
entryID =
Task.await(
Task.Supervisor.async(HLTE.AsyncSupervisor, fn ->
{:ok, conn} = get_conn(:persistent_term.get(:db_path))

{:ok, _, _, _} =
Basic.exec(conn, "insert into hlte values(?, ?, ?, ?, ?, ?)", [
hmac,
rxTime,
uri,
suri,
data,
ann
])

Basic.close(conn)
HLTE.Redis.post_persistence_work(rxTime, hmac, %{"uri" => uri, "secondaryURI" => suri})
end)
)

Basic.close(conn)
{:ok, rxTime, entryID}
end

defp search_async(query, limit, sortDir) do
Expand Down
102 changes: 102 additions & 0 deletions lib/hlte/email_proc.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
defmodule HLTE.EmailProcessor do
require Logger
use GenServer

def start_link(opts) do
Logger.notice(
"Email processor using whitelist: #{Enum.join(Application.fetch_env!(:hlte, :sns_whitelist), ", ")}"
)

GenServer.start_link(__MODULE__, :ok, opts)
end

def from_bucket(bucket, key, from, subject) do
GenServer.cast(EmailProcessor, {:process_from_bucket, bucket, key, from, subject})
end

@impl true
def init(:ok) do
{:ok, %{}}
end

@impl true
def handle_cast({:process_from_bucket, bucket, key, from, subject}, state) do
case Enum.find(Application.fetch_env!(:hlte, :sns_whitelist), fn whiteListedAddress ->
from === whiteListedAddress
end) do
^from ->
case URI.parse(subject) |> validate_parsed_subject_uri() do
:error -> Logger.error("Malformed URI as subject!")
host -> stream_and_parse(bucket, key, subject, host)
end

nil ->
Logger.error("Message from non-whitelisted address <#{from}>!")
end

if Application.fetch_env!(:hlte, :delete_sns_s3_post_proc) === true do
{:ok, _content} = ExAws.S3.delete_object(bucket, key) |> ExAws.request()
end

{:noreply, [state]}
end

def validate_parsed_subject_uri(%URI{:host => host, :scheme => s})
when host !== nil and s !== nil,
do: host

def validate_parsed_subject_uri(_bad_uri), do: :error

def stream_and_parse(bucket, key, uri, host) do
{content_type, parsed_body, part_type} =
ExAws.S3.download_file(bucket, key, :memory)
|> ExAws.stream!()
|> Stream.chunk_while(
"",
fn cur, acc ->
{:cont, cur, acc <> cur}
end,
fn
"" -> {:cont, ""}
acc -> {:cont, acc, ""}
end
)
|> Enum.to_list()
|> Enum.at(0)
|> Mail.Parsers.RFC2822.parse()
|> extract_body()

Logger.info(
"Parsed #{String.length(parsed_body)} bytes of '#{content_type}' from a #{part_type} message"
)

{:ok, rxTime, entryID} =
HLTE.DB.persist(
%{
"uri" => uri,
"data" => parsed_body
},
HLTE.HTTP.calculate_body_hmac(parsed_body)
)

Logger.info("Persisted hilite for #{host} at #{floor(rxTime / 1.0e9)}, work ID #{entryID}")
end

def extract_body(%Mail.Message{:multipart => true, :parts => parts}) do
target_part =
Enum.find(parts, fn p ->
Map.get(p.headers, "content-type") |> Enum.at(0) === "text/plain"
end) ||
Enum.at(parts, 0)

{Map.get(target_part.headers, "content-type") |> Enum.at(0), target_part.body, "multipart"}
end

def extract_body(%Mail.Message{
:multipart => false,
:body => body,
:headers => %{"content-type" => content_type}
}) do
{content_type |> Enum.at(0), body, "mono"}
end
end
1 change: 1 addition & 0 deletions lib/hlte/http/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule HLTE.HTTP do
[
# POST
{"/", HLTE.HTTP.Route.PostHilite, [headerName]},
{"/sns", HLTE.HTTP.Route.SNSIngest, []},

# GET
{"/version", HLTE.HTTP.Route.Version, []},
Expand Down
4 changes: 2 additions & 2 deletions lib/hlte/http/routes/post_hilite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ defmodule HLTE.HTTP.Route.PostHilite do

def persist(bodyText, bodyHmac, calcHmac) when bodyHmac === calcHmac do
dec = Jason.decode!(bodyText)
rxTime = HLTE.DB.persist(dec, bodyHmac)
{:ok, rxTime, entryID} = HLTE.DB.persist(dec, bodyHmac)

Logger.info(
"Persisted hilite for #{URI.parse(Map.get(dec, "uri")).host} at #{floor(rxTime / 1.0e9)}"
"Persisted hilite for #{URI.parse(Map.get(dec, "uri")).host} at #{floor(rxTime / 1.0e9)}, work ID #{entryID}"
)

true
Expand Down
62 changes: 62 additions & 0 deletions lib/hlte/http/routes/sns_ingest.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule HLTE.HTTP.Route.SNSIngest do
require Logger

def init(req, state) when req.method === "POST" do
{:ok, bodyText, req2} = :cowboy_req.read_body(req)

case Jason.decode(bodyText) |> ingest_post() do
:error ->
Logger.warn("Raw request:\n#{inspect(req)}")
Logger.warn("Raw body:\n#{bodyText}")

:ok ->
:ok
end

{:ok, :cowboy_req.set_resp_header("Access-Control-Allow-Origin", "*", req2), state}
end

def init(req, state) do
{:ok, :cowboy_req.reply(405, req), state}
end

defp ingest_post(
{:ok,
%{
"notificationType" => "Received",
"receipt" => %{
"action" => %{
"type" => "S3",
"bucketName" => bucket,
"objectKey" => objectKey
}
},
"mail" => %{
"source" => source,
"commonHeaders" => %{
"subject" => subject
}
}
}}
) do
Logger.info("Processing SNS from <#{source}>, subject \"#{subject}\"")
HLTE.EmailProcessor.from_bucket(bucket, objectKey, source, subject)
:ok
end

defp ingest_post({:ok, malformed}) do
Logger.error("Malformed POST object! #{inspect(malformed)}")
:error
end

defp ingest_post({:error, %Jason.DecodeError{:data => d, :position => p, :token => t}}) do
Logger.error("Malformed POST JSON!")
Logger.error(" at position #{p}, token '#{t}' in data: #{d}")
:error
end

defp ingest_post({:error, unkErr}) do
Logger.error("Malformed POST! Unknown error: #{inspect(unkErr)}")
:error
end
end
Loading

0 comments on commit 9d2c6e1

Please sign in to comment.