Finalize takehome assessment #4

Open · wants to merge 1 commit into base: main
22 changes: 17 additions & 5 deletions README.md
@@ -100,16 +100,28 @@ Fork this repository and clone to your local environment
- **Note:** If you are using Apple hardware with an M1 processor, there is a common challenge with Docker. You can read more about it [here](https://javascript.plainenglish.io/which-docker-images-can-you-use-on-the-mac-m1-daba6bbc2dc5).

## Your notes (Readme.md)
@TODO: Add any additional notes / documentation in this file.

### Time spent
Give us a rough estimate of the time you spent working on this. If you spent time learning in order to do this project please feel free to let us know that too. This makes sure that we are evaluating your work fairly and in context. It also gives us the opportunity to learn and adjust our process if needed.
* Overall I spent ~4 hours on this project. As a rough breakdown:
  * In the first 30-45 minutes, I gathered context by reading the assessment content, API docs, etc.
  * In the next ~15-30 minutes, I focused on data modeling considerations.
  * In the next ~1.5 hours, I implemented a draft of the solution.
  * In the next ~30 minutes, I cleaned up the DAGs and added comments.
  * In the next ~30 minutes, I reviewed my code and compiled this doc.
  * In the last ~30 minutes, I tested my code and ensured that it runs as expected.
* I moved from DC to LA this weekend! I wrapped up most of the project at the airport / on the plane, but had to wait until I arrived, settled in at my new house, and set up Internet before I could test my code and finalize the project.
* Lastly, I had a very mild concussion last week. I felt well enough to do the assessment, but it may have contributed to some slowness.

### Assumptions
Did you find yourself needing to make assumptions to finish this? If so, what were they and how did they impact your design/code?
* Metadata on cities (`dim_cities`) and weather types (`dim_weather_types`) should not change in any way that has meaningful downstream impact. If substantial changes are needed (e.g. two cities merge into one), we expect new records to be generated with new IDs.
  * This allows us to normalize the raw dataset and prevent duplicate records when joining the core weather table to the dimension tables, i.e. we can maintain dimension tables that are unique on each entity's ID (see the sketch after this list).
* On Question #4 (least humid city), the original prompt said "per state". However, the input cities are in Canada, and the API docs note that state information is only available in the US. So I updated the prompt to "per country"; please let me know if I am misunderstanding the prompt.
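
To illustrate this, here is a minimal sketch (not part of the DAGs in this PR) of how `dim_cities` could be refreshed while staying unique on `id`, assuming a Postgres-style upsert and that dimension rows are derived from `raw_current_weather`; the JSONB field paths assume the standard OpenWeatherMap current-weather payload:

```sql
-- Hypothetical refresh keeping dim_cities unique on id.
-- Column names follow dags/sql/create_dim_cities_table.sql.
INSERT INTO dim_cities (id, city_name, country, lat, lon, created_at, updated_at)
SELECT
    id
    ,city_name
    ,country
    ,lat
    ,lon
    ,now()
    ,now()
FROM (
    /* Keep only the most recent raw payload per city */
    SELECT DISTINCT ON ((raw_data ->> 'id')::BIGINT)
        (raw_data ->> 'id')::BIGINT                AS id
        ,raw_data ->> 'name'                       AS city_name
        ,raw_data -> 'sys' ->> 'country'           AS country
        ,(raw_data -> 'coord' ->> 'lat')::NUMERIC  AS lat
        ,(raw_data -> 'coord' ->> 'lon')::NUMERIC  AS lon
    FROM raw_current_weather
    ORDER BY (raw_data ->> 'id')::BIGINT, unix_time_seconds DESC
) latest
ON CONFLICT (id) DO UPDATE
SET city_name   = EXCLUDED.city_name
    ,country    = EXCLUDED.country
    ,lat        = EXCLUDED.lat
    ,lon        = EXCLUDED.lon
    ,updated_at = now();
```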

### Next steps
Provide us with some notes about what you would do next if you had more time. Are there additional features that you would want to add? Specific improvements to your code you would make?
* With more time, I would next focus on additional table constraints, indexes, and/or keys to optimize downstream queries.
* Depending on business requirements, I would also want to consider whether some derived views should be materialized as tables instead.
* Adding tests to ensure data quality would also be a good next step. E.g., we should verify that updated dimension tables retain all entries we previously had (see the sketch after this list).
* Lastly, I would want to incorporate a tool like dbt to run all relevant transformations. This would streamline the process with minimal code, and enable documentation, lineage tracking, tests, etc.
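
As a concrete example of such a test, here is a minimal sketch (not part of this PR) of a retention check, assuming that `dim_cities` should always cover every city we have ever ingested raw data for:

```sql
-- Hypothetical data-quality check: after a dimension refresh, every city_id
-- ever seen in raw_current_weather should still be present in dim_cities.
-- Expect missing_city_ids = 0; a non-zero count means the refresh dropped rows.
SELECT COUNT(*) AS missing_city_ids
FROM (SELECT DISTINCT city_id FROM raw_current_weather) r
LEFT JOIN dim_cities c
    ON r.city_id = c.id
WHERE c.id IS NULL;
```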

### Instructions to the evaluator
Provide any end user documentation you think is necessary and useful here
n/a
14 changes: 9 additions & 5 deletions dags/fetcher.py
@@ -7,7 +7,10 @@
from airflow.providers.postgres.operators.postgres import PostgresOperator


def ingest_api_data(city_ids: t.List[str]) -> t.List[t.Tuple]:
def ingest_api_data(
city_ids: t.List[str],
units: str = "metric",
) -> t.List[t.Tuple]:
"""
Pull payloads from the OpenWeatherMap API for a list of city IDs.
"""
@@ -23,6 +26,7 @@ def ingest_api_data(city_ids: t.List[str]) -> t.List[t.Tuple]:
payload["dt"],
payload["timezone"],
json.dumps(payload),
units
)
)
return rows
@@ -34,8 +38,8 @@ def construct_insert_statement(**context) -> str:
"""
rows = context["ti"].xcom_pull(key="return_value", task_ids="ingest_api_data")
value_rows = [
"({}, {}, {}, '{}', {}, {})".format(
row[0], row[1], row[2], row[3], "now()", "now()"
"({}, {}, {}, '{}', {}, {}, {})".format(
row[0], row[1], row[2], row[3], "'" + row[4] + "'", "now()", "now()"
)
for row in rows
]
@@ -44,7 +48,7 @@ def construct_insert_statement(**context) -> str:
return f"""
SET TIME ZONE 'UTC';

INSERT INTO raw_current_weather (city_id, unix_time_seconds, tz_offset_seconds, raw_data, created_at, updated_at)
INSERT INTO raw_current_weather (city_id, unix_time_seconds, tz_offset_seconds, raw_data, units, created_at, updated_at)
VALUES
{values_stmt}

@@ -78,7 +82,7 @@ def construct_insert_statement(**context) -> str:

t1 = PostgresOperator(
task_id="create_raw_dataset",
sql="sql/create_raw_current_weather_tbl.sql",
sql="sql/create_raw_current_weather_table.sql",
)

t2 = PythonOperator(
4 changes: 4 additions & 0 deletions dags/openweather.py
@@ -16,6 +16,10 @@ def get_current_weather(

API Docs: https://openweathermap.org/current
"""
# Validate inputs
assert city_id.isnumeric(), "city_id must be a numeric string"
assert units in ['metric', 'standard', 'imperial'], f"unexpected units: {units}"

base_url: str = "http://api.openweathermap.org/data/2.5/weather"
params = dict(
appid=api_key or os.getenv("OPENWEATHER_API_KEY"),
@@ -1,13 +1,8 @@
CREATE TABLE IF NOT EXISTS current_weather (
city_id BIGINT,
city_name VARCHAR(256),
country VARCHAR(2),
utc_recorded_at TIMESTAMPTZ,
tz_offset_hours INTERVAL,
lat NUMERIC(11, 8),
lon NUMERIC(11, 8),
weather_type VARCHAR(128),
weather_desc VARCHAR(1024),
weather_id INT,
measure_units VARCHAR(16),
visibility_pct FLOAT,
cloud_pct FLOAT,
@@ -20,27 +15,32 @@ CREATE TABLE IF NOT EXISTS current_weather (
wind_deg FLOAT,
wind_gust FLOAT,
wind_speed FLOAT,
PRIMARY KEY (city_id, utc_recorded_at)
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
PRIMARY KEY (city_id, utc_recorded_at),
CONSTRAINT FK_city_id
FOREIGN KEY (city_id)
REFERENCES dim_cities (id),
CONSTRAINT FK_weather_id
FOREIGN KEY (weather_id)
REFERENCES dim_weather_types (id)
);

COMMENT ON COLUMN current_weather.city_id IS 'The ID of the city according to OpenWeatherMap';
COMMENT ON COLUMN current_weather.city_name IS 'The name of the city according to OpenWeatherMap';
COMMENT ON COLUMN current_weather.country IS 'Two letter country code of the city according to OpenWeatherMap';
COMMENT ON COLUMN current_weather.utc_recorded_at IS 'When the record was created in UTC time';
COMMENT ON COLUMN current_weather.tz_offset_hours IS 'The timezone of the location where the record was created, expressed as a UTC offset in hours';
COMMENT ON COLUMN current_weather.lat IS 'The latitude of the location where the record was created';
COMMENT ON COLUMN current_weather.lon IS 'The longitude of the location where the record was created';
COMMENT ON COLUMN current_weather.weather_type IS 'The recorded weather';
COMMENT ON COLUMN current_weather.weather_desc IS 'A more precise description of the recorded weather';
COMMENT ON COLUMN current_weather.measure_units IS 'One of "Metric", "Imperial", or "Standard"';
COMMENT ON COLUMN current_weather.weather_id IS 'The recorded weather ID; join to `dim_weather_types` to get type and description';
COMMENT ON COLUMN current_weather.measure_units IS 'One of "metric", "imperial", or "standard"';
COMMENT ON COLUMN current_weather.visibility_pct IS 'The overall visibility expressed as a pct';
COMMENT ON COLUMN current_weather.cloud_pct IS 'The cloud coverage expressed as a pct';
COMMENT ON COLUMN current_weather.temp_deg IS 'The temperature in degrees, using the scale associated with the measure_units';
COMMENT ON COLUMN current_weather.humidity_pct IS 'The humidity expressed as a pct';
COMMENT ON COLUMN current_weather.pressure IS 'The atmospheric pressure, using the appropriate units associated with the measure_units';
COMMENT ON COLUMN current_weather.pressure IS 'The atmospheric pressure, in hPa';
COMMENT ON COLUMN current_weather.temp_min IS 'The lowest temperature felt at that time, using the appropriate units associated with the measure_units';
COMMENT ON COLUMN current_weather.temp_max IS 'The highest temperature felt at that time, using the appropriate units associated with the measure_units';
COMMENT ON COLUMN current_weather.feels_like IS 'The perceived temperature at that time, using the appropriate units associated with the measure_units';
COMMENT ON COLUMN current_weather.wind_deg IS 'The direction the wind was blowing at that time, expressed as degrees bearing';
COMMENT ON COLUMN current_weather.wind_gust IS 'The force of the wind gusts at that time';
COMMENT ON COLUMN current_weather.wind_speed IS 'The speed of the wind in units associated with the measure_units';
COMMENT ON COLUMN current_weather.created_at IS 'When this row was first created from the raw API payload';
COMMENT ON COLUMN current_weather.updated_at IS 'When this row was most recently updated';
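
This PR does not show the transform that populates `current_weather` from `raw_current_weather`. The following is a minimal sketch of what that JSONB extraction could look like for a subset of columns, assuming the standard OpenWeatherMap current-weather payload shape; it is an illustration, not code from this PR:

```sql
-- Hypothetical raw-to-modeled transform (subset of columns only).
-- Assumes dim_cities and dim_weather_types are populated first (FK constraints).
INSERT INTO current_weather (
    city_id
    ,utc_recorded_at
    ,tz_offset_hours
    ,weather_id
    ,measure_units
    ,temp_deg
    ,humidity_pct
    ,pressure
    ,created_at
    ,updated_at
)
SELECT
    city_id
    ,to_timestamp(unix_time_seconds)              -- TIMESTAMPTZ in UTC
    ,make_interval(secs => tz_offset_seconds)     -- UTC offset as an INTERVAL
    ,(raw_data -> 'weather' -> 0 ->> 'id')::INT   -- first reported weather condition
    ,units
    ,(raw_data -> 'main' ->> 'temp')::FLOAT
    ,(raw_data -> 'main' ->> 'humidity')::FLOAT
    ,(raw_data -> 'main' ->> 'pressure')::FLOAT
    ,now()
    ,now()
FROM raw_current_weather
ON CONFLICT (city_id, utc_recorded_at) DO NOTHING;
```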
18 changes: 18 additions & 0 deletions dags/sql/create_dim_cities_table.sql
@@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS dim_cities (
id BIGINT,
city_name VARCHAR(256),
country VARCHAR(2),
lat NUMERIC(11, 8),
lon NUMERIC(11, 8),
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
PRIMARY KEY (id)
);

COMMENT ON COLUMN dim_cities.id IS 'The ID of the city according to OpenWeatherMap';
COMMENT ON COLUMN dim_cities.city_name IS 'The name of the city according to OpenWeatherMap';
COMMENT ON COLUMN dim_cities.country IS 'Two letter country code of the city according to OpenWeatherMap';
COMMENT ON COLUMN dim_cities.lat IS 'The latitude of the location where the record was created';
COMMENT ON COLUMN dim_cities.lon IS 'The longitude of the location where the record was created';
COMMENT ON COLUMN dim_cities.created_at IS 'When the city record was first taken from the OpenWeatherMap API';
COMMENT ON COLUMN dim_cities.updated_at IS 'When this row was most recently updated';
14 changes: 14 additions & 0 deletions dags/sql/create_dim_weather_types_table.sql
@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS dim_weather_types (
id INT,
weather_type VARCHAR(128),
weather_desc VARCHAR(1024),
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
PRIMARY KEY (id)
);

COMMENT ON COLUMN dim_weather_types.id IS 'The recorded weather ID';
COMMENT ON COLUMN dim_weather_types.weather_type IS 'The recorded weather';
COMMENT ON COLUMN dim_weather_types.weather_desc IS 'A more precise description of the recorded weather';
COMMENT ON COLUMN dim_weather_types.created_at IS 'When the weather type record was first taken from the OpenWeatherMap API';
COMMENT ON COLUMN dim_weather_types.updated_at IS 'When this row was most recently updated';
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS raw_current_weather (
unix_time_seconds BIGINT,
tz_offset_seconds INTEGER,
raw_data JSONB,
units VARCHAR(16),
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
PRIMARY KEY (city_id, unix_time_seconds)
@@ -12,5 +13,6 @@ COMMENT ON COLUMN raw_current_weather.city_id IS 'The ID of the city according t
COMMENT ON COLUMN raw_current_weather.unix_time_seconds IS 'The time the record was created, expressed in integer seconds elapsed since 1970-01-01';
COMMENT ON COLUMN raw_current_weather.tz_offset_seconds IS 'How many seconds off of UTC the timezone the record was created in was';
COMMENT ON COLUMN raw_current_weather.raw_data IS 'The payload returned from the OpenWeatherMap API';
COMMENT ON COLUMN raw_current_weather.units IS 'One of "metric", "imperial", or "standard"';
COMMENT ON COLUMN raw_current_weather.created_at IS 'When the payload was first taken from the OpenWeatherMap API';
COMMENT ON COLUMN raw_current_weather.updated_at IS 'When this row was most recently updated';
27 changes: 27 additions & 0 deletions dags/sql/derive_current_weather_info_ranked_view.sql
@@ -0,0 +1,27 @@
CREATE OR REPLACE VIEW current_weather_ranked_by_recency_view AS
/* Rank records per city based on time of extraction */
SELECT
city_id
,utc_recorded_at
,tz_offset_hours
,weather_id
,measure_units
,visibility_pct
,cloud_pct
,temp_deg
,humidity_pct
,pressure
,temp_min
,temp_max
,feels_like
,wind_deg
,wind_gust
,wind_speed
,created_at
,updated_at
/* Use ROW_NUMBER() since we want absolute latest record */
,ROW_NUMBER() OVER (
PARTITION BY city_id
ORDER BY utc_recorded_at DESC
) AS rank_utc_recorded_at_desc
FROM current_weather
9 changes: 9 additions & 0 deletions dags/sql/derive_highest_temp_daily_view.sql
@@ -0,0 +1,9 @@
CREATE OR REPLACE VIEW highest_temp_per_city_day AS
/* For each day and city, find its highest recorded temperature */
SELECT city_id
,CAST(utc_recorded_at AS DATE) AS day
,MAX(temp_max) AS max_temp
FROM current_weather
GROUP BY
city_id
,CAST(utc_recorded_at AS DATE)
23 changes: 23 additions & 0 deletions dags/sql/derive_hottest_cities_daily_view.sql
@@ -0,0 +1,23 @@
CREATE OR REPLACE VIEW hottest_cities_per_day AS
/* First rank each city per day, based on highest temperature */
WITH cities_ranked AS (
SELECT city_id
,day
,max_temp
/* Use RANK() to include all cities in ties */
,RANK() OVER (
PARTITION BY day
ORDER BY max_temp DESC
) AS rank_max_temp_desc
FROM highest_temp_per_city_day
)
/* Find hottest city per day based on above rank */
SELECT
cr.day
,c.city_name
,cr.max_temp
FROM cities_ranked cr
JOIN dim_cities c
ON cr.city_id = c.id
WHERE cr.rank_max_temp_desc = 1
ORDER BY cr.day
28 changes: 28 additions & 0 deletions dags/sql/derive_hottest_days_yearly_view.sql
@@ -0,0 +1,28 @@
CREATE OR REPLACE VIEW hottest_days_per_city_year AS
/* Rank days per calendar year + city, ordered by max temperature */
WITH days_ranked AS (
SELECT city_id
,day
,DATE_TRUNC('year', day) AS year
,max_temp
/* Use RANK() to include all dates in ties */
,RANK() OVER (
PARTITION BY city_id, DATE_TRUNC('year', day)
ORDER BY max_temp DESC
) AS rank_max_temp_desc
FROM highest_temp_per_city_day
)
/* Extract top 7 hottest days based on above rank */
SELECT
dr.year
,c.city_name
,dr.rank_max_temp_desc
,dr.day
,dr.max_temp
FROM days_ranked dr
JOIN dim_cities c
ON dr.city_id = c.id
WHERE dr.rank_max_temp_desc <= 7
ORDER BY dr.year
,c.city_name
,dr.rank_max_temp_desc
25 changes: 25 additions & 0 deletions dags/sql/derive_latest_weather_info_view.sql
@@ -0,0 +1,25 @@
CREATE OR REPLACE VIEW latest_weather_information AS
/* For each city, pull the latest weather info using the recency rank
   from current_weather_ranked_by_recency_view. Since we ranked by
   utc_recorded_at, results should be unique on city_id.
*/
SELECT
city_id
,utc_recorded_at
,tz_offset_hours
,weather_id
,measure_units
,visibility_pct
,cloud_pct
,temp_deg
,humidity_pct
,pressure
,temp_min
,temp_max
,feels_like
,wind_deg
,wind_gust
,wind_speed
,created_at
,updated_at
FROM current_weather_ranked_by_recency_view
WHERE rank_utc_recorded_at_desc = 1
25 changes: 25 additions & 0 deletions dags/sql/derive_least_humid_city_per_country_view.sql
@@ -0,0 +1,25 @@
CREATE OR REPLACE VIEW least_humid_city_per_country_view AS
/* Rank cities by humidity */
WITH cities_ranked AS (
SELECT
w.city_id
,c.city_name
,w.humidity_pct
,c.country
/* Use RANK() to include all cities in case of ties */
,RANK() OVER(
PARTITION BY c.country
ORDER BY w.humidity_pct ASC
) AS rank_humidity_pct_asc
FROM current_weather w
JOIN dim_cities c
ON w.city_id = c.id
)
/* Use above rank to pull least humid city per country */
SELECT
country
,city_name
,humidity_pct
FROM cities_ranked
WHERE rank_humidity_pct_asc = 1
ORDER BY country
18 changes: 18 additions & 0 deletions dags/sql/derive_moving_avg_temperature_view.sql
@@ -0,0 +1,18 @@
CREATE OR REPLACE VIEW moving_average_temperature_view AS
/* Calculate moving average over last 5 readings */
WITH moving_avg_temp_per_city AS (
SELECT
city_id
,AVG(temp_deg) AS avg_temp_last_five_readings
FROM current_weather_ranked_by_recency_view
WHERE rank_utc_recorded_at_desc <= 5
GROUP BY city_id
)
/* Join to dim_cities to get city name */
SELECT
c.city_name
,mat.avg_temp_last_five_readings
FROM moving_avg_temp_per_city mat
JOIN dim_cities c
ON mat.city_id = c.id
ORDER BY c.city_name