Scheduling tasks with AWS SQS and Lambda

In today’s blog post we will be learning a workaround for how to schedule or delay a message using AWS SQS despite its 15 minutes (900 seconds) upper limit.

But first let us understand some SQS attributes briefly, firstly Delivery Delay, it lets you specify a delay between 0 and 900 seconds (15 minutes). When set, any message sent to the queue will only become visible to consumers after the configured delay period. Secondly Visibility Timeout, the time that a received message from a queue will be invisible to be received again unless it’s deleted from the queue.

If you want to learn about dead letter queue and deduplication, you could follow my other article: Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS.

So, when a consumer receives a message, the message remains in the queue but is invisible for the duration of its visibility timeout, after which other consumers will be able to see the message. Ideally, the first consumer would handle and delete the message before the visibility timeout expires.

The upper limit for visibility timeout is 12 hours. We could leverage this to schedule/delay a task.

A typical combination would be SQS with Lambda where the invoked function executes the task. Usually, standard queues when enabled with lambda triggers have immediate consumption that means when a message is inserted into the standard queue the lambda function is invoked immediately with the message available in the event object. 

Note: If the lambda results in an error the message stays in the queue for further receive requests, otherwise it is deleted.

That said, there could be 2 cases:

  1. A generic setup that can adapt to a range of time delays.
  2. A stand-alone setup built to handle only a fixed time delay.

The idea is to insert a message into the queue with task details and time to execute(target time) and have the lambda do the dirty work.

Case1: 

The Lambda function checks if target time equals current time, if so execute the task and message is deleted as the lambda executes without error else change the visibility timeout of that message in the queue with delta difference and raise an error leaving the message in the queue.

Case2:

The SQS’s default visibility timeout is configured with the required fixed time delay. The Lambda function checks if the difference of target time and current time equals fixed time delay, if so execute the task and message is deleted as the lambda executes without error else simply raise an error leaving the message untampered back in the queue.

The message is retried after it’s visibility timeout which is the required fixed time delay and is executed.

The problem with this approach is accuracy and scalability.

Here’s the lambda code for case2:
Processor.py

import boto3
import json
from datetime import datetime, timezone
import dateutil.tz

tz = dateutil.tz.gettz('US/Central')

fixed_time_delay = 1 # change this value, be it hour, min, sec

def lambda_handler(event, context):
    # TODO implement
    
    message = event['Records'][0]
    # print(message)
    result = json.loads(message['body'])
    
    task_details = result['task_details']
    target_time = result['execute_at']
    
    tt = datetime.strptime(target_time, "%d/%m/%Y, %H:%M %p CST")
    print(tt)
    
    t_now = datetime.now(tz)
    time_now = t_now.strftime("%d/%m/%Y, %H:%M %p CST")
    tn = datetime.strptime(time_now, "%d/%m/%Y, %H:%M %p CST")
    print(tn)
    
    delta_time = tn-tt
    print(delta_time)

    delta_in_whatever = #extract delay in hour, min, sec 
    
    if delta_in_whatever == fixed_time_delay: 
        # execute task logic
        print(task_details)
    else:
        raise e

Conclusion:
Scheduling tasks using SQS isn’t effective in all scenarios. You could use AWS step function’s wait state to achieve milliseconds accuracy, or Dynamo DB’s TTL feature to build an ad hoc scheduling mechanism, the choice of service used is largely dependent on the requirement. So, here’s a wonderful blog post that gives you a bigger picture of different ways to schedule a task on AWS.

This story is authored by Koushik. Koushik is a software engineer specializing in AWS Cloud Services.

Programmatically Updating Autoscaling policy on DynamoDB with boto3: Application Auto Scaling

In this blog post, we will be learning how to programmatically update the auto-scaling policy settings of a DynamoDB table. The idea is to scale it smoothly (minimal write request throttling) irrespective of the anticipated traffic spikes it receives. We do this using AWS Application Auto Scaling and Lambda (boto3).

Understanding how DynamoDB auto-scales

DynamoDB auto scaling works based on Cloudwatch metrics and alarms built on top of 3 parameters:

  1. Provisioned capacity
  2. Consumed capacity
  3. Target utilization

Let us understand these parameters briefly, Firstly Provisioned capacity is how many units are allocated for the table/index, while Consumed capacity is how much of it is utilized. Lastly, Target utilization is the threshold we specify at how much utilization(%) we want autoscaling to happen. Accordingly provisioned capacity is increased.
Note: Capacity can be for either writing or reading, sometimes both.

Let us say, you provisioned 50 write units for the table:

  • Autoscale by 30 units when 40 units (80% ) are consumed. Then your autoscaling policy would look something like this.
Provisioned write capacity: 50 units
Target utilization: 80%
max write units: 10,000
min write units: 30

This auto-scaling policy looks fine yet it is static and can it cope up with unpredictable traffic spikes? If so how much time does it take to autoscale (15min)? What if the increased units were less than required, how do you reduce throttling?

Traffic spikes

Broadly traffic spikes could be categorized into 2 categories:

  1. Anticipated spike
  2. Sudden spike

And the traffic fluctuation curve can be any of the following:

  1. Steady rise and fall.
  2. Steep peaks and valleys.

Having a single static autoscaling policy that copes up with all the above cases or a combination of them is not possible. DynamoDB auto-scales capacity based on target capacity utilization instead of write requests. This makes it inefficient and slow.

Target capacity utilization is the percentage of consumption over Provisioned. Mathematically put (consumed/provisioned)*100.

If the traffic is unpredictable and you have to understand and improve (tweak) DynamoDB autoscaling’s internal workflow, here is a wonderful blog post that I would highly recommend.

Anticipated traffic is in direct correlation with your actions, be it a scheduled marketing event, data ingestion or even sale transactions on a festive day. In such scenarios, a workaround would be the ability to programmatically update the autoscaling policy.

Updating autoscaling policy using AWS Application Auto Scaling.

AWS has this service called AWS Application Auto Scaling. With Application Auto Scaling, you can configure/update automatic scaling for the following resources:

  • Amazon ECS services
  • Amazon EC2 Spot Fleet requests
  • Amazon EMR clusters
  • Amazon AppStream 2.0 fleets
  • Amazon DynamoDB tables and global secondary indexes throughput capacity
  • Amazon Aurora Replicas
  • Amazon SageMaker endpoint variants
  • Custom resources provided by your own applications or services
  • Amazon Comprehend document classification endpoints
  • AWS Lambda function provisioned concurrency

The scaling of the provisioned capacity of these services is managed by the autoscaling policy that is in place. AWS Application Auto Scaling service can be used to modify/update this autoscaling policy.

Here is a sample Lambda (python) code that updates DynamoDB autoscaling settings:

# update-dynamo-autoscale-settings
import os
import sys
import boto3
from botocore.exceptions import ClientError

# Add path for additional imports
sys.path.append('./lib/python3.7/site-packages')

# Initialize boto3 clients
dynamodb = boto3.resource('dynamodb')
dynamodb_scaling = boto3.client('application-autoscaling')

# Initialize variables
table_name = "your dynamo table name"
table = dynamodb.Table(table_name)

def update_auto_scale_settings(min_write_capacity: int, table_name: str): 
    max_write_capacity = 40000 #default number
    dynamodb_scaling.register_scalable_target(ServiceNamespace = "dynamodb",
                                                 ResourceId = "table/{}".format(table_name),
                                                 ScalableDimension = "dynamodb:table:WriteCapacityUnits",
                                                 MinCapacity = min_write_capacity,
                                                 MaxCapacity = max_write_capacity)
    
    # if you have indexes on the table, add their names to indexes list below
    # indexes = ["index1", "index2", "index3"]
    # for index_name in indexes:
    #     scaling_dynamodb.register_scalable_target(ServiceNamespace = "dynamodb",
    #                                              ResourceId = "table/{table_name}/index/{index_name}".format(table_name = table_name, index_name = index_name),
    #                                              ScalableDimension = "dynamodb:index:WriteCapacityUnits",
    #                                              MinCapacity = min_write_capacity,
    #                                              MaxCapacity = max_write_capacity)



#put_scaling policy is another call that needs to be made, if you want a different target utilization value.

def lambda_handler(event, context):
    
    try:

        # logic before updating

        write_units = #number
        update_auto_scale_settings(table_name,write_units)
                
        # logic after updating

        # Insert item into dynamo
        # table.put_item(Item = record)
                    
    except Exception as e:
        raise e
    
    else:
        print("success")

I hope it was helpful. Thanks for the read!

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Image Text Detection with Bounding Boxes using OpenCV in React Native Mobile App

