DO NOT MERGE - DRAFT - Azure fabric connector #12828

Draft · wants to merge 1 commit into base: master
4 changes: 4 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/constants.ts
@@ -39,6 +39,7 @@ import sacLogo from '../../../../images/saclogo.svg';
import cassandraLogo from '../../../../images/cassandralogo.png';
import datahubLogo from '../../../../images/datahublogo.png';
import neo4j from '../../../../images/neo4j.png';
import msfabricLogo from '../../../../images/msfabric.png';

export const ATHENA = 'athena';
export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`;
@@ -140,6 +141,8 @@ export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary';
export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`;
export const NEO4J = 'neo4j';
export const NEO4J_URN = `urn:li:dataPlatform:${NEO4J}`;
export const MSFABRIC = 'msfabric';
export const MSFABRIC_URN = `urn:li:dataPlatform:${MSFABRIC}`;

export const PLATFORM_URN_TO_LOGO = {
[ATHENA_URN]: athenaLogo,
@@ -184,6 +187,7 @@ export const PLATFORM_URN_TO_LOGO = {
[CASSANDRA_URN]: cassandraLogo,
[DATAHUB_URN]: datahubLogo,
[NEO4J_URN]: neo4j,
[MSFABRIC_URN]: msfabricLogo,
};

export const SOURCE_TO_PLATFORM_URN = {
8 changes: 8 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -333,5 +333,13 @@
"description": "Import Nodes and Relationships from Neo4j.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/",
"recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'"
},
{
"urn": "urn:li:dataPlatform:msfabric",
"name": "msfabric",
"displayName": "Microsoft Fabric",
"description": "Import Nodes and Relationships from Microsoft Fabric.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/msfabric/",
"recipe": ""
}
]
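The recipe field for the new entry is left empty in this draft. As a point of reference, a minimal programmatic recipe for the new source might look like the sketch below, using DataHub's standard Pipeline API; the keys under source.config (tenant_id, client_id, client_secret, workspace_name) are assumptions, since AzureFabricSource's config model is not part of this diff.

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: the "msfabric" type matches the entry point added in setup.py below,
# but the config keys are illustrative only.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "msfabric",
            "config": {
                "tenant_id": "<tenant-id>",
                "client_id": "<client-id>",
                "client_secret": "<client-secret>",
                "workspace_name": "MyWorkspace",  # assumed option, not confirmed by this diff
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()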
Binary file added datahub-web-react/src/images/msfabric.png
7 changes: 7 additions & 0 deletions metadata-ingestion/setup.py
@@ -467,6 +467,11 @@
# It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
"setuptools",
},
"msfabric": {"requests"} | {*abs_base, *data_lake_profiling} | microsoft_common
| {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
| sqlglot_lib
| threading_timeout_common |
sql_common | mssql_common | {"pyodbc"} | {*data_lake_profiling, *delta_lake},
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | mssql_common,
@@ -677,6 +682,7 @@
"sac",
"cassandra",
"neo4j",
"msfabric"
]
if plugin
for dependency in plugins[plugin]
@@ -799,6 +805,7 @@
"sac = datahub.ingestion.source.sac.sac:SACSource",
"cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource",
"neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource",
"msfabric = datahub.ingestion.source.ms_fabric.source:AzureFabricSource",
],
"datahub.ingestion.transformer.plugins": [
"pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",
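With the new extra, the connector would typically be installed via pip install 'acryl-datahub[msfabric]'. Below is a small sketch for confirming the plugin registration, assuming the standard datahub.ingestion.source.plugins entry point group (only the transformer group is visible in this hunk) and Python 3.10+ for the group= keyword:

from importlib.metadata import entry_points

# Look up the source plugins registered by installed distributions.
source_plugins = entry_points(group="datahub.ingestion.source.plugins")
msfabric = next((ep for ep in source_plugins if ep.name == "msfabric"), None)

if msfabric is not None:
    # Expected value per this diff: datahub.ingestion.source.ms_fabric.source:AzureFabricSource
    print(msfabric.value)
else:
    print("msfabric plugin not installed")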
@@ -1,6 +1,12 @@
from typing import Dict, Optional, Union

from azure.identity import ClientSecretCredential
from azure.core.exceptions import ClientAuthenticationError
from azure.identity import (
AzureCliCredential,
ClientSecretCredential,
DefaultAzureCredential,
ManagedIdentityCredential,
)
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient
from pydantic import Field, root_validator
@@ -20,30 +26,41 @@ class AzureConnectionConfig(ConfigModel):
default="/",
description="Base folder in hierarchical namespaces to start from.",
)
container_name: str = Field(
container_name: Optional[str] = Field(
default=None,
description="Azure storage account container name.",
)
account_name: str = Field(
account_name: Optional[str] = Field(
default=None,
description="Name of the Azure storage account. See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)",
)
# Authentication Options
use_managed_identity: bool = Field(
default=False,
description="Whether to use Azure Managed Identity authentication.",
)
use_cli_auth: bool = Field(
default=False,
description="Whether to authenticate using the Azure CLI.",
)
account_key: Optional[str] = Field(
description="Azure storage account access key that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
description="Azure storage account access key.",
default=None,
)
sas_token: Optional[str] = Field(
description="Azure storage account Shared Access Signature (SAS) token that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
description="Azure storage account SAS token.",
default=None,
)
client_secret: Optional[str] = Field(
description="Azure client secret that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
client_id: Optional[str] = Field(
description="Azure client (Application) ID for service principal auth.",
default=None,
)
client_id: Optional[str] = Field(
description="Azure client (Application) ID required when a `client_secret` is used as a credential.",
client_secret: Optional[str] = Field(
description="Azure client secret for service principal auth.",
default=None,
)
tenant_id: Optional[str] = Field(
description="Azure tenant (Directory) ID required when a `client_secret` is used as a credential.",
description="Azure tenant ID required for service principal auth.",
default=None,
)

@@ -72,19 +89,47 @@ def get_data_lake_service_client(self) -> DataLakeServiceClient:

def get_credentials(
self,
) -> Union[Optional[str], ClientSecretCredential]:
if self.client_id and self.client_secret and self.tenant_id:
return ClientSecretCredential(
tenant_id=self.tenant_id,
client_id=self.client_id,
client_secret=self.client_secret,
)
return self.sas_token if self.sas_token is not None else self.account_key
) -> Union[
AzureCliCredential,
ClientSecretCredential,
DefaultAzureCredential,
ManagedIdentityCredential,
str,
]:
"""Get appropriate Azure credential based on configuration"""
try:
if self.use_managed_identity:
return ManagedIdentityCredential()

elif self.use_cli_auth:
return AzureCliCredential()

elif self.client_id and self.client_secret and self.tenant_id:
return ClientSecretCredential(
tenant_id=self.tenant_id,
client_id=self.client_id,
client_secret=self.client_secret,
)

elif self.account_key:
return self.account_key

elif self.sas_token:
return self.sas_token

else:
return DefaultAzureCredential()

except ClientAuthenticationError as e:
raise ConfigurationError(f"Failed to initialize Azure credential: {str(e)}")

@root_validator()
def _check_credential_values(cls, values: Dict) -> Dict:
"""Validate that at least one valid authentication method is configured"""
if (
values.get("account_key")
values.get("use_managed_identity")
or values.get("use_cli_auth")
or values.get("account_key")
or values.get("sas_token")
or (
values.get("client_id")
@@ -94,5 +139,6 @@ def _check_credential_values(cls, values: Dict) -> Dict:
):
return values
raise ConfigurationError(
"credentials missing, requires one combination of account_key or sas_token or (client_id and client_secret and tenant_id)"
"Authentication configuration missing. Please provide one of: "
"managed identity, CLI auth, account key, SAS token, or service principal credentials"
)
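To make the new credential precedence concrete, here is a minimal sketch of resolving credentials with the updated config. The import path is the one used by the delta_lake changes later in this PR; the account, container, and tenant values are placeholders.

from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig

# CLI auth: no secrets required; get_credentials() returns an AzureCliCredential.
cli_config = AzureConnectionConfig(
    account_name="mystorageaccount",  # placeholder
    container_name="raw-data",
    use_cli_auth=True,
)
print(type(cli_config.get_credentials()).__name__)  # AzureCliCredential

# Service principal auth: client_id, client_secret and tenant_id must all be set.
sp_config = AzureConnectionConfig(
    account_name="mystorageaccount",
    container_name="raw-data",
    client_id="00000000-0000-0000-0000-000000000000",
    client_secret="<client-secret>",
    tenant_id="11111111-1111-1111-1111-111111111111",
)
print(type(sp_config.get_credentials()).__name__)  # ClientSecretCredential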
@@ -0,0 +1,8 @@
def strip_onelake_prefix(path: str) -> str:
"""Strip the OneLake prefix from an ABFSS path."""
if path.startswith("abfss://"):
# Split on @ and take everything after the first slash after onelake.dfs.fabric.microsoft.com
parts = path.split("@")[1].split("/", 1)
if len(parts) > 1:
return parts[1]
return path
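A quick worked example of the helper above, using a made-up OneLake workspace and lakehouse path:

path = "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/sales"
print(strip_onelake_prefix(path))  # -> MyLakehouse.Lakehouse/Tables/sales

# Paths that are not abfss:// URIs (or have no path component after the host) are returned unchanged.
print(strip_onelake_prefix("s3://bucket/table"))  # -> s3://bucket/table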
@@ -13,6 +13,8 @@
)
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
@@ -38,6 +40,27 @@ class S3(ConfigModel):
)


class Azure(ConfigModel):
"""Azure-specific configuration for Delta Lake"""

azure_config: Optional[AzureConnectionConfig] = Field(
default=None, description="Azure configuration"
)

use_abs_container_properties: Optional[bool] = Field(
False,
description="Whether or not to create properties in datahub from the Azure blob container",
)
use_abs_blob_properties: Optional[bool] = Field(
False,
description="Whether or not to create properties in datahub from the Azure blob",
)
use_abs_blob_tags: Optional[bool] = Field(
False,
description="Whether or not to create tags in datahub from Azure blob tags",
)


class DeltaLakeSourceConfig(
PlatformInstanceConfigMixin,
EnvConfigMixin,
@@ -78,12 +101,24 @@ class DeltaLakeSourceConfig(
"When set to `False`, number_of_files in delta table can not be reported.",
)

s3: Optional[S3] = Field()
s3: Optional[S3] = Field(default=None, description="S3 specific configuration")
azure: Optional[Azure] = Field(
default=None, description="Azure specific configuration"
)

@cached_property
def is_s3(self):
return is_s3_uri(self.base_path or "")

@cached_property
def is_azure(self):
is_abfss = self.base_path.startswith("abfss://")
is_abs = is_abs_uri(self.base_path or "")
logger.debug(
f"Checking if {self.base_path} is Azure path: abfss={is_abfss}, abs={is_abs}"
)
return is_abfss or is_abs

@cached_property
def complete_path(self):
complete_path = self.base_path
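Here is a minimal sketch of pointing the Delta Lake source at an ABFSS path with the new Azure block. The DeltaLakeSourceConfig module path is assumed, and the storage account and container names are placeholders.

from datahub.ingestion.source.delta_lake.config import DeltaLakeSourceConfig  # assumed module path

config = DeltaLakeSourceConfig.parse_obj(
    {
        "base_path": "abfss://container@account.dfs.core.windows.net/delta/events",
        "azure": {
            "azure_config": {
                "account_name": "account",
                "container_name": "container",
                "use_cli_auth": True,
            },
            "use_abs_blob_tags": True,
        },
    }
)
print(config.is_azure)  # True: base_path starts with "abfss://"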
@@ -19,7 +19,12 @@
def read_delta_table(
path: str, opts: Dict[str, str], delta_lake_config: DeltaLakeSourceConfig
) -> Optional[DeltaTable]:
if not delta_lake_config.is_s3 and not pathlib.Path(path).exists():
# For local paths, check existence
if (
not delta_lake_config.is_s3
and not delta_lake_config.is_azure
and not pathlib.Path(path).exists()
):
# The DeltaTable() constructor will create the path if it doesn't exist.
# Hence we need an extra, manual check here.
return None
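The comment above explains why the manual check is needed; the sketch below simply restates the guard's behaviour for local versus object-store paths (the paths are illustrative).

import pathlib

def should_skip(path: str, is_s3: bool, is_azure: bool) -> bool:
    # Only local paths are checked on the filesystem; cloud URIs are handed
    # straight to DeltaTable(), which performs its own resolution.
    return not is_s3 and not is_azure and not pathlib.Path(path).exists()

print(should_skip("/tmp/definitely-missing-table", False, False))  # True -> read_delta_table returns None
print(should_skip("abfss://c@acct.dfs.core.windows.net/tbl", False, True))  # False -> DeltaTable() decides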