
feat(ingest/powerbi): PowerBI source updates #12857

Open

mminichino wants to merge 10 commits into master
Conversation

@mminichino commented Mar 12, 2025

  • Fixes dataset column lineage
  • Adds MySQL support
  • Adds incremental lineage
  • Adds ability to filter workspaces by name
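
As a rough illustration, a recipe exercising the new options might look like the sketch below. This is not taken from the PR itself: workspace_name_pattern comes from the diff discussed later in this thread, while the incremental_lineage flag name and all credential values are assumptions for illustration only.

# Hypothetical PowerBI ingestion recipe exercising the new options.
# workspace_name_pattern is the field added in this PR; incremental_lineage is an
# assumed flag name and the credentials are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "powerbi",
            "config": {
                "tenant_id": "<tenant-id>",
                "client_id": "<client-id>",
                "client_secret": "<client-secret>",
                # Filter workspaces by display name (added in this PR).
                "workspace_name_pattern": {"allow": ["^Finance.*"]},
                # Assumed name for the incremental lineage toggle added here.
                "incremental_lineage": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()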

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) Mar 12, 2025

codecov bot commented Mar 12, 2025

❌ 2 Tests Failed:

Tests completed: 3774 | Failed: 2 | Passed: 3772 | Skipped: 68
View the full list of 2 ❄️ flaky tests
tests.entity_versioning.test_versioning::test_link_unlink_three_versions_unlink_middle_and_latest

Flake rate in main: 27.94% (Passed 49 times, Failed 19 times)

Stack Traces | 9.04s run time
graph_client = DataHubGraph: configured to talk to http://localhost:8080 with token: eyJh**********y_oA

    @pytest.fixture(scope="function", autouse=True)
    def ingest_cleanup_data(graph_client: DataHubGraph):
        try:
            for urn in ENTITY_URN_OBJS:
                graph_client.emit_mcp(
                    MetadataChangeProposalWrapper(
                        entityUrn=urn.urn(),
                        aspect=DatasetKeyClass(
                            platform=urn.platform, name=urn.name, origin=urn.env
                        ),
                    )
                )
            for i in [2, 1, 0, 0, 1, 2]:
>               graph_client.unlink_asset_from_version_set(ENTITY_URNS[i])

tests/entity_versioning/test_versioning.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
...../ingestion/graph/entity_versioning.py:164: in unlink_asset_from_version_set
    response = self.execute_graphql(self.UNLINK_VERSION_MUTATION, variables)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = DataHubGraph: configured to talk to http://localhost:8080 with token: eyJh**********y_oA
query = '\n        mutation($input: UnlinkVersionInput!) {\n            unlinkAssetVersion(input: $input) {\n                urn\n            }\n        }\n    '
variables = {'input': {'unlinkedEntity': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,versioning_0,PROD)', 'versionSet': 'urn:li:versionSet:(12345678910,dataset)'}}
operation_name = None, format_exception = True

    def execute_graphql(
        self,
        query: str,
        variables: Optional[Dict] = None,
        operation_name: Optional[str] = None,
        format_exception: bool = True,
    ) -> Dict:
        url = f"{self.config.server}/api/graphql"
    
        body: Dict = {
            "query": query,
        }
        if variables:
            body["variables"] = variables
        if operation_name:
            body["operationName"] = operation_name
    
        logger.debug(
            f"Executing {operation_name or ''} graphql query: {query} with variables: {json.dumps(variables)}"
        )
        result = self._post_generic(url, body)
        if result.get("errors"):
            if format_exception:
>               raise GraphError(f"Error executing graphql query: {result['errors']}")
E               datahub.configuration.common.GraphError: Error executing graphql query: [{'message': 'An unknown error occurred.', 'locations': [{'line': 3, 'column': 13}], 'path': ['unlinkAssetVersion'], 'extensions': {'code': 500, 'type': 'SERVER_ERROR', 'classification': 'DataFetchingException'}}]

...../ingestion/graph/client.py:1188: GraphError
tests.entity_versioning.test_versioning::test_link_unlink_three_versions_unlink_and_relink

Flake rate in main: 27.94% (Passed 49 times, Failed 19 times)

Stack Traces | 29.8s run time
graph_client = DataHubGraph: configured to talk to http://localhost:8080 with token: eyJh**********y_oA

    @pytest.fixture(scope="function", autouse=True)
    def ingest_cleanup_data(graph_client: DataHubGraph):
        try:
            for urn in ENTITY_URN_OBJS:
                graph_client.emit_mcp(
                    MetadataChangeProposalWrapper(
                        entityUrn=urn.urn(),
                        aspect=DatasetKeyClass(
                            platform=urn.platform, name=urn.name, origin=urn.env
                        ),
                    )
                )
            for i in [2, 1, 0, 0, 1, 2]:
                graph_client.unlink_asset_from_version_set(ENTITY_URNS[i])
            yield
        finally:
            for i in [2, 1, 0, 0, 1, 2]:
>               graph_client.unlink_asset_from_version_set(ENTITY_URNS[i])

tests/entity_versioning/test_versioning.py:35: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
...../ingestion/graph/entity_versioning.py:164: in unlink_asset_from_version_set
    response = self.execute_graphql(self.UNLINK_VERSION_MUTATION, variables)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = DataHubGraph: configured to talk to http://localhost:8080 with token: eyJh**********y_oA
query = '\n        mutation($input: UnlinkVersionInput!) {\n            unlinkAssetVersion(input: $input) {\n                urn\n            }\n        }\n    '
variables = {'input': {'unlinkedEntity': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,versioning_1,PROD)', 'versionSet': 'urn:li:versionSet:(12345678910,dataset)'}}
operation_name = None, format_exception = True

    def execute_graphql(
        self,
        query: str,
        variables: Optional[Dict] = None,
        operation_name: Optional[str] = None,
        format_exception: bool = True,
    ) -> Dict:
        url = f"{self.config.server}/api/graphql"
    
        body: Dict = {
            "query": query,
        }
        if variables:
            body["variables"] = variables
        if operation_name:
            body["operationName"] = operation_name
    
        logger.debug(
            f"Executing {operation_name or ''} graphql query: {query} with variables: {json.dumps(variables)}"
        )
        result = self._post_generic(url, body)
        if result.get("errors"):
            if format_exception:
>               raise GraphError(f"Error executing graphql query: {result['errors']}")
E               datahub.configuration.common.GraphError: Error executing graphql query: [{'message': 'An unknown error occurred.', 'locations': [{'line': 3, 'column': 13}], 'path': ['unlinkAssetVersion'], 'extensions': {'code': 500, 'type': 'SERVER_ERROR', 'classification': 'DataFetchingException'}}]

...../ingestion/graph/client.py:1188: GraphError

To view more test analytics, go to the Test Analytics Dashboard

Comment on lines 312 to 315
workspace_name_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns to filter PowerBI workspaces in ingestion."
" Note: This field works in conjunction with 'workspace_type_filter' and both must be considered when filtering workspaces.",
Contributor

To ensure a workspace is not filtered out, both the workspace ID and name must be allowed. Is that correct?
Similarly, if either of them is denied, the workspace will be filtered out, right?

If so, we may add such a precise explanation in the description.

IMO this confusion comes from workspace_id_pattern, which should never have been a pattern. Given that a workspace ID is UUID-like, having regex patterns there is not practical. Instead, it should be workspace_ids: List[str], the list of workspace IDs to include; only if that is not given would workspace_name_pattern be considered.

We can skip the workspace_id_pattern deprecation and just refine the docs to avoid confusion.
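
For reference, a minimal sketch of how a single AllowDenyPattern check behaves, assuming the standard datahub.configuration.common.AllowDenyPattern semantics (deny patterns reject first, and allow defaults to ".*"); the example values are illustrative, and how the ID and name patterns are combined in the PowerBI source is exactly what the replies below discuss:

from datahub.configuration.common import AllowDenyPattern

# A value passes if it matches no deny regex and at least one allow regex.
name_pattern = AllowDenyPattern(allow=["^Finance.*"], deny=[".*-archived$"])

print(name_pattern.allowed("Finance Reports"))   # True: matches allow, no deny
print(name_pattern.allowed("Finance-archived"))  # False: matches a deny regex
print(name_pattern.allowed("Marketing"))         # False: matches no allow regex

# The default pattern allows everything, since allow defaults to [".*"].
print(AllowDenyPattern.allow_all().allowed("anything"))  # True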

Author

No, the filter is an OR condition. You can specify just a name filter, an ID filter, or both. So if you only have a name filter, all IDs will be allowed. I'll update the docs.

Author

Docs updated.

Contributor

In this scenario, and assuming name2 is not the name of the workspace with ID id1:

workspace_id_pattern:
  allow:
  - id1
workspace_name_pattern:
  allow:
  - name2

No workspace will be allowed, right? That's why I see this as an AND. Of course, if we have allow-all (the default) in either of the two, then everything depends on the other.

And in the same scenario with deny patterns:

workspace_id_pattern:
  deny:
  - id1
workspace_name_pattern:
  deny:
  - name2

Everything will be allowed except the workspace with ID id1 and the workspace named name2. So an OR in this case.

Author

If you allow one workspace by ID and another by name, both will be allowed. For the second scenario, if you deny one by ID and another by name, both will be denied.

@@ -373,7 +387,7 @@ class PowerBiDashboardSourceConfig(
)
# Enable/Disable extracting dataset schema
extract_dataset_schema: bool = pydantic.Field(
default=False,
default=True,
Contributor

This is sort of a breaking change. Why?

Author

This is actually not a breaking change. The API call is hard-coded to return all the possible attributes, including dataset schema info, so this should have always been set to true.

Contributor

Not sure about the API call, but whether or not the aspect is emitted depends on that config flag:

if self.__config.extract_dataset_schema:
dataset_mcps.extend(self.extract_dataset_schema(table, ds_urn))

Author

Correct, though the problem is that this should never have been an option, given the way the source works. It is confusing at best. You can set extract_column_level_lineage, but if you don't extract the schema data from the API scan result, you won't get column-level lineage. However, the dataset schema data, if present, will always be returned, since the scan request hard-codes these parameters:

            params={
                "datasetExpressions": True,
                "datasetSchema": True,
                "datasourceDetails": True,
                "getArtifactUsers": True,
                "lineage": True,
            },

@datahub-cyborg bot added the pending-submitter-response label and removed the needs-review label Mar 14, 2025
Contributor

@sgomezvillamor left a comment

Overall looks good.
I'm just missing some tests.

Have you considered mocking some tests in metadata-ingestion/tests/integration/powerbi/test_powerbi.py? Or has this been end-to-end tested locally somehow?
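
For what it's worth, a rough sketch of the mocking approach (not taken from the actual test file): register a scan-result payload that carries table/column data with requests_mock, so the source has schema information to turn into schemaMetadata and column lineage. The endpoint URL and payload shape below are illustrative assumptions.

import requests
import requests_mock

# Assumed admin scanResult endpoint and scan id, for illustration only.
SCAN_RESULT_URL = (
    "https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanResult/scan-1"
)

# Illustrative scan result: one workspace, one dataset, one table with columns.
MOCK_SCAN_RESULT = {
    "workspaces": [
        {
            "id": "ws-1",
            "name": "demo-workspace",
            "datasets": [
                {
                    "id": "ds-1",
                    "name": "library-dataset",
                    "tables": [
                        {
                            "name": "articles",
                            "columns": [{"name": "id", "dataType": "Int64"}],
                        }
                    ],
                }
            ],
        }
    ]
}


def test_mock_scan_result_has_schema() -> None:
    with requests_mock.Mocker() as m:
        m.get(SCAN_RESULT_URL, json=MOCK_SCAN_RESULT)
        payload = requests.get(SCAN_RESULT_URL).json()

    tables = payload["workspaces"][0]["datasets"][0]["tables"]
    # Column data must be present for schemaMetadata / column lineage to be emitted.
    assert tables[0]["columns"]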

@sgomezvillamor requested a review from hsheth2 on March 14, 2025 10:36
@datahub-cyborg bot added the needs-review label and removed the pending-submitter-response label Mar 14, 2025
@mminichino
Author

My updates have been tested locally.

@mminichino changed the title from "fix(ingest/powerbi): Fix column lineage" to "feat(ingest/powerbi): PowerBI source updates" Mar 14, 2025
@sgomezvillamor
Contributor

CI failure

FAILED tests/integration/powerbi/test_profiling.py::test_profiling - Failed: Metadata files differ (use `pytest --update-golden-files` to update):
Urn changed, urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD):
<schemaMetadata> added
= 1 failed, 286 passed, 21 skipped, 1676 deselected, 196 warnings in 1166.63s (0:19:26) =

> Task :metadata-ingestion:testIntegrationBatch0 FAILED

I would say this is related to the breaking change in the extract_dataset_schema config

@datahub-cyborg bot added the pending-submitter-response label and removed the needs-review label Mar 14, 2025
@mminichino
Author

mminichino commented Mar 14, 2025

The mocked API scan result does not match what the API actually returns, but the test was wired for a specific input and output. I should just need to either add the setting or update the golden file. I'll look at it.

@datahub-cyborg bot added the needs-review label and removed the pending-submitter-response label Mar 14, 2025
@mminichino
Author

mminichino commented Mar 14, 2025

Yes, that golden file does not include field data. However, the documentation says that schema metadata is enabled by default, so in my view that golden file was incorrect from the start. That exact issue has been raised by several customers.

@hsheth2
Collaborator

hsheth2 commented Mar 14, 2025

@mminichino sounds like we need to update the mock to include the fields, and then update/regenerate the golden file accordingly

@mminichino
Author

Yes, I am in the process of doing that

@mminichino
Author

Golden files updated. Integration test passes.