In our earlier blog post, we had built a Text Detection App with React Native using AWS Rekognition. The Text Detection App basically detects the texts and their dimensions in the captured image. This blog is an extension to it, where we shall learn how to draw Bounding Boxes using the dimensions of the detected text in the image. Assuming you had followed our earlier blog and created the Text Detection App we will proceed further.

The following diagram depicts the architecture we will be building. 

The React app sends the image to be processed via an API call, detect_text lambda function stores it in S3 and calls Amazon Rekognition with its URL to get the detected texts along with their dimensions. With this data it invokes the draw_bounding_box lambda function, it fetches the image from S3, draws the bounding boxes and stores it as a new image. With new URL it responds back to detect_text lambda which in turn responds back to the app via API gateway with the image URL having bounding boxes.

In our previous blog we already have finished detecting the text part, let us look at creating the rest of the setup.

We will use another AWS lambda function to draw bounding boxes and that function would need OpenCV.

OpenCV (Open Source Computer Vision Library) is an open-source computer vision and machine learning software library which was built to provide a common infrastructure for computer vision applications and to accelerate the use of machine perception in the commercial products.

Preparing the package for draw_bounding_box Lambda:

We need OpenCV, Numpy libraries for image manipulation but lambda doesn’t support these libraries by default. We have to prepare them and upload them. So, we will be preparing the lambda code as a package locally and then upload.

To prepare these libraries, follow this link. After finishing the process you will get a zip file. Unzip the file and copy the below lambda code in .py file.

Note: The name of this .py file should be the same as your lambda index handler.
default name: lambda_handler

lambda_handler.py:

import cv2
import boto3
import numpy as np
import base64
import json

def lambda_handler(event, context):
    bucketName='<-Your-Bucket-Name->'
    s3_client = boto3.client('s3')

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucketName)

    # reading image name from event object
    obj = bucket.Object(key=event['image'])
    # Image we read from S3 bucket
    response = obj.get()

    imgContent = response["Body"].read()
    np_array = np.fromstring(imgContent, np.uint8)
    ''' we can read an image using cv2.imread() or np.fromstring(imgContent, np.uint8) followed by image_np = cv2.imdecode(np_array, cv2.IMREAD_COLOR).    '''
    image_np = cv2.imdecode(np_array, cv2.IMREAD_COLOR)

    '''
    imdecode reads an image from the specified buffer in the memory. 
    If the buffer is too short or contains invalid data, the empty matrix/image is returned.
    In OpenCV you can easily read in images with different file formats (JPG, PNG, TIFF etc.) using imread
    '''
    height, width, channels = image_np.shape
    # reading dimensions from the event object
    dimensions=json.loads(event['boundingBoxDimensions'])

    for dimension in dimensions:
        leftbox = int(width * dimension['Left'])
        topbox = int(height * dimension['Top'])
        widthbox = int(width * dimension['Width'])
        heightbox = int(height * dimension['Height'])
        # Using cv2.rectangle, we will draw a rectangular box with respect to dimensions
        k=cv2.rectangle(image_np, (leftbox, topbox), (leftbox+widthbox, topbox+heightbox) ,(0, 255, 0), 2)

    # used to write the image changes in a local image
    # For that create any folder(here tmp) and place any sample image(here sample.jpeg) in it. If not, we would encounter the following error.
‘Utf8’ codec can’t decode byte 0xaa in position 1: invalid start byte: UnicodeDecodeError.
 
    cv2.imwrite("/tmp/sample.jpeg", k)

    newImage="<-New-Image-Name->"
    # we put the image in S3. And return the image name as we store the modified image in S3
    s3_client.put_object(Bucket=bucketName, Key=newImage, Body=open("/tmp/sample.jpeg", "rb").read())

    return {
        'statusCode': 200,
        'imageName':newImage,
        'body': 'Hello from Lambda!'
    }

The package folder structure would look like below.

As these files exceed the Lambda upload limit, we will be uploading them to S3 and then add it from there.

Zip this lambda-package and upload it to S3. You can paste its S3 URL to your function code and change the lambda runtime environment to use Python 2.7 (OpenCV dependency).

Invoking draw_bounding_box lambda

The detect_text lambda invokes draw_bounding_box lambda in RequestResponse mode, which means detect_text lambda waits for the response of draw_bounding_box lambda.

The draw_bounding_box lambda function reads the image name and box dimensions from the event object. Below is the code for detect_text lambda which invokes the draw_bounding_box lambda function.

detect_text.js

const AWS = require('aws-sdk');
// added package
const S3 = new AWS.S3({signatureVersion: 'v4'});

var rekognition = new AWS.Rekognition();
var s3Bucket = new AWS.S3( { params: {Bucket: "<-Your-Bucket-Name->"} } );
var fs = require('fs');
// To invoke lambda function
var lambda = new AWS.Lambda();

exports.handler = (event, context, callback) => {
    let parsedData = JSON.parse(event);
    let encodedImage = parsedData.Image;
    var filePath = parsedData.name;

    let buf = new Buffer(encodedImage.replace(/^data:image\/\w+;base64,/, ""),'base64');
    var data = {
        Key: filePath, 
        Body: buf,
        ContentEncoding: 'base64',
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) { 
            console.log('Error uploading data: ', data);
            callback(err, null);
        } else {
            var params = {
                Image: {
                    S3Object: {
                        Bucket: "your-s3-bucket-name", 
                        Name: filePath
                    }
                }
            };

            rekognition.detectText(params, function(err, data) {
                if (err){
                    console.log(err, err.stack);
                    callback(err);
                }
                else{
                    console.log("data: ",data);
                    var detectedTextFromImage=[];
                    var geometry=[];
                    for (item in data.TextDetections){
                      if(data.TextDetections[item].Type === "LINE"){
                        geometry.push(data.TextDetections[item].Geometry.BoundingBox);
                        detectedTextFromImage.push(data.TextDetections[item].DetectedText);
                      }
                    }
                    var dimensions=JSON.stringify(geometry);
                    var payloadData={
                        "boundingBoxDimensions":dimensions,
                        "image": filePath
                    };

                    var params = {
                        FunctionName: 'draw_bounding_box',
                        InvocationType: "RequestResponse",
                        Payload: JSON.stringify(payloadData)
                    };
                    
                    lambda.invoke(params, function(err, data) {
                        if (err){
                            console.log("error occured");
                            console.log(err);
                        }
                        else{
                            var jsondata=JSON.parse(data.Payload);
                            var params = {
                                Bucket: "your-s3-bucket-name", 
                                Key: jsondata.imageName,
                            };
                            s3Bucket.getSignedUrl('getObject', params, function (err, url) {
                                var responseData={
                                        "DetectedText":detectedTextFromImage,
                                    "url":url
                                }
                                callback(null, responseData);
                            });                            
                        }
                    });
                    console.log("waiting for response");
                }
            });
        }
    });
};

Everything is similar except the rekognition.detectText() function. Upon success, we are storing the detected text in a list and dimensions in another list. Next, we need to pass the dimensions list and image name as arguments to the draw_bounding_box lambda function.

var payloadData={
    "boundingBoxDimensions":dimensions,
    "image": filePath
};

var params = {
    FunctionName: 'draw_bounding_box',
    InvocationType: "RequestResponse",
    Payload: JSON.stringify(payloadData)
};
lambda.invoke(params, function(err, data) {
    if (err){
        console.log("error occured");
        console.log(err);
    }
    else{
        var jsondata=JSON.parse(data.Payload);
        var params = {
            Bucket: "your-s3-bucket-name", 
            Key: jsondata.imageName,
        };
        s3Bucket.getSignedUrl('getObject', params, function (err, url) {
            var responseData={
                "DetectedText":detectedTextFromImage,
                "url":url
            }
            callback(null, responseData);
        });                            
    }
});

Lambda.invoke()  expects two arguments where the first argument needs to be an object which contains the name of the lambda function, invocation type, payload data. And the second argument is to handle success or failure response. When the detect_text lambda function invokes the draw_bounding_box function, it will process the image and give the response back to the detect_text lambda function. Upon success, we get the JSON object which contains the modified image name. 

Next, we use s3Bucket.getSignedUrl() to get the image URL which we will send to our React Native App with detected text also as a response.

Replace the existing App.js file, in react-native with the code below.
App.js

import React, {Component} from 'react';
import {
    StyleSheet,
    View,
    Text,
    TextInput,
    Image,
    ScrollView,
    TouchableHighlight,
    ActivityIndicator
} from 'react-native';
import ImagePicker from "react-native-image-picker";
import Amplify, {API} from "aws-amplify";
Amplify.configure({
    API: {
        endpoints: [
            {
                name: "<-Your-API-name->",
                endpoint: "<-Your-end-point-url->"
            }
        ]
    }
});

class Registration extends Component {
  
    constructor(props){
        super(props);
        this.state =  {
            isLoading : false,
            showInputField : false,
            imageName : '',
            capturedImage : '',
            detectedText: []
        };
    }

