diff --git a/marquez_airflow/dag.py b/marquez_airflow/dag.py index 6516f40158c6d..9c0493d41aa1e 100644 --- a/marquez_airflow/dag.py +++ b/marquez_airflow/dag.py @@ -449,8 +449,9 @@ def _get_location(task): return get_location(task.file_path) else: return get_location(task.dag.fileloc) - except Exception as e: - log.warning(f'Unable to fetch the location. {e}', exc_info=True) + except Exception: + log.warning(f"Failed to get location for task '{task.task_id}'.", + exc_info=True) return None @staticmethod diff --git a/marquez_airflow/utils.py b/marquez_airflow/utils.py index 58fb4576f060a..4c80ad918c0ad 100644 --- a/marquez_airflow/utils.py +++ b/marquez_airflow/utils.py @@ -65,7 +65,11 @@ def make_key(job_name, run_id): return "marquez_id_mapping-{}-{}".format(job_name, run_id) -def url_to_https(url): +def url_to_https(url) -> str: + # Ensure URL exists + if not url: + return None + base_url = None if url.startswith('git@'): part = url.split('git@')[1:2] @@ -75,14 +79,18 @@ def url_to_https(url): base_url = url if not base_url: - raise ValueError(f'Unable to extract location from: {url}') + raise ValueError(f"Unable to extract location from: {url}") if base_url.endswith('.git'): base_url = base_url[:-4] return base_url -def get_location(file_path): +def get_location(file_path) -> str: + # Ensure file path exists + if not file_path: + return None + # move to the file directory abs_path = os.path.abspath(file_path) file_name = os.path.basename(file_path) @@ -99,6 +107,9 @@ def get_location(file_path): # build the URL base_url = url_to_https(repo_url) + if not base_url: + return None + return f'{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}' diff --git a/tests/test_utils.py b/tests/test_utils.py index 26f5ddbb15b0d..3d461ebb66978 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -19,6 +19,8 @@ from marquez_airflow.extractors import StepMetadata from marquez_airflow.version import VERSION as MARQUEZ_AIRFLOW_VERSION from marquez_airflow.utils import ( + url_to_https, + get_location, get_connection_uri, add_airflow_info_to ) @@ -56,3 +58,13 @@ def test_add_airflow_info_to(): assert step_metadata.context['airflow.task_info'] is not None assert step_metadata.context['marquez_airflow.version'] == \ MARQUEZ_AIRFLOW_VERSION + + +def test_get_location_no_file_path(): + assert get_location(None) is None + assert get_location("") is None + + +def test_url_to_https_no_url(): + assert url_to_https(None) is None + assert url_to_https("") is None