Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed migrate-credential cli command on AWS #1732

Merged
merged 18 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 62 additions & 51 deletions src/databricks/labs/ucx/assessment/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import subprocess
import typing
from collections.abc import Callable
from dataclasses import dataclass
from functools import lru_cache
from dataclasses import dataclass, field
from datetime import timedelta

from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.catalog import Privilege

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,9 +70,20 @@ def role_name(self) -> str | None:
return role_match.group(1)


@lru_cache(maxsize=1024)
@dataclass()
class AWSCredentialCandidate:
    """Candidate IAM role to be migrated to a UC storage credential.

    Aggregates a role ARN together with the privilege granted and the set of
    external-location paths the role gives access to.
    """

    role_arn: str  # full IAM role ARN, e.g. arn:aws:iam::<account>:role/<name>
    privilege: str  # presumably a Privilege enum value (READ_FILES / WRITE_FILES) — confirm with callers
    paths: set[str] = field(default_factory=set)  # external-location paths served by this role

    @property
    def role_name(self) -> str:
        # Extract the bare role name from the ARN using the shared regex.
        # NOTE(review): if the ARN does not match ROLE_NAME_REGEX, re.match
        # returns None and group(1) raises AttributeError — no-match is not
        # handled here; confirm all ARNs are pre-validated upstream.
        role_match = re.match(AWSResources.ROLE_NAME_REGEX, self.role_arn)
        return role_match.group(1)


def run_command(command):
    """Run *command* in a subprocess and return (returncode, stdout, stderr).

    The command string is split on whitespace (no shell is involved), so
    arguments containing embedded spaces are not supported.
    """
    # Single lazy %-style debug log: the merged diff left both the old
    # logger.info and the new logger.debug line in place, and f-strings in
    # logging calls are built eagerly even when DEBUG is disabled.
    logger.debug("Invoking Command %s", command)
    with subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) as process:
        output, error = process.communicate()
        return process.returncode, output.decode("utf-8"), error.decode("utf-8")
Expand All @@ -80,6 +93,7 @@ class AWSResources:
S3_ACTIONS: typing.ClassVar[set[str]] = {"s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectAcl"}
S3_READONLY: typing.ClassVar[str] = "s3:GetObject"
S3_REGEX: typing.ClassVar[str] = r"arn:aws:s3:::([a-zA-Z0-9\/+=,.@_-]*)\/\*$"
S3_BUCKET: typing.ClassVar[str] = r"((s3:\/\/|s3a:\/\/)([a-zA-Z0-9+=,.@_-]*))\/.*$"
S3_PREFIX: typing.ClassVar[str] = "arn:aws:s3:::"
S3_PATH_REGEX: typing.ClassVar[str] = r"((s3:\/\/)|(s3a:\/\/))(.*)"
UC_MASTER_ROLES_ARN: typing.ClassVar[list[str]] = [
Expand Down Expand Up @@ -216,23 +230,30 @@ def _s3_actions(self, actions):
s3_actions = [actions]
return s3_actions

def _aws_role_trust_doc(self, external_id="0000"):
def _aws_role_trust_doc(self, self_assume_arn: str | None = None, external_id="0000"):
return self._get_json_for_cli(
{
"Version": "2012-10-17",
"Statement": [self._databricks_trust_statement(external_id)],
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": (
[
"arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL",
self_assume_arn,
]
if self_assume_arn
else "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL"
)
},
"Action": "sts:AssumeRole",
"Condition": {"StringEquals": {"sts:ExternalId": external_id}},
}
],
}
)

@staticmethod
def _databricks_trust_statement(external_id="0000"):
return {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL"},
"Action": "sts:AssumeRole",
"Condition": {"StringEquals": {"sts:ExternalId": external_id}},
}