    captureImageButtonHandler = () => {
        ImagePicker.showImagePicker({title: "Pick an Image", maxWidth: 800, maxHeight: 600}, (response) => {
            console.log('Response - ', response);
            if (response.didCancel) {
                console.log('User cancelled image picker');
            } else if (response.error) {
                console.log('ImagePicker Error: ', response.error);
            } else if (response.customButton) {
                console.log('User tapped custom button: ', response.customButton);
            } else {
                const source = { uri: 'data:image/jpeg;base64,' + response.data };
                this.setState({
                    imageName: "IMG-" + Date.now(),
                    showInputField: true,
                    capturedImage: response.uri,
                    base64String: source.uri
                })
            }
        });
    }

    submitButtonHandler = () => {
        this.setState({
            isLoading: true
        })
        if (this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
            alert("Please Capture the Image");
        } else {
            console.log("submiting")
            const apiName = "<-Your-API-name->";
            const path = "/API-path";
            const init = {
                headers: {
                    'Accept': 'application/json',
                    "Content-Type": "application/x-amz-json-1.1"
                },
                body: JSON.stringify({
                    Image: this.state.base64String,
                    name: this.state.imageName
                })
            }

            API.post(apiName, path, init).then(response => {
                this.setState({
                    capturedImage: response.url,
                    detectedText: response.DetectedText,
                    isLoading:false
                })
            });
        }
    }
  
    render() {
        let inputField;
        let submitButtonField;
        if (this.state.showInputField) {
            inputField=
                    <View style={styles.buttonsstyle}>
                        <TextInput
                            placeholder="Img"
                            value={this.state.imageName}
                            onChangeText={imageName => this.setState({imageName: imageName})}
                            style={styles.TextInputStyleClass}
                        />
                    </View>;
            submitButtonField=<TouchableHighlight style={[styles.buttonContainer, styles.submitButton]} onPress={this.submitButtonHandler}>
                            <Text style={styles.buttonText}>Submit</Text>
                        </TouchableHighlight>
            
        }
        
        return (
            <View style={styles.screen}>
                <ScrollView>
                    <Text style= {{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Text Extracter</Text>

                    {this.state.capturedImage !== "" && <View style={styles.imageholder} >
                        <Image source={{uri : this.state.capturedImage}} style={styles.previewImage} />
                    </View>}
                    {inputField}
                    {this.state.isLoading && (
                        <ActivityIndicator
                            style={styles.Loader}
                            color="#C00"
                            size="large"
                        />
                    )}
                    <View>
                        {
                       this.state.detectedText.map((data, index) => {
                       return(
                           <Text key={index} style={styles.DetextTextView}>{data}</Text>
                       )})
                       }
                    </View>
                    <View style={styles.buttonsstyle}>
                        <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
                            <Text style={styles.buttonText}>Capture Image</Text>
                        </TouchableHighlight>
                        {submitButtonField}
                    </View>
                </ScrollView>
            </View>
        );
    }
}

const styles = StyleSheet.create({
    Loader:{
        flex: 1,
        justifyContent: 'center',
        alignItems: 'center',
        height: "100%"
    },
    screen:{
        flex:1,
        justifyContent: 'center',
    },
    buttonsstyle:{
        flex:1,
        alignItems:"center"
    },
    DetextTextView:{
      textAlign: 'center',
    },
    TextInputStyleClass: {
      textAlign: 'center',
      marginBottom: 7,
      height: "70%",
      margin: 10,
      width:"80%"
    },
    inputContainer: {
      borderBottomColor: '#F5FCFF',
      backgroundColor: '#FFFFFF',
      borderRadius:30,
      borderBottomWidth: 1,
      width:"90%",
      height:45,
      marginBottom:20,
      flexDirection: 'row',
      alignItems:'center'
    },
    buttonContainer: {
      height:45,
      flexDirection: 'row',
      alignItems: 'center',
      justifyContent: 'center',
      borderRadius:30,
      margin: 5,
    },
    captureButton: {
      backgroundColor: "#337ab7",
      width: "90%",
    },
    buttonText: {
      color: 'white',
      fontWeight: 'bold',
    },
    horizontal: {
      flexDirection: 'row',
      justifyContent: 'space-around',
      padding: 10
    },
    submitButton: {
      backgroundColor: "#C0C0C0",
      width: "90%",
      marginTop: 5,
    },
    imageholder: {
      borderWidth: 1,
      borderColor: "grey",
      backgroundColor: "#eee",
      width: "50%",
      height: 150,
      marginTop: 10,
      marginLeft: 90,
      flexDirection: 'row',
      alignItems:'center'
    },
    previewImage: {
      width: "100%",
      height: "100%",
    }
});

export default Registration;

Below are the screenshots of the React Native App running on an Android device.
We used the below image to extract text and add bounding boxes.

The image name is generated dynamically with epoch time, which is editable.

I hope it was helpful, thanks for the read!

This story is authored by Dheeraj Kumar and Santosh Kumar. Dheeraj is a software engineer specializing in React Native and React based frontend development. Santosh specializes on Cloud Services based development.

Federated Querying across Relational, Non-relational, Object, and Custom Data Sources using Amazon Athena

Querying Data from DynamoDB in Amazon Athena

Amazon Athena now enables users to run SQL queries across data stored in relational, non-relational, object, and custom data sources. With federated querying, customers can submit a single SQL query that scans data from multiple sources running on-premises or hosted in the cloud.

Athena executes federated queries using Athena Data Source Connectors that run on AWS Lambda. Athena federated query is available in Preview in the us-east-1 (N. Virginia) region.

Preparing to create federated queries is a two-part process:

  1. Deploying a Lambda function data source connector.
  2. Connecting the Lambda function to a data source. 

I assume that you have at least one DynamoDB table in us-east-1 region.

Deploy a Data Source Connector

  • Open the Amazon Athena console and choose the Connect data source. This feature is available in the region us-east-1 only.
  • On the Connect data source console, choose Query a data source feature. And choose Amazon DynamoDB as a data source.
  • Choose Next
  • For the Lambda function, choose to Configure new function. It opens in the Lambda console in a new tab with information about the connector.
  • Under ApplicationSettings, provide the required information.
    1. AthenaCatalogName – A name for the Lambda function.
    2. SpillBucket – An Amazon S3 bucket in your account to store data that exceeds Lambda function response size limits.
    3. SpillPrefix – Data that exceeds Lambda function response size limits stores under the Spillbucket/Spillprefix.
  • Choose I acknowledge that this app creates custom IAM roles and choose Deploy.

Connect to a data source using a connector that deployed in the earlier step

  • Open the Amazon Athena console and choose the Connect data source. This feature is available in the region us-east-1 only.
  • On the Connect data source console, choose Query a data source feature. And choose Amazon DynamoDB as a data source and choose Next.
  • Configure the Lambda function, choose the name of the lambda that you created in the earlier step.
  • Configure Catalog name, enter a unique name to use for the data source in your SQL queries, such as dynamo_athena.
  • Choose Connect. Now the data source is available under the Data Sources section in Amazon Athena.

Querying Data using Federated Queries

To use this feature in preview, you must create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup.

Create an Athena workgroup

  • Open the Amazon Athena console and choose Workgroup, and choose Create workgroup.
  • After creating a Workgroup, under Workgroup section select the created workgroup and choose Switch workgroup.
  • Select the Data source that was created in the earlier step in Athena. After choosing the data source, the DynamoDb tables are available in Athena in the default database.

Querying Data in Athena using SQL Queries

The following query is used to retrieve data from DynamoDB in Athena.

SELECT * FROM "data_source_connector"."database_name"."table_name";

Creating Athena table using CTAS with results of querying DynamoDB

The CTAS query looks like the following. Using the CTAS query, the format of data can be changed into the required format be it parquet, JSON, and CSV, etc.

CREATE TABLE database.table_name
WITH (
      external_location = 's3://bucket-name/data/',
      format = 'parquet')
AS 
SELECT * FROM "data_source_connector"."database_name"."table_name";

I hope this was helpful and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.

Create Voice Driven HealthCare Chatbot in React Native Using Google DialogFlow

In our earlier blog post, we had built a Healthcare Chatbot, in React Native using Dialogflow API. This blog is an extension of the chatbot we built earlier. We shall learn how to add support for voice-based user interaction to that chatbot. Assuming you had followed our earlier blog and created the chatbot we will proceed further.

The first thing is to specify voice permission for accessing the device’s microphone to record the audio which will help the app to operate properly. For that we just need to add the below code in AndroidManifest.xml located at project-name > android > app > src > main location.

AndroidManifest.xml

<uses-permission android:name="android.permission.SYSTEM_ALERT_WINDOW"/>
<uses-permission android:name="android.permission.RECORD_AUDIO" />

We also need to make few changes in MainApplication.java file located at project-name > android > app > src > main > java > com > project-name location.
Modify the content in MainApplication.java as below.

MainApplicaiton.java

import android.app.Application;
import android.content.Context;
import com.facebook.react.ReactApplication;
import com.facebook.react.ReactNativeHost;
import com.facebook.react.ReactPackage;
import com.facebook.soloader.SoLoader;
import java.util.List;


// Additional packages which we need to add
import net.no_mad.tts.TextToSpeechPackage;
import com.oblador.vectoricons.VectorIconsPackage;
import com.facebook.react.shell.MainReactPackage;
import com.reactnativecommunity.rnpermissions.RNPermissionsPackage;
import com.wmjmc.reactspeech.VoicePackage;
import java.util.Arrays;

public class MainApplication extends Application implements ReactApplication {

  private final ReactNativeHost mReactNativeHost =
      new ReactNativeHost(this) {
        @Override
        public boolean getUseDeveloperSupport() {
          return BuildConfig.DEBUG;
        }
        // Replace your getPackages() method with below getPackages() method
        @Override
        protected List<ReactPackage> getPackages() {
          return Arrays.<ReactPackage>asList(
              new MainReactPackage(),
              new VectorIconsPackage(),
              new RNPermissionsPackage(),
              new VoicePackage(),
              new TextToSpeechPackage()
          );
        }

        @Override
        protected String getJSMainModuleName() {
          return "index";
        }
      };

  @Override
  public ReactNativeHost getReactNativeHost() {
    return mReactNativeHost;
  }

  @Override
  public void onCreate() {
    super.onCreate();
    SoLoader.init(this, /* native exopackage */ false);
    // initializeFlipper(this); // Remove this line if you don't want Flipper enabled
  }

  /**
   * Loads Flipper in React Native templates.
   *
   * @param context
   */

  // private static void initializeFlipper(Context context) {
  //   if (BuildConfig.DEBUG) {
  //     try {
  //       /*
  //        We use reflection here to pick up the class that initializes Flipper,
  //       since Flipper library is not available in release mode
  //       */
  //       Class<?> aClass = Class.forName("com.facebook.flipper.ReactNativeFlipper");
  //       aClass.getMethod("initializeFlipper", Context.class).invoke(null, context);
  //     } catch (ClassNotFoundException e) {
  //       e.printStackTrace();
  //     } catch (NoSuchMethodException e) {
  //       e.printStackTrace();
  //     } catch (IllegalAccessException e) {
  //       e.printStackTrace();
  //     } catch (InvocationTargetException e) {
  //       e.printStackTrace();
  //     }
  //   }
  // }
}

We have added import net.no_mad.tts.TextToSpeechPackage to help the app support text to speech conversion in order to receive the response not only as text but also as voice.

To capture voice input we have to add import com.wmjmc.reactspeech.VoicePackage.

If you intend to use react-native vector icons, you will be required to add import com.oblador.vectoricons.VectorIconsPackage.

The initialize flipper() method has to be commented to avoid encountering the error shown below:

We also need to comment the line initializeFlipper(this) in public void onCreate() { method.

Moving on from MainApplication.java,  we also need to modify our existing code in the App.js file.

Initially, we need to install some packages. To do so, navigate to your project folder in the terminal and execute the below command.

npm install react-native-elements react-native-vector-icons react-native-tts uuid react-native-android-voice --save

We are using react-native-elements to use the already built UI components. Use the react-native-vector-icons package if you want to use react-native vector icons.

React Native TTS is a text-to-speech library for react-native on iOS and Android.

Uuid package is used in order to generate UUID (unique id’s) which we would be using in the app.

react-native-android-voice is a speech-to-text library for react-native for the Android platform.

Now modify the content of the App.js file as below.
App.js:

import React, { Component, useEffect } from 'react';
import { View, StyleSheet } from 'react-native';
import { GiftedChat } from 'react-native-gifted-chat';
import { Dialogflow_V2 } from 'react-native-dialogflow';
import { dialogflowConfig } from './env';
import SpeechAndroid from 'react-native-android-voice';
import uuid from "uuid";
import Tts from 'react-native-tts';
import Icon from 'react-native-vector-icons/FontAwesome';
import { Button } from 'react-native-elements';

const BOT_USER = {
  _id: 2,
  name: 'Health Bot',
  avatar: 'https://previews.123rf.com/images/iulika1/iulika11909/iulika1190900021/129697389-medical-worker-health-professional-avatar-medical-staff-doctor-icon-isolated-on-white-background-vec.jpg'
};

var speak=0;

class App extends Component {
  state = {
    messages: [
      {
        _id: 1,
        text: 'Hi! I am the Healthbot 🤖.\n\nHow may I help you today?',
        createdAt: new Date(),
        user: BOT_USER,
      }
    ],
  };

  componentDidMount() {
    Dialogflow_V2.setConfiguration(
      dialogflowConfig.client_email,
      dialogflowConfig.private_key,
      Dialogflow_V2.LANG_ENGLISH_US,
      dialogflowConfig.project_id
    );
  }

  onSend(messages = []) {
    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, messages)
    }));

