AWS Machine Learning Data Engineering Pipeline for Batch Data

This post walks you through all the steps required to build a data engineering pipeline for batch data using AWS Step Functions. The sequence of steps works like so : the ingested data arrives as a CSV file in a S3 based data lake in the landing zone, which automatically triggers a Lambda function to invoke the Step Function. I have assumed that data is being ingested daily in a .csv file with a filename_date.csv naming convention like so customers_20190821.csv. The step function, as the first step, starts a landing to raw zone file transfer operation via a Lambda Function. Then we have an AWS Glue crawler crawl the raw data into an Athena table, which is used as a source for AWS Glue based PySpark transformation script. The transformed data is written in the refined zone in the parquet format. Again an AWS Glue crawler runs to “reflect” this refined data into another Athena table. Finally, the data science team can consume this refined data available in the Athena table, using an AWS Sagemaker based Jupyter notebook instance. It is to be noted that the data science does not need to do any data pull manually, as the data engineering pipeline automatically pulls in the delta data, as per the data refresh schedule that writes new data in the landing zone.

Let’s go through the steps

How to make daily data available to Amazon SageMaker?

What is Amazon SageMaker?

Amazon SageMaker is an end-to-end machine learning (ML) platform that can be leveraged to build, train, and deploy machine learning models in AWS. Using the Amazon SageMaker Notebook module, improves the efficiency of interacting with the data without the latency of bringing it locally.
For deep dive into Amazon SageMaker, please go through the official docs.

In this blog post, I will be using a dummy customers data. The customers data consists of retailer information and units purchased.

Updating Table Definitions with AWS Glue

The data catalog feature of AWS Glue and the inbuilt integration to Amazon S3 simplifies the process of identifying data and deriving the schema definition out of the source data. Glue crawlers within Data catalog, are used to build out the metadata tables of data stored in Amazon S3.

