Commit 00ef863

thbar and fchabouis authored
Unit import module for GTFS stops, with deletion, plus associated tools (#2851)
* Update MinIO template to work with homebrew-based install (I had troubles with Docker)
* Save first MinIO-based sync of latest ResourceHistory assets, so that I can work at scale locally
* Add helper to stream just the stops
* Extract code to module and fix cache bug
* Save sequential processing (without delete of prior copies)
* Automatically delete gtfs tables when a given DataImport is deleted
* Add quick logs
* Save very WIP
* Save WIP before switching to another branch (some refactoring is needed)
* Add note
* Make get_file_stream mox'able
* Success while implementing one test around GtfsToDB import part
* Make sure to delete previous data_import for same resource_history_id
* Add more tests
* Make tests more reliable (order)
* Add bits of documentation
* Delete previous DataImport for the same resource
* Increase coverage
* Make credo happy
* Mix format
* Remove TODO
* Mix format & simplification
* Mix format
* Make credo happy & mix format
* Refactor code to proper modules & rename accordingly
* Remove now legacy code (replaced by ImportStops)
* Rename for clarity
* Update gtfs_import_stops_test.exs
* Update apps/transport/test/transport/jobs/gtfs_import_stops_test.exs (Co-authored-by: Francis Chabouis <[email protected]>)
* Improve minio template
* Update gtfs_import_stops.ex
* DRY code (careful - code isn't under test)
* Update gtfs_import_stops.ex
* Avoid join
* Format code

Co-authored-by: Francis Chabouis <[email protected]>
1 parent 7bfbb6e commit 00ef863

File tree

9 files changed: +233 -10 lines changed


.miniorc.template

+9-5
@@ -4,14 +4,18 @@
 export MINIO_ROOT_USER=test-local
 export MINIO_ROOT_PASSWORD=apoi8761876bbazeriouy
 
-# 3. Follow instructions at
+# 3. It is preferable to use a non-Docker version when available,
+# such as https://min.io/docs/minio/macos/index.html, in which case you can do:
+# minio server --console-address :9090 ~/data
+
+# 4. For Docker support (a bit more involved), follow instructions at
 # https://docs.min.io/minio/baremetal/quickstart/container.html#quickstart-container
 # which means at time of writing:
 # mkdir -p ~/minio/data
 # NOTE: -name removed for simplicity, and "quayio" removed since the container appeared outdated
 # docker run -p 9000:9000 -p 9090:9090 -v ~/minio/data:/data -e "MINIO_ROOT_USER=$MINIO_ROOT_USER" -e "MINIO_ROOT_PASSWORD=$MINIO_ROOT_PASSWORD" minio/minio server /data --console-address ":9090"
 
-# 4. setup `dev.secret.exs` from `dev.secret.template.exs`
-# 5. source `.miniorc`
-# 6. go to console at http://127.0.0.1:9090 and create bucket `transport-data-gouv-fr-resource-history-dev`
-# 7. start `mix phx.server`
+# 5. setup `dev.secret.exs` from `dev.secret.template.exs`
+# 6. source `.miniorc`
+# 7. go to console at http://127.0.0.1:9090 and create bucket `transport-data-gouv-fr-resource-history-dev`
+# 8. start `mix phx.server`
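
After following these steps, a quick sanity check from `iex -S mix` is to print the bucket name the application will look up; a minimal sketch, assuming the dev `s3_buckets` config mirrors the bucket created in step 7:

# Hedged check: :history is the bucket key used by the resource history code;
# the expected value is inferred from the bucket name mentioned in step 7 above.
Transport.S3.bucket_name(:history)
#=> "transport-data-gouv-fr-resource-history-dev"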

apps/transport/lib/S3/unzip.ex

+13-4
@@ -1,4 +1,16 @@
+defmodule Transport.Unzip.S3.Behaviour do
+  @moduledoc """
+  Behaviour to allow partial Unzip testing.
+  """
+
+  @callback get_file_stream(binary(), binary(), binary()) :: Enumerable.t()
+end
+
 defmodule Transport.Unzip.S3 do
+  def impl, do: Application.get_env(:transport, :unzip_s3_impl, __MODULE__)
+
+  @behaviour Transport.Unzip.S3.Behaviour
+
   @moduledoc """
   Read a remote zip file stored on a S3 bucket, as explained here
   https://hexdocs.pm/unzip/readme.html
@@ -19,10 +31,7 @@ defmodule Transport.Unzip.S3 do
   )
 
   def get_file_stream(file_name, zip_name, bucket_name) do
-    aws_s3_config = aws_s3_config()
-    file = new(zip_name, bucket_name, aws_s3_config)
-    {:ok, unzip} = Unzip.new(file)
-
+    {:ok, unzip} = get_unzip(zip_name, bucket_name)
     Unzip.file_stream!(unzip, file_name)
   end

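This is the usual behaviour-plus-impl() indirection used with Mox: callers go through `impl()`, and the test environment swaps in a mock via configuration. A minimal sketch of the two paths (the file, zip and bucket arguments below are placeholders):

# dev/prod: :unzip_s3_impl is not configured, so impl() falls back to __MODULE__
# and the real S3-backed streaming is used
Transport.Unzip.S3.impl().get_file_stream("stops.txt", "some-file.zip", "some-bucket")

# test: config/test.exs (further down) sets unzip_s3_impl: Transport.Unzip.S3.Mock,
# so the same call dispatches to whatever Mox expectation the test has declared
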
apps/transport/lib/jobs/gtfs_to_db.ex

+7-1
@@ -13,10 +13,16 @@ defmodule Transport.Jobs.GtfsToDB do
     fill_trips_from_resource_history(resource_history_id, data_import_id)
   end
 
+  def import_gtfs_from_resource_history(resource_history_id, :stops) do
+    %{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!()
+    fill_stops_from_resource_history(resource_history_id, data_import_id)
+    data_import_id
+  end
+
   def file_stream(resource_history_id, gtfs_file_name) do
     %{payload: %{"filename" => filename}} = DB.ResourceHistory |> DB.Repo.get!(resource_history_id)
     bucket_name = Transport.S3.bucket_name(:history)
-    Transport.Unzip.S3.get_file_stream(gtfs_file_name, filename, bucket_name)
+    Transport.Unzip.S3.impl().get_file_stream(gtfs_file_name, filename, bucket_name)
   end
 
   def fill_stops_from_resource_history(resource_history_id, data_import_id) do
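
The new `:stops` clause wraps the existing stops import into a dedicated `DB.DataImport` and returns its id, which the import module below builds on. A short usage sketch (the `resource_history_id` is a placeholder):

# Creates a fresh DataImport tied to the given ResourceHistory, fills gtfs_stops
# from the zipped GTFS, and returns the new DataImport id.
data_import_id = Transport.Jobs.GtfsToDB.import_gtfs_from_resource_history(resource_history_id, :stops)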
gtfs_import_stops.ex (new file)

+41

defmodule Transport.GTFSImportStops do
  @moduledoc """
  A module to import stops in a single `DB.DataImport` for a given resource, based on `resource_history_id`.
  """

  import Ecto.Query

  @doc """
  For the given `resource_history_id`, imports stops in a new `DB.DataImport`, then delete all related
  pre-existing `DB.DataImport` (either with the same `resource_history_id`, or for the same resource).
  """
  def import_stops_and_remove_previous(resource_history_id) do
    # Transaction timeout is at 15s currently, we may need to customize this here later
    {:ok, data_import_id} =
      DB.Repo.transaction(fn ->
        data_import_id = Transport.Jobs.GtfsToDB.import_gtfs_from_resource_history(resource_history_id, :stops)

        resource_id = DB.Repo.get_by(DB.ResourceHistory, id: resource_history_id).resource_id

        query =
          from(rh in DB.ResourceHistory,
            where: rh.resource_id == ^resource_id and rh.id != ^resource_history_id,
            select: rh.id
          )

        resource_history_ids = query |> DB.Repo.all()

        # NOTE: we may need to add an index on di.resource_history_id
        DB.DataImport
        # delete all previous data imports for the same resource history id
        |> where([di], di.resource_history_id == ^resource_history_id and di.id != ^data_import_id)
        # delete all previous data imports for the same resource but different resource history ids
        |> or_where([di], di.resource_history_id in ^resource_history_ids)
        |> DB.Repo.delete_all()

        data_import_id
      end)

    data_import_id
  end
end
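
The net effect is that at most one `DB.DataImport` remains per resource after each call. A hedged usage sketch (ids and variables are placeholders):

# First import for a given ResourceHistory: creates DataImport A.
a = Transport.GTFSImportStops.import_stops_and_remove_previous(resource_history_id)

# Importing again (same ResourceHistory, or a newer one for the same resource)
# creates DataImport B and deletes A inside the same transaction, so readers
# never see two competing imports for one resource.
b = Transport.GTFSImportStops.import_stops_and_remove_previous(resource_history_id)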
DB.Repo.Migrations.DataImportDelete (new migration)

+24

defmodule DB.Repo.Migrations.DataImportDelete do
  use Ecto.Migration

  @gtfs_tables [
    "gtfs_stops",
    "gtfs_stop_times",
    "gtfs_trips",
    "gtfs_calendar",
    "gtfs_calendar_dates"
  ]

  def up do
    @gtfs_tables
    |> Enum.each fn(tbl) ->
      constraint_name = "#{tbl}_data_import_id_fkey" |> String.to_atom()
      drop constraint(tbl, constraint_name)
      alter table(tbl) do
        modify :data_import_id,
          references(:data_import, on_delete: :delete_all,
            from: references(:data_import, on_delete: :nothing))
      end
    end
  end
end
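
With the foreign keys switched to `on_delete: :delete_all`, deleting a `DB.DataImport` (as `import_stops_and_remove_previous/1` now does for previous imports) also removes its rows from the listed gtfs_* tables at the database level. A minimal sketch of the effect, using a placeholder id:

# Cascades to the gtfs_stops, gtfs_stop_times, gtfs_trips, gtfs_calendar and
# gtfs_calendar_dates rows carrying this data_import_id.
DB.DataImport |> DB.Repo.get!(data_import_id) |> DB.Repo.delete!()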

apps/transport/test/support/mocks.ex

+1
@@ -11,3 +11,4 @@ Mox.defmock(Transport.EmailSender.Mock, for: Transport.EmailSender)
 Mox.defmock(Hasher.Mock, for: Hasher.Wrapper)
 Mox.defmock(Transport.ValidatorsSelection.Mock, for: Transport.ValidatorsSelection)
 Mox.defmock(Transport.SIRIQueryGenerator.Mock, for: Transport.SIRIQueryGenerator.Behaviour)
+Mox.defmock(Transport.Unzip.S3.Mock, for: Transport.Unzip.S3.Behaviour)
apps/transport/test/transport/jobs/gtfs_import_stops_test.exs (new file)

+79

defmodule Transport.Test.Transport.Jobs.GTFSImportStopsTest do
  use ExUnit.Case, async: true
  use Oban.Testing, repo: DB.Repo
  import DB.Factory
  import Mox
  import Ecto.Query

  setup :verify_on_exit!

  setup do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
  end

  def data_import_ids do
    DB.Repo.all(from(di in DB.DataImport, select: di.id, order_by: [asc: di.id]))
  end

  def setup_mox(zip_filename) do
    # NOTE: it will be possible to reuse common code from Transport.Unzip.S3 in there
    Transport.Unzip.S3.Mock
    |> expect(:get_file_stream, fn file_in_zip, zip_file, bucket ->
      # from payload
      assert zip_file == zip_filename
      # from config
      assert bucket == "transport-data-gouv-fr-resource-history-test"

      # stub with a local file
      path = "#{__DIR__}/../../fixture/files/gtfs_import.zip"
      zip_file = Unzip.LocalFile.open(path)
      {:ok, unzip} = Unzip.new(zip_file)
      Unzip.file_stream!(unzip, file_in_zip)
    end)
  end

  test "import stops" do
    %{id: dataset_id} = insert(:dataset, %{datagouv_id: "xxx", datagouv_title: "coucou"})
    %{id: resource_id} = insert(:resource, dataset_id: dataset_id)

    %{id: resource_history_id} =
      insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-file.zip"}})

    setup_mox("some-file.zip")
    assert data_import_ids() == []
    first_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(resource_history_id)
    assert data_import_ids() == [first_data_import_id]

    # subsequent import must remove the previous import for same resource_history_id
    setup_mox("some-file.zip")
    second_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(resource_history_id)
    assert data_import_ids() == [second_data_import_id]

    # subsequent import for a new resource_history_id on same resource should also remove previous imports
    %{id: new_resource_history_id} =
      insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-new-file.zip"}})

    setup_mox("some-new-file.zip")
    third_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(new_resource_history_id)
    assert data_import_ids() == [third_data_import_id]

    # other resources should not be impacted by import
    setup_mox("some-other-file.zip")
    %{id: other_dataset_id} = insert(:dataset, %{datagouv_id: "yyy"})
    %{id: other_resource_id} = insert(:resource, dataset_id: other_dataset_id)

    %{id: other_resource_history_id} =
      insert(:resource_history, %{resource_id: other_resource_id, payload: %{"filename" => "some-other-file.zip"}})

    other_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(other_resource_history_id)

    assert data_import_ids() == [third_data_import_id, other_data_import_id]

    %{id: new_resource_history_id} =
      insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-new-file.zip"}})

    setup_mox("some-new-file.zip")
    fourth_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(new_resource_history_id)
    assert data_import_ids() == [other_data_import_id, fourth_data_import_id]
  end
end

config/test.exs

+1
@@ -41,6 +41,7 @@ config :transport,
   validator_selection: Transport.ValidatorsSelection.Mock,
   data_visualization: Transport.DataVisualization.Mock,
   notifications_api_token: "secret",
+  unzip_s3_impl: Transport.Unzip.S3.Mock,
   s3_buckets: %{
     history: "resource-history-test",
     on_demand_validation: "on-demand-validation-test",
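
Only the test environment defines `:unzip_s3_impl`; in dev and prod the key is absent, so `Transport.Unzip.S3.impl/0` returns the real module. A small sketch of the lookup behind `impl/0`:

# dev/prod: key absent, the default (__MODULE__, i.e. Transport.Unzip.S3) wins
Application.get_env(:transport, :unzip_s3_impl, Transport.Unzip.S3)
#=> Transport.Unzip.S3

# test: with the line added above, the same lookup returns Transport.Unzip.S3.Mock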
MinIO local sync script (new file)

+58

# NOTE: temporary script to help work locally with large amounts of GTFS.
require Logger

Logger.info("Starting...")

import Ecto.Query

# Useful notes to create the global import job later:
# - we could look at "last up to date" to ensure we only have fresh data in database
# - use gtfs_to_db once downloaded
# - look at Transport.Jobs.BNLCToGeoData and Transport.Jobs.BaseGeoData.import_replace_data

defmodule Tooling do
  # for each active dataset, grab all resources with a resource history.
  def query() do
    DB.Dataset.base_query()
    |> DB.Resource.join_dataset_with_resource()
    |> DB.ResourceHistory.join_resource_with_latest_resource_history()
    |> where([resource: r], r.format == "GTFS")
    |> select([resource_history: rh], rh)
  end
end

defmodule SyncS3LatestResourceHistory do
  def sync!(minio_folder) do
    Tooling.query()
    |> DB.Repo.all()
    # TODO: task sync max concurrency (this is slow)
    |> Enum.each(fn rh ->
      # create local minio bucket, if needed
      bucket_name = Transport.S3.bucket_name(:history)
      base_path = Path.join(minio_folder, bucket_name)
      unless File.exists?(base_path), do: File.mkdir_p!(base_path)

      # simple check based on file presence on disk
      file_path = Path.join(base_path, rh.payload["filename"])

      # TODO: replace by head request
      if File.exists?(file_path) do
        Logger.info("File already downloaded, skipping...")
      else
        Logger.info("Downloading file...")
        Logger.info(file_path)

        %HTTPoison.Response{status_code: 200, body: body} =
          Transport.Shared.Wrapper.HTTPoison.impl().get!(rh.payload["permanent_url"])

        Transport.S3.upload_to_s3!(:history, body, rh.payload["filename"])
      end
    end)
  end
end

Logger.configure(level: :info)

# create a local S3 copy (via MinIO) of each latest ResourceHistory file so
# that the production database dump can be used locally with a matching local S3 file
SyncS3LatestResourceHistory.sync!(Path.expand("~/data"))
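
Before running the full sync, the query alone can be inspected to see which files would be mirrored locally; a hedged iex sketch, assuming the script (and therefore `Tooling`) has been loaded:

# List the filenames of the latest GTFS ResourceHistory rows that sync!/1 would
# download, without downloading anything.
Tooling.query() |> DB.Repo.all() |> Enum.map(& &1.payload["filename"])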