    let message = messages[0].text;
    Dialogflow_V2.requestQuery(
      message,
      result => this.handleGoogleResponse(result),
      error => console.log(error)
    );
  }

  handleGoogleResponse(result) {
    let receivedText = '';
    let splitReceivedText = '';
    var extractDate = '';
    const dateRegex = /\d{4}\-\d{2}\-\d{2}?/gm;

    receivedText = result.queryResult.fulfillmentMessages[0].text.text[0];
    splitReceivedText = receivedText.split('on')[0];
    extractDate = receivedText.match(dateRegex);

    if (extractDate != null) {
      var completeTimeValue = splitResponseForTime(receivedText);
      var timeValue = getTimeValue(completeTimeValue);
      function splitResponseForTime(str) {
        return str.split('at')[1];
      }

      function getTimeValue(str) {
        let time1 = str.split('T')[1];
        let hour = time1.split(':')[0];
        let min = time1.split(':')[1];
        return hour + ":" + min;
      }
      splitReceivedText = splitReceivedText + 'on ' + extractDate[0] + ' at ' + timeValue;
    }
    this.sendBotResponse(splitReceivedText);
  }

  sendBotResponse(text) {
    let msg = {
      _id: this.state.messages.length + 1,
      text,
      createdAt: new Date(),
      user: BOT_USER
    };

    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, msg)
    }));

    if (speak) {
      speak=0; 
      Tts.getInitStatus().then(() => {
        Tts.speak(text);
      }, (err) => {
        if (err.code === 'no_engine') {
          Tts.requestInstallEngine();
        }
      });
    }
  }

  uuidGen = () => {
    return uuid.v4();
  }

  micHandler = async () => {
    try {
      let textSpeech = await SpeechAndroid.startSpeech("Speak now", SpeechAndroid.ENGLISH);
      speak=1;
      let StateVariable = [{
        "text": textSpeech,
        "user": {
          "_id": 1
        },
        "createdAt": new Date(),
        "_id": this.uuidGen()
      }];
      await this.onSend(StateVariable);

      await ToastAndroid.show(spokenText, ToastAndroid.LONG);
    } catch (error) {
      switch (error) {
        case SpeechAndroid.E_VOICE_CANCELLED:
          ToastAndroid.show("Voice Recognizer cancelled", ToastAndroid.LONG);
          break;
        case SpeechAndroid.E_NO_MATCH:
          ToastAndroid.show("No match for what you said", ToastAndroid.LONG);
          break;
        case SpeechAndroid.E_SERVER_ERROR:
          ToastAndroid.show("Google Server Error", ToastAndroid.LONG);
          break;
      }
    }
  }


  render() {
    return (
      < View style={styles.screen} >
        <GiftedChat
          messages={this.state.messages}
          onSend={messages => this.onSend(messages)}
          user={{
            _id: 1
          }}
        />
        <Button
          icon={
            <Icon
              name="microphone"
              size={24}
              color="white"
            />
          }
          onPress={this.micHandler}
          />
        </View>
    );
  }
}

const styles = StyleSheet.create({
  screen: {
    flex: 1,
    backgroundColor: '#fff'
  },
});
export default App;

Now when we run the app using the react-native run-android command, we can see a microphone button that will enable speech to text feature in the app.

Once we click the mic button, it triggers micHandler() function which contains the logic to convert speech into text. This will automatically start recognizing and adjusting for the English language.

However, you can use different languages for speech. Read this for more information.

