feat(dataset_cli): add dry-run support #12814

Open · wants to merge 3 commits into base: master
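
This PR adds a -n/--dry-run flag to the dataset upsert and dataset sync commands. With the flag set, schema validation and entity-existence checks still run (and failures still abort with a non-zero exit), but graph.emit() and the local YAML write-back are skipped, and output is prefixed with "[dry-run]: ". A minimal usage sketch, assuming the command group is exposed as "datahub dataset" in the top-level CLI:

    # validate the YAML and check entity references without emitting anything
    datahub dataset sync -f dataset.yaml --to-datahub --dry-run

    # upsert forwards to sync with to_datahub=True, so the flag works there too
    datahub dataset upsert -n -f dataset.yaml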
36 changes: 26 additions & 10 deletions metadata-ingestion/src/datahub/cli/specific/dataset_cli.py
@@ -29,13 +29,16 @@ def dataset() -> None:
name="upsert",
)
@click.option("-f", "--file", required=True, type=click.Path(exists=True))
@click.option(
"-n", "--dry-run", type=bool, is_flag=True, default=False, help="Perform a dry run"
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def upsert(file: Path) -> None:
def upsert(file: Path, dry_run: bool) -> None:
"""Upsert attributes to a Dataset in DataHub."""
# Call the sync command with to_datahub=True to perform the upsert operation
ctx = click.get_current_context()
ctx.invoke(sync, file=str(file), to_datahub=True)
ctx.invoke(sync, file=str(file), dry_run=dry_run, to_datahub=True)


@dataset.command(
@@ -167,11 +170,16 @@ def file(lintcheck: bool, lintfix: bool, file: str) -> None:
)
@click.option("-f", "--file", required=True, type=click.Path(exists=True))
@click.option("--to-datahub/--from-datahub", required=True, is_flag=True)
@click.option(
"-n", "--dry-run", type=bool, is_flag=True, default=False, help="Perform a dry run"
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def sync(file: str, to_datahub: bool) -> None:
def sync(file: str, to_datahub: bool, dry_run: bool) -> None:
"""Sync a Dataset file to/from DataHub"""

dry_run_prefix = "[dry-run]: " if dry_run else "" # prefix to use in messages

failures: List[str] = []
with get_default_graph() as graph:
datasets = Dataset.from_yaml(file)
@@ -189,7 +197,7 @@ def sync(file: str, to_datahub: bool) -> None:
click.secho(
"\n\t- ".join(
[
f"Skipping Dataset {dataset.urn} due to missing entity references: "
f"{dry_run_prefix}Skipping Dataset {dataset.urn} due to missing entity references: "
]
+ missing_entity_references
),
@@ -199,13 +207,18 @@ def sync(file: str, to_datahub: bool) -> None:
continue
try:
for mcp in dataset.generate_mcp():
graph.emit(mcp)
click.secho(f"Update succeeded for urn {dataset.urn}.", fg="green")
if not dry_run:
graph.emit(mcp)
click.secho(
f"{dry_run_prefix}Update succeeded for urn {dataset.urn}.",
fg="green",
)
except Exception as e:
click.secho(
f"Update failed for id {id}. due to {e}",
f"{dry_run_prefix}Update failed for id {id}. due to {e}",
fg="red",
)
failures.append(dataset.urn)
else:
# Sync from DataHub
if graph.exists(dataset.urn):
@@ -215,13 +228,16 @@ def sync(file: str, to_datahub: bool) -> None:
existing_dataset: Dataset = Dataset.from_datahub(
graph=graph, urn=dataset.urn, config=dataset_get_config
)
existing_dataset.to_yaml(Path(file))
if not dry_run:
existing_dataset.to_yaml(Path(file))
else:
click.secho(f"{dry_run_prefix}Will update file {file}")
Review comment (Collaborator): Not required, but it'd be super cool if we could show a diff here of what would change.

Reply (Collaborator, Author): Will do it in another PR. I think it's simple enough with what's already there: copy this file to a temp location, let sync update that temp file, and then show the diff of current vs. synced.
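
For illustration only, not part of this PR: a rough sketch of that temp-copy-and-diff approach using the standard library. preview_sync_diff is a hypothetical helper, and existing_dataset is assumed to be the object returned by Dataset.from_datahub in the code above.

import difflib
import shutil
import tempfile
from pathlib import Path


def preview_sync_diff(file: str, existing_dataset) -> str:
    """Return a unified diff between the local YAML and what sync would write."""
    # Copy the current file to a temp location so the original stays untouched.
    with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as tmp:
        tmp_path = Path(tmp.name)
    shutil.copyfile(file, tmp_path)

    # Let the sync write-back target the temp copy instead of the real file.
    existing_dataset.to_yaml(tmp_path)

    # Show the diff of current vs. synced, then clean up the temp copy.
    before = Path(file).read_text().splitlines(keepends=True)
    after = tmp_path.read_text().splitlines(keepends=True)
    tmp_path.unlink()
    return "".join(
        difflib.unified_diff(before, after, fromfile=file, tofile=f"{file} (after sync)")
    )

The CLI could then echo the returned diff when --dry-run is set, instead of only printing the "Will update file" message.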

else:
click.secho(f"Dataset {dataset.urn} does not exist")
click.secho(f"{dry_run_prefix}Dataset {dataset.urn} does not exist")
failures.append(dataset.urn)
if failures:
click.secho(
f"\nFailed to sync the following Datasets: {', '.join(failures)}",
f"\n{dry_run_prefix}Failed to sync the following Datasets: {', '.join(failures)}",
fg="red",
)
raise click.Abort()
138 changes: 138 additions & 0 deletions metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py
@@ -27,6 +27,32 @@ def test_yaml_file():
temp_file.unlink()


@pytest.fixture
def invalid_value_yaml_file():
"""Creates a temporary yaml file - correctly formatted but bad datatype for testing."""
invalid_content = """
## This file is intentionally malformed
- id: user.badformat
platform: hive
schema:
fields:
- id: ip
type: bad_type
description: The IP address
"""

# Create a temporary file
temp_file = TEST_RESOURCES_DIR / "invalid_dataset.yaml.tmp"
with open(temp_file, "w") as f:
f.write(invalid_content)

yield temp_file

# Clean up
if temp_file.exists():
temp_file.unlink()


@pytest.fixture
def malformed_yaml_file():
"""Creates a temporary malformed yaml file for testing."""
@@ -217,3 +243,115 @@ def test_multiple_datasets_in_file(self, mock_dataset, test_yaml_file):
# Verify both dataset instances had to_yaml called
mock_dataset1.to_yaml.assert_called_once()
mock_dataset2.to_yaml.assert_called_once()

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_dry_run_sync(self, mock_get_default_graph, test_yaml_file):
mock_graph = MagicMock()
mock_graph.exists.return_value = True
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(
dataset, ["sync", "--dry-run", "--to-datahub", "-f", str(test_yaml_file)]
)

# Verify
assert result.exit_code == 0
assert not mock_get_default_graph.emit.called

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_dry_run_sync_fail_bad_type(
self, mock_get_default_graph, invalid_value_yaml_file
):
mock_graph = MagicMock()
mock_graph.exists.return_value = True
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(
dataset,
["sync", "--dry-run", "--to-datahub", "-f", str(invalid_value_yaml_file)],
)

# Verify
assert result.exit_code != 0
assert not mock_get_default_graph.emit.called
assert "Type bad_type is not a valid primitive type" in result.output

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_dry_run_sync_fail_missing_ref(
self, mock_get_default_graph, test_yaml_file
):
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(
dataset, ["sync", "--dry-run", "--to-datahub", "-f", str(test_yaml_file)]
)

# Verify
assert result.exit_code != 0
assert not mock_get_default_graph.emit.called
assert "missing entity reference" in result.output

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_run_sync(self, mock_get_default_graph, test_yaml_file):
mock_graph = MagicMock()
mock_graph.exists.return_value = True
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(
dataset, ["sync", "--to-datahub", "-f", str(test_yaml_file)]
)

# Verify
assert result.exit_code == 0
assert mock_graph.emit.called

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_run_sync_fail(self, mock_get_default_graph, invalid_value_yaml_file):
mock_graph = MagicMock()
mock_graph.exists.return_value = True
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(
dataset, ["sync", "--to-datahub", "-f", str(invalid_value_yaml_file)]
)

# Verify
assert result.exit_code != 0
assert not mock_get_default_graph.emit.called
assert "is not a valid primitive type" in result.output

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_run_upsert_fail(self, mock_get_default_graph, invalid_value_yaml_file):
mock_graph = MagicMock()
mock_graph.exists.return_value = True
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(dataset, ["upsert", "-f", str(invalid_value_yaml_file)])

# Verify
assert result.exit_code != 0
assert not mock_get_default_graph.emit.called
assert "is not a valid primitive type" in result.output

@patch("datahub.cli.specific.dataset_cli.get_default_graph")
def test_sync_from_datahub_fail(self, mock_get_default_graph, test_yaml_file):
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_get_default_graph.return_value.__enter__.return_value = mock_graph

runner = CliRunner()
result = runner.invoke(
dataset, ["sync", "--dry-run", "--from-datahub", "-f", str(test_yaml_file)]
)

# Verify
assert result.exit_code != 0
assert "does not exist" in result.output