I created a crawler named raw for the data in raw zone (s3://bucketname/data/raw/customers/). In case you are just starting out on AWS Glue crawler, I have explained how to create one from scratch in one of my earlier article. If you run this crawler, it creates customers table in specified database (raw).

Create an invocation Lambda Function

In case you are just starting out on Lambda functions, I have explained how to create one from scratch with an IAM role to access the StepFunctions, Amazon S3, Lambda and CloudWatch in my earlier article.

Add trigger to the created Lambda function named invoke-step-functions. Configure Bucket, Prefix and  Suffix accordingly.

Once file is arrived at landing zone, it triggers the invoke Lambda function which extracts year, month, day from file name that comes from event. It passes year, month, day with two characters from uuid as input to the AWS StepFunctions.Please replace the following code in invoke-step-function Lambda.

import json
import uuid
import boto3
from datetime import datetime

sfn_client = boto3.client('stepfunctions')

stm_arn = 'arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:Datapipeline-for-SageMaker'

def lambda_handler(event, context):
    
    # Extract bucket name and file path from event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    path = event['Records'][0]['s3']['object']['key']
    
    file_name_date = path.split('/')[2]
    processing_date_str = file_name_date.split('_')[1].replace('.csv', '')
    processing_date = datetime.strptime(processing_date_str, '%Y%m%d')
    
    # Extract year, month, day from date
    year = processing_date.strftime('%Y')
    month = processing_date.strftime('%m')
    day = processing_date.strftime('%d')
    
    uuid_temp = uuid.uuid4().hex[:2]
    execution_name = '{processing_date_str}-{uuid_temp}'.format(processing_date_str=processing_date_str, uuid_temp=uuid_temp)
    
    # Starts the execution of AWS StepFunctions
    response = sfn_client.start_execution(
          stateMachineArn = stm_arn,
          name= str(execution_name),
          input= json.dumps({"year": year, "month": month, "day": day})
      )
    
    return {"year": year, "month": month, "day": day}

Create a Generic FileTransfer Lambda

Create a Lambda function named generic-file-transfer as we created earlier in this article. In the file transfer Lambda function, it transfers files from landing zone to raw zone and landing zone to archive zone based on event coming from the StepFunction.

  1. If step is landing-to-raw-file-transfer, the Lambda function copies files from landing to raw zone.
  2. If step is landing-to-archive-file-transfer, the Lambda function copies files from landing to archive zone and deletes files from landing zone.

Please replace the following code in generic-file-transfer Lambda.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    for objects in bucket.objects.filter(Prefix = source_prefix):
        file_path = objects.key
        
        if ('.csv' in file_path) and (step == 'landing-to-raw-file-transfer'):
            
            # Extract filename from file_path
            file_name_date = file_path.split('/')[2]
            file_name = file_name_date.split('_')[0]
            
            # Add filename to the destination prefix
            destination_prefix = '{destination_prefix}{file_name}/year={year}/month={month}/day={day}/'.format(destination_prefix=destination_prefix, file_name=file_name, year=year, month=month, day=day)
            print(destination_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, destination_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
         
        if ('.csv' in file_path) and (step == 'landing-to-archive-file-transfer'):
            
            # Add filename to the destination prefix
            destination_prefix = '{destination_prefix}{year}-{month}-{day}/'.format(destination_prefix=destination_prefix, year=year, month=month, day=day)
            print(destination_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, destination_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
            
            # Deletes copied file
            bucket.objects.filter(Prefix = file_path).delete()
            
    return {"year": year, "month": month, "day": day}

Generic FileTransfer Lambda function setup is now complete. We need to check all files are copied successfully from one zone to another zone. If you have large files that needs to be copied, you could check out our Lightening fast distributed file transfer architecture.

Create Generic FileTransfer Status Check Lambda Function

Create a Lambda function named generic-file-transfer-status. If the step is landing to raw file transfer, the Lambda function checks if all files are copied from landing to raw zone by comparing the number of objects in landing and raw zones. If count doesn’t match it will raise an exception, and that exception is handled in AWS StepFunctions and retries after some backoff rate. If the count matches, all files are copied successfully. If the step is landing to archive file transfer, the Lambda function checks that any files are left in landing zone. Please replace the following code in generic-file-transfer-status Lambda function.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    class LandingToRawFileTransferIncompleteException(Exception):
        pass

    class LandingToArchiveFileTransferIncompleteException(Exception):
        pass
    
    if (step == 'landing-to-raw-file-transfer'):
        if file_transfer_status(bucket, source_prefix, destination_prefix):
            print('File Transfer from Landing to Raw Completed Successfully')
        else:
            raise LandingToRawFileTransferIncompleteException('File Transfer from Landing to Raw not completed')
    
    if (step == 'landing-to-archive-file-transfer'):
        if is_empty(bucket, source_prefix):
            print('File Transfer from Landing to Archive Completed Successfully')
        else:
            raise LandingToArchiveFileTransferIncompleteException('File Transfer from Landing to Archive not completed.')
    
    return {"year": year, "month": month, "day": day}

def file_transfer_status(bucket, source_prefix, destination_prefix):
    
    try:
        
        # Checks number of objects at the source prefix (count of objects at source i.e., landing zone)
        source_object_count = 0
        for obj in bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            if (".csv" in path):
                source_object_count = source_object_count + 1
        print(source_object_count)
        
        # Checks number of objects at the destination prefix (count of objects at destination i.e., raw zone)
        destination_object_count = 0
        for obj in bucket.objects.filter(Prefix = destination_prefix):
            path = obj.key
            
            if (".csv" in path):
                destination_object_count = destination_object_count + 1
        
        print(destination_object_count)
        return (source_object_count == destination_object_count)

    except Exception as e:
        print(e)
        raise e

def is_empty(bucket, prefix):
    
    try:
        # Checks if any files left in the prefix (i.e., files in landing zone)
        object_count = 0
        for obj in bucket.objects.filter(Prefix = prefix):
            path = obj.key

            if ('.csv' in path):
                object_count = object_count + 1
                    
        print(object_count)
        return (object_count == 0)
        
    except Exception as e:
        print(e)
        raise e

Create a Generic Crawler invocation Lamda

Create a Lambda function named generic-crawler-invoke. The Lambda function invokes a crawler. The crawler name is passed as argument from AWS StepFunctions through event object. Please replace the following code in generic-crawler-invoke Lambda function.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    try:
        response = glue_client.start_crawler(Name = crawler_name)
    except Exception as e:
        print('Crawler in progress', e)
        raise e
    
    return {"year": year, "month": month, "day": day}

Create a Generic Crawler Status Lambda

Create a Lambda function named generic-crawler-status. The Lambda function checks whether the crawler ran successfully or not. If crawler is in running state, the Lambda function raises an exception and the exception will be handled in the Step Function and retries after a certain backoff rate. Please replace the following code in generic-crawler-status Lambda.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    class CrawlerInProgressException(Exception):
        pass
    
    # Extract Parametres from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    response = glue_client.get_crawler_metrics(CrawlerNameList =[crawler_name])
    print(response['CrawlerMetricsList'][0]['CrawlerName']) 
    print(response['CrawlerMetricsList'][0]['TimeLeftSeconds']) 
    print(response['CrawlerMetricsList'][0]['StillEstimating']) 
    
    if (response['CrawlerMetricsList'][0]['StillEstimating']):
        raise CrawlerInProgressException('Crawler In Progress!')
    elif (response['CrawlerMetricsList'][0]['TimeLeftSeconds'] > 0):
        raise CrawlerInProgressException('Crawler In Progress!')
    
    return {"year": year, "month": month, "day": day}

Create an AWS Glue Job

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. For deep dive into AWS Glue, please go through the official docs.

Create an AWS Glue Job named raw-refined. In case you are just starting out on AWS Glue Jobs, I have explained how to create one from scratch in my earlier article. This Glue job converts file format from csv to parquet and stores in refined zone. The push down predicate is used as filter condition for reading data of only the processing date using the partitions.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year', 'month', 'day'])

year = args['year']
month = args['month']
day = args['day']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw", table_name = "customers", push_down_predicate ="((year == " + year + ") and (month == " + month + ") and (day == " + day + "))", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "string"), ("sale_id", "string", "sale_id", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucketname/data/refined/customers/", "partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink4")

job.commit()

Create a Refined Crawler as we created Raw Crawler earlier in this article. Please point the crawler path to refined zone(s3://bucketname/data/refined/customers/) and database as refined. No need to create a Lambda function for refined crawler invocation and status, as we will pass crawler names from the StepFunction.

Resources required to create an the StepFunction have been created.

Creating the AWS StepFunction

StepFunction is where we create and orchestrate steps to process data according to our workflow. Create an AWS StepFunctions named Datapipeline-for-SageMaker.  In case you are just starting out on AWS StepFunctions, I have explained how to create one from scratch here.

Data is being ingested into landing zone. It triggers a Lambda function which in turn invokes the execution of the StepFunction. The steps in the StepFunction are as follows:

  1. Transfers files from landing zone to raw zone.
  2. Checks all files are copied to raw zone successfully or not.
  3. Invokes raw Crawler which crawls data in raw zone and updates/creates definition of table in the specified database.
  4. Checks if the Crawler is completed successfully or not.
  5. Invokes Glue Job and waits for it to complete.
  6. Invokes refined Crawler which crawls data from refined zone in and updates/creates definition of table in the specified database.
  7. Checks if the Crawler is completed successfully or not.
  8. Transfers files from landing zone to archive zone and deletes files from landing zone.
  9. Checks all files are copied and deleted from landing zone successfully.

Please update the StepFunctions definition with the following code.

{
  "Comment": "Datapipeline For MachineLearning in AWS Sagemaker",
  "StartAt": "LandingToRawFileTransfer",
  "States": {
    "LandingToRawFileTransfer": {
      "Comment": "Transfers files from landing zone to Raw zone.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferFailed"
        }
      ],
      "Next": "LandingToRawFileTransferPassed"
    },
    "LandingToRawFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToRawFileTransferStatus"
    },
    "LandingToRawFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to raw zone successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToRawFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToRawFileTransferStatusPassed"
    },
    "LandingToRawFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "StartRawCrawler"
    },
    "StartRawCrawler": {
      "Comment": "Crawls data from raw zone and adds table definition to the specified Database. IF table definition exists updates the definition.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRawCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRawCrawlerFailed"
        }
      ],
      "Next": "StartRawCrawlerPassed"
    },
    "StartRawCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRawCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RawCrawlerStatus"
    },
    "RawCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RawCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RawCrawlerStatusFailed"
        }
      ],
      "Next": "RawCrawlerStatusPassed"
    },
    "RawCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RawCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "GlueJob"
    },
    "GlueJob": {
      "Comment": "Invokes Glue job and waits for Glue job to complete.",
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "retail-raw-refined",
        "Arguments": {
          "--refined_prefix": "data/refined",
          "--year.$": "$.year",
          "--month.$": "$.month",
          "--day.$": "$.day"
        }
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "GlueJobFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GlueJobFailed"
        }
      ],
      "Next": "GlueJobPassed"
    },
    "GlueJobFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "GlueJobPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.Arguments.--year",
        "month.$": "$.Arguments.--month",
        "day.$": "$.Arguments.--day"
      },
      "Next": "StartRefinedCrawler"
    },
    "StartRefinedCrawler": {
      "Comment": "Crawls data from refined zone and adds table definition to the specified Database.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRefinedCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRefinedCrawlerFailed"
        }
      ],
      "Next": "StartRefinedCrawlerPassed"
    },
    "StartRefinedCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRefinedCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RefinedCrawlerStatus"
    },
    "RefinedCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        }
      ],
      "Next": "RefinedCrawlerStatusPassed"
    },
    "RefinedCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RefinedCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransfer"
    },
    "LandingToArchiveFileTransfer": {
      "Comment": "Transfers files from landing zone to archived zone",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferPassed"
    },
    "LandingToArchiveFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "LandingToArchiveFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransferStatus"
    },
    "LandingToArchiveFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to archived successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToArchiveFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferStatusPassed"
    },
    "LandingToArchiveFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "LandingToArchiveFileTransfer invocation failed"
    },
    "LandingToArchiveFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "End": true
    }
  }
}

