AWS Machine Learning Data Engineering Pipeline for Batch Data

This post walks through the steps required to build a data engineering pipeline for batch data using AWS Step Functions. The sequence works as follows: ingested data arrives as a CSV file in the landing zone of an S3-based data lake, which automatically triggers a Lambda function that invokes the Step Function. I have assumed that data is ingested daily as a .csv file following a filename_date.csv naming convention, for example customers_20190821.csv. As its first step, the Step Function starts a landing-to-raw zone file transfer via a Lambda function. An AWS Glue crawler then crawls the raw data into an Athena table, which serves as the source for an AWS Glue PySpark transformation script. The transformed data is written to the refined zone in Parquet format. A second AWS Glue crawler then runs to reflect this refined data in another Athena table. Finally, the data science team can consume the refined data from the Athena table using an Amazon SageMaker Jupyter notebook instance. Note that the data science team does not need to pull any data manually: the pipeline automatically picks up the delta data according to the refresh schedule that writes new data to the landing zone.

Let’s go through the steps

How to make daily data available to Amazon SageMaker?

What is Amazon SageMaker?

Amazon SageMaker is an end-to-end machine learning (ML) platform that can be used to build, train, and deploy machine learning models in AWS. Using the Amazon SageMaker Notebook module makes it more efficient to interact with the data, because it avoids the latency of bringing the data to a local machine.
For a deep dive into Amazon SageMaker, please go through the official docs.

In this blog post, I will be using dummy customer data consisting of retailer information and units purchased.

Updating Table Definitions with AWS Glue

The Data Catalog feature of AWS Glue and its built-in integration with Amazon S3 simplify the process of identifying data and deriving a schema definition from the source data. Glue crawlers within the Data Catalog are used to build the metadata tables for data stored in Amazon S3.

I created a crawler named raw for the data in the raw zone (s3://bucketname/data/raw/customers/). If you are just starting out with AWS Glue crawlers, I have explained how to create one from scratch in one of my earlier articles. When you run this crawler, it creates a customers table in the specified database (raw).
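For readers who prefer to script the crawler rather than use the console, the arguments for the boto3 create_crawler call could be assembled roughly as follows. This is only a sketch: the helper build_crawler_args and the IAM role name GlueCrawlerRole are hypothetical and not part of the original setup, while the crawler name, database and S3 path are the ones used above.

```python
def build_crawler_args(name, database, s3_path, role="GlueCrawlerRole"):
    """Build keyword arguments for glue_client.create_crawler().

    The role name is a placeholder assumption; use a role that can read the bucket.
    """
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


raw_crawler_args = build_crawler_args(
    name="raw",
    database="raw",
    s3_path="s3://bucketname/data/raw/customers/",
)

# With AWS credentials configured, the crawler could then be created with:
#   import boto3
#   boto3.client("glue").create_crawler(**raw_crawler_args)
```

The same helper would cover the refined crawler by swapping the name, database and path.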

Create an invocation Lambda Function

If you are just starting out with Lambda functions, I have explained in my earlier article how to create one from scratch with an IAM role that can access Step Functions, Amazon S3, Lambda and CloudWatch.

Add an S3 trigger to the created Lambda function, named invoke-step-functions, and configure the Bucket, Prefix and Suffix accordingly.
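If you would rather configure the trigger from code than from the console, the notification settings could be expressed as the dictionary that the boto3 call put_bucket_notification_configuration expects. The sketch below assumes the landing prefix data/landing/ and the suffix .csv; the helper name build_s3_trigger_config is hypothetical, and the ARN keeps the masked account ID used elsewhere in this post.

```python
def build_s3_trigger_config(lambda_arn, prefix="data/landing/", suffix=".csv"):
    """Build an S3 notification configuration that invokes a Lambda on new objects."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }


trigger_config = build_s3_trigger_config(
    "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:invoke-step-functions"
)

# With credentials configured, this could be applied with:
#   import boto3
#   boto3.client("s3").put_bucket_notification_configuration(
#       Bucket="bucketname", NotificationConfiguration=trigger_config)
```

Note that S3 also needs permission to invoke the function; the console adds this automatically when you create the trigger there.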

Once a file arrives in the landing zone, it triggers the invoke Lambda function, which extracts the year, month and day from the file name in the event. It passes the year, month and day as input to AWS Step Functions, and appends two characters from a UUID to the execution name to keep it unique. Please replace the code in the invoke-step-functions Lambda with the following.

import json
import uuid
import boto3
from datetime import datetime

sfn_client = boto3.client('stepfunctions')

stm_arn = 'arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:Datapipeline-for-SageMaker'

def lambda_handler(event, context):
    
    # Extract bucket name and file path from event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    path = event['Records'][0]['s3']['object']['key']
    
    file_name_date = path.split('/')[2]
    processing_date_str = file_name_date.split('_')[1].replace('.csv', '')
    processing_date = datetime.strptime(processing_date_str, '%Y%m%d')
    
    # Extract year, month, day from date
    year = processing_date.strftime('%Y')
    month = processing_date.strftime('%m')
    day = processing_date.strftime('%d')
    
    uuid_temp = uuid.uuid4().hex[:2]
    execution_name = '{processing_date_str}-{uuid_temp}'.format(processing_date_str=processing_date_str, uuid_temp=uuid_temp)
    
    # Starts the execution of AWS StepFunctions
    response = sfn_client.start_execution(
          stateMachineArn = stm_arn,
          name= str(execution_name),
          input= json.dumps({"year": year, "month": month, "day": day})
      )
    
    return {"year": year, "month": month, "day": day}
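To make the file name handling easier to follow, here is a small standalone sketch of the date extraction. It is a variation on the handler above, not a copy of it: the hypothetical helper parse_processing_date takes the last path component instead of a fixed index, so it does not depend on how deep the landing prefix is.

```python
from datetime import datetime


def parse_processing_date(key):
    """Return (year, month, day) strings from a key like .../customers_20190821.csv."""
    file_name = key.rsplit("/", 1)[-1]        # customers_20190821.csv
    stem = file_name.rsplit(".", 1)[0]        # customers_20190821
    date_str = stem.rsplit("_", 1)[-1]        # 20190821
    # strptime raises ValueError if the suffix is not a valid YYYYMMDD date
    parsed = datetime.strptime(date_str, "%Y%m%d")
    return parsed.strftime("%Y"), parsed.strftime("%m"), parsed.strftime("%d")


print(parse_processing_date("data/landing/customers_20190821.csv"))
# ('2019', '08', '21')
```

Keeping the values as zero-padded strings matters later, because they become the month=08 and day=21 partition folder names.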

Create a Generic FileTransfer Lambda

Create a Lambda function named generic-file-transfer in the same way as the invoke Lambda above. Based on the event coming from the Step Function, this Lambda function transfers files from the landing zone to the raw zone, or from the landing zone to the archive zone.

  1. If step is landing-to-raw-file-transfer, the Lambda function copies files from landing to raw zone.
  2. If step is landing-to-archive-file-transfer, the Lambda function copies files from landing to archive zone and deletes files from landing zone.

Please replace the code in the generic-file-transfer Lambda with the following.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    for objects in bucket.objects.filter(Prefix = source_prefix):
        file_path = objects.key
        
        if ('.csv' in file_path) and (step == 'landing-to-raw-file-transfer'):
            
            # Extract filename from file_path
            file_name_date = file_path.split('/')[2]
            file_name = file_name_date.split('_')[0]
            
            # Build the destination prefix for this file in a separate variable,
            # so destination_prefix from the event is not modified between iterations
            raw_prefix = '{destination_prefix}{file_name}/year={year}/month={month}/day={day}/'.format(destination_prefix=destination_prefix, file_name=file_name, year=year, month=month, day=day)
            print(raw_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, raw_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
         
        if ('.csv' in file_path) and (step == 'landing-to-archive-file-transfer'):
            
            # Build the dated archive prefix in a separate variable,
            # so destination_prefix from the event is not modified between iterations
            archive_prefix = '{destination_prefix}{year}-{month}-{day}/'.format(destination_prefix=destination_prefix, year=year, month=month, day=day)
            print(archive_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, archive_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
            
            # Deletes copied file
            bucket.objects.filter(Prefix = file_path).delete()
            
    return {"year": year, "month": month, "day": day}

The generic file transfer Lambda function is now set up. Next, we need to check that all files were copied successfully from one zone to the other. If you have large files that need to be copied, you could check out our Lightning fast distributed file transfer architecture.
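The key mapping performed by the landing-to-raw step can be illustrated in isolation. The helper build_raw_key below is a hypothetical sketch that mirrors the string handling in the Lambda without touching S3, so you can see where a given landing file ends up.

```python
def build_raw_key(file_path, source_prefix, destination_prefix, year, month, day):
    """Map a landing-zone key to its partitioned raw-zone key."""
    file_name_date = file_path.rsplit("/", 1)[-1]      # customers_20190821.csv
    table_name = file_name_date.split("_")[0]          # customers
    partition_prefix = "{}{}/year={}/month={}/day={}/".format(
        destination_prefix, table_name, year, month, day
    )
    # Swap only the leading source prefix for the partitioned destination prefix
    return file_path.replace(source_prefix, partition_prefix, 1)


print(build_raw_key(
    "data/landing/customers_20190821.csv",
    "data/landing/", "data/raw/", "2019", "08", "21",
))
# data/raw/customers/year=2019/month=08/day=21/customers_20190821.csv
```

The year=/month=/day= folder names are what allow the Glue crawler to register these values as partition columns.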

Create Generic FileTransfer Status Check Lambda Function

Create a Lambda function named generic-file-transfer-status. If the step is the landing-to-raw file transfer, the Lambda function checks whether all files were copied from the landing zone to the raw zone by comparing the number of objects in the two zones. If the counts do not match, it raises an exception, which is handled in AWS Step Functions by retrying after an interval that grows by the configured backoff rate. If the counts match, all files were copied successfully. If the step is the landing-to-archive file transfer, the Lambda function checks whether any files are left in the landing zone. Please replace the code in the generic-file-transfer-status Lambda function with the following.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    class LandingToRawFileTransferIncompleteException(Exception):
        pass

    class LandingToArchiveFileTransferIncompleteException(Exception):
        pass
    
    if (step == 'landing-to-raw-file-transfer'):
        if file_transfer_status(bucket, source_prefix, destination_prefix):
            print('File Transfer from Landing to Raw Completed Successfully')
        else:
            raise LandingToRawFileTransferIncompleteException('File Transfer from Landing to Raw not completed')
    
    if (step == 'landing-to-archive-file-transfer'):
        if is_empty(bucket, source_prefix):
            print('File Transfer from Landing to Archive Completed Successfully')
        else:
            raise LandingToArchiveFileTransferIncompleteException('File Transfer from Landing to Archive not completed.')
    
    return {"year": year, "month": month, "day": day}

def file_transfer_status(bucket, source_prefix, destination_prefix):
    
    try:
        
        # Checks number of objects at the source prefix (count of objects at source i.e., landing zone)
        source_object_count = 0
        for obj in bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            if (".csv" in path):
                source_object_count = source_object_count + 1
        print(source_object_count)
        
        # Checks number of objects at the destination prefix (count of objects at destination i.e., raw zone)
        destination_object_count = 0
        for obj in bucket.objects.filter(Prefix = destination_prefix):
            path = obj.key
            
            if (".csv" in path):
                destination_object_count = destination_object_count + 1
        
        print(destination_object_count)
        return (source_object_count == destination_object_count)

    except Exception as e:
        print(e)
        raise e

def is_empty(bucket, prefix):
    
    try:
        # Checks if any files left in the prefix (i.e., files in landing zone)
        object_count = 0
        for obj in bucket.objects.filter(Prefix = prefix):
            path = obj.key

            if ('.csv' in path):
                object_count = object_count + 1
                    
        print(object_count)
        return (object_count == 0)
        
    except Exception as e:
        print(e)
        raise e
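One thing to watch, if I read the check correctly: the raw zone keeps files from earlier days while the landing zone is emptied after every run, so comparing totals under data/raw/ may stop matching after the first day. A possible variation, sketched here as pure functions over lists of keys, restricts the raw-zone count to the current day's partition. The helper names are hypothetical and the keys are made-up examples.

```python
def count_csv(keys):
    """Count keys that end with .csv."""
    return sum(1 for key in keys if key.endswith(".csv"))


def transfer_complete(landing_keys, raw_keys, year, month, day):
    """Compare landing files against raw files in the given day's partition only."""
    partition = "year={}/month={}/day={}/".format(year, month, day)
    todays_raw_keys = [key for key in raw_keys if partition in key]
    return count_csv(landing_keys) == count_csv(todays_raw_keys)


landing = ["data/landing/customers_20190822.csv"]
raw = [
    "data/raw/customers/year=2019/month=08/day=21/customers_20190821.csv",
    "data/raw/customers/year=2019/month=08/day=22/customers_20190822.csv",
]

print(count_csv(landing) == count_csv(raw))                  # False: totals differ
print(transfer_complete(landing, raw, "2019", "08", "22"))   # True: partition matches
```

In the Lambda itself, the key lists would come from bucket.objects.filter, as in the code above.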

Create a Generic Crawler Invocation Lambda

Create a Lambda function named generic-crawler-invoke. The Lambda function starts a crawler whose name is passed as an argument from AWS Step Functions through the event object. Please replace the code in the generic-crawler-invoke Lambda function with the following.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    try:
        response = glue_client.start_crawler(Name = crawler_name)
    except Exception as e:
        print('Crawler in progress', e)
        raise e
    
    return {"year": year, "month": month, "day": day}

Create a Generic Crawler Status Lambda

Create a Lambda function named generic-crawler-status. The Lambda function checks whether the crawler has finished running. If the crawler is still running, the Lambda function raises an exception, which is handled in the Step Function by retrying after an interval that grows by the configured backoff rate. Please replace the code in the generic-crawler-status Lambda with the following.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    class CrawlerInProgressException(Exception):
        pass
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    response = glue_client.get_crawler_metrics(CrawlerNameList =[crawler_name])
    print(response['CrawlerMetricsList'][0]['CrawlerName']) 
    print(response['CrawlerMetricsList'][0]['TimeLeftSeconds']) 
    print(response['CrawlerMetricsList'][0]['StillEstimating']) 
    
    if (response['CrawlerMetricsList'][0]['StillEstimating']):
        raise CrawlerInProgressException('Crawler In Progress!')
    elif (response['CrawlerMetricsList'][0]['TimeLeftSeconds'] > 0):
        raise CrawlerInProgressException('Crawler In Progress!')
    
    return {"year": year, "month": month, "day": day}
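As an alternative to reading crawler metrics, the crawler state can be read directly with the Glue get_crawler call, which reports a State of READY, RUNNING or STOPPING and the status of the last crawl. The sketch below is an assumption about how one might structure that check, not the method used in this post. The helper check_crawler_ready is hypothetical and takes the Glue client as a parameter so it can be exercised without AWS access.

```python
class CrawlerInProgressException(Exception):
    """Raised while the crawler has not yet returned to the READY state."""


def check_crawler_ready(glue_client, crawler_name):
    """Raise if the crawler is still busy; otherwise return the last crawl status."""
    crawler = glue_client.get_crawler(Name=crawler_name)["Crawler"]
    if crawler["State"] != "READY":
        raise CrawlerInProgressException(
            "Crawler {} is {}".format(crawler_name, crawler["State"])
        )
    # LastCrawl may be absent if the crawler has never run
    return crawler.get("LastCrawl", {}).get("Status")


# Inside a Lambda this would be called as:
#   import boto3
#   status = check_crawler_ready(boto3.client("glue"), event["crawler_name"])
```

Because the exception keeps the name CrawlerInProgressException, the Retry block in the state machine would not need to change.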

Create an AWS Glue Job

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. For a deep dive into AWS Glue, please go through the official docs.

Create an AWS Glue job named raw-refined. If you are just starting out with AWS Glue jobs, I have explained how to create one from scratch in my earlier article. This Glue job converts the file format from CSV to Parquet and stores the result in the refined zone. The push-down predicate acts as a filter on the partitions, so that the job reads only the data for the processing date.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year', 'month', 'day'])

year = args['year']
month = args['month']
day = args['day']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw", table_name = "customers", push_down_predicate ="((year == " + year + ") and (month == " + month + ") and (day == " + day + "))", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "string"), ("sale_id", "string", "sale_id", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucketname/data/refined/customers/", "partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink4")

job.commit()
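The push-down predicate in the job is built by string concatenation. A slightly more defensive way to build it is sketched below, assuming the year, month and day partition columns are typed as strings in the catalog (the ApplyMapping step above treats them as strings). The helper build_partition_predicate is hypothetical; quoting the values keeps leading zeros such as 08 intact.

```python
def build_partition_predicate(year, month, day):
    """Build a push-down predicate for string-typed year/month/day partitions."""
    for part in (year, month, day):
        # Reject anything that is not purely numeric before placing it in the filter
        if not part.isdigit():
            raise ValueError("unexpected partition value: {!r}".format(part))
    return "(year == '{}' and month == '{}' and day == '{}')".format(year, month, day)


print(build_partition_predicate("2019", "08", "21"))
# (year == '2019' and month == '08' and day == '21')
```

The resulting string could be passed as the push_down_predicate argument of create_dynamic_frame.from_catalog.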

Create a refined crawler in the same way as the raw crawler described earlier in this article. Point the crawler path to the refined zone (s3://bucketname/data/refined/customers/) and set the database to refined. There is no need to create separate Lambda functions for refined crawler invocation and status, because the crawler name is passed in from the Step Function.

All the resources required to create the Step Function have now been created.

Creating the AWS StepFunction

The Step Function is where we create and orchestrate the steps that process data according to our workflow. Create an AWS Step Functions state machine named Datapipeline-for-SageMaker. If you are just starting out with AWS Step Functions, I have explained how to create one from scratch here.

When data is ingested into the landing zone, it triggers a Lambda function, which in turn starts an execution of the Step Function. The steps in the Step Function are as follows:

  1. Transfers files from the landing zone to the raw zone.
  2. Checks whether all files were copied to the raw zone successfully.
  3. Invokes the raw crawler, which crawls data in the raw zone and creates or updates the table definition in the specified database.
  4. Checks whether the crawler completed successfully.
  5. Invokes the Glue job and waits for it to complete.
  6. Invokes the refined crawler, which crawls data in the refined zone and creates or updates the table definition in the specified database.
  7. Checks whether the crawler completed successfully.
  8. Transfers files from the landing zone to the archive zone and deletes them from the landing zone.
  9. Checks whether all files were copied and then deleted from the landing zone successfully.

Please update the StepFunctions definition with the following code.

{
  "Comment": "Datapipeline For MachineLearning in AWS Sagemaker",
  "StartAt": "LandingToRawFileTransfer",
  "States": {
    "LandingToRawFileTransfer": {
      "Comment": "Transfers files from landing zone to Raw zone.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferFailed"
        }
      ],
      "Next": "LandingToRawFileTransferPassed"
    },
    "LandingToRawFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToRawFileTransferStatus"
    },
    "LandingToRawFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to raw zone successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToRawFileTransferIncompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToRawFileTransferStatusPassed"
    },
    "LandingToRawFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "StartRawCrawler"
    },
    "StartRawCrawler": {
      "Comment": "Crawls data from raw zone and adds table definition to the specified Database. IF table definition exists updates the definition.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRawCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRawCrawlerFailed"
        }
      ],
      "Next": "StartRawCrawlerPassed"
    },
    "StartRawCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRawCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RawCrawlerStatus"
    },
    "RawCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RawCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RawCrawlerStatusFailed"
        }
      ],
      "Next": "RawCrawlerStatusPassed"
    },
    "RawCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RawCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "GlueJob"
    },
    "GlueJob": {
      "Comment": "Invokes Glue job and waits for Glue job to complete.",
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "raw-refined",
        "Arguments": {
          "--refined_prefix": "data/refined",
          "--year.$": "$.year",
          "--month.$": "$.month",
          "--day.$": "$.day"
        }
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "GlueJobFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GlueJobFailed"
        }
      ],
      "Next": "GlueJobPassed"
    },
    "GlueJobFailed": {
      "Type": "Fail",
      "Cause": "Glue job failed"
    },
    "GlueJobPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.Arguments.--year",
        "month.$": "$.Arguments.--month",
        "day.$": "$.Arguments.--day"
      },
      "Next": "StartRefinedCrawler"
    },
    "StartRefinedCrawler": {
      "Comment": "Crawls data from refined zone and adds table definition to the specified Database.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRefinedCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRefinedCrawlerFailed"
        }
      ],
      "Next": "StartRefinedCrawlerPassed"
    },
    "StartRefinedCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRefinedCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RefinedCrawlerStatus"
    },
    "RefinedCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        }
      ],
      "Next": "RefinedCrawlerStatusPassed"
    },
    "RefinedCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RefinedCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransfer"
    },
    "LandingToArchiveFileTransfer": {
      "Comment": "Transfers files from landing zone to archived zone",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/archive/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferPassed"
    },
    "LandingToArchiveFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Archive File Transfer failed"
    },
    "LandingToArchiveFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransferStatus"
    },
    "LandingToArchiveFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to archived successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/archive/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToArchiveFileTransferIncompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferStatusPassed"
    },
    "LandingToArchiveFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "LandingToArchiveFileTransfer invocation failed"
    },
    "LandingToArchiveFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "End": true
    }
  }
}
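With a definition this long, it is easy to mistype a state name in a Next field. A quick sanity check before pasting the definition into the console is sketched below. The helper find_dangling_transitions is hypothetical and the sample definition is a cut-down illustration with one deliberate typo, not the full state machine.

```python
def find_dangling_transitions(definition):
    """Return (source, target) pairs whose target state is not defined."""
    states = definition["States"]
    transitions = [("StartAt", definition["StartAt"])]
    for name, state in states.items():
        if "Next" in state:
            transitions.append((name, state["Next"]))
        for handler in state.get("Catch", []):
            transitions.append((name, handler["Next"]))
    return [(src, dst) for src, dst in transitions if dst not in states]


# A cut-down, hypothetical definition with one misspelled target
sample = {
    "StartAt": "StartRawCrawler",
    "States": {
        "StartRawCrawler": {
            "Type": "Task",
            "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "StartRawCrawlerFailed"}],
            "Next": "RawCrawlerStatu",
        },
        "StartRawCrawlerFailed": {"Type": "Fail"},
        "RawCrawlerStatus": {"Type": "Pass", "End": True},
    },
}

print(find_dangling_transitions(sample))
# [('StartRawCrawler', 'RawCrawlerStatu')]
```

To check the real definition, load it from a file with json.load and pass the resulting dictionary to the same function.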

After updating the AWS StepFunctions definition, the visual workflow looks like the following.

Now upload a file to the data/landing/ zone of the bucket where the trigger has been configured with the Lambda. The execution of the Step Function starts, and the visual workflow looks like the following.

In the RawCrawlerStatus step, if the Lambda function keeps failing, we retry for a while and then mark the Step Function execution as failed. If the Step Function runs successfully, the visual workflow looks like the following.
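How long the status steps keep retrying follows from the Retry settings in the definition: IntervalSeconds 30, BackoffRate 2 and MaxAttempts 5. The short sketch below works out the wait before each retry; the helper name retry_delays is hypothetical.

```python
def retry_delays(interval_seconds, backoff_rate, max_attempts):
    """Wait time before each retry: the interval is multiplied by the rate each time."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


delays = retry_delays(30, 2, 5)
print(delays)       # [30, 60, 120, 240, 480]
print(sum(delays))  # 930
```

So a crawler that needs more than roughly 15 minutes of waiting, plus the time taken by the Lambda invocations themselves, would exhaust the retries and the execution would be marked as failed. For longer crawls, raising MaxAttempts or IntervalSeconds is one option.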

Machine Learning workflow using Amazon SageMaker

The final step in this data pipeline is to make the processed data available in an Amazon SageMaker Jupyter notebook instance. Jupyter notebooks are widely used by data scientists for exploratory data analysis and for building and training machine learning models.

Create Notebook Instance in Amazon SageMaker

Step1: In the Amazon SageMaker console choose Create notebook instance.

Step2: In the Notebook Instance settings, enter the notebook instance name, choose an instance type that suits the data size, and select a role that allows notebook instances in Amazon SageMaker to interact with Amazon S3. The SageMaker execution role needs the required permissions for Athena, for the S3 buckets where the data resides, and for KMS if the data is encrypted.

Step3: Wait for the Notebook instances to be created and the Status to change to InService.

Step4: Choose Open Jupyter, which opens the notebook interface in a new browser tab.

Click New to create a new notebook in Jupyter. Amazon SageMaker provides several kernels for Jupyter, including support for Python 2 and 3, MXNet, TensorFlow, and PySpark. Choose Python as the kernel for this exercise, as it comes with the Pandas library built in.

Step5: Within the notebook, execute the following commands to install PyAthena, a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.

import sys
!{sys.executable} -m pip install PyAthena

Step6: After PyAthena is installed, you can open a connection to Athena and load query results into Pandas data frames. For data scientists, working with data typically involves multiple stages: munging and cleaning the data, analyzing or modeling it, and then organizing the results into a form suitable for plotting or tabular display. Pandas is well suited to all of these tasks.

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
               region_name='REGION, for example, us-east-1')

df = pd.read_sql("SELECT * FROM <DATABASE>.<TABLENAME> limit 10;", conn)
df
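If the database and table names come from notebook parameters rather than being typed by hand, it can help to build the query in one place and validate the identifiers first. The following is a small sketch under that assumption; `build_preview_query`, `load_preview` and the `refined`/`customers` names are hypothetical and not part of the original notebook.

```python
import re

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_preview_query(database, table, limit=10):
    """Build a SELECT ... LIMIT query after validating the identifiers."""
    for name in (database, table):
        if not _IDENTIFIER.match(name):
            raise ValueError("Unexpected identifier: {!r}".format(name))
    return 'SELECT * FROM "{}"."{}" LIMIT {}'.format(database, table, int(limit))


def load_preview(database, table, staging_dir, region, limit=10):
    """Run the preview query through PyAthena and return a pandas DataFrame."""
    # Imported lazily so the query builder can be used without AWS access.
    import pandas as pd
    from pyathena import connect

    conn = connect(s3_staging_dir=staging_dir, region_name=region)
    return pd.read_sql(build_preview_query(database, table, limit), conn)


# Hypothetical database and table names.
print(build_preview_query("refined", "customers", 5))
```

Calling `load_preview` needs the same staging location and region as the connection shown above.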

As shown above, the dataframe always stays consistent with the latest incoming data because of the data engineering pipeline set up earlier in the ML workflow. This dataframe can be used for downstream ad-hoc model building or for exploratory data analysis.

That’s it folks. Thanks for the read.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.

Machine Learning Operations (MLOps) Pipeline using Google Cloud Composer

In an earlier post, we described the need for automating the Data Engineering pipeline for Machine Learning based systems. Today, we will expand the scope to set up a fully automated MLOps pipeline using Google Cloud Composer.

Cloud Composer

Cloud Composer is officially defined as a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

So let’s get on with the required steps to create this MLOps infrastructure on Google Cloud Platform

Creating a Cloud Composer Environment

Step1: Please enable the Cloud Composer API.

Step2: Go to the Create environment page in the GCP console. Composer is available in the Big Data section.

Step3: Click on create to start creating a Composer environment

Step4: Please select the service account which has the required permissions to access GCS, BigQuery, ML Engine and the Composer environment. The roles required for accessing the Composer environment are Composer Administrator and Composer Worker.
For more details about access control in the Composer environment, please see this.

Step5: Please use Python Version 3 and latest Image version.

Step6: Click on create. It will take about 15-20 minutes to create the environment. Once it completes, the environment page shall look like the following.

Click on Airflow to see Airflow WebUI. The Airflow WebUI looks as follows

The DAGs folder is where our DAG file is stored. It is simply a folder inside the GCS bucket that the environment creates. To know more about the concept of a DAG and for a general introduction to Airflow, please refer to this post.

You can see Composer-related logs in Logging.

Step7: Please add the following PyPI packages to the Composer environment.

Click on the created environment, navigate to PyPI packages, and click Edit to add the packages.

The required packages are:

# to read data from MongoDB
pymongo==3.8.0
oauth2client==4.1.3
# to read data from firestore
google-cloud-firestore==1.3.0
firebase-admin==2.17.0
google-api-core==1.13.0

Create an ML model

Step1: Please create a folder structure like the following on your instance.

ml_model
├── setup.py
└── trainer
    ├── __init__.py
    └── train.py

Step2: Please place the following code in train.py file, which shall upload the model to GCS bucket as shown below. This model would be used to create model versions as explained a bit later.

from google.cloud import bigquery
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
from google.cloud import storage
import datetime
import json
import pickle
client = bigquery.Client()
sql = '''
SELECT *
FROM `<PROJECT_ID>.<DATASET>.<TABLENAME>`
'''

df = client.query(sql).to_dataframe()
df = df[['is_stressed', 'is_engaged', 'status']]

df['is_stressed'] = df['is_stressed'].fillna('n')
df['is_engaged'] = df['is_engaged'].fillna('n')
df['stressed'] = np.where(df['is_stressed']=='y', 1, 0)
df['engaged'] = np.where(df['is_engaged']=='y', 1, 0)
df['status'] = np.where(df['status']=='complete', 1, 0)

feature_cols = ['stressed', 'engaged']
X = df[feature_cols]
y = df.status
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.25,random_state=0)
logreg = LogisticRegression()
logreg.fit(X_train,y_train)
pkl_filename = "model.pkl"  
with open(pkl_filename, 'wb') as file:  
    pickle.dump(logreg, file)
# Upload the model to GCS. Replace the placeholder with your bucket name.
BUCKET_NAME = '<BUCKET_NAME>'
bucket = storage.Client().bucket(BUCKET_NAME)
file_path = datetime.datetime.now().strftime('machine_learning/models/%Y%m%d_%H%M%S')
blob = bucket.blob('{}/{}'.format(
    file_path,
    pkl_filename))
blob.upload_from_filename(pkl_filename)

file_location = 'gs://{BUCKET_NAME}/{file_path}'.format(BUCKET_NAME=BUCKET_NAME, file_path=file_path)
file_config = json.dumps({'file_location': file_location})

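# Write the model location to the bucket created by the Composer environment,
# so that the DAG can read it later. Replace the placeholder with that bucket name.
COMPOSER_BUCKET = '<COMPOSER_BUCKET>'
bucket = storage.Client().bucket(COMPOSER_BUCKET)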
blob = bucket.blob('data/file_config.json')
blob.upload_from_string(file_config)
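The last few lines of train.py hand the model location over to the DAG through a small JSON file. The sketch below isolates that hand-off without touching GCS, so the round trip is easier to see; the helper names, the bucket name and the fixed timestamp are assumptions made for illustration.

```python
import datetime
import json


def build_file_config(bucket_name, now):
    """Return the JSON string that train.py writes to data/file_config.json."""
    file_path = now.strftime('machine_learning/models/%Y%m%d_%H%M%S')
    file_location = 'gs://{}/{}'.format(bucket_name, file_path)
    return json.dumps({'file_location': file_location})


def read_export_uri(file_config_json):
    """Return the model directory that the DAG passes as deploymentUri."""
    return json.loads(file_config_json)['file_location']


# Hypothetical bucket name and training time.
config_json = build_file_config('my-ml-bucket', datetime.datetime(2019, 8, 21, 9, 30, 0))
print(read_export_uri(config_json))
```

Because the location is a timestamped directory, every training run gets its own folder containing model.pkl, and the JSON file always points at the most recent one.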

Step3: Create an empty __init__.py file inside the trainer directory.

Step4: Please place the following code in setup.py file. The setup.py file contains required packages to execute code.

import setuptools

REQUIRED_PACKAGES = [
    'pandas-gbq==0.3.0',
    'cloudml-hypertune',
    'google-cloud-bigquery==1.14.0',
    'urllib3'
]

setuptools.setup(
    name='ml_model',
    version='1.0',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description='',
)

Step5: Package the code using the following command, run from inside the ml_model directory. It creates a .tar.gz file in a dist folder inside the ml_model directory.

python3 setup.py sdist

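Step6: The package name is derived from the name and version specified in the setup.py file, so it becomes ml_model-1.0.tar.gz.
Copy the package to gs://{your-GCS-bucket}/machine_learning/packages/, because the DAG file shown later builds the package path as BASE_DIR/packages/PACKAGE_NAME. gs://{your-GCS-bucket}/machine_learning/ becomes the base directory for your machine learning activities described in this post.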
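As a quick check of where the training job will look for the package, this sketch mirrors the path join that the DAG file later in this post performs on BASE_DIR and PACKAGE_NAME. The bucket name is hypothetical, and posixpath is used here only so the result is the same on any operating system.

```python
import posixpath

BASE_DIR = 'gs://my-ml-bucket/machine_learning/'  # hypothetical bucket
PACKAGE_NAME = 'ml_model-1.0.tar.gz'

# posixpath.join keeps forward slashes regardless of the operating system.
trainer_bin = posixpath.join(BASE_DIR, 'packages', PACKAGE_NAME)
print(trainer_bin)
```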

Creating a DAG

In this use case, we have created a DAG file which exports some table data from a MongoDB instance into a GCS bucket and then creates a BigQuery table from that exported data. It then trains a model and creates a version for that model. The DAG file supports both full and daily data extraction, controlled in the code below by the variable tot_data. This variable is read from the Airflow configuration set by the user, a process that is also described later in this post.
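Before the full DAG file, here is a minimal standalone sketch of how the tot_data switch translates into a MongoDB filter. It assumes, as the DAG does, that edit_datetime is stored as a string in the same timestamp format; `daily_window` and `build_query` are hypothetical helper names.

```python
from datetime import datetime, timedelta


def daily_window(now):
    """Return (yesterday, today) midnight boundaries as timestamp strings."""
    today = str(now.date()) + 'T00:00:00.000Z'
    yesterday = str(now.date() - timedelta(days=1)) + 'T00:00:00.000Z'
    return yesterday, today


def build_query(tot_data, now):
    """Return the MongoDB filter: empty for a full export, a date range otherwise."""
    if tot_data.lower() == 'no':
        yesterday, today = daily_window(now)
        return {'edit_datetime': {'$gte': yesterday, '$lt': today}}
    return {}


print(build_query('no', datetime(2019, 8, 21, 9, 30)))
print(build_query('yes', datetime(2019, 8, 21, 9, 30)))
```

With tot_data set to 'no' only records edited during the previous day are selected; any other value exports the whole collection.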

Please place the following code in the DAG file.

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator
import pprint
import json
import re

from pymongo import MongoClient
from google.cloud import storage
import os

from airflow import models
from mlengine_operator import MLEngineTrainingOperator, MLEngineVersionOperator

ts = datetime.now()
today = str(ts.date()) + 'T00:00:00.000Z'
yester_day = str(ts.date() - timedelta(days = 1)) + 'T00:00:00.000Z'

# %M is minutes; %m would repeat the month.
str_ts = ts.strftime('%Y_%m_%d_%H_%M_%S')

config = Variable.get("mongo_conf", deserialize_json=True)
host = config['host']
db_name = config['db_name']
table_name = config['table_name']
file_prefix = config['file_prefix']
bucket_name = config['bucket_name']
# file_path = file_prefix + '/' + table_name + '.json'
file_path = '{file_prefix}/{table_name}/{table_name}_{str_ts}.json'.format(file_prefix=file_prefix, str_ts=str_ts, table_name=table_name)
file_location = 'gs://' + bucket_name + '/' + file_prefix + '/' + table_name + '/' + table_name + '_*.json'
config['file_location'] = file_location
bq_dataset = config['bq_dataset']
tot_data = config['tot_data'].lower()

BUCKET_NAME = config['ml_configuration']['BUCKET_NAME']
BASE_DIR = config['ml_configuration']['BASE_DIR']
PACKAGE_NAME = config['ml_configuration']['PACKAGE_NAME']
TRAINER_BIN = os.path.join(BASE_DIR, 'packages', PACKAGE_NAME)
TRAINER_MODULE = config['ml_configuration']['TRAINER_MODULE']
RUNTIME_VERSION = config['ml_configuration']['RUNTIME_VERSION']
PROJECT_ID = config['ml_configuration']['PROJECT_ID']
MODEL_NAME = config['ml_configuration']['MODEL_NAME']

MODEL_FILE_BUCKET = config['ml_configuration']['MODEL_FILE_BUCKET']
model_file_loc = config['ml_configuration']['MODEL_FILE_LOCATION']

bucket = storage.Client().bucket(MODEL_FILE_BUCKET)
blob = bucket.get_blob(model_file_loc)
file_config = json.loads(blob.download_as_string().decode("utf-8"))
export_uri = file_config['file_location']

def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

def mongoexport():
    # Destination blob in GCS for the exported newline-delimited JSON.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(file_path)

    mongo_client = MongoClient(host)
    db = mongo_client[db_name]
    tasks = db[table_name]
    pprint.pprint(tasks.count_documents({}))
    # If tot_data is set to 'yes' in the Airflow configuration, the full
    # collection is exported; if it is 'no', only yesterday's records are.
    if tot_data == 'no':
        query = {"edit_datetime": {"$gte": yester_day, "$lt": today}}
        print(query)
        data = tasks.find(query)
    else:
        data = tasks.find()
    # Serialize each record (non-JSON types become strings), then flatten it.
    emp_list = []
    for record in data:
        emp_list.append(json.dumps(record, default=str))
    flat_list = []
    for item in emp_list:
        flat_list.append(flatten_json(json.loads(item)))
    # Clean the keys so they are valid BigQuery column names, one JSON object per line.
    data = '\n'.join(json.dumps({re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v) for k, v in record.items()}) for record in flat_list)
    blob.upload_from_string(data)

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:

    # Export the MongoDB collection to GCS as newline-delimited JSON.
    pymongo_export_op = PythonOperator(
        task_id='pymongo_export',
        python_callable=mongoexport,
        )

    update_bq_table_op = BashOperator(
        task_id='update_bq_table',
        bash_command='''
        bq rm -f {bq_dataset}.{table_name}
        bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --ignore_unknown_values=True {bq_dataset}.{table_name} {file_location}
        '''.format(bq_dataset=bq_dataset, table_name=table_name, file_location=file_location)
        )

    date_nospecial = '{{ execution_date.strftime("%Y%m%d") }}'
    date_min_nospecial = '{{ execution_date.strftime("%Y%m%d_%H%M") }}'
    uuid = '{{ macros.uuid.uuid4().hex[:8] }}'

    training_op = MLEngineTrainingOperator(
      task_id='submit_job_for_training',
      project_id=PROJECT_ID,
      job_id='{}_{}_{}'.format(table_name, date_nospecial, uuid),
      package_uris=[os.path.join(TRAINER_BIN)],
      training_python_module=TRAINER_MODULE,
      training_args=[
          '--base-dir={}'.format(BASE_DIR),
          '--event-date={}'.format(date_nospecial),
      ],
      region='us-central1',
      runtime_version=RUNTIME_VERSION,
      python_version='3.5')

    create_version_op = MLEngineVersionOperator(
      task_id='create_version',
      project_id=PROJECT_ID,
      model_name=MODEL_NAME,
      version={
          'name': 'version_{}_{}'.format(date_min_nospecial, uuid),
          'deploymentUri': export_uri,
          'runtimeVersion': RUNTIME_VERSION,
          'pythonVersion': '3.5',
          'framework': 'SCIKIT_LEARN',
      },
      operation='create')

    pymongo_export_op >> update_bq_table_op >> training_op >> create_version_op
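To see what flatten_json and the key clean-up inside mongoexport do to a nested MongoDB document, here is a small standalone sketch. The sample record is made up, and `clean_key` is a hypothetical name for the inline expression used in the DAG file.

```python
import json
import re


def flatten_json(y):
    """Flatten nested dicts and lists into a single-level dict."""
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + '_')
        elif isinstance(x, list):
            for i, item in enumerate(x):
                flatten(item, name + str(i) + '_')
        else:
            # Drop the trailing underscore added by the parent level.
            out[name[:-1]] = x

    flatten(y)
    return out


def clean_key(key):
    """Keep letters, digits, underscores and spaces; lower-case; spaces become underscores."""
    return re.sub('[^0-9a-zA-Z_ ]+', '', str(key)).lower().replace(' ', '_')


# Hypothetical document shaped like a MongoDB record.
record = {'Name': 'Ann', 'address': {'Zip Code': 500081}, 'tags': ['a', 'b']}
flat = flatten_json(record)
row = {clean_key(k): str(v) for k, v in flat.items()}
print(json.dumps(row))
```

Nested keys are joined with underscores and list items get their index as a suffix, which gives BigQuery's schema autodetection flat, valid column names. Note that empty lists and empty dicts produce no column at all.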

Once the file is created, please upload it to the DAGs folder. Please also add the following plugin dependency file, named mlengine_operator.py, to the DAGs folder.
Place the following code in the mlengine_operator.py file.

import re

from apiclient import errors

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def _normalize_mlengine_job_id(job_id):

    # Add a prefix when a job_id starts with a digit or a template
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() == 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    # Clean up 'bad' characters except templates
    tracker = 0
    cleansed_job_id = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_',
                                  job[tracker:m.start()])
        cleansed_job_id += job[m.start():m.end()]
        tracker = m.end()

    # Clean up last substring or the full string if no templates
    cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])

    return cleansed_job_id


class MLEngineBatchPredictionOperator(BaseOperator):
   
    template_fields = [
        '_project_id',
        '_job_id',
        '_region',
        '_input_paths',
        '_output_path',
        '_model_name',
        '_version_name',
        '_uri',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 region,
                 data_format,
                 input_paths,
                 output_path,
                 model_name=None,
                 version_name=None,
                 uri=None,
                 max_worker_count=None,
                 runtime_version=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineBatchPredictionOperator, self).__init__(*args, **kwargs)

        self._project_id = project_id
        self._job_id = job_id
        self._region = region
        self._data_format = data_format
        self._input_paths = input_paths
        self._output_path = output_path
        self._model_name = model_name
        self._version_name = version_name
        self._uri = uri
        self._max_worker_count = max_worker_count
        self._runtime_version = runtime_version
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine prediction '
                'job.')

        if self._uri:
            if self._model_name or self._version_name:
                raise AirflowException('Ambiguous model origin: Both uri and '
                                       'model/version name are provided.')

        if self._version_name and not self._model_name:
            raise AirflowException(
                'Missing model: Batch prediction expects '
                'a model name when a version name is provided.')

        if not (self._uri or self._model_name):
            raise AirflowException(
                'Missing model origin: Batch prediction expects a model, '
                'a model & version combination, or a URI to a savedModel.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        prediction_request = {
            'jobId': job_id,
            'predictionInput': {
                'dataFormat': self._data_format,
                'inputPaths': self._input_paths,
                'outputPath': self._output_path,
                'region': self._region
            }
        }

        if self._uri:
            prediction_request['predictionInput']['uri'] = self._uri
        elif self._model_name:
            origin_name = 'projects/{}/models/{}'.format(
                self._project_id, self._model_name)
            if not self._version_name:
                prediction_request['predictionInput'][
                    'modelName'] = origin_name
            else:
                prediction_request['predictionInput']['versionName'] = \
                    origin_name + '/versions/{}'.format(self._version_name)

        if self._max_worker_count:
            prediction_request['predictionInput'][
                'maxWorkerCount'] = self._max_worker_count

        if self._runtime_version:
            prediction_request['predictionInput'][
                'runtimeVersion'] = self._runtime_version

        hook = MLEngineHook(self._gcp_conn_id, self._delegate_to)

        # Helper method to check if the existing job's prediction input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('predictionInput', None) == \
                prediction_request['predictionInput']

        try:
            finished_prediction_job = hook.create_job(
                self._project_id, prediction_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_prediction_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine batch prediction job failed: {}'.format(
                str(finished_prediction_job)))
            raise RuntimeError(finished_prediction_job['errorMessage'])

        return finished_prediction_job['predictionOutput']


class MLEngineModelOperator(BaseOperator):
    template_fields = [
        '_model',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineModelOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model = model
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)
        if self._operation == 'create':
            return hook.create_model(self._project_id, self._model)
        elif self._operation == 'get':
            return hook.get_model(self._project_id, self._model['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineVersionOperator(BaseOperator):
    
    template_fields = [
        '_model_name',
        '_version_name',
        '_version',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model_name,
                 version_name=None,
                 version=None,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):

        super(MLEngineVersionOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model_name = model_name
        self._version_name = version_name
        self._version = version or {}
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        if 'name' not in self._version:
            self._version['name'] = self._version_name

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        if self._operation == 'create':
            assert self._version is not None
            return hook.create_version(self._project_id, self._model_name,
                                       self._version)
        elif self._operation == 'set_default':
            return hook.set_default_version(self._project_id, self._model_name,
                                            self._version['name'])
        elif self._operation == 'list':
            return hook.list_versions(self._project_id, self._model_name)
        elif self._operation == 'delete':
            return hook.delete_version(self._project_id, self._model_name,
                                       self._version['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineTrainingOperator(BaseOperator):
    
    template_fields = [
        '_project_id',
        '_job_id',
        '_package_uris',
        '_training_python_module',
        '_training_args',
        '_region',
        '_scale_tier',
        '_runtime_version',
        '_python_version',
        '_job_dir'
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 runtime_version=None,
                 python_version=None,
                 job_dir=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):
        super(MLEngineTrainingOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._job_id = job_id
        self._package_uris = package_uris
        self._training_python_module = training_python_module
        self._training_args = training_args
        self._region = region
        self._scale_tier = scale_tier
        self._runtime_version = runtime_version
        self._python_version = python_version
        self._job_dir = job_dir
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to
        self._mode = mode

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine training '
                'job.')
        if not package_uris:
            raise AirflowException(
                'At least one python package is required for MLEngine '
                'Training job.')
        if not training_python_module:
            raise AirflowException(
                'Python module name to run after installing required '
                'packages is required.')
        if not self._region:
            raise AirflowException('Google Compute Engine region is required.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        training_request = {
            'jobId': job_id,
            'trainingInput': {
                'scaleTier': self._scale_tier,
                'packageUris': self._package_uris,
                'pythonModule': self._training_python_module,
                'region': self._region,
                'args': self._training_args,
            }
        }

        if self._runtime_version:
            training_request['trainingInput']['runtimeVersion'] = self._runtime_version

        if self._python_version:
            training_request['trainingInput']['pythonVersion'] = self._python_version

        if self._job_dir:
            training_request['trainingInput']['jobDir'] = self._job_dir

        if self._mode == 'DRY_RUN':
            self.log.info('In dry_run mode.')
            self.log.info('MLEngine Training job request is: {}'.format(
                training_request))
            return

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        # Helper method to check if the existing job's training input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('trainingInput', None) == \
                training_request['trainingInput']

        try:
            finished_training_job = hook.create_job(
                self._project_id, training_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_training_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine training job failed: {}'.format(
                str(finished_training_job)))
            raise RuntimeError(finished_training_job['errorMessage'])
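The job id helper at the top of this file is easy to overlook, so here is a standalone sketch of what it does to a few sample ids. The logic follows the helper above; the function name without the leading underscore and the sample ids are just for illustration.

```python
import re


def normalize_mlengine_job_id(job_id):
    """Replace unsupported characters with '_' while leaving {{ templates }} intact."""
    # Prefix ids that start with a digit or a template.
    match = re.search(r'\d|\{{2}', job_id)
    job = 'z_{}'.format(job_id) if match and match.start() == 0 else job_id

    tracker = 0
    cleansed = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        # Clean the text before the template, then copy the template verbatim.
        cleansed += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:m.start()])
        cleansed += job[m.start():m.end()]
        tracker = m.end()
    # Clean whatever follows the last template (or the whole id if there is none).
    cleansed += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])
    return cleansed


print(normalize_mlengine_job_id('customer-orders 2019'))
print(normalize_mlengine_job_id('2019-run'))
print(normalize_mlengine_job_id('orders-{{ ds_nodash }}-x'))
```

Ids that start with a digit get a z_ prefix, and runs of characters other than letters and digits collapse into a single underscore.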

Next, import the variables from a composer_conf.json file into Airflow Variables.
Go to Airflow WebUI → Admin → Variables, then browse to the file path or configure the variables manually.
Please place the following in composer_conf.json:

{
  "mongo_conf": {
    "host": "mongodb://<instance-internal-ip>:27017",
    "db_name": "DBNAME",
    "table_name": "TABLENAME",
    "file_prefix": "Folder In GCS Bucket",
    "bq_dataset": "BigQuery Dataset",
    "bucket_name": "GCS Bucket",
    "tot_data": "yes",
    "ml_configuration": {
      "BUCKET_NAME": "GCS Bucket",
      "BASE_DIR": "gs://GCS Bucket/machine_learning/",
      "PACKAGE_NAME": "PACKAGE NAME FROM setup.py FILE in ML",
      "TRAINER_MODULE": "trainer.train",
      "RUNTIME_VERSION": "1.13",
      "PROJECT_ID": "GCP Project",
      "MODEL_FILE_BUCKET": "BUCKET CREATED BY Composer Environment",
      "MODEL_FILE_LOCATION": "data/MODEL LOCATION FILE",
      "MODEL_NAME": "MODEL_NAME"
    }
  }
}
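Since the DAG fails at parse time when a key is missing from mongo_conf, it can be worth checking the file before importing it. The sketch below is one way to do that, assuming the key lists match what the DAG file reads; `missing_keys` and the sample values are hypothetical.

```python
import json

REQUIRED_KEYS = ['host', 'db_name', 'table_name', 'file_prefix',
                 'bq_dataset', 'bucket_name', 'tot_data', 'ml_configuration']
REQUIRED_ML_KEYS = ['BUCKET_NAME', 'BASE_DIR', 'PACKAGE_NAME', 'TRAINER_MODULE',
                    'RUNTIME_VERSION', 'PROJECT_ID', 'MODEL_FILE_BUCKET',
                    'MODEL_FILE_LOCATION', 'MODEL_NAME']


def missing_keys(conf_text):
    """Return the keys the DAG reads that are absent from the config text."""
    # json.loads raises ValueError on malformed JSON, such as an unbalanced brace.
    mongo_conf = json.loads(conf_text).get('mongo_conf', {})
    missing = [k for k in REQUIRED_KEYS if k not in mongo_conf]
    ml_conf = mongo_conf.get('ml_configuration', {})
    missing += ['ml_configuration.' + k for k in REQUIRED_ML_KEYS if k not in ml_conf]
    return missing


# Deliberately incomplete sample, with made-up values.
sample = json.dumps({'mongo_conf': {'host': 'mongodb://10.0.0.2:27017', 'tot_data': 'yes',
                                    'ml_configuration': {'BUCKET_NAME': 'my-ml-bucket'}}})
print(missing_keys(sample))
```

An empty list means every key the DAG reads is present.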

Please store any configuration files or credentials file that are used by Composer in the data folder in the bucket created by Composer environment.

After configuring variables accordingly, you can see the DAG named ml_pipeline in the Airflow WebUI.

Please trigger the DAG from the Airflow WebUI. Once the DAG has run successfully, it looks like the following:

Thanks for the read and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.