micHandler = async () => {
    try {
      let textSpeech = await SpeechAndroid.startSpeech("Speak now", SpeechAndroid.ENGLISH);
      speak=1;
      let messagesArray = [{
        "text": textSpeech,
        "user": {
          "_id": 1
        },
        "createdAt": new Date(),
        "_id": this.uuidGen()
      }];
      await this.onSend(messagesArray);
      await ToastAndroid.show(spokenText, ToastAndroid.LONG);
    } catch (error) {
      switch (error) {
        case SpeechAndroid.E_VOICE_CANCELLED:
          ToastAndroid.show("Voice Recognizer cancelled", ToastAndroid.LONG);
          break;
        case SpeechAndroid.E_NO_MATCH:
          ToastAndroid.show("No match for what you said", ToastAndroid.LONG);
          break;
        case SpeechAndroid.E_SERVER_ERROR:
          ToastAndroid.show("Google Server Error", ToastAndroid.LONG);
          break;
      }
    }
  }

micHandler() is an asynchronous function. Here, we will capture the data and store it in messagesArray array and send it as an argument to the onSend() function. As we need unique id for every message, we use uuidGen() function which will simply return unique id every time we call it.

If the response is successful, onSend() function will trigger handleGoogleResponse() function.
handleGoogleResponse() function is as shown below.

handleGoogleResponse(result) {
    let receivedText = '';
    let splitReceivedText = '';
    var extractDate = '';
    const dateRegex = /\d{4}\-\d{2}\-\d{2}?/gm;

    receivedText = result.queryResult.fulfillmentMessages[0].text.text[0];
    splitReceivedText = receivedText.split('on')[0];
    extractDate = receivedText.match(dateRegex);

    if (extractDate != null) {
      var completeTimeValue = splitResponseForTime(receivedText);
      var timeValue = getTimeValue(completeTimeValue);
      function splitResponseForTime(str) {
        return str.split('at')[1];
      }

      function getTimeValue(str) {
        let time1 = str.split('T')[1];
        let hour = time1.split(':')[0];
        let min = time1.split(':')[1];
        return hour + ":" + min;
      }
      splitReceivedText = splitReceivedText + 'on ' + extractDate[0] + ' at ' + timeValue;
    }
    this.sendBotResponse(splitReceivedText);
  }

In this method, we have written the logic which helps in structuring the response sent by Dialogflow API in the way we require. The structured text is then passed on to sendBotResponse() function to set the state of the messages array.

 sendBotResponse(text) {
    let msg = {
      _id: this.state.messages.length + 1,
      text,
      createdAt: new Date(),
      user: BOT_USER
    };

    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, msg)
    }));

    if (speak) {
      speak=0; 
      Tts.getInitStatus().then(() => {
        Tts.speak(text);
      }, (err) => {
        if (err.code === 'no_engine') {
          Tts.requestInstallEngine();
        }
      });
    }
  }

If the user is interacting with the bot using the voice, the response from the bot will not only be in the form of a text but also as a voice. That is handled using Tts.speak(text). It can take some time to initialize the TTS engine, and Tts.speak() will fail to speak until the engine is ready. To wait for successful initialization, we are using getInitStatus() call.

Below are the snapshots of the app running on an Android device.

That’s all. Thank you for the read!

This story is authored by Dheeraj Kumar and Santosh Kumar. Dheeraj is a software engineer specializing in React Native and React based frontend development. Santosh specializes on Cloud Services based development.

Efficiently Tagging AWS Resources Using CLI to Better Manage Resources and Billing Costs

It is common when organizations have large workloads based on on a multitude of AWS services, they may lose track of how resources are being used. In a nutshell, identifying resources can take rigorous effort. On AWS, utilization and cost go hand in hand and tagging helps ensure that the resources are managed efficiently. In fact, one could also build insightful reports/dashboards with the tags in place.

Tagging Strategy:

For tags to be effective at scale they need to be strategically managed. Many organizations group tags into different categories like technical, business, security and automation, etc. A typical set of tags could be:

  1. Name
  2. Owner
  3. Application/Project/Product
  4. Environment
  5. Client/Customer

For more on creative tagging strategies, please read this.

Prerequisites: AWS CLI configured.

Getting all untagged resources using CLI:

As of this writing, there is no CLI command to list all untagged resources. One could follow the below steps to get the list.

Step1: List all the resources in AWS and write them to a text file

aws lambda list-functions --profile PROFILE_NAME &>> resourcesList.txt

Note: The above command is for listing details of lambda resources. The command and its output might vary with other resources. Read more here.

&>> appends the output of the command to resourcesList.txt file in the current working directory.

The output of the above command is a JSON object that looks like this:

{
    "Functions": [
        {
            "FunctionName": "Chat-Conversation-POST",
            "FunctionArn": "arn:aws:lambda:us-west-2:89XXXXXXXX14:function:Chat-Conversation-POST",
            "Runtime": "nodejs8.10",
            "Role": "arn:aws:iam::89XXXXXXXX14:role/chat-lambda-data",
            "Handler": "index.handler",
            "CodeSize": 474,
            "Description": "",
            "Timeout": 15,
            "MemorySize": 128,
            "LastModified": "2019-05-02T13:20:53.887+0000",
            "CodeSha256": "h1bxXaXXXXXXxxxxxxxxxXxxXxxxxxxXXXxxxxxxmGg=",
            "Version": "$LATEST",
            "TracingConfig": {
                "Mode": "PassThrough"
            },
            "RevisionId": "f447bca3-06f9-49d8-8a5d-c740f6aec405"
        },
        {
            "FunctionName": "Chat-Conversation-GET",
            "FunctionArn": "arn:aws:lambda:us-west-2:89XXXXXXXX14:function:Chat-Conversation-GET",
            "Runtime": "nodejs8.10",
            "Role": "arn:aws:iam::89XXXXXXXX14:role/service-role/chat-lambda-data",
            "Handler": "index.handler",
            "CodeSize": 785,
            "Description": "",
            "Timeout": 25,
            "MemorySize": 128,
            "LastModified": "2019-05-04T14:23:07.002+0000",
            "CodeSha256": "h1bxXaXXXXXXxxxxxxxxxXxxXxxxxxxXXXxxxxxxmGg=",
            "Version": "$LATEST",
            "VpcConfig": {
                "SubnetIds": [],
                "SecurityGroupIds": [],
                "VpcId": ""
            },
            "TracingConfig": {
                "Mode": "PassThrough"
            },
            "RevisionId": "210dd3fa-ba47-4e06-ab53-e34aa793b344"
        }
    ]
}

Now, one could either use multiple selection (ctrl+d) in Sublime or a python script to extract the list of resource ARN/names.

Step2: Iterate this list of resource names, and fetch tagging details for all of them & append the output of these commands to a file.

echo RESOURCE_NAME: &>> tagsList.txt

aws lambda list-tags --resource arn:aws:lambda:us-west-2:89XXXXXXXX14:function:RESOURCE_NAME --profile PROFILE_NAME &>> tagsList.txt

The output of the above command is also a JSON object:

{
    "Tags": {}
}

As you can see there is no name attribute here so, we add the resource name before command output using echo:

RESOURCE_NAME:{
    "Tags": {}
}

Let us say, the resource names we have got in resourcesList.txt are as follows:

  • new-client-acquisition
  • initiate-raw-file-ingestion
  • initiate-raw-crawler
  • raw-refined-transform
  • initiate-refined-crawler
  • check-status

Creating commands for the above resources in sublime:

Step3: Extract resources with no tags from the tagsList.txt file.

Untagged = all – tagged

From the resourcesList.txt we get all the resource names, and from the tagsList.txt we get all tagged resources. You could use both these lists to get the untagged resources.

Step4: Preparing and Updating the tags

aws lambda tag-resource --resource arn:aws:lambda:us-west-2:89XXXXXXXX14:function:RESOURCE_NAME --tags Environment=prod,Project=sales,Name=RESOURCE_NAME --profile PROFILE_NAME

Create multiple commands for each resource name with the above template.

Once you create all the commands just copy and paste them in the terminal. That would update all the resources with new tags.

This is pretty much the steps involved in tagging resources, maybe a few tweaks have to be made depending on the AWS service.

Note: Output of all the above commands are executed with default region name specified in the AWS CLI profile, if not specified in the command.

One other way of tagging resources on AWS is using Tag Editor in Resource Groups. I found it hard to work with as one couldn’t sophisticatedly search, filter or group resource names.

I hope it was helpful. For any queries or if you know a better way of tagging let us know in the comment section. Happy to discuss it further.

Thank-you!

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

How to Customize QuickSight Dashboards for User Specific Data

We have been getting a lot of queries on how to customize a single QuickSight dashboard for user specific data. We can accomplish this by filtering the dashboard data with login username using AWS QuickSight’s Row-Level Security. To further explain this use-case, let’s consider the sales department in a company. Every day your team of sales agents contacts a list of potential customers. Now you need a single dashboard that is accessed by all the agents but only displays the list of prospects he or she is assigned to.