After updating the AWS StepFunctions definition, the visual workflow looks like the following.

Now upload file in data/landing/ zone in the bucket  where the trigger has been configured with the Lambda. The execution of StepFunction has started and the visual workflow looks like the following.

In RawCrawlerStatus step, if the Lambda is failing we retry till sometime and then mark the StepFunction as failed. If the StepFunction ran successfully. The visual workflow of the StepFunction looks like following.

Machine Learning workflow using Amazon SageMaker

The final step in this data pipeline is to make the processed data available in a Jupyter notebook instance of the Amazon SageMaker. Jupyter notebooks are popularly used among data scientists to do exploratory data analysis, build and train machine learning models.

Create Notebook Instance in Amazon SageMaker

Step1: In the Amazon SageMaker console choose Create notebook instance.

Step2: In the Notebook Instance settings populate the Notebook instance name, choose an instance type depends on data size, and a role for the notebook instances in Amazon SageMaker to interact with Amazon S3. The SageMaker execution role needs to have the required permission to Athena, the S3 buckets where the data resides, and KMS if encrypted.

Step3: Wait for the Notebook instances to be created and the Status to change to InService.

Step4: Choose the Open Jupyter, which will open the notebook interface in a new browser tab.

Click new to create a new notebook in Jupyter. Amazon SageMaker provides several kernels for Jupyter including support for Python 2 and 3, MXNet, TensorFlow, and PySpark. Choose Python as the kernel for this exercise as it comes with the Pandas library built in.

