Real Time Face Identification on Live Camera Feed using Amazon Rekognition Video and Kinesis Video Streams

In this post, we are going to learn how to perform facial analysis on a live feed by setting up a serverless video analytics architecture using Amazon Rekognition Video and Amazon Kinesis Video Streams.

Use cases:

  1. Intruder notification system
  2. Employee sign-in system

Architecture overview

Real time face recognition using AWS on a live video stream

We shall learn how to use a laptop webcam to send a live video feed to an Amazon Kinesis video stream. (For a production-ready system we could, of course, hook professional-grade cameras up to Kinesis Video Streams instead.) The stream processor in Amazon Rekognition Video picks up this webcam feed and analyzes it by comparing the faces in the feed with the faces in a face collection created beforehand. The results of this analysis are written to an Amazon Kinesis data stream. For each record written to the Kinesis data stream, a Lambda function is invoked, which reads the record from the stream. Depending on how the Lambda function is configured, an email notification is sent via Amazon SNS (Simple Notification Service) to the registered email addresses whenever faces match or fail to match.

Note: Make sure all the above AWS resources are created in the same region.
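To make the last two hops of this pipeline more concrete, here is a minimal Python sketch of how a Lambda handler could decode the records it receives from the Kinesis data stream. It is only an illustration: the CloudFormation stack used later in this post ships its own Node.js Lambda, and the helper name matched_names is hypothetical. The field names (FaceSearchResponse, MatchedFaces, Face, ExternalImageId) are assumed to follow the record format emitted by Rekognition Video stream processors.

```python
import base64
import json


def matched_names(event):
    """Return the face tag (ExternalImageId) of every match in a Kinesis-triggered event."""
    names = []
    for record in event.get("Records", []):
        # Kinesis hands each record payload to Lambda base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        for face in payload.get("FaceSearchResponse", []):
            for match in face.get("MatchedFaces", []):
                names.append(match["Face"].get("ExternalImageId"))
    return names


def lambda_handler(event, context):
    # A real handler would publish to SNS here; this sketch only logs the result.
    names = matched_names(event)
    print("matched faces:", names)
    return names
```

A detected face with an empty MatchedFaces list is a face that was seen but not found in the collection, which is what an intruder notification would key on.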

Before going further, make sure you already have the AWS CLI configured on your machine. If not, follow this link –  https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html

Step-1: Create Kinesis Video Stream

We need to create a Kinesis video stream that our webcam can connect to and send its feed to. In the AWS console, open Kinesis Video Streams and click the Create button. Give the stream a name in the stream configuration.

We have successfully created a Kinesis video stream.

Step-2: Send Live Feed to the Kinesis Video Stream

We need a video producer; anything that sends media data to a Kinesis video stream is called a producer. AWS currently provides producer libraries primarily for four languages and platforms (Java, Android, C++ and C). We use the C++ Producer Library as a GStreamer plugin.

To easily send media from a variety of devices on a variety of operating systems, this tutorial uses GStreamer, an open-source media framework that standardizes access to cameras and other media sources.

Download the Amazon Kinesis Video Streams Producer SDK from Github using the following Git command:

git clone https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp 

After downloading successfully you can compile and install the GStreamer sample in the kinesis-video-native-build directory using the following commands:

For Ubuntu – run the following commands

sudo apt-get update

sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-base-apps

sudo apt-get install gstreamer1.0-plugins-bad gstreamer1.0-plugins-good gstreamer1.0-plugins-ugly gstreamer1.0-tools

For Windows – run the following commands

Inside mingw32 or mingw64 shell, go to kinesis-video-native-build directory and run ./min-install-script 

For macOS – run the following commands

Install homebrew

Run brew install pkg-config openssl cmake gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly log4cplus

Go to kinesis-video-native-build directory and run ./min-install-script

Running the GStreamer webcam sample application:
The sample application kinesis_video_gstreamer_sample_app in the kinesis-video-native-build directory uses a GStreamer pipeline to get video data from the camera. Launch it with the name of the Kinesis video stream created in Step-1 and it will start streaming from the camera.

AWS_ACCESS_KEY_ID=<YourAccessKeyId> AWS_SECRET_ACCESS_KEY=<YourSecretAccessKey> ./kinesis_video_gstreamer_sample_app <stream_name>

After running the above command, streaming starts and you can see the live feed in the Media preview of the Kinesis video stream created in Step-1.

Step-3: Creating Resources using AWS CloudFormation Stack

The CloudFormation stack will create the resources that are highlighted in the following image.

Highlighted resources to be created using CloudFormation
  1. The following link will automatically open a new CloudFormation Stack in us-west-2: CloudFormation Stack.
  2. Go to View in Designer, edit the template code as follows, and then click on create stack.
    1. Change the name of the application from Default.
    2. Change the Node.js version of the RekognitionVideoLambda to 10.x (version 6.10 isn’t supported anymore).
editing CloudFormation stack template

Enter your Email Address to receive notifications and then choose Next as shown below:

Skip the Configure stack options step by choosing Next. In the final step you must check the checkbox and then choose Create stack as in the below image:

Once the stack has been created successfully, you can see it in your stacks with the status CREATE_COMPLETE, as below. This creates the required resources.

Once completed, you will receive a confirmation email to subscribe to the notifications. Make sure you confirm the subscription.

snapshot from subscribe confirmation email

Step-4: Add face to a Collection

As learned earlier the Stream Processor in Amazon Rekognition Video picks up and analyzes the feed coming from Kinesis Video Stream by comparing it with the faces in the face collection. Let’s create this collection.

For more information follow the link – https://docs.aws.amazon.com/rekognition/latest/dg/create-collection-procedure.html

Create Collection: Create a face collection using the AWS CLI on the command line with below command:

aws rekognition create-collection --collection-id <Collection_Name> --region us-west-2

Add face(s) to the collection: You can use your own picture for testing.
First we need to upload the image(s) to an Amazon S3 bucket in the us-west-2 region. Then run the command below, replacing BUCKET_NAME, FILE_NAME and Collection_Name with your details. You can give any name for FACE-TAG (it can be the name of the person).

aws rekognition index-faces --image '{"S3Object":{"Bucket":"<BUCKET_NAME>","Name":"<FILE_NAME>.jpeg"}}' --collection-id "<Collection_Name>" --detection-attributes "ALL" --external-image-id "<FACE-TAG>" --region us-west-2

After that you will receive a response as output as shown below

Now we are all set to go further. If you have multiple photos, run the above command again with each new file name.
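When there are many photos, repeating the CLI command by hand gets tedious. The sketch below shows one way to do the same thing from Python, assuming boto3 is installed and the images are already in S3. The helper names and the photo-to-tag mapping are hypothetical. The client is passed in as a parameter, so in practice you would create it with boto3.client("rekognition", region_name="us-west-2").

```python
def build_index_faces_requests(collection_id, bucket, photos):
    """Build one index_faces request per photo.

    photos maps an S3 object key to the face tag (ExternalImageId) for that photo.
    """
    return [
        {
            "CollectionId": collection_id,
            "Image": {"S3Object": {"Bucket": bucket, "Name": key}},
            "ExternalImageId": tag,
            "DetectionAttributes": ["ALL"],
        }
        for key, tag in photos.items()
    ]


def index_all(rekognition, requests):
    """Send every request and return the number of faces indexed per face tag."""
    counts = {}
    for request in requests:
        response = rekognition.index_faces(**request)
        tag = request["ExternalImageId"]
        counts[tag] = counts.get(tag, 0) + len(response.get("FaceRecords", []))
    return counts
```

Several photos can share the same face tag, which generally helps matching when the person appears at different angles in the live feed.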

Step-5: Creating the Stream Processor

We need to create a stream processor, which does all the work: reading the video stream, running facial analysis against the face collection, and finally writing the analysis data to the Kinesis data stream. Its configuration contains the Kinesis data stream, the Kinesis video stream, the face collection ID and the role that Rekognition uses to access the Kinesis video stream.

Copy meta info required:
Go to your Kinesis video stream created in step-1, note down the Stream ARN (KINESIS_VIDEO_STREAM_ARN) from the stream info as shown below:

Next from the CloudFormation stack output note down the values of KinesisDataStreamArn (KINESIS_DATA_STREAM_ARN) and RecognitionVideoIAM (IAM_ROLE_ARN) as shown below:

Now create a JSON file in your system that contains the following information:

{
       "Name": "streamProcessorForRekognitionVideoBlog",
       "Input": {
              "KinesisVideoStream": {
                     "Arn": "<KINESIS_VIDEO_STREAM_ARN>"
              }
       },
       "Output": {
              "KinesisDataStream": {
                     "Arn": "<KINESIS_DATA_STREAM_ARN>"
              }
       },
       "RoleArn": "<IAM_ROLE_ARN>",
       "Settings": {
              "FaceSearch": {
                     "CollectionId": "<Collection_Name>",
                     "FaceMatchThreshold": 85.5
              }
       }
}
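If you would rather not edit the JSON by hand, a small script can generate it from the values noted down earlier. This is a sketch under the assumption that the structure above is all you need. The function names are hypothetical, and the guard against leftover <PLACEHOLDER> values is a convenience added here, not something the AWS CLI requires.

```python
import json


def stream_processor_config(name, video_stream_arn, data_stream_arn, role_arn,
                            collection_id, threshold=85.5):
    """Return the create-stream-processor input as a dictionary."""
    return {
        "Name": name,
        "Input": {"KinesisVideoStream": {"Arn": video_stream_arn}},
        "Output": {"KinesisDataStream": {"Arn": data_stream_arn}},
        "RoleArn": role_arn,
        "Settings": {
            "FaceSearch": {
                "CollectionId": collection_id,
                "FaceMatchThreshold": threshold,
            }
        },
    }


def write_config(path, config):
    """Write the config as JSON, refusing to write unfilled <PLACEHOLDER> values."""
    text = json.dumps(config, indent=2)
    if "<" in text:
        raise ValueError("config still contains a placeholder")
    with open(path, "w") as handle:
        handle.write(text + "\n")
```

The resulting file can then be passed to the CLI through the --cli-input-json option.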

Create the stream processor with the AWS CLI from the command line using the following command:

aws rekognition create-stream-processor --region us-west-2 --cli-input-json file://<PATH_TO_JSON_FILE_ABOVE>

Now start the stream processor with the following command:

aws rekognition start-stream-processor --name streamProcessorForRekognitionVideoBlog --region us-west-2

You can check whether the stream processor is in a running state with the following command:

aws rekognition list-stream-processors --region us-west-2

If it’s running, the response lists the stream processor with a Status of RUNNING.
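The status check can also be scripted. The following sketch polls describe_stream_processor until the processor reports RUNNING. It assumes a boto3 Rekognition client is passed in, for example boto3.client("rekognition", region_name="us-west-2"). The function name, attempt count and delay are hypothetical choices.

```python
import time


def wait_until_running(rekognition, name, attempts=10, delay_seconds=5.0):
    """Poll the stream processor status.

    Returns True once the status is RUNNING, False if it is still not RUNNING
    after all attempts, and raises RuntimeError if the status is FAILED.
    """
    for attempt in range(attempts):
        description = rekognition.describe_stream_processor(Name=name)
        status = description["Status"]
        if status == "RUNNING":
            return True
        if status == "FAILED":
            raise RuntimeError(description.get("StatusMessage", "stream processor failed"))
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False
```

Calling wait_until_running(client, "streamProcessorForRekognitionVideoBlog") right after the start command is one way to confirm that the processor came up before testing with the webcam.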

Also make sure your camera is still streaming from the command line and that the feed is being received by the Kinesis video stream.

Now you will be notified whenever a known or an unknown person shows up in your webcam.

Example notification:

Note: 

  1. Lambda can be configured to send an email notification only if unknown faces are detected, or vice versa.
  2. For cost optimization, one could use a Python script that calls the Amazon Rekognition service with a snapshot only when a person is detected, instead of wasting resources on an unoccupied area.
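The first note can be sketched as a small decision function. This is not the code of the Lambda created by the CloudFormation stack. It is a Python illustration that assumes the decoded record carries a FaceSearchResponse list in which an empty MatchedFaces entry means an unknown face. The notify_on setting and both function names are hypothetical.

```python
def classify_faces(payload):
    """Split one decoded record into known face tags and a count of unknown faces."""
    known, unknown = [], 0
    for face in payload.get("FaceSearchResponse", []):
        matches = face.get("MatchedFaces", [])
        if matches:
            # Keep only the most similar match for each detected face.
            best = max(matches, key=lambda m: m["Similarity"])
            known.append(best["Face"].get("ExternalImageId", best["Face"].get("FaceId")))
        else:
            unknown += 1
    return known, unknown


def build_message(payload, notify_on="unknown"):
    """Return an email body, or None when nothing should be sent.

    notify_on is one of "unknown", "known" or "both".
    """
    known, unknown = classify_faces(payload)
    lines = []
    if notify_on in ("known", "both") and known:
        lines.append("Known faces: " + ", ".join(sorted(set(known))))
    if notify_on in ("unknown", "both") and unknown:
        lines.append("Unknown faces detected: %d" % unknown)
    return "\n".join(lines) or None
```

A non-empty message could then be handed to the SNS publish call, with "unknown" suiting the intruder notification use case and "known" the employee sign-in use case.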

Step-6: Cleaning Up Once Done

Stop the stream processor.

aws rekognition stop-stream-processor --name streamProcessorForRekognitionVideoBlog --region us-west-2

Delete the stream processor.

aws rekognition delete-stream-processor --name streamProcessorForRekognitionVideoBlog --region us-west-2

Delete the Kinesis video stream. Go to Kinesis Video Streams in the AWS console, select your stream and choose Delete.

Delete the CloudFormation stack. Go to CloudFormation in the AWS console, select Stacks, then select your stack and choose Delete.

Thanks for the read! I hope it was both fun and useful.

This story is co-authored by Venu Vaka and Koushik Busim. Venu is a software engineer and machine learning enthusiast. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Setting Up a Data Lake on AWS Cloud Using LakeFormation

Setting up a Data Lake involves multiple steps such as collecting, cleansing, moving, and cataloging data, and then securely making that data available for downstream analytics and Machine Learning. AWS LakeFormation simplifies these processes and also automates certain processes like data ingestion. In this post, we shall be learning how to build a very simple data lake using LakeFormation with hypothetical retail sales data.

AWS Lake Formation provides its own permissions model that augments the AWS IAM permissions model. This centrally defined permissions model enables fine-grained access to data stored in data lake through a simple grant/revoke mechanism. These permissions are enforced at the table and column level on the data catalogue and are mapped to the underlying objects in S3. LakeFormation permissions are applicable across the full portfolio of AWS analytics and Machine Learning services, including Amazon Athena and Amazon Redshift.

So, let’s get on with the setup.

Adding an administrator

The first step in using LakeFormation is to create an administrator. An administrator has full access to the LakeFormation system and initial access to data configuration and access permissions.

After adding an administrator, navigate to the Dashboard using the sidebar. It illustrates the typical process of setting up a data lake.

Register location

If you wish to set up data ingestion, that is, import unprocessed/landing data, AWS LakeFormation comes with built-in Blueprints, found under the Register and Ingest sub menu in the sidebar, that one could use to build Workflows. These workflows can be scheduled as per the needs of the end user. Sources of data for these workflows can be a JDBC source, log files and many more. Learn more about importing data using workflows here.

If your ingestion process doesn’t involve any of the above mentioned ways and writes directly to S3, it’s alright. Either way we end up registering that S3 location as one of the Data Lake locations.

Once created you shall see its listing in the Data Lake locations.

You could not only access this location from here but also set permissions on the objects stored in that path. If preferred, one could register a separate lake location for each processing zone and set permissions accordingly. I registered the whole bucket.

I created 2 retail datasets (.csv), one with 20 records and the other with 5 records. I have uploaded one of the datasets (20 records) to S3 with raw/retail_sales prefix.

Creating a Database

Lake Formation internally uses the Glue Data Catalog, so it shows all the databases available. From the Data Catalog sub menu in the sidebar, navigate to Databases to create and manage all the databases. I created a database called merchandise with default permissions.

Once created, you shall see its listing, from where you can also manage the database, grant/revoke permissions and view the tables in that DB.

Creating Crawlers and ETL jobs

From the Register and Ingest sub menu in the sidebar, navigate to Crawlers, Jobs to create and manage all Glue related services. Lake Formation redirects to AWS Glue and internally uses it. I created a crawler to get the metadata for objects residing in raw zone.

After running this crawler manually, the raw data can be queried from Athena.

I created an ETL job to run a transformation on this raw table data. 

All it does is parse the purchase date column from a string (dd/MM/yyyy) into a timestamp. It also creates partitions while writing to the refined zone in Parquet format. These partitions are derived from the processing date, not the purchase date.

retail-raw-refined ETL job python script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "merchandise", table_name = "raw_retail_sales", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "merchandise", table_name = "raw_retail_sales", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "date"), ("sale_id", "string", "sale_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]

#convert glue object to sparkDF
sparkDF = datasource0.toDF()
sparkDF = sparkDF.withColumn('purchase_date', unix_timestamp(sparkDF.purchase_date, 'dd/MM/yyyy').cast(TimestampType()))

applymapping1 = DynamicFrame.fromDF(sparkDF, glueContext,"datafields")
# applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "date"), ("sale_id", "string", "sale_id", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-787/refined/retail_sales"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
now = datetime.datetime.now()
path = "s3://test-787/refined/retail_sales/"+'year='+str(now.year)+'/month='+str(now.month)+'/day='+str(now.day)+'/'
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

The lakeformation:GetDataAccess permission is needed for this job to work. I created a new policy named LakeFormationGetDataAccess and attached it to the AWSGlueServiceRoleDefault role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "lakeformation:GetDataAccess",
            "Resource": "*"
        }
    ]
}

After running the job manually, it will load new transformed data with partitions in the refined zone as specified in the job.
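The two things the job does, parsing the dd/MM/yyyy purchase date and deriving the partition path from the processing date, can be tried out locally without Spark. The sketch below uses only the Python standard library. The function names are hypothetical and the bucket path is the one from the job script.

```python
from datetime import datetime


def parse_purchase_date(text):
    """Parse a dd/MM/yyyy string, the format the job assumes for purchase_date."""
    return datetime.strptime(text, "%d/%m/%Y").date()


def partition_path(base, processed_on):
    """Build the year=/month=/day= prefix from the processing date."""
    return "%s/year=%d/month=%d/day=%d/" % (
        base.rstrip("/"), processed_on.year, processed_on.month, processed_on.day)


print(parse_purchase_date("25/12/2019"))
print(partition_path("s3://test-787/refined/retail_sales/", datetime(2020, 1, 5)))
```

As in the job, month and day are not zero-padded, so a run on 5 January 2020 writes under year=2020/month=1/day=5/.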

I created another crawler to get the metadata for these objects residing in refined zone.

After running this crawler manually, the refined data can be queried from Athena.

You could now see the newly added partition columns (year, month, day).

Let us add some new raw data and see how our ETL job processes that delta.

We only want to process new data, so old data is either moved to an archive location or deleted from the raw zone, whichever is preferred.

Run the ETL job again and see new files being added to the refined zone.

Load the new partitions using an MSCK REPAIR TABLE query.
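The repair query can be run from the Athena console, or started from Python as sketched below. This assumes a boto3 Athena client is passed in, for example boto3.client("athena"). The function name is hypothetical, as are any table name and results location you pass to it, since the name of the refined table depends on how your crawler was configured.

```python
def repair_table(athena, database, table, output_location):
    """Start an MSCK REPAIR TABLE query and return its QueryExecutionId."""
    response = athena.start_query_execution(
        QueryString="MSCK REPAIR TABLE %s" % table,
        QueryExecutionContext={"Database": database},
        # Athena writes query results to this S3 location.
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

The call only starts the query. The returned ID can be used to check on it later.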

Note: Try creating another IAM user and, as an administrator in LakeFormation, give this user limited access to the tables. Then try querying with Athena as that user to see if the permissions are working.
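The same experiment can be done programmatically. The sketch below builds a column-level SELECT grant and sends it through a Lake Formation client that is passed in, for example boto3.client("lakeformation"). The function names and any principal ARN or table name you supply are hypothetical. The column names in the usage note come from the retail dataset used in the ETL job.

```python
def select_grant(principal_arn, database, table, columns):
    """Build a grant_permissions request that allows SELECT on specific columns only."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }


def grant(lakeformation, request):
    """Send the request to Lake Formation."""
    return lakeformation.grant_permissions(**request)
```

For example, granting only retailer_name and units_purchased to the test user should make a SELECT on email_id fail for that user in Athena, which is a quick way to confirm that column-level permissions are enforced.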

Pros and cons of LakeFormation

The UI is simple, with everything under one roof. Usually one needs to keep multiple tabs open, and opening S3 locations is troublesome. The register data lake locations feature makes this easy: one can not only access these locations directly but also grant/revoke permissions on the objects residing there.

Managing permissions at the object level in S3 is a hectic process. With LakeFormation, permissions can be managed at the data catalog level. This enables one to grant/revoke permissions to users or roles at the table/column level. These permissions are internally mapped to the underlying objects sitting in S3.

Though managing permissions and data ingestion workflows is made easy, most of the Glue processes like ETL jobs, crawlers and ML-specific transformations still have to be set up manually.

This story is authored by Koushik Busim. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Serverless Architecture for Lightning Fast Distributed File Transfer on AWS Data Lake

Today, we are very excited to share our insights on setting up a serverless architecture that provides a lightning fast way* to copy a large number of objects across multiple folders or partitions in an AWS data lake on S3. Typically in a data lake, data is kept across various zones depending on the data lifecycle. For example, as the data arrives from the source, it can be kept in the raw zone and then, after processing, moved to a processed zone, so that the lake is ready for the next influx of data. The rate of object transfer is a crucial factor, as it affects the overall efficiency of the data processing lifecycle in the data lake.

*In our tests, we copied more than 300K objects ranging from 1KB to 10GB in size from the raw zone into the processed zone. Compared to the best known tool for hyper fast file transfer on AWS called s3s3mirror, we were able to finish this transfer of about 24GB of data in about 50% less time. More details have been provided at the end of the post.

We created a lambda invoke architecture that copies files/objects concurrently. The below picture accurately depicts it.


OMS (Orchestrator-Master-Slave) Lambda Architecture

For example, consider an S3 bucket with the following folder structure, where the actual objects sit further down this hierarchy of folders, sub-folders and partitions.

S3 file structure

Let us look at how we can use OMS Architecture (Orchestrator-Master-Slave) to achieve hyper-fast distributed/concurrent file transfer. The above architecture can be divided into two halves, Orchestrator-Master, Master-Slave.

Orchestrator-Master

The Orchestrator simply invokes a Master Lambda for each folder. Each Master then iterates the objects in that folder (including all sub-folders and partitions) and invokes a Slave Lambda for each object to copy it to the destination.

Orchestrator-Master Lambda invoke

Let us look at the Orchestrator Lambda code.
Source-to-Destination-File-Transfer-Orchestrator:

import os
import boto3
import json
from datetime import datetime

client_lambda = boto3.client('lambda')
master_lambda = "Source-to-Destination-File-Transfer-Master"

folder_names = ["folder1", "folder2", "folder3", "folder4", "folder5", "folder6", "folder7", "folder8", "folder9"]

def lambda_handler(event, context):
    
    t = datetime.now()
    print("start-time",t)
    
    try:            
        for folder_name in folder_names:
            
            payload_data = {
              'folder_name': folder_name
            }                
        
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = master_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )
            print(payload)
            
    except Exception as e:
        print(e)
        raise e

Master-Slave

Master-Slave Lambda invoke

Let us look at the Master Lambda code.
Source-to-Destination-File-Transfer-Master:

import os
import boto3
import json
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')
client_lambda = boto3.client('lambda')

source_bucket_name = 'source bucket name'
source_bucket = s3.Bucket(source_bucket_name)

slave_lambda = "Source-to-Destination-File-Transfer-Slave"

def lambda_handler(event, context):

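    try:
        source_prefix = "" #add if any (no leading or trailing slash)
        # The Orchestrator sends 'folder_name'; S3 keys have no leading slash.
        source_prefix = "/".join(p for p in (source_prefix, event['folder_name']) if p) + "/"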

        for obj in source_bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            payload_data = {
               'file_path': path
            }
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = slave_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )

    except Exception as e:
        print(e)
        raise e

Slave

Let us look at the Slave Lambda code.
Source-to-Destination-File-Transfer-Slave:

import os
import boto3
import json
import re
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')

source_prefix = "" #add if any
source_bucket_name = "source bucket name"
source_bucket = s3.Bucket(source_bucket_name )

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name )

def lambda_handler(event, context):
    try:
        destination_prefix = "" #add if any
        
        source_obj = { 'Bucket': source_bucket_name, 'Key': event['file_path']}
        file_path = event['file_path']
        
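        #copying file: swap the leading source prefix for the destination prefix
        relative_key = file_path[len(source_prefix):] if file_path.startswith(source_prefix) else file_path
        new_key = destination_prefix + relative_key
        new_obj = destination_bucket.Object(new_key)
        new_obj.copy(source_obj)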
        
    except Exception as e:
        raise e

You must ensure that these Lambda functions are configured with a maximum execution time and memory limit that suit your case. We tested with an execution time limit of 5 minutes and 1GB of available memory.

Calculating the Rate of File Transfer

To calculate the rate of file transfer, we print the start time at the beginning of the Orchestrator Lambda execution. Once the file transfer is complete, we use another Lambda to extract the last modified date attribute of the last copied object.

Extract-Last-Modified:

import json
import boto3
from datetime import datetime
from dateutil import tz

s3 = boto3.resource('s3')

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name)
destination_prefix = "" #add if any

def lambda_handler(event, context):
    
    #initializing with some old date
    last_modified_date = datetime(1940, 7, 4).replace(tzinfo = tz.tzlocal()) 

    for obj in destination_bucket.objects.filter(Prefix = destination_prefix):
        
        obj_date = obj.last_modified.replace(tzinfo = tz.tzlocal())
        
        if last_modified_date < obj_date:
            last_modified_date = obj_date
    
    print("end-time: ", last_modified_date)

Now we have both the start-time from the Orchestrator Lambda and the end-time from the Extract-Last-Modified Lambda; their difference is the time taken for the file transfer.

Before writing this post, we copied 24.1GB of objects using the above architecture. The results are shown in the following screenshots:

duration	=	end-time - start-time
		=	10:04:49 - 10:03:28
		=	00:01:21 (hh:mm:ss)
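The same arithmetic can be done in a few lines of Python, which also gives the average transfer rate. This is a small sketch using the figures reported above. The function name is hypothetical and it assumes both timestamps fall on the same day.

```python
from datetime import datetime


def transfer_stats(start, end, gigabytes):
    """Return (duration, average rate in GB per second) for hh:mm:ss timestamps."""
    fmt = "%H:%M:%S"
    duration = datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
    return duration, gigabytes / duration.total_seconds()


duration, rate = transfer_stats("10:03:28", "10:04:49", 24.1)
print(duration)        # 0:01:21
print(round(rate, 3))  # average GB copied per second
```

The duration of 81 seconds for 24.1GB works out to roughly 0.3GB per second on average.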

To check the efficiency of our OMS Architecture, we compared the results of OMS with s3s3mirror, a utility for mirroring content from one S3 bucket to another or to/from the local filesystem. The screenshot below has the file transfer stats of s3s3mirror for the same set of files:

As we can see, the difference was 1 minute and 8 seconds for a total data transfer of about 24GB, and it can be much higher for large data sets if we add more optimizations. We have only shared a generalized view of the OMS Architecture; it can be further fine-tuned to specific needs to get highly optimized performance. For instance, if you have partitions in each folder, the OMS Architecture could yield much better results if you invoke a Master Lambda for each partition inside the folder instead of invoking the Master just at the folder level.
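The per-partition idea could be sketched as follows. Instead of sending one payload per folder, the Orchestrator first lists the immediate sub-prefixes of each folder and sends one payload per partition. This is an untested-at-scale illustration, not part of the benchmark above. It assumes an S3 client such as boto3.client("s3") is passed in, that the Master reads the folder_name key, and that the helper names are hypothetical.

```python
import json


def partition_prefixes(s3_client, bucket, folder):
    """List the immediate sub-prefixes (partitions) under folder/."""
    paginator = s3_client.get_paginator("list_objects_v2")
    prefixes = []
    pages = paginator.paginate(Bucket=bucket, Prefix=folder.rstrip("/") + "/", Delimiter="/")
    for page in pages:
        # With a delimiter, S3 groups deeper keys under CommonPrefixes.
        for entry in page.get("CommonPrefixes", []):
            prefixes.append(entry["Prefix"])
    return prefixes


def master_payloads(prefixes):
    """Build one Master invocation payload per partition prefix."""
    return [json.dumps({"folder_name": prefix.rstrip("/")}) for prefix in prefixes]
```

Note that objects stored directly under the folder, outside any partition, would not be covered by these payloads and would need a separate pass.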

Thanks for the read. Looking forward to your thoughts.

This story is co-authored by Koushik and Subbareddy. Koushik is a software engineer and a keen data science and machine learning enthusiast. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.