From 48869ab3e94869095302badbd256370735c9577d Mon Sep 17 00:00:00 2001 From: skwon615 <78287018+skwon615@users.noreply.github.com> Date: Mon, 16 May 2022 00:52:48 -0700 Subject: [PATCH] Finalize takehome assessment --- README.md | 22 ++++- dags/fetcher.py | 14 ++- dags/openweather.py | 4 + ...l.sql => create_current_weather_table.sql} | 30 +++--- dags/sql/create_dim_cities_table.sql | 18 ++++ dags/sql/create_dim_weather_types_table.sql | 14 +++ ...l => create_raw_current_weather_table.sql} | 2 + ...erive_current_weather_info_ranked_view.sql | 27 +++++ dags/sql/derive_highest_temp_daily_view.sql | 9 ++ dags/sql/derive_hottest_cities_daily_view.sql | 23 +++++ dags/sql/derive_hottest_days_yearly_view.sql | 28 ++++++ dags/sql/derive_latest_weather_info_view.sql | 25 +++++ ...rive_least_humid_city_per_country_view.sql | 25 +++++ .../derive_moving_avg_temperature_view.sql | 18 ++++ dags/sql/transform_dim_cities_table.sql | 47 +++++++++ dags/sql/transform_dim_weather_type_table.sql | 44 +++++++++ dags/sql/transform_weather_table.sql | 34 +++++++ dags/transformer.py | 98 +++++++++++++++---- docker-compose.yaml | 9 ++ env.sample | 2 +- 20 files changed, 450 insertions(+), 43 deletions(-) rename dags/sql/{create_current_weather_tbl.sql => create_current_weather_table.sql} (67%) create mode 100644 dags/sql/create_dim_cities_table.sql create mode 100644 dags/sql/create_dim_weather_types_table.sql rename dags/sql/{create_raw_current_weather_tbl.sql => create_raw_current_weather_table.sql} (88%) create mode 100644 dags/sql/derive_current_weather_info_ranked_view.sql create mode 100644 dags/sql/derive_highest_temp_daily_view.sql create mode 100644 dags/sql/derive_hottest_cities_daily_view.sql create mode 100644 dags/sql/derive_hottest_days_yearly_view.sql create mode 100644 dags/sql/derive_latest_weather_info_view.sql create mode 100644 dags/sql/derive_least_humid_city_per_country_view.sql create mode 100644 dags/sql/derive_moving_avg_temperature_view.sql create mode 100644 
dags/sql/transform_dim_cities_table.sql create mode 100644 dags/sql/transform_dim_weather_type_table.sql create mode 100644 dags/sql/transform_weather_table.sql diff --git a/README.md b/README.md index 54b0758..58c25d1 100644 --- a/README.md +++ b/README.md @@ -100,16 +100,28 @@ Fork this repository and clone to your local environment - **Note:** If you are using Apple hardware with M1 processor, there is a common challenge with Docker. You can read more about it [here](https://javascript.plainenglish.io/which-docker-images-can-you-use-on-the-mac-m1-daba6bbc2dc5). ## Your notes (Readme.md) -@TODO: Add any additional notes / documentation in this file. ### Time spent -Give us a rough estimate of the time you spent working on this. If you spent time learning in order to do this project please feel free to let us know that too. This makes sure that we are evaluating your work fairly and in context. It also gives us the opportunity to learn and adjust our process if needed. +* Overall I spent ~4 hours on this project. As a rough breakdown: +** In the first 30-45 minutes, I gathered context by reading assessment content, API docs, etc. +** In the next ~15-30 minutes, I focused on data modeling considerations. +** In the next ~1.5 hours, I implemented a draft of the solution. +** In the next ~30 minutes, I cleaned up DAGs and added comments. +** In the next ~30 minutes, I reviewed my code and compiled this doc. +** In the last ~30 minutes, I tested my code and ensured that code runs as expected. +* I moved this weekend from DC to LA! I wrapped up most of the project at the airport / on the plane, but had to wait until after I arrived and settled in at my new house (+ setting up Internet) to test my code and finalize the project. +* LAstly, I actually had a very mild concussion last week. So I felt fine enough for the assessment, but that might have contributed to some slowness. ### Assumptions -Did you find yourself needing to make assumptions to finish this? 
If so, what were they and how did they impact your design/code? +* Metadata on cities (`dim_cities`) and weather types (`dim_weather_types`) should not change in any meaningful way (in terms of downstream impact). If substantial changes are needed (e.g. two cities merge into one), we expect new records to be generated with new IDs. + * This allows us to normalize the raw dataset, and prevent duplicate records when joining the core weather table to dimension tables. (I.e. we can maintain dimension tables that are unique on each entity's ID.) +* On Question #4 (least humid city), the original prompt said "per state". However, the input cities are in Canada, and the API docs note that state information is only available in the US. So I updated the prompt to "per country"; but please let me know if I am misunderstanding the prompt. ### Next steps -Provide us with some notes about what you would do next if you had more time. Are there additional features that you would want to add? Specific improvements to your code you would make? +* With more time, I would have focused next on considering additional table constraints, indexes and/or keys for optimizing downstream queries. +* Depending on business requirements, I would also have wanted to consider whether some derived views should be reconstructed as tables instead. +* Adding tests to ensure data quality would also be a good next step. E.g., we should really test that updated dimension tables retain all entries that we used to have. +* Lastly, I would definitely want to incorporate a tool like dbt to run all relevant transformations. This would streamline the process with minimal code, and enable documentation, tracking, tests, etc. 
### Instructions to the evaluator -Provide any end user documentation you think is necessary and useful here +n/a diff --git a/dags/fetcher.py b/dags/fetcher.py index a1ef31a..749b9f0 100644 --- a/dags/fetcher.py +++ b/dags/fetcher.py @@ -7,7 +7,10 @@ from airflow.providers.postgres.operators.postgres import PostgresOperator -def ingest_api_data(city_ids: t.List[str]) -> t.List[t.Tuple]: +def ingest_api_data( + city_ids: t.List[str], + units: str = "metric", +) -> t.List[t.Tuple]: """ Pull payload from OpenWeatherAPI from a list of city IDs. """ @@ -23,6 +26,7 @@ def ingest_api_data(city_ids: t.List[str]) -> t.List[t.Tuple]: payload["dt"], payload["timezone"], json.dumps(payload), + units ) ) return rows @@ -34,8 +38,8 @@ def construct_insert_statement(**context) -> str: """ rows = context["ti"].xcom_pull(key="return_value", task_ids="ingest_api_data") value_rows = [ - "({}, {}, {}, '{}', {}, {})".format( - row[0], row[1], row[2], row[3], "now()", "now()" + "({}, {}, {}, '{}', {}, {}, {})".format( + row[0], row[1], row[2], row[3], "'" + row[4] + "'", "now()", "now()" ) for row in rows ] @@ -44,7 +48,7 @@ def construct_insert_statement(**context) -> str: return f""" SET TIME ZONE 'UTC'; - INSERT INTO raw_current_weather (city_id, unix_time_seconds, tz_offset_seconds, raw_data, created_at, updated_at) + INSERT INTO raw_current_weather (city_id, unix_time_seconds, tz_offset_seconds, raw_data, units, created_at, updated_at) VALUES {values_stmt} @@ -78,7 +82,7 @@ def construct_insert_statement(**context) -> str: t1 = PostgresOperator( task_id="create_raw_dataset", - sql="sql/create_raw_current_weather_table.sql", + sql="sql/create_raw_current_weather_table.sql", ) t2 = PythonOperator( diff --git a/dags/openweather.py b/dags/openweather.py index 3ef3604..8115ed9 100644 --- a/dags/openweather.py +++ b/dags/openweather.py @@ -16,6 +16,10 @@ def get_current_weather( API Docs: https://openweathermap.org/current """ + # Test inputs + assert city_id.isnumeric() + assert units in 
['metric', 'standard', 'imperial'] + base_url: str = "http://api.openweathermap.org/data/2.5/weather" params = dict( appid=api_key or os.getenv("OPENWEATHER_API_KEY"), diff --git a/dags/sql/create_current_weather_tbl.sql b/dags/sql/create_current_weather_table.sql similarity index 67% rename from dags/sql/create_current_weather_tbl.sql rename to dags/sql/create_current_weather_table.sql index 3c331ec..cb04e52 100644 --- a/dags/sql/create_current_weather_tbl.sql +++ b/dags/sql/create_current_weather_table.sql @@ -1,13 +1,8 @@ CREATE TABLE IF NOT EXISTS current_weather ( city_id BIGINT, - city_name VARCHAR(256), - country VARCHAR(2), utc_recorded_at TIMESTAMPTZ, tz_offset_hours INTERVAL, - lat NUMERIC(11, 8), - lon NUMERIC(11, 8), - weather_type VARCHAR(128), - weather_desc VARCHAR(1024), + weather_id INT, measure_units VARCHAR(16), visibility_pct FLOAT, cloud_pct FLOAT, @@ -20,27 +15,32 @@ CREATE TABLE IF NOT EXISTS current_weather ( wind_deg FLOAT, wind_gust FLOAT, wind_speed FLOAT, - PRIMARY KEY (city_id, utc_recorded_at) + created_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ, + PRIMARY KEY (city_id, utc_recorded_at), + CONSTRAINT FK_city_id + FOREIGN KEY (city_id) + REFERENCES dim_cities (id), + CONSTRAINT FK_weather_id + FOREIGN KEY (weather_id) + REFERENCES dim_weather_types (id) ); COMMENT ON COLUMN current_weather.city_id IS 'The ID of the city according to OpenWeatherMap'; -COMMENT ON COLUMN current_weather.city_name IS 'The name of the city according to OpenWeatherMap'; -COMMENT ON COLUMN current_weather.country IS 'Two letter country code of the city according to OpenWeatherMap'; COMMENT ON COLUMN current_weather.utc_recorded_at IS 'When the record was created in UTC time'; COMMENT ON COLUMN current_weather.tz_offset_hours IS 'The timezone of the location where the record was created, expressed as a UTC offset in hours'; -COMMENT ON COLUMN current_weather.lat IS 'The latitude of the location where the record was created'; -COMMENT ON COLUMN current_weather.lon 
IS 'The longitude of the location where the record was created'; -COMMENT ON COLUMN current_weather.weather_type IS 'The recorded weather'; -COMMENT ON COLUMN current_weather.weather_desc IS 'A more precise description of the recorded weather'; -COMMENT ON COLUMN current_weather.measure_units IS 'One of "Metric", "Imperial", or "Standard"'; +COMMENT ON COLUMN current_weather.weather_id IS 'The recorded weather ID; join to `dim_weather_types` to get type and description'; +COMMENT ON COLUMN current_weather.measure_units IS 'One of "metric", "imperial", or "standard"'; COMMENT ON COLUMN current_weather.visibility_pct IS 'The overall visibility expressed as a pct'; COMMENT ON COLUMN current_weather.cloud_pct IS 'The cloud coverage expressed as a pct'; COMMENT ON COLUMN current_weather.temp_deg IS 'The temperature in degrees, using the scale associated with the measure_units'; COMMENT ON COLUMN current_weather.humidity_pct IS 'The humidity expressed as a pct'; -COMMENT ON COLUMN current_weather.pressure IS 'The atmospheric pressure, using the appropriate units associated with the measure_units'; +COMMENT ON COLUMN current_weather.pressure IS 'The atmospheric pressure, in hPa'; COMMENT ON COLUMN current_weather.temp_min IS 'The lowest temperature felt at that time, using the appropriate units associated with the measure_units'; COMMENT ON COLUMN current_weather.temp_max IS 'The highest temperature felt at that time, using the appropriate units associated with the measure_units'; COMMENT ON COLUMN current_weather.feels_like IS 'The perceived temperature at that time, using the appropriate units associated with the measure_units'; COMMENT ON COLUMN current_weather.wind_deg IS 'The direction the wind was blowing at that time, expressed as degrees bearing'; COMMENT ON COLUMN current_weather.wind_gust IS 'The force of the wind gusts at that time'; COMMENT ON COLUMN current_weather.wind_speed IS 'The speed of the wind in units associated with the measure_units'; +COMMENT ON 
COLUMN current_weather.created_at IS 'When the payload was first taken from the OpenWeatherMap API'; +COMMENT ON COLUMN current_weather.updated_at IS 'When this row was most recently updated'; diff --git a/dags/sql/create_dim_cities_table.sql b/dags/sql/create_dim_cities_table.sql new file mode 100644 index 0000000..86e11d9 --- /dev/null +++ b/dags/sql/create_dim_cities_table.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS dim_cities ( + id BIGINT, + city_name VARCHAR(256), + country VARCHAR(2), + lat NUMERIC(11, 8), + lon NUMERIC(11, 8), + created_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ, + PRIMARY KEY (id) +); + +COMMENT ON COLUMN dim_cities.id IS 'The ID of the city according to OpenWeatherMap'; +COMMENT ON COLUMN dim_cities.city_name IS 'The name of the city according to OpenWeatherMap'; +COMMENT ON COLUMN dim_cities.country IS 'Two letter country code of the city according to OpenWeatherMap'; +COMMENT ON COLUMN dim_cities.lat IS 'The latitude of the location where the record was created'; +COMMENT ON COLUMN dim_cities.lon IS 'The longitude of the location where the record was created'; +COMMENT ON COLUMN dim_cities.created_at IS 'When the city record was first taken from the OpenWeatherMap API'; +COMMENT ON COLUMN dim_cities.updated_at IS 'When this row was most recently updated'; diff --git a/dags/sql/create_dim_weather_types_table.sql b/dags/sql/create_dim_weather_types_table.sql new file mode 100644 index 0000000..60262f3 --- /dev/null +++ b/dags/sql/create_dim_weather_types_table.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS dim_weather_types ( + id INT, + weather_type VARCHAR(128), + weather_desc VARCHAR(1024), + created_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ, + PRIMARY KEY (id) +); + +COMMENT ON COLUMN dim_weather_types.id IS 'The recorded weather ID'; +COMMENT ON COLUMN dim_weather_types.weather_type IS 'The recorded weather'; +COMMENT ON COLUMN dim_weather_types.weather_desc IS 'A more precise description of the recorded weather'; +COMMENT ON COLUMN 
dim_weather_types.created_at IS 'When the weather record was first taken from the OpenWeatherMap API'; +COMMENT ON COLUMN dim_weather_types.updated_at IS 'When this row was most recently updated'; diff --git a/dags/sql/create_raw_current_weather_tbl.sql b/dags/sql/create_raw_current_weather_table.sql similarity index 88% rename from dags/sql/create_raw_current_weather_tbl.sql rename to dags/sql/create_raw_current_weather_table.sql index b6d4728..1222873 100644 --- a/dags/sql/create_raw_current_weather_tbl.sql +++ b/dags/sql/create_raw_current_weather_table.sql @@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS raw_current_weather ( unix_time_seconds BIGINT, tz_offset_seconds INTEGER, raw_data JSONB, + units VARCHAR(16), created_at TIMESTAMPTZ, updated_at TIMESTAMPTZ, PRIMARY KEY (city_id, unix_time_seconds) @@ -12,5 +13,6 @@ COMMENT ON COLUMN raw_current_weather.city_id IS 'The ID of the city according t COMMENT ON COLUMN raw_current_weather.unix_time_seconds IS 'The time the record was created, expressed in integer seconds elapsed since 1970-01-01'; COMMENT ON COLUMN raw_current_weather.tz_offset_seconds IS 'How many seconds off of UTC the timezone the record was created in was'; COMMENT ON COLUMN raw_current_weather.raw_data IS 'The payload returned from the OpenWeatherMap API'; +COMMENT ON COLUMN raw_current_weather.units IS 'One of "metric", "imperial", or "standard"'; COMMENT ON COLUMN raw_current_weather.created_at IS 'When the payload was first taken from the OpenWeatherMap API'; COMMENT ON COLUMN raw_current_weather.updated_at IS 'When this row was most recently updated'; diff --git a/dags/sql/derive_current_weather_info_ranked_view.sql b/dags/sql/derive_current_weather_info_ranked_view.sql new file mode 100644 index 0000000..329beb5 --- /dev/null +++ b/dags/sql/derive_current_weather_info_ranked_view.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE VIEW current_weather_ranked_by_recency_view AS +/* Rank records per city based on time of extraction */ +SELECT + city_id + 
,utc_recorded_at + ,tz_offset_hours + ,weather_id + ,measure_units + ,visibility_pct + ,cloud_pct + ,temp_deg + ,humidity_pct + ,pressure + ,temp_min + ,temp_max + ,feels_like + ,wind_deg + ,wind_gust + ,wind_speed + ,created_at + ,updated_at + /* Use ROW_NUMBER() since we want absolute latest record */ + ,ROW_NUMBER() OVER ( + PARTITION BY city_id + ORDER BY utc_recorded_at DESC + ) AS rank_utc_recorded_at_desc +FROM current_weather \ No newline at end of file diff --git a/dags/sql/derive_highest_temp_daily_view.sql b/dags/sql/derive_highest_temp_daily_view.sql new file mode 100644 index 0000000..e6b40f6 --- /dev/null +++ b/dags/sql/derive_highest_temp_daily_view.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE VIEW highest_temp_per_city_day AS +/* For each day and city, find its highest recorded temperature */ +SELECT city_id + ,CAST(utc_recorded_at AS DATE) AS day + ,MAX(temp_max) AS max_temp +FROM current_weather +GROUP BY + city_id + ,CAST(utc_recorded_at AS DATE) \ No newline at end of file diff --git a/dags/sql/derive_hottest_cities_daily_view.sql b/dags/sql/derive_hottest_cities_daily_view.sql new file mode 100644 index 0000000..b7164e9 --- /dev/null +++ b/dags/sql/derive_hottest_cities_daily_view.sql @@ -0,0 +1,23 @@ +CREATE OR REPLACE VIEW hottest_cities_per_day AS +/* First rank each city per day, based on highest temperature */ +WITH cities_ranked AS ( + SELECT city_id + ,day + ,max_temp + /* Use RANK() to include all cities in ties */ + ,RANK() OVER ( + PARTITION BY day + ORDER BY max_temp DESC + ) AS rank_max_temp_desc + FROM highest_temp_per_city_day +) +/* Find hottest city per day based on above rank */ +SELECT + cr.day + ,c.city_name + ,cr.max_temp +FROM cities_ranked cr +JOIN dim_cities c + ON cr.city_id = c.id +WHERE cr.rank_max_temp_desc = 1 +ORDER BY cr.day \ No newline at end of file diff --git a/dags/sql/derive_hottest_days_yearly_view.sql b/dags/sql/derive_hottest_days_yearly_view.sql new file mode 100644 index 0000000..cabbc74 --- /dev/null +++ 
b/dags/sql/derive_hottest_days_yearly_view.sql @@ -0,0 +1,28 @@ +CREATE OR REPLACE VIEW hottest_days_per_city_year AS +/* Rank days per calendar year + city, ordered by max temperature */ +WITH days_ranked AS ( + SELECT city_id + ,day + ,DATE_TRUNC('year', day) AS year + ,max_temp + /* Use RANK() to include all dates in ties */ + ,RANK() OVER ( + PARTITION BY city_id, DATE_TRUNC('year', day) + ORDER BY max_temp DESC + ) AS rank_max_temp_desc + FROM highest_temp_per_city_day +) +/* Extract top 7 hottest days based on above rank */ +SELECT + dr.year + ,c.city_name + ,dr.rank_max_temp_desc + ,dr.day + ,dr.max_temp +FROM days_ranked dr +JOIN dim_cities c + ON dr.city_id = c.id +WHERE dr.rank_max_temp_desc <= 7 +ORDER BY dr.year + ,c.city_name + ,dr.rank_max_temp_desc \ No newline at end of file diff --git a/dags/sql/derive_latest_weather_info_view.sql b/dags/sql/derive_latest_weather_info_view.sql new file mode 100644 index 0000000..9012fd1 --- /dev/null +++ b/dags/sql/derive_latest_weather_info_view.sql @@ -0,0 +1,25 @@ +CREATE OR REPLACE VIEW latest_weather_information AS +/* For each city, pull the latest weather info based on above rank. + Since we ranked by utc_recorded_at, results should be unique on city_id. 
+*/ +SELECT + city_id + ,utc_recorded_at + ,tz_offset_hours + ,weather_id + ,measure_units + ,visibility_pct + ,cloud_pct + ,temp_deg + ,humidity_pct + ,pressure + ,temp_min + ,temp_max + ,feels_like + ,wind_deg + ,wind_gust + ,wind_speed + ,created_at + ,updated_at +FROM current_weather_ranked_by_recency_view +WHERE rank_utc_recorded_at_desc = 1 \ No newline at end of file diff --git a/dags/sql/derive_least_humid_city_per_country_view.sql b/dags/sql/derive_least_humid_city_per_country_view.sql new file mode 100644 index 0000000..be5566e --- /dev/null +++ b/dags/sql/derive_least_humid_city_per_country_view.sql @@ -0,0 +1,25 @@ +CREATE OR REPLACE VIEW least_humid_city_per_country_view AS +/* Rank cities by humidity */ +WITH cities_ranked AS ( + SELECT + w.city_id + ,c.city_name + ,w.humidity_pct + ,c.country + /* Use RANK() to include all cities in case of ties */ + ,RANK() OVER( + PARTITION BY c.country + ORDER BY w.humidity_pct ASC + ) AS rank_humidity_pct_asc + FROM current_weather w + JOIN dim_cities c + ON w.city_id = c.id +) +/* Use above rank to pull least humid city per country */ +SELECT + country + ,city_name + ,humidity_pct +FROM cities_ranked +WHERE rank_humidity_pct_asc = 1 +ORDER BY country \ No newline at end of file diff --git a/dags/sql/derive_moving_avg_temperature_view.sql b/dags/sql/derive_moving_avg_temperature_view.sql new file mode 100644 index 0000000..c7ee043 --- /dev/null +++ b/dags/sql/derive_moving_avg_temperature_view.sql @@ -0,0 +1,18 @@ +CREATE OR REPLACE VIEW moving_average_temperature_view AS +/* Calculate moving average over last 5 readings */ +WITH moving_avg_temp_per_city AS ( + SELECT + city_id + ,AVG(temp_deg) AS avg_temp_last_five_readings + FROM current_weather_ranked_by_recency_view + WHERE rank_utc_recorded_at_desc <= 5 + GROUP BY city_id +) +/* Join to dim_cities to get city name */ +SELECT + c.city_name + ,mat.avg_temp_last_five_readings +FROM moving_avg_temp_per_city mat +JOIN dim_cities c + ON mat.city_id = c.id +ORDER 
BY c.city_name \ No newline at end of file diff --git a/dags/sql/transform_dim_cities_table.sql b/dags/sql/transform_dim_cities_table.sql new file mode 100644 index 0000000..35a29ea --- /dev/null +++ b/dags/sql/transform_dim_cities_table.sql @@ -0,0 +1,47 @@ +/* If any part of the below goes wrong, we want to revert. + So we wrap the whole transformation in a transaction block. +*/ +BEGIN TRANSACTION; + /* Since raw table has full historical data, + we can truncate and recreate the table. + While this is not the most performant, + it's a safer option in terms of data quality. + */ + ALTER TABLE current_weather DROP CONSTRAINT FK_city_id; + TRUNCATE TABLE dim_cities; + + /* Add new city entries to dimension table, + and update existing city records with latest metadata + */ + INSERT INTO dim_cities + (id,city_name,country,lat,lon,created_at,updated_at) + SELECT id + ,city_name + ,country + ,lat + ,lon + ,created_at + ,updated_at + FROM ( + SELECT DISTINCT + CAST(raw_data->>'id' AS BIGINT) AS id + ,raw_data->>'name' AS city_name + ,raw_data#>>'{sys,country}' AS country + ,CAST(raw_data#>>'{coord,lat}' AS NUMERIC(11, 8)) AS lat + ,CAST(raw_data#>>'{coord,lon}' AS NUMERIC(11, 8)) AS lon + ,created_at + ,updated_at + ,ROW_NUMBER() OVER ( + PARTITION BY CAST(raw_data->>'id' AS BIGINT) + ORDER BY updated_at DESC + ) AS rown_updated_at_desc + FROM raw_current_weather + ) AS raw_current_weather_ranked + WHERE rown_updated_at_desc = 1; + + /* Add back Foreign Key once we create the table */ + ALTER TABLE current_weather ADD CONSTRAINT FK_city_id + FOREIGN KEY (city_id) + REFERENCES dim_cities (id); + +COMMIT; \ No newline at end of file diff --git a/dags/sql/transform_dim_weather_type_table.sql b/dags/sql/transform_dim_weather_type_table.sql new file mode 100644 index 0000000..3551e3c --- /dev/null +++ b/dags/sql/transform_dim_weather_type_table.sql @@ -0,0 +1,44 @@ +/* If any part of the below goes wrong, we want to revert. 
+ So we wrap the whole transformation in a transaction block. +*/ +BEGIN TRANSACTION; + /* Since raw table has full historical data, + we can truncate and recreate the table. + While this is not the most performant, + it's a safer option in terms of data quality. + */ + ALTER TABLE current_weather DROP CONSTRAINT FK_weather_id; + TRUNCATE TABLE dim_weather_types; + + /* Add new weather type entries to dimension table, + and update existing weather type records with latest metadata + */ + INSERT INTO dim_weather_types + (id, weather_type, weather_desc,created_at,updated_at) + SELECT id + ,weather_type + ,weather_desc + ,created_at + ,updated_at + FROM ( + SELECT DISTINCT + CAST(raw_data#>>'{weather,0,id}' AS INT) AS id + ,raw_data#>>'{weather,0,main}' AS weather_type + ,raw_data#>>'{weather,0,description}' AS weather_desc + ,created_at + ,updated_at + ,ROW_NUMBER() OVER ( + PARTITION BY CAST(raw_data#>>'{weather,0,id}' AS INT) + ORDER BY updated_at DESC + ) AS rown_updated_at_desc + FROM raw_current_weather + WHERE raw_data#>>'{weather,0,id}' IS NOT NULL + ) AS raw_current_weather_ranked + WHERE rown_updated_at_desc = 1; + + /* Add back Foreign Key once we create the table */ + ALTER TABLE current_weather ADD CONSTRAINT FK_weather_id + FOREIGN KEY (weather_id) + REFERENCES dim_weather_types (id); + +COMMIT; \ No newline at end of file diff --git a/dags/sql/transform_weather_table.sql b/dags/sql/transform_weather_table.sql new file mode 100644 index 0000000..5534760 --- /dev/null +++ b/dags/sql/transform_weather_table.sql @@ -0,0 +1,34 @@ +/* If any part of the below goes wrong, we want to revert. + So we wrap the whole transformation in a transaction block. +*/ +BEGIN TRANSACTION; + /* Since raw table has full historical data, + we can truncate and recreate the table. + While this is not the most performant, + it's a safer option in terms of data quality. 
+ */ + TRUNCATE TABLE current_weather; + + INSERT INTO current_weather + SELECT + CAST(raw_data->>'id' AS BIGINT) AS city_id + ,TO_TIMESTAMP(CAST(raw_data->>'dt' AS BIGINT)) AS utc_recorded_at + ,MAKE_INTERVAL(hours => CAST(raw_data->>'timezone' AS INT) / 3600) AS tz_offset_hours /* Raw field is in seconds */ + ,CAST(raw_data#>>'{weather,0,id}' AS INT) AS weather_id + ,units AS measure_units + ,100.0 * (CAST(raw_data->>'visibility' AS FLOAT) / 10000) AS visibility_pct /* Raw field is in meters, while max is 10km */ + ,CAST(raw_data#>>'{clouds,all}' AS FLOAT) AS cloud_pct + ,CAST(raw_data#>>'{main,temp}' AS FLOAT) AS temp_deg + ,CAST(raw_data#>>'{main,humidity}' AS FLOAT) AS humidity_pct + ,CAST(raw_data#>>'{main,pressure}' AS FLOAT) AS pressure + ,CAST(raw_data#>>'{main,temp_min}' AS FLOAT) AS temp_min + ,CAST(raw_data#>>'{main,temp_max}' AS FLOAT) AS temp_max + ,CAST(raw_data#>>'{main,feels_like}' AS FLOAT) AS feels_like + ,CAST(raw_data#>>'{wind,deg}' AS FLOAT) AS wind_deg + ,CAST(raw_data#>>'{wind,gust}' AS FLOAT) AS wind_gust + ,CAST(raw_data#>>'{wind,speed}' AS FLOAT) AS wind_speed + ,created_at + ,updated_at + FROM raw_current_weather; + +COMMIT; \ No newline at end of file diff --git a/dags/transformer.py b/dags/transformer.py index 1fb7c8b..4766276 100644 --- a/dags/transformer.py +++ b/dags/transformer.py @@ -6,6 +6,13 @@ # Operators; we need this to operate! 
from airflow.providers.postgres.operators.postgres import PostgresOperator +# Operator wrapper for calling PostgresWrapper on scripts in "sql" subfolder +def PostgresWrapper(script: str) -> PostgresOperator: + return PostgresOperator( + task_id=script, + sql=f"sql/{script}.sql", + ) + # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args = { @@ -19,26 +26,83 @@ 'transformer', default_args=default_args, description='To transform the raw current weather to a modeled dataset', - schedule_interval=timedelta(minutes=5), + schedule_interval=None,#timedelta(minutes=5), start_date=datetime(2021, 1, 1), catchup=False, tags=['take-home'], ) as dag: - # @TODO: Fill in the below - t1 = PostgresOperator( - task_id="create_modeled_dataset_table", - sql=""" - CREATE TABLE IF NOT EXISTS current_weather ( - ); - """, - ) + ############################# + # Create modeled tables + ############################# - # @TODO: Fill in the below - t2 = PostgresOperator( - task_id="transform_raw_into_modelled", - sql=""" - SELECT * FROM raw_current_weather ... 
- """, - ) - t1 >> t2 \ No newline at end of file + # Create dimension table for cities + t_create_dim_cities_table = PostgresWrapper("create_dim_cities_table") + # Create dimension table for weather types + t_create_dim_weather_types_table = PostgresWrapper("create_dim_weather_types_table") + # Create main weather table + t_create_current_weather_table = PostgresWrapper("create_current_weather_table") + + ############################# + # Load dimension tables + ############################# + + # Load dimension table for cities + t_transform_dim_cities_table = PostgresWrapper("transform_dim_cities_table") + # Load dimension table for weather types + t_transform_dim_weather_type_table = PostgresWrapper("transform_dim_weather_type_table") + # Finally, load main weather table + t_transform_weather_table = PostgresWrapper("transform_weather_table") + + ############################# + # Create intermediate views + ############################# + + # Hottest temperature per city per day + t_derive_highest_temp_daily_view = PostgresWrapper("derive_highest_temp_daily_view") + # Current_weather records, ranked by recency + t_derive_current_weather_info_ranked_view = PostgresWrapper("derive_current_weather_info_ranked_view") + + ############################# + # Create final derived views + ############################# + + # Q1: Top hot cities in your city list per day + t_derive_hottest_cities_daily_view = PostgresWrapper("derive_hottest_cities_daily_view") + # Q2: Top 7 hottest day per city in each calendar year + t_derive_hottest_days_yearly_view = PostgresWrapper("derive_hottest_days_yearly_view") + # Q3: Latest weather information per city + t_derive_latest_weather_info_view = PostgresWrapper("derive_latest_weather_info_view") + # Q4: The least humid city per country + t_derive_least_humid_city_per_country_view = PostgresWrapper("derive_least_humid_city_per_country_view") + # Q5: Moving average of the temperature per city for 5 readings + 
t_derive_moving_avg_temperature_view = PostgresWrapper("derive_moving_avg_temperature_view") + + ############################# + # Set task order + ############################# + + # Dimension tables need be loaded first + t_create_dim_cities_table >> t_transform_dim_cities_table + t_create_dim_weather_types_table >> t_transform_dim_weather_type_table + + # Once both finish, we can load core weather table + [t_transform_dim_cities_table, t_transform_dim_weather_type_table] \ + >> t_create_current_weather_table >> t_transform_weather_table + + # Then we can load intermediate views, plus Q4 view which has no dependencies + t_transform_weather_table >> [ + t_derive_highest_temp_daily_view, + t_derive_current_weather_info_ranked_view, + t_derive_least_humid_city_per_country_view, + ] + + # After each intermediate view finishes, we can run downstream tasks + t_derive_highest_temp_daily_view >> [ + t_derive_hottest_cities_daily_view, + t_derive_hottest_days_yearly_view, + ] + t_derive_current_weather_info_ranked_view >> [ + t_derive_latest_weather_info_view, + t_derive_moving_avg_temperature_view, + ] \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 357e3e2..5f31130 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -73,6 +73,7 @@ x-airflow-common: services: postgres: + platform: linux/amd64 image: postgres:13 environment: POSTGRES_USER: airflow @@ -87,6 +88,7 @@ services: restart: always redis: + platform: linux/amd64 image: redis:latest expose: - 6379 @@ -99,6 +101,7 @@ services: airflow-webserver: <<: *airflow-common + platform: linux/amd64 command: webserver ports: - 8080:8080 @@ -115,6 +118,7 @@ services: airflow-scheduler: <<: *airflow-common + platform: linux/amd64 command: scheduler healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] @@ -129,6 +133,7 @@ services: airflow-worker: <<: *airflow-common + platform: linux/amd64 command: celery worker healthcheck: 
test: @@ -150,6 +155,7 @@ services: airflow-triggerer: <<: *airflow-common + platform: linux/amd64 command: triggerer healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] @@ -164,6 +170,7 @@ services: airflow-init: <<: *airflow-common + platform: linux/amd64 entrypoint: /bin/bash # yamllint disable rule:line-length command: @@ -241,6 +248,7 @@ services: airflow-cli: <<: *airflow-common + platform: linux/amd64 profiles: - debug environment: @@ -254,6 +262,7 @@ services: flower: <<: *airflow-common + platform: linux/amd64 command: celery flower ports: - 5555:5555 diff --git a/env.sample b/env.sample index 68fb0fd..b049416 100644 --- a/env.sample +++ b/env.sample @@ -1,2 +1,2 @@ AIRFLOW_CONN_POSTGRES_DEFAULT=postgres://airflow:airflow@postgres:5432 -OPENWEATHER_API_KEY= +OPENWEATHER_API_KEY=<your-openweather-api-key>