Step5: Within the notebook, execute the following commands to install the Athena JDBC driver. PyAthena is a Python DB API 2.0 (PEP 249) compliant client for the Amazon Athena JDBC driver.

import sys
!{sys.executable} -m pip install PyAthena

Step6: After the Athena driver is installed, you can use the JDBC connection to connect to Athena and populate the Pandas data frames. For data scientists, working with data is typically divided into multiple stages: munging and cleaning data, analyzing/ modeling it, then organizing the results of the analysis into a form suitable for plotting or tabular display. Pandas is the ideal tool for all of these tasks.

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
               region_name='REGION, for example, us-east-1')

df = pd.read_sql("SELECT * FROM <DATABASE>.<TABLENAME> limit 10;", conn)
df

As shown above, the dataframe always stays consistent with the latest incoming data because of the data engineering pipeline setup earlier in the ML workflow. This dataframe can be used for downstream ad-hoc model building purposes or for exploratory data analysis.

That’s it folks. Thanks for the read.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

Processing Kinesis Data Streams with Spark Streaming


Solution Overview : In this blog, we are going to build a real time anomaly detection solution using Spark Streaming. Kinesis Data Streams would act as the input streaming source and the anomalous records would be written as Data Streams in DynamoDB.

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events.

Data Streams

The unit of data stored by Kinesis Data Streams is a data record. A data stream represents a group of data records.

For deep dive into Kinesis Data Streams, please go through these official docs.

Kinesis Data Streams Producers

A producer puts data records into Amazon Kinesis Data Streams. For example, a web server sending log data to a Kinesis Data Stream is a producer.

For more details about Kinesis Data Streams Producers, please go through these official docs.

Kinesis Data Streams Consumers

A consumer, known as an Amazon Kinesis Data Streams application, is an application that you build to read and process data records from Kinesis Data Streams.