def _aws_s3_policy(self, s3_prefixes, account_id, role_name, kms_key=None):
"""
Create the UC IAM policy for the given S3 prefixes, account ID, role name, and KMS key.
Expand All @@ -246,7 +267,14 @@ def _aws_s3_policy(self, s3_prefixes, account_id, role_name, kms_key=None):
s3_prefixes_enriched = sorted([self.S3_PREFIX + s3_prefix for s3_prefix in s3_prefixes_strip])
statement = [
{
"Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket", "s3:GetBucketLocation"],
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:PutObjectAcl",
"s3:DeleteObject",
"s3:ListBucket",
"s3:GetBucketLocation",
],
"Resource": s3_prefixes_enriched,
"Effect": "Allow",
},
Expand Down Expand Up @@ -282,6 +310,15 @@ def _create_role(self, role_name: str, assume_role_json: str) -> str | None:
return None
return add_role["Role"]["Arn"]

    def _update_role(self, role_name: str, assume_role_json: str) -> str | None:
        """
        Update the assume-role (trust) policy document of an existing AWS role.

        Runs `aws iam update-assume-role-policy` for *role_name* with the given
        policy document JSON. Returns the parsed output of the CLI call, or
        None when the command produced no usable output.
        NOTE(review): the declared return type `str | None` looks inaccurate —
        _run_json_command appears to return parsed JSON; confirm and adjust.
        """
        update_role = self._run_json_command(
            f"iam update-assume-role-policy --role-name {role_name} --policy-document {assume_role_json}"
        )
        return update_role

def create_uc_role(self, role_name: str) -> str | None:
"""
Create an IAM role for Unity Catalog to access the S3 buckets.
Expand All @@ -290,44 +327,18 @@ def create_uc_role(self, role_name: str) -> str | None:
"""
return self._create_role(role_name, self._aws_role_trust_doc())

def update_uc_trust_role(self, role_name: str, external_id: str = "0000") -> str | None:
@retried(on=[NotFound], timeout=timedelta(seconds=30))
def update_uc_role(self, role_name: str, role_arn: str, external_id: str = "0000") -> str | None:
"""
Modify an existing IAM role for Unity Catalog to access the S3 buckets with the external ID
captured from the UC credential.
Create an IAM role for Unity Catalog to access the S3 buckets.
the AssumeRole condition will be modified later with the external ID captured from the UC credential.
https://docs.databricks.com/en/connect/unity-catalog/storage-credentials.html
"""
role_document = self._run_json_command(f"iam get-role --role-name {role_name}")
if role_document is None:
logger.error(f"Role {role_name} doesn't exist")
return None
role = role_document.get("Role")
policy_document = role.get("AssumeRolePolicyDocument")
if policy_document and policy_document.get("Statement"):
for idx, statement in enumerate(policy_document["Statement"]):
effect = statement.get("Effect")
action = statement.get("Action")
principal = statement.get("Principal")
if not (effect and action and principal):
continue
if effect != "Allow":
continue
if action != "sts:AssumeRole":
continue
principal = principal.get("AWS")
if not principal:
continue
if not self._is_uc_principal(principal):
continue
policy_document["Statement"][idx] = self._databricks_trust_statement(external_id)
policy_document_json = self._get_json_for_cli(policy_document)
else:
policy_document_json = self._aws_role_trust_doc(external_id)
update_role = self._run_json_command(
f"iam update-assume-role-policy --role-name {role_name} --policy-document {policy_document_json}"
)
if not update_role:
return None
return update_role["Role"]["Arn"]
result = self._update_role(role_name, self._aws_role_trust_doc(role_arn, external_id))
logger.debug(f"Updated role {role_name} with {result}")
if result is None:
raise NotFound("Assume role policy not updated.")
return result

def put_role_policy(
self,
Expand Down
56 changes: 47 additions & 9 deletions src/databricks/labs/ucx/aws/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound, ResourceDoesNotExist
from databricks.sdk.service.catalog import Privilege
from databricks.sdk.service.compute import Policy

from databricks.labs.ucx.assessment.aws import (
Expand All @@ -18,6 +19,7 @@
AWSRoleAction,
logger,
AWSUCRoleCandidate,
AWSCredentialCandidate,
)
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.hive_metastore import ExternalLocations
Expand Down Expand Up @@ -52,34 +54,44 @@ def list_uc_roles(self, *, single_role=True, role_name="UC_ROLE", policy_name="U
"""
roles: list[AWSUCRoleCandidate] = []
missing_paths = self._identify_missing_paths()
s3_prefixes = set()
s3_buckets = set()
for missing_path in missing_paths:
match = re.match(AWSResources.S3_PATH_REGEX, missing_path)
match = re.match(AWSResources.S3_BUCKET, missing_path)
if match:
s3_prefixes.add(missing_path)
s3_buckets.add(match.group(1))
if single_role:
roles.append(AWSUCRoleCandidate(role_name, policy_name, list(s3_prefixes)))
roles.append(AWSUCRoleCandidate(role_name, policy_name, list(s3_buckets)))
else:
for idx, s3_prefix in enumerate(sorted(list(s3_prefixes))):
for idx, s3_prefix in enumerate(sorted(list(s3_buckets))):
roles.append(AWSUCRoleCandidate(f"{role_name}_{idx+1}", policy_name, [s3_prefix]))
return roles

