Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): Fixing table rename query handling #12852

Merged
merged 3 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ def _parse_audit_log_row(
res["session_id"],
res["query_start_time"],
object_modified_by_ddl,
res["query_type"],
)
if known_ddl_entry:
return known_ddl_entry
Expand Down Expand Up @@ -537,40 +538,42 @@ def parse_ddl_query(
session_id: str,
timestamp: datetime,
object_modified_by_ddl: dict,
query_type: str,
) -> Optional[Union[TableRename, TableSwap]]:
timestamp = timestamp.astimezone(timezone.utc)
if object_modified_by_ddl[
"operationType"
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
urn1 = self.identifiers.gen_dataset_urn(
if (
object_modified_by_ddl["operationType"] == "ALTER"
and query_type == "RENAME_TABLE"
and object_modified_by_ddl["properties"].get("objectName")
):
original_un = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["objectName"]
)
)

urn2 = self.identifiers.gen_dataset_urn(
new_urn = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
object_modified_by_ddl["properties"]["objectName"]["value"]
)
)

return TableSwap(urn1, urn2, query, session_id, timestamp)
return TableRename(original_un, new_urn, query, session_id, timestamp)
elif object_modified_by_ddl[
"operationType"
] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
original_un = self.identifiers.gen_dataset_urn(
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
urn1 = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["objectName"]
)
)

new_urn = self.identifiers.gen_dataset_urn(
urn2 = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["properties"]["objectName"]["value"]
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
)
)

return TableRename(original_un, new_urn, query, session_id, timestamp)
return TableSwap(urn1, urn2, query, session_id, timestamp)
else:
self.report.num_ddl_queries_dropped += 1
return None
Expand Down
120 changes: 119 additions & 1 deletion metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from typing import Any, Dict
from unittest.mock import MagicMock, patch

Expand All @@ -16,18 +17,27 @@
from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
from datahub.ingestion.source.snowflake.snowflake_config import (
DEFAULT_TEMP_TABLES_PATTERNS,
SnowflakeIdentifierConfig,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge
from datahub.ingestion.source.snowflake.snowflake_queries import (
SnowflakeQueriesExtractor,
SnowflakeQueriesExtractorConfig,
)
from datahub.ingestion.source.snowflake.snowflake_query import (
SnowflakeQuery,
create_deny_regex_sql_filter,
)
from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
SnowflakeObjectAccessEntry,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowsightUrlBuilder
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
SnowsightUrlBuilder,
)
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap
from datahub.testing.doctest import assert_doctest
from tests.test_helpers import test_connection_helpers

Expand Down Expand Up @@ -689,3 +699,111 @@ def test_snowflake_query_result_parsing():
],
}
assert UpstreamLineageEdge.parse_obj(db_row)


class TestDDLProcessing:
@pytest.fixture
def session_id(self):
return "14774700483022321"

@pytest.fixture
def timestamp(self):
return datetime.datetime(
year=2025, month=2, day=3, hour=15, minute=1, second=43
).astimezone(datetime.timezone.utc)

@pytest.fixture
def extractor(self) -> SnowflakeQueriesExtractor:
connection = MagicMock()
config = SnowflakeQueriesExtractorConfig()
structured_report = MagicMock()
filters = MagicMock()
structured_report.num_ddl_queries_dropped = 0
identifier_config = SnowflakeIdentifierConfig()
identifiers = SnowflakeIdentifierBuilder(identifier_config, structured_report)
return SnowflakeQueriesExtractor(
connection, config, structured_report, filters, identifiers
)

def test_ddl_processing_alter_table_rename(self, extractor, session_id, timestamp):
query = "ALTER TABLE person_info_loading RENAME TO person_info_final;"
object_modified_by_ddl = {
"objectDomain": "Table",
"objectId": 1789034,
"objectName": "DUMMY_DB.PUBLIC.PERSON_INFO_LOADING",
"operationType": "ALTER",
"properties": {
"objectName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_FINAL"}
},
}
query_type = "RENAME_TABLE"

ddl = extractor.parse_ddl_query(
query, session_id, timestamp, object_modified_by_ddl, query_type
)

assert ddl == TableRename(
original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_loading,PROD)",
new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_final,PROD)",
query=query,
session_id=session_id,
timestamp=timestamp,
), "Processing ALTER ... RENAME should result in a proper TableRename object"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general, we don't need comments on asserts - the test feels pretty clear already


def test_ddl_processing_alter_table_add_column(
self, extractor, session_id, timestamp
):
query = "ALTER TABLE person_info ADD year BIGINT"
object_modified_by_ddl = {
"objectDomain": "Table",
"objectId": 2612260,
"objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
"operationType": "ALTER",
"properties": {
"columns": {
"BIGINT": {
"objectId": {"value": 8763407},
"subOperationType": "ADD",
}
}
},
}
query_type = "ALTER_TABLE_ADD_COLUMN"

ddl = extractor.parse_ddl_query(
query, session_id, timestamp, object_modified_by_ddl, query_type
)

assert ddl is None, (
"For altering columns statement ddl parsing should return None"
)
assert extractor.report.num_ddl_queries_dropped == 1, (
"Dropped ddls should be properly counted"
)

def test_ddl_processing_alter_table_swap(self, extractor, session_id, timestamp):
query = "ALTER TABLE person_info SWAP WITH person_info_swap;"
object_modified_by_ddl = {
"objectDomain": "Table",
"objectId": 3776835,
"objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
"operationType": "ALTER",
"properties": {
"swapTargetDomain": {"value": "Table"},
"swapTargetId": {"value": 3786260},
"swapTargetName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_SWAP"},
},
}
query_type = "ALTER"

ddl = extractor.parse_ddl_query(
query, session_id, timestamp, object_modified_by_ddl, query_type
)

assert ddl == TableSwap(
urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info,PROD)",
urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_swap,PROD)",
query=query,
session_id=session_id,
timestamp=timestamp,
), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object"
Loading