For more details about Kinesis Data Streams Consumers, please go through these official docs.


Creating a Kinesis Data Stream

Step1. Go to Amazon Kinesis console -> click on Create Data Stream

Step2. Give Kinesis Stream Name and Number of shards as per volume of the incoming data. In this case, Kinesis stream name as kinesis-stream and number of shards are 1.

Shards in Kinesis Data Streams

A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.

For more about shards, please go through these official docs.

Step3. Click on Create Kinesis Stream

Kinesis Data Streams can be connected with Kinesis Data Firehoseto write the streamsinto S3.


Configure Kinesis Data Streams with Kinesis Data Producers

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose.

While following this link, choose to Create a Cognito User with Cloud Formation.

After selecting the above option, we will navigate to the Cloud Formation console:

Click on Next and provide Username and Password for Cognito User for Kinesis Data Generator.

Click on Next and Create Stack.

CloudFormation Stack is created.

Click on Outputs tab and open the link

After opening the link, enter the usernameand password of Cognito user.

After Sign In is completed, select the RegionStream and configure the number of records per second. Choose record template as your requirement.

In this case, the template data format is

{{name.firstName}},{{random.number({“min”:10, “max”:550})}},{{random.arrayElement([“OK”,”FAIL”,”WARN”] )}}

The template data looks like the following

You can send different types of dummy data to Kinesis Data Streams.

Kinesis Data Streams with Kinesis Data Producers are ready. Now we shall build a Spark Streaming application which consumes data streams from Kinesis Data Streams and dumps the output streams into DynamoDB.


Create DynamoDB Tables To Store Data Frame

Go to Amazon DynamoDB console -> Choose Create Table and name the table, in this case, data_dump

In the same way, create another table named anomaly_data. Make sure Kinesis Data streams and DynamoDb tables are in the same region.

Spark Streaming with Kinesis Data Streams

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.

For deep dive into Spark Streams, please go through docs.

In this case, the Scala programming language is used. Scala version is 2.11.12. Please install scala, sbt and spark.

Create a folder structure like the following

Kinesis-spark-streams-dynamo
| -- src/main/scala/packagename/object
| -- build.sbt
| -- project/assembly.sbt

In this case, the structure looks like the following

After creating the folder structure,

Please replace build.sbt file with the following code. The following code will add the required dependencies like spark, spark kinesis assembly, spark streaming and many more.

name := "kinesis-spark-streams-dynamo"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "0.4.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
libraryDependencies += "com.google.guava" % "guava" % "14.0.1"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.466"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}

Please replace assembly.sbt file with the following code. This will add the assembly plugin which can be used for creating the jar.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

Please replace kinesis-spark-streams-dynamo file with the following code.

package com.wisdatum.kinesisspark

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.spark._
import org.apache.spark.streaming._
import com.amazonaws.services.kinesis.AmazonKinesis
import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import org.apache.log4j.{Level, Logger}
import com.audienceproject.spark.dynamodb.implicits._

object KinesisSparkStreamsDynamo {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
.asScala
.find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
.map(_.getName)
.getOrElse(
throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
}

def main(args: Array[String]) {

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

val conf = new SparkConf().setAppName("KinesisSparkExample").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
println("Launching")
val Array(appName, streamName, endpointUrl, dynamoDbTableName) = args
println(streamName)
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()

require(credentials != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
println("numShards are " + numShards)

val numStreams = numShards

val batchInterval = Milliseconds(100)

val kinesisCheckpointInterval = batchInterval

val regionName = getRegionNameByEndpoint(endpointUrl)

val anomalyDynamoTable = "data_anomaly"

println("regionName is " + regionName)

val kinesisStreams = (0 until numStreams).map { i =>
KinesisInputDStream.builder
.streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
}

val unionStreams = ssc.union(kinesisStreams)

val inputStreamData = unionStreams.map { byteArray =>
val Array(sensorId, temp, status) = new String(byteArray).split(",")
StreamData(sensorId, temp.toInt, status)
}

val inputStream: DStream[StreamData] = inputStreamData

inputStream.window(Seconds(20)).foreachRDD { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._

val inputStreamDataDF = rdd.toDF()
inputStreamDataDF.createOrReplaceTempView("hot_sensors")

val dataDumpDF = spark.sql("SELECT * FROM hot_sensors ORDER BY currentTemp DESC")
dataDumpDF.show(2)
dataDumpDF.write.dynamodb(dynamoDbTableName)

val anomalyDf = spark.sql("SELECT * FROM hot_sensors WHERE currentTemp > 100 ORDER BY currentTemp DESC")
anomalyDf.write.dynamodb(anomalyDynamoTable)
}

// To make sure data is not deleted by the time we query it interactively
ssc.remember(Minutes(1))

ssc.start()
ssc.awaitTermination()
}
}
case class StreamData(id: String, currentTemp: Int, status: String)

appName: The application name that will be used to checkpoint the Kinesis sequence numbers in the DynamoDB table.