def create_uc_roles(self, roles: list[AWSUCRoleCandidate]):
roles_created = []
for role in roles:
if self._aws_resources.create_uc_role(role.role_name):
expanded_paths = set()
for path in role.resource_paths:
expanded_paths.add(path)
expanded_paths.add(f"{path}/*")
role_arn = self._aws_resources.create_uc_role(role.role_name)
if role_arn:
self._aws_resources.put_role_policy(
role.role_name,
role.policy_name,
set(role.resource_paths),
expanded_paths,
self._aws_account_id,
self._kms_key,
)
roles_created.append(role)
# We need to create a buffer between the role creation and the role update, Otherwise the update fails.
for created_role in roles_created:
self._aws_resources.update_uc_role(
created_role.role_name, f"arn:aws:iam::{self._aws_account_id}:role/{created_role.role_name}"
)
return roles_created

def update_uc_role_trust_policy(self, role_name, external_id="0000"):
return self._aws_resources.update_uc_trust_role(role_name, external_id)
    def update_uc_role(self, role_name, role_arn, external_id="0000"):
        """Refresh the trust policy of *role_name* via AWSResources.update_uc_role.

        Passes through the role's own ARN (added as a self-assume principal in
        the trust document) and the external ID captured from the UC storage
        credential; defaults to the placeholder "0000" before the credential
        exists.
        """
        return self._aws_resources.update_uc_role(role_name, role_arn, external_id)

def save_uc_compatible_roles(self):
uc_role_access = list(self._get_role_access())
Expand Down Expand Up @@ -137,6 +149,9 @@ def _get_role_access(self):
# Aggregating the outputs from all the tasks
return sum(Threads.strict("Scanning Roles", tasks), [])

def _get_role_arn(self, role_name: str):
return f"arn:aws:iam::{self._aws_account_id}:role/" + role_name

def _get_role_access_task(self, arn: str, role_name: str):
policy_actions = []
policies = list(self._aws_resources.list_role_policies(role_name))
Expand Down Expand Up @@ -181,6 +196,29 @@ def _identify_missing_paths(self):
missing_paths.add(external_location.location)
return missing_paths

    def get_roles_to_migrate(self) -> list[AWSCredentialCandidate]:
        """
        Identify the roles that need to be migrated to UC from the UC compatible roles list.

        For each external location, every compatible role whose resource path
        covers that location is collected into a candidate keyed by role ARN,
        accumulating the set of matching location paths per role.
        """
        external_locations = self._locations.snapshot()
        compatible_roles = self.load_uc_compatible_roles()
        roles: dict[str, AWSCredentialCandidate] = {}  # role ARN -> aggregated candidate
        for external_location in external_locations:
            path = PurePath(external_location.location)
            for role in compatible_roles:
                # Keep the role only if it matches the location exactly or as a prefix.
                if not (path.match(role.resource_path) or path.match(role.resource_path + "/*")):
                    continue
                if role.role_arn not in roles:
                    # First sighting of this ARN: start a candidate with this path.
                    roles[role.role_arn] = AWSCredentialCandidate(
                        role_arn=role.role_arn, privilege=role.privilege, paths=set([external_location.location])
                    )
                    continue
                roles[role.role_arn].paths.add(external_location.location)
                # WRITE_FILES wins over READ_FILES when the same role grants
                # different privileges across several resource paths.
                if role.privilege == Privilege.WRITE_FILES.value:
                    roles[role.role_arn].privilege = Privilege.WRITE_FILES.value

        return list(roles.values())

def _get_cluster_policy(self, policy_id: str | None) -> Policy:
if not policy_id:
msg = "Cluster policy not found in UCX config"
Expand Down
40 changes: 23 additions & 17 deletions src/databricks/labs/ucx/aws/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
ValidationResultResult,
)

from databricks.labs.ucx.assessment.aws import AWSRoleAction, AWSUCRoleCandidate
from databricks.labs.ucx.assessment.aws import AWSRoleAction, AWSUCRoleCandidate, AWSCredentialCandidate
from databricks.labs.ucx.aws.access import AWSResourcePermissions

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,12 +52,12 @@ def list(self, include_names: set[str] | None = None) -> set[str]:
logger.info(f"Found {len(iam_roles)} distinct IAM roles already used in UC storage credentials")
return iam_roles

