How to Build a Scalable Data Mesh in AWS with Lake Formation

DATE POSTED: February 17, 2025

In today's world, where data is power, traditional centralized designs often become chokepoints that impede both access to data and innovation. Data Mesh is a modern approach that decentralizes ownership and treats data as a product managed by domain teams. AWS Lake Formation (LF) simplifies secure, cross-account sharing and governance: it centralizes control over what users can do with data across accounts and keeps track of compliance with regulations and standards. Leveraging Lake Formation, AWS Lambda, SQS, IAM, and S3, an organization can implement a truly scalable Data Mesh architecture that fosters self-serve analytics, interoperability, and federated governance without compromising security.

Architecture Overview

Figure: Data Mesh using Lake Formation

The architecture follows a Data Mesh design spanning two AWS accounts: Account A (Producer) and Account B (Consumer). The objective is to share AWS Glue Data Catalog tables from Account A with Account B in a secure manner, using AWS Lake Formation (LF), S3, Lambda, and Amazon SQS.


Intake Process and Manifest File

A manifest.json file is the central configuration artifact of the system: it records who gets access to what, including the requesting role, database name, account ID, and the permissions being granted. Within our firm, the service intake procedure runs through ServiceNow. A requester raises a ServiceNow (SNOW) ticket with the relevant details captured in structured forms. Once the ticket is approved, our backend systems generate the resulting manifest.json file and place it into an S3 bucket in Account A.
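The producer Lambda described in the next section is triggered by this manifest upload. The article does not show the trigger wiring, but a minimal sketch of the S3 event notification that invokes producer.py when a manifest.json object lands might look like this (the bucket name and function ARN are placeholders, and the Lambda must already grant S3 permission to invoke it):

import boto3

s3_client = boto3.client('s3')

# Invoke the producer Lambda whenever a manifest.json object lands in the intake bucket.
# Bucket name and function ARN below are illustrative, not from the article.
s3_client.put_bucket_notification_configuration(
    Bucket='account-a-manifest-intake',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'Id': 'manifest-upload-trigger',
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:111122223333:function:lf-datamesh-producer',
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': 'manifest.json'}]}}
        }]
    }
)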

Data Sharing Process
  1. Producer Lambda in Account A
  • An AWS Lambda function (producer.py) is triggered whenever the manifest.json file is dropped into the Account A S3 bucket.
  • The producer Lambda validates the request and verifies the following:
    • Whether the request is for same-account or cross-account access.
    • Whether the S3 bucket is registered in AWS Lake Formation (LF).
  • Once validation is complete, the producer Lambda sends a message to an Amazon SQS queue in Account B.
  • This message includes details about the AWS Resource Access Manager (RAM) share, which facilitates cross-account resource sharing (a sample message body is shown after this list).
  2. Consumer Lambda in Account B
  • Upon receiving the SQS message, a consumer Lambda function (consumer.py) in Account B is triggered.
  • It processes the request and grants the necessary permissions in Lake Formation for Account B's role to access the shared Glue Data Catalog tables from Account A.
  • Once access is granted, the consumer Lambda creates a corresponding database/table entry (a resource link) in Account B so users can query the shared data.
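The article does not spell out the SQS message schema; judging by the fields consumer.py reads, a message body along the following lines would be expected (the ARNs and names are illustrative):

{
  "ram_url": "arn:aws:ram:us-east-1:111122223333:resource-share/example-share-id",
  "cross_role": "arn:aws:iam::444455556666:role/analytics-consumer",
  "db_name": "database name",
  "table_name": "table name",
  "permissions": ["SELECT"],
  "wild_card": false,
  "access_type": "grant"
}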

This provides an automated, scalable architecture for secure, governed, and efficient data sharing between AWS accounts, while maintaining compliance through AWS Lake Formation, IAM roles, and the AWS Glue Data Catalog.

Below are the sample manifest JSON, producer code, and consumer code that can be used to build and run the architecture described above.

Manifest - manifest.json

{ "Records": [{ "AccessType": "grant", "Principal": "arn of IAM user/role in account a (if granting same account)", "Table": { "DatabaseName": "database name", "Name": "table name", "Wildcard":false }, "Permissions": ["SELECT"], "Cross_account": true, "AccountID": "112233445566", "Cross_role":"arn of cross account IAM user/role (if granting cross account)" }] }

Producer Code - producer.py

import boto3
import json
import logging
import os
from botocore.exceptions import ClientError

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment Variables
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
LF_REGISTER_ROLE = os.environ['LF_ROLE']
QUEUE_URL = os.environ['CROSS_QUEUE_URL']
S3_POLICY_ARN = os.environ['S3_POLICY_ARN']
S3_POLICY_NAME = os.environ['S3_POLICY_NAME']

def get_ram_invite(ram_client):
    """Retrieve Resource Access Manager (RAM) invite for cross-account sharing."""
    try:
        response = ram_client.get_resource_share_associations(
            associationType='PRINCIPAL',
            associationStatus='ASSOCIATING'
        )
        return response['resourceShareAssociations'][0]['resourceShareArn']
    except Exception as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise

def delete_oldest_policy_version(iam_client, policy_arn):
    """Deletes the oldest version of an IAM policy (max 5 versions allowed)."""
    try:
        versions = iam_client.list_policy_versions(PolicyArn=policy_arn)['Versions']
        non_default_versions = [v['VersionId'] for v in versions if not v['IsDefaultVersion']]
        if non_default_versions:
            oldest_version = min(non_default_versions)
            iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=oldest_version)
    except Exception as e:
        logger.error(f"Error deleting old policy version: {e}")
        raise

def update_lf_s3_policy(iam_client, iam_resource, bucket_name):
    """Modifies the Lake Formation IAM policy to include S3 paths."""
    try:
        account_id = boto3.client('sts').get_caller_identity()['Account']
        policy_arn = f'arn:aws:iam::{account_id}:policy/{S3_POLICY_NAME}'
        policy = iam_resource.Policy(policy_arn)
        policy_json = policy.default_version.document
        s3_arn = f'arn:aws:s3:::{bucket_name}'
        updated = False
        for statement in policy_json['Statement']:
            if s3_arn not in statement['Resource']:
                statement['Resource'].append(f'{s3_arn}/*')
                updated = True
        if updated:
            delete_oldest_policy_version(iam_client, S3_POLICY_ARN)
            iam_client.create_policy_version(
                PolicyArn=policy_arn,
                PolicyDocument=json.dumps(policy_json),
                SetAsDefault=True
            )
    except Exception as e:
        logger.error(f"Error updating LF S3 policy: {e}")
        raise

def register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource, is_table=True):
    """Registers an S3 location with Lake Formation."""
    try:
        s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
            glue_client.get_database(Name=database)['Database']['LocationUri']
        bucket_name = s3_location.split('/')[2]
        registered_buckets = [res['ResourceArn'].split(':::')[1] for res in lf_client.list_resources()['ResourceInfoList']]
        if bucket_name not in registered_buckets:
            lf_client.register_resource(
                ResourceArn=f'arn:aws:s3:::{bucket_name}',
                UseServiceLinkedRole=False,
                RoleArn=LF_REGISTER_ROLE
            )
            update_lf_s3_policy(iam_client, iam_resource, bucket_name)
    except ClientError as e:
        logger.error(f"Error registering S3 location: {e}")
        raise

def grant_data_location_permissions(lf_client, glue_client, principal, database, table, is_table=True):
    """Grants Data Location Permissions to a principal."""
    try:
        s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
            glue_client.get_database(Name=database)['Database']['LocationUri']
        bucket_name = s3_location.split('/')[2]
        lf_client.grant_permissions(
            Principal={'DataLakePrincipalIdentifier': principal},
            Resource={'DataLocation': {'ResourceArn': f'arn:aws:s3:::{bucket_name}'}},
            Permissions=['DATA_LOCATION_ACCESS'],
            PermissionsWithGrantOption=['DATA_LOCATION_ACCESS']
        )
    except ClientError as e:
        logger.error(f"Error granting Data Location Permissions: {e}")

def create_resource(database, table=None, wildcard=False):
    """Creates a resource dictionary for granting permissions."""
    if database and table:
        return {'Table': {'DatabaseName': database, 'Name': table}}
    elif database and wildcard:
        return {'Table': {'DatabaseName': database, 'TableWildcard': {}}}
    elif database:
        return {'Database': {'Name': database}}
    return None

def revoke_permission(lf_client, principal, permissions, database, table, wildcard):
    """Revokes permissions from a principal."""
    try:
        resource = create_resource(database, table, wildcard)
        lf_client.revoke_permissions(
            Principal={'DataLakePrincipalIdentifier': principal},
            Resource=resource,
            Permissions=permissions
        )
    except Exception as e:
        logger.error(f"Error revoking permissions for {principal}: {e}")
        raise

def lambda_handler(event, context):
    """Lambda function to process S3 event and manage Lake Formation permissions."""
    try:
        sts_client = boto3.client('sts')
        assume_role_response = sts_client.assume_role(
            RoleArn=ADMIN_ROLE_ARN,
            RoleSessionName='LFSession'
        )
        aws_session = boto3.session.Session(
            aws_access_key_id=assume_role_response['Credentials']['AccessKeyId'],
            aws_secret_access_key=assume_role_response['Credentials']['SecretAccessKey'],
            aws_session_token=assume_role_response['Credentials']['SessionToken']
        )
        s3_client = aws_session.client("s3")
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        file_key = event['Records'][0]['s3']['object']['key']
        obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
        json_content = json.loads(obj["Body"].read().decode('utf-8'))

        # Extracting manifest file details
        record = json_content['Records'][0]
        access_type = record['AccessType']
        principal = record['Principal']
        database = record['Table']['DatabaseName']
        table = record['Table']['Name']
        permissions = record['Permissions']
        cross_account = record['Cross_account']

        glue_client = aws_session.client('glue')
        lf_client = aws_session.client('lakeformation')
        iam_client = aws_session.client('iam')
        iam_resource = aws_session.resource('iam')

        if access_type == 'revoke':
            revoke_permission(lf_client, principal, permissions, database, table, wildcard=False)
        else:
            register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource)
            grant_data_location_permissions(lf_client, glue_client, principal, database, table)
    except Exception as e:
        logger.error(f"Lambda execution error: {e}")
        raise
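The producer sample above covers Lake Formation registration and data-location grants; the cross-account SQS notification step described earlier is not shown. A minimal sketch of that step, assuming the queue URL comes from the CROSS_QUEUE_URL environment variable already read by producer.py and the field names match what consumer.py expects (the helper name notify_consumer is illustrative):

import json
import os
import boto3

def notify_consumer(ram_share_arn, record):
    """Send the share details to the consumer account's SQS queue.
    Field names mirror what consumer.py reads; the RAM ARN value is whatever
    share was created for this grant."""
    sqs_client = boto3.client('sqs')
    message = {
        'ram_url': ram_share_arn,
        'cross_role': record['Cross_role'],            # IAM role ARN in Account B
        'db_name': record['Table']['DatabaseName'],
        'table_name': record['Table']['Name'],
        'permissions': record['Permissions'],
        'wild_card': record['Table'].get('Wildcard', False),
        'access_type': record['AccessType'],
    }
    sqs_client.send_message(
        QueueUrl=os.environ['CROSS_QUEUE_URL'],
        MessageBody=json.dumps(message)
    )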

Consumer Code - consumer.py

import boto3
import json
import logging
import os
from botocore.exceptions import ClientError

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment Variables
ACCOUNT_A = os.environ['SRC_ACC_NUM']
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']

def assume_role(role_arn):
    """Assume AWS IAM Role and return a session with temporary credentials."""
    sts_client = boto3.client('sts')
    try:
        response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="LFSession")
        return boto3.session.Session(
            aws_access_key_id=response['Credentials']['AccessKeyId'],
            aws_secret_access_key=response['Credentials']['SecretAccessKey'],
            aws_session_token=response['Credentials']['SessionToken']
        )
    except ClientError as e:
        logger.error(f"Error assuming role {role_arn}: {e}")
        raise

def get_ram_invite(ram_client, ram_arn):
    """Retrieve a Resource Access Manager (RAM) invitation."""
    try:
        response = ram_client.get_resource_share_invitations(resourceShareArns=[ram_arn])
        return response['resourceShareInvitations'][0]['resourceShareInvitationArn']
    except ClientError as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise

def accept_ram_invite(ram_client, ram_invite):
    """Accept a RAM invitation."""
    try:
        ram_client.accept_resource_share_invitation(resourceShareInvitationArn=ram_invite)
    except ClientError:
        logger.info("RAM invite already accepted")

def create_database(glue_client, database_name):
    """Create a Glue database if it does not already exist."""
    try:
        glue_client.create_database(DatabaseInput={'Name': database_name})
        logger.info(f"Created database: {database_name}")
    except ClientError:
        logger.info(f"Database {database_name} already exists")

def create_resource_link_database(glue_client, rl_name, source_db, account_id):
    """Create a resource link for a shared Glue database."""
    try:
        glue_client.create_database(DatabaseInput={
            'Name': rl_name,
            "TargetDatabase": {
                "CatalogId": account_id,
                "DatabaseName": source_db
            }
        })
        logger.info(f"Created resource link database: {rl_name}")
    except ClientError:
        logger.info(f"Resource link {rl_name} already exists")

def create_resource_link_table(glue_client, rl_db, rl_table, source_db, source_table, account_id):
    """Create a resource link for a shared Glue table."""
    try:
        glue_client.create_table(
            DatabaseName=rl_db,
            TableInput={
                "Name": rl_table,
                "TargetTable": {
                    "CatalogId": account_id,
                    "DatabaseName": source_db,
                    "Name": source_table
                }
            }
        )
        logger.info(f"Created resource link table: {rl_table}")
    except ClientError:
        logger.info(f"Resource link table {rl_table} already exists")

def grant_permissions(lf_client, principal, resource, permissions):
    """Grant permissions to a principal on a specified resource."""
    try:
        lf_client.grant_permissions(
            Principal={"DataLakePrincipalIdentifier": principal},
            Resource=resource,
            Permissions=permissions,
            PermissionsWithGrantOption=permissions
        )
    except ClientError as e:
        logger.error(f"Error granting permissions to {principal}: {e}")
        raise

def revoke_permissions(lf_client, principal, resource, permissions):
    """Revoke permissions from a principal."""
    try:
        lf_client.revoke_permissions(
            Principal={"DataLakePrincipalIdentifier": principal},
            Resource=resource,
            Permissions=permissions
        )
    except ClientError as e:
        logger.error(f"Error revoking permissions from {principal}: {e}")
        raise

def construct_resource(database, table=None, wildcard=False, catalog_id=None):
    """Construct the resource dictionary for permissions."""
    if table:
        return {"Table": {"DatabaseName": database, "Name": table, **({"CatalogId": catalog_id} if catalog_id else {})}}
    elif wildcard:
        return {"Table": {"DatabaseName": database, "TableWildcard": {}}}
    else:
        return {"Database": {"Name": database}}

def lambda_handler(event, context):
    """Lambda function to process SQS messages and manage Lake Formation permissions."""
    try:
        records = [json.loads(record["body"]) for record in event['Records']]
    except (json.JSONDecodeError, KeyError) as e:
        logger.error(f"Error processing event data: {e}")
        return

    aws_session = assume_role(ADMIN_ROLE_ARN)

    # AWS Clients
    lf_client = aws_session.client('lakeformation')
    glue_client = aws_session.client('glue')
    ram_client = aws_session.client('ram')

    for record in records:
        ram_arn = record.get('ram_url')
        principal = record.get('cross_role')
        database = record.get('db_name')
        table = record.get('table_name')
        permissions = record.get('permissions', [])
        wildcard = record.get('wild_card', False)
        access_type = record.get('access_type')

        rl_database = f'rl_{database}'
        db_target = f'{database}_shared'
        table_target = f'rl_{table}'

        if access_type == 'grant':
            try:
                ram_invite = get_ram_invite(ram_client, ram_arn)
                accept_ram_invite(ram_client, ram_invite)
            except Exception as e:
                logger.error(f"Error accepting RAM invite: {e}")

            # Handle Database/Table Creation
            if database and table:
                create_database(glue_client, db_target)
                create_resource_link_table(glue_client, db_target, table_target, database, table, ACCOUNT_A)
            elif database:
                create_resource_link_database(glue_client, rl_database, database, ACCOUNT_A)

        # Handle Permissions
        try:
            resource_db = construct_resource(db_target)
            resource_table = construct_resource(db_target, table_target)
            if access_type == 'grant':
                if database and table:
                    grant_permissions(lf_client, principal, resource_db, ['ALL'])
                    grant_permissions(lf_client, principal, resource_table, permissions)
                elif database:
                    resource_wildcard = construct_resource(database, wildcard=True)
                    grant_permissions(lf_client, principal, resource_wildcard, permissions)
            else:
                if database and table:
                    revoke_permissions(lf_client, principal, resource_db, ['ALL'])
                    revoke_permissions(lf_client, principal, resource_table, permissions)
                elif database:
                    resource_wildcard = construct_resource(rl_database, wildcard=True)
                    revoke_permissions(lf_client, principal, resource_wildcard, permissions)
        except Exception as e:
            logger.error(f"Error modifying permissions: {e}")
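Once the consumer Lambda has created the resource link and granted permissions, the shared data can be queried from Account B like any local table. A hypothetical sketch using Athena from Python follows the resource-link naming convention in consumer.py (rl_<table> inside <database>_shared); the specific database, table, and results-bucket names are placeholders, not from the article:

import boto3

# Query the resource link created by consumer.py (names are illustrative)
athena = boto3.client('athena')
response = athena.start_query_execution(
    QueryString='SELECT * FROM rl_orders LIMIT 10',
    QueryExecutionContext={'Database': 'sales_shared'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)
print(response['QueryExecutionId'])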


Runtime Notes:

For producer script:

  1. Create a Lambda function and upload the producer.py code
  2. Add an environment variable called ADMIN_ROLE_ARN with the Data Lake Admin role ARN as the value
  3. Add an environment variable called CROSS_QUEUE_URL with the consumer queue URL as the value
  4. Add an environment variable called LF_ROLE with the Lake Formation service role ARN for Account A as the value
  5. Add an environment variable called S3_POLICY_ARN with the custom S3 policy ARN as the value
  6. Add an environment variable called S3_POLICY_NAME with the name of that custom S3 policy as the value (producer.py reads both)

For consumer script:

  1. Create an AWS Lambda function and upload the consumer.py code
  2. Add an environment variable called SRC_ACC_NUM with the source (Account A) AWS account number as the value
  3. Add an environment variable called ADMIN_ROLE_ARN with the Data Lake Admin role ARN as the value (a scripted example of setting these variables on both functions is shown below)
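If you prefer to script the configuration rather than use the console, a minimal sketch with boto3 could look like the following. The function names, account numbers, ARNs, and queue URL are placeholders; each call must be run against the account that owns the function (producer in Account A, consumer in Account B), and update_function_configuration replaces the function's entire environment block.

import boto3

lambda_client = boto3.client('lambda')

# Producer function in Account A (names and values are illustrative)
lambda_client.update_function_configuration(
    FunctionName='lf-datamesh-producer',
    Environment={'Variables': {
        'ADMIN_ROLE_ARN': 'arn:aws:iam::111122223333:role/DataLakeAdmin',
        'CROSS_QUEUE_URL': 'https://sqs.us-east-1.amazonaws.com/444455556666/lf-share-queue',
        'LF_ROLE': 'arn:aws:iam::111122223333:role/LakeFormationServiceRole',
        'S3_POLICY_ARN': 'arn:aws:iam::111122223333:policy/lf-s3-access',
        'S3_POLICY_NAME': 'lf-s3-access',
    }}
)

# Consumer function in Account B (run with Account B credentials)
lambda_client.update_function_configuration(
    FunctionName='lf-datamesh-consumer',
    Environment={'Variables': {
        'SRC_ACC_NUM': '111122223333',
        'ADMIN_ROLE_ARN': 'arn:aws:iam::444455556666:role/DataLakeAdmin',
    }}
)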


Conclusion

Implementing a Data Mesh with AWS Lake Formation, the Glue Data Catalog, IAM, and S3 decentralizes data ownership in a way that is flexible, secure, and well governed. With Lambda, SQS, and AWS Resource Access Manager (RAM), cross-account data sharing can be fully automated, making it simpler for organizations to control access and let domain teams manage their own data products. This setup enables self-serve analytics while still meeting compliance and security requirements. As the data landscape keeps evolving, embracing a federated yet well-governed approach like this makes data easier to reach, boosts collaboration, and leads to better decisions throughout a company.