  1. The application name must be unique for a given account and region.
  2. If the table exists but has incorrect checkpoint information (for a different stream, or old expired sequenced numbers), then there may be temporary errors.

kinesisCheckpointInterval
The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

endpointURL:
Valid Kinesis endpoints URL can be found here.

For more details about building KinesisInputDStream, please go through the documentation.

Configure AWS Credentials using environment variables or using aws configure command.

Make sure all the resources are under the same account and region. Region of CloudFormation Stack that was created is in us-west-2 even though all the resources are in another region, this would not affect the process.


Building Executable Jar

  • Open Terminal -> Go to project root directory, in this case 
    kinesis-spark-streams-dynamo
  • Run sbt assembly

The jar has been packaged into project root directory/target/scala-2.11/XXXX.jar. Name of the jar is the name that provided in build.sbt file.

Run the Jar using spark-submit

  • Open Terminal -> Go to Spark bin directory
  • Run the following command, and it looks like
./bin/spark-submit ~/Desktop/kinesis-spark-streams-dynamo/target/scala-2.11/kinesis-spark-streams-dynamo-assembly-0.1.jar appName streamName endpointUrl dynamoDbTable

To know more about how to submit applications using spark-submit, please review this.

Arguments that are passed are highlighted in the above highlighted blue box. Place the arguments as needed.

Read Kinesis Data Streams in Spark Streams

  1. Go to Amazon Kinesis Data Generator-> Sign In using Cognito user
  2. Click on Send Data, it starts sending data to Kinesis Data Streams

Data would be sent to Kinesis Data Stream, in this case, kinesis-stream, it looks like this.

Monitoring Kinesis Data Streams

Go to Amazon Kinesis Console -> Choose Data streams -> Select created Data Stream -> click on Monitoring

The terminal looks like the following when it starts receiving the data from Kinesis Data Streams

The data_dump table has the whole data that is coming from Kinesis Data Streams. And the data in the data_dump table looks like

The data_anomaly table has data where currentTemp is greater than 100. Here the anomaly is temperature greater than 100. And the data in the data_anomaly table looks like

I hope this article was helpful in setting up Kinesis Data Streams that are consumed and processed using Spark Streaming and stored in DynamoDB.

This story is authored by P V Subbareddy. He is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.

Linear regression using Apache Spark MLlib

What is linear Regression?

Wikipedia states – In statistics, linear regression is a linear approach to modeling the relationship between  dependent variable and one or more  independent variables.

Linear regression is a basic and commonly used type of predictive analysis.

Back to school math, every straight line can be represented by the equation: y = mx + b, where y is dependent variable and X is the independent variable on which y depends. 

How can we use regression for a real life use case?! Let’s take an example – what if I have data of past 5-10 years on the quantity of wheat produced annually. With Linear regression, I will be able to predict what the wheat production would be this year or a year from now.

Why is prediction so important? It helps us plan and be prepared.  It helps us deal with unforeseen situations. In the context of above example, a country will know the quantity of wheat it can export/import in a year.  It means, if global demand seems much lower than the quantity we foresee producing,  we can help farmers choose some other crop, since less demand means a bare minimum selling rate.

Some additional use cases:

  1. A consistent increase in Blood pressure and sugar levels, Is the patient heading towards heart attack?
  2. A distinctive seismic activity, are we in for a tsunami /earth quake?
  3. Inward migration of birds increasing on yearly basis. Are a certain species of trees responsible for alluring birds?
  4. Will a certain steel stock move up or down this year?

These are all some basic uses cases of Linear regression model. We call it regression, because we will be predicting continuous values (as opposed to a yes or no result). Graphically, a linear equation (with one dependent and one independent variable) looks similar to this

So, for every X , we can derive the value of Y from the equation.

What happens if Y is dependent on , not just one independent variable (X) but few more variables.  The graph above will have not just X,Y axis but Z axis too to represent the second independent variable. More than two independent variables are hard to depict graphically. But, can be represented quite easily with equation as

y=   β + β1×1 + β2×2….

where β 1,β2…βn are coefficients of X1, X2,X3

β  is the y-intercept

What this implies for our wheat production example is:

y(total wheat produced in a year) =  β +  β1*Total available land in acres + β2 * rainfall received for that year +  β3 * fertiliser availability in the market….. Here, rainfall received, fertiliser availability are the added independent variables apart from the size of land in acres.

Where does Machine Learning come into picture?

In the above equation – β1, β2, β3…are all coefficients whose value we need to compute to form the linear equation.  This is where learning happens. Based on past data, ML learns the values of these coefficients through a number of iterations.

How?

To start with, we feed in data of past few years to ML packages, this is called training data, since it helps train the ML model. Through a large chunk of data and a number of iterations, the model learns the values of each of the coefficients.

We can then evaluate the model, ML packages offers a lot of functions and related parameters to know how the model has performed. Based on these values we decide if the model has learnt “enough” and then use this on the data we need to predict for (test data).

What is the error people talk about in ML?

If all data points (Y1,Y2,Y3….) form a perfect linear line as shown above, we can derive an exact output (prediction) we are looking. But, in the real world, this is not the case. The data points do not exactly form a line.  They are a bit scattered on the graph. So what do we do?  We make a line in such a way that its at a least possible distance from the points as shown below:

this is where root mean square error or any other error reduction method is used. So, we use the chosen method, and come out with a line (which best depicts the given data points)as output. Based on this line we predict the outcome, this is where our final values come from.

Let’s look at a very basic example of linear regression in PySpark

I have downloaded the dataset from  https://www.kaggle.com/leonbora/analytics-vidhya-loan-prediction

  1. creating a Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('lr_example').getOrCreate()

2.  Importing Linear Regression packages

from pyspark.ml.regression import LinearRegression

3.  We will read the input data and see its structure

data = spark.read.csv("train.csv",inferSchema=True,header=True)
data.printSchema()
#output looks similar to this
root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)

Machine Learning packages expect only numerical inputs and cannot accept strings. A lot of packages are present to help us transform our data to suit ML requirements. This, we will look at this aspect in another post. As of now, we will just consider two numerical fields from above data schema( ‘ApplicantIncome, CoapplicantIncome’) and we will try to predict ‘LoanAmount’ with these inputs.

4. All our input data needs to be in the form of Vectors, an input to ML packages.  So, lets get this sorted first:

from pyspark.ml.feature import (VectorAssembler, VectorIndexer)
#lets define what inputs will go into our vector and give a name for the output of it
lassembler = VectorAssembler(
    inputCols=['ApplicantIncome','CoapplicantIncome'],
    outputCol='features')

5. We will now transform our data to ML standard input

output = lassembler.transform(data)

If you have downloaded the same set, running the above command will output something similar to:

Error is pretty clear, while transforming data ML has encountered nulls in the data and is asking for a clarification of what needs to be done. We have two options, one, clean the data of null values and feed it back or tell ML packages to skip the null values. We will take the second route by adding an extra line in step 5

output = lassembler.setHandleInvalid("skip").transform(data)

Now, the code executes.  Let’s see what’s there in the “features” output

output.select("features").show()
#output looks similar to below
+-----------------+
|         features|
+-----------------+
|  [4583.0,1508.0]|
|     [3000.0,0.0]|
|  [2583.0,2358.0]|
|     [6000.0,0.0]|
|  [5417.0,4196.0]|
|  [2333.0,1516.0]|
|  [3036.0,2504.0]|
|  [4006.0,1526.0]|
|[12841.0,10968.0]|
|   [3200.0,700.0]|
|  [2500.0,1840.0]|
|  [3073.0,8106.0]|
|  [1853.0,2840.0]|
|  [1299.0,1086.0]|
|     [4950.0,0.0]|
|     [3596.0,0.0]|
|     [3510.0,0.0]|
|     [4887.0,0.0]|
|  [2600.0,3500.0]|
|     [7660.0,0.0]|
+-----------------+
only showing top 20 rows

6. Let’s now feed this input Vector and our prediction value(Y), which is ‘LoanAmount’

