Machine Learning based Fuzzy Matching using AWS Glue ML Transforms

Machine Learning Transforms in AWS Glue

AWS Glue provides machine learning capabilities for creating custom transforms that cleanse your data, including ML-based fuzzy matching to deduplicate it. For this, we are going to use a transform named FindMatches. The FindMatches transform enables you to identify duplicate or matching records in your dataset, even when the records do not have a common unique identifier and no fields match exactly. You do not need to write the matching logic yourself or know how machine learning works. For more details about ML transforms, refer to the AWS Glue documentation.

Creating a Machine Learning Transform with AWS Glue

This article walks you through the actions to create and manage a machine learning (ML) transform using AWS Glue. I assume that you are familiar with using the AWS Glue console to add crawlers and jobs and edit scripts. You should also be familiar with finding and downloading files on the Amazon Simple Storage Service (Amazon S3) console.

In case you are just starting out on AWS Glue, I have explained how to create an AWS Glue Crawler and Glue Job from scratch in one of my earlier articles.
The source data used in this blog is a hypothetical file named customers_data.csv. A second file, label_file.csv, is an example of a labeling file that contains both matching and nonmatching records used to teach the transform.

Step 1: Crawl the Data using AWS Glue Crawler

At the outset, crawl the source data from the CSV file in S3 to create a metadata table in the AWS Glue Data Catalog. I created a crawler pointing to the source location (s3://bucketname/data/ml-transform/customers/).

In case you are just starting out on the AWS Glue crawler, I have explained how to create one from scratch in one of my earlier articles. If you run this crawler, it creates a customers table in the specified database (ml-transform).

Step 2: Add a Machine Learning Transform

Next, add a machine learning transform that is based on the schema of your data source table created by the above crawler.

  • On the AWS Glue console, in the navigation pane, choose ML Transforms, then Add transform.
    1. For Transform name, enter ml-transform. This is the name of the transform that is used to find matches in the source data.
    2. Choose an IAM role that has permission to access Amazon S3 and AWS Glue API operations. Choose Worker type and Maximum capacity as per your requirements.
    3. For Data source, choose the table that was created in the earlier step. In this case, that is the table named customers in the database ml-transform.
    4. For Primary key, choose the primary key column for the table, email.
  • Choose Finish.
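
The console steps above can also be scripted. The following is a minimal sketch that builds the request for the boto3 Glue client's create_ml_transform call. The helper name, the role ARN, and the capacity value are assumptions made for illustration and are not part of this walkthrough.

```python
def build_find_matches_request(name, database, table, primary_key, role_arn):
    """Build keyword arguments for glue.create_ml_transform (FindMatches)."""
    return {
        "Name": name,
        "Role": role_arn,
        "InputRecordTables": [{"DatabaseName": database, "TableName": table}],
        "Parameters": {
            "TransformType": "FIND_MATCHES",
            "FindMatchesParameters": {"PrimaryKeyColumnName": primary_key},
        },
        # Assumed capacity; pick a value that suits your data volume.
        "MaxCapacity": 10.0,
    }


request = build_find_matches_request(
    name="ml-transform",
    database="ml-transform",
    table="customers",
    primary_key="email",
    role_arn="arn:aws:iam::123456789012:role/ExampleGlueRole",  # hypothetical ARN
)

# To actually create the transform (requires AWS credentials):
#   import boto3
#   transform_id = boto3.client("glue").create_ml_transform(**request)["TransformId"]
```

Keeping the request in a plain dictionary makes it easy to review or version-control before anything is created in your account.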

Step 3: How to Teach Your Machine Learning Transform

Next, teach the machine learning transform using the sample labeling file.
You can’t use a machine learning transform in an extract, transform, and load (ETL) job until its status is Ready for use. To get your transform ready, you must teach it how to identify matching and non-matching records by providing examples of both. To teach your transform, you can Generate a label file, add labels, and then Upload label file.

For this article, the label file I have used is label_file.csv

  • On the AWS Glue console, in the navigation pane, choose ML Transforms.
  • Choose the earlier created transform, and then choose Action, Teach.
  • If you don’t have a label file, choose I do not have labels. You can then Generate a label file, add labels, and Upload label file.
  • If you have a label file, choose I have labels, then choose Upload labelling file from S3.
  • Choose the Amazon S3 path to the sample labeling file (s3://bucketname/data/ml-transform/labels/label_file.csv), with the option to overwrite existing labels. The labeling file must be located in S3 in the same Region as the AWS Glue console.

When you upload a labeling file, a task is started in AWS Glue to add or overwrite the labels used to teach the transform how to process the data source.

  • Choose Finish, and return to the ML transforms list.

Step 4: Estimate the Quality of ML Transform

What is Labeling?

The act of labeling is creating a labeling file (such as in a spreadsheet) and adding identifiers, or labels, into the label column that identifies matching and non-matching records. It is important to have a clear and consistent definition of a match in your source data. AWS Glue learns from which records you designate as matches (or not) and uses your decisions to learn how to find duplicate records.
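
To make the idea of a labeling file concrete, here is a small sketch that writes one with Python's csv module. The labeling_set_id and label columns come from the labeling file format. The customer columns and values are hypothetical, since the contents of label_file.csv are not shown in this article.

```python
import csv
import io

# Hypothetical customer rows. Within a labeling set, rows that share a label
# are treated as matches, and rows with different labels as non-matches.
rows = [
    {"labeling_set_id": "set1", "label": "A", "email": "jdoe@example.com", "first_name": "John"},
    {"labeling_set_id": "set1", "label": "A", "email": "j.doe@example.com", "first_name": "Jon"},
    {"labeling_set_id": "set1", "label": "B", "email": "mary@example.com", "first_name": "Mary"},
]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)

label_csv = buffer.getvalue()
print(label_csv)
```

Here the first two rows are marked as the same customer (label A), while the third row (label B) is marked as a different one.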

Next, you can estimate the quality of your machine learning transform. The quality depends on how much labeling you have done.

  • On the AWS Glue console, in the navigation pane, choose ML Transforms.
  • Choose the earlier created transform, and choose the Estimate quality tab. This tab displays the current quality estimates, if available, for the transform.
  • Choose Estimate quality to start a task to estimate the quality of the transform. The accuracy of the quality estimate is based on the labeling of the source data.
  • Navigate to the History tab. In this pane, task runs are listed for the transform, including the Estimating quality task. For more details about the run, choose Logs. Check that the run status is Succeeded when it finishes.
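
If you prefer to trigger the quality estimate from code, the sketch below shows one way to do it with the boto3 Glue client calls start_ml_evaluation_task_run and get_ml_task_run. The function name, the polling interval, and the transform ID in the usage comment are assumptions.

```python
import time


def estimate_quality(glue, transform_id, poll_seconds=30):
    """Start an evaluation task run and wait until it leaves the running states."""
    run_id = glue.start_ml_evaluation_task_run(TransformId=transform_id)["TaskRunId"]
    while True:
        run = glue.get_ml_task_run(TransformId=transform_id, TaskRunId=run_id)
        if run["Status"] not in ("STARTING", "RUNNING", "STOPPING"):
            return run["Status"]
        time.sleep(poll_seconds)


# Usage (requires AWS credentials; the transform ID is a placeholder):
#   import boto3
#   status = estimate_quality(boto3.client("glue"), "tfm-0123456789abcdef")
```

The function returns the final status of the task run, which should be SUCCEEDED, matching what you would check on the History tab.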

Step 5: Create and Run a Job with ML Transform

In this step, we use your machine learning transform to add and run a job in AWS Glue. When the transform is Ready for use, we can use it in an ETL job.

On the AWS Glue console, in the navigation pane, choose Jobs.

Choose Add job.

In case you are just starting out on AWS Glue ETL Job, I have explained how to create one from scratch in one of my earlier articles.

  • For Name, choose the example job in this tutorial, ml-transform.
  • Choose an IAM role that has permission to access Amazon S3 and AWS Glue API operations.
  • For ETL language, choose Spark 2.2, Python 2. Machine learning transforms are currently not supported for Spark 2.4.
  • For Data source, choose the table created in Step 1. The data source you choose must match the machine learning transform data source schema.
  • For Transform type, choose Find matching records to create a job using a machine learning transform.
  • For Transform, choose the transform created in Step 2. This is the machine learning transform used by the job.
  • For Create tables in your data target, choose to create tables with the following properties.
    • Data store type — Amazon S3
    • Format — CSV
    • Compression type — None
    • Target path — The Amazon S3 path where the output of the job is written (in the current console AWS Region)

Choose Save job and edit script to display the script editor page. The script looks like the following. After you edit the script, choose Save.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglueml.transforms import FindMatches

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "ml_transforms", table_name = "customers", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "ml_transforms", table_name = "customers", transformation_ctx = "datasource0")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "ml_transforms", table_name = "customers", transformation_ctx = "resolvechoice1"]
## @return: resolvechoice1
## @inputs: [frame = datasource0]
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "MATCH_CATALOG", database = "ml_transforms", table_name = "customers", transformation_ctx = "resolvechoice1")
## @type: FindMatches
## @args: [transformId = "eacb9a1ffbc686f61387f63", emitFusion = false, survivorComparisonField = "<primary_id>", transformation_ctx = "findmatches2"]
## @return: findmatches2
## @inputs: [frame = resolvechoice1]
findmatches2 = FindMatches.apply(frame = resolvechoice1, transformId = "eacb9a1ffbc686f61387f63", transformation_ctx = "findmatches2")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://bucket-name/data/ml-transforms/output/"}, format = "csv", transformation_ctx = "datasink3"]
## @return: datasink3
## @inputs: [frame = findmatches2]
datasink3 = glueContext.write_dynamic_frame.from_options(frame = findmatches2, connection_type = "s3", connection_options = {"path": "s3://<bucket-name>/data/ml-transforms/output/"}, format = "csv", transformation_ctx = "datasink3")
job.commit()

Choose Run job to start the job run. Check the status of the job in the jobs list. When the job finishes, in the ML transform, History tab, there is a new Run ID row added of type ETL job. 

Navigate to the Jobs, History tab. In this pane, job runs are listed. For more details about the run, choose Logs. Check that the run status is Succeeded when it finishes.

Step 6: Verify Output Data from Amazon S3 in Amazon Athena

In this step, check the output of the job run in the Amazon S3 bucket that you chose when you added the job. You can create a table in the Glue Data catalog pointing to the output location, just like the way we crawled the source data in Step 1. You can then query the data in Athena.

Note that the FindMatches transform adds a column named match_id to the output to identify matching records. Rows with the same match_id are considered matching records.

If you don’t find any matches, you can continue to teach the transform by adding more labels.
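
As an alternative to querying in Athena, the output can be inspected with a few lines of Python. The sketch below groups rows by match_id and keeps the groups with more than one record. The sample rows are made up for illustration; only the match_id column comes from the transform.

```python
import csv
import io
from collections import defaultdict

# Hypothetical job output; match_id is the column added by the transform.
output_csv = """email,first_name,match_id
jdoe@example.com,John,0
j.doe@example.com,Jon,0
mary@example.com,Mary,1
"""

groups = defaultdict(list)
for row in csv.DictReader(io.StringIO(output_csv)):
    groups[row["match_id"]].append(row["email"])

# Keep only groups with more than one record, i.e. suspected duplicates.
duplicates = {mid: emails for mid, emails in groups.items() if len(emails) > 1}
print(duplicates)
```

An empty result here corresponds to the "no matches" case above, where more labels are needed.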

Thanks for reading. I look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark ecosystem.

Creating a Chatbot for Healthcare in React Native using Dialogflow

In this blog, we shall learn how to build an AI virtual assistant or a Chatbot using React Native and Dialogflow API.

Why are chatbots important?
A chatbot is a piece of software that helps in conducting a conversation through voice based or textual methods. Chatbots offer companies new opportunities to improve the customer engagement process and operational efficiency by reducing the typical cost of customer service.

What is Dialogflow?
Dialogflow (previously known as API.AI) is a Natural Language Processing (NLP) platform which can be greatly helpful to build conversational applications for a company’s customers in various languages and also across multiple platforms. Dialogflow enables developers to create text-based and voice conversation interfaces for responding to customer queries in different languages.

Why Dialogflow?
There are several chatbot SDKs, such as Dialogflow, Amazon Lex, IBM Watson, and Microsoft Bot Framework. The reasons we chose Dialogflow are:

  1. Dialogflow supports multiple platforms.
  2. Dialogflow supports all the devices like wearables, phones and other devices.
  3. Dialogflow also supports multiple languages.

How does Dialogflow work?
In Dialogflow, the typical flow of any conversation involves these steps:

  1. The user providing an input.
  2. Dialogflow agent parsing that input based on the intent.
  3. Agent returning a response to the user.

Setting up Dialogflow account:

Navigate to the console on the official website. You will be prompted to sign in with Google, so go ahead and sign in. After successfully signing in, you can see a dashboard.

Before we dive into the platform and start building the bot/agent, let us learn about the terms used in Dialogflow.

After signing in, you can see a Create Agent tab. An agent is nothing but the bot that you would like to create. Give it a name of your choice and click on the Create button. Once the agent is created, you can see multiple tabs on the left side of the screen, such as:

  1. Intents
  2. Entities
  3. Fulfillment etc

Intents:
An Intent is a specific action that the user can invoke by using one of the defined terms in the Dialogflow console. 

For example, the user could ask “What’s the time?” or “What is today’s date?”. If these phrases are defined within the console, Dialogflow detects them and triggers the intents they are defined under.

You can create an intent by clicking on create intent as shown below.

You shall see some default intents already available. We can create the new intents here.

Entities:
An Entity is a property which can be used by Dialogflow to answer the request from the user. The entity will usually be a keyword within the request such as a name, date, time etc. 

Dialogflow has a rich set of predefined entities and also has an option that enables the developer to define custom entities as well.

Fulfillment:
When the user provides input, Dialogflow needs to process it, and the input might contain entities as well. To fulfill the user’s request, Dialogflow may need to request information from a webhook. The input provided by the user, along with the entities, is sent to the webhook so that the required information can be retrieved. Once Dialogflow receives the information from the webhook, it sends the response back to the user in the desired manner.

For example, if the user wants to know about weather conditions, a webhook could be used to get the weather information and pass it on to the user.
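
To illustrate the fulfillment flow, here is a rough sketch of the logic a webhook might contain, written as a plain Python function that takes the parsed request body and returns the response body. It assumes the Dialogflow v2 webhook format (queryResult, intent.displayName, parameters, fulfillmentText). The intent name GetWeather and the parameter geo-city are hypothetical, and a real webhook would call a weather service instead of returning a canned sentence.

```python
def handle_webhook(request_json):
    """Return a Dialogflow webhook response for a parsed request body."""
    query = request_json.get("queryResult", {})
    intent = query.get("intent", {}).get("displayName", "")
    params = query.get("parameters", {})

    if intent == "GetWeather":  # hypothetical intent name
        city = params.get("geo-city") or "your city"  # hypothetical parameter
        # A real webhook would look up the forecast for the city here.
        text = f"Here is the weather for {city}."
    else:
        text = "Sorry, I cannot help with that yet."

    return {"fulfillmentText": text}


sample_request = {
    "queryResult": {
        "intent": {"displayName": "GetWeather"},
        "parameters": {"geo-city": "Paris"},
    }
}
print(handle_webhook(sample_request))
```

The function could be wrapped in any HTTP framework or cloud function that accepts POST requests with a JSON body.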

Response:
It is the content which Dialogflow sends back to the user once the user’s query is processed.

Creating a ChatBot for Health care:

Now that we have learnt about some basic terms of Dialogflow, let us start building a chatbot (in this case Healthbot) which helps the user (patient) to schedule an appointment with a specific doctor in an organization.

Let’s go ahead and create an agent first. Here we are creating an agent with the name HealthBot.

After clicking the create button, the HealthBot agent would be created. It would look like below.

You could see some default intents there. We can create our own intents here. So, let’s move forward and start creating the intents.

The intent we will be creating here is “Schedule an Appointment”.

Save the intent after creating. In the Training Phrases section, we can add our own training phrases to train the agent.

When we add a training phrase, Dialogflow looks for predefined entities in the phrase and, if it finds any, highlights them as shown.

Add a few other related training phrases and click on save.

Next in the Action and Parameters section we can make the @sys.person, @sys.date, @sys.time as required by checking on the Required checkbox. We can also define the prompts for the required fields so that if the user does not provide any one of them the defined prompt will be shown up asking the user to provide the required parameters.

The prompts for the entities could be defined by clicking define prompts under Prompts. Below are the prompts for the respective entities.

Next we have to add the response in the Response section.

After receiving all the required parameters from the user, we can phrase a response as shown.

Now we have to create a front end app using React Native which would communicate with the HealthBot agent.

Let’s go to React Native Docs, select React Native CLI Quickstart and select the appropriate development OS and the target OS as Android, as we are going to build an android application. 

Follow the docs for installing dependencies, then create a new react native application. Use the command line interface to create a new react native project.

react-native init <project-name>

Using the commands below, you can run the app on an Android device. You should see the default welcome page.

cd <project-name>
npm install
react-native run-android

Note:  If you face an issue like “Failed to install the app. Make sure you have the Android development environment set up”, just traverse to <project-name>/android folder and create a file named local.properties and add the Android SDK path in it as shown here.

sdk.dir = Your Android SDK Path

We also need to install some dependencies using the command below.

npm install react-native-gifted-chat react-native-dialogflow --save

We are using react-native-gifted-chat package as it provides a customizable and complete chat UI interface.

We are also using react-native-dialogflow so that we can bridge our app with Google Dialogflow’s SDK. 

For our app to communicate with the Dialogflow agent, we need to configure a few values. Create a .js file in your project root folder (in this case, env.js) to hold them.

To get the values, click on the gear icon beside the agent name on the left side of the screen, then click on the Service Account link as shown in the image.

After clicking the link, you are shown a table called Service accounts for project “<Agent Name>”. Click on Actions and select the Create key option. A prompt appears asking you to choose a key type. Select JSON and click on Create. A JSON file is downloaded. Copy the contents of the JSON file and add them to env.js.

Your env.js file would look like below.

env.js

export const dialogflowConfig = {
  "type": "service_account",
  "project_id": "Health-bot",
  "private_key_id": "xxxx",
  "private_key": "-----BEGIN PRIVATE KEY-----\n xxxx\n-----END PRIVATE KEY-----\n",
  "client_email": "xxxx",
  "client_id": "xxxx",
  "auth_uri": "xxxx",
  "token_uri": "xxxx",
  "auth_provider_x509_cert_url": "xxxx",
  "client_x509_cert_url": "xxxx"
}

Now go to <project-name> directory and open App.js. Modify the content of App.js as below.

App.js

import React, { Component } from 'react';
import {View} from 'react-native';
import { GiftedChat } from 'react-native-gifted-chat';
import { Dialogflow_V2 } from 'react-native-dialogflow';
import { dialogflowConfig } from './env';

const BOT_USER = {
  _id: 2,
  name: 'Health Bot',
  avatar: 'https://previews.123rf.com/images/iulika1/iulika11909/iulika1190900021/129697389-medical-worker-health-professional-avatar-medical-staff-doctor-icon-isolated-on-white-background-vec.jpg'
};
class App extends Component {

  state = {
    messages: [
      {
        _id: 1,
        text: 'Hi! I am the Healthbot 🤖.\n\nHow may I help you today?',
        createdAt: new Date(),
        user: BOT_USER
      }
    ]
  };

  componentDidMount() {
    Dialogflow_V2.setConfiguration(
      dialogflowConfig.client_email,
      dialogflowConfig.private_key,
      Dialogflow_V2.LANG_ENGLISH_US,
      dialogflowConfig.project_id
    );
  }

  onSend(messages = []) {
    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, messages)
    }));

    let message = messages[0].text;
    Dialogflow_V2.requestQuery(
      message,
      result => this.handleGoogleResponse(result),
      error => console.log(error)
    );
  }

  handleGoogleResponse(result) {
    let text = result.queryResult.fulfillmentMessages[0].text.text[0];
    this.sendBotResponse(text);
  }

  sendBotResponse(text) {
    let msg = {
      _id: this.state.messages.length + 1,
      text,
      createdAt: new Date(),
      user: BOT_USER
    };

    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, [msg])
    }));
  }

  render() {
    return (
      <View style={{ flex: 1, backgroundColor: '#fff' }}>
        <GiftedChat
          messages={this.state.messages}
          onSend={messages => this.onSend(messages)}
          user={{
            _id: 1
          }}
        />
      </View>
    );
  }
}
export default App;

When the App component is mounted, componentDidMount() runs, and this is where we set the Dialogflow configuration as given below.


componentDidMount() {
    Dialogflow_V2.setConfiguration(
      dialogflowConfig.client_email,
      dialogflowConfig.private_key,
      Dialogflow_V2.LANG_ENGLISH_US,
      dialogflowConfig.project_id
    );
  }

When you click on send, the onSend() method is triggered. It stores the user message in the state variable and sends a request to Dialogflow using Dialogflow_V2.requestQuery. If the request succeeds, the handleGoogleResponse() method is called.

onSend(messages = []) {
    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, messages)
    }));

    let message = messages[0].text;
    Dialogflow_V2.requestQuery(
      message,
      result => this.handleGoogleResponse(result),
      error => console.log(error)
    );
  }

handleGoogleResponse() gets the text from the response and calls the sendBotResponse() method, which appends the bot's reply to the messages in the state, as shown below.

handleGoogleResponse(result) {
    let text = result.queryResult.fulfillmentMessages[0].text.text[0];
    this.sendBotResponse(text);
  }

  sendBotResponse(text) {
      let msg = {
        _id: this.state.messages.length + 1,
        text,
        createdAt: new Date(),
        user: BOT_USER
      };

      this.setState(previousState => ({
        messages: GiftedChat.append(previousState.messages, [msg])
      }));
    }
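
Note that handleGoogleResponse() indexes straight into fulfillmentMessages[0].text.text[0], so a response without a text message would cause an error. The sketch below shows a more defensive version of the same extraction, written in Python purely to illustrate the logic. The function name, the fallback to fulfillmentText, and the default reply are assumptions, not part of the app.

```python
def extract_reply(result, default="Sorry, I did not understand that."):
    """Pull the first non-empty text reply out of a detect-intent style response."""
    query = result.get("queryResult", {})
    for message in query.get("fulfillmentMessages", []):
        texts = message.get("text", {}).get("text", [])
        if texts and texts[0]:
            return texts[0]
    # Fall back to the plain fulfillment text, then to a default reply.
    return query.get("fulfillmentText") or default


sample = {
    "queryResult": {
        "fulfillmentMessages": [{"text": {"text": ["Your appointment is booked."]}}]
    }
}
print(extract_reply(sample))
```

The same checks can be ported to the JavaScript handler if the agent ever returns responses without a text message.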

Below are the images of the app running on an Android device.

That’s it folks, we hope it was fun and useful.

This story is authored by Dheeraj Kumar and Santosh Kumar. Dheeraj is a software engineer specializing in React Native and React based frontend development. Santosh specializes in Cloud Services based development.

Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS

In this blog post, we’ll be looking at how one could leverage AWS Simple Queue Service (Standard queue) to achieve high concurrency while processing with no duplicates. Also we compare it with other AWS services like DynamoDB, SQS FIFO queue and Kinesis in terms of cost and performance.

A simple use case for the architecture below could be building an end-to-end messaging service or sending out transactional emails. Both use cases need highly concurrent processing with no duplicates.

Using AWS SQS with Lambda to process Big data concurrently with no duplicates

We have a Lambda function that writes messages to the Standard queue in batches. This writer function is invoked when a file is posted to S3. While there are messages in the queue, Lambda polls the queue, reads messages in batches, and invokes the Processor function synchronously with an event that contains the queue messages. The Processor function is invoked once for each batch. When the function successfully processes a batch, Lambda deletes its messages from the queue. If the function fails while processing (that is, it raises an error), the batch is put back in the queue. The Standard queue is configured with a redrive policy that moves messages to a Dead Letter Queue (DLQ) when their receive count reaches the Maximum receive count (MRC). We set the MRC to 1 to ensure deduplication, so that a failed message is moved to the DLQ rather than processed again.

Setting up Standard Queue with Dead Letter Queue

We need two queues: one for processing, and a second to hold failed messages. First create the failed_messages queue, as it is needed while creating the message processing queue. Create a new queue, give it a name (failed_messages), select Standard as the type, and choose Configure Queue.

According to the needs, set the queue attributes like visibility timeout, message retention period etc.

For processing messages, create a new queue, give it a name, select Standard as the type, and choose Configure Queue.

Set the Default Visibility Timeout to 5 minutes, and use the Dead Letter Queue Settings to set up the redrive policy that moves failed messages into the failed_messages queue created earlier.

From the SQS homepage, select the processing queue and then select Redrive Policy. If it is set up correctly, you should see the ARN of the failed_messages queue there.
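
The queue settings described above can also be expressed in code. This sketch builds the attributes for the boto3 SQS client's set_queue_attributes call, with a 5 minute visibility timeout and a maximum receive count of 1. The helper name and the account ID in the ARN are placeholders.

```python
import json


def redrive_attributes(dlq_arn, max_receive_count=1, visibility_timeout=300):
    """Build SQS queue attributes that route failed messages to a dead letter queue."""
    return {
        "VisibilityTimeout": str(visibility_timeout),  # seconds
        "RedrivePolicy": json.dumps(
            {
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": str(max_receive_count),
            }
        ),
    }


attrs = redrive_attributes("arn:aws:sqs:us-west-2:123456789012:failed_messages")
print(attrs)

# To apply the attributes (requires AWS credentials and a real queue URL):
#   import boto3
#   boto3.client("sqs").set_queue_attributes(QueueUrl=processing_queue_url, Attributes=attrs)
```

Note that RedrivePolicy is passed as a JSON string inside the attributes map, which is why json.dumps is used.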

Creating the Writer and Processor lambda functions:

Writer.py

# Write batch messages to queue
import csv
import json
import boto3

s3 = boto3.resource('s3')
sqs = boto3.client('sqs')

# Update this dummy URL
processing_queue_url = "https://sqs.us-west-2.amazonaws.com/85XXXXXXX205/ToBeProcessed"

def lambda_handler(event, context):
    try:
        if 'Records' in event:
            bucket_name = event['Records'][0]['s3']['bucket']['name']        
            key = event['Records'][0]['s3']['object']['key']
            bucket = s3.Bucket(bucket_name)
            obj = bucket.Object(key=key)

            # get the object
            response = obj.get()['Body'].read().decode('utf-8').split('\n')
            resp = list(response)
            if resp[-1] == '':
                #removing header metadata and extra newline
                total_records = len(resp) - 2 
            else:
                #removing header metadata
                total_records = len(resp) - 1 
            print("total record count is :", total_records)

            batch_size = 0
            record_count = 0
            messages = []

            # Write to SQS
            for row in csv.DictReader(response):
                record_count += 1
                record = {}
                for k,v in row.items():
                    record[k] = v

                # Replace below with appropriate column with all values as unique
                unique_id = record['ANY_COLUMN_WITH_ALL_VALUES_UNIQUE']
                
                batch_size += 1
                messages.append(
                {
                    'Id': unique_id,
                    'MessageBody': json.dumps(record)
                })
                   
                if (batch_size == 10):
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed_count:', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []
                
                # Handling last batch (skipped when the final full batch was already sent above)
                if record_count == total_records and messages:
                    print("batch size is :", batch_size)
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed count is :', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []    
        
        print('record count is :', record_count)

    except Exception as e:
        return e
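
The writer above tracks the batch size with manual counters because SendMessageBatch accepts at most 10 entries per call. As an alternative sketch, the same batching can be done with a small helper that slices a list into chunks. The name chunked is hypothetical and not part of the function above.

```python
def chunked(items, size=10):
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# 25 hypothetical messages are split into batches of 10, 10 and 5.
batches = list(chunked(list(range(25))))
print([len(batch) for batch in batches])
```

With this helper, each batch can be passed to send_message_batch as its Entries, and the last partial batch needs no special handling.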

Processor.py

# Process queue messages
import json

def handler(event, context):
    if 'Records' in event:
        try:
            messages = event['Records']
            for message in messages:
                print("message to be processed :", message)
                
                result = message['body']
                result = json.loads(result)

                print("result:",result)
            return {
                'statusCode': 200,
                'body': 'All messages processed successfully'
            }

        except Exception as e:
            print(e)
            # Re-raise so Lambda treats the batch as failed and returns it to the queue
            raise

Setting up S3 as trigger to Writer lambda

Setting up SQS trigger to processor Lambda

If set up properly, you should be able to view it in Lambda Triggers section from the SQS homepage like this.

The setup is done. To test it, upload a .csv file to the S3 location.

SQS Standard Queue in comparison with FIFO queue

FIFO queue in SQS supports deduplication in two ways:

  1. Content based deduplication while writing to SQS.
  2. Processing one record/batch at a time. 

Unlike a Standard queue, a FIFO queue limits concurrency, and at the time of writing it could not be used to invoke Lambda. On top of this, there is a limit on how many messages you can write to a FIFO queue per second. FIFO queues are better suited to cases where the order of processing is important.
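
To show what content based deduplication means, here is a local sketch of the idea. SQS derives the deduplication ID from a SHA-256 hash of the message body and does the bookkeeping on the service side within its deduplication interval. The code below only mimics that behaviour in memory, and the function names are made up for illustration.

```python
import hashlib

seen_ids = set()


def dedup_id(body):
    """Derive a deduplication ID from the message body alone."""
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


def is_duplicate(body):
    """Return True if a message with the same body was already accepted."""
    key = dedup_id(body)
    if key in seen_ids:
        return True
    seen_ids.add(key)
    return False


first = is_duplicate('{"email": "jdoe@example.com"}')
second = is_duplicate('{"email": "jdoe@example.com"}')
third = is_duplicate('{"email": "mary@example.com"}')
print(first, second, third)
```

Because only the body is hashed, two messages with identical bodies are treated as duplicates even if they were sent for different reasons.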

Cost analysis:
First 1 million Amazon SQS requests are free each month.

Type              Cost per 1 million requests
Standard Queue    $0.40
FIFO Queue        $0.50

More on SQS pricing here.
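
As a worked example of the pricing above, the sketch below estimates the monthly request cost for each queue type. It assumes only the figures quoted in this post (first 1 million requests free, then the per-million price) and ignores every other pricing detail.

```python
from decimal import Decimal

# Prices per 1 million requests, as quoted in the table above.
PRICE_PER_MILLION = {"standard": Decimal("0.40"), "fifo": Decimal("0.50")}
FREE_REQUESTS = 1_000_000


def monthly_cost(requests, queue_type):
    """Estimate the monthly request cost in USD after the free tier."""
    billable = max(requests - FREE_REQUESTS, 0)
    return PRICE_PER_MILLION[queue_type] * billable / 1_000_000


for queue_type in ("standard", "fifo"):
    print(queue_type, monthly_cost(10_000_000, queue_type))
```

For 10 million requests in a month, 9 million are billable, so the estimate is 9 × $0.40 for a Standard queue and 9 × $0.50 for a FIFO queue.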

SQS Standard Queue in comparison with DynamoDB

DynamoDB Streams are slow when compared to SQS, and DynamoDB incurs costs on various aspects such as:

  1. Data Storage
  2. Writes
  3. Reads
  4. Provisioned throughput
  5. Reserved capacity
  6. Indexed data storage
  7. Streams and many more.

In a nutshell, DynamoDB’s monthly cost is dictated by data storage, writes and reads. The best use cases for DynamoDB are those that require a flexible data model, reliable performance, and the automatic scaling of throughput capacity.

SQS Standard Queue in comparison with Kinesis

The primary use case for Kinesis is collecting, storing and processing real-time continuous data streams. Kinesis is designed for large scale data ingestion and processing, with the ability to maximise write throughput for large volumes of data.

A message queue, on the other hand, makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Using a queue, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. In a nutshell, serverless applications are built using microservices, and the message queue serves as reliable plumbing between them.

Drawbacks of Kinesis:

  1. Shard management
  2. Limited Read Throughput

For a more detailed comparison of SQS and Kinesis, visit here.

Thanks for the read, I hope it was helpful.

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Text Detection in React Native App using AWS Rekognition

In this story, we are going to build an app for detecting text in an image using Amazon Rekognition in React Native.

You shall learn how to build a mobile application in React Native, which talks to AWS API Gateway. This API endpoint is configured with a lambda that stores the sent image in S3 and detects the text using AWS Rekognition and sends back the response.
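
To give a feel for the text detection step, here is a hedged sketch of how a Lambda might reduce the result of the boto3 Rekognition detect_text call to a list of detected lines. The function name, the confidence threshold, and the sample response are assumptions, and the bucket and file names in the usage comment are placeholders.

```python
def extract_lines(response, min_confidence=80.0):
    """Return detected LINE texts at or above a confidence threshold."""
    return [
        detection["DetectedText"]
        for detection in response.get("TextDetections", [])
        if detection.get("Type") == "LINE"
        and detection.get("Confidence", 0.0) >= min_confidence
    ]


# Hypothetical response in the shape returned by detect_text.
sample_response = {
    "TextDetections": [
        {"DetectedText": "HELLO WORLD", "Type": "LINE", "Confidence": 99.1},
        {"DetectedText": "HELLO", "Type": "WORD", "Confidence": 99.3},
        {"DetectedText": "blurry", "Type": "LINE", "Confidence": 41.0},
    ]
}
print(extract_lines(sample_response))

# To call Rekognition for an image stored in S3 (requires AWS credentials):
#   import boto3
#   response = boto3.client("rekognition").detect_text(
#       Image={"S3Object": {"Bucket": "example-bucket", "Name": "photo.jpg"}})
```

Filtering on Type avoids returning every word twice, since Rekognition reports both whole lines and the individual words inside them.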

Installing dependencies:

Let’s go to React Native Docs, select React Native CLI Quickstart and select our Development OS and Target OS -> Android, as we are going to build an android application.

Follow the docs to install the dependencies, and then create a new React Native application. Use the command line interface to generate a new React Native project called text-detection.

react-native init text-detection

Preparing the Android device:

We shall need an Android device to run our React Native Android app. This can be either a physical Android device, or more commonly, we can use an Android Virtual Device (AVD) which allows us to emulate an Android device on our computer (using Android Studio).

Either way, we shall need to prepare the device to run Android apps for development. If you have a physical Android device, you can use it for development in place of an AVD by connecting it to your computer using a USB cable and following the instructions here.

If you are using a virtual device, follow this link. I shall be using a physical Android device.
Now go to the command line and run react-native run-android inside your React Native app directory:

cd text-detection
react-native run-android

If everything is set up correctly, you should see your new app running in your physical device or Android emulator.

API Creation in AWS Console:

Before going further, create an API in your AWS console following this link. Once you are done creating the API, come back to the React Native application. Now, go to your project directory and replace your App.js file with the following code.

import React, {Component} from 'react';
import { StyleSheet, View, Text, TextInput, Image, ScrollView, TouchableHighlight } from 'react-native';
import ImagePicker from "react-native-image-picker";
import Amplify, {API} from "aws-amplify";
Amplify.configure({
    API: {
        endpoints: [
            {
                name: '<Your-API-Name>', // must match the apiName passed to API.post
                endpoint: '<Your-Endpoint-URL>'
            }
        ]
    }
});

class Registration extends Component {
  
    constructor(props){
        super(props);
        this.state =  {
            imageName : '',
            capturedImage : '',
            detectedText: []
        };
    }

    captureImageButtonHandler = () => {
        ImagePicker.showImagePicker({title: "Pick an Image", maxWidth: 800, maxHeight: 600}, (response) => {
            console.log('Response - ', response);
            if (response.didCancel) {
                console.log('User cancelled image picker');
            } else if (response.error) {
                console.log('ImagePicker Error: ', response.error);
            } else if (response.customButton) {
                console.log('User tapped custom button: ', response.customButton);
            } else {
                // You can also display the image using data:
                const source = { uri: 'data:image/jpeg;base64,' + response.data };
            
                this.setState({capturedImage: response.uri, base64String: source.uri });
            }
        });
    }

    submitButtonHandler = () => {
        if (this.state.imageName == '' || this.state.imageName == undefined || this.state.imageName == null) {
            alert("Please Enter the image name");
        } else if (this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
            alert("Please Capture the Image");
        } else {
            console.log("submiting")
            const apiName = "faceRekognition";
            const path = "/detecttext";
            const init = {
                headers: {
                    'Accept': 'application/json',
                    "Content-Type": "application/x-amz-json-1.1"
                },
                body: JSON.stringify({
                    Image: this.state.base64String,
                    name: this.state.imageName
                })
            }

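            API.post(apiName, path, init).then(response => {
                console.log("Response Data is : " + JSON.stringify(response));

                // TextDetections is an array; guard against a missing field
                if (response.TextDetections && response.TextDetections.length > 0) {
                    this.setState({
                        detectedText: response.TextDetections
                    })
                } else {
                    alert("Please Try Again.")
                }
            }).catch(error => {
                // Surface network or API errors instead of failing silently
                console.log("Request failed: ", error);
                alert("Please Try Again.")
            });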
        }
    }
    
  
    render() {
        console.log(this.state.detectedText)
        // Each detection carries a unique Id, which serves as the React list key
        var texts = this.state.detectedText.map(text => {
            return <Text key={text.Id} style={{textAlign: 'center'}}>{text.DetectedText}</Text>
        })
        
        return (
            <View>
                <ScrollView>
                    <Text style= {{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Text Image</Text>
                
                    <TextInput
                        placeholder="file name"
                        onChangeText={imageName => this.setState({imageName: imageName})}
                        underlineColorAndroid='transparent'
                        style={styles.TextInputStyleClass}
                    />

                    {this.state.capturedImage !== "" && <View style={styles.imageholder} >
                        <Image source={{uri : this.state.capturedImage}} style={styles.previewImage} />
                    </View>}
                    <View>
                        {texts}
                    </View>
                    <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
                        <Text style={styles.buttonText}>Capture Image</Text>
                    </TouchableHighlight>

                    <TouchableHighlight style={[styles.buttonContainer, styles.submitButton]} onPress={this.submitButtonHandler}>
                        <Text style={styles.buttonText}>Submit</Text>
                    </TouchableHighlight>
                    
                </ScrollView>
            </View>
        );
    }
}

const styles = StyleSheet.create({
    TextInputStyleClass: {
      textAlign: 'center',
      marginBottom: 7,
      height: 40,
      borderWidth: 1,
      margin: 10,
      borderColor: '#D0D0D0',
      borderRadius: 5 ,
    },
    inputContainer: {
      borderBottomColor: '#F5FCFF',
      backgroundColor: '#FFFFFF',
      borderRadius:30,
      borderBottomWidth: 1,
      width:300,
      height:45,
      marginBottom:20,
      flexDirection: 'row',
      alignItems:'center'
    },
    buttonContainer: {
      height:45,
      flexDirection: 'row',
      alignItems: 'center',
      justifyContent: 'center',
    //   marginBottom:20,
      width:"80%",
      borderRadius:30,
    //   marginTop: 20,
      margin: 20,
    },
    captureButton: {
      backgroundColor: "#337ab7",
      width: 350,
    },
    buttonText: {
      color: 'white',
      fontWeight: 'bold',
    },
    horizontal: {
      flexDirection: 'row',
      justifyContent: 'space-around',
      padding: 10
    },
    submitButton: {
      backgroundColor: "#C0C0C0",
      width: 350,
      marginTop: 5,
    },
    imageholder: {
      borderWidth: 1,
      borderColor: "grey",
      backgroundColor: "#eee",
      width: "50%",
      height: 150,
      marginTop: 10,
      marginLeft: 90,
      flexDirection: 'row',
      alignItems:'center'
    },
    previewImage: {
      width: "100%",
      height: "100%",
    }
});

export default Registration;

In the above code, we configure Amplify with the name and endpoint URL of the API you created, as shown below.

Amplify.configure({
 API: {
   endpoints: [
     {
       name: '<Your-API-Name>',
       endpoint: '<Endpoint-URL>',
     },
   ],
 },
});

Clicking the capture button triggers the captureImageButtonHandler function, which asks the user to take a picture or select one from the file system. When the user captures or selects an image, we store it in the state as shown below.

captureImageButtonHandler = () => {
   ImagePicker.showImagePicker(
     {title: 'Pick an Image', maxWidth: 800, maxHeight: 600},
     response => {
       console.log('Response = ', response);
       if (response.didCancel) {
         console.log('User cancelled image picker');
       } else if (response.error) {
         console.log('ImagePicker Error: ', response.error);
       } else if (response.customButton) {
         console.log('User tapped custom button: ', response.customButton);
       } else {
         // You can also display the image using data:
         const source = {uri: 'data:image/jpeg;base64,' + response.data};
         this.setState({
           capturedImage: response.uri,
           base64String: source.uri,
         });
       }
     },
   );
 };

After capturing the image, we preview it. Clicking the submit button triggers the submitButtonHandler function, which sends the image to the endpoint as shown below.

submitButtonHandler = () => {
        if (this.state.imageName == '' || this.state.imageName == undefined || this.state.imageName == null) {
            alert("Please Enter the image name");
        } else if (this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
            alert("Please Capture the Image");
        } else {
            console.log("submiting")
            const apiName = "faceRekognition";
            const path = "/detecttext";
            const init = {
                headers: {
                    'Accept': 'application/json',
                    "Content-Type": "application/x-amz-json-1.1"
                },
                body: JSON.stringify({
                    Image: this.state.base64String,
                    name: this.state.imageName
                })
            }

            API.post(apiName, path, init).then(response => {
                console.log("Response Data is : " + JSON.stringify(response));
                if (response.TextDetections && response.TextDetections.length > 0) {
                    this.setState({
                        detectedText: response.TextDetections
                    })
                    
                } else {
                    alert("Please Try Again.")
                }
            });
        }
    }
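
The validation and request-building steps in the handler can also be pulled out into a small function that is easy to test on its own. The following is only a sketch: buildRequest is a hypothetical helper name, not part of the original App.js, and it reuses the headers and body fields shown above.

```javascript
// Sketch: validate the inputs and build the options object passed to API.post.
// `buildRequest` is a hypothetical helper, not part of the original App.js.
function buildRequest(imageName, base64String) {
  if (!imageName || !imageName.trim()) {
    throw new Error('Please Enter the image name');
  }
  if (!base64String) {
    throw new Error('Please Capture the Image');
  }
  return {
    headers: {
      Accept: 'application/json',
      'Content-Type': 'application/x-amz-json-1.1',
    },
    // The Lambda function reads these two fields: Image and name
    body: JSON.stringify({ Image: base64String, name: imageName.trim() }),
  };
}
```

Inside submitButtonHandler, the call could be wrapped in a try/catch that passes the error message to alert, which keeps the handler itself short.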

Lambda Function:

Add the following code to the Lambda function that you created in your AWS console.

const AWS = require('aws-sdk');
const rekognition = new AWS.Rekognition();
const bucketName = "detect-text-in-image";
const s3Bucket = new AWS.S3( { params: {Bucket: bucketName} } );

exports.handler = (event, context, callback) => {
    // Depending on how the API Gateway integration is mapped, the payload
    // may arrive as a JSON string or as an already-parsed object.
    const parsedData = typeof event === 'string' ? JSON.parse(event) : event;
    const encodedImage = parsedData.Image;
    const filePath = parsedData.name;
    // Strip the data-URI prefix and decode the base64 payload into raw bytes
    const buf = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""), 'base64');
    const data = {
        Key: filePath,
        Body: buf,
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err) {
        if (err) {
            console.log('Error uploading data: ', err);
            callback(err, null);
        } else {
            // Point Rekognition at the object that was just stored in S3
            const params = {
                Image: {
                    S3Object: {
                        Bucket: bucketName,
                        Name: filePath
                    }
                }
            };
            rekognition.detectText(params, function(err, data) {
                if (err) {
                    console.log(err, err.stack);
                    callback(err);
                } else {
                    console.log(data);
                    callback(null, data);
                }
            });
        }
    });
};

In the above code, we receive the image from the React Native app and store it in an S3 bucket. The stored image is then passed to Amazon Rekognition, whose detectText method detects the text in the image and returns it in JSON format.
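
To make the decoding step concrete, here is a minimal sketch of turning the data URI sent by the app into raw bytes before upload. decodeDataUri is an illustrative name and the sample string is a hand-written four-byte payload, not a real image.

```javascript
// Sketch: split a base64 image data URI into its content type and raw bytes.
// `decodeDataUri` is an illustrative name, not part of the Lambda code above.
function decodeDataUri(dataUri) {
  const match = /^data:(image\/\w+);base64,([\s\S]*)$/.exec(dataUri);
  if (!match) {
    throw new Error('Expected a base64 image data URI');
  }
  return { contentType: match[1], bytes: Buffer.from(match[2], 'base64') };
}

// Hand-written sample: the base64 below decodes to the 4 bytes FF D8 FF E0
const decoded = decodeDataUri('data:image/jpeg;base64,/9j/4A==');
console.log(decoded.contentType, decoded.bytes.length); // image/jpeg 4
```

Rejecting input that is not a data URI up front gives a clearer error than letting a malformed payload reach S3 or Rekognition.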

Note: Make sure the Lambda function's IAM role has permission to call Amazon Rekognition's detectText API and to read and write objects in the S3 bucket.

Here is what your home screen looks like:

Once you capture an image you can see a preview of that image as shown below.

On submitting the captured image along with a file name, you can see the text detected in that image, as shown below:
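
A DetectText response contains entries of Type LINE as well as Type WORD, so rendering every entry repeats each word. If that is not what you want, a small filter like the sketch below keeps only line-level results. extractLines and the default threshold of 80 are assumptions made for illustration, and the sample object is hand-written rather than real service output.

```javascript
// Sketch: keep only line-level detections above a confidence threshold.
// `extractLines` and `minConfidence` are illustrative, not part of the app above.
function extractLines(response, minConfidence = 80) {
  const detections = (response && response.TextDetections) || [];
  return detections
    .filter(d => d.Type === 'LINE' && d.Confidence >= minConfidence)
    .map(d => d.DetectedText);
}

// Hand-written sample shaped like a DetectText response (not real output)
const sampleResponse = {
  TextDetections: [
    { Id: 0, Type: 'LINE', DetectedText: 'HELLO WORLD', Confidence: 99.1 },
    { Id: 1, Type: 'WORD', DetectedText: 'HELLO', ParentId: 0, Confidence: 99.3 },
    { Id: 2, Type: 'WORD', DetectedText: 'WORLD', ParentId: 0, Confidence: 98.9 },
    { Id: 3, Type: 'LINE', DetectedText: 'blurry', Confidence: 41.0 },
  ],
};

console.log(extractLines(sampleResponse)); // [ 'HELLO WORLD' ]
```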

That’s all folks! I hope it was helpful.

This story is authored by Venu Vaka. He is a software engineer specializing in ReactJS and AWS Cloud.

Real Time Streaming Data Analytics using Amazon Kinesis Family

Amazon Kinesis Data Analytics

Amazon Kinesis Data Analytics (KDA) is the easiest way to analyze streaming data, gain actionable insights, and respond to your business and customer needs in real time. KDA reduces the complexity of building, managing and integrating streaming applications with other AWS services. SQL users can easily query streaming data or build entire streaming applications using templates and an interactive SQL editor. Java developers can quickly build sophisticated streaming applications using open source Java libraries and AWS integrations to transform and analyze data in real-time.

For a deep dive into Amazon Kinesis Data Analytics, please go through the official docs.

Amazon Kinesis Data Streams

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more.

For more details about Amazon Kinesis Data Streams, please go through the official docs.

Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near real-time analytics with existing business intelligence tools and dashboards you’re already using today. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security.

For more details about Amazon Kinesis Data Firehose, please go through the official docs.

To Create an Amazon Kinesis Data Stream using Console

  • Open the Kinesis console at https://console.aws.amazon.com/kinesis.
  • In the navigation bar, expand the Region selector and choose a Region.
  • Choose Create data stream.
  • On the Create Kinesis stream page, enter a name for your stream and the number of shards you need, and then click Create Kinesis stream.
    On the Kinesis streams page, your stream’s Status is shown as Creating while the stream is being created. When the stream is ready to use, the Status changes to Active.
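
The same steps can be scripted. The sketch below assumes an AWS SDK for JavaScript v2 Kinesis client is passed in; createStreamAndWait, the polling interval, and the attempt limit are illustrative choices, and the stream name and Region in the usage comment are placeholders.

```javascript
// Sketch: create a stream, then poll until its status becomes ACTIVE.
// `kinesis` is expected to be an AWS SDK v2 client, e.g. new AWS.Kinesis().
async function createStreamAndWait(kinesis, streamName, shardCount, pollMs = 2000, maxAttempts = 30) {
  await kinesis.createStream({ StreamName: streamName, ShardCount: shardCount }).promise();
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const { StreamDescriptionSummary } = await kinesis
      .describeStreamSummary({ StreamName: streamName })
      .promise();
    if (StreamDescriptionSummary.StreamStatus === 'ACTIVE') {
      return StreamDescriptionSummary;
    }
    // Still CREATING: wait before asking again
    await new Promise(resolve => setTimeout(resolve, pollMs));
  }
  throw new Error('Stream ' + streamName + ' did not become ACTIVE in time');
}

// Usage (needs the aws-sdk package and credentials; the names are placeholders):
// const AWS = require('aws-sdk');
// createStreamAndWait(new AWS.Kinesis({ region: 'us-east-1' }), 'sensor-stream', 1)
//   .then(summary => console.log(summary.StreamStatus));
```

Passing the client in as a parameter keeps the function free of credentials and makes it possible to exercise it with a stub.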

Amazon Kinesis Data Generator

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose.

Follow this link to set up the KDG:

  • Choose Create a Cognito User with CloudFormation.
  • The console redirects to the CloudFormation stack creation page. The console looks like the following.
  • Click Next, provide the CloudFormation stack name, and provide the username and password for the Cognito user that the Kinesis Data Generator will use.
  • Click Next and choose Create Stack.
  • After the status of the stack changes to CREATE_COMPLETE, click the Outputs tab and open the link under the outputs section.
  • After opening the link, sign in with the username and password created in the earlier steps.
  • Select the Region and the stream/delivery stream name created earlier.
    Use the following record template:
{
    "sensor_id": {{random.number(50)}},
    "current_temperature": {{random.number(
        {
            "min":0,
            "max":150
        }
    )}},
    "location": "{{random.arrayElement(
        ["AUS","USA","UK"]
    )}}"
}
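
For local testing without the KDG, records of the same shape can be produced with a few lines of code. This is a sketch only: randomInt and makeRecord are illustrative names, and it assumes the template's random.number bounds are inclusive.

```javascript
// Sketch: produce records shaped like the KDG template above.
// `randomInt` and `makeRecord` are illustrative names; `rng` defaults to Math.random
// and can be replaced with a fixed function to get repeatable values.
function randomInt(min, max, rng = Math.random) {
  return Math.floor(rng() * (max - min + 1)) + min;
}

function makeRecord(rng = Math.random) {
  const locations = ['AUS', 'USA', 'UK'];
  return {
    sensor_id: randomInt(0, 50, rng),
    current_temperature: randomInt(0, 150, rng),
    location: locations[randomInt(0, locations.length - 1, rng)],
  };
}

console.log(JSON.stringify(makeRecord()));
```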

To Create the Kinesis Data Analytics Application

  • Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
  • Choose Create application.
  • On the Create application page, type an application name, type a description, choose SQL for the application’s Runtime setting, and then choose Create application.

Doing this creates a Kinesis data analytics application with a status of READY. The console shows the application hub where you can configure input and output.

In the next step, you configure input for the application. In the input configuration, you add a streaming data source to the application and discover a schema for an in-application input stream by sampling data on the streaming source.

Configure Streaming Source as Input to Kinesis Data Analytics Application

  • On the Kinesis Analytics applications page in the console, choose Connect streaming data.
  • In the Source section, specify a streaming source for your application. You can select an existing stream source or create one. By default, the console names the in-application input stream SOURCE_SQL_STREAM_001. For this exercise, keep this name as it appears.
    Stream reference name – This option shows the name of the in-application input stream that is created, SOURCE_SQL_STREAM_001. You can change the name of the stream, but the SQL in this article assumes the default.
  • Choose Discover Schema, which automatically discovers the schema of input stream.
  • Choose Save and continue.
    Now we have an application with an input configuration added to it. In the next step, we will add SQL code to perform some analytics on the data in the in-application input stream.

Real-Time Analytics on Input Stream Data

  • On the Kinesis Analytics applications page in the console, choose Go to SQL editor.
  • In the Would you like to start running “ApplicationName”? dialog box, choose Yes, start application.
  • The console opens the SQL editor page. Review the page, including the buttons (Add SQL from templates, Save and run SQL) and various tabs.
  • Run analytics on the input stream data using the following sample query. The query treats any reading with current_temperature above 100 as an anomaly, sends those rows to anomaly_data_stream, and sends the normal rows to output_data_stream. Load the following query in the SQL editor and choose Save and run SQL.
CREATE OR REPLACE STREAM "anomaly_data_stream" (
	"sensor_id" INTEGER,
	"current_temperature" INTEGER, 
	"location" VARCHAR(16));

CREATE OR REPLACE  PUMP "STREAM_PUMP_ANOMALY" AS INSERT INTO "anomaly_data_stream"
SELECT STREAM "sensor_id",
				"current_temperature",
				"location"
FROM "SOURCE_SQL_STREAM_001" WHERE "current_temperature" > 100;

CREATE OR REPLACE STREAM "output_data_stream" (
	"sensor_id" INTEGER,
	"current_temperature" INTEGER, 
	"location" VARCHAR(16));

CREATE OR REPLACE  PUMP "STREAM_PUMP_OUTPUT" AS INSERT INTO "output_data_stream"
SELECT STREAM "sensor_id",
				"current_temperature",
				"location"
FROM "SOURCE_SQL_STREAM_001" WHERE "current_temperature" <= 100;

It creates the in-application streams output_data_stream and anomaly_data_stream.
It creates the pumps STREAM_PUMP_OUTPUT and STREAM_PUMP_ANOMALY, and uses them to select rows from SOURCE_SQL_STREAM_001 and insert them into output_data_stream and anomaly_data_stream. You can see the results in the Real-time analytics tab.
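
The routing rule that the two pumps express can be written as a one-line function, which is handy for checking expected results against generated test records. In this sketch routeRecord is an illustrative name, and sending a reading of exactly 100 to output_data_stream is an assumption; check that boundary against the WHERE clauses you actually deploy.

```javascript
// Sketch of the routing rule expressed by the two pumps.
// The threshold of 100 comes from the query; `routeRecord` is an illustrative name.
const TEMPERATURE_THRESHOLD = 100;

function routeRecord(record) {
  // Assumption: a reading of exactly 100 counts as normal
  return record.current_temperature > TEMPERATURE_THRESHOLD
    ? 'anomaly_data_stream'
    : 'output_data_stream';
}

console.log(routeRecord({ sensor_id: 7, current_temperature: 120, location: 'UK' }));
// anomaly_data_stream
```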

  • The SQL editor has the following tabs:

    The Source data tab shows the in-application input stream that is mapped to the streaming source. Choose the in-application stream, and you can see data coming in. Each row in an in-application stream has a special column called ROWTIME. This column is the timestamp at which Amazon Kinesis Data Analytics inserted the row into the first in-application stream (the in-application input stream that is mapped to the streaming source).

    The Real-time Analytics tab shows all the other in-application streams created by your application code. It also includes the error stream. Choose anomaly_data_stream or output_data_stream to view the rows your application code inserted. 

    The Destination tab shows the external destination where Kinesis Data Analytics writes the query results. We haven’t configured any external destination for our application output yet. 

To create a delivery stream from Kinesis Data Firehose to Amazon S3

  • Open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/.
  • Choose Create Delivery Stream. In this case, the name of the stream is anomaly-delivery-stream.
  • On the Destination page, choose the following options.
    • Destination – Choose Amazon S3.
    • Delivery stream name – Type a name for the delivery stream
    • S3 bucket – Choose an existing bucket, or choose New S3 Bucket. If you create a new bucket, type a name for the bucket and choose the region your console is currently using.
    • S3 prefix – The delivery stream stores data under the provided prefix. For anomaly data, the prefix is 
      data/anomaly/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
    • S3 error prefix – Data that fails during delivery is written under this prefix.
  • Choose Next.
  • On the Configuration page, leave the fields at the default settings. The only required step is to select an IAM role that enables Kinesis Data Firehose to access your resources, as follows:
    1. For IAM Role, choose Select an IAM role.
    2. In the drop-down menu, under Create/Update existing IAM role, choose Firehose delivery IAM role, leave the fields at their default settings, and then choose Allow.
  • Choose Next.
  • Review your settings, and then choose Create Delivery Stream.

The anomaly-delivery-stream is now created. In the same way, create another Firehose delivery stream named output-delivery-stream.
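
To see where a record will land, it helps to expand the S3 prefix by hand for a given arrival time. The sketch below assumes the timestamp is evaluated in UTC and uses the calendar year; expandPrefix is a hypothetical helper for illustration and is not part of Firehose.

```javascript
// Sketch: compute the S3 key prefix for a given arrival time, evaluated in UTC.
// `expandPrefix` and `pad2` are hypothetical helpers for illustration only.
function pad2(n) {
  return String(n).padStart(2, '0');
}

function expandPrefix(date) {
  const year = date.getUTCFullYear();
  const month = pad2(date.getUTCMonth() + 1); // getUTCMonth is zero-based
  const day = pad2(date.getUTCDate());
  const hour = pad2(date.getUTCHours());
  return `data/anomaly/year=${year}/month=${month}/day=${day}/hour=${hour}/`;
}

console.log(expandPrefix(new Date(Date.UTC(2020, 0, 5, 9))));
// data/anomaly/year=2020/month=01/day=05/hour=09/
```

The key=value folder names follow the Hive partitioning convention, which is what lets a crawler register year, month, day, and hour as partition columns.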

Configuring Application Output to Amazon Kinesis Data Firehose

We can optionally add an output configuration to the application, to persist everything written from an in-application stream to an external destination such as an Amazon Kinesis data stream, a Kinesis Data Firehose delivery stream, or an AWS Lambda function.

In this application, we are connecting the in-application stream to a Kinesis Data Firehose delivery stream.
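
For readers who prefer scripting over the console, the connection corresponds to one addApplicationOutput request per in-application stream. The sketch below only builds the request parameters, assuming the AWS SDK for JavaScript v2 KinesisAnalytics client; buildOutputParams is an illustrative name, and the application name, version ID, and ARNs in the usage comment are placeholders.

```javascript
// Sketch: request parameters for KinesisAnalytics.addApplicationOutput (AWS SDK v2).
// `buildOutputParams` is an illustrative name; every value passed in is a placeholder.
function buildOutputParams(applicationName, versionId, inAppStreamName, firehoseArn, roleArn) {
  return {
    ApplicationName: applicationName,
    CurrentApplicationVersionId: versionId,
    Output: {
      Name: inAppStreamName, // in-application stream to read from
      KinesisFirehoseOutput: { ResourceARN: firehoseArn, RoleARN: roleArn },
      DestinationSchema: { RecordFormatType: 'JSON' },
    },
  };
}

// Usage (needs the aws-sdk package and credentials; values are placeholders):
// const AWS = require('aws-sdk');
// new AWS.KinesisAnalytics().addApplicationOutput(
//   buildOutputParams('<app-name>', 1, 'anomaly_data_stream', '<firehose-arn>', '<role-arn>')
// ).promise();
```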

In the Destination tab, choose anomaly_data_stream as the in-application stream and anomaly-delivery-stream as the Firehose delivery stream, and select JSON as the output format. Configure output_data_stream in the same way, using output-delivery-stream.
You can see the following after configuring:

Data is written to S3 through the Kinesis Data Firehose delivery streams. After running a Crawler once on that path, we can query the data in Athena.

Thanks for the read. Hope it was helpful.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and Apache Spark Ecosystem.

Object Detection in React Native App using AWS Rekognition

In this post, we are going to build a React Native app for detecting objects from an image using Amazon Rekognition.

Here we capture an image or select one from the file system. We send that image to API Gateway, which triggers a Lambda function that stores it in an S3 bucket. The stored image is then passed to Amazon Rekognition, which detects the objects in the image.

Installing dependencies:

Let’s go to React Native Docs, select React Native CLI Quickstart and select our appropriate Development OS and the Target OS as Android, as we are going to build an android application.

Follow the docs for installing dependencies, then create a new React Native Application. Use the command line interface to generate a new React Native project called ObjectDetection.

react-native init ObjectDetection

Preparing the Android device:

We shall need an Android device to run our React Native Android app. This can be either a physical Android device, or more commonly, we can use an Android Virtual Device (AVD) which allows us to emulate an Android device on our computer (using Android Studio).

Either way, we shall need to prepare the device to run Android apps for development. If you have a physical Android device, you can use it for development in place of an AVD by connecting it to your computer using a USB cable and following the instructions here.

If you are using a virtual device follow this link. I shall be using a physical Android device.

Now go to the command line and run react-native run-android inside your React Native app directory:

cd ObjectDetection && react-native run-android

If everything is set up correctly, you should see your new app running on your physical device or Android emulator.

API Creation in AWS Console: 

Before going further, create an API in your AWS console following this link.
Once you are done creating the API, come back to the React Native application.
Now, go to your project directory and replace your App.js file with the following code.

import React, {Component} from 'react';
import {
 StyleSheet,
 View,
 Text,
 TextInput,
 Image,
 ScrollView,
 TouchableHighlight,
} from 'react-native';
import ImagePicker from 'react-native-image-picker';
import Amplify, {API} from 'aws-amplify';
 
// Amplify configuration for API-Gateway
Amplify.configure({
 API: {
   endpoints: [
     {
       name: 'LabellingAPI',   //your api name
       endpoint: '<Endpoint-URL>', //Your Endpoint URL
     },
   ],
 },
});
 
class Registration extends Component {
 constructor(props) {
   super(props);
   this.state = {
     username: 'storeImage.png',
     userId: '',
     image: '',
     capturedImage: '',
     objectName: '',
   };
 }
 
// It selects image from filesystem or capture from camera
 captureImageButtonHandler = () => {
   this.setState({
     objectName: '',
   });
 
   ImagePicker.showImagePicker(
     {title: 'Pick an Image', maxWidth: 800, maxHeight: 600},
     response => {
       console.log('Response = ', response);
       if (response.didCancel) {
         console.log('User cancelled image picker');
       } else if (response.error) {
         console.log('ImagePicker Error: ', response.error);
       } else if (response.customButton) {
         console.log('User tapped custom button: ', response.customButton);
       } else {
         // You can also display the image using data:
         const source = {uri: 'data:image/jpeg;base64,' + response.data};
         this.setState({
           capturedImage: response.uri,
           base64String: source.uri,
         });
       }
     },
   );
 };
 
// this method triggers when you click submit. If the image is valid then It will send the image to API Gateway. 
 submitButtonHandler = () => {
   if (
     this.state.capturedImage == '' ||
     this.state.capturedImage == undefined ||
     this.state.capturedImage == null
   ) {
     alert('Please Capture the Image');
   } else {
     const apiName = 'LabellingAPI';
     const path = '/storeimage';
     const init = {
       headers: {
         Accept: 'application/json',
         'Content-Type': 'application/x-amz-json-1.1',
       },
       body: JSON.stringify({
         Image: this.state.base64String,
         name: 'storeImage.png',
       }),
     };
 
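     API.post(apiName, path, init).then(response => {
       // Labels is an array; guard against a missing field
       if (response.Labels && response.Labels.length > 0) {
         this.setState({
           objectName: response.Labels[0].Name,
         });
       } else {
         alert('Please Try Again.');
       }
     }).catch(error => {
       // Surface network or API errors instead of failing silently
       console.log('Request failed: ', error);
       alert('Please Try Again.');
     });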
   }
 };
 
 render() {
   return (
     <View style={styles.MainContainer}>
       <ScrollView>
         <Text
           style={{
             fontSize: 20,
             color: '#000',
             textAlign: 'center',
             marginBottom: 15,
             marginTop: 10,
           }}>
           Capture Image
         </Text>
         {this.state.capturedImage !== '' && (
           <View style={styles.imageholder}>
             <Image
               source={{uri: this.state.capturedImage}}
               style={styles.previewImage}
             />
           </View>
         )}
         {this.state.objectName ? (
           <TextInput
             underlineColorAndroid="transparent"
             style={styles.TextInputStyleClass}
             value={this.state.objectName}
           />
         ) : null}
         <TouchableHighlight
           style={[styles.buttonContainer, styles.captureButton]}
           onPress={this.captureImageButtonHandler}>
           <Text style={styles.buttonText}>Capture Image</Text>
         </TouchableHighlight>
 
         <TouchableHighlight
           style={[styles.buttonContainer, styles.submitButton]}
           onPress={this.submitButtonHandler}>
           <Text style={styles.buttonText}>Submit</Text>
         </TouchableHighlight>
       </ScrollView>
     </View>
   );
 }
}
 
const styles = StyleSheet.create({
 TextInputStyleClass: {
   textAlign: 'center',
   marginBottom: 7,
   height: 40,
   borderWidth: 1,
   marginLeft: 90,
   width: '50%',
   justifyContent: 'center',
   borderColor: '#D0D0D0',
   borderRadius: 5,
 },
 inputContainer: {
   borderBottomColor: '#F5FCFF',
   backgroundColor: '#FFFFFF',
   borderRadius: 30,
   borderBottomWidth: 1,
   width: 300,
   height: 45,
   marginBottom: 20,
   flexDirection: 'row',
   alignItems: 'center',
 },
 buttonContainer: {
   height: 45,
   flexDirection: 'row',
   alignItems: 'center',
   justifyContent: 'center',
   marginBottom: 20,
   width: '80%',
   borderRadius: 30,
   marginTop: 20,
   marginLeft: 5,
 },
 captureButton: {
   backgroundColor: '#337ab7',
   width: 350,
 },
 buttonText: {
   color: 'white',
   fontWeight: 'bold',
 },
 horizontal: {
   flexDirection: 'row',
   justifyContent: 'space-around',
   padding: 10,
 },
 submitButton: {
   backgroundColor: '#C0C0C0',
   width: 350,
   marginTop: 5,
 },
 imageholder: {
   borderWidth: 1,
   borderColor: 'grey',
   backgroundColor: '#eee',
   width: '50%',
   height: 150,
   marginTop: 10,
   marginLeft: 90,
   flexDirection: 'row',
   alignItems: 'center',
 },
 previewImage: {
   width: '100%',
   height: '100%',
 },
});
 
export default Registration;

In the above code, we configure Amplify with the name and endpoint URL of the API you created, as shown below.

Amplify.configure({
 API: {
   endpoints: [
     {
       name: '<Your-API-Name>',
       endpoint: '<Endpoint-URL>',
     },
   ],
 },
});

Clicking the capture button triggers the captureImageButtonHandler function, which asks the user to take a picture or select one from the file system. When the user captures or selects an image, we store it in the state as shown below.

captureImageButtonHandler = () => {
   this.setState({
     objectName: '',
   });
 
   ImagePicker.showImagePicker(
     {title: 'Pick an Image', maxWidth: 800, maxHeight: 600},
     response => {
       console.log('Response = ', response);
       if (response.didCancel) {
         console.log('User cancelled image picker');
       } else if (response.error) {
         console.log('ImagePicker Error: ', response.error);
       } else if (response.customButton) {
         console.log('User tapped custom button: ', response.customButton);
       } else {
         // You can also display the image using data:
         const source = {uri: 'data:image/jpeg;base64,' + response.data};
         this.setState({
           capturedImage: response.uri,
           base64String: source.uri,
         });
       }
     },
   );
 };

After capturing the image, we preview it. Clicking the submit button triggers the submitButtonHandler function, which sends the image to the endpoint as shown below.

submitButtonHandler = () => {
   if (
     this.state.capturedImage == '' ||
     this.state.capturedImage == undefined ||
     this.state.capturedImage == null
   ) {
     alert('Please Capture the Image');
   } else {
     const apiName = 'LabellingAPI';
     const path = '/storeimage';
     const init = {
       headers: {
         Accept: 'application/json',
         'Content-Type': 'application/x-amz-json-1.1',
       },
       body: JSON.stringify({
         Image: this.state.base64String,
         name: 'storeImage.png',
       }),
     };
 
     API.post(apiName, path, init).then(response => {
       if (response.Labels && response.Labels.length > 0) {
         this.setState({
           objectName: response.Labels[0].Name,
         });
       } else {
         alert('Please Try Again.');
       }
     });
   }
 };

Lambda Function:

Add the following code into your lambda function that you created in your AWS Console.

const AWS = require('aws-sdk')
var rekognition = new AWS.Rekognition()
var s3Bucket = new AWS.S3( { params: {Bucket: "<Your-Bucket>"} } );
var fs = require('fs');
exports.handler = (event, context, callback) => {
   let parsedData = JSON.parse(event)
   let encodedImage = parsedData.Image;
   var filePath = parsedData.name;
   // Buffer.from replaces the deprecated `new Buffer(...)` constructor
   let buf = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""), 'base64')
   var data = {
       Key: filePath,
       Body: buf,
       ContentEncoding: 'base64',
       ContentType: 'image/jpeg'
   };
   s3Bucket.putObject(data, function(err, data){
       if (err) {
           console.log('Error uploading data: ', err);
           callback(err, null);
       } else {
           var params = {
             Image: {
              S3Object: {
               Bucket: "<Your-Bucket>",
               Name: filePath
              }
             },
             MaxLabels: 10,
             MinConfidence: 90
            };
           rekognition.detectLabels(params, function(err, data) {
               if (err){
                   console.log(err, err.stack);
                   callback(err)
               }
               else{
                   console.log(data);
                   callback(null, data);
               }
           });
       }
   });
};

In the above code, we receive the image from the React Native app and store it in an S3 bucket. The stored image is then passed to Amazon Rekognition, whose detectLabels method detects labels in the image and returns them in a JSON response.
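The one step in this Lambda that is easy to get wrong is stripping the data-URI prefix before decoding the base64 payload. Here is a minimal sketch of that step in isolation, written in Python for brevity rather than in the Lambda's Node.js. The helper name decode_data_uri and the three sample bytes are assumptions made for illustration only.

```python
import base64
import re

# Matches the "data:image/<type>;base64," prefix that the app prepends.
DATA_URI_PREFIX = re.compile(r"^data:image/\w+;base64,")


def decode_data_uri(data_uri: str) -> bytes:
    """Strip the data-URI prefix (if present) and return the raw image bytes."""
    payload = DATA_URI_PREFIX.sub("", data_uri)
    return base64.b64decode(payload)


# Hypothetical sample: three arbitrary bytes wrapped the way the app sends them.
sample = "data:image/jpeg;base64," + base64.b64encode(b"\xff\xd8\xff").decode("ascii")
raw = decode_data_uri(sample)
print(len(raw))
```

The decoded bytes, not the base64 text, are what should end up as the object body in S3.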

capture image screen

Once you capture an image you can see a preview of that image as shown below.

Nike backpack

On submitting the captured image you can see the label of that image as shown below:

Object recognised as backpack

That’s all folks! I hope it was helpful.
For any queries drop them in the comments section.

This story is authored by Dheeraj Kumar and Venu Vaka. Dheeraj is a software engineer specializing in React Native and React based frontend development. Venu is a software engineer specializing in ReactJS and AWS Cloud.

Optimizing QuickSight using Athena Queries and SPICE: Operating cost analysis

In this post, I will use the example of an automobile manufacturing company to show how QuickSight can be used to analyze sales data and make better decisions. We will also learn how to optimize the QuickSight operating cost by using the SPICE engine to ingest source data from Athena queries at recurring intervals. This has two major advantages. First, dashboards and analyses load quickly because the data is served from SPICE. Second, the cost of data ingestion comes down because Athena is queried only to refresh the data loaded in SPICE.

We will look at a sales dashboard built from data sets prepared from the refined zone of a data lake created using Lake Formation. A data engineering pipeline writes data to this refined zone, partitioned by year and month, every hour.

In case you wish to build a similar thing and follow along, below is the link to raw datasets:

https://github.com/koushik-bitzop/data-sets/tree/master/sales2016-2018

Creating a SPICE based Athena Data-set:

Select Athena as the data set source:

Select use custom query.

Select Edit/Preview data and then choose data source as SPICE and click on Finish.

Once the query has run successfully and you can see the data, click Save and Visualise.

In case you want to add any calculated fields or change data types you could do that in the red highlighted section shown above.

I have discussed this in detail in my previous articles, Visualizing Multiple Datasets in AWS QuickSight and Adding User-Interactivity to AWS QuickSight Dashboards.

Refresh Schedule for Data-sets:

Depending on how frequently new data arrives, you can schedule the refresh. For every refresh, an Athena query is executed and the results are imported into SPICE.

Note: 

  1. In this example, each QuickSight SPICE refresh pulls the whole data set, not an increment.
  2. It is not possible to pass pushdown predicates (variables) from dashboard filters in QuickSight to Athena. So if you want to look at a rolling window of data, such as the past 24 hours, the past month or the past 6 months, use a WHERE clause in the Athena source query to fetch just those records. Also, if the data is partitioned by year and month, only the required partitions are scanned, which further saves on costs.
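To make the rolling-window idea concrete, here is a minimal sketch that generates such a source query for the last few months. The table name refined.sales is hypothetical, and I am assuming the year and month partitions are stored as strings with a zero-padded month; adjust the quoting if your partitions are numeric.

```python
from datetime import date


def month_partitions(today: date, months_back: int):
    """Return (year, month) pairs for the current month and the
    previous `months_back` months, newest first."""
    pairs = []
    year, month = today.year, today.month
    for _ in range(months_back + 1):
        pairs.append((year, month))
        month -= 1
        if month == 0:  # roll over into December of the previous year
            year, month = year - 1, 12
    return pairs


def rolling_window_query(table: str, today: date, months_back: int) -> str:
    """Build a SELECT whose WHERE clause only names the partitions needed."""
    clauses = [
        f"(year = '{y}' AND month = '{m:02d}')"
        for y, m in month_partitions(today, months_back)
    ]
    return f"SELECT * FROM {table} WHERE " + " OR ".join(clauses)


# Hypothetical table name; prints a query covering Dec 2018 to Feb 2019.
query = rolling_window_query("refined.sales", date(2019, 2, 15), 2)
print(query)
```

Because the predicate names partition columns only, Athena can skip every other partition instead of scanning the whole table.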

A lowdown on QuickSight Operating cost with this architecture:

We are looking at two main cost components:

  1. Athena – S3 data scan costs
  2. QuickSight Infrastructure costs

Athena – S3 data scan costs:

Athena pricing for successful queries:
1TB scan = 5$
S3 storage cost not included.

No. of queries | Data scanned in S3 | Scheduled refresh | Total data scanned (monthly) | Bill estimated (monthly) | Bill estimated (annual)
1 | 150 to 210 KB | Hourly | 1*24*30*210 KB = 0.0001512 TB | $0.000756 | $0.009072

The above numbers are too small to draw any conclusion. Let us say you have 4 such queries (each scanning around 150 to 200 MB) powering the dashboard, and SPICE ingests this data once every hour.

No. of queries | Data scanned in S3 | Scheduled refresh | Total data scanned (monthly) | Bill estimated (monthly) | Bill estimated (annual)
4 | 150 to 200 MB | Hourly | 4*24*30*200 MB = 576 GB or 0.576 TB | $2.88 | $34.56

If we do not use SPICE to load this data from Athena hourly and instead use the Athena query as the direct source, the cost of the dashboards increases in proportion to the number of views, because every view runs every source query. As an example, if the dashboards are viewed 1000 times per hour (and each dashboard has 4 source queries), the cost above is multiplied by a staggering 1000, and the annual bill becomes an eye-popping $34,560.
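The arithmetic behind these estimates is easy to reproduce. The sketch below uses the $5 per TB price quoted above, a 30-day month, decimal units (1 TB = 1,000,000 MB) and the 200 MB upper bound per query; the function names are my own.

```python
ATHENA_USD_PER_TB = 5.0  # price quoted above: 1 TB scanned = $5


def monthly_scan_tb(queries: int, mb_per_query: float, runs_per_hour: int = 1) -> float:
    """Data scanned per 30-day month in TB (decimal units)."""
    runs_per_month = runs_per_hour * 24 * 30
    return queries * runs_per_month * mb_per_query / 1_000_000


def monthly_bill_usd(queries: int, mb_per_query: float, runs_per_hour: int = 1) -> float:
    """Athena scan cost per month for the given query load."""
    return monthly_scan_tb(queries, mb_per_query, runs_per_hour) * ATHENA_USD_PER_TB


spice = monthly_bill_usd(4, 200)                        # one SPICE refresh per hour
direct = monthly_bill_usd(4, 200, runs_per_hour=1000)   # 1000 dashboard views per hour

print(f"SPICE refresh: ${spice:.2f}/month, ${spice * 12:.2f}/year")
print(f"Direct query:  ${direct:,.2f}/month, ${direct * 12:,.2f}/year")
```

Changing runs_per_hour or mb_per_query lets you plug in your own refresh schedule and scan size.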

QuickSight infrastructure cost (Standard Edition):

No charge for readers. $9 per month for an author with an annual subscription.

User type | No. of users | Bill estimated (monthly) | Bill estimated (annual)
Author | 1 | $9 | $108
Reader | 3 | $0 | $0
Total | | $9 pm | $108 pa

Note: For Enterprise edition, readers are billed $0.30 per 30-minute session, up to a maximum charge of $5/reader/month for unlimited use. Authors are billed $18 with an annual subscription.
Additional SPICE capacity costs $0.25/GB on Standard edition and $0.38/GB on Enterprise edition.
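The reader charge on Enterprise edition is a capped per-session fee, which is simple to model. This is only a sketch using the prices quoted in the note above; the function name is my own.

```python
SESSION_USD = 0.30      # per 30-minute reader session (Enterprise edition)
READER_CAP_USD = 5.00   # maximum monthly charge per reader


def reader_monthly_cost(sessions: int) -> float:
    """Monthly charge for one reader: per-session fee, capped at $5."""
    return min(sessions * SESSION_USD, READER_CAP_USD)


for sessions in (5, 16, 17, 100):
    print(sessions, round(reader_monthly_cost(sessions), 2))
```

With these prices the cap kicks in from the 17th session in a month onwards.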

So overall, using SPICE with a periodic data refresh keeps the Athena scan cost tied to the refresh schedule rather than to the number of dashboard views. That’s it folks. I hope it was helpful. For any queries, drop them in the comments section.

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

AWS QuickSight Auto-Narratives to Highlight Insights using Natural Language Processing

Most often, data sets are analyzed and their main characteristics summarized with visuals. Yet one still has to sift through the visuals, drilling down, comparing values and rechecking ideas to extract a conclusion. QuickSight's auto-narratives feature reduces this effort: it lets you draw conclusions from the analysis or highlight insights and state them plainly in natural language as part of the analysis or report. In day-to-day analysis, a balanced mix of plain statements and visuals is appreciated, so you can use this feature to add a brief summary of the analysis or to highlight important points.

In this blog post, I will be using the Discovering Barcelona dashboard, created for my previous articles Visualizing Multiple Datasets in AWS QuickSight and Adding User-Interactivity to AWS QuickSight Dashboards. We will look at how to add insights to QuickSight dashboards and use auto-narratives to give a brief summary of accidents in Barcelona.

Let us have a look at what we are gonna build.

In the below picture, the green highlighted section is the Insights auto generated from the dataset by QuickSight. If you like these insights and want them as part of analysis, you could add them. This is shown in the red highlight.

Once an Insight is added to an analysis the content it holds is called a Narrative.

Adding a Custom Insight:

We will learn more about computations later. For now, closing this Computation modal adds an empty insight to our analysis.

I also deleted the previous insight, so I could start from scratch with this new one. To customize the insight, either click Customize insight or open the drop-down menu at the top right and choose Customize narrative. Make sure to add the fields required for the insight: select the insight visual and then select the fields from the Fields list. Once added, you can see them in the Field wells bar, highlighted in red at the top.

Computations are like ready-made templates whose values come from calculations done on the dataset. Here’s a list of computations for you to explore. Parameters can also be used in the narrative logic. I have discussed what parameters are and how to use them here. Functions are the same as those we use to add calculated fields while editing data sets. Next, add a computation.

Choose the type of computation needed.

Once you apply the configuration, changes will be reflected in the analysis. Let us also add the Top ranked computations for month and district.

Once added, they are listed in the Computations section.

In the Computations section, the blue objects are variables that can be used in the narrative.

Applying the configuration now reflects the changes in the analysis.

Similarly, let’s add one for the district. First add the computation for the district and then configure the narrative.

Brief summary of the analysis using QuickSight Autonarratives

We successfully configured a custom narrative. 

One more cool thing is that filters linked to a control for a specific field can be added. I have a filter, created earlier, that applies to all visuals. Let us remove one district using the control and see if it affects the insight.

It did: the Example district no longer appears in either the visual or the insight, and the stats have changed as well!

That’s broadly about AWS QuickSight auto-narratives, I hope this was helpful. Please experiment, and do let me know if I missed something in the comment section.

This story is authored by Koushik. He is a software engineer and a keen data science and machine learning enthusiast.

AWS Machine Learning Data Engineering Pipeline for Batch Data

This post walks you through all the steps required to build a data engineering pipeline for batch data using AWS Step Functions. The sequence of steps works like so: the ingested data arrives as a CSV file in the landing zone of an S3 based data lake, which automatically triggers a Lambda function to invoke the Step Function. I have assumed that data is ingested daily in a .csv file with a filename_date.csv naming convention, for example customers_20190821.csv. As its first step, the Step Function starts a landing to raw zone file transfer via a Lambda function. Then an AWS Glue crawler crawls the raw data into an Athena table, which is used as the source for an AWS Glue based PySpark transformation script. The transformed data is written to the refined zone in the parquet format. Again, an AWS Glue crawler runs to “reflect” this refined data into another Athena table. Finally, the data science team can consume this refined data from the Athena table using an AWS SageMaker based Jupyter notebook instance. Note that the data science team does not need to pull any data manually, as the data engineering pipeline automatically pulls in the delta data according to the refresh schedule that writes new data to the landing zone.

Let’s go through the steps

How to make daily data available to Amazon SageMaker?

What is Amazon SageMaker?

Amazon SageMaker is an end-to-end machine learning (ML) platform that can be leveraged to build, train, and deploy machine learning models in AWS. Using the Amazon SageMaker Notebook module improves the efficiency of interacting with the data, without the latency of bringing it locally.
For a deep dive into Amazon SageMaker, please go through the official docs.

In this blog post, I will be using dummy customers data. The customers data consists of retailer information and units purchased.

Updating Table Definitions with AWS Glue

The Data Catalog feature of AWS Glue and its inbuilt integration with Amazon S3 simplify the process of identifying data and deriving the schema definition from the source data. Glue crawlers within the Data Catalog are used to build the metadata tables for data stored in Amazon S3.

I created a crawler named raw for the data in the raw zone (s3://bucketname/data/raw/customers/). In case you are just starting out on AWS Glue crawlers, I have explained how to create one from scratch in one of my earlier articles. If you run this crawler, it creates a customers table in the specified database (raw).

Create an invocation Lambda Function

In case you are just starting out on Lambda functions, I have explained how to create one from scratch with an IAM role to access the StepFunctions, Amazon S3, Lambda and CloudWatch in my earlier article.

Add a trigger to the created Lambda function named invoke-step-functions. Configure Bucket, Prefix and Suffix accordingly.

Once a file arrives in the landing zone, it triggers the invoke Lambda function, which extracts the year, month and day from the file name in the event. It passes the year, month and day as input to AWS StepFunctions and uses two characters from a uuid to make the execution name unique. Please replace the code in the invoke-step-functions Lambda with the following.

import json
import uuid
import boto3
from datetime import datetime

sfn_client = boto3.client('stepfunctions')

stm_arn = 'arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:Datapipeline-for-SageMaker'

def lambda_handler(event, context):
    
    # Extract bucket name and file path from event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    path = event['Records'][0]['s3']['object']['key']
    
    file_name_date = path.split('/')[2]
    processing_date_str = file_name_date.split('_')[1].replace('.csv', '')
    processing_date = datetime.strptime(processing_date_str, '%Y%m%d')
    
    # Extract year, month, day from date
    year = processing_date.strftime('%Y')
    month = processing_date.strftime('%m')
    day = processing_date.strftime('%d')
    
    uuid_temp = uuid.uuid4().hex[:2]
    execution_name = '{processing_date_str}-{uuid_temp}'.format(processing_date_str=processing_date_str, uuid_temp=uuid_temp)
    
    # Starts the execution of AWS StepFunctions
    response = sfn_client.start_execution(
          stateMachineArn = stm_arn,
          name= str(execution_name),
          input= json.dumps({"year": year, "month": month, "day": day})
      )
    
    return {"year": year, "month": month, "day": day}
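If you want to sanity check the file name parsing without deploying anything, the date extraction can be tried on its own. This is a minimal sketch: parse_processing_date is a hypothetical helper, the sample key assumes the data/landing/ prefix used in this pipeline, and unlike the Lambda above it takes the last path segment rather than the third one, so it does not depend on the folder depth.

```python
from datetime import datetime


def parse_processing_date(key: str) -> dict:
    """Extract year, month and day from a key such as
    'data/landing/customers_20190821.csv'."""
    file_name = key.split('/')[-1]                    # last path segment
    date_str = file_name.split('_')[1].replace('.csv', '')
    parsed = datetime.strptime(date_str, '%Y%m%d')    # raises ValueError on a bad date
    return {
        "year": parsed.strftime('%Y'),
        "month": parsed.strftime('%m'),
        "day": parsed.strftime('%d'),
    }


print(parse_processing_date('data/landing/customers_20190821.csv'))
```

A file whose name does not follow the filename_date.csv convention fails here with an exception, which is the same behaviour you would see in the Lambda logs.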

Create a Generic FileTransfer Lambda

Create a Lambda function named generic-file-transfer, in the same way as we created one earlier in this article. Based on the event coming from the StepFunction, this Lambda function transfers files from the landing zone to the raw zone, or from the landing zone to the archive zone.

  1. If step is landing-to-raw-file-transfer, the Lambda function copies files from landing to raw zone.
  2. If step is landing-to-archive-file-transfer, the Lambda function copies files from landing to archive zone and deletes files from landing zone.

Please replace the code in the generic-file-transfer Lambda with the following.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    for objects in bucket.objects.filter(Prefix = source_prefix):
        file_path = objects.key
        
        if ('.csv' in file_path) and (step == 'landing-to-raw-file-transfer'):
            
            # Extract filename from file_path
            file_name_date = file_path.split('/')[2]
            file_name = file_name_date.split('_')[0]
            
            # Build the target prefix for this file. A separate variable keeps
            # destination_prefix unchanged for the next file in the loop.
            target_prefix = '{destination_prefix}{file_name}/year={year}/month={month}/day={day}/'.format(destination_prefix=destination_prefix, file_name=file_name, year=year, month=month, day=day)
            print(target_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with target prefix
            new_path = file_path.replace(source_prefix, target_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
         
        if ('.csv' in file_path) and (step == 'landing-to-archive-file-transfer'):
            
            # Build the dated archive prefix. A separate variable keeps
            # destination_prefix unchanged for the next file in the loop.
            target_prefix = '{destination_prefix}{year}-{month}-{day}/'.format(destination_prefix=destination_prefix, year=year, month=month, day=day)
            print(target_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with target prefix
            new_path = file_path.replace(source_prefix, target_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
            
            # Deletes copied file
            bucket.objects.filter(Prefix = file_path).delete()
            
    return {"year": year, "month": month, "day": day}
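To see which key a landing file ends up under in the raw zone, the path mapping can be pulled out into a pure function and tried locally. This is only a sketch of the mapping used above: raw_zone_key is a hypothetical helper, it takes the last path segment instead of a fixed index, and it replaces only the first occurrence of the source prefix.

```python
def raw_zone_key(file_path: str, source_prefix: str, destination_prefix: str,
                 year: str, month: str, day: str) -> str:
    """Map a landing-zone key to its partitioned raw-zone key."""
    # 'customers_20190821.csv' -> 'customers' (used as the table folder)
    file_name = file_path.split('/')[-1].split('_')[0]
    target_prefix = f"{destination_prefix}{file_name}/year={year}/month={month}/day={day}/"
    # Swap only the leading source prefix for the partitioned target prefix
    return file_path.replace(source_prefix, target_prefix, 1)


key = raw_zone_key('data/landing/customers_20190821.csv',
                   'data/landing/', 'data/raw/', '2019', '08', '21')
print(key)
```

The year=/month=/day= folders are what let the crawler register year, month and day as partition columns on the table.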

The generic FileTransfer Lambda function setup is now complete. Next, we need to check that all files are copied successfully from one zone to the other. If you have large files that need to be copied, you could check out our Lightening fast distributed file transfer architecture.

Create Generic FileTransfer Status Check Lambda Function

Create a Lambda function named generic-file-transfer-status. If the step is landing to raw file transfer, the Lambda function checks whether all files are copied from the landing zone to the raw zone by comparing the number of objects in the two zones. If the counts don’t match, it raises an exception, which is handled in AWS StepFunctions by retrying with a backoff rate. If the counts match, all files were copied successfully. If the step is landing to archive file transfer, the Lambda function checks whether any files are left in the landing zone. Please replace the code in the generic-file-transfer-status Lambda function with the following.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    class LandingToRawFileTransferIncompleteException(Exception):
        pass

    class LandingToArchiveFileTransferIncompleteException(Exception):
        pass
    
    if (step == 'landing-to-raw-file-transfer'):
        if file_transfer_status(bucket, source_prefix, destination_prefix):
            print('File Transfer from Landing to Raw Completed Successfully')
        else:
            raise LandingToRawFileTransferIncompleteException('File Transfer from Landing to Raw not completed')
    
    if (step == 'landing-to-archive-file-transfer'):
        if is_empty(bucket, source_prefix):
            print('File Transfer from Landing to Archive Completed Successfully')
        else:
            raise LandingToArchiveFileTransferIncompleteException('File Transfer from Landing to Archive not completed.')
    
    return {"year": year, "month": month, "day": day}

def file_transfer_status(bucket, source_prefix, destination_prefix):
    
    try:
        
        # Checks number of objects at the source prefix (count of objects at source i.e., landing zone)
        source_object_count = 0
        for obj in bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            if (".csv" in path):
                source_object_count = source_object_count + 1
        print(source_object_count)
        
        # Checks number of objects at the destination prefix (count of objects at destination i.e., raw zone)
        destination_object_count = 0
        for obj in bucket.objects.filter(Prefix = destination_prefix):
            path = obj.key
            
            if (".csv" in path):
                destination_object_count = destination_object_count + 1
        
        print(destination_object_count)
        return (source_object_count == destination_object_count)

    except Exception as e:
        print(e)
        raise e

def is_empty(bucket, prefix):
    
    try:
        # Checks if any files left in the prefix (i.e., files in landing zone)
        object_count = 0
        for obj in bucket.objects.filter(Prefix = prefix):
            path = obj.key

            if ('.csv' in path):
                object_count = object_count + 1
                    
        print(object_count)
        return (object_count == 0)
        
    except Exception as e:
        print(e)
        raise e
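Since the status check relies on the Step Function retrying with a backoff, it helps to know how long the retries will actually wait. Below is a small sketch that computes the wait before each retry, using the values that appear in the state machine definition later in this post (IntervalSeconds 30, BackoffRate 2, MaxAttempts 5). It reflects my understanding of the Retry fields: the first retry waits IntervalSeconds, and every later wait is multiplied by BackoffRate.

```python
def retry_waits(interval_seconds, backoff_rate, max_attempts):
    """Seconds waited before each retry attempt, in order."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


waits = retry_waits(interval_seconds=30, backoff_rate=2, max_attempts=5)
print(waits)       # wait before each of the 5 retries
print(sum(waits))  # total seconds spent waiting before the state fails
```

If your file transfers or crawlers routinely take longer than the total wait, raise MaxAttempts or IntervalSeconds accordingly.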

Create a Generic Crawler invocation Lambda

Create a Lambda function named generic-crawler-invoke. The Lambda function invokes a crawler. The crawler name is passed as an argument from AWS StepFunctions through the event object. Please replace the code in the generic-crawler-invoke Lambda function with the following.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    try:
        response = glue_client.start_crawler(Name = crawler_name)
    except Exception as e:
        print('Crawler in progress', e)
        raise e
    
    return {"year": year, "month": month, "day": day}

Create a Generic Crawler Status Lambda

Create a Lambda function named generic-crawler-status. The Lambda function checks whether the crawler has finished running. If the crawler is still running, the Lambda function raises an exception, which is handled in the Step Function by retrying with a certain backoff rate. Please replace the code in the generic-crawler-status Lambda with the following.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    class CrawlerInProgressException(Exception):
        pass
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    response = glue_client.get_crawler_metrics(CrawlerNameList =[crawler_name])
    print(response['CrawlerMetricsList'][0]['CrawlerName']) 
    print(response['CrawlerMetricsList'][0]['TimeLeftSeconds']) 
    print(response['CrawlerMetricsList'][0]['StillEstimating']) 
    
    if (response['CrawlerMetricsList'][0]['StillEstimating']):
        raise CrawlerInProgressException('Crawler In Progress!')
    elif (response['CrawlerMetricsList'][0]['TimeLeftSeconds'] > 0):
        raise CrawlerInProgressException('Crawler In Progress!')
    
    return {"year": year, "month": month, "day": day}
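The in-progress decision itself does not need AWS to be tested. Here is a minimal sketch that applies the same two conditions to a single entry of CrawlerMetricsList; crawler_in_progress is a hypothetical helper and the two sample dictionaries are made up for illustration, not real Glue output.

```python
def crawler_in_progress(metrics: dict) -> bool:
    """True while Glue is still estimating or reports time left for the crawl."""
    return bool(metrics.get('StillEstimating')) or metrics.get('TimeLeftSeconds', 0) > 0


# Made-up sample entries shaped like response['CrawlerMetricsList'][0]
running = {'CrawlerName': 'raw', 'StillEstimating': False, 'TimeLeftSeconds': 42.0}
finished = {'CrawlerName': 'raw', 'StillEstimating': False, 'TimeLeftSeconds': 0.0}

print(crawler_in_progress(running), crawler_in_progress(finished))
```

In the Lambda, a True result corresponds to raising CrawlerInProgressException so that the Step Function retries.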

Create an AWS Glue Job

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. For a deep dive into AWS Glue, please go through the official docs.

Create an AWS Glue Job named raw-refined. In case you are just starting out on AWS Glue Jobs, I have explained how to create one from scratch in my earlier article. This Glue job converts the file format from CSV to parquet and stores the result in the refined zone. The push down predicate acts as a filter condition, so that only the partitions for the processing date are read.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year', 'month', 'day'])

year = args['year']
month = args['month']
day = args['day']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw", table_name = "customers", push_down_predicate ="((year == " + year + ") and (month == " + month + ") and (day == " + day + "))", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "string"), ("sale_id", "string", "sale_id", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucketname/data/refined/customers/", "partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink4")

job.commit()
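The push down predicate in the job above is built by string concatenation, so it is worth checking what the final expression looks like for a given date. The sketch below builds the same string in a hypothetical helper, partition_predicate, and adds one thing the job does not do: it rejects non-numeric values, which is my own precaution rather than part of the original script.

```python
def partition_predicate(year: str, month: str, day: str) -> str:
    """Build the filter expression passed as push_down_predicate."""
    for value in (year, month, day):
        if not value.isdigit():
            # Extra safety check (not in the Glue job): keep the expression well formed
            raise ValueError("partition values must be numeric, got %r" % value)
    return "((year == {}) and (month == {}) and (day == {}))".format(year, month, day)


predicate = partition_predicate("2019", "08", "21")
print(predicate)
```

Printing the predicate before running the job makes it easier to spot a wrong or missing argument.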

Create a refined crawler in the same way as we created the raw crawler earlier in this article. Point the crawler path to the refined zone (s3://bucketname/data/refined/customers/) and set the database to refined. There is no need to create separate Lambda functions for refined crawler invocation and status, as we pass the crawler name from the StepFunction.

The resources required to create the StepFunction have now been created.

Creating the AWS StepFunction

StepFunction is where we create and orchestrate steps to process data according to our workflow. Create an AWS StepFunctions state machine named Datapipeline-for-SageMaker. In case you are just starting out on AWS StepFunctions, I have explained how to create one from scratch here.

Data is ingested into the landing zone, which triggers a Lambda function that in turn starts an execution of the StepFunction. The steps in the StepFunction are as follows:

  1. Transfers files from landing zone to raw zone.
  2. Checks whether all files are copied to the raw zone successfully.
  3. Invokes the raw crawler, which crawls data in the raw zone and updates/creates the table definition in the specified database.
  4. Checks whether the crawler has completed successfully.
  5. Invokes the Glue job and waits for it to complete.
  6. Invokes the refined crawler, which crawls data in the refined zone and updates/creates the table definition in the specified database.
  7. Checks whether the crawler has completed successfully.
  8. Transfers files from landing zone to archive zone and deletes files from landing zone.
  9. Checks that all files are copied and deleted from the landing zone successfully.

Please update the StepFunctions definition with the following code.

{
  "Comment": "Datapipeline For MachineLearning in AWS Sagemaker",
  "StartAt": "LandingToRawFileTransfer",
  "States": {
    "LandingToRawFileTransfer": {
      "Comment": "Transfers files from landing zone to Raw zone.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferFailed"
        }
      ],
      "Next": "LandingToRawFileTransferPassed"
    },
    "LandingToRawFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToRawFileTransferStatus"
    },
    "LandingToRawFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to raw zone successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToRawFileTransferIncompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToRawFileTransferStatusPassed"
    },
    "LandingToRawFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "StartRawCrawler"
    },
    "StartRawCrawler": {
      "Comment": "Crawls data from raw zone and adds table definition to the specified Database. IF table definition exists updates the definition.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRawCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRawCrawlerFailed"
        }
      ],
      "Next": "StartRawCrawlerPassed"
    },
    "StartRawCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRawCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RawCrawlerStatus"
    },
    "RawCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RawCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RawCrawlerStatusFailed"
        }
      ],
      "Next": "RawCrawlerStatusPassed"
    },
    "RawCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler status check failed"
    },
    "RawCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "GlueJob"
    },
    "GlueJob": {
      "Comment": "Invokes Glue job and waits for Glue job to complete.",
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "retail-raw-refined",
        "Arguments": {
          "--refined_prefix": "data/refined",
          "--year.$": "$.year",
          "--month.$": "$.month",
          "--day.$": "$.day"
        }
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "GlueJobFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GlueJobFailed"
        }
      ],
      "Next": "GlueJobPassed"
    },
    "GlueJobFailed": {
      "Type": "Fail",
      "Cause": "Glue job failed"
    },
    "GlueJobPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.Arguments.--year",
        "month.$": "$.Arguments.--month",
        "day.$": "$.Arguments.--day"
      },
      "Next": "StartRefinedCrawler"
    },
    "StartRefinedCrawler": {
      "Comment": "Crawls data from refined zone and adds table definition to the specified Database.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRefinedCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRefinedCrawlerFailed"
        }
      ],
      "Next": "StartRefinedCrawlerPassed"
    },
    "StartRefinedCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRefinedCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RefinedCrawlerStatus"
    },
    "RefinedCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        }
      ],
      "Next": "RefinedCrawlerStatusPassed"
    },
    "RefinedCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler status check failed"
    },
    "RefinedCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransfer"
    },
    "LandingToArchiveFileTransfer": {
      "Comment": "Transfers files from landing zone to archived zone",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferPassed"
    },
    "LandingToArchiveFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Archive File Transfer failed"
    },
    "LandingToArchiveFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransferStatus"
    },
    "LandingToArchiveFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to archived successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToArchiveFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferStatusPassed"
    },
    "LandingToArchiveFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "LandingToArchiveFileTransfer invocation failed"
    },
    "LandingToArchiveFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "End": true
    }
  }
}
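
A definition of this size is easy to get subtly wrong, so here is a minimal sketch of a sanity check that can be run before uploading it. It only verifies that every Next and Catch target names an existing state and that the wildcard error name is spelled exactly States.ALL (error names are, as far as I know, matched case-sensitively). The function name check_state_machine and the shortened example definition are my own illustration, not part of the pipeline.

```python
def check_state_machine(definition):
    """Return a list of human-readable problems found in a state machine definition."""
    states = definition.get("States", {})
    problems = []

    start = definition.get("StartAt")
    if start not in states:
        problems.append(f"StartAt points to unknown state {start!r}")

    for name, state in states.items():
        # Collect every transition target: the state's own Next plus each Catch's Next.
        targets = []
        if "Next" in state:
            targets.append(state["Next"])
        for catcher in state.get("Catch", []):
            targets.append(catcher.get("Next"))
        for target in targets:
            if target not in states:
                problems.append(f"{name}: transition to unknown state {target!r}")

        # Flag miscased spellings of the wildcard error name.
        for rule in state.get("Retry", []) + state.get("Catch", []):
            for error in rule.get("ErrorEquals", []):
                if error.lower() == "states.all" and error != "States.ALL":
                    problems.append(f"{name}: {error!r} should be 'States.ALL'")

    return problems


# Hypothetical, shortened definition with two deliberate mistakes.
example = {
    "StartAt": "RawCrawlerStatus",
    "States": {
        "RawCrawlerStatus": {
            "Type": "Task",
            "Retry": [{"ErrorEquals": ["States.All"], "MaxAttempts": 5}],
            "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "RawCrawlerStatusFailed"}],
            "Next": "RawCrawlerStatusPassed",
        },
        "RawCrawlerStatusFailed": {"Type": "Fail"},
    },
}

for problem in check_state_machine(example):
    print(problem)
```

To check a real definition, load the JSON file with json.load and pass the resulting dictionary to the function.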

After updating the AWS Step Functions definition, the visual workflow looks like the following.

Now upload a file to the data/landing/ zone of the bucket on which the Lambda trigger is configured. The Step Functions execution starts, and the visual workflow looks like the following.

In the RawCrawlerStatus step, if the Lambda keeps failing, the state retries up to the configured number of attempts and the state machine is then marked as failed. If the state machine runs successfully, the visual workflow looks like the following.
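
To get a feel for how long the status steps keep retrying, the waits implied by the Retry settings in the definition (IntervalSeconds 30, BackoffRate 2, MaxAttempts 5) can be worked out with a few lines of Python. This is only a back-of-the-envelope sketch; the helper name retry_delays is mine, and it covers a single retrier without any extra delay caps or jitter.

```python
def retry_delays(interval_seconds, backoff_rate, max_attempts):
    """Seconds waited before each retry attempt, in order."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


delays = retry_delays(interval_seconds=30, backoff_rate=2, max_attempts=5)
print(delays)       # [30, 60, 120, 240, 480]
print(sum(delays))  # 930 seconds, i.e. 15.5 minutes of waiting in total
```

If a crawler typically needs longer than that, raising MaxAttempts or IntervalSeconds is the simplest adjustment.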

Machine Learning workflow using Amazon SageMaker

The final step in this data pipeline is to make the processed data available in a Jupyter notebook instance in Amazon SageMaker. Jupyter notebooks are popular among data scientists for exploratory data analysis and for building and training machine learning models.

Create Notebook Instance in Amazon SageMaker

Step1: In the Amazon SageMaker console, choose Create notebook instance.

Step2: In the Notebook instance settings, populate the Notebook instance name, choose an instance type that suits the data size, and choose a role that lets notebook instances in Amazon SageMaker interact with Amazon S3. The SageMaker execution role needs the required permissions for Athena, for the S3 buckets where the data resides, and for KMS if the data is encrypted.

Step3: Wait for the notebook instance to be created and the Status to change to InService.

Step4: Choose Open Jupyter, which opens the notebook interface in a new browser tab.

Click New to create a new notebook in Jupyter. Amazon SageMaker provides several kernels for Jupyter, including support for Python 2 and 3, MXNet, TensorFlow, and PySpark. Choose Python as the kernel for this exercise, as it comes with the Pandas library built in.

Step5: Within the notebook, execute the following commands to install PyAthena, a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.

import sys
!{sys.executable} -m pip install PyAthena

Step6: After PyAthena is installed, you can use it to connect to Athena and populate Pandas data frames. For data scientists, working with data is typically divided into multiple stages: munging and cleaning data, analyzing and modeling it, then organizing the results of the analysis into a form suitable for plotting or tabular display. Pandas is well suited to all of these tasks.

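from pyathena import connect
import pandas as pd

# Athena writes query results to this S3 location; the notebook role needs access to it.
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
               region_name='<REGION, for example, us-east-1>')

# Load the first ten rows of the table into a Pandas data frame and display it.
df = pd.read_sql("SELECT * FROM <DATABASE>.<TABLENAME> limit 10;", conn)
df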

As shown above, the data frame stays consistent with the latest incoming data because of the data engineering pipeline set up earlier in the ML workflow. It can be used for downstream ad hoc model building or for exploratory data analysis.

That’s it folks. Thanks for the read.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.

Real Time Face Identification on Live Camera Feed using Amazon Rekognition Video and Kinesis Video Streams

In this post, we are going to learn how to perform facial analysis on a live feed by setting up a serverless video analytics architecture using Amazon Rekognition Video and Amazon Kinesis Video Streams.

Use cases:

  1. Intruder notification system
  2. Employee sign-in system

Architecture overview

Real time face recognition using AWS on a live video stream

We shall learn how to use the webcam of a laptop to send a live video feed to an Amazon Kinesis video stream (a production-ready system can, of course, connect professional-grade cameras to Kinesis Video Streams instead). The stream processor in Amazon Rekognition Video picks up this webcam feed and analyzes it by comparing the faces in the feed with the faces in a face collection created beforehand. The results of this analysis are written to an Amazon Kinesis data stream. For each record written to the Kinesis data stream, a Lambda function is invoked and reads the record. Depending on how the Lambda is configured, facial matches or mismatches trigger an email notification, sent via Amazon SNS (Simple Notification Service) to the registered email addresses.
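
The Lambda in this post is created by the CloudFormation stack and runs on Node.js, so the following Python is only an illustrative sketch of what the record handling could look like, not the stack's actual code. It assumes the usual shape of a Kinesis-triggered Lambda event (base64-encoded payload under Records[].kinesis.data) and of the stream processor output (a FaceSearchResponse list whose entries carry MatchedFaces). The function name matched_names and the sample values are hypothetical.

```python
import base64
import json


def matched_names(event, min_similarity=85.5):
    """Return the ExternalImageId of every matched face in a Kinesis-triggered event."""
    names = []
    for record in event.get("Records", []):
        # Kinesis delivers each payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        for result in payload.get("FaceSearchResponse", []):
            for match in result.get("MatchedFaces", []):
                if match.get("Similarity", 0) >= min_similarity:
                    face = match.get("Face", {})
                    names.append(face.get("ExternalImageId", "unnamed-face"))
    return names


# Synthetic record: one detected face matches "alice", one has no match.
sample = {
    "FaceSearchResponse": [
        {"DetectedFace": {}, "MatchedFaces": [
            {"Similarity": 97.2, "Face": {"ExternalImageId": "alice"}}]},
        {"DetectedFace": {}, "MatchedFaces": []},
    ]
}
encoded = base64.b64encode(json.dumps(sample).encode("utf-8")).decode("ascii")
event = {"Records": [{"kinesis": {"data": encoded}}]}

print(matched_names(event))  # ['alice']
```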

Note: Make sure all the above AWS resources are created in the same region.

Before going further, make sure you already have the AWS CLI configured on your machine. If not, follow this link –  https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html

Step-1: Create Kinesis Video Stream

We need to create a Kinesis video stream that our webcam can connect to and send its feed to. In the Kinesis Video Streams section of the AWS console, create a new stream by clicking the Create button, and give the stream a name in the stream configuration.

We have successfully created a Kinesis video stream.

Step-2: Send Live Feed to the Kinesis Video Stream

We need a video producer: anything that sends media data to the Kinesis video stream is called a producer. AWS currently provides producer libraries for Java, Android, C++, and C. We use the C++ Producer Library as a GStreamer plugin.

To easily send media from a variety of devices on a variety of operating systems, this tutorial uses GStreamer, an open-source media framework that standardizes access to cameras and other media sources.

Download the Amazon Kinesis Video Streams Producer SDK from GitHub using the following Git command:

git clone https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp 

After the download completes, you can compile and install the GStreamer sample in the kinesis-video-native-build directory using the following commands:

For Ubuntu – run the following commands

sudo apt-get update

sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-base-apps

sudo apt-get install gstreamer1.0-plugins-bad gstreamer1.0-plugins-good gstreamer1.0-plugins-ugly gstreamer1.0-tools

For Windows – run the following commands

Inside mingw32 or mingw64 shell, go to kinesis-video-native-build directory and run ./min-install-script 

For macOS – run the following commands

Install homebrew

Run brew install pkg-config openssl cmake gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly log4cplus

Go to kinesis-video-native-build directory and run ./min-install-script

Running the GStreamer webcam sample application:
The sample application kinesis_video_gstreamer_sample_app in the kinesis-video-native-build directory uses a GStreamer pipeline to get video data from the camera. Launch it with the name of the Kinesis video stream created in Step-1, and it will start streaming from the camera.

AWS_ACCESS_KEY_ID=<YourAccessKeyId> AWS_SECRET_ACCESS_KEY=<YourSecretAccessKey> ./kinesis_video_gstreamer_sample_app <stream_name>

After running the above command, streaming starts and you can see the live feed in the Media preview of the Kinesis video stream created in Step-1.

Step-3: Creating Resources using AWS CloudFormation Stack

The CloudFormation stack will create the resources that are highlighted in the following image.

Highlighted resources to be created using CloudFormation
  1. The following link will automatically open a new CloudFormation Stack in us-west-2: CloudFormation Stack.
  2. Go to View in Designer, edit the template code as follows, and then click Create stack.
    1. Change the name of the application from Default.
    2. Change the Node.js version of the RekognitionVideoLambda to 10.x (version 6.10 is no longer supported).
editing CloudFormation stack template

Enter your Email Address to receive notifications and then choose Next as shown below:

Skip the Configure stack options step by choosing Next. In the final step, you must check the checkbox and then choose Create stack, as in the image below:

Once the stack is created successfully, you can see it in your stacks with the status CREATE_COMPLETE, as below. This creates the required resources.

Once completed, you will receive a confirmation email asking you to subscribe to the notifications. Make sure you confirm the subscription.

snapshot from subscribe confirmation email

Step-4: Add face to a Collection

As learned earlier the Stream Processor in Amazon Rekognition Video picks up and analyzes the feed coming from Kinesis Video Stream by comparing it with the faces in the face collection. Let’s create this collection.

For more information follow the link – https://docs.aws.amazon.com/rekognition/latest/dg/create-collection-procedure.html

Create Collection: Create a face collection using the AWS CLI on the command line with the command below:

aws rekognition create-collection --collection-id <Collection_Name> --region us-west-2

Add face(s) to the collection: You can use your own picture for testing.
First, upload the image(s) to an Amazon S3 bucket in the us-west-2 region. Then run the command below, replacing BUCKET_NAME and FILE_NAME with your details and making sure that --collection-id matches the collection created above. You can give any name for FACE-TAG (for example, the name of the person).

aws rekognition index-faces --image '{"S3Object":{"Bucket":"<BUCKET_NAME>","Name":"<FILE_NAME>.jpeg"}}' --collection-id "<Collection_Name>" --detection-attributes "ALL" --external-image-id "<FACE-TAG>" --region us-west-2

After that, you will receive a response as output, as shown below.

Now we are all ready to go further. If you have multiple photos, run the above command again with each new file name.
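
When there are many photos, repeating the CLI call by hand gets tedious. Below is a minimal Python sketch of the same index-faces call in a loop. It takes the Rekognition client as a parameter (with boto3 that would be boto3.client("rekognition", region_name="us-west-2")), so the function itself has no AWS dependency. The function name, bucket and file names are hypothetical. The tag check reflects my understanding that Rekognition only accepts letters, digits and the characters _ . - : in an external image ID, so a name containing spaces would be rejected.

```python
import re

# Characters Rekognition accepts in ExternalImageId, as far as I know (no spaces).
_EXTERNAL_ID = re.compile(r"^[a-zA-Z0-9_.\-:]+$")


def index_faces_from_s3(client, collection_id, bucket, photos):
    """Index each (object key, face tag) pair; return the number of faces indexed per tag."""
    indexed = {}
    for key, tag in photos:
        if not _EXTERNAL_ID.match(tag):
            raise ValueError(f"invalid face tag {tag!r}: use letters, digits and _ . - : only")
        response = client.index_faces(
            CollectionId=collection_id,
            Image={"S3Object": {"Bucket": bucket, "Name": key}},
            ExternalImageId=tag,
            DetectionAttributes=["ALL"],
        )
        indexed[tag] = indexed.get(tag, 0) + len(response.get("FaceRecords", []))
    return indexed
```

A call such as index_faces_from_s3(client, "<Collection_Name>", "<BUCKET_NAME>", [("venu-1.jpeg", "Venu"), ("venu-2.jpeg", "Venu")]) would then index both photos under the same tag.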

Step-5: Creating the Stream Processor

We need to create a stream processor, which does all the work: reading the video stream, running facial analysis against the face collection, and finally writing the analysis data to the Kinesis data stream. Its definition contains the Kinesis data stream, the Kinesis video stream, the face collection ID, and the role that Rekognition uses to access the Kinesis video stream.

Copy the required meta info:
Go to the Kinesis video stream created in Step-1 and note down the Stream ARN (KINESIS_VIDEO_STREAM_ARN) from the stream info, as shown below:

Next, from the CloudFormation stack outputs, note down the values of KinesisDataStreamArn (KINESIS_DATA_STREAM_ARN) and RecognitionVideoIAM (IAM_ROLE_ARN), as shown below:

Now create a JSON file in your system that contains the following information:

{
  "Name": "streamProcessorForRekognitionVideoBlog",
  "Input": {
    "KinesisVideoStream": {
      "Arn": "<KINESIS_VIDEO_STREAM_ARN>"
    }
  },
  "Output": {
    "KinesisDataStream": {
      "Arn": "<KINESIS_DATA_STREAM_ARN>"
    }
  },
  "RoleArn": "<IAM_ROLE_ARN>",
  "Settings": {
    "FaceSearch": {
      "CollectionId": "<Collection_Name>",
      "FaceMatchThreshold": 85.5
    }
  }
}
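
If you prefer not to edit the JSON by hand, the same structure can be generated from the values noted down earlier. This is a small sketch under my own naming (stream_processor_config is not an AWS function), and the ARNs and collection name in the example call are made-up placeholders. The range check reflects that FaceMatchThreshold is a percentage between 0 and 100.

```python
import json


def stream_processor_config(name, video_stream_arn, data_stream_arn, role_arn,
                            collection_id, face_match_threshold=85.5):
    """Build the request body expected by create-stream-processor."""
    if not 0 <= face_match_threshold <= 100:
        raise ValueError("face_match_threshold must be between 0 and 100")
    return {
        "Name": name,
        "Input": {"KinesisVideoStream": {"Arn": video_stream_arn}},
        "Output": {"KinesisDataStream": {"Arn": data_stream_arn}},
        "RoleArn": role_arn,
        "Settings": {
            "FaceSearch": {
                "CollectionId": collection_id,
                "FaceMatchThreshold": face_match_threshold,
            }
        },
    }


# Placeholder values for illustration only; substitute the ARNs you noted down.
config = stream_processor_config(
    name="streamProcessorForRekognitionVideoBlog",
    video_stream_arn="arn:aws:kinesisvideo:us-west-2:111122223333:stream/example/0",
    data_stream_arn="arn:aws:kinesis:us-west-2:111122223333:stream/example",
    role_arn="arn:aws:iam::111122223333:role/example",
    collection_id="exampleCollection",
)
print(json.dumps(config, indent=2))
```

Redirecting the printed output to a file gives a JSON document that can be passed to the CLI through --cli-input-json.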

Create the stream processor with the AWS CLI from the command line with the following command:

aws rekognition create-stream-processor --region us-west-2 --cli-input-json file://<PATH_TO_JSON_FILE_ABOVE>

Now start the stream processor with the following command:

aws rekognition start-stream-processor --name streamProcessorForRekognitionVideoBlog --region us-west-2

You can check whether the stream processor is in a running state with the following command:

aws rekognition list-stream-processors --region us-west-2

If it is running, the response lists the stream processor with a status of RUNNING:
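
The same check can be scripted. The sketch below assumes the response has the shape I would expect from list-stream-processors, a StreamProcessors list whose entries carry Name and Status. The helper name processor_status and the sample response are mine.

```python
def processor_status(list_response, name):
    """Return the status of the named stream processor, or None if it is not listed."""
    for processor in list_response.get("StreamProcessors", []):
        if processor.get("Name") == name:
            return processor.get("Status")
    return None


# Sample response, written by hand to mirror the expected structure.
response = {
    "StreamProcessors": [
        {"Name": "streamProcessorForRekognitionVideoBlog", "Status": "RUNNING"}
    ]
}

status = processor_status(response, "streamProcessorForRekognitionVideoBlog")
print(status)  # RUNNING
```

Anything other than RUNNING (for example STOPPED or FAILED) means no analysis results are being written to the data stream.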

Also make sure that your camera is still streaming from the command line and that the feed is being received by the Kinesis video stream.

You will now be notified whenever a known or an unknown person shows up in your webcam.

Example notification:

Note: 

  1. The Lambda can be configured to send an email notification only when unknown faces are detected, or only when known faces are detected.
  2. For cost optimization, one could use a Python script to call the Amazon Rekognition service with a snapshot only when a person is detected, instead of spending resources on footage of an unoccupied area.
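
To make the first note concrete, here is a hedged sketch of the decision the Lambda has to take for each record. It is written in Python for illustration, whereas the Lambda created by the stack is a Node.js function, and the mode names are my own. The input is the FaceSearchResponse list from one stream processor record, in which an entry with an empty MatchedFaces list stands for a face that is not in the collection.

```python
def should_notify(face_search_response, mode="unknown"):
    """Decide whether one record warrants an email.

    mode "unknown": notify if any detected face has no match in the collection.
    mode "known":   notify if any detected face has at least one match.
    mode "any":     notify if any face was detected at all.
    """
    has_known = any(bool(r.get("MatchedFaces")) for r in face_search_response)
    has_unknown = any(not r.get("MatchedFaces") for r in face_search_response)
    if mode == "known":
        return has_known
    if mode == "unknown":
        return has_unknown
    if mode == "any":
        return has_known or has_unknown
    raise ValueError(f"unsupported mode: {mode!r}")


stranger = [{"DetectedFace": {}, "MatchedFaces": []}]
employee = [{"DetectedFace": {}, "MatchedFaces": [{"Similarity": 98.1}]}]

print(should_notify(stranger, mode="unknown"))  # True
print(should_notify(employee, mode="unknown"))  # False
```

The "unknown" mode fits the intruder notification use case, while "known" fits the employee sign-in use case.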

Step-6: Cleaning Up Once Done

Stop the stream processor.

aws rekognition stop-stream-processor --name streamProcessorForRekognitionVideoBlog --region us-west-2

Delete the stream processor.

aws rekognition delete-stream-processor --name streamProcessorForRekognitionVideoBlog --region us-west-2

Delete the Kinesis video stream. Go to Kinesis Video Streams in the AWS console, select your stream, and choose Delete.

Delete the CloudFormation stack. Go to CloudFormation in the AWS console, choose Stacks, select your stack, and choose Delete.

Thanks for the read! I hope it was both fun and useful.

This story is co-authored by Venu and Koushik. Venu is a software engineer and machine learning enthusiast. Koushik is a software engineer and a keen data science and machine learning enthusiast.