Note: This is completely different from filter/controls on QuickSight dashboards. If you have filters/controls/parameters set up with dynamic values being picked up from the dataset, then even that data is filtered with Row-Level security, as the underlying dataset itself is filtered with the login username.

Let’s get on with the show! I have created a hypothetical data set. This dataset has a column named assigned-agent which shall be used for filtering.

Using this dataset, I have created a dashboard that looks like below.

This dashboard is shared with two other IAM users (sales agents).

As we haven’t set up any rules both of them can access whole data.

As you can see ziva, could also access whole data and we don’t want that!

Our requirement:

User NameAgent NamePermissions
nickNick HoweCan access only his prospects
zivaZiva MedalleCan access only her prospects
managerNASuper user, can access all prospects

Creating Data Set Rules for Row-Level Security:

Create a file or a query that contains the data set rules (permissions).

It doesn’t matter what order the fields are in. However, all the fields are case-sensitive. They must exactly match the field names and values.

The structure should look similar to one of the following. You must have at least one field that identifies either users or groups. You can include both, but only one is required, and only one is used at a time. If you are specifying groups, use only Amazon QuickSight groups or Microsoft AD groups.

The following example shows a table with user names.

UserNameagent_assigned
nickNick Howe
zivaZiva Medalle
managerNick Howe,Ziva Medalle

For SQL:

/* for users*/
select User as UserName, Agent as agent_assigned
from permissions_table;

Or if you prefer to use a .csv file:

UserName,agent_assigned
"nick","Nick Howe"
"ziva","Ziva Medalle"
"manager","Nick Howe,Ziva Medalle"

Here agent_assigned is a column in the dataset, and UserName is the same as QuickSight login name.

What we are essentially doing is mapping UserName with the agent_assigned column. Let’s suppose ziva has logged in, only those records with condition agent_assigned = Ziva Medalle are picked up. Same is the case with nick.

But in the case of the manager, we want him to be a superuser, so we added all the agent names (all values of agent_assigned column).

Note: If you are using an Athena or an RDS or a Redshift or an S3 CSV file-based dataset, just make sure the output format/structure of those sources matches the above-mentioned formats.

Create Permissions Data Set:

Create a QuickSight dataset with the above data set rules. Go to Manage data, choose New data set, choose source and create accordingly. As mine is a CSV, I have just uploaded it. To make sure that you can easily find it, give it a meaningful name, for example in my case Permissions-prospects-list.

After finishing, Refresh the page as it might not appear in the data sources list while applying it to the dataset.

Creating Row-Level Security: 

Choose Permissions, From the list choose the permissions dataset that you have created earlier.

Choose the Apply data set.

Once you have applied, you should be seeing the dataset has a new lock symbol on it saying restricted.

That’s it. Now the data is filtered/secured based on username.

Manager’s Account:

Ziva’s Account:

Nick’s Account:

You could also add Users to Groups and have permissions set at the group level. More information here.

I hope it was helpful, any queries drop them in the comments section.

Thanks for the read!

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Machine Learning based Fuzzy Matching using AWS Glue ML Transforms

Machine Learning Transforms in AWS Glue

Machine Learning Transforms in AWS Glue

AWS Glue provides machine learning capabilities to create custom transforms to do Machine Learning based fuzzy matching to deduplicate and cleanse your data. For this we are going to use a transform named FindMatches. The FindMatches transform enables you to identify duplicate or matching records in your dataset, even when the records do not have a common unique identifier and no fields match exactly. This will not require writing any code or knowing how machine learning works. For more details about ML Transforms, please go through the docs.

Creating a Machine Learning Transform with AWS Glue

This article walks you through the actions to create and manage a machine learning (ML) transform using AWS Glue. I assume that you are familiar with using the AWS Glue console to add crawlers and jobs and edit scripts. You should also be familiar with finding and downloading files on the Amazon Simple Storage Service (Amazon S3) console.

In case you are just starting out on AWS Glue, I have explained how to create an AWS Glue Crawler and Glue Job from scratch in one of my earlier articles.
The source data used in this blog is a hypothetical file named customers_data.csv. A second file, label_file.csv, is an example of a labeling file that contains both matching and nonmatching records used to teach the transform.

Step 1: Crawl the Data using AWS Glue Crawler

At the outset, crawl the source data from the CSV file in S3 to create a metadata table in the AWS Glue Data Catalog. I created a crawler pointing to the source location (s3://bucketname/data/ml-transform/customers/).

In case you are just starting out on the AWS Glue crawler, I have explained how to create one from scratch in one of my earlier articles. If you run this crawler, it creates a customers table in the specified database (ml-transform).

Step 2: Add a Machine Learning Transform

Next, add a machine learning transform that is based on the schema of your data source table created by the above crawler.

  • On the AWS Glue console, in the navigation pane, choose ML Transforms, Add transform.
    1. For transform name, enter ml-transform. This is the name of the transform that is used to find matches in the source data.
    2. Choose an IAM role that has permission to access Amazon S3 and AWS Glue API operations.

Choose Worker type and Maximum capacity as per the requirements.
3. For Data source, choose the table that was created in the earlier step. In this, the table named customers in database ml-transform.
4. For Primary key, choose the primary key column for the table, email.

  • Choose Finish.

Step 3: How to Teach Your Machine Learning Transform

Next, teach the machine learning transform using the sample labeling file.
You can’t use a machine language transform in an extract, transform, and load (ETL) job until its status is Ready for use. To get your transform ready, you must teach it how to identify matching and non-matching records by providing examples of matching and non-matching records. To teach your transform, you can Generate a label file, add labels, and then Upload label file.

For this article, the label file I have used is label_file.csv

  • On the AWS Glue console, in the navigation pane, choose ML Transforms.
  • Choose the earlier created transform, and then choose Action, Teach.
  • If you don’t have the label file, choose I do not have labels, you can Generate a label file, add labels, and then Upload label file.

If you have the label file, choose I have labels, then choose Upload labelling file from S3.
Choose an Amazon S3 path to the sample labeling file in the current AWS Region. (s3://bucketname/data/ml-transform/labels/label_file.csv) with the option to overwrite existing labels. The labeling file must be located in S3 in the same Region as the AWS Glue console.

When you upload a labeling file, a task is started in AWS Glue to add or overwrite the labels used to teach the transform how to process the data source.

  • Choose Finish, and return to the ML transforms list.

Step 4: Estimate the Quality of ML Transform

What is Labeling?

The act of labeling is creating a labeling file (such as in a spreadsheet) and adding identifiers, or labels, into the label column that identifies matching and non-matching records. It is important to have a clear and consistent definition of a match in your source data. AWS Glue learns from which records you designate as matches (or not) and uses your decisions to learn how to find duplicate records.

Next, you can estimate the quality of your machine learning transform. The quality depends on how much labeling you have done.

  • On the AWS Glue console, in the navigation pane, choose ML Transforms.
  • Choose the earlier created transform, and choose the Estimate quality tab. This tab displays the current quality estimates, if available, for the transform.
  • Choose Estimate quality to start a task to estimate the quality of the transform. The accuracy of the quality estimate is based on the labeling of the source data.
  • Navigate to the History tab. In this pane, task runs are listed for the transform, including the Estimating quality task. For more details about the run, choose Logs. Check that the run status is Succeeded when it finishes.

Step 5: Create and Run a Job with ML Transform

In this step, we use your machine learning transform to add and run a job in AWS Glue. When the transform is Ready for use, we can use it in an ETL job.

On the AWS Glue console, in the navigation pane, choose Jobs.

Choose Add job.

In case you are just starting out on AWS Glue ETL Job, I have explained how to create one from scratch in one of my earlier articles.

  • For Name, choose the example job in this tutorial, ml-transform.
  • Choose an IAM role that has permission to access Amazon S3 and AWS Glue API operations.
  • For ETL language, choose Spark 2.2, Python 2. Machine learning transforms are currently not supported for Spark 2.4.
  • For Data source, choose the table created in Step 1. The data source you choose must match the machine learning transform data source schema.
  • For Transform type, choose to Find matching records to create a job using a machine learning transform.
  • For Transform, choose transform created in step 2, the machine learning transform used by the job.
  • For Create tables in your data target, choose to create tables with the following properties.
    • Data store type — Amazon S3
    • Format — CSV
    • Compression type — None
    • Target path — The Amazon S3 path where the output of the job is written (in the current console AWS Region)

Choose Save job and edit script to display the script editor page. The script looks like the following. After you edit the script, choose Save.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglueml.transforms import FindMatches

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "ml_transforms", table_name = "customers", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "ml_transforms", table_name = "customers", transformation_ctx = "datasource0")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "ml_transforms", table_name = "customers", transformation_ctx = "resolvechoice1"]
## @return: resolvechoice1
## @inputs: [frame = datasource0]
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "MATCH_CATALOG", database = "ml_transforms", table_name = "customers", transformation_ctx = "resolvechoice1")
## @type: FindMatches
## @args: [transformId = "eacb9a1ffbc686f61387f63", emitFusion = false, survivorComparisonField = "<primary_id>", transformation_ctx = "findmatches2"]
## @return: findmatches2
## @inputs: [frame = resolvechoice1]
findmatches2 = FindMatches.apply(frame = resolvechoice1, transformId = "eacb9a1ffbc686f61387f63", transformation_ctx = "findmatches2")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://bucket-name/data/ml-transforms/output/"}, format = "csv", transformation_ctx = "datasink3"]
## @return: datasink3
## @inputs: [frame = findmatches2]
datasink3 = glueContext.write_dynamic_frame.from_options(frame = findmatches2, connection_type = "s3", connection_options = {"path": "s3:/<bucket-name>/data/ml-transforms/output/"}, format = "csv", transformation_ctx = "datasink3")
job.commit()

