Skip to content

Commit

Permalink
Merge branch 'feature/gh-8-cleanup' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
evadne committed Oct 3, 2020
2 parents 63c6ffe + 0c915b8 commit 37e129c
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 17 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ The format is based on [Keep a Changelog][1], and this project adheres to [Seman
- Added support for custom Sources.
- Any module which implements `Packmatic.Source` can be used as a Source.

- Added support for Encoder Events.
- Added the `on_event` option to the Encoder which can be used to receive events.
- See documentation for `Packmatic.Event`.

### Changed

- Revised `Packmatic.Source`.
Expand All @@ -22,6 +26,12 @@ The format is based on [Keep a Changelog][1], and this project adheres to [Seman
- Revised `Packmatic.Manifest.Entry`.
- Moved validation of Initialisation Arguments to Sources.

- Revised `Packmatic.Source.File`.
- Added explicit cleanup logic.

- Revised `Packmatic.Source.URL`.
- Added explicit cleanup logic.

### Fixed

- Revised `Packmatic.Encoder`.
Expand Down
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The generated archive uses Zip64, and works with individual files that are large
- [Installation](#installation)
- [Usage](#usage)
- [Source Types](#source-types)
- [Events](#events)
- [Notes](#notes)

* * *
Expand Down Expand Up @@ -165,6 +166,38 @@ If you have a different use case, for example if you need to pull data from a FT

See `Packmatic.Source` for more information.

## Events

The Encoder can be configured to emit events in order to enable feedback elsewhere in your application, for example:

```elixir
entries = [
[source: {:file, "/tmp/hello.pdf"}, path: "hello.pdf"],
[source: {:file, "/tmp/world.pdf"}, path: "world.pdf", timestamp: DateTime.utc_now()],
[source: {:url, "https://example.com/foo.pdf"}, path: "foo/bar.pdf"]
]

entries_count = length(entries)
entries_completed_agent = Agent.start(fn -> 0 end)

handler_fun = fn event ->
case event do
%Packmatic.Event.EntryCompleted{} ->
count = Agent.get_and_update(entries_completed_agent, & &1 + 1)
IO.puts "#{count} of #{entries_count} encoded"
%Packmatic.Event.EntryCompleted{} ->
:ok = Agent.stop(entries_completed_agent)
:ok
_ ->
:ok
end
end

stream = Packmatic.build_stream(entries, on_event: handler_fun)
```

See documentation for `Packmatic.Event` for a complete list of Event types.

## Notes

1. As with any user-generated content, you should exercise caution when building the Manifest, and ensure that only content that the User is entitled to retrieve is included.
Expand Down Expand Up @@ -233,7 +266,17 @@ During design and prototype development of this library, the Author has drawn in
- [ctrabant/fdzipstream](https://github.com/CTrabant/fdzipstream)
- [dgvncsz0f/zipflow](https://github.com/dgvncsz0f/zipflow)

The Author wishes to thank the following individuals:

- [Alvise Susmel][alvises] for proposing and testing [Encoder Events][gh-3]
- [Christoph Geschwind][1st8] for highlighting [the need for explicit cleanup logic][gh-8]

## Reference

- https://users.cs.jmu.edu/buchhofp/forensics/formats/pkzip.html
- https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT

[1st8]: https://github.com/1st8
[alvises]: https://github.com/alvises
[gh-3]: https://github.com/evadne/packmatic/issues/3
[gh-8]: https://github.com/evadne/packmatic/pull/8
19 changes: 15 additions & 4 deletions lib/packmatic/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,29 @@ defmodule Packmatic.Event do
returned EOF).
6. `Packmatic.Event.StreamEnded`: Sent when the Stream has completed journaling.
Please note that more event types may be added in the future.
"""

@typedoc """
Represents an Event that will be passed to the handler.
"""
@type event :: struct()
@type event ::
__MODULE__.StreamStarted.t()
| __MODULE__.StreamEnded.t()
| __MODULE__.EntryStarted.t()
| __MODULE__.EntryUpdated.t()
| __MODULE__.EntryFailed.t()
| __MODULE__.EntryCompleted.t()

@typedoc """
Represents the callback function passed to the Encoder.
The callback function takes 1 argument, which is the actual Event that is raised by Packmatic.
The Event, `t:event/0`, is a pre-defined structs under `Packmatic.Event`. Please keep in mind
that more events may be added in the future, so you should always include a fallback clause
in your handler function.
The Event, `t:event/0`, is one of the pre-defined structs under the `Packmatic.Event` namespace.
Please keep in mind that more events may be added in the future, so you should always include a
fallback clause in your handler function.
Handlers are called from the same process that the Stream is being iterated from, which allows
you to control what happens to it. Should you not wish to interrupt the Encoder, return `:ok`.
Expand Down Expand Up @@ -122,6 +131,7 @@ defmodule Packmatic.Event do
entry: Manifest.Entry.t()
}

@enforce_keys ~w(stream_id entry)a
defstruct stream_id: nil, entry: nil
end

Expand All @@ -138,6 +148,7 @@ defmodule Packmatic.Event do
reason: term()
}

@enforce_keys ~w(stream_id entry reason)a
defstruct stream_id: nil, entry: nil, reason: nil
end
end
9 changes: 9 additions & 0 deletions lib/packmatic/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ defmodule Packmatic.Source do
iex(2)> {:ok, state} = Packmatic.Source.build({:file, file_path})
iex(3)> state.__struct__
Packmatic.Source.File
### Notes
When implementing a custom Source which uses an external data provider (for example reading from
a file), remember to perform any cleanup required within the `read/1` callback if the Source is
not expected to return any further data, for example if the file has been read completely or if
there has been an error.
"""

@typedoc """
Expand Down Expand Up @@ -103,6 +110,7 @@ defmodule Packmatic.Source do
Called by `Packmatic.Manifest.Entry`.
"""
def validate(entry)

def validate({name, init_arg}) do
with {:module, module} <- resolve(name) do
module.validate(init_arg)
Expand All @@ -116,6 +124,7 @@ defmodule Packmatic.Source do
Called by `Packmatic.Encoder`.
"""
def build(entry)

def build({name, init_arg}) do
with {:module, module} <- resolve(name) do
module.init(init_arg)
Expand Down
7 changes: 6 additions & 1 deletion lib/packmatic/source/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ defmodule Packmatic.Source.File do

@impl Source
def read(source) do
IO.binread(source.device, get_chunk_size())
with :eof <- IO.binread(source.device, get_chunk_size()) do
:ok = File.close(source.device)
:eof
else
data -> data
end
end

@otp_app Mix.Project.config()[:app]
Expand Down
32 changes: 20 additions & 12 deletions lib/packmatic/source/url.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ defmodule Packmatic.Source.URL do
@type init_result :: {:ok, t}
@spec init(init_arg) :: init_result

@type t :: %__MODULE__{url: String.t(), id: term()}
@enforce_keys ~w(url id)a
defstruct url: nil, id: nil
@type t :: %__MODULE__{url: String.t(), stream_id: term()}
@enforce_keys ~w(url stream_id)a
defstruct url: nil, stream_id: nil

@impl Source
def validate(url) when is_binary(url) and url != "", do: :ok
Expand All @@ -23,26 +23,34 @@ defmodule Packmatic.Source.URL do
def init(url) do
with %{host: host} <- URI.parse(url),
options = httpotion_options(host),
%HTTPotion.AsyncResponse{id: id} <- HTTPotion.get(url, options) do
{:ok, %__MODULE__{url: url, id: id}}
%HTTPotion.AsyncResponse{id: stream_id} <- HTTPotion.get(url, options) do
{:ok, %__MODULE__{url: url, stream_id: stream_id}}
else
{:error, reason} -> {:error, reason}
%HTTPotion.ErrorResponse{message: message} -> {:error, message}
end
end

@impl Source
def read(%__MODULE__{id: id}) do
with :ok <- :ibrowse.stream_next(id) do
def read(%__MODULE__{stream_id: stream_id}) do
with data when is_binary(data) <- read_receive_next(stream_id) do
data
else
value ->
_ = :ibrowse.stream_close(stream_id)
value
end
end

defp read_receive_next(stream_id) do
with :ok <- :ibrowse.stream_next(stream_id) do
receive do
%HTTPotion.AsyncHeaders{status_code: 200} -> <<>>
%HTTPotion.AsyncHeaders{status_code: status} -> {:error, {:unsupported_status, status}}
%HTTPotion.AsyncChunk{chunk: chunk, id: ^id} -> chunk
%HTTPotion.AsyncEnd{id: ^id} -> :eof
%HTTPotion.AsyncTimeout{id: ^id} -> {:error, :timeout}
%HTTPotion.AsyncChunk{chunk: chunk, id: ^stream_id} -> chunk
%HTTPotion.AsyncEnd{id: ^stream_id} -> :eof
%HTTPotion.AsyncTimeout{id: ^stream_id} -> {:error, :timeout}
end
else
{:error, reason} -> {:error, reason}
end
end

Expand Down

0 comments on commit 37e129c

Please sign in to comment.