DRAFT ingestion(ingestion/fivetran) - support for standard version #12832

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -527,7 +527,7 @@
"unity-catalog": databricks | sql_common,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common,
"fivetran": snowflake_common | bigquery_common | sqlglot_lib,
"fivetran": snowflake_common | bigquery_common | sqlglot_lib | {"requests"},
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},
"sac": sac,
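The only setup.py change is adding requests to the fivetran extra, since the new standard-mode client talks to the Fivetran REST API over HTTP. The sketch below shows how that dependency might be exercised; the endpoint path and helper name are illustrative assumptions rather than code from this PR, while the base URL and the basic-auth key/secret pairing come from the FivetranAPIConfig fields introduced in config.py.

```python
import requests

FIVETRAN_BASE_URL = "https://api.fivetran.com"  # default base_url in FivetranAPIConfig

def list_groups(api_key: str, api_secret: str, timeout_sec: int = 30) -> dict:
    # Fivetran's REST API authenticates with HTTP basic auth using the key/secret pair.
    # The /v1/groups endpoint is used here purely as an illustration.
    response = requests.get(
        f"{FIVETRAN_BASE_URL}/v1/groups",
        auth=(api_key, api_secret),
        timeout=timeout_sec,
    )
    response.raise_for_status()
    return response.json()
```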
85 changes: 68 additions & 17 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -3,7 +3,6 @@
import warnings
from typing import Dict, Optional

import pydantic
from pydantic import Field, root_validator
from typing_extensions import Literal

@@ -19,6 +18,7 @@
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryConnectionConfig,
)
from datahub.ingestion.source.fivetran.fivetran_constants import FivetranMode
from datahub.ingestion.source.snowflake.snowflake_connection import (
SnowflakeConnectionConfig,
)
@@ -85,15 +85,15 @@ class BigQueryDestinationConfig(BigQueryConnectionConfig):


class FivetranLogConfig(ConfigModel):
destination_platform: Literal["snowflake", "bigquery"] = pydantic.Field(
destination_platform: Literal["snowflake", "bigquery"] = Field(
default="snowflake",
description="The destination platform where fivetran connector log tables are dumped.",
)
snowflake_destination_config: Optional[SnowflakeDestinationConfig] = pydantic.Field(
snowflake_destination_config: Optional[SnowflakeDestinationConfig] = Field(
default=None,
description="If destination platform is 'snowflake', provide snowflake configuration.",
)
bigquery_destination_config: Optional[BigQueryDestinationConfig] = pydantic.Field(
bigquery_destination_config: Optional[BigQueryDestinationConfig] = Field(
default=None,
description="If destination platform is 'bigquery', provide bigquery configuration.",
)
@@ -121,6 +121,19 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
return values


class FivetranAPIConfig(ConfigModel):
"""Configuration for the Fivetran API client."""

api_key: str = Field(description="Fivetran API key")
api_secret: str = Field(description="Fivetran API secret")
base_url: str = Field(
default="https://api.fivetran.com", description="Fivetran API base URL"
)
request_timeout_sec: int = Field(
default=30, description="Request timeout in seconds"
)


@dataclasses.dataclass
class MetadataExtractionPerfReport(Report):
connectors_metadata_extraction_sec: PerfTimer = dataclasses.field(
@@ -150,33 +163,47 @@ def report_connectors_dropped(self, connector: str) -> None:


class PlatformDetail(ConfigModel):
platform: Optional[str] = pydantic.Field(
platform: Optional[str] = Field(
default=None,
description="Override the platform type detection.",
)
platform_instance: Optional[str] = pydantic.Field(
platform_instance: Optional[str] = Field(
default=None,
description="The instance of the platform that all assets produced by this recipe belong to",
)
env: str = pydantic.Field(
env: str = Field(
default=DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)
database: Optional[str] = pydantic.Field(
database: Optional[str] = Field(
default=None,
description="The database that all assets produced by this connector belong to. "
"For destinations, this defaults to the fivetran log config's database.",
)
include_schema_in_urn: bool = pydantic.Field(
include_schema_in_urn: bool = Field(
default=True,
description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
fivetran_log_config: FivetranLogConfig = pydantic.Field(
description="Fivetran log connector destination server configurations.",
fivetran_mode: FivetranMode = Field(
default=FivetranMode.AUTO,
description="Mode of operation: 'enterprise' for log tables access, 'standard' for REST API, or 'auto' to detect.",
)

# Enterprise version configuration
fivetran_log_config: Optional[FivetranLogConfig] = Field(
default=None,
description="Fivetran log connector destination server configurations for enterprise version.",
)

# Standard version configuration
api_config: Optional[FivetranAPIConfig] = Field(
default=None,
description="Fivetran REST API configuration for standard version.",
)

connector_patterns: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Filtering regex patterns for connector names.",
@@ -193,22 +220,46 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
)

# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Airbyte Stateful Ingestion Config."
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Fivetran Stateful Ingestion Config."
)

# Fivetran connector all sources to platform instance mapping
sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
sources_to_platform_instance: Dict[str, PlatformDetail] = Field(
default={},
description="A mapping from connector id to its platform/instance/env/database details.",
)
# Fivetran destination to platform instance mapping
destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
destination_to_platform_instance: Dict[str, PlatformDetail] = Field(
default={},
description="A mapping of destination id to its platform/instance/env details.",
)

@pydantic.root_validator(pre=True)
@root_validator
def validate_config_based_on_mode(cls, values: Dict) -> Dict:
"""Validate configuration based on the selected mode."""
mode = values.get("fivetran_mode", FivetranMode.AUTO)
log_config = values.get("fivetran_log_config")
api_config = values.get("api_config")

if mode == FivetranMode.ENTERPRISE:
if not log_config:
raise ValueError(
"Enterprise mode requires 'fivetran_log_config' to be specified."
)
elif mode == FivetranMode.STANDARD:
if not api_config:
raise ValueError("Standard mode requires 'api_config' to be specified.")
elif mode == FivetranMode.AUTO:
# Auto-detect based on provided configs
if not log_config and not api_config:
raise ValueError(
"Either 'fivetran_log_config' (for enterprise) or 'api_config' (for standard) must be specified."
)

return values

@root_validator(pre=True)
def compat_sources_to_database(cls, values: Dict) -> Dict:
if "sources_to_database" in values:
warnings.warn(
@@ -227,7 +278,7 @@ def compat_sources_to_database(cls, values: Dict) -> Dict:

return values

history_sync_lookback_period: int = pydantic.Field(
history_sync_lookback_period: int = Field(
7,
description="The number of days to look back when extracting connectors' sync history.",
)
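Taken together, the config changes let one recipe pick between the two backends. The payloads below are a rough sketch of what each mode expects, using only field names visible in this diff; connection details are elided and the lowercase mode literals are assumed from the fivetran_mode field description, not confirmed by fivetran_constants.

```python
# Illustrative config payloads for the two modes.
enterprise_config = {
    "fivetran_mode": "enterprise",
    "fivetran_log_config": {
        "destination_platform": "snowflake",
        "snowflake_destination_config": {},  # account/warehouse/credentials elided
    },
}

standard_config = {
    "fivetran_mode": "standard",
    "api_config": {
        "api_key": "<api key>",
        "api_secret": "<api secret>",
    },
}

# With fivetran_mode left at "auto", providing either block selects the matching
# backend; providing neither raises the ValueError from validate_config_based_on_mode.
```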
metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
Expand All @@ -26,6 +26,7 @@ class Connector:
user_id: str
lineage: List[TableLineage]
jobs: List["Job"]
additional_properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
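The new additional_properties field gives the standard-mode client a place to carry API-only metadata (fivetran.py later reads a detected destination_platform out of it) without changing the enterprise code path. A simplified sketch of the default_factory pattern follows; it is not the full Connector definition.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class ConnectorSketch:
    # Simplified stand-in for Connector: default_factory gives every instance
    # its own dict, avoiding the shared-mutable-default pitfall.
    connector_id: str
    additional_properties: Dict[str, Any] = field(default_factory=dict)

c = ConnectorSketch(connector_id="connector_1")
c.additional_properties["destination_platform"] = "bigquery"
# Mirrors the lookup in fivetran.py, which falls back to "snowflake" if absent.
print(c.additional_properties.get("destination_platform", "snowflake"))
```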
metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -26,7 +26,9 @@
PlatformDetail,
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.fivetran.fivetran_access import (
create_fivetran_access,
)
from datahub.ingestion.source.fivetran.fivetran_query import (
MAX_JOBS_PER_CONNECTOR,
MAX_TABLE_LINEAGE_PER_CONNECTOR,
@@ -60,7 +62,7 @@
class FivetranSource(StatefulIngestionSourceBase):
"""
This plugin extracts fivetran users, connectors, destinations and sync history.
This plugin is in beta and has only been tested on Snowflake connector.
Supports both enterprise and standard versions.
"""

config: FivetranSourceConfig
@@ -72,7 +74,11 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):
self.config = config
self.report = FivetranSourceReport()

self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
# Create the appropriate access implementation using the factory
self.fivetran_access = create_fivetran_access(config)

# For backward compatibility with existing tests
self.audit_log = self.fivetran_access

def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
input_dataset_urn_list: List[DatasetUrn] = []
@@ -104,11 +110,33 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
connector.destination_id, PlatformDetail()
)
if destination_details.platform is None:
destination_details.platform = (
self.config.fivetran_log_config.destination_platform
)
# If using enterprise version, get destination platform from config
if (
hasattr(self.config, "fivetran_log_config")
and self.config.fivetran_log_config is not None
):
destination_details.platform = (
self.config.fivetran_log_config.destination_platform
)
else:
# For standard version, use the detected platform if available
detected_platform = connector.additional_properties.get(
"destination_platform"
)
if detected_platform:
destination_details.platform = detected_platform
else:
# Default to snowflake if detection failed
destination_details.platform = "snowflake"

# Ensure database is not None to avoid attribute errors when calling .lower()
if destination_details.database is None:
destination_details.database = self.audit_log.fivetran_log_database
destination_details.database = (
self.fivetran_access.fivetran_log_database or ""
)

if source_details.database is None:
source_details.database = ""

if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
self.report.warning(
@@ -124,13 +152,17 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
if source_details.include_schema_in_urn
else lineage.source_table.split(".", 1)[1]
)

# Safe access to database.lower() with None check
source_table_name = (
f"{source_details.database.lower()}.{source_table}"
if source_details.database
else source_table
)

input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_details.platform,
table_name=(
f"{source_details.database.lower()}.{source_table}"
if source_details.database
else source_table
),
table_name=source_table_name,
env=source_details.env,
platform_instance=source_details.platform_instance,
)
@@ -141,9 +173,17 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
if destination_details.include_schema_in_urn
else lineage.destination_table.split(".", 1)[1]
)

# Safe access to database.lower() with None check
destination_table_name = (
f"{destination_details.database.lower()}.{destination_table}"
if destination_details.database
else destination_table
)

output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=destination_details.platform,
table_name=f"{destination_details.database.lower()}.{destination_table}",
table_name=destination_table_name,
env=destination_details.env,
platform_instance=destination_details.platform_instance,
)
@@ -211,7 +251,7 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
env=self.config.env,
platform_instance=self.config.platform_instance,
)
owner_email = self.audit_log.get_user_email(connector.user_id)
owner_email = self.fivetran_access.get_user_email(connector.user_id)
datajob = DataJob(
id=connector.connector_id,
flow_urn=dataflow_urn,
@@ -314,7 +354,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
Datahub Ingestion framework invoke this method
"""
logger.info("Fivetran plugin execution is started")
connectors = self.audit_log.get_allowed_connectors_list(
connectors = self.fivetran_access.get_allowed_connectors_list(
self.config.connector_patterns,
self.config.destination_patterns,
self.report,
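The fivetran_access module that create_fivetran_access comes from is not part of this excerpt. Based on how it is called above (the returned object must expose get_allowed_connectors_list, get_user_email, and fivetran_log_database), a plausible shape for the factory is sketched below; the standard-mode class and module names are placeholders, not confirmed by this diff.

```python
from datahub.ingestion.source.fivetran.config import FivetranSourceConfig
from datahub.ingestion.source.fivetran.fivetran_constants import FivetranMode
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI

def create_fivetran_access(config: FivetranSourceConfig):
    # Hypothetical dispatch: explicit enterprise mode, or auto mode with a log
    # config present, keeps the existing log-table reader; otherwise fall back
    # to a REST-API-backed implementation for the standard tier.
    if config.fivetran_mode == FivetranMode.ENTERPRISE or (
        config.fivetran_mode == FivetranMode.AUTO and config.fivetran_log_config
    ):
        return FivetranLogAPI(config.fivetran_log_config)

    # "FivetranStandardAPI" is a placeholder name for the API-backed implementation.
    from datahub.ingestion.source.fivetran.fivetran_standard_api import FivetranStandardAPI
    return FivetranStandardAPI(config.api_config)
```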