Choose Run job to start the job run. Check the status of the job in the jobs list. When the job finishes, in the ML transform, History tab, there is a new Run ID row added of type ETL job. 

Navigate to the Jobs, History tab. In this pane, job runs are listed. For more details about the run, choose Logs. Check that the run status is Succeeded when it finishes.

Step 6: Verify Output Data from Amazon S3 in Amazon Athena

In this step, check the output of the job run in the Amazon S3 bucket that you chose when you added the job. You can create a table in the Glue Data catalog pointing to the output location, just like the way we crawled the source data in Step 1. You can then query the data in Athena.

However, the Find matches transform adds another column named match_id to identify matching records in the output. Rows with the same match_id are considered matching records.

If you don’t find any matches, you can continue to teach the transform by adding more labels.

Thanks for the read and look forward to your comments

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.

Creating a Chatbot for Healthcare in React Native using Dialogflow

In this blog, we shall learn how to build an AI virtual assistant or a Chatbot using React Native and Dialogflow API.

Why are chatbots important?
A chatbot is a piece of software that helps in conducting a conversation through voice based or textual methods. Chatbots offer companies new opportunities to improve the customer engagement process and operational efficiency by reducing the typical cost of customer service.

Image result for dialogflow

What is Dialogflow?
Dialogflow (previously known as API.AI) is a Natural Language Processing (NLP) platform which can be greatly helpful to build conversational applications for a company’s customers in various languages and also across multiple platforms. Dialogflow enables developers to create text-based and voice conversation interfaces for responding to customer queries in different languages.

Why Dialogflow?
There are different chatbot SDK’s like Dialogflow, Amazon Lex, IBM Watson, Microsoft Bot Framework etc. The reasons to why we chose to use Dialogflow are:

  1. Dialogflow supports multiple platforms.
  2. Dialogflow supports all the devices like wearables, phones and other devices.
  3. Dialogflow also supports multiple languages.

How Dialogflow works?
In Dialogflow, the typical flow of any conversation involves these steps:

  1. The user providing an input.
  2. Dialogflow agent parsing that input based on the intent.
  3. Agent returning a response to the user.

Setting up Dialogflow account:

Navigate to console in the official website. After navigating to console you will be prompted to sign in with Google, go ahead and sign-in. After successfully signing in you can see a dashboard.

Before we dive into the platform and start building the bot/agent, let us learn about the terms used in Dialogflow.

After signing in, you could see a Create Agent tab. An agent is nothing but the bot that you would like to create. Give a name of your choice and click on the Create button. After creating successfully you could see multiple tabs on the left side of the screen like:

  1. Intents
  2. Entities
  3. Fulfillment etc

Intents:
An Intent is a specific action that the user can invoke by using one of the defined terms in the Dialogflow console. 

For example, the user could ask “What’s the time?” or “What is today’s date?” if these terms are defined within the console, then they will be detected by Dialogflow and intents that are defined under will get triggered.

You can create an intent by clicking on create intent as shown below.

You shall see some default intents already available. We can create the new intents here.

Entities:
An Entity is a property which can be used by Dialogflow to answer the request from the user. The entity will usually be a keyword within the request such as a name, date, time etc. 

Dialogflow has a rich set of predefined entities and also has an option that enables the developer to define custom entities as well.

Fulfillment:
When the user provides the input, Dialogflow needs to process the user input which might contain entities as well. Hence Dialogflow needs to request the information from web-hook so as to fulfill the users request. The input provided by the user along with entities is then sent to the web-hook so that the required information can be retrieved. Once the Dialogflow receives the information from web-hook it sends the response back to the user in the desired manner.

For example, if the user wants to know about weather conditions, a web-hook could be used to get info about weather and pass it on to the user.

Response:
It is the content which Dialogflow sends back to the user once the user’s query is processed.

Creating a ChatBot for Health care:

Now that we have learnt about some basic terms of Dialogflow, let us start building a chatbot (in this case Healthbot) which helps the user (patient) to schedule an appointment with a specific doctor in an organization.

Let’s go ahead and create an agent first. Here we are creating an agent with the name HealthBot.

After clicking the create button, the HealthBot agent would be created. It would look like below.

You could see some default intents there. We can create our own intents here. So, let’s move forward and start creating the intents.

The intent we will be creating here is “Schedule an Appointment”.

Save the intent after creating. In the Training Phrases section, we can add our own training phrases to train the agent.

When we add a particular training phrase , Dialogflow would look for predefined entities in the phrase, if found it will highlight them as shown.

Add few other relative training phrases and click on save.

Next in the Action and Parameters section we can make the @sys.person, @sys.date, @sys.time as required by checking on the Required checkbox. We can also define the prompts for the required fields so that if the user does not provide any one of them the defined prompt will be shown up asking the user to provide the required parameters.

The prompts for the entities could be defined by clicking define prompts under Prompts. Below are the prompts for the respective entities.

Next we have to add the response in the Response section.

After receiving all the required parameters from the user , we can phrase a response like shown.

Now we have to create a front end app using React Native which would communicate with the HealthBot agent.

Let’s go to React Native Docs, select React Native CLI Quickstart and select the appropriate development OS and the target OS as Android, as we are going to build an android application. 

Follow the docs for installing dependencies, then create a new react native application. Use the command line interface to create a new react native project.

react-native init <project-name>

By using the below commands you can run the app on android device. You could see the default welcome page. 

cd <project-name>
Npm install
React-native run-android

Note:  If you face an issue like “Failed to install the app. Make sure you have the Android development environment set up”, just traverse to <project-name>/android folder and create a file named local.properties and add the Android SDK path in it as shown here.

sdk.dir = Your Android SDK Path

We also need to install some dependencies using below command.

npm install react-native-gifted-chat
react-native-dialogflow -save

We are using react-native-gifted-chat package as it provides a customizable and complete chat UI interface.

We are also using react-native-dialogflow so that we can bridge our app with Google Dialogflow’s SDK. 

For our app to communicate with Dialogflow agent, we need to configure few things. For that create any .js file in your project root folder (in this env.js).
We need to configure few values in env.js file.

To get the values click on the Service Account link as shown in the image.
You can get this by clicking on the gear icon present beside the agent name on the left side of the screen.

After clicking the link , you would be shown a table called Service accounts for project “<Agent Name>”. Click on Actions and select create key option from there. A prompt will appear asking to choose an option. Select JSON and click on create. A json file would be downloaded. Just copy the contents of the json file and add it in env.js.

Your env.js file would look like below.

env.js

export const dialogflowConfig = {
  "type": "service_account",
  "project_id": "Health-bot",
  "private_key_id": "xxxx",
  "private_key": "-----BEGIN PRIVATE KEY-----\n xxxx\n-----END PRIVATE KEY-----\n",
  "client_email": "xxxx",
  "client_id": "xxxx",
  "auth_uri": "xxxx",
  "token_uri": "xxxx",
  "auth_provider_x509_cert_url": "xxxx",
  "client_x509_cert_url": "xxxx"
}

Now go to <project-name> directory and open App.js. Modify the content of App.js as below.

App.js

import React, { Component } from 'react';
import {View} from 'react-native';
import { GiftedChat } from 'react-native-gifted-chat';
import { Dialogflow_V2 } from 'react-native-dialogflow';
import { dialogflowConfig } from './env';

const BOT_USER = {
  _id: 2,
  name: 'Health Bot',
  avatar: 'https://previews.123rf.com/images/iulika1/iulika11909/iulika1190900021/129697389-medical-worker-health-professional-avatar-medical-staff-doctor-icon-isolated-on-white-background-vec.jpg'
};
class App extends Component {

