Object Detection in React Native App using TensorFlow.js

In this post, we are going to build a React Native app for detecting objects in an image using TensorFlow.js.

TensorFlow.js is a JavaScript library for training and deploying machine learning models in the browser and in Node.js. It provides many pre-trained models that ease the time-consuming task of training a new machine learning model from scratch.

Solution Architecture:

The image, either captured with the camera or selected from the file system, is sent to API Gateway, which triggers a Lambda function that stores the image in an S3 bucket and returns the stored image's URL.

Installing Dependencies:

Go to the React Native docs and select React Native CLI Quickstart. As we are building an Android application, select your Development OS and set the Target OS to Android.

Follow the docs for installing dependencies, then create a new React Native Application. Use the command-line interface to generate a new React Native project called ObjectDetection.

npx react-native init ObjectDetection

Preparing the Android device:

We shall need an Android device to run our React Native Android app. If you have a physical Android device, you can use it for development by connecting it to your computer using a USB cable and following the instructions here.

Now go to the command line and run the following commands inside your React Native app directory.

cd ObjectDetection
react-native run-android

If everything is set up correctly, you should see your new app running on your physical device. Next, we need to install the react-native-image-picker package to capture or select an image. To install the package run the following command inside the project directory.

npm install react-native-image-picker --save

We will need a few other packages as well. To install them, run the following commands inside the project directory.

npm install expo-permissions --save
npm install expo-constants --save
npm install jpeg-js --save

The expo-permissions package allows us to prompt for various permissions to access device sensors, the device camera, etc.

The expo-constants package provides system information that remains constant throughout the lifetime of the app.

The jpeg-js package is used to decode the data from the image.

Integrating TensorFlow.js in our React Native App:

Follow this link to integrate TensorFlow.js into our React Native app. After that, we must also install @tensorflow-models/mobilenet. To install it, run the following command inside the project directory.

npm install @tensorflow-models/mobilenet --save
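
The linked guide walks you through installing the core TensorFlow.js packages and their peer dependencies. If you have not added them yet, the core packages can typically be installed with:

npm install @tensorflow/tfjs --save
npm install @tensorflow/tfjs-react-native --save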

We also need to set up an API in the AWS console and create a Lambda function that stores the image in an S3 bucket and returns the stored image's URL.

API Creation in AWS Console:

Before going further, create an API in your AWS console by following the Working with API Gateway section of the following post:

https://medium.com/zenofai/serverless-web-application-architecture-using-react-with-amplify-part1-5b4d89f384f7

Once you are done creating the API, come back to the React Native application. Go to your project directory and replace your App.js file with the following code.

App.js

import React from 'react';
import {
  StyleSheet,
  Text,
  View,
  ScrollView,
  TouchableHighlight,
  Image
} from 'react-native';
import * as tf from '@tensorflow/tfjs';
import * as mobilenet from '@tensorflow-models/mobilenet';
import { fetch } from '@tensorflow/tfjs-react-native';
import Constants from 'expo-constants';
import * as Permissions from 'expo-permissions';
import * as jpeg from 'jpeg-js';
import ImagePicker from "react-native-image-picker";
import Amplify, { API } from "aws-amplify";

Amplify.configure({
  API: {
    endpoints: [
      {
        name: "<Your-API-Name>",
        endpoint: "<Your-API-Endpoint>"
      }
    ]
  }
});

class App extends React.Component {

  state = {
    isTfReady: false,
    isModelReady: false,
    predictions: null,
    image: null,
    base64String: '',
    capturedImage: '',
    imageSubmitted: false,
    s3ImageUrl: ''
  }

  async componentDidMount() {
    // Wait for tf to be ready.
    await tf.ready();
    // Signal to the app that tensorflow.js can now be used.
    this.setState({
      isTfReady: true
    });
    this.model = await mobilenet.load();
    this.setState({ isModelReady: true });
    this.askCameraPermission();
  }

  askCameraPermission = async () => {
    if (Constants.platform.android) {
      const { status } = await Permissions.askAsync(Permissions.CAMERA_ROLL);
      if (status !== 'granted') {
        alert('Please provide camera roll permissions to make this work!');
      }
    }
  }

  imageToTensor(rawImageData) {
    const TO_UINT8ARRAY = true;
    const { width, height, data } = jpeg.decode(rawImageData, TO_UINT8ARRAY);
    // Drop the alpha channel info for mobilenet
    const buffer = new Uint8Array(width * height * 3);
    let offset = 0 ; // offset into original data
    for (let i = 0; i < buffer.length; i += 3) {
      buffer[i] = data[offset];
      buffer[i + 1] = data[offset + 1];
      buffer[i + 2] = data[offset + 2];

      offset += 4;
    }

    return tf.tensor3d(buffer, [height, width, 3]);
  }

  classifyImage = async () => {
    try {
      const imageAssetPath = this.state.s3ImageUrl;
      const response = await fetch(imageAssetPath, {}, { isBinary: true });
      const rawImageData = await response.arrayBuffer();
      const imageTensor = this.imageToTensor(rawImageData);
      const predictions = await this.model.classify(imageTensor);
      this.setState({ predictions });
    } catch (error) {
      console.log(error);
    }
  }
  
  renderPrediction = prediction => {
    return (
      <Text key={prediction.className} style={styles.text}>
        {prediction.className}
      </Text>
    )
  }

  captureImageButtonHandler = () => {
    this.setState({
      imageSubmitted: false,
      predictions: null
    });
    ImagePicker.showImagePicker({ title: "Pick an Image", maxWidth: 800, maxHeight: 600 }, (response) => {
      if (response.didCancel) {
        console.log('User cancelled image picker');
      } else if (response.error) {
        console.log('ImagePicker Error: ', response.error);
      } else if (response.customButton) {
        console.log('User tapped custom button: ', response.customButton);
      } else {
        // You can also display the image using data:
        const source = { uri: 'data:image/jpeg;base64,' + response.data };
        this.setState({ capturedImage: response.uri, base64String: source.uri });
      }
    });
  }

  submitButtonHandler = () => {
    if (this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
      alert("Please Capture the Image");
    } else {
      this.setState({
        imageSubmitted: true
      });
      const apiName = "<Your-API-Name>";
      const path = "<Your-API-Path>";
      const init = {
        headers: {
          'Accept': 'application/json',
          "Content-Type": "application/x-amz-json-1.1"
        },
        body: JSON.stringify({
          Image: this.state.base64String,
          name: "testImage.jpg"
        })
      }

      API.post(apiName, path, init).then(response => {
        // setState is asynchronous, so classify in its callback once the URL is stored
        this.setState({ s3ImageUrl: response }, () => {
          if (this.state.s3ImageUrl !== '') {
            this.classifyImage();
          }
        });
      });
    }
  }

  render() {
    const { isModelReady, predictions } = this.state
    const capturedImageUri = this.state.capturedImage;
    const imageSubmittedCheck = this.state.imageSubmitted;

    return (
      <View style={styles.MainContainer}>
        <ScrollView>
          <Text style={{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Object Detection</Text>

          {this.state.capturedImage !== "" && <View style={styles.imageholder} >
            <Image source={{ uri: this.state.capturedImage }} style={styles.previewImage} />
          </View>}

          {this.state.capturedImage != '' && imageSubmittedCheck && (
            <View style={styles.predictionWrapper}>
              {isModelReady && capturedImageUri && imageSubmittedCheck && (
                <Text style={styles.text}>
                  Predictions: {predictions ? '' : 'Loading...'}
                </Text>
              )}
              {isModelReady &&
                predictions &&
                predictions.map(p => this.renderPrediction(p))}
            </View>
          )
          }

          <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
            <Text style={styles.buttonText}>Capture Image</Text>
          </TouchableHighlight>

          <TouchableHighlight style={[styles.buttonContainer, styles.submitButton]} onPress={this.submitButtonHandler}>
            <Text style={styles.buttonText}>Submit</Text>
          </TouchableHighlight>

        </ScrollView>
      </View>
    );
  }
}

const styles = StyleSheet.create({
  MainContainer: {
    flex: 1,
    backgroundColor: 'white',
  },
  text: {
    color: '#000000',
    fontSize: 16
  },
  predictionWrapper: {
    height: 100,
    width: '100%',
    flexDirection: 'column',
    alignItems: 'center'
  },
  buttonContainer: {
    height: 45,
    flexDirection: 'row',
    alignItems: 'center',
    justifyContent: 'center',
    marginBottom: 20,
    width: "80%",
    borderRadius: 30,
    marginTop: 20,
    marginLeft: 30,
  },
  captureButton: {
    backgroundColor: "#337ab7",
    width: 350,
  },
  buttonText: {
    color: 'white',
    fontWeight: 'bold',
  },
  submitButton: {
    backgroundColor: "#C0C0C0",
    width: 350,
    marginTop: 5,
  },
  imageholder: {
    borderWidth: 1,
    borderColor: "grey",
    backgroundColor: "#eee",
    width: "50%",
    height: 150,
    marginTop: 10,
    marginLeft: 100,
    flexDirection: 'row',
    alignItems: 'center'
  },
  previewImage: {
    width: "100%",
    height: "100%",
  }
})

export default App;

In the above code, we configure Amplify with the API name and endpoint URL that you created, as shown below.

Amplify.configure({
 API: {
   endpoints: [
     {
       name: '<Your-API-Name>',
       endpoint: '<Your-API-Endpoint-URL>',
     },
   ],
 },
});

The capture button triggers the captureImageButtonHandler function, which asks the user to take a picture or select an image from the file system. We then store that image in the state, as shown below.

captureImageButtonHandler = () => {
    this.setState({
      imageSubmitted: false,
      predictions: null
    });

    ImagePicker.showImagePicker({ title: "Pick an Image", maxWidth: 800, maxHeight: 600 }, (response) => {
      if (response.didCancel) {
        console.log('User cancelled image picker');
      } else if (response.error) {
        console.log('ImagePicker Error: ', response.error);
      } else if (response.customButton) {
        console.log('User tapped custom button: ', response.customButton);
      } else {
        const source = { uri: 'data:image/jpeg;base64,' + response.data };
        this.setState({ capturedImage: response.uri, base64String: source.uri });
      }
    });
  }

After capturing the image, we preview it.

Clicking the submit button triggers the submitButtonHandler function, where we send the image to the endpoint as shown below.

submitButtonHandler = () => {
    if (this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
      alert("Please Capture the Image");
    } else {
      this.setState({
        imageSubmitted: true
      });
      const apiName = "<Your-API-Name>";
      const path = "<Path-to-your-API>";
      const init = {
        headers: {
          'Accept': 'application/json',
          "Content-Type": "application/x-amz-json-1.1"
        },
        body: JSON.stringify({
          Image: this.state.base64String,
          name: "testImage.jpg"
        })
      }

      API.post(apiName, path, init).then(response => {
        // setState is asynchronous, so classify in its callback once the URL is stored
        this.setState({ s3ImageUrl: response }, () => {
          if (this.state.s3ImageUrl !== '') {
            this.classifyImage();
          }
        });
      });
    }
  }

After submitting the image, API Gateway triggers the Lambda function. The Lambda function stores the submitted image in the S3 bucket and returns its URL, which is sent back in the response. The received URL is then saved to the state, and the classifyImage function is called, as shown above.

The classifyImage function reads the raw data from the image and yields classification results in the form of predictions. The image is read from S3 using the URL we stored in the component state. The results yielded by this asynchronous method are likewise saved in the state, in the predictions variable.

classifyImage = async () => {
    try {
      const imageAssetPath = this.state.s3ImageUrl;
      const response = await fetch(imageAssetPath, {}, { isBinary: true });
      const rawImageData = await response.arrayBuffer();
      const imageTensor = this.imageToTensor(rawImageData);
      const predictions = await this.model.classify(imageTensor);
      this.setState({ predictions });
    } catch (error) {
      console.log(error);
    }
  }

Inside the helper method imageToTensor, which accepts the raw image data as a parameter, the jpeg-js package decodes the width, height, and binary pixel data of the image.

imageToTensor(rawImageData) {
    const TO_UINT8ARRAY = true;
    const { width, height, data } = jpeg.decode(rawImageData, TO_UINT8ARRAY);
    // Drop the alpha channel info for mobilenet
    const buffer = new Uint8Array(width * height * 3);
    let offset = 0 ; // offset into original data
    for (let i = 0; i < buffer.length; i += 3) {
      buffer[i] = data[offset];
      buffer[i + 1] = data[offset + 1];
      buffer[i + 2] = data[offset + 2];

      offset += 4;
    }

    return tf.tensor3d(buffer, [height, width, 3]);
  }

Here the TO_UINT8ARRAY flag tells jpeg-js to return the decoded pixel data as a Uint8Array, an array of 8-bit unsigned integers.

Lambda Function:

Add the code below to the Lambda function you created earlier in the AWS console. It stores the captured image in the S3 bucket and returns the URL of the image.

const AWS = require('aws-sdk');
var s3BucketName = "<Your-S3-BucketName>";
var s3Bucket = new AWS.S3( { params: {Bucket: s3BucketName, Region: "<Your-S3-Bucket-Region>"} } );

exports.handler = (event, context, callback) => {
    let parsedData = JSON.parse(event);
    let encodedImage = parsedData.Image;
    var filePath = parsedData.name;
    let buf = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""), 'base64');
    var data = {
        Key: filePath, 
        Body: buf,
        ContentEncoding: 'base64',
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) { 
            callback(err, null);
        } else {
            var s3Url = "https://" + s3BucketName + '.' + "s3.amazonaws.com/" + filePath;
            callback(null, s3Url);
        }
    });
};
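
For reference, given how the handler parses the event (parsedData.Image and parsedData.name), the request body it expects looks roughly like the following; the base64 payload shown is only a placeholder:

{
  "Image": "data:image/jpeg;base64,<base64-encoded-image-data>",
  "name": "testImage.jpg"
}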

Running the App:

Run the application by executing the react-native run-android command from the terminal window. Below are the screenshots of the app running on an Android device.

That’s all folks! I hope it was helpful. Any queries, please drop them in the comments section.

This story is authored by Dheeraj Kumar. He is a software engineer specializing in React Native and React based frontend development.

Scheduling tasks with AWS SQS and Lambda

In today’s blog post, we will be learning a workaround for scheduling or delaying a message using AWS SQS despite the 15-minute (900 seconds) upper limit on its delivery delay.

But first, let us briefly understand some SQS attributes. The first is Delivery Delay: it lets you specify a delay between 0 and 900 seconds (15 minutes); when set, any message sent to the queue only becomes visible to consumers after the configured delay period. The second is Visibility Timeout: the length of time during which a message received from the queue stays invisible to other consumers, unless it is deleted from the queue.

If you want to learn about dead letter queue and deduplication, you could follow my other article: Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS.

So, when a consumer receives a message, the message remains in the queue but is invisible for the duration of its visibility timeout, after which other consumers will be able to see the message. Ideally, the first consumer would handle and delete the message before the visibility timeout expires.

The upper limit for visibility timeout is 12 hours. We could leverage this to schedule/delay a task.

A typical combination is SQS with Lambda, where the invoked function executes the task. Standard queues with Lambda triggers enabled have immediate consumption: when a message is inserted into the standard queue, the Lambda function is invoked immediately with the message available in the event object.

Note: If the Lambda results in an error, the message stays in the queue for further receive requests; otherwise, it is deleted.

That said, there could be 2 cases:

  1. A generic setup that can adapt to a range of time delays.
  2. A stand-alone setup built to handle only a fixed time delay.

The idea is to insert a message into the queue with the task details and the time to execute (target time), and have the Lambda do the dirty work.

Case 1:

The Lambda function checks whether the target time equals the current time. If so, it executes the task, and the message is deleted because the Lambda completes without error. Otherwise, it changes the visibility timeout of that message to the delta difference and raises an error, leaving the message in the queue.
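
A minimal sketch of what that Case 1 handler could look like; the queue URL environment variable, message format, and task execution logic here are assumptions to adapt to your setup:

import os
import json
import boto3
from datetime import datetime

sqs = boto3.client('sqs')
QUEUE_URL = os.environ['QUEUE_URL']  # assumed: queue URL supplied via an environment variable

def lambda_handler(event, context):
    message = event['Records'][0]
    body = json.loads(message['body'])

    target_time = datetime.strptime(body['execute_at'], "%d/%m/%Y, %H:%M")  # assumed format
    delta = (target_time - datetime.now()).total_seconds()

    if delta <= 0:
        # Target time reached: execute the task; returning normally lets Lambda delete the message
        print("executing task:", body['task_details'])
        return {'statusCode': 200}

    # Not yet time: push the message's visibility out by the remaining delta
    # (capped at the 12 hour / 43200 second maximum) and fail so it stays in the queue.
    sqs.change_message_visibility(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=message['receiptHandle'],
        VisibilityTimeout=min(int(delta), 43200)
    )
    raise Exception("Target time not reached yet; message rescheduled")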

Case 2:

The SQS queue's default visibility timeout is configured with the required fixed time delay. The Lambda function checks whether the difference between the target time and the current time equals the fixed time delay. If so, it executes the task, and the message is deleted because the Lambda completes without error. Otherwise, it simply raises an error, leaving the message untampered in the queue.

The message is retried after its visibility timeout, which is the required fixed time delay, and is then executed.

The problem with this approach is accuracy and scalability.

Here’s the Lambda code for Case 2:
Processor.py

import boto3
import json
from datetime import datetime, timezone
import dateutil.tz

tz = dateutil.tz.gettz('US/Central')

fixed_time_delay = 1 # change this value, be it hour, min, sec

def lambda_handler(event, context):
    # TODO implement
    
    message = event['Records'][0]
    # print(message)
    result = json.loads(message['body'])
    
    task_details = result['task_details']
    target_time = result['execute_at']
    
    tt = datetime.strptime(target_time, "%d/%m/%Y, %H:%M %p CST")
    print(tt)
    
    t_now = datetime.now(tz)
    time_now = t_now.strftime("%d/%m/%Y, %H:%M %p CST")
    tn = datetime.strptime(time_now, "%d/%m/%Y, %H:%M %p CST")
    print(tn)
    
    delta_time = tn - tt
    print(delta_time)

    # Extract the delay in the same unit you configured fixed_time_delay with
    # (hours, minutes or seconds). For example, in minutes:
    delta_in_whatever = int(delta_time.total_seconds() // 60)

    if delta_in_whatever == fixed_time_delay:
        # execute task logic
        print(task_details)
    else:
        # Leave the message in the queue so it is retried after its visibility timeout
        raise Exception("Fixed time delay not yet elapsed")
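
For this handler to work, the message written to the queue needs to carry the task details and target time in the format the code parses. The task_details content below is only illustrative:

{
    "task_details": {"action": "send_email", "to": "user@example.com"},
    "execute_at": "25/12/2019, 10:30 AM CST"
}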

Conclusion:
Scheduling tasks using SQS isn’t effective in all scenarios. You could use AWS step function’s wait state to achieve milliseconds accuracy, or Dynamo DB’s TTL feature to build an ad hoc scheduling mechanism, the choice of service used is largely dependent on the requirement. So, here’s a wonderful blog post that gives you a bigger picture of different ways to schedule a task on AWS.

This story is authored by Koushik. Koushik is a software engineer specializing in AWS Cloud Services.

Programmatically Updating Autoscaling policy on DynamoDB with boto3: Application Auto Scaling

In this blog post, we will be learning how to programmatically update the auto-scaling policy settings of a DynamoDB table. The idea is to scale it smoothly (minimal write request throttling) irrespective of the anticipated traffic spikes it receives. We do this using AWS Application Auto Scaling and Lambda (boto3).

Understanding how DynamoDB auto-scales

DynamoDB auto scaling works based on CloudWatch metrics and alarms built on top of 3 parameters:

  1. Provisioned capacity
  2. Consumed capacity
  3. Target utilization

Let us understand these parameters briefly. Provisioned capacity is how many units are allocated for the table/index, while Consumed capacity is how much of it is utilized. Target utilization is the threshold (as a percentage of utilization) at which we want autoscaling to happen; when it is crossed, the provisioned capacity is increased accordingly.
Note: Capacity can be for either writing or reading, sometimes both.

Let us say you provisioned 50 write units for the table:

  • Autoscale by about 30 units when 40 units (80%) are consumed. Then your autoscaling policy would look something like this:
    Provisioned write capacity: 50 units
    Target utilization: 80%
    Max write units: 10,000
    Min write units: 30

This auto-scaling policy looks fine, yet it is static. Can it cope with unpredictable traffic spikes? If so, how much time does it take to autoscale (around 15 minutes)? And if the increased units are less than required, how do you reduce throttling?

Traffic spikes

Broadly traffic spikes could be categorized into 2 categories:

  1. Anticipated spike
  2. Sudden spike

And the traffic fluctuation curve can be any of the following:

  1. Steady rise and fall.
  2. Steep peaks and valleys.

Having a single static autoscaling policy that copes with all the above cases, or a combination of them, is not possible. DynamoDB auto-scales capacity based on target capacity utilization instead of write requests. This makes it inefficient and slow.

Target capacity utilization is the percentage of consumption over Provisioned. Mathematically put (consumed/provisioned)*100.
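
For example, if 40 of the 50 provisioned write units are being consumed, the target capacity utilization is (40/50)*100 = 80%.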

If your traffic is unpredictable and you want to understand and tweak DynamoDB autoscaling’s internal workflow, here is a wonderful blog post that I would highly recommend.

Anticipated traffic is in direct correlation with your actions, be it a scheduled marketing event, data ingestion or even sale transactions on a festive day. In such scenarios, a workaround would be the ability to programmatically update the autoscaling policy.

Updating autoscaling policy using AWS Application Auto Scaling.

AWS has this service called AWS Application Auto Scaling. With Application Auto Scaling, you can configure/update automatic scaling for the following resources:

  • Amazon ECS services
  • Amazon EC2 Spot Fleet requests
  • Amazon EMR clusters
  • Amazon AppStream 2.0 fleets
  • Amazon DynamoDB tables and global secondary indexes throughput capacity
  • Amazon Aurora Replicas
  • Amazon SageMaker endpoint variants
  • Custom resources provided by your own applications or services
  • Amazon Comprehend document classification endpoints
  • AWS Lambda function provisioned concurrency

The scaling of the provisioned capacity of these services is managed by the autoscaling policy that is in place. AWS Application Auto Scaling service can be used to modify/update this autoscaling policy.

Here is a sample Lambda (python) code that updates DynamoDB autoscaling settings:

# update-dynamo-autoscale-settings
import os
import sys
import boto3
from botocore.exceptions import ClientError

# Add path for additional imports
sys.path.append('./lib/python3.7/site-packages')

# Initialize boto3 clients
dynamodb = boto3.resource('dynamodb')
dynamodb_scaling = boto3.client('application-autoscaling')

# Initialize variables
table_name = "your dynamo table name"
table = dynamodb.Table(table_name)

def update_auto_scale_settings(min_write_capacity: int, table_name: str): 
    max_write_capacity = 40000 #default number
    dynamodb_scaling.register_scalable_target(ServiceNamespace = "dynamodb",
                                                 ResourceId = "table/{}".format(table_name),
                                                 ScalableDimension = "dynamodb:table:WriteCapacityUnits",
                                                 MinCapacity = min_write_capacity,
                                                 MaxCapacity = max_write_capacity)
    
    # if you have indexes on the table, add their names to indexes list below
    # indexes = ["index1", "index2", "index3"]
    # for index_name in indexes:
    #     dynamodb_scaling.register_scalable_target(ServiceNamespace = "dynamodb",
    #                                              ResourceId = "table/{table_name}/index/{index_name}".format(table_name = table_name, index_name = index_name),
    #                                              ScalableDimension = "dynamodb:index:WriteCapacityUnits",
    #                                              MinCapacity = min_write_capacity,
    #                                              MaxCapacity = max_write_capacity)



# put_scaling_policy is another call that needs to be made if you want a different target utilization value (see the sketch after this code block).

def lambda_handler(event, context):
    
    try:

        # logic before updating

        write_units = 100  # example value; derive this from your own scaling logic
        update_auto_scale_settings(write_units, table_name)
                
        # logic after updating

        # Insert item into dynamo
        # table.put_item(Item = record)
                    
    except Exception as e:
        raise e
    
    else:
        print("success")

I hope it was helpful. Thanks for the read!

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Image Text Detection with Bounding Boxes using OpenCV in React Native Mobile App

In our earlier blog post, we built a Text Detection App with React Native using AWS Rekognition. The Text Detection App detects the texts and their dimensions in a captured image. This blog is an extension to it, where we learn how to draw bounding boxes using the dimensions of the text detected in the image. Assuming you have followed our earlier blog and created the Text Detection App, let us proceed further.

The following diagram depicts the architecture we will be building. 

The React app sends the image to be processed via an API call. The detect_text Lambda function stores it in S3 and calls Amazon Rekognition with its URL to get the detected texts along with their dimensions. With this data it invokes the draw_bounding_box Lambda function, which fetches the image from S3, draws the bounding boxes, and stores the result as a new image. With the new URL it responds back to the detect_text Lambda, which in turn responds back to the app via API Gateway with the URL of the image that has bounding boxes.

In our previous blog we already finished the text detection part; let us look at creating the rest of the setup.

We will use another AWS Lambda function to draw the bounding boxes, and that function needs OpenCV.

OpenCV (Open Source Computer Vision Library) is an open-source computer vision and machine learning software library, built to provide a common infrastructure for computer vision applications and to accelerate the use of machine perception in commercial products.

Preparing the package for draw_bounding_box Lambda:

We need the OpenCV and NumPy libraries for image manipulation, but Lambda doesn’t include these libraries by default. We have to prepare them and upload them ourselves. So, we will prepare the Lambda code as a package locally and then upload it.

To prepare these libraries, follow this link. After finishing the process, you will get a zip file. Unzip it and copy the Lambda code below into a .py file.
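
The linked guide covers the details, but the general shape of building such a package locally looks roughly like this (build on Amazon Linux, or in a matching Docker container, so the compiled OpenCV/NumPy binaries match the Lambda runtime):

mkdir lambda-package
pip install opencv-python numpy -t ./lambda-package
cp lambda_handler.py ./lambda-package/
cd lambda-package && zip -r ../lambda-package.zip .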

Note: The name of this .py file should be the same as your lambda index handler.
default name: lambda_handler

lambda_handler.py:

import cv2
import boto3
import numpy as np
import base64
import json

def lambda_handler(event, context):
    bucketName='<-Your-Bucket-Name->'
    s3_client = boto3.client('s3')

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucketName)

    # reading image name from event object
    obj = bucket.Object(key=event['image'])
    # Image we read from S3 bucket
    response = obj.get()

    imgContent = response["Body"].read()
    np_array = np.fromstring(imgContent, np.uint8)
    ''' we can read an image using cv2.imread() or np.fromstring(imgContent, np.uint8) followed by image_np = cv2.imdecode(np_array, cv2.IMREAD_COLOR).    '''
    image_np = cv2.imdecode(np_array, cv2.IMREAD_COLOR)

    '''
    imdecode reads an image from the specified buffer in the memory. 
    If the buffer is too short or contains invalid data, the empty matrix/image is returned.
    In OpenCV you can easily read in images with different file formats (JPG, PNG, TIFF etc.) using imread
    '''
    height, width, channels = image_np.shape
    # reading dimensions from the event object
    dimensions=json.loads(event['boundingBoxDimensions'])

    for dimension in dimensions:
        leftbox = int(width * dimension['Left'])
        topbox = int(height * dimension['Top'])
        widthbox = int(width * dimension['Width'])
        heightbox = int(height * dimension['Height'])
        # Using cv2.rectangle, we will draw a rectangular box with respect to dimensions
        k=cv2.rectangle(image_np, (leftbox, topbox), (leftbox+widthbox, topbox+heightbox) ,(0, 255, 0), 2)

    # Used to write the image changes to a local image file.
    # For that, create a folder (here /tmp) and place a sample image (here sample.jpeg) in it.
    # If not, we would encounter the following error:
    # "'utf8' codec can't decode byte 0xaa in position 1: invalid start byte: UnicodeDecodeError"
    cv2.imwrite("/tmp/sample.jpeg", k)

    newImage="<-New-Image-Name->"
    # we put the image in S3. And return the image name as we store the modified image in S3
    s3_client.put_object(Bucket=bucketName, Key=newImage, Body=open("/tmp/sample.jpeg", "rb").read())

    return {
        'statusCode': 200,
        'imageName':newImage,
        'body': 'Hello from Lambda!'
    }

The package folder structure would look like below.

As these files exceed the Lambda console upload limit, we will upload the package to S3 and add it to the function from there.

Zip this lambda-package folder and upload it to S3. Then paste its S3 URL into your function code settings and change the Lambda runtime to Python 2.7 (an OpenCV dependency of this package).
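
If you prefer the CLI over the console, the same update can typically be done with a command along these lines; the function name, bucket, and key below are placeholders:

aws lambda update-function-code \
    --function-name draw_bounding_box \
    --s3-bucket <-Your-S3-BucketName-> \
    --s3-key lambda-package.zip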

Invoking draw_bounding_box lambda

The detect_text lambda invokes draw_bounding_box lambda in RequestResponse mode, which means detect_text lambda waits for the response of draw_bounding_box lambda.

The draw_bounding_box lambda function reads the image name and box dimensions from the event object. Below is the code for detect_text lambda which invokes the draw_bounding_box lambda function.

detect_text.js

const AWS = require('aws-sdk');
// added package
const S3 = new AWS.S3({signatureVersion: 'v4'});

var rekognition = new AWS.Rekognition();
var s3Bucket = new AWS.S3( { params: {Bucket: "<-Your-Bucket-Name->"} } );
var fs = require('fs');
// To invoke lambda function
var lambda = new AWS.Lambda();

exports.handler = (event, context, callback) => {
    let parsedData = JSON.parse(event);
    let encodedImage = parsedData.Image;
    var filePath = parsedData.name;

    let buf = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""), 'base64');
    var data = {
        Key: filePath, 
        Body: buf,
        ContentEncoding: 'base64',
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) { 
            console.log('Error uploading data: ', data);
            callback(err, null);
        } else {
            var params = {
                Image: {
                    S3Object: {
                        Bucket: "your-s3-bucket-name", 
                        Name: filePath
                    }
                }
            };

            rekognition.detectText(params, function(err, data) {
                if (err){
                    console.log(err, err.stack);
                    callback(err);
                }
                else{
                    console.log("data: ",data);
                    var detectedTextFromImage=[];
                    var geometry=[];
                    for (item in data.TextDetections){
                      if(data.TextDetections[item].Type === "LINE"){
                        geometry.push(data.TextDetections[item].Geometry.BoundingBox);
                        detectedTextFromImage.push(data.TextDetections[item].DetectedText);
                      }
                    }
                    var dimensions=JSON.stringify(geometry);
                    var payloadData={
                        "boundingBoxDimensions":dimensions,
                        "image": filePath
                    };

                    var params = {
                        FunctionName: 'draw_bounding_box',
                        InvocationType: "RequestResponse",
                        Payload: JSON.stringify(payloadData)
                    };
                    
                    lambda.invoke(params, function(err, data) {
                        if (err){
                            console.log("error occured");
                            console.log(err);
                        }
                        else{
                            var jsondata=JSON.parse(data.Payload);
                            var params = {
                                Bucket: "your-s3-bucket-name", 
                                Key: jsondata.imageName,
                            };
                            s3Bucket.getSignedUrl('getObject', params, function (err, url) {
                                var responseData={
                                        "DetectedText":detectedTextFromImage,
                                    "url":url
                                }
                                callback(null, responseData);
                            });                            
                        }
                    });
                    console.log("waiting for response");
                }
            });
        }
    });
};

Everything is similar except the rekognition.detectText() function. Upon success, we are storing the detected text in a list and dimensions in another list. Next, we need to pass the dimensions list and image name as arguments to the draw_bounding_box lambda function.

var payloadData={
    "boundingBoxDimensions":dimensions,
    "image": filePath
};

var params = {
    FunctionName: 'draw_bounding_box',
    InvocationType: "RequestResponse",
    Payload: JSON.stringify(payloadData)
};
lambda.invoke(params, function(err, data) {
    if (err){
        console.log("error occured");
        console.log(err);
    }
    else{
        var jsondata=JSON.parse(data.Payload);
        var params = {
            Bucket: "your-s3-bucket-name", 
            Key: jsondata.imageName,
        };
        s3Bucket.getSignedUrl('getObject', params, function (err, url) {
            var responseData={
                "DetectedText":detectedTextFromImage,
                "url":url
            }
            callback(null, responseData);
        });                            
    }
});

lambda.invoke() expects two arguments: the first is an object containing the name of the Lambda function, the invocation type, and the payload data; the second is a callback that handles the success or failure response. When the detect_text Lambda function invokes the draw_bounding_box function, the latter processes the image and responds back to the detect_text Lambda function. Upon success, we get a JSON object which contains the modified image name.

Next, we use s3Bucket.getSignedUrl() to get a pre-signed URL for the image, which we send back to our React Native app along with the detected text as the response.

Replace the existing App.js file in the React Native app with the code below.
App.js

import React, {Component} from 'react';
import {
    StyleSheet,
    View,
    Text,
    TextInput,
    Image,
    ScrollView,
    TouchableHighlight,
    ActivityIndicator
} from 'react-native';
import ImagePicker from "react-native-image-picker";
import Amplify, {API} from "aws-amplify";
Amplify.configure({
    API: {
        endpoints: [
            {
                name: "<-Your-API-name->",
                endpoint: "<-Your-end-point-url->"
            }
        ]
    }
});

class Registration extends Component {
  
    constructor(props){
        super(props);
        this.state =  {
            isLoading : false,
            showInputField : false,
            imageName : '',
            capturedImage : '',
            detectedText: []
        };
    }

    captureImageButtonHandler = () => {
        ImagePicker.showImagePicker({title: "Pick an Image", maxWidth: 800, maxHeight: 600}, (response) => {
            console.log('Response - ', response);
            if (response.didCancel) {
                console.log('User cancelled image picker');
            } else if (response.error) {
                console.log('ImagePicker Error: ', response.error);
            } else if (response.customButton) {
                console.log('User tapped custom button: ', response.customButton);
            } else {
                const source = { uri: 'data:image/jpeg;base64,' + response.data };
                this.setState({
                    imageName: "IMG-" + Date.now(),
                    showInputField: true,
                    capturedImage: response.uri,
                    base64String: source.uri
                })
            }
        });
    }

    submitButtonHandler = () => {
        this.setState({
            isLoading: true
        })
        if (this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
            alert("Please Capture the Image");
        } else {
            console.log("submiting")
            const apiName = "<-Your-API-name->";
            const path = "/API-path";
            const init = {
                headers: {
                    'Accept': 'application/json',
                    "Content-Type": "application/x-amz-json-1.1"
                },
                body: JSON.stringify({
                    Image: this.state.base64String,
                    name: this.state.imageName
                })
            }

            API.post(apiName, path, init).then(response => {
                this.setState({
                    capturedImage: response.url,
                    detectedText: response.DetectedText,
                    isLoading:false
                })
            });
        }
    }
  
    render() {
        let inputField;
        let submitButtonField;
        if (this.state.showInputField) {
            inputField=
                    <View style={styles.buttonsstyle}>
                        <TextInput
                            placeholder="Img"
                            value={this.state.imageName}
                            onChangeText={imageName => this.setState({imageName: imageName})}
                            style={styles.TextInputStyleClass}
                        />
                    </View>;
            submitButtonField=<TouchableHighlight style={[styles.buttonContainer, styles.submitButton]} onPress={this.submitButtonHandler}>
                            <Text style={styles.buttonText}>Submit</Text>
                        </TouchableHighlight>
            
        }
        
        return (
            <View style={styles.screen}>
                <ScrollView>
                    <Text style= {{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Text Extracter</Text>

                    {this.state.capturedImage !== "" && <View style={styles.imageholder} >
                        <Image source={{uri : this.state.capturedImage}} style={styles.previewImage} />
                    </View>}
                    {inputField}
                    {this.state.isLoading && (
                        <ActivityIndicator
                            style={styles.Loader}
                            color="#C00"
                            size="large"
                        />
                    )}
                    <View>
                        {
                       this.state.detectedText.map((data, index) => {
                       return(
                           <Text key={index} style={styles.DetextTextView}>{data}</Text>
                       )})
                       }
                    </View>
                    <View style={styles.buttonsstyle}>
                        <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
                            <Text style={styles.buttonText}>Capture Image</Text>
                        </TouchableHighlight>
                        {submitButtonField}
                    </View>
                </ScrollView>
            </View>
        );
    }
}

const styles = StyleSheet.create({
    Loader:{
        flex: 1,
        justifyContent: 'center',
        alignItems: 'center',
        height: "100%"
    },
    screen:{
        flex:1,
        justifyContent: 'center',
    },
    buttonsstyle:{
        flex:1,
        alignItems:"center"
    },
    DetextTextView:{
      textAlign: 'center',
    },
    TextInputStyleClass: {
      textAlign: 'center',
      marginBottom: 7,
      height: "70%",
      margin: 10,
      width:"80%"
    },
    inputContainer: {
      borderBottomColor: '#F5FCFF',
      backgroundColor: '#FFFFFF',
      borderRadius:30,
      borderBottomWidth: 1,
      width:"90%",
      height:45,
      marginBottom:20,
      flexDirection: 'row',
      alignItems:'center'
    },
    buttonContainer: {
      height:45,
      flexDirection: 'row',
      alignItems: 'center',
      justifyContent: 'center',
      borderRadius:30,
      margin: 5,
    },
    captureButton: {
      backgroundColor: "#337ab7",
      width: "90%",
    },
    buttonText: {
      color: 'white',
      fontWeight: 'bold',
    },
    horizontal: {
      flexDirection: 'row',
      justifyContent: 'space-around',
      padding: 10
    },
    submitButton: {
      backgroundColor: "#C0C0C0",
      width: "90%",
      marginTop: 5,
    },
    imageholder: {
      borderWidth: 1,
      borderColor: "grey",
      backgroundColor: "#eee",
      width: "50%",
      height: 150,
      marginTop: 10,
      marginLeft: 90,
      flexDirection: 'row',
      alignItems:'center'
    },
    previewImage: {
      width: "100%",
      height: "100%",
    }
});

export default Registration;

Below are the screenshots of the React Native App running on an Android device.
We used the below image to extract text and add bounding boxes.

The image name is generated dynamically with epoch time, which is editable.

I hope it was helpful, thanks for the read!

This story is authored by Dheeraj Kumar and Santosh Kumar. Dheeraj is a software engineer specializing in React Native and React based frontend development. Santosh specializes in Cloud Services based development.

Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS

In this blog post, we’ll be looking at how one could leverage AWS Simple Queue Service (Standard queue) to achieve highly concurrent processing with no duplicates. We also compare it with other AWS services like DynamoDB, SQS FIFO queues, and Kinesis in terms of cost and performance.

A simple use case for the below architecture could be building an end-to-end messaging service, or sending out transactional emails. In both use cases, highly concurrent processing with no duplicates is needed.

Using AWS SQS with Lambda to process Big data concurrently with no duplicates

We have a Lambda function that writes messages to the Standard queue in batches. This writer function is invoked when a file is posted to S3. While there are messages in the queue, Lambda polls the queue, reads messages in batches, and invokes the Processor function synchronously with an event that contains the queue messages. The processing function is invoked once for each batch. When the function successfully processes a batch, Lambda deletes its messages from the queue. If the function fails to process the batch (raises an error), the batch is put back in the queue. The Standard queue is configured with a redrive policy that moves messages to a Dead Letter Queue (DLQ) when the receive count reaches the Maximum receive count (MRC). We set the MRC to 1 to ensure deduplication.

Setting up Standard Queue with Dead Letter Queue

We need two queues: one for processing, and a second for moving failed messages into. First create the failed_messages queue, as it is needed while creating the message processing queue. Create a new queue, give it a name (failed_messages), select the type as Standard, and choose Configure Queue.

Set the queue attributes such as visibility timeout and message retention period according to your needs.

For processing messages, create a new queue, give it a name, select the type as Standard, and choose Configure Queue.

Set the Default Visibility Timeout to 5 minutes and use the Dead Letter Queue Settings to set up the redrive policy that moves failed messages into the failed_messages queue created earlier.

From the SQS homepage, select the processing queue and check its Redrive Policy. If set up correctly, you should see the ARN of the failed_messages queue there.
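
The same redrive policy can also be applied programmatically. A minimal boto3 sketch; the queue URL and DLQ ARN below are placeholders:

import json
import boto3

sqs = boto3.client('sqs')

sqs.set_queue_attributes(
    QueueUrl = "https://sqs.us-west-2.amazonaws.com/<account-id>/ToBeProcessed",
    Attributes = {
        "VisibilityTimeout": "300",  # 5 minutes
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:us-west-2:<account-id>:failed_messages",
            "maxReceiveCount": "1"   # move to the DLQ after the first failed receive
        })
    }
)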

Creating the Writer and Processor lambda functions:

Writer.py

# Write batch messages to queue
import csv
import json
import boto3

s3 = boto3.resource('s3')
sqs = boto3.client('sqs')

# Update this dummy URL
processing_queue_url = "https://sqs.us-west-2.amazonaws.com/85XXXXXXX205/ToBeProcessed"

def lambda_handler(event, context):
    try:
        if 'Records' in event:
            bucket_name = event['Records'][0]['s3']['bucket']['name']        
            key = event['Records'][0]['s3']['object']['key']
            bucket = s3.Bucket(bucket_name)
            obj = bucket.Object(key=key)

            # get the object
            response = obj.get()['Body'].read().decode('utf-8').split('\n')
            resp = list(response)
            if resp[-1] == '':
                #removing header metadata and extra newline
                total_records = len(resp) - 2 
            else:
                #removing header metadata
                total_records = len(resp) - 1 
            print("total record count is :", total_records)

            batch_size = 0
            record_count = 0
            messages = []

            # Write to SQS
            for row in csv.DictReader(response):
                record_count += 1
                record = {}
                for k,v in row.items():
                    record[k] = v

                # Replace below with appropriate column with all values as unique
                unique_id = record['ANY_COLUMN_WITH_ALL_VALUES_UNIQUE']
                
                batch_size += 1
                messages.append(
                {
                    'Id': unique_id,
                    'MessageBody': json.dumps(record)
                })
                   
                if (batch_size == 10):
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed_count:', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []
                
                # Handling last batch
                if(record_count == total_records):
                    print("batch size is :", batch_size)
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed count is :', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []    
        
        print('record count is :', record_count)

    except Exception as e:
        return e

Processor.py

# Process queue messages
import json

def handler(event, context):
    if 'Records' in event:
        try:
            messages = event['Records']
            for message in messages:
                print("message to be processed :", message)
                
                result = message['body']
                result = json.loads(result)

                print("result:",result)
            return {
                'statusCode': 200,
                'body': 'All messages processed successfully'
            }

        except Exception as e:
            print(e)
            return str(e)

Setting up S3 as trigger to Writer lambda

Setting up SQS trigger to processor Lambda

If set up properly, you should be able to view it in Lambda Triggers section from the SQS homepage like this.

The setup is done. To test this upload a .csv file to the S3 location.

SQS Standard Queue in comparison with FIFO queue

FIFO queue in SQS supports deduplication in two ways:

  1. Content based deduplication while writing to SQS.
  2. Processing one record/batch at a time. 

Unlike the Standard queue, a FIFO queue doesn’t support the same concurrency or Lambda invocation. On top of this, there is a limit on how many messages you can write to a FIFO queue per second. FIFO queues are better suited when the order of processing is important.
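
For reference, this is roughly how a message is written to a FIFO queue with boto3; if content-based deduplication is enabled on the queue, the MessageDeduplicationId can be omitted. The queue URL and values below are placeholders:

import json
import boto3

sqs = boto3.client('sqs')

sqs.send_message(
    QueueUrl = "https://sqs.us-west-2.amazonaws.com/<account-id>/ToBeProcessed.fifo",
    MessageBody = json.dumps({"order_id": "1234"}),
    MessageGroupId = "orders",            # messages in the same group are processed in order
    MessageDeduplicationId = "1234"       # duplicates with this id within 5 minutes are dropped
)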

Cost analysis:
First 1 million Amazon SQS requests are free each month.

Type              Cost per 1 million requests
Standard Queue    $0.40
FIFO Queue        $0.50

More on SQS pricing here.

SQS Standard Queue in comparison with DynamoDB

DynamoDB streams are slow when compared to SQS, and DynamoDB incurs costs on various aspects like:

  1. Data Storage
  2. Writes
  3. Reads
  4. Provisioned throughput
  5. Reserved capacity
  6. Indexed data storage
  7. Streams and many more.

In a nutshell, DynamoDB’s monthly cost is dictated by data storage, writes and reads. The best use cases for DynamoDB are those that require a flexible data model, reliable performance, and the automatic scaling of throughput capacity.

SQS Standard Queue in comparison with Kinesis

Kinesis’ primary use case is collecting, storing and processing real-time continuous data streams. Kinesis is designed for large scale data ingestion and processing, with the ability to maximise write throughput for large volumes of data.

A message queue, on the other hand, makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Using a queue, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. In a nutshell, serverless applications are built using microservices, and the message queue serves as reliable plumbing between them.

Drawbacks of Kinesis:

  1. Shard management
  2. Limited Read Throughput

For a much detailed comparison of SQS and Kinesis visit here.

Thanks for the read, I hope it was helpful.

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Real Time Streaming Data Analytics using Amazon Kinesis Family

Amazon Kinesis Data Analytics

Amazon Kinesis Data Analytics (KDA) is the easiest way to analyze streaming data, gain actionable insights, and respond to your business and customer needs in real time. KDA reduces the complexity of building, managing and integrating streaming applications with other AWS services. SQL users can easily query streaming data or build entire streaming applications using templates and an interactive SQL editor. Java developers can quickly build sophisticated streaming applications using open source Java libraries and AWS integrations to transform and analyze data in real time.

For deep dive into Amazon Kinesis Data Analytics, please go through the official docs.

Amazon Kinesis Data Streams

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more.

For more details about Amazon Kinesis Data Streams, please go through the official docs.

Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near real-time analytics with existing business intelligence tools and dashboards you’re already using today. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security.

For more details about Amazon Kinesis Data Firehose, please go through the official docs.

To Create an Amazon Kinesis Data Stream using Console

  • Open the Kinesis console at https://console.aws.amazon.com/kinesis.
  • In the navigation bar, expand the Region selector and choose a Region.
  • Choose Create data stream.
  • On the Create Kinesis stream page, enter a name for your stream and the number of shards you need, and then click Create Kinesis stream.
    On the Kinesis streams page, your stream’s Status is shown as Creating while the stream is being created. When the stream is ready to use, the Status changes to Active.
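
The same stream can also be created from the AWS CLI; the stream name and shard count below are placeholders:

aws kinesis create-stream --stream-name my-data-stream --shard-count 1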

Amazon Kinesis Data Generator

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose.

Set up the KDG by following this link and the steps below.

  • Choose Create a Cognito User with Cloud Formation.
  • After choosing the above option, the console redirects to the CloudFormation stack creation page, which looks like the following.
  • Click Next, provide the CloudFormation stack name, and provide the username and password for the Cognito user to be created for the Kinesis Data Generator.
  • Click Next and choose Create Stack.
  • After the status of the stack changes to CREATE_COMPLETE, click on the Outputs tab and open the link under the outputs section.
  • After opening the above link, provide the username and password created in earlier steps.
  • Select the Region and the Stream/delivery stream name you created.
    The record template is:
{
    "sensor_id": {{random.number(50)}},
    "current_temperature": {{random.number(
        {
            "min":0,
            "max":150
        }
    )}},
    "location": "{{random.arrayElement(
        ["AUS","USA","UK"]
    )}}"
}

To Create the Kinesis Data Analytics Application

  • Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
  • Choose Create application.
  • On the Create application page, type an application name, type a description, choose SQL for the application’s Runtime setting, and then choose Create application.

Doing this creates a Kinesis data analytics application with a status of READY. The console shows the application hub where you can configure input and output.

In the next step, you configure input for the application. In the input configuration, you add a streaming data source to the application and discover a schema for an in-application input stream by sampling data on the streaming source.

Configure Streaming Source as Input to Kinesis Data Analytics Application

  • On the Kinesis Analytics applications page in the console, choose Connect streaming data.
  • In the Source section, you specify a streaming source for your application. You can select an existing stream source or create one. By default, the console names the in-application input stream that is created SOURCE_SQL_STREAM_001. For this exercise, keep this name as it appears.
    Stream reference name – This option shows the name of the in-application input stream that is created, SOURCE_SQL_STREAM_001. You can change the name of the stream.
  • Choose Discover Schema, which automatically discovers the schema of the input stream.
  • Choose Save and continue.
    Now we have an application with the input configuration added to it. In the next step, we will add SQL code to perform some analytics on the data in the in-application input stream.

Real-Time Analytics on Input Stream Data

  • On the Kinesis Analytics applications page in the console, choose Go to SQL editor.
  • In the Would you like to start running “ApplicationName”? dialog box, choose Yes, start application.
  • The console opens the SQL editor page. Review the page, including the buttons (Add SQL from templates, Save and run SQL) and various tabs.
  • Run analytics on the input stream data using the following sample query. This query splits the input stream on temperature: records with current_temperature above 100 are treated as anomalies and sent to anomaly_data_stream, while the remaining records are sent to output_data_stream. Load the following query in the SQL editor and choose Save and run SQL.
CREATE OR REPLACE STREAM "anomaly_data_stream" (
	"sensor_id" INTEGER,
	"current_temperature" INTEGER, 
	"location" VARCHAR(16));

CREATE OR REPLACE  PUMP "STREAM_PUMP_ANOMALY" AS INSERT INTO "anomaly_data_stream"
SELECT STREAM "sensor_id",
				"current_temperature",
				"location"
FROM "SOURCE_SQL_STREAM_001" WHERE "current_temperature" > 100;

CREATE OR REPLACE STREAM "output_data_stream" (
	"sensor_id" INTEGER,
	"current_temperature" INTEGER, 
	"location" VARCHAR(16));

CREATE OR REPLACE  PUMP "STREAM_PUMP_OUTPUT" AS INSERT INTO "output_data_stream"
SELECT STREAM "sensor_id",
				"current_temperature",
				"location"
FROM "SOURCE_SQL_STREAM_001" WHERE "current_temperature" < 100;

This query creates the in-application streams output_data_stream and anomaly_data_stream.
It also creates the pumps STREAM_PUMP_OUTPUT and STREAM_PUMP_ANOMALY, and uses them to select rows from SOURCE_SQL_STREAM_001 and insert them into output_data_stream and anomaly_data_stream respectively. You can see the results in the Real-time analytics tab.

  • The SQL editor has the following tabs:

    The Source data tab shows the in-application input stream data that is mapped to the streaming source. Choose the in-application stream, and you can see data coming in. ROWTIME – Each row in an in-application stream has a special column called ROWTIME. This column is the timestamp when Amazon Kinesis Data Analytics inserted the row in the first in-application stream (the in-application input stream that is mapped to the streaming source).

    The Real-time Analytics tab shows all the other in-application streams created by your application code. It also includes the error stream. Choose anomaly_data_stream or output_data_stream to view the rows your application code inserted.

    The Destination tab shows the external destination where Kinesis Data Analytics writes the query results. We haven’t configured any external destination for our application output yet. 

To create a delivery stream from Kinesis Data Firehose to Amazon S3

  • Open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/.
  • Choose Create Delivery Stream. In this case, the name of the stream is anomaly-delivery-stream.
  • On the Destination page, choose the following options.
    • Destination – Choose Amazon S3.
    • Delivery stream name – Type a name for the delivery stream.
    • S3 bucket – Choose an existing bucket, or choose New S3 Bucket. If you create a new bucket, type a name for the bucket and choose the region your console is currently using.
    • S3 prefix – The delivery stream stores data under the provided prefix. For anomaly data, the prefix becomes
      data/anomaly/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
    • S3 error prefix – Records that fail to be delivered to S3 are stored under this prefix.
  • Choose Next.
  • On the Configuration page, leave the fields at the default settings. The only required step is to select an IAM role that enables Kinesis Data Firehose to access your resources, as follows:
    1. For IAM Role, choose Select an IAM role.
    2. In the drop-down menu, under Create/Update existing IAM role, choose Firehose delivery IAM role, leave the fields at their default settings, and then choose Allow.
  • Choose Next.
  • Review your settings, and then choose Create Delivery Stream.

The anomaly-delivery-stream is created successfully. In the same way, create another Firehose delivery stream named output-delivery-stream. If you prefer to create these streams from code instead of the console, see the boto3 sketch below.
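
A minimal boto3 sketch of the same setup is shown here. The role ARN, bucket ARN, and prefixes are placeholders and must match your own resources; repeat the call with the output prefix to create output-delivery-stream.

import boto3

firehose = boto3.client('firehose', region_name='us-west-2')

firehose.create_delivery_stream(
    DeliveryStreamName='anomaly-delivery-stream',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::XXXXXXXXXXXX:role/firehose_delivery_role',  # placeholder
        'BucketARN': 'arn:aws:s3:::your-bucket-name',                        # placeholder
        'Prefix': 'data/anomaly/year=!{timestamp:YYYY}/month=!{timestamp:MM}/'
                  'day=!{timestamp:dd}/hour=!{timestamp:HH}/',
        'ErrorOutputPrefix': 'errors/anomaly/!{firehose:error-output-type}/',
        'BufferingHints': {'IntervalInSeconds': 60, 'SizeInMBs': 5},
        'CompressionFormat': 'UNCOMPRESSED'
    }
)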

Configuring Application Output to Amazon Kinesis Data Firehose

We can optionally add an output configuration to the application, to persist everything written from an in-application stream to an external destination such as an Amazon Kinesis data stream, a Kinesis Data Firehose delivery stream, or an AWS Lambda function.

In this application, we are connecting the in-application stream to a Kinesis Data Firehose delivery stream.

In the Destination tab, choose anomaly_data_stream as the in-application stream, choose anomaly-delivery-stream as the Firehose delivery stream, and select JSON as the output format. Configure output_data_stream with output-delivery-stream in the same way.
You can see the following after configuring:

Data is written into S3 through the Kinesis Data Firehose delivery streams. We can then query the data in Athena by running a Glue crawler once on that path. The same output mapping can also be added programmatically, as sketched below.
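
A minimal boto3 sketch of adding the output mapping; the application name, delivery stream ARN, and role ARN are placeholders.

import boto3

kda = boto3.client('kinesisanalytics', region_name='us-west-2')

# Look up the current application version (required when updating the application)
app = kda.describe_application(ApplicationName='sensor-anomaly-detection')  # placeholder name
version_id = app['ApplicationDetail']['ApplicationVersionId']

# Connect the in-application anomaly stream to the Firehose delivery stream
kda.add_application_output(
    ApplicationName='sensor-anomaly-detection',
    CurrentApplicationVersionId=version_id,
    Output={
        'Name': 'anomaly_data_stream',
        'KinesisFirehoseOutput': {
            'ResourceARN': 'arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/anomaly-delivery-stream',
            'RoleARN': 'arn:aws:iam::XXXXXXXXXXXX:role/kinesis-analytics-output-role'  # placeholder
        },
        'DestinationSchema': {'RecordFormatType': 'JSON'}
    }
)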

Thanks for the read. Hope it was helpful.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.

Object Detection in React Native App using AWS Rekognition

In this post, we are going to build a React Native app for detecting objects from an image using Amazon Rekognition.

Here we will capture an image or select it from the file system. We will send that image to API Gateway, where it triggers the Lambda function which stores the image in an S3 bucket. The stored image is then sent to Amazon Rekognition, which detects the objects in the image.

Installing dependencies:

Let’s go to the React Native docs, select React Native CLI Quickstart, and choose our Development OS and Android as the Target OS, since we are going to build an Android application.

Follow the docs for installing dependencies, then create a new React Native Application. Use the command line interface to generate a new React Native project called ObjectDetection.

react-native init ObjectDetection

Preparing the Android device:

We shall need an Android device to run our React Native Android app. This can be either a physical Android device, or more commonly, we can use an Android Virtual Device (AVD) which allows us to emulate an Android device on our computer (using Android Studio).

Either way, we shall need to prepare the device to run Android apps for development. If you have a physical Android device, you can use it for development in place of an AVD by connecting it to your computer using a USB cable and following the instructions here.

If you are using a virtual device follow this link. I shall be using a physical Android device.

Now go to the command line and run react-native run-android inside your React Native app directory.

cd ObjectDetection && react-native run-android

If everything is set up correctly, you should see your new app running on your physical device or Android emulator.

API Creation in AWS Console: 

Before going further, create an API in your AWS console following Working with API Gateway paragraph in the following post:
https://medium.com/zenofai/serverless-web-application-architecture-using-react-with-amplify-part1-5b4d89f384f7
Once you are done with creating API come back to the React Native application.
Now, go to your project directory and replace your App.js file with the following code.

import React, {Component} from 'react';
import {
 StyleSheet,
 View,
 Text,
 TextInput,
 Image,
 ScrollView,
 TouchableHighlight,
} from 'react-native';
import ImagePicker from 'react-native-image-picker';
import Amplify, {API} from 'aws-amplify';
import Video from 'react-native-video';
 
// Amplify configuration for API-Gateway
Amplify.configure({
 API: {
   endpoints: [
     {
       name: 'LabellingAPI',   //your api name
       endpoint: '<Endpoint-URL>', // Your Endpoint URL
     },
   ],
 },
});
 
class Registration extends Component {
 constructor(props) {
   super(props);
   this.state = {
     username: 'storeImage.png',
     userId: '',
     image: '',
     capturedImage: '',
     objectName: '',
   };
 }
 
// It selects image from filesystem or capture from camera
 captureImageButtonHandler = () => {
   this.setState({
     objectName: '',
   });
 
   ImagePicker.showImagePicker(
     {title: 'Pick an Image', maxWidth: 800, maxHeight: 600},
     response => {
       console.log('Response = ', response);
       if (response.didCancel) {
         console.log('User cancelled image picker');
       } else if (response.error) {
         console.log('ImagePicker Error: ', response.error);
       } else if (response.customButton) {
         console.log('User tapped custom button: ', response.customButton);
       } else {
         // You can also display the image using data:
         const source = {uri: 'data:image/jpeg;base64,' + response.data};
         this.setState({
           capturedImage: response.uri,
           base64String: source.uri,
         });
       }
     },
   );
 };
 
// this method triggers when you click submit. If the image is valid then It will send the image to API Gateway. 
 submitButtonHandler = () => {
   if (
     this.state.capturedImage == '' ||
     this.state.capturedImage == undefined ||
     this.state.capturedImage == null
   ) {
     alert('Please Capture the Image');
   } else {
     const apiName = 'LabellingAPI';
     const path = '/storeimage';
     const init = {
       headers: {
         Accept: 'application/json',
         'Content-Type': 'application/x-amz-json-1.1',
       },
       body: JSON.stringify({
         Image: this.state.base64String,
         name: 'storeImage.png',
       }),
     };
 
     API.post(apiName, path, init).then(response => {
        if (response.Labels && response.Labels.length > 0) {
         this.setState({
           objectName: response.Labels[0].Name,
         });
       } else {
         alert('Please Try Again.');
       }
     });
   }
 };
 
 render() {
   return (
     <View style={styles.MainContainer}>
       <ScrollView>
         <Text
           style={{
             fontSize: 20,
             color: '#000',
             textAlign: 'center',
             marginBottom: 15,
             marginTop: 10,
           }}>
           Capture Image
         </Text>
         {this.state.capturedImage !== '' && (
           <View style={styles.imageholder}>
             <Image
               source={{uri: this.state.capturedImage}}
               style={styles.previewImage}
             />
           </View>
         )}
         {this.state.objectName ? (
           <TextInput
             underlineColorAndroid="transparent"
             style={styles.TextInputStyleClass}
             value={this.state.objectName}
           />
         ) : null}
         <TouchableHighlight
           style={[styles.buttonContainer, styles.captureButton]}
           onPress={this.captureImageButtonHandler}>
           <Text style={styles.buttonText}>Capture Image</Text>
         </TouchableHighlight>
 
         <TouchableHighlight
           style={[styles.buttonContainer, styles.submitButton]}
           onPress={this.submitButtonHandler}>
           <Text style={styles.buttonText}>Submit</Text>
         </TouchableHighlight>
       </ScrollView>
     </View>
   );
 }
}
 
const styles = StyleSheet.create({
 TextInputStyleClass: {
   textAlign: 'center',
   marginBottom: 7,
   height: 40,
   borderWidth: 1,
   marginLeft: 90,
   width: '50%',
   justifyContent: 'center',
   borderColor: '#D0D0D0',
   borderRadius: 5,
 },
 inputContainer: {
   borderBottomColor: '#F5FCFF',
   backgroundColor: '#FFFFFF',
   borderRadius: 30,
   borderBottomWidth: 1,
   width: 300,
   height: 45,
   marginBottom: 20,
   flexDirection: 'row',
   alignItems: 'center',
 },
 buttonContainer: {
   height: 45,
   flexDirection: 'row',
   alignItems: 'center',
   justifyContent: 'center',
   marginBottom: 20,
   width: '80%',
   borderRadius: 30,
   marginTop: 20,
   marginLeft: 5,
 },
 captureButton: {
   backgroundColor: '#337ab7',
   width: 350,
 },
 buttonText: {
   color: 'white',
   fontWeight: 'bold',
 },
 horizontal: {
   flexDirection: 'row',
   justifyContent: 'space-around',
   padding: 10,
 },
 submitButton: {
   backgroundColor: '#C0C0C0',
   width: 350,
   marginTop: 5,
 },
 imageholder: {
   borderWidth: 1,
   borderColor: 'grey',
   backgroundColor: '#eee',
   width: '50%',
   height: 150,
   marginTop: 10,
   marginLeft: 90,
   flexDirection: 'row',
   alignItems: 'center',
 },
 previewImage: {
   width: '100%',
   height: '100%',
 },
});
 
export default Registration;

In the above code, we are configuring Amplify with the API name and endpoint URL that you created, as shown below.

Amplify.configure({
 API: {
   endpoints: [
     {
       name: '<Your-API-Name>',
       endpoint: '<Endpoint-URL>',
     },
   ],
 },
});

Clicking the Capture Image button triggers the captureImageButtonHandler function, which asks the user to take a picture or select one from the file system. When the user captures or selects an image, we store it in the component state as shown below.

captureImageButtonHandler = () => {
   this.setState({
     objectName: '',
   });
 
   ImagePicker.showImagePicker(
     {title: 'Pick an Image', maxWidth: 800, maxHeight: 600},
     response => {
       console.log('Response = ', response);
       if (response.didCancel) {
         console.log('User cancelled image picker');
       } else if (response.error) {
         console.log('ImagePicker Error: ', response.error);
       } else if (response.customButton) {
         console.log('User tapped custom button: ', response.customButton);
       } else {
         // You can also display the image using data:
         const source = {uri: 'data:image/jpeg;base64,' + response.data};
         this.setState({
           capturedImage: response.uri,
           base64String: source.uri,
         });
       }
     },
   );
 };

After capturing the image, we preview it. Clicking the Submit button triggers the submitButtonHandler function, which sends the image to the endpoint as shown below.

submitButtonHandler = () => {
   if (
     this.state.capturedImage == '' ||
     this.state.capturedImage == undefined ||
     this.state.capturedImage == null
   ) {
     alert('Please Capture the Image');
   } else {
     const apiName = 'LabellingAPI';
     const path = '/storeimage';
     const init = {
       headers: {
         Accept: 'application/json',
         'Content-Type': 'application/x-amz-json-1.1',
       },
       body: JSON.stringify({
         Image: this.state.base64String,
         name: 'storeImage.png',
       }),
     };
 
     API.post(apiName, path, init).then(response => {
       if (response.Labels && response.Labels.length > 0) {
         this.setState({
           objectName: response.Labels[0].Name,
         });
       } else {
         alert('Please Try Again.');
       }
     });
   }
 };

Lambda Function:

Add the following code into your lambda function that you created in your AWS Console.

const AWS = require('aws-sdk')
var rekognition = new AWS.Rekognition()
var s3Bucket = new AWS.S3( { params: {Bucket: "<Your-Bucket>"} } );
var fs = require('fs');
exports.handler = (event, context, callback) => {
   let parsedData = JSON.parse(event)
   let encodedImage = parsedData.Image;
   var filePath = parsedData.name;
   let buf = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""), 'base64')
   var data = {
       Key: filePath,
       Body: buf,
       ContentEncoding: 'base64',
       ContentType: 'image/jpeg'
   };
   s3Bucket.putObject(data, function(err, data){
       if (err) {
            console.log('Error uploading data: ', err);
           callback(err, null);
       } else {
           var params = {
             Image: {
              S3Object: {
               Bucket: "<Your-Bucket>",
               Name: filePath
              }
             },
             MaxLabels: 10,
             MinConfidence: 90
            };
           rekognition.detectLabels(params, function(err, data) {
               if (err){
                   console.log(err, err.stack);
                   callback(err)
               }
               else{
                   console.log(data);
                   callback(null, data);
               }
           });
       }
   });
};

In the above code, we receive the image from the React Native app and store it in the S3 bucket. The stored image is then sent to Amazon Rekognition, whose detectLabels method detects the labels in the image and returns the response with the detected labels in JSON format.

capture image screen

Once you capture an image you can see a preview of that image as shown below.

Nike backpack

On submitting the captured image you can see the label of that image as shown below:

Object recognised as backpack

That’s all folks! I hope it was helpful.
For any queries drop them in the comments section.

This story is authored by Dheeraj Kumar and Venu Vaka. Dheeraj is a software engineer specializing in React Native and React based frontend development. Venu is a software engineer specializing in ReactJS and AWS Cloud.

AWS Machine Learning Data Engineering Pipeline for Batch Data

This post walks you through all the steps required to build a data engineering pipeline for batch data using AWS Step Functions. The sequence of steps works like so: the ingested data arrives as a CSV file in the landing zone of an S3 based data lake, which automatically triggers a Lambda function that invokes the Step Function. I have assumed that data is being ingested daily as a .csv file with a filename_date.csv naming convention, for example customers_20190821.csv. The Step Function, as its first step, starts a landing-to-raw-zone file transfer operation via a Lambda function. Then an AWS Glue crawler crawls the raw data into an Athena table, which is used as the source for an AWS Glue based PySpark transformation script. The transformed data is written to the refined zone in the Parquet format. Another AWS Glue crawler then runs to “reflect” this refined data into a second Athena table. Finally, the data science team can consume this refined data, available in the Athena table, from an AWS SageMaker based Jupyter notebook instance. It is worth noting that the data science team does not need to pull any data manually, as the data engineering pipeline automatically picks up the delta data according to the data refresh schedule that writes new data into the landing zone.

Let’s go through the steps

How to make daily data available to Amazon SageMaker?

What is Amazon SageMaker?

Amazon SageMaker is an end-to-end machine learning (ML) platform that can be leveraged to build, train, and deploy machine learning models in AWS. Using the Amazon SageMaker notebook module improves the efficiency of interacting with the data without the latency of bringing it locally.
For a deep dive into Amazon SageMaker, please go through the official docs.

In this blog post, I will be using a dummy customers dataset. The customers data consists of retailer information and units purchased.

Updating Table Definitions with AWS Glue

The data catalog feature of AWS Glue and the built-in integration with Amazon S3 simplify the process of identifying data and deriving the schema definition from the source data. Glue crawlers within the Data Catalog are used to build out the metadata tables for data stored in Amazon S3.

I created a crawler named raw for the data in the raw zone (s3://bucketname/data/raw/customers/). In case you are just starting out on AWS Glue crawlers, I have explained how to create one from scratch in one of my earlier articles; a boto3 sketch of the same setup is also shown below. If you run this crawler, it creates the customers table in the specified database (raw).
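
A minimal boto3 sketch of the crawler setup, assuming an existing Glue service role (the role name and bucket path are placeholders):

import boto3

glue = boto3.client('glue', region_name='us-west-2')

# Create the 'raw' crawler over the raw zone path
glue.create_crawler(
    Name='raw',
    Role='AWSGlueServiceRoleDefault',   # placeholder role with Glue and S3 access
    DatabaseName='raw',
    Targets={'S3Targets': [{'Path': 's3://bucketname/data/raw/customers/'}]}
)

# Run it once to create/update the customers table definition
glue.start_crawler(Name='raw')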

Create an invocation Lambda Function

In case you are just starting out on Lambda functions, I have explained how to create one from scratch with an IAM role to access the StepFunctions, Amazon S3, Lambda and CloudWatch in my earlier article.

Add a trigger to the created Lambda function named invoke-step-functions. Configure the Bucket, Prefix, and Suffix accordingly.

Once a file arrives in the landing zone, it triggers the invoke Lambda function, which extracts the year, month, and day from the file name in the event. It passes the year, month, and day, along with two characters from a UUID, as input to AWS Step Functions. Please replace the code in the invoke-step-functions Lambda with the following.

import json
import uuid
import boto3
from datetime import datetime

sfn_client = boto3.client('stepfunctions')

stm_arn = 'arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:Datapipeline-for-SageMaker'

def lambda_handler(event, context):
    
    # Extract bucket name and file path from event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    path = event['Records'][0]['s3']['object']['key']
    
    file_name_date = path.split('/')[2]
    processing_date_str = file_name_date.split('_')[1].replace('.csv', '')
    processing_date = datetime.strptime(processing_date_str, '%Y%m%d')
    
    # Extract year, month, day from date
    year = processing_date.strftime('%Y')
    month = processing_date.strftime('%m')
    day = processing_date.strftime('%d')
    
    uuid_temp = uuid.uuid4().hex[:2]
    execution_name = '{processing_date_str}-{uuid_temp}'.format(processing_date_str=processing_date_str, uuid_temp=uuid_temp)
    
    # Starts the execution of AWS StepFunctions
    response = sfn_client.start_execution(
          stateMachineArn = stm_arn,
          name= str(execution_name),
          input= json.dumps({"year": year, "month": month, "day": day})
      )
    
    return {"year": year, "month": month, "day": day}

Create a Generic FileTransfer Lambda

Create a Lambda function named generic-file-transfer as we created earlier in this article. This Lambda function transfers files from the landing zone to the raw zone, or from the landing zone to the archive zone, based on the event coming from the Step Function.

  1. If the step is landing-to-raw-file-transfer, the Lambda function copies files from the landing zone to the raw zone.
  2. If the step is landing-to-archive-file-transfer, the Lambda function copies files from the landing zone to the archive zone and deletes the files from the landing zone.

Please replace the following code in generic-file-transfer Lambda.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    for objects in bucket.objects.filter(Prefix = source_prefix):
        file_path = objects.key
        
        if ('.csv' in file_path) and (step == 'landing-to-raw-file-transfer'):
            
            # Extract filename from file_path
            file_name_date = file_path.split('/')[2]
            file_name = file_name_date.split('_')[0]
            
            # Add filename to the destination prefix (use a separate variable so that
            # destination_prefix is not compounded across loop iterations)
            file_destination_prefix = '{destination_prefix}{file_name}/year={year}/month={month}/day={day}/'.format(destination_prefix=destination_prefix, file_name=file_name, year=year, month=month, day=day)
            print(file_destination_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, file_destination_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
         
        if ('.csv' in file_path) and (step == 'landing-to-archive-file-transfer'):
            
            # Add the archive date folder to the destination prefix (again using a
            # separate variable to avoid compounding across loop iterations)
            file_destination_prefix = '{destination_prefix}{year}-{month}-{day}/'.format(destination_prefix=destination_prefix, year=year, month=month, day=day)
            print(file_destination_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, file_destination_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
            
            # Deletes copied file
            bucket.objects.filter(Prefix = file_path).delete()
            
    return {"year": year, "month": month, "day": day}

Generic FileTransfer Lambda function setup is now complete. We need to check that all files are copied successfully from one zone to another. If you have a large number of files that need to be copied, you could check out our Lightening fast distributed file transfer architecture.

Create Generic FileTransfer Status Check Lambda Function

Create a Lambda function named generic-file-transfer-status. If the step is landing-to-raw-file-transfer, the Lambda function checks whether all files are copied from the landing zone to the raw zone by comparing the number of objects in the landing and raw zones. If the counts don't match, it raises an exception; that exception is handled in AWS Step Functions, which retries after some backoff rate. If the counts match, all files are copied successfully. If the step is landing-to-archive-file-transfer, the Lambda function checks whether any files are left in the landing zone. Please replace the code in the generic-file-transfer-status Lambda function with the following.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    class LandingToRawFileTransferIncompleteException(Exception):
        pass

    class LandingToArchiveFileTransferIncompleteException(Exception):
        pass
    
    if (step == 'landing-to-raw-file-transfer'):
        if file_transfer_status(bucket, source_prefix, destination_prefix):
            print('File Transfer from Landing to Raw Completed Successfully')
        else:
            raise LandingToRawFileTransferIncompleteException('File Transfer from Landing to Raw not completed')
    
    if (step == 'landing-to-archive-file-transfer'):
        if is_empty(bucket, source_prefix):
            print('File Transfer from Landing to Archive Completed Successfully')
        else:
            raise LandingToArchiveFileTransferIncompleteException('File Transfer from Landing to Archive not completed.')
    
    return {"year": year, "month": month, "day": day}

def file_transfer_status(bucket, source_prefix, destination_prefix):
    
    try:
        
        # Checks number of objects at the source prefix (count of objects at source i.e., landing zone)
        source_object_count = 0
        for obj in bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            if (".csv" in path):
                source_object_count = source_object_count + 1
        print(source_object_count)
        
        # Checks number of objects at the destination prefix (count of objects at destination i.e., raw zone)
        destination_object_count = 0
        for obj in bucket.objects.filter(Prefix = destination_prefix):
            path = obj.key
            
            if (".csv" in path):
                destination_object_count = destination_object_count + 1
        
        print(destination_object_count)
        return (source_object_count == destination_object_count)

    except Exception as e:
        print(e)
        raise e

def is_empty(bucket, prefix):
    
    try:
        # Checks if any files left in the prefix (i.e., files in landing zone)
        object_count = 0
        for obj in bucket.objects.filter(Prefix = prefix):
            path = obj.key

            if ('.csv' in path):
                object_count = object_count + 1
                    
        print(object_count)
        return (object_count == 0)
        
    except Exception as e:
        print(e)
        raise e

Create a Generic Crawler Invocation Lambda

Create a Lambda function named generic-crawler-invoke. The Lambda function invokes a crawler; the crawler name is passed as an argument from AWS Step Functions through the event object. Please replace the code in the generic-crawler-invoke Lambda function with the following.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    try:
        response = glue_client.start_crawler(Name = crawler_name)
    except Exception as e:
        print('Crawler in progress', e)
        raise e
    
    return {"year": year, "month": month, "day": day}

Create a Generic Crawler Status Lambda

Create a Lambda function named generic-crawler-status. The Lambda function checks whether the crawler ran successfully or not. If the crawler is still running, the Lambda function raises an exception; the exception is handled in the Step Function, which retries after a certain backoff rate. Please replace the code in the generic-crawler-status Lambda with the following.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    class CrawlerInProgressException(Exception):
        pass
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    response = glue_client.get_crawler_metrics(CrawlerNameList =[crawler_name])
    print(response['CrawlerMetricsList'][0]['CrawlerName']) 
    print(response['CrawlerMetricsList'][0]['TimeLeftSeconds']) 
    print(response['CrawlerMetricsList'][0]['StillEstimating']) 
    
    if (response['CrawlerMetricsList'][0]['StillEstimating']):
        raise CrawlerInProgressException('Crawler In Progress!')
    elif (response['CrawlerMetricsList'][0]['TimeLeftSeconds'] > 0):
        raise CrawlerInProgressException('Crawler In Progress!')
    
    return {"year": year, "month": month, "day": day}

Create an AWS Glue Job

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. For deep dive into AWS Glue, please go through the official docs.

Create an AWS Glue job named retail-raw-refined (this is the job name referenced later in the Step Function definition). In case you are just starting out on AWS Glue jobs, I have explained how to create one from scratch in my earlier article. This Glue job converts the file format from CSV to Parquet and stores the output in the refined zone. The push down predicate is used as a filter condition to read only the data of the processing date, using the partitions.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year', 'month', 'day'])

year = args['year']
month = args['month']
day = args['day']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw", table_name = "customers", push_down_predicate ="((year == " + year + ") and (month == " + month + ") and (day == " + day + "))", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "string"), ("sale_id", "string", "sale_id", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucketname/data/refined/customers/", "partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink4")

job.commit()

Create a refined crawler as we created the raw crawler earlier in this article. Please point the crawler path to the refined zone (s3://bucketname/data/refined/customers/) and set the database to refined. There is no need to create separate Lambda functions for the refined crawler invocation and status, as we pass the crawler names from the Step Function.

All the resources required by the Step Function have now been created.

Creating the AWS StepFunction

The Step Function is where we define and orchestrate the steps to process data according to our workflow. Create an AWS Step Functions state machine named Datapipeline-for-SageMaker. In case you are just starting out on AWS Step Functions, I have explained how to create one from scratch here.

Data is ingested into the landing zone, which triggers a Lambda function that in turn starts the execution of the Step Function. The steps in the Step Function are as follows:

  1. Transfers files from the landing zone to the raw zone.
  2. Checks whether all files are copied to the raw zone successfully.
  3. Invokes the raw crawler, which crawls data in the raw zone and updates/creates the table definition in the specified database.
  4. Checks whether the crawler completed successfully.
  5. Invokes the Glue job and waits for it to complete.
  6. Invokes the refined crawler, which crawls data in the refined zone and updates/creates the table definition in the specified database.
  7. Checks whether the crawler completed successfully.
  8. Transfers files from the landing zone to the archive zone and deletes the files from the landing zone.
  9. Checks whether all files are copied and deleted from the landing zone successfully.

Please update the StepFunctions definition with the following code.

{
  "Comment": "Datapipeline For MachineLearning in AWS Sagemaker",
  "StartAt": "LandingToRawFileTransfer",
  "States": {
    "LandingToRawFileTransfer": {
      "Comment": "Transfers files from landing zone to Raw zone.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferFailed"
        }
      ],
      "Next": "LandingToRawFileTransferPassed"
    },
    "LandingToRawFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToRawFileTransferStatus"
    },
    "LandingToRawFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to raw zone successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToRawFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToRawFileTransferStatusPassed"
    },
    "LandingToRawFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "StartRawCrawler"
    },
    "StartRawCrawler": {
      "Comment": "Crawls data from raw zone and adds table definition to the specified Database. IF table definition exists updates the definition.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRawCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRawCrawlerFailed"
        }
      ],
      "Next": "StartRawCrawlerPassed"
    },
    "StartRawCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRawCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RawCrawlerStatus"
    },
    "RawCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RawCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RawCrawlerStatusFailed"
        }
      ],
      "Next": "RawCrawlerStatusPassed"
    },
    "RawCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RawCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "GlueJob"
    },
    "GlueJob": {
      "Comment": "Invokes Glue job and waits for Glue job to complete.",
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "retail-raw-refined",
        "Arguments": {
          "--refined_prefix": "data/refined",
          "--year.$": "$.year",
          "--month.$": "$.month",
          "--day.$": "$.day"
        }
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "GlueJobFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GlueJobFailed"
        }
      ],
      "Next": "GlueJobPassed"
    },
    "GlueJobFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "GlueJobPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.Arguments.--year",
        "month.$": "$.Arguments.--month",
        "day.$": "$.Arguments.--day"
      },
      "Next": "StartRefinedCrawler"
    },
    "StartRefinedCrawler": {
      "Comment": "Crawls data from refined zone and adds table definition to the specified Database.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRefinedCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRefinedCrawlerFailed"
        }
      ],
      "Next": "StartRefinedCrawlerPassed"
    },
    "StartRefinedCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRefinedCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RefinedCrawlerStatus"
    },
    "RefinedCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        }
      ],
      "Next": "RefinedCrawlerStatusPassed"
    },
    "RefinedCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RefinedCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransfer"
    },
    "LandingToArchiveFileTransfer": {
      "Comment": "Transfers files from landing zone to archived zone",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferPassed"
    },
    "LandingToArchiveFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "LandingToArchiveFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransferStatus"
    },
    "LandingToArchiveFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to archived successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToArchiveFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.All"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferStatusPassed"
    },
    "LandingToArchiveFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "LandingToArchiveFileTransfer invocation failed"
    },
    "LandingToArchiveFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "End": true
    }
  }
}

After updating the AWS StepFunctions definition, the visual workflow looks like the following.

Now upload a file into the data/landing/ zone of the bucket where the Lambda trigger has been configured. The execution of the Step Function starts, and the visual workflow looks like the following.

In the RawCrawlerStatus step, if the Lambda keeps failing, we retry for some time and then mark the Step Function as failed. If the Step Function runs successfully, its visual workflow looks like the following.

Machine Learning workflow using Amazon SageMaker

The final step in this data pipeline is to make the processed data available in a Jupyter notebook instance of Amazon SageMaker. Jupyter notebooks are popularly used among data scientists to do exploratory data analysis and to build and train machine learning models.

Create Notebook Instance in Amazon SageMaker

Step1: In the Amazon SageMaker console choose Create notebook instance.

Step2: In the Notebook Instance settings, populate the Notebook instance name, choose an instance type depending on your data size, and choose a role for the notebook instance to interact with Amazon S3. The SageMaker execution role needs to have the required permissions for Athena, the S3 buckets where the data resides, and KMS if the data is encrypted.

Step3: Wait for the Notebook instances to be created and the Status to change to InService.

Step4: Choose Open Jupyter, which will open the notebook interface in a new browser tab.

Click New to create a new notebook in Jupyter. Amazon SageMaker provides several kernels for Jupyter, including support for Python 2 and 3, MXNet, TensorFlow, and PySpark. Choose Python as the kernel for this exercise, as it comes with the Pandas library built in.
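
Steps 1 through 3 can also be automated; here is a minimal boto3 sketch, with the notebook name, instance type, and execution role ARN as placeholders.

import boto3

sagemaker = boto3.client('sagemaker', region_name='us-west-2')

# Create a small notebook instance (name, type, and role ARN are placeholders)
sagemaker.create_notebook_instance(
    NotebookInstanceName='datapipeline-notebook',
    InstanceType='ml.t2.medium',
    RoleArn='arn:aws:iam::XXXXXXXXXXXX:role/SageMakerExecutionRole'
)

# Wait until the instance is InService before opening Jupyter
waiter = sagemaker.get_waiter('notebook_instance_in_service')
waiter.wait(NotebookInstanceName='datapipeline-notebook')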

Step5: Within the notebook, execute the following commands to install PyAthena. PyAthena is a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.

import sys
!{sys.executable} -m pip install PyAthena

Step6: After PyAthena is installed, you can connect to Athena and populate Pandas data frames. For data scientists, working with data is typically divided into multiple stages: munging and cleaning data, analyzing/modeling it, and then organizing the results of the analysis into a form suitable for plotting or tabular display. Pandas is the ideal tool for all of these tasks.

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
               region_name='REGION, for example, us-east-1')

df = pd.read_sql("SELECT * FROM <DATABASE>.<TABLENAME> limit 10;", conn)
df

As shown above, the dataframe always stays consistent with the latest incoming data because of the data engineering pipeline setup earlier in the ML workflow. This dataframe can be used for downstream ad-hoc model building purposes or for exploratory data analysis.

That’s it folks. Thanks for the read.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.

Setting Up a Data Lake on AWS Cloud Using LakeFormation

Setting up a Data Lake involves multiple steps such as collecting, cleansing, moving, and cataloging data, and then securely making that data available for downstream analytics and Machine Learning. AWS LakeFormation simplifies these processes and also automates certain processes like data ingestion. In this post, we shall be learning how to build a very simple data lake using LakeFormation with hypothetical retail sales data.

AWS Lake Formation provides its own permissions model that augments the AWS IAM permissions model. This centrally defined permissions model enables fine-grained access to data stored in the data lake through a simple grant/revoke mechanism. These permissions are enforced at the table and column level on the Data Catalog and are mapped to the underlying objects in S3. LakeFormation permissions are applicable across the full portfolio of AWS analytics and Machine Learning services, including Amazon Athena and Amazon Redshift.

So, let’s get on with the setup.

Adding an administrator

The first and foremost step in using LakeFormation is to create an administrator. An administrator has full access to the LakeFormation system and initial access to data configuration and access permissions.
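
The administrator can also be registered through the API. A minimal boto3 sketch, with the IAM principal ARN as a placeholder; note that this call replaces the existing data lake settings, so include any administrators you want to keep.

import boto3

lakeformation = boto3.client('lakeformation', region_name='us-west-2')

# Register an IAM user (or role) as a Lake Formation data lake administrator
lakeformation.put_data_lake_settings(
    DataLakeSettings={
        'DataLakeAdmins': [
            {'DataLakePrincipalIdentifier': 'arn:aws:iam::XXXXXXXXXXXX:user/datalake-admin'}  # placeholder
        ]
    }
)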

After adding an administrator, navigate to the Dashboard using the sidebar. This illustrates the typical process of Data lake setup.

Register location

If you wish to set up data ingestion, that is, import unprocessed/landing data, AWS LakeFormation comes with in-house blueprints (under the Register and Ingest sub menu in the sidebar) that one could use to build workflows. These workflows can be scheduled as per the needs of the end user. Sources of data for these workflows can be a JDBC source, log files and many more. Learn more about importing data using workflows here.

If your ingestion process doesn’t involve any of the above mentioned ways and writes directly to S3, it’s alright. Either way we end up registering that S3 location as one of the Data Lake locations.

Once created you shall see its listing in the Data Lake locations.

You can not only access this location from here but also set permissions on the objects stored in that path. If preferred, one could register lake locations separately for each processing zone and set permissions accordingly. I registered the whole bucket as the location.
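
Registering the location can likewise be done through the API; a minimal boto3 sketch, with the bucket ARN as a placeholder:

import boto3

lakeformation = boto3.client('lakeformation', region_name='us-west-2')

# Register the whole bucket as a data lake location using the service-linked role
lakeformation.register_resource(
    ResourceArn='arn:aws:s3:::test-787',   # placeholder bucket ARN
    UseServiceLinkedRole=True
)

# List the registered data lake locations
print(lakeformation.list_resources()['ResourceInfoList'])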

I created 2 retail datasets (.csv), one with 20 records and the other with 5 records. I have uploaded one of the datasets (20 records) to S3 with raw/retail_sales prefix.

Creating a Database

Lake Formation internally uses the Glue Data Catalog, so it shows all the databases available. From the Data Catalog sub menu in the sidebar, navigate to Databases to create and manage all the databases. I created a database called merchandise with default permissions.

Once created, you shall see it listed, and you can also manage it, grant/revoke permissions, and view the tables in that DB.
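
Since Lake Formation uses the Glue Data Catalog underneath, the same database can also be created with the Glue API. A minimal sketch:

import boto3

glue = boto3.client('glue', region_name='us-west-2')

# Create the 'merchandise' database in the Glue Data Catalog
glue.create_database(
    DatabaseInput={'Name': 'merchandise',
                   'Description': 'Retail sales data lake database'}
)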

Creating Crawlers and ETL jobs

From the Register and Ingest sub menu in the sidebar, navigate to Crawlers or Jobs to create and manage all Glue related services; Lake Formation redirects to AWS Glue and uses it internally. I created a crawler to get the metadata for objects residing in the raw zone.

After running this crawler manually, the raw data can be queried from Athena.

I created an ETL job to run a transformation on this raw table data. 

All it does is change the type of the purchase date column from string to date, and create partitions while writing to the refined zone in Parquet format. These partitions are created from the processing date, not the purchase date.

retail-raw-refined ETL job python script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "merchandise", table_name = "raw_retail_sales", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "merchandise", table_name = "raw_retail_sales", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "date"), ("sale_id", "string", "sale_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]

#convert glue object to sparkDF
sparkDF = datasource0.toDF()
sparkDF = sparkDF.withColumn('purchase_date', unix_timestamp(sparkDF.purchase_date, 'dd/MM/yyyy').cast(TimestampType()))

applymapping1 = DynamicFrame.fromDF(sparkDF, glueContext,"datafields")
# applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "date"), ("sale_id", "string", "sale_id", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-787/refined/retail_sales"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
now = datetime.datetime.now()
path = "s3://test-787/refined/retail_sales/"+'year='+str(now.year)+'/month='+str(now.month)+'/day='+str(now.day)+'/'
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

The lakeformation:GetDataAccess permission is needed for this job to work. I created a new policy named LakeFormationGetDataAccess and attached it to the AWSGlueServiceRoleDefault role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "lakeformation:GetDataAccess",
            "Resource": "*"
        }
    ]
}

After running the job manually, the transformed data is loaded, with partitions, into the refined zone as specified in the job.
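If you want to trigger the job outside the console, a run can also be started with boto3. A minimal sketch, assuming the job was saved under the same name used in this post:

import boto3

glue = boto3.client('glue')

# Start the ETL job shown above and print the run id for tracking.
response = glue.start_job_run(JobName='retail-raw-refined')
print(response['JobRunId'])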

I created another crawler to get the metadata for these objects residing in the refined zone.

After running this crawler manually, the refined data can be queried from Athena.

You can now see the newly added partition columns (year, month, day).

Let us add some new raw data and see how our ETL job processes that delta.

We only want to process new data; old data is either moved to an archive location or deleted from the raw zone, whichever is preferred.

Run the ETL job again and see the new files being added to the refined zone.

Load the new partitions using an MSCK REPAIR TABLE query in Athena.
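The repair query can be submitted to Athena programmatically as well. In this sketch the refined table name (refined_retail_sales) and the query results location are assumptions; replace them with the table your refined-zone crawler created and an S3 path Athena can write to.

import boto3

athena = boto3.client('athena')

# Load the new year/month/day partitions for the refined table.
athena.start_query_execution(
    QueryString='MSCK REPAIR TABLE refined_retail_sales',
    QueryExecutionContext={'Database': 'merchandise'},
    ResultConfiguration={'OutputLocation': 's3://test-787/athena-results/'}
)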

Note: Try creating another IAM user and, as an administrator in Lake Formation, give this user limited access to the tables, then query using Athena as that user to see whether the permissions are working.
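Granting that limited access can also be done from code. The sketch below is only an illustration: the user ARN, table name, and column list are assumptions, while the database name comes from the walkthrough above.

import boto3

lf = boto3.client('lakeformation')

# Grant SELECT on just two columns of the refined table to a test user.
lf.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:user/analyst'},
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'merchandise',
            'Name': 'refined_retail_sales',
            'ColumnNames': ['retailer_name', 'units_purchased']
        }
    },
    Permissions=['SELECT']
)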

Pros and cons of LakeFormation

The UI is simple, with everything under one roof. Usually one needs to keep multiple tabs open, and opening S3 locations is troublesome. This is made easy by the register data lake locations feature: one can not only access these locations directly but also grant/revoke permissions on the objects residing there.

Managing permissions at the object level in S3 is a hectic process. With Lake Formation, permissions can be managed at the Data Catalog level instead. This lets one grant or revoke permissions to users or roles at a table/column level, and these permissions are internally mapped to the underlying objects sitting in S3.

Though managing permissions and data ingestion workflows is made easy, most of the Glue processes like ETL jobs, crawlers, and ML-specific transformations still have to be set up manually.

This story is authored by Koushik. He is a software engineer and a keen data science and machine learning enthusiast.

Serverless Architecture for Lightning Fast Distributed File Transfer on AWS Data Lake

Today, we are very excited to share our insights on setting up a serverless architecture for a lightning fast way* to copy a large number of objects across multiple folders or partitions in an AWS data lake on S3. Typically in a data lake, data is kept across various zones depending on the data lifecycle. For example, as data arrives from a source, it can be kept in the raw zone and then, post processing, moved to a processed zone, so that the lake is ready for the next influx of data. The rate of object transfer is a crucial factor, as it affects the overall efficiency of the data processing lifecycle in the data lake.

*In our tests, we copied more than 300K objects ranging from 1KB to 10GB in size from the raw zone into the processed zone. Compared to s3s3mirror, the best-known tool for hyper-fast file transfer on AWS, we were able to finish this transfer of about 24GB of data in about 50% less time. More details are provided at the end of the post.

We created a Lambda invoke architecture that copies files/objects concurrently. The picture below depicts it.


OMS (Orchestrator-Master-Slave) Lambda Architecture

For example, suppose we have an S3 bucket with the following folder structure, with the actual objects contained within this hierarchy of folders, sub-folders and partitions.

S3 file structure

Let us look at how we can use the OMS (Orchestrator-Master-Slave) Architecture to achieve hyper-fast distributed/concurrent file transfer. The architecture can be divided into two halves: Orchestrator-Master and Master-Slave.

Orchestrator-Master

The Orchestrator simply invokes a Master Lambda for each folder. Each Master then iterates over the objects in that folder (including all sub-folders and partitions) and invokes a Slave Lambda for each object to copy it to the destination.

Orchestrator-Master Lambda invoke

Let us look at the Orchestrator Lambda code.
Source-to-Destination-File-Transfer-Orchestrator:

import os
import boto3
import json
from datetime import datetime

client_lambda = boto3.client('lambda')
master_lambda = "Source-to-Destination-File-Transfer-Master"

folder_names = ["folder1", "folder2", "folder3", "folder4", "folder5", "folder6", "folder7", "folder8", "folder9"]

def lambda_handler(event, context):
    
    t = datetime.now()
    print("start-time",t)
    
    try:            
        for folder_name in folder_names:
            
            payload_data = {
              'folder_name': folder_name
            }                
        
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = master_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )
            print(payload)
            
    except Exception as e:
        print(e)
        raise e

Master-Slave

Master-Slave Lambda invoke

Let us look at the Master Lambda code.
Source-to-Destination-File-Transfer-Master:

import os
import boto3
import json
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')
client_lambda = boto3.client('lambda')

source_bucket_name = 'source bucket name'
source_bucket = s3.Bucket(source_bucket_name)

slave_lambda = "Source-to-Destination-File-Transfer-Slave"

def lambda_handler(event, context):

    try:
        source_prefix = "" #add if any
        source_prefix = source_prefix + "/" + event['table_name'] + "/"

        for obj in source_bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            payload_data = {
               'file_path': path
            }
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = slave_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )

    except Exception as e:
        print(e)
        raise e

Slave

Let us look at the Slave Lambda code.
Source-to-Destination-File-Transfer-Slave:

import os
import boto3
import json
import re
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')

source_prefix = "" #add if any
source_bucket_name = "source bucket name"
source_bucket = s3.Bucket(source_bucket_name )

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name )

def lambda_handler(event, context):
    try:
        destination_prefix = "" #add if any
        
        source_obj = { 'Bucket': source_bucket_name, 'Key': event['file_path']}
        file_path = event['file_path']
        
        #copy the object into the destination bucket under the destination prefix
        new_key = file_path.replace(source_prefix, destination_prefix, 1)
        new_obj = destination_bucket.Object(new_key)
        new_obj.copy(source_obj)
        
    except Exception as e:
        raise e

You must ensure that these Lambda functions are configured with execution time and memory limits that suit your case. We tested with an upper limit of 5 minutes of execution time and 1GB of available memory.
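Those limits can be applied from code as well. A minimal sketch, assuming the three functions were created with the names used in this post:

import boto3

client_lambda = boto3.client('lambda')

# Apply the limits we tested with: 5 minutes (300s) and 1GB (1024MB).
for fn in ["Source-to-Destination-File-Transfer-Orchestrator",
           "Source-to-Destination-File-Transfer-Master",
           "Source-to-Destination-File-Transfer-Slave"]:
    client_lambda.update_function_configuration(
        FunctionName=fn,
        Timeout=300,
        MemorySize=1024
    )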

Calculating the Rate of File Transfer

To calculate the rate of file transfer, we print the start time at the beginning of the Orchestrator Lambda execution. Once the file transfer is complete, we use another Lambda to extract the last-modified date attribute of the last copied object.

Extract-Last-Modified:

import json
import boto3
from datetime import datetime
from dateutil import tz

s3 = boto3.resource('s3')

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name)
destination_prefix = "" #add if any

def lambda_handler(event, context):
    
    #initializing with some old date
    last_modified_date = datetime(1940, 7, 4).replace(tzinfo = tz.tzlocal()) 

    for obj in destination_bucket.objects.filter(Prefix = destination_prefix):
        
        obj_date = obj.last_modified.replace(tzinfo = tz.tzlocal())
        
        if last_modified_date < obj_date:
            last_modified_date = obj_date
    
    print("end-time: ", last_modified_date)

Now that we have both the start-time from the Orchestrator Lambda and the end-time from the Extract-Last-Modified Lambda, their difference is the time taken for the file transfer.

Before writing this post, we copied 24.1GB of objects using the above architecture; the results are shown in the following screenshots:

duration = end-time - start-time
         = 10:04:49 - 10:03:28
         = 00:01:21 (hh:mm:ss)
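The same subtraction as a quick Python check, using the two timestamps from the run above:

from datetime import datetime

# start-time printed by the Orchestrator, end-time from Extract-Last-Modified
start_time = datetime.strptime("10:03:28", "%H:%M:%S")
end_time = datetime.strptime("10:04:49", "%H:%M:%S")

print("duration:", end_time - start_time)  # 0:01:21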

To check the efficiency of our OMS Architecture, we compared the results of OMS with s3s3mirror, a utility for mirroring content from one S3 bucket to another or to/from the local filesystem. The screenshot below has the file transfer stats of s3s3mirror for the same set of files:

As we can see, the difference was 1 minute and 8 seconds for a total data transfer of about 24GB, and it can be much higher for large data sets if we add more optimizations. I have only shared a generalized view of the OMS Architecture; it can be further fine-tuned to specific needs for highly optimized performance. For instance, if you have partitions in each folder, the OMS Architecture could yield much better results if you invoke a Master Lambda for each partition inside the folder instead of invoking the Master just at the folder level.

Thanks for the read. Looking forward to your thoughts.

This story is co-authored by Koushik and Subbareddy. Koushik is a software engineer and a keen data science and machine learning enthusiast. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.