|
20 | 20 | end)
|
21 | 21 | |> Enum.filter(&(&1.unverified_format == "NeTEx"))
|
22 | 22 |
|
23 |
| -netex = |
24 |
| - df |
25 |
| - |> Task.async_stream( |
26 |
| - fn r -> |
27 |
| - url = r.url |
28 |
| - file = Path.join("cache-dir", "resource-#{r.id}.dat") |
29 |
| - status_file = file <> ".status.json" |
| 23 | +download_resource = fn r -> |
| 24 | + url = r.url |
| 25 | + file = Path.join("cache-dir", "resource-#{r.id}.dat") |
| 26 | + status_file = file <> ".status.json" |
30 | 27 |
|
31 |
| - unless File.exists?(status_file) do |
32 |
| - IO.puts("Saving #{url}") |
33 |
| - url = if String.contains?(url, "|"), do: URI.encode(url), else: url |
| 28 | + unless File.exists?(status_file) do |
| 29 | + IO.puts("Saving #{url}") |
| 30 | + url = if String.contains?(url, "|"), do: URI.encode(url), else: url |
34 | 31 |
|
35 |
| - %{status: status} = |
36 |
| - Transport.HTTPClient.get!(url, |
37 |
| - decode_body: false, |
38 |
| - compressed: false, |
39 |
| - into: File.stream!(file) |
40 |
| - ) |
| 32 | + %{status: status} = |
| 33 | + Transport.HTTPClient.get!(url, |
| 34 | + decode_body: false, |
| 35 | + compressed: false, |
| 36 | + into: File.stream!(file) |
| 37 | + ) |
41 | 38 |
|
42 |
| - File.write!(status_file, %{status: status} |> Jason.encode!()) |
43 |
| - end |
| 39 | + File.write!(status_file, %{status: status} |> Jason.encode!()) |
| 40 | + end |
44 | 41 |
|
45 |
| - %{"status" => status} = File.read!(status_file) |> Jason.decode!() |
| 42 | + %{"status" => status} = File.read!(status_file) |> Jason.decode!() |
46 | 43 |
|
47 |
| - r |
48 |
| - |> Map.put(:http_status, status) |
49 |
| - |> Map.put(:local_path, file) |
50 |
| - end, |
51 |
| - max_concurrency: 10, |
52 |
| - timeout: 120_000 |
53 |
| - ) |
54 |
| - |> Stream.map(fn {:ok, result} -> result end) |
55 |
| - |> Stream.reject(&is_nil(&1)) |
56 |
| - |> Task.async_stream( |
57 |
| - fn r -> |
58 |
| - IO.puts("Processing file #{r.id}") |
| 44 | + r |
| 45 | + |> Map.put(:http_status, status) |
| 46 | + |> Map.put(:local_path, file) |
| 47 | +end |
| 48 | + |
| 49 | +count_relevant_stop_places_per_resource = fn r -> |
| 50 | + IO.puts("Processing file #{r.id}") |
59 | 51 |
|
60 |
| - try do |
61 |
| - count = |
62 |
| - Transport.NeTEx.read_all_stop_places(r.local_path) |
63 |
| - |> Enum.flat_map(fn {_file, stops} -> stops end) |
64 |
| - # some stop places have no latitude in NeTEx |
65 |
| - |> Enum.reject(fn p -> is_nil(p[:latitude]) end) |
66 |
| - |> Enum.count() |
| 52 | + try do |
| 53 | + count = |
| 54 | + Transport.NeTEx.read_all_stop_places(r.local_path) |
| 55 | + |> Enum.flat_map(fn {_file, stops} -> stops end) |
| 56 | + # some stop places have no latitude in NeTEx |
| 57 | + |> Enum.reject(fn p -> is_nil(p[:latitude]) end) |
| 58 | + |> Enum.count() |
67 | 59 |
|
68 |
| - IO.puts("#{count} StopPlaces detected") |
69 |
| - rescue |
70 |
| - e -> IO.puts("Som'thing bad happened") |
71 |
| - end |
72 |
| - end, |
73 |
| - max_concurrency: 5, |
74 |
| - timeout: 60_000 * 5 |
75 |
| - ) |
| 60 | + IO.puts("#{count} StopPlaces detected") |
| 61 | + rescue |
| 62 | + _ -> IO.puts("Som'thing bad happened") |
| 63 | + end |
| 64 | +end |
| 65 | + |
| 66 | +netex = |
| 67 | + df |
| 68 | + |> Task.async_stream(download_resource, max_concurrency: 10, timeout: 120_000) |
| 69 | + |> Stream.map(fn {:ok, result} -> result end) |
| 70 | + |> Stream.reject(&is_nil(&1)) |
| 71 | + |> Task.async_stream(count_relevant_stop_places_per_resource, max_concurrency: 5, timeout: 60_000 * 5) |
76 | 72 | |> Stream.run()
|
0 commit comments