  state = {
    messages: [
      {
        _id: 1,
        text: 'Hi! I am the Healthbot 🤖.\n\nHow may I help you today?',
        createdAt: new Date(),
        user: BOT_USER
      }
    ]
  };

  componentDidMount() {
    Dialogflow_V2.setConfiguration(
      dialogflowConfig.client_email,
      dialogflowConfig.private_key,
      Dialogflow_V2.LANG_ENGLISH_US,
      dialogflowConfig.project_id
    );
  }

  onSend(messages = []) {
    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, messages)
    }));

    let message = messages[0].text;
    Dialogflow_V2.requestQuery(
      message,
      result => this.handleGoogleResponse(result),
      error => console.log(error)
    );
  }

  handleGoogleResponse(result) {
    let text = result.queryResult.fulfillmentMessages[0].text.text[0];
    this.sendBotResponse(text);
  }

  sendBotResponse(text) {
    let msg = {
      _id: this.state.messages.length + 1,
      text,
      createdAt: new Date(),
      user: BOT_USER
    };

    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, [msg])
    }));
  }

  render() {
    return (
      <View style={{ flex: 1, backgroundColor: '#fff' }}>
        <GiftedChat
          messages={this.state.messages}
          onSend={messages => this.onSend(messages)}
          user={{
            _id: 1
          }}
        />
      </View>
    );
  }
}
export default App;

When the App.js file renders, the first thing it renders is componentDidMount() where we set the configuration of Dialogflow as given below.


componentDidMount() {
    Dialogflow_V2.setConfiguration(
      dialogflowConfig.client_email,
      dialogflowConfig.private_key,
      Dialogflow_V2.LANG_ENGLISH_US,
      dialogflowConfig.project_id
    );
  }

When you click on send , it will trigger the onSend() method where the user message gets stored in the state variable and we will send a request to Dialogflow using Dialogflow_V2.requestQuery. If the response is successful, handleGoogleResponse() method gets triggered.

onSend(messages = []) {
    this.setState(previousState => ({
      messages: GiftedChat.append(previousState.messages, messages)
    }));

    let message = messages[0].text;
    Dialogflow_V2.requestQuery(
      message,
      result => this.handleGoogleResponse(result),
      error => console.log(error)
    );
  }

handleGoogleResponse() will get the text from the response and triggers sendBotResponse() method where it will set the state to response as shown below

handleGoogleResponse(result) {
    let text = result.queryResult.fulfillmentMessages[0].text.text[0];
    this.sendBotResponse(text);
  }

  sendBotResponse(text) {
      let msg = {
        _id: this.state.messages.length + 1,
        text,
        createdAt: new Date(),
        user: BOT_USER
      };

      this.setState(previousState => ({
        messages: GiftedChat.append(previousState.messages, [msg])
      }));
    }

Below are the images of the app running on an Android device.

That’s it folks, we hope it was fun and useful.

This story is authored by Dheeraj Kumar and Santosh Kumar. Dheeraj is a software engineer specializing in React Native and React based frontend development. Santosh specializes on Cloud Services based development.

Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS

In this blog post, we’ll be looking at how one could leverage AWS Simple Queue Service (Standard queue) to achieve high concurrency while processing with no duplicates. Also we compare it with other AWS services like DynamoDB, SQS FIFO queue and Kinesis in terms of cost and performance.

A simple use case for the below architecture could be building an end-end messaging service, or sending out transactional emails. In both the above use cases, a highly concurrent processing with no duplicates is needed.

Using AWS SQS with Lambda to process Big data concurrently with no duplicates

We have a Lambda function that writes messages to the Standard queue in batches. This writer function is invoked when a file is posted to S3. While there are messages in the queue, Lambda polls the queue, reads messages in batches and invokes the Processor function synchronously with an event that contains queue messages. The processing function is invoked once for each batch. When the function successfully processes a batch, Lambda deletes its messages from the queue. If at all the function fails processing(raise error) the batch is put back in the queue. Now, the Standard queue is configured with redrive policy to move messages to a Dead Letter Queue (DLQ) when receive request reaches the Maximum receive count(MRC). We set the MRC to 1 to ensure deduplication.

Setting up Standard Queue with Dead Letter Queue

We need two queues one for processing, second for moving failed messages into it. First create the failed_messages queue. As it is needed while creating the message processing queue. Create a new queue, give it a name (failed_messages), select type as Standard and choose Configure Queue

According to the needs, set the queue attributes like visibility timeout, message retention period etc.

For processing messages, Create a new queue, give it a name, select type as standard and choose Configure Queue.

Set the Default Visibility Timeout to 5min and Dead Letter Queue Settings to setup the redrive policy to move failed messages into failed_messages queue created earlier.

From the SQS homepage, select processing queue, and select Redrive Policy, If setup correctly you should see the ARN of failed_messages queue there.

Creating the Writer and Processor lambda functions:

Writer.py

# Write batch messages to queue
import csv
import boto3

s3 = boto3.resource('s3')

# Update this dummy URL
processing_queue_url = "https://sqs.us-west-2.amazonaws.com/85XXXXXXX205/ToBeProcessed"

def lambda_handler(event, context):
    try:
        if 'Records' in event:
            bucket_name = event['Records'][0]['s3']['bucket']['name']        
            key = event['Records'][0]['s3']['object']['key']
            bucket = s3.Bucket(bucket_name)
            obj = bucket.Object(key=key)

            # get the object
            response = obj.get()['Body'].read().decode('utf-8').split('\n')
            resp = list(response)
            if resp[-1] == '':
                #removing header metadata and extra newline
                total_records = len(resp) - 2 
            else:
                #removing header metadata
                total_records = len(resp) - 1 
            print("total record count is :", total_records)

            batch_size = 0
            record_count = 0
            messages = []

            # Write to SQS
            for row in csv.DictReader(response):
                record_count += 1
                record = {}
                for k,v in row.items():
                    record[k] = v

                # Replace below with appropriate column with all values as unique
                unique_id = record['ANY_COLUMN_WITH_ALL_VALUES_UNIQUE']
                
                batch_size += 1
                messages.append(
                {
                    'Id': unique_id,
                    'MessageBody': json.dumps(record)
                })
                   
                if (batch_size == 10):
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed_count:', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []
                
                # Handling last batch
                if(record_count == total_records):
                    print("batch size is :", batch_size)
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed count is :', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []    
        
        print('record count is :', record_count)

    except Exception as e:
        return e

Processor.py

# Process queue messages

def handler(event, context):
    if 'Records' in event:
        try:
            messages = event['Records']
            for message in messages:
                print("message to be processed :", message)
                
                result = message['body']
                result = json.loads(result)

                print("result:",result)
            return {
                'statusCode': 200,
                'body': 'All messages processed successfully'
            }

        except Exception as e:
            print(e)
            return str(e)

Setting up S3 as trigger to Writer lambda

Setting up SQS trigger to processor Lambda

If set up properly, you should be able to view it in Lambda Triggers section from the SQS homepage like this.

The setup is done. To test this upload a .csv file to the S3 location.

SQS Standard Queue in comparison with FIFO queue

FIFO queue in SQS supports deduplication in two ways:

  1. Content based deduplication while writing to SQS.
  2. Processing one record/batch at a time. 

Unlike Standard Queue, FIFO doesn’t support concurrency and lambda invocation. On top of all this there is a limit to how many messages you could write to FIFO queue in a second. FIFO queues are much suited when the order of processing is important.

Cost analysis:
First 1 million Amazon SQS requests are free each month.

TypeCost per 1 million requests
Standard Queue$0.40
FIFO Queue$0.50

More on SQS pricing here.

SQS Standard Queue in comparison with DynamoDB

DynamoDB streams are slow when compared SQS, and costs on various aspects like:

  1. Data Storage
  2. Writes
  3. Reads
  4. Provisioned throughput
  5. Reserved capacity
  6. Indexed data storage
  7. Streams and many more.

In a nutshell, DynamoDB’s monthly cost is dictated by data storage, writes and reads. The best use cases for DynamoDB are those that require a flexible data model, reliable performance, and the automatic scaling of throughput capacity.

SQS Standard Queue in comparison with Kinesis

Kinesis primary use case is collecting, storing and processing real-time continuous data streams. Kinesis is designed for large scale data ingestion and processing, with the ability to maximise write throughput for large volumes of data.

While a message queue makes it easy to decouple and scale micro-services, distributed systems, and serverless applications. Using a queue, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. In a nutshell, Serverless applications are built using micro services, message queue serves as a reliable plumbing.

Drawbacks of Kinesis:

  1. Shard management
  2. Limited Read Throughput

For a much detailed comparison of SQS and Kinesis visit here.

Thanks for the read, I hope it was helpful.

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.