loan_data = output.select('features','LoanAmount')
loan_data.show()
#and Output is
+-----------------+----------+
|         features|LoanAmount|
+-----------------+----------+
|  [4583.0,1508.0]|       128|
|     [3000.0,0.0]|        66|
|  [2583.0,2358.0]|       120|
|     [6000.0,0.0]|       141|
|  [5417.0,4196.0]|       267|
|  [2333.0,1516.0]|        95|
|  [3036.0,2504.0]|       158|
|  [4006.0,1526.0]|       168|
|[12841.0,10968.0]|       349|
|   [3200.0,700.0]|        70|
|  [2500.0,1840.0]|       109|
|  [3073.0,8106.0]|       200|
|  [1853.0,2840.0]|       114|
|  [1299.0,1086.0]|        17|
|     [4950.0,0.0]|       125|
|     [3596.0,0.0]|       100|
|     [3510.0,0.0]|        76|
|     [4887.0,0.0]|       133|
|  [2600.0,3500.0]|       115|
|     [7660.0,0.0]|       104|
+-----------------+----------+
only showing top 20 rows

7. Standard practice is to divide the test data into 2 parts- train the ML model with the first and let it predict the values on second set, so we can crosscheck the performance.

#splitting train and test data into 70% and 30% of total data available
train_data,test_data = loan_data.randomSplit([0.7,0.3])

#finally, calling the linear regression package we imported
lr = LinearRegression(labelCol='LoanAmount')
#we are specifying our 'Y' by explicitly mentioned it with 'labelCol'

#fitting the model to train data set
lrModel = lr.fit(train_data)

When you run the above, you should get an output like below

The error says – “Params must be either a param map…”. Reason for this error is our dependent variable (LoanAmount) has null values and ML cannot fit a model which as null as output values.  There are lot of ways to clean this kind of data, we will not consider null values in our example.  Let’s filter out null ‘LoanAmount’ values when we read the data from csv itself like so:

data = spark.read.csv("train.csv",inferSchema=True,header=True)
#we will add a filter to remove the null values from our dependent variable
data = data.filter("LoanAmount is not NULL")
data.printSchema()

Repeat the steps above and the error will go away. So, our Linear regression Model is ready now.

8. Let’s check the residuals (residual = observed value(the value in our input) – predicted value (value predicted by the model) of Y).  Each data point will have one residual.  So, number of residuals will be equal to the number of records we fed as input.

test_results = lrModel.evaluate(test_data)
test_results.residuals.show()
#output looks similar to below
+--------------------+
|           residuals|
+--------------------+
|5.684341886080801...|
|-1.42108547152020...|
|-4.26325641456060...|
|-1.42108547152020...|
|-1.42108547152020...|
|-2.84217094304040...|
|-1.42108547152020...|
|-1.42108547152020...|
|-1.42108547152020...|
|-1.42108547152020...|
|                 0.0|
|-1.42108547152020...|
|-2.13162820728030...|
|-1.42108547152020...|
|-2.84217094304040...|
|                 0.0|
|2.842170943040400...|
|-1.42108547152020...|
|-1.42108547152020...|
|-3.55271367880050...|
+--------------------+
only showing top 20 rows

9. Voila, we can now use this model to predict output of our test data

predictions_data = test_data.select('features')
predictions = lrModel.transform(predictions_data)
predictions.show()
#output looks similar to below
+----------------+------------------+
|        features|        prediction|
+----------------+------------------+
|  [150.0,1800.0]| 102.2878796393209|
| [1299.0,1086.0]|104.99451369589991|
| [1378.0,1881.0]|113.43100472341747|
| [1500.0,1800.0]|113.66768427384854|
|[1600.0,20000.0]|292.40273753359924|
| [1782.0,2232.0]|120.26729293510716|
| [1800.0,1213.0]|110.45902065483665|
| [1820.0,1719.0]|115.57340183734365|
| [1820.0,1769.0]|116.06211641088294|
|[1836.0,33837.0]| 429.6389670546782|
|    [1880.0,0.0]| 99.27716389393049|
| [1907.0,2365.0]|122.62095931502981|
| [1926.0,1851.0]|117.75713371242067|
| [2031.0,1632.0]|116.50165979633736|
| [2130.0,6666.0]|166.53996206680586|
| [2132.0,1591.0]|116.95229182239609|
| [2137.0,8980.0]| 189.2166789246058|
|    [2221.0,0.0]|102.15161824976303|
|    [2237.0,0.0]|102.28649000839447|
| [2253.0,2033.0]|122.29249632713373|
+----------------+------------------+
only showing top 20 rows

That’s it folks.