
Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. It also enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured way allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. This makes sure the raw and processed data can be kept securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like the user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with Hive-style partitioning as s3://<bucket>/products/YYYY/MM/DD/ (the sketch after this list shows one way to drop a daily file into this layout).
  • A sample dataset called products.csv, which we use in this post.
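
The following is a minimal sketch of how a daily raw file could be dropped into the partitioned input location with the boto3 SDK. The local file name is an assumption, the bucket name is the placeholder used throughout this post, and the key pattern here follows the year=/month=/day= form that the DAG's S3KeySensor polls for later in this post; adjust it if your buckets use the plain YYYY/MM/DD layout.

from datetime import datetime

import boto3

s3 = boto3.client("s3")

bucket = "sample-inp-bucket-etl-<username>"  # source bucket in Account A
# Build today's partition key so the DAG's S3KeySensor can find the file
key = datetime.today().strftime("products/year=%Y/month=%m/day=%d/products.csv")

s3.upload_file("products.csv", bucket, key)
print(f"Uploaded to s3://{bucket}/{key}")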

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to that Amazon S3 location.
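
A minimal sketch of this step with boto3: substitute the account ID and Region in the script, then upload it to the Glue assets location. The placeholder tokens (<account_id>, <region>) and the local file names are assumptions; check sample_glue_job.py for the exact strings it expects.

import boto3

account_id = "<account-id-of-AcctB>"
region = "us-east-1"

# Replace the placeholders in the Glue script with real values
with open("sample_glue_job.py") as f:
    script = f.read().replace("<account_id>", account_id).replace("<region>", region)
with open("sample_glue_job_resolved.py", "w") as f:
    f.write(script)

# Upload the resolved script to the Glue assets bucket in Account B
s3 = boto3.client("s3", region_name=region)
s3.upload_file(
    "sample_glue_job_resolved.py",
    f"aws-glue-assets-{account_id}-{region}",
    "scripts/sample_glue_job.py",
)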

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The file for any custom or community Airflow plugins.
    • requirements – The requirements.txt file for any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The names of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ELT job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).


Create Amazon Redshift resources

Create two tables and a stored procedure on the Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.
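
If you prefer to script this step instead of using the query editor, the following minimal sketch submits one statement through the Redshift Data API and waits for it to finish; repeat execute_statement for each statement in products.sql. The column list shown here is an illustrative assumption, and the real definitions live in products.sql.

import time

import boto3

client = boto3.client("redshift-data", region_name="us-east-1")

# Submit a single DDL statement against the Serverless workgroup
resp = client.execute_statement(
    WorkgroupName="sample-workgroup",  # the stack prefixes the workgroup name with "sample"
    Database="dev",
    Sql="CREATE TABLE IF NOT EXISTS public.products (product_id INT, product_name VARCHAR(100));",
)

# Poll until the statement completes
while True:
    desc = client.describe_statement(Id=resp["Id"])
    if desc["Status"] in ("FINISHED", "FAILED", "ABORTED"):
        print(desc["Status"], desc.get("Error", ""))
        break
    time.sleep(2)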

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the principle of least privilege.
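
A minimal sketch of attaching the customer managed policy to the Amazon MWAA role with boto3; the role name and account ID are placeholders to substitute with the ones in your Account B.

import boto3

iam = boto3.client("iam")

# Attach the customer managed AmazonMWAAFullConsoleAccess policy to the MWAA role
iam.attach_role_policy(
    RoleName="<MWAA-role>",
    PolicyArn="arn:aws:iam::<account-id-of-AcctB>:policy/AmazonMWAAFullConsoleAccess",
)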

Set up the environment

This section outlines the steps to configure the environment. The process includes the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and the Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your version of Amazon MWAA is less than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until Amazon provider package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.

The first step is to update the constraints file and the requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Upload the constraints-3.11-updated.txt file to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for the correct versions of the constraints file depending on the Airflow version.

  4. Navigate to the Amazon MWAA environment and choose Edit.
  5. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  6. Choose Save.

This will update the environment, and the new providers will be in effect.

  7. To verify the provider versions, go to Providers under the Admin tab.

The version of the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to the Amazon MWAA role in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows the necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform the necessary Amazon S3 actions on the output bucket (the sketch after these steps shows one way to script this).
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and the Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB, which allows assuming RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumed by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.
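
The following is a minimal sketch of step 4 with boto3: create RoleA in Account A, trusting Account B, and attach policy_for_roleA (the policy JSON shown earlier, saved locally as policy_for_roleA.json). Trusting the Account B root is one common way to express "Account B as the trusted entity"; the account IDs and the file name are placeholders.

import json

import boto3

iam = boto3.client("iam")  # use credentials for Account A

# Trust policy that lets principals in Account B assume RoleA
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::<account-id-of-AcctB>:root"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="RoleA",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Create policy_for_roleA from the JSON shown above and attach it to RoleA
with open("policy_for_roleA.json") as f:
    policy = iam.create_policy(
        PolicyName="policy_for_roleA",
        PolicyDocument=f.read(),
    )

iam.attach_role_policy(
    RoleName="RoleA",
    PolicyArn=policy["Policy"]["Arn"],
)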

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you're likely to encounter a "Connection Timeout" error in the DAG run.
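
The following minimal sketch shows the peering setup described above with boto3; the VPC, route table, and security group IDs and the CIDR ranges are placeholders for the ones in your Account B.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Request and accept a peering connection between the MWAA and Redshift VPCs
peering = ec2.create_vpc_peering_connection(
    VpcId="<mwaa-vpc-id>",
    PeerVpcId="<redshift-vpc-id>",
)
pcx_id = peering["VpcPeeringConnection"]["VpcPeeringConnectionId"]
ec2.accept_vpc_peering_connection(VpcPeeringConnectionId=pcx_id)

# Route each VPC's subnets to the peer VPC's CIDR through the peering connection
ec2.create_route(RouteTableId="<mwaa-route-table-id>",
                 DestinationCidrBlock="<redshift-vpc-cidr>",
                 VpcPeeringConnectionId=pcx_id)
ec2.create_route(RouteTableId="<redshift-route-table-id>",
                 DestinationCidrBlock="<mwaa-vpc-cidr>",
                 VpcPeeringConnectionId=pcx_id)

# Allow the peer CIDR range in each security group (Redshift listens on 5439)
ec2.authorize_security_group_ingress(
    GroupId="<redshift-security-group-id>",
    IpPermissions=[{"IpProtocol": "tcp", "FromPort": 5439, "ToPort": 5439,
                    "IpRanges": [{"CidrIp": "<mwaa-vpc-cidr>"}]}],
)
ec2.authorize_security_group_ingress(
    GroupId="<mwaa-security-group-id>",
    IpPermissions=[{"IpProtocol": "tcp", "FromPort": 0, "ToPort": 65535,
                    "IpRanges": [{"CidrIp": "<redshift-vpc-cidr>"}]}],
)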

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it first looks for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it checks the metadata database for the value and returns that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.
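
A minimal sketch of this step with boto3: create an interface VPC endpoint for Secrets Manager in the Amazon MWAA VPC. The VPC, subnet, and security group IDs are placeholders.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

ec2.create_vpc_endpoint(
    VpcId="<mwaa-vpc-id>",
    ServiceName="com.amazonaws.us-east-1.secretsmanager",
    VpcEndpointType="Interface",
    SubnetIds=["<private-subnet-id-1>", "<private-subnet-id-2>"],
    SecurityGroupIds=["<mwaa-security-group-id>"],
    PrivateDnsEnabled=True,
)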

  2. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  3. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  4. To generate an Airflow connection URI string, go to AWS CloudShell and start a Python shell.
  5. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  6. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

This can also be done from the Secrets Manager console. The secret is added in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it looks for this connection and retrieves the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  7. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
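
A minimal sketch of storing this second secret as key-value pairs with boto3 instead of the console. The username/password keys are an assumption based on what the Redshift Data API expects when it is given a secret ARN; adjust them to match how you consume the secret.

import json

import boto3

secretsmanager = boto3.client("secretsmanager", region_name="us-east-1")

secretsmanager.create_secret(
    Name="airflow/connections/redshift_conn_test",
    Description="Key-value connection secret for Redshift Serverless",
    SecretString=json.dumps({
        "username": "admin",
        "password": "<password>",
    }),
)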

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details are stored in the Airflow metadata database. If the Amazon MWAA environment isn't configured to use the Secrets Manager backend, it checks the metadata database for the value and returns that. You can create an Airflow connection using the UI, the AWS CLI, or the API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts by looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derive the current date in the DAG and pass it to S3KeySensor, which looks for any new files in the current day's folder.
    • We also set wildcard_match to True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria isn't met and is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator together with the AWS Glue IAM role. Other parameters like GlueVersion, NumberofWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it's complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection isn't found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager isn't set up, the connection created manually (for example, redshift-conn-id) can be passed instead.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account B. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code. The dag_id should match the DAG script name; otherwise, it won't be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in the S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run the Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default; set to False to test the sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for the Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run the Glue job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default; set to False to test the sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for the Glue job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job run ID extracted from the previous GlueJobOperator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints Glue job logs in the Airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the stored procedure in Redshift Serverless using the Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete the data from the staging table in Redshift Serverless using the SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from the Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The product_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.
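
The following minimal sketch automates both checks: it counts the rows loaded into products_f through the Redshift Data API and lists the unloaded objects in the target bucket. Names are the placeholders used throughout this post, and the products/ prefix is an assumption matching the unload key.

import time

import boto3

redshift = boto3.client("redshift-data", region_name="us-east-1")
s3 = boto3.client("s3")

# Count the rows loaded into products_f
stmt = redshift.execute_statement(
    WorkgroupName="sample-workgroup",
    Database="dev",
    Sql="SELECT COUNT(*) FROM public.products_f;",
)
while redshift.describe_statement(Id=stmt["Id"])["Status"] not in ("FINISHED", "FAILED", "ABORTED"):
    time.sleep(2)
result = redshift.get_statement_result(Id=stmt["Id"])
print("Rows in products_f:", result["Records"][0][0]["longValue"])

# List the files unloaded by RedshiftToS3Operator in the target bucket
resp = s3.list_objects_v2(
    Bucket="sample-opt-bucket-etl-<username>",
    Prefix="products/",
)
for obj in resp.get("Contents", []):
    print(obj["Key"])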

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing costs:

  1. Delete the CloudFormation stacks and the S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.
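
A minimal sketch of part of this cleanup with boto3, assuming named CLI profiles for the two accounts; the stack names and profile names are placeholders for your own.

import boto3

# Delete the CloudFormation stacks deployed in Account A and Account B
for profile, stack in [("account-a", "<stack-name-in-AcctA>"),
                       ("account-b", "<stack-name-in-AcctB>")]:
    session = boto3.Session(profile_name=profile, region_name="us-east-1")
    session.client("cloudformation").delete_stack(StackName=stack)

# Delete the secrets created for the Airflow connections
sm = boto3.Session(profile_name="account-b", region_name="us-east-1").client("secretsmanager")
for name in ("airflow/connections/secrets_redshift_connection",
             "airflow/connections/redshift_conn_test"):
    sm.delete_secret(SecretId=name, ForceDeleteWithoutRecovery=True)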

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for cost, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.