def create(self, role_action: AWSRoleAction) -> StorageCredentialInfo:
def create(self, name: str, role_arn: str, read_only: bool) -> StorageCredentialInfo:
return self._ws.storage_credentials.create(
role_action.role_name,
aws_iam_role=AwsIamRoleRequest(role_action.role_arn),
comment=f"Created by UCX during migration to UC using AWS IAM Role: {role_action.role_name}",
read_only=role_action.privilege == Privilege.READ_FILES.value,
name,
aws_iam_role=AwsIamRoleRequest(role_arn),
comment=f"Created by UCX during migration to UC using AWS IAM Role: {name}",
read_only=read_only,
)

def validate(self, role_action: AWSRoleAction) -> CredentialValidationResult:
Expand Down Expand Up @@ -122,17 +122,18 @@ def __init__(
self._storage_credential_manager = storage_credential_manager

@staticmethod
def _print_action_plan(iam_list: list[AWSRoleAction]):
def _print_action_plan(iam_list: list[AWSCredentialCandidate]):
# print action plan to console for customer to review.
for iam in iam_list:
logger.info(f"{iam.role_arn}: {iam.privilege} on {iam.resource_path}")
logger.info(f"Credential {iam.role_name} --> {iam.role_arn}: {iam.privilege}")

def _generate_migration_list(self, include_names: set[str] | None = None) -> list[AWSRoleAction]:
def _generate_migration_list(self, include_names: set[str] | None = None) -> list[AWSCredentialCandidate]:
"""
Create the list of IAM roles that need to be migrated, output an action plan as a csv file for users to confirm
Create the list of IAM roles that need to be migrated, output an action plan as a csv file for users to confirm.
It returns a list of ARNs
"""
# load IAM role list
iam_list = self._resource_permissions.load_uc_compatible_roles()
iam_list = self._resource_permissions.get_roles_to_migrate()
# list existing storage credentials
sc_set = self._storage_credential_manager.list(include_names)
# check if the iam is already used in UC storage credential
Expand Down Expand Up @@ -161,16 +162,20 @@ def run(self, prompts: Prompts, include_names: set[str] | None = None) -> list[C

execution_result = []
for iam in iam_list:
storage_credential = self._storage_credential_manager.create(iam)
storage_credential = self._storage_credential_manager.create(
name=iam.role_name,
role_arn=iam.role_arn,
read_only=iam.privilege == Privilege.READ_FILES.value,
)
if storage_credential.aws_iam_role is None:
logger.error(f"Failed to create storage credential for IAM role: {iam.role_arn}")
continue

self._resource_permissions.update_uc_role_trust_policy(
iam.role_name, storage_credential.aws_iam_role.external_id
self._resource_permissions.update_uc_role(
iam.role_name, iam.role_arn, storage_credential.aws_iam_role.external_id
)

execution_result.append(self._storage_credential_manager.validate(iam))
for path in iam.paths:
role_action = AWSRoleAction(iam.role_arn, "s3", path, iam.privilege)
execution_result.append(self._storage_credential_manager.validate(role_action))

if execution_result:
results_file = self.save(execution_result)
Expand Down Expand Up @@ -224,3 +229,4 @@ def run(self, prompts: Prompts, *, single_role=False, role_name="UC_ROLE", polic
return

self._resource_permissions.create_uc_roles(iam_list)
self._resource_permissions.save_uc_compatible_roles()
8 changes: 4 additions & 4 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ def migrate_credentials(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceCont
Principals to location mapping are listed in {install_folder}/.ucx/azure_storage_account_info.csv which is generated
by principal_prefix_access command. Please review the file and delete the Service Principals you do not want to be
migrated. The command will only migrate the Service Principals that have client secret stored in Databricks Secret.
For AWS, this command migrates AWS Instance Profiles that are being used in Databricks, to UC storage credentials.
The AWS Instance Profiles to location mapping are listed in
{install_folder}/.ucx/aws_instance_profile_info.csv which is generated by principal_prefix_access command.
Please review the file and delete the Instance Profiles you do not want to be migrated.
For AWS, this command migrates AWS UC compatible roles that are required by Databricks, to UC storage credentials.
The AWS Roles to location mapping are listed in
{install_folder}/.ucx/uc_roles_access.csv which is generated by principal_prefix_access command.
Please review the file and delete the Roles you do not want to be migrated.
Pass aws_profile for aws.
"""
if not ctx:
Expand Down
Loading
Loading