Machine Learning Operations (MLOps) Pipeline using Google Cloud Composer

In an earlier post, we described the need to automate the data engineering pipeline for machine-learning-based systems. Today, we will expand the scope and set up a fully automated MLOps pipeline using Google Cloud Composer.

Cloud Composer

Cloud Composer is officially defined as a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

So let’s get on with the steps required to create this MLOps infrastructure on Google Cloud Platform.

Creating a Cloud Composer Environment

Step1: Please enable the Cloud Composer API.

Step2: Go to the Create environment page in the GCP console. Composer is available in the Big Data section.

Step3: Click on Create to start creating a Composer environment.

Step4: Please select a service account that has the required permissions to access GCS, BigQuery, ML Engine and the Composer environment. The roles required for accessing the Composer environment are Composer Administrator and Composer Worker.
For more details about access control in the Composer environment, please see this.

Step5: Please use Python version 3 and the latest image version.

Step6: Click on create. It will take about 15-20 minutes to create the environment. Once it completes, the environment page shall look like the following.

Click on Airflow to see the Airflow WebUI. The Airflow WebUI looks as follows:

The DAGs folder is where our DAG file is stored. It is simply a folder inside the GCS bucket that the environment creates. To know more about the concept of a DAG and for a general introduction to Airflow, please refer to this post.

You can see Composer-related logs in Logging.

Step7: Please add the following PyPI packages to the Composer environment.

Click on the created environment, navigate to PyPI packages and click on Edit to add packages.

The required packages are:

# to read data from MongoDB
pymongo==3.8.0
oauth2client==4.1.3
# to read data from firestore
google-cloud-firestore==1.3.0
firebase-admin==2.17.0
google-api-core==1.13.0

Create an ML model

Step1: Please create a folder structure like the following on your instance.

ml_model
├── setup.py
└── trainer
    ├── __init__.py
    └── train.py

Step2: Please place the following code in the train.py file. It trains the model and uploads it to a GCS bucket. This model is used to create model versions, as explained a bit later.

from google.cloud import bigquery
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
from google.cloud import storage
import datetime
import json
import pickle
client = bigquery.Client()
sql = '''
SELECT *
FROM `<PROJECT_ID>.<DATASET>.<TABLENAME>`
'''

df = client.query(sql).to_dataframe()
df = df[['is_stressed', 'is_engaged', 'status']]

df['is_stressed'] = df['is_stressed'].fillna('n')
df['is_engaged'] = df['is_engaged'].fillna('n')
df['stressed'] = np.where(df['is_stressed']=='y', 1, 0)
df['engaged'] = np.where(df['is_engaged']=='y', 1, 0)
df['status'] = np.where(df['status']=='complete', 1, 0)

feature_cols = ['stressed', 'engaged']
X = df[feature_cols]
y = df.status
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.25,random_state=0)
logreg = LogisticRegression()
logreg.fit(X_train,y_train)
pkl_filename = "model.pkl"  
with open(pkl_filename, 'wb') as file:  
    pickle.dump(logreg, file)
BUCKET_NAME = '<BUCKET_NAME>'  # placeholder: replace with your GCS bucket name
# Upload the model to GCS
bucket = storage.Client().bucket(BUCKET_NAME)
file_path = datetime.datetime.now().strftime('machine_learning/models/%Y%m%d_%H%M%S')
blob = bucket.blob('{}/{}'.format(
    file_path,
    pkl_filename))
blob.upload_from_filename(pkl_filename)

file_location = 'gs://{BUCKET_NAME}/{file_path}'.format(BUCKET_NAME=BUCKET_NAME, file_path=file_path)
file_config = json.dumps({'file_location': file_location})

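# COMPOSER_BUCKET is a placeholder: the bucket created by the Composer environment
COMPOSER_BUCKET = '<COMPOSER_BUCKET>'
bucket = storage.Client().bucket(COMPOSER_BUCKET)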
blob = bucket.blob('data/file_config.json')
blob.upload_from_string(file_config)
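
The last few lines of train.py are what tie training to deployment: the model goes to a timestamped folder, and that folder's URI is written to data/file_config.json so the DAG can later read it as file_location. A minimal sketch of just that bookkeeping, with no GCS calls, is shown below. The function name build_file_config and the bucket name my-bucket are illustrative assumptions, not part of the original script.

```python
import datetime
import json


def build_file_config(bucket_name, now):
    """Return (blob_path, config_json) for a model trained at `now`.

    Hypothetical helper: mirrors the path logic in train.py without
    touching GCS, so it can be checked locally.
    """
    # Timestamped folder, same pattern as train.py
    file_path = now.strftime('machine_learning/models/%Y%m%d_%H%M%S')
    blob_path = '{}/{}'.format(file_path, 'model.pkl')
    # The folder URI (not the file itself) is stored as file_location
    file_location = 'gs://{}/{}'.format(bucket_name, file_path)
    return blob_path, json.dumps({'file_location': file_location})


if __name__ == '__main__':
    # 'my-bucket' is a placeholder bucket name
    path, config = build_file_config(
        'my-bucket', datetime.datetime(2019, 8, 20, 9, 5, 0))
    print(path)
    print(config)
```

Because every run writes to a new timestamped folder, older models stay in the bucket, while file_config.json always points at the most recent one.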

Step3: Create an empty __init__.py file inside the trainer directory.

Step4: Please place the following code in the setup.py file. The setup.py file lists the packages required to execute the code.

import setuptools

REQUIRED_PACKAGES = [
    'pandas-gbq==0.3.0',
    'cloudml-hypertune',
    'google-cloud-bigquery==1.14.0',
    'urllib3'
]

setuptools.setup(
    name='ml_model',
    version='1.0',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description='',
)

Step5: Package the code using the following command. It creates a .tar.gz file in the dist folder inside the ml_model directory.

python3 setup.py sdist

Step6: The package name is the name specified in the setup.py file, so the package file is named ml_model-1.0.tar.gz.
Copy the package to gs://{your-GCS-bucket}/machine_learning/. This becomes the base directory for the machine learning activities described in this post.
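
The DAG later in this post builds the trainer package URI as os.path.join(BASE_DIR, 'packages', PACKAGE_NAME), so it is worth checking where it expects the archive to live before copying it. The sketch below reproduces that join with posixpath, so the result is the same on any operating system. The helper name and the bucket name are placeholders, not part of the original code.

```python
import posixpath


def trainer_package_uri(base_dir, package_name):
    """Mirror the DAG's TRAINER_BIN computation (hypothetical helper)."""
    return posixpath.join(base_dir, 'packages', package_name)


if __name__ == '__main__':
    # 'my-bucket' is a placeholder bucket name
    print(trainer_package_uri('gs://my-bucket/machine_learning/',
                              'ml_model-1.0.tar.gz'))
```

Under these assumptions the DAG would look for the archive in a packages/ subfolder of the base directory, so either copy the archive there or adjust BASE_DIR and PACKAGE_NAME in the configuration so that the joined path matches the real location.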

Creating a DAG

In this use case, we have created a DAG file that exports table data from a MongoDB instance into a GCS bucket and then creates a BigQuery table from the exported data. It then trains a model and creates a version for that model. The DAG supports both full and daily data extraction, controlled by the variable tot_data in the code below. This variable is read from the Airflow configuration set by the user, as described later in this post.
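
To make the full versus daily switch concrete, here is a minimal sketch of how the MongoDB filter depends on tot_data. The helper name build_mongo_query is hypothetical; the field name edit_datetime and the timestamp format are taken from the DAG code.

```python
from datetime import date, timedelta


def build_mongo_query(tot_data, run_date):
    """Return the MongoDB filter for a run on `run_date` (hypothetical helper)."""
    if tot_data.lower() != 'no':
        # Anything other than 'no' means full extraction: no filter
        return {}
    today = str(run_date) + 'T00:00:00.000Z'
    yesterday = str(run_date - timedelta(days=1)) + 'T00:00:00.000Z'
    # Daily extraction: records edited yesterday, midnight to midnight
    return {'edit_datetime': {'$gte': yesterday, '$lt': today}}


if __name__ == '__main__':
    print(build_mongo_query('yes', date(2019, 8, 20)))
    print(build_mongo_query('no', date(2019, 8, 20)))
```

Note that the bounds are plain strings, so the daily filter only behaves as intended if edit_datetime is stored in MongoDB as an ISO-8601 string in the same format; that is an assumption suggested by the DAG code rather than something the post states.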

Please place the following code in the DAG file.

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator
import pprint
import json
import re

from pymongo import MongoClient
from google.cloud import storage
import os

from airflow import models
from mlengine_operator import MLEngineTrainingOperator, MLEngineVersionOperator

ts = datetime.now()
today = str(ts.date()) + 'T00:00:00.000Z'
yester_day = str(ts.date() - timedelta(days = 1)) + 'T00:00:00.000Z'

str_ts = ts.strftime('%Y_%m_%d_%H_%M_%S')

config = Variable.get("mongo_conf", deserialize_json=True)
host = config['host']
db_name = config['db_name']
table_name = config['table_name']
file_prefix = config['file_prefix']
bucket_name = config['bucket_name']
# file_path = file_prefix + '/' + table_name + '.json'
file_path = '{file_prefix}/{table_name}/{table_name}_{str_ts}.json'.format(file_prefix=file_prefix, str_ts=str_ts, table_name=table_name)
file_location = 'gs://' + bucket_name + '/' + file_prefix + '/' + table_name + '/' + table_name + '_*.json'
config['file_location'] = file_location
bq_dataset = config['bq_dataset']
tot_data = config['tot_data'].lower()

BUCKET_NAME = config['ml_configuration']['BUCKET_NAME']
BASE_DIR = config['ml_configuration']['BASE_DIR']
PACKAGE_NAME = config['ml_configuration']['PACKAGE_NAME']
TRAINER_BIN = os.path.join(BASE_DIR, 'packages', PACKAGE_NAME)
TRAINER_MODULE = config['ml_configuration']['TRAINER_MODULE']
RUNTIME_VERSION = config['ml_configuration']['RUNTIME_VERSION']
PROJECT_ID = config['ml_configuration']['PROJECT_ID']
MODEL_NAME = config['ml_configuration']['MODEL_NAME']

MODEL_FILE_BUCKET = config['ml_configuration']['MODEL_FILE_BUCKET']
model_file_loc = config['ml_configuration']['MODEL_FILE_LOCATION']

bucket = storage.Client().bucket(MODEL_FILE_BUCKET)
blob = bucket.get_blob(model_file_loc)
file_config = json.loads(blob.download_as_string().decode("utf-8"))
export_uri = file_config['file_location']

def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

def mongoexport():
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(file_path)

    client = MongoClient(host)
    db = client[db_name]
    tasks = db[table_name]
    pprint.pprint(tasks.count_documents({}))
    # if tot_data is set to 'yes' in airflow configurations, full data
    # is processed.
    if tot_data == 'no':
        query = {"edit_datetime": { "$gte": yester_day, "$lt": today}}
        print(query)
        data = tasks.find(query)
    else:
        data = tasks.find()
    emp_list = []
    for record in data:
        emp_list.append(json.dumps(record, default=str))
    flat_list = []
    for data in emp_list:
        flat_list.append(flatten_json(json.loads(data)))
    data = '\n'.join(json.dumps({re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v) for k, v in record.items()}) for record in flat_list)
    blob.upload_from_string(data)

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:

    # priority_weight has type int in Airflow DB, uses the maximum.
    pymongo_export_op = PythonOperator(
        task_id='pymongo_export',
        python_callable=mongoexport,
        )

    update_bq_table_op = BashOperator(
        task_id='update_bq_table',
        bash_command='''
        bq rm -f {bq_dataset}.{table_name}
        bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --ignore_unknown_values=True {bq_dataset}.{table_name} {file_location}
        '''.format(bq_dataset=bq_dataset, table_name=table_name, file_location=file_location)
        )

    date_nospecial = '{{ execution_date.strftime("%Y%m%d") }}'
    date_min_nospecial = '{{ execution_date.strftime("%Y%m%d_%H%M") }}'
    uuid = '{{ macros.uuid.uuid4().hex[:8] }}'

    training_op = MLEngineTrainingOperator(
      task_id='submit_job_for_training',
      project_id=PROJECT_ID,
      job_id='{}_{}_{}'.format(table_name, date_nospecial, uuid),
      package_uris=[os.path.join(TRAINER_BIN)],
      training_python_module=TRAINER_MODULE,
      training_args=[
          '--base-dir={}'.format(BASE_DIR),
          '--event-date={}'.format(date_nospecial),
      ],
      region='us-central1',
      runtime_version=RUNTIME_VERSION,
      python_version='3.5')

    create_version_op = MLEngineVersionOperator(
      task_id='create_version',
      project_id=PROJECT_ID,
      model_name=MODEL_NAME,
      version={
          'name': 'version_{}_{}'.format(date_min_nospecial, uuid),
          'deploymentUri': export_uri,
          'runtimeVersion': RUNTIME_VERSION,
          'pythonVersion': '3.5',
          'framework': 'SCIKIT_LEARN',
      },
      operation='create')

    pymongo_export_op >> update_bq_table_op >> training_op >> create_version_op
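
To see what the export step actually writes, the sketch below runs flatten_json and the key clean-up from mongoexport on a single made-up document. The helper name to_bq_row and the sample document are assumptions for illustration; flatten_json and the regular expression are copied from the DAG.

```python
import json
import re


def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out


def to_bq_row(record):
    """Flatten one document and normalise its keys, as mongoexport does."""
    flat = flatten_json(record)
    return {
        re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v)
        for k, v in flat.items()
    }


if __name__ == '__main__':
    # Made-up sample document
    doc = {'User Name': 'asha', 'address': {'city': 'Pune', 'tags': ['a', 'b']}, 'age': 30}
    print(json.dumps(to_bq_row(doc)))
```

Nested keys are joined with underscores, list items get their index in the key, and every value is written as a string, which keeps the bq load --autodetect step from failing on mixed types at the cost of loading everything as text.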

Once the file is created, please upload it to the DAGs folder. Also add the following dependency file, named mlengine_operator.py, to the DAGs folder.
Place the following code in the mlengine_operator.py file.

import re

from apiclient import errors

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def _normalize_mlengine_job_id(job_id):

    # Add a prefix when a job_id starts with a digit or a template
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() == 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    # Clean up 'bad' characters except templates
    tracker = 0
    cleansed_job_id = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_',
                                  job[tracker:m.start()])
        cleansed_job_id += job[m.start():m.end()]
        tracker = m.end()

    # Clean up last substring or the full string if no templates
    cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])

    return cleansed_job_id


class MLEngineBatchPredictionOperator(BaseOperator):
   
    template_fields = [
        '_project_id',
        '_job_id',
        '_region',
        '_input_paths',
        '_output_path',
        '_model_name',
        '_version_name',
        '_uri',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 region,
                 data_format,
                 input_paths,
                 output_path,
                 model_name=None,
                 version_name=None,
                 uri=None,
                 max_worker_count=None,
                 runtime_version=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineBatchPredictionOperator, self).__init__(*args, **kwargs)

        self._project_id = project_id
        self._job_id = job_id
        self._region = region
        self._data_format = data_format
        self._input_paths = input_paths
        self._output_path = output_path
        self._model_name = model_name
        self._version_name = version_name
        self._uri = uri
        self._max_worker_count = max_worker_count
        self._runtime_version = runtime_version
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine prediction '
                'job.')

        if self._uri:
            if self._model_name or self._version_name:
                raise AirflowException('Ambiguous model origin: Both uri and '
                                       'model/version name are provided.')

        if self._version_name and not self._model_name:
            raise AirflowException(
                'Missing model: Batch prediction expects '
                'a model name when a version name is provided.')

        if not (self._uri or self._model_name):
            raise AirflowException(
                'Missing model origin: Batch prediction expects a model, '
                'a model & version combination, or a URI to a savedModel.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        prediction_request = {
            'jobId': job_id,
            'predictionInput': {
                'dataFormat': self._data_format,
                'inputPaths': self._input_paths,
                'outputPath': self._output_path,
                'region': self._region
            }
        }

        if self._uri:
            prediction_request['predictionInput']['uri'] = self._uri
        elif self._model_name:
            origin_name = 'projects/{}/models/{}'.format(
                self._project_id, self._model_name)
            if not self._version_name:
                prediction_request['predictionInput'][
                    'modelName'] = origin_name
            else:
                prediction_request['predictionInput']['versionName'] = \
                    origin_name + '/versions/{}'.format(self._version_name)

        if self._max_worker_count:
            prediction_request['predictionInput'][
                'maxWorkerCount'] = self._max_worker_count

        if self._runtime_version:
            prediction_request['predictionInput'][
                'runtimeVersion'] = self._runtime_version

        hook = MLEngineHook(self._gcp_conn_id, self._delegate_to)

        # Helper method to check if the existing job's prediction input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('predictionInput', None) == \
                prediction_request['predictionInput']

        try:
            finished_prediction_job = hook.create_job(
                self._project_id, prediction_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_prediction_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine batch prediction job failed: {}'.format(
                str(finished_prediction_job)))
            raise RuntimeError(finished_prediction_job['errorMessage'])

        return finished_prediction_job['predictionOutput']


class MLEngineModelOperator(BaseOperator):
    template_fields = [
        '_model',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineModelOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model = model
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)
        if self._operation == 'create':
            return hook.create_model(self._project_id, self._model)
        elif self._operation == 'get':
            return hook.get_model(self._project_id, self._model['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineVersionOperator(BaseOperator):
    
    template_fields = [
        '_model_name',
        '_version_name',
        '_version',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model_name,
                 version_name=None,
                 version=None,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):

        super(MLEngineVersionOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model_name = model_name
        self._version_name = version_name
        self._version = version or {}
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        if 'name' not in self._version:
            self._version['name'] = self._version_name

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        if self._operation == 'create':
            assert self._version is not None
            return hook.create_version(self._project_id, self._model_name,
                                       self._version)
        elif self._operation == 'set_default':
            return hook.set_default_version(self._project_id, self._model_name,
                                            self._version['name'])
        elif self._operation == 'list':
            return hook.list_versions(self._project_id, self._model_name)
        elif self._operation == 'delete':
            return hook.delete_version(self._project_id, self._model_name,
                                       self._version['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineTrainingOperator(BaseOperator):
    
    template_fields = [
        '_project_id',
        '_job_id',
        '_package_uris',
        '_training_python_module',
        '_training_args',
        '_region',
        '_scale_tier',
        '_runtime_version',
        '_python_version',
        '_job_dir'
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 runtime_version=None,
                 python_version=None,
                 job_dir=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):
        super(MLEngineTrainingOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._job_id = job_id
        self._package_uris = package_uris
        self._training_python_module = training_python_module
        self._training_args = training_args
        self._region = region
        self._scale_tier = scale_tier
        self._runtime_version = runtime_version
        self._python_version = python_version
        self._job_dir = job_dir
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to
        self._mode = mode

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine training '
                'job.')
        if not package_uris:
            raise AirflowException(
                'At least one python package is required for MLEngine '
                'Training job.')
        if not training_python_module:
            raise AirflowException(
                'Python module name to run after installing required '
                'packages is required.')
        if not self._region:
            raise AirflowException('Google Compute Engine region is required.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        training_request = {
            'jobId': job_id,
            'trainingInput': {
                'scaleTier': self._scale_tier,
                'packageUris': self._package_uris,
                'pythonModule': self._training_python_module,
                'region': self._region,
                'args': self._training_args,
            }
        }

        if self._runtime_version:
            training_request['trainingInput']['runtimeVersion'] = self._runtime_version

        if self._python_version:
            training_request['trainingInput']['pythonVersion'] = self._python_version

        if self._job_dir:
            training_request['trainingInput']['jobDir'] = self._job_dir

        if self._mode == 'DRY_RUN':
            self.log.info('In dry_run mode.')
            self.log.info('MLEngine Training job request is: {}'.format(
                training_request))
            return

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        # Helper method to check if the existing job's training input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('trainingInput', None) == \
                training_request['trainingInput']

        try:
            finished_training_job = hook.create_job(
                self._project_id, training_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_training_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine training job failed: {}'.format(
                str(finished_training_job)))
            raise RuntimeError(finished_training_job['errorMessage'])
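
Both the training and the batch prediction operators pass their job id through _normalize_mlengine_job_id before submitting it. The standalone sketch below copies that logic (renamed normalize_job_id here, which is an illustrative name) so its effect on a few made-up ids can be checked without Airflow.

```python
import re


def normalize_job_id(job_id):
    # Prefix ids that start with a digit or with a {{ template }}
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() == 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    # Replace runs of non-alphanumerics with '_', leaving templates intact
    tracker = 0
    cleansed = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:m.start()])
        cleansed += job[m.start():m.end()]
        tracker = m.end()

    # Clean up the tail, or the whole string if there were no templates
    cleansed += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])
    return cleansed


if __name__ == '__main__':
    for sample in ['2019-08-20 run', 'my-table_{{ ds_nodash }}', '{{ ds }}-job']:
        print(sample, '->', normalize_job_id(sample))
```

The Jinja templates are left untouched on purpose: Airflow renders them later, so only the literal parts of the id are cleaned at this point.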

Import the variables from the composer_conf.json file into Airflow Variables.
Go to Airflow WebUI → Admin → Variables, then browse to the file path or configure the variables manually.
Please place the following in composer_conf.json:

{
  "mongo_conf": {
    "host": "mongodb://<instance-internal-ip>:27017",
    "db_name": "DBNAME",
    "table_name": "TABLENAME",
    "file_prefix": "Folder In GCS Bucket",
    "bq_dataset": "BigQuery Dataset",
    "bucket_name": "GCS Bucket",
    "tot_data": "yes",
    "ml_configuration": {
      "BUCKET_NAME": "GCS Bucket",
      "BASE_DIR": "gs://GCS Bucket/machine_learning/",
      "PACKAGE_NAME": "PACKAGE NAME FROM setup.py FILE in ML",
      "TRAINER_MODULE": "trainer.train",
      "RUNTIME_VERSION": "1.13",
      "PROJECT_ID": "GCP Project",
      "MODEL_FILE_BUCKET": "BUCKET CREATED BY Composer Environment",
      "MODEL_FILE_LOCATION": "data/MODEL LOCATION FILE",
      "MODEL_NAME": "MODEL_NAME"
    }
  }
}

Please store any configuration or credentials files used by Composer in the data folder of the bucket created by the Composer environment.
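
Since the DAG reads every one of these keys at parse time, a missing key makes the DAG fail to load. A small check such as the one below can be run on the JSON text before importing it. This is a sketch: the helper name missing_keys is hypothetical, and the key lists are simply the keys the DAG code reads from mongo_conf.

```python
import json

# Keys the DAG reads from mongo_conf and from its ml_configuration block
REQUIRED_KEYS = {'host', 'db_name', 'table_name', 'file_prefix',
                 'bq_dataset', 'bucket_name', 'tot_data', 'ml_configuration'}
REQUIRED_ML_KEYS = {'BUCKET_NAME', 'BASE_DIR', 'PACKAGE_NAME', 'TRAINER_MODULE',
                    'RUNTIME_VERSION', 'PROJECT_ID', 'MODEL_FILE_BUCKET',
                    'MODEL_FILE_LOCATION', 'MODEL_NAME'}


def missing_keys(conf_text):
    """Return a sorted list of keys missing from the config (hypothetical helper)."""
    conf = json.loads(conf_text)['mongo_conf']
    missing = sorted(REQUIRED_KEYS - set(conf))
    ml = conf.get('ml_configuration', {})
    missing += sorted('ml_configuration.' + k for k in REQUIRED_ML_KEYS - set(ml))
    return missing


if __name__ == '__main__':
    with open('composer_conf.json') as f:
        print(missing_keys(f.read()) or 'all keys present')
```

json.loads also raises an error on malformed JSON, such as an unbalanced brace, so the same check catches syntax mistakes before the file reaches Airflow.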

After configuring variables accordingly, you can see the DAG named ml_pipeline in the Airflow WebUI.

Please trigger the DAG from the Airflow WebUI. Once the DAG has run successfully, it looks like the following:

Thanks for the read and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark Ecosystem.

Email Deliverability Analytics using SendGrid and AWS Big Data Services

In this post, we will run through a case study to set up an email deliverability analytics pipeline using SendGrid and AWS Big Data Services such as S3, Glue and Athena. To start off, when we send mails from SendGrid to recipients, we get responses (multiple response types are possible, such as processed, delivered, blocked, deferred etc.) from Email Service Providers such as Gmail, Yahoo etc. We can analyze this response data to improve our email deliverability. This is achieved by logging these responses (via API Gateway and a Lambda function) into Amazon S3 and then analyzing them using Athena. The chain of events is put in place by using a webhook that sends a POST request to AWS API Gateway on each event notification (response) from SendGrid. The API Gateway is further configured to trigger a Lambda function which writes the email response data into S3. We then use a Glue crawler to update the metadata in the Data Catalog, thereby making it available for Athena to perform SQL-based analysis.

Without further ado, let’s set the ball rolling. Go to SendGrid and select Settings>Mail_Settings. Click on Event Notifications.

We are going to enable it by providing an endpoint and selecting the events for which we want to get a response.

The above endpoint points to the AWS API Gateway (shown below), which accepts a POST request and triggers the Lambda function, as you can see.

Now our Lambda function stores the event payload data in an S3 bucket.
Lambda code:

const AWS = require('aws-sdk');
var s3Bucket = new AWS.S3( { params: {Bucket: "Your-Bucket"} } );

exports.handler = (event, context, callback) => {
    console.log(event); // the response data
    // Serialize each event as one JSON object per line
    let x = "";
    event.forEach((item) => {
        x = x + JSON.stringify(item) + "\n";
    });
    let uuid = create_UUID();
    var filePath = "receivelogs/" + uuid;
    console.log(filePath);
    var data = {
        Key: filePath,
        Body: x
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) {
            // Log the error itself; data is empty when the upload fails
            console.log('Error uploading data: ', err);
            callback(err, null);
        } else {
            console.log('Successfully uploaded the response');
            callback(null, data);
        }
    });
};
// this function generates a UUID (universally unique identifier), used as the file name
function create_UUID(){
   var dt = new Date().getTime();
   var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
       var r = (dt + Math.random()*16)%16 | 0;
       dt = Math.floor(dt/16);
       return (c=='x' ? r :(r&0x3|0x8)).toString(16);
   });
   return uuid;
}

When you send mail, the response is triggered from SendGrid via POST request to API Gateway and then the response gets stored in S3 via Lambda function.
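
For readers more at home in Python, the core of the Lambda handler above can be sketched as follows: the SendGrid event array becomes newline-delimited JSON, stored under a random key in the receivelogs/ folder. The function name events_to_s3_object and the sample events are illustrative, and the standard uuid module is used here in place of the hand-written UUID generator. No S3 call is made.

```python
import json
import uuid


def events_to_s3_object(events, prefix='receivelogs/'):
    """Return (key, body): one JSON object per line, keyed by a random UUID."""
    body = ''.join(json.dumps(item) + '\n' for item in events)
    key = prefix + str(uuid.uuid4())
    return key, body


if __name__ == '__main__':
    # Made-up sample events
    sample = [
        {'email': 'a@example.com', 'event': 'delivered'},
        {'email': 'b@example.com', 'event': 'deferred'},
    ]
    key, body = events_to_s3_object(sample)
    print(key)
    print(body, end='')
```

Writing one JSON object per line matters for the next step, because Athena's JSON SerDes read each line as a separate record.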

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. We use a crawler to populate the AWS Glue Data Catalog with tables. Below is the step-by-step process to set up the Glue crawler to read an S3-based data source and make it available as a database table for AWS Athena based analytics.

In the step above, you may need to create a new IAM role that provides access to the underlying S3 data.

So in the steps above, we have concluded the setup for the crawler to fetch the underlying data on S3.

When you run this crawler on the S3-based data source, it updates the metadata of the objects in that path in the Glue Data Catalog. Athena can then query those objects in S3 with SQL, using the metadata available in the Data Catalog. Many business executives aren’t comfortable with SQL queries, so a possible add-on to this data pipeline is AWS QuickSight for more BI-driven analysis.
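
As a rough illustration of the kind of question this data answers, the sketch below computes the share of each event type from a newline-delimited log, which is the same aggregation one would express as a GROUP BY in Athena. It assumes each record carries an "event" field, as in SendGrid's event webhook payload; the function name and the sample rows are made up.

```python
import json
from collections import Counter


def event_rates(ndjson_text):
    """Return {event_type: share of all events} from newline-delimited JSON."""
    counts = Counter(
        json.loads(line).get('event', 'unknown')
        for line in ndjson_text.splitlines() if line.strip()
    )
    total = sum(counts.values())
    return {name: n / total for name, n in counts.items()} if total else {}


if __name__ == '__main__':
    # Made-up sample log
    log = '\n'.join([
        '{"email": "a@example.com", "event": "delivered"}',
        '{"email": "b@example.com", "event": "delivered"}',
        '{"email": "c@example.com", "event": "bounce"}',
        '{"email": "d@example.com", "event": "deferred"}',
    ])
    print(event_rates(log))
```

A falling delivered share or a rising bounce share over time is the sort of signal that would prompt a closer look at list quality or sending reputation.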

Thanks for the read!

This story is authored by Santosh Kumar. He is an AWS Cloud Engineer.

Face Recognition App In React Native using AWS Rekognition

Note: Some images in this blog post are not showing up, please read the same post on medium here.

In this blog we are going to build an app for registering faces and verifying faces using Amazon Rekognition in React Native.

Installing dependencies:

Let’s go to React Native Docs, select React Native CLI Quickstart and select our appropriate Development OS and the Target OS as Android, as we are going to build an android application.

Follow the docs for installing dependencies. After installing, create a new React Native application: use the command line interface to generate a new React Native project called FaceRegister.

react-native init FaceRegister

Preparing the Android device:

We shall need an Android device to run our React Native Android app. This can be either a physical Android device, or more commonly, we can use an Android Virtual Device (AVD) which allows us to emulate an Android device on our computer (using Android Studio).

Either way, we shall need to prepare the device to run Android apps for development.
If you have a physical Android device, you can use it for development in place of an AVD by connecting it to your computer using a USB cable and following the instructions here.

If you are using a virtual device, follow this link. I shall be using a physical Android device.
Now go to the command line and run react-native run-android inside your React Native app directory:

cd FaceRegister
react-native run-android

If everything is set up correctly, you should see your new app running in your physical device or Android emulator.

On your system, you should see a folder named FaceRegister. Now open the FaceRegister folder with your favorite code editor and create a file called Register.js. We need an input box for the username or ID that refers to the image, a placeholder to preview the captured image, and a submit button to register.

Open your Register.js file and copy the below code:

import React from 'react';
import { StyleSheet, View, Text, TextInput, Image, ScrollView, TouchableHighlight } from 'react-native';

class LoginScreen extends React.Component {
    constructor(props){
       super(props);
       this.state =  {
           username : '',
           capturedImage : ''
       };
   }

  
   render() {
       return (
           <View style={styles.MainContainer}>
               <ScrollView>
                   <Text style= {{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Register Face</Text>
              
                   <TextInput
                       placeholder="Enter Username"
                       onChangeText={UserName => this.setState({username: UserName})}
                       underlineColorAndroid='transparent'
                       style={styles.TextInputStyleClass}
                   />
                   {this.state.capturedImage !== "" && <View style={styles.imageholder} >
                       <Image source={{uri : this.state.capturedImage}} style={styles.previewImage} />
                   </View>}
                  

                   <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]}>
                       <Text style={styles.buttonText}>Capture Image</Text>
                   </TouchableHighlight>

                   <TouchableHighlight style={[styles.buttonContainer, styles.submitButton]}>
                       <Text style={styles.buttonText}>Submit</Text>
                   </TouchableHighlight>
               </ScrollView>
           </View>
       );
   }
}

const styles = StyleSheet.create({
   MainContainer: {
       marginTop: 60
   },
   TextInputStyleClass: {
     textAlign: 'center',
     marginBottom: 7,
     height: 40,
     borderWidth: 1,
     margin: 10,
     borderColor: '#D0D0D0',
     borderRadius: 5 ,
   },
   inputContainer: {
     borderBottomColor: '#F5FCFF',
     backgroundColor: '#FFFFFF',
     borderRadius:30,
     borderBottomWidth: 1,
     width:300,
     height:45,
     marginBottom:20,
     flexDirection: 'row',
     alignItems:'center'
   },
   buttonContainer: {
     height:45,
     flexDirection: 'row',
     alignItems: 'center',
     justifyContent: 'center',
     marginBottom:20,
     width:"80%",
     borderRadius:30,
     marginTop: 20,
     marginLeft: 5,
   },
   captureButton: {
     backgroundColor: "#337ab7",
     width: 350,
   },
   buttonText: {
     color: 'white',
     fontWeight: 'bold',
   },
   horizontal: {
     flexDirection: 'row',
     justifyContent: 'space-around',
     padding: 10
   },
   submitButton: {
     backgroundColor: "#C0C0C0",
     width: 350,
     marginTop: 5,
   },
   imageholder: {
     borderWidth: 1,
     borderColor: "grey",
     backgroundColor: "#eee",
     width: "50%",
     height: 150,
     marginTop: 10,
     marginLeft: 90,
     flexDirection: 'row',
     alignItems:'center'
   },
   previewImage: {
     width: "100%",
     height: "100%",
   }
});

export default LoginScreen;

Now import the Register.js component in App.js, which is located in the project root folder. Open App.js and replace its contents with the code below:

import React, {Component} from 'react';
import {View} from 'react-native';
import LoginScreen from './Register'; // the component was saved as Register.js

class App extends Component {
   render() {
       return (
       <View>
           <LoginScreen />
       </View>
       );
   }
}

export default App;

Now run your app again with the command below, from the project directory:

react-native run-android

You should see a text input for the username and two buttons, one (Capture Image) for capturing an image and another (Submit) for submitting the details, as shown below:

Let’s add the functionality to preview the captured image. The react-native-image-picker package lets us capture a picture with the device’s camera or pick an image from the gallery. From the command line, run the command below in the project directory to install the react-native-image-picker library:

yarn add react-native-image-picker
# or with npm
# npm install --save react-native-image-picker

react-native link react-native-image-picker

Add the required permissions in the AndroidManifest.xml file which is located at android/app/src/main/:

<uses-permission android:name="android.permission.CAMERA" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>

For more information about this package, follow this link.
Now add the code below to your Register.js file.

import React from 'react';
...
...
import ImagePicker from "react-native-image-picker"; //import this

class LoginScreen extends React.Component {
    constructor(props){
      ...
   }

//Add the below method...

   captureImageButtonHandler = () => {
       ImagePicker.showImagePicker({title: "Pick an Image", maxWidth: 800, maxHeight: 600}, (response) => {
           console.log('Response = ', response);
           // alert(response)
           if (response.didCancel) {
               console.log('User cancelled image picker');
           } else if (response.error) {
               console.log('ImagePicker Error: ', response.error);
           } else if (response.customButton) {
               console.log('User tapped custom button: ', response.customButton);
           } else {
               // You can also display the image using data:
               const source = { uri: 'data:image/jpeg;base64,' + response.data };
          
               this.setState({capturedImage: response.uri, base64String: source.uri });
           }
       });
   }
  
   render() {
       return (
           <View style={styles.MainContainer}>
               ...
               ...
               {/* Add the onPress property to the Capture Image button */}
               <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
                   <Text style={styles.buttonText}>Capture Image</Text>
               </TouchableHighlight>
              ...
              ...   
           </View>
       );
   }
}

const styles = StyleSheet.create({
...
...
...

});

export default LoginScreen;

Add the captureImageButtonHandler() method to the file and add the onPress property to the Capture Image button so that it calls this method. After updating the code, reload your app. You can now access your camera and gallery by clicking on the Capture Image button. Once you capture an image, a preview of it appears on your screen as below:
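The branching inside captureImageButtonHandler() can also be pulled out into a small pure function, which is easier to check without a device. The following is only a sketch: the helper name toImageState is hypothetical (it is not part of react-native-image-picker), and it assumes the response fields used in the handler above (didCancel, error, customButton, uri and data).

```javascript
// Hypothetical helper: turns an image picker response into the state update
// used by the Register screen, or null when there is nothing to store.
function toImageState(response) {
  if (!response || response.didCancel || response.error || response.customButton) {
    return null; // cancelled, failed, or a custom button was tapped
  }
  return {
    capturedImage: response.uri, // file URI used for the on-screen preview
    base64String: 'data:image/jpeg;base64,' + response.data, // payload sent to the API later
  };
}

// Example calls with hand-written responses (made-up values).
console.log(toImageState({ didCancel: true }));
console.log(toImageState({ uri: 'file:///tmp/face.jpg', data: 'QUJD' }));
```

Inside the handler, the result could then be applied with something like `const next = toImageState(response); if (next) this.setState(next);`.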

Now we need to register the captured image by storing it in the S3 bucket.

I have created an API Endpoint in API Gateway from AWS console which invokes a lambda function (register-face). All I have to do is to send a POST request to the API URL endpoint from the client-side.

Creating API endpoint:

See the "Working with API Gateway" section in the following post:
https://medium.com/zenofai/serverless-web-application-architecture-using-react-with-amplify-part1-5b4d89f384f7

In the image below, I have created two resources, one for adding faces and another for searching faces:

This is the lambda function called register-face that is invoked when we click on the Submit button.

const AWS = require('aws-sdk')
var rekognition = new AWS.Rekognition()
var s3Bucket = new AWS.S3( { params: {Bucket: "<bucket-name>"} } );
var fs = require('fs');

exports.handler = (event, context, callback) => {
    let parsedData = JSON.parse(event)
    let encodedImage = parsedData.Image;
    var filePath = "registered/" + parsedData.name;
    console.log(filePath)
    // Buffer.from() replaces the deprecated `new Buffer()` constructor
    let decodedImage = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""),'base64')
    var data = {
        Key: filePath, 
        Body: decodedImage,
        ContentEncoding: 'base64',
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) { 
            console.log('Error uploading data: ', err);
            callback(err, null);
        } else {
            console.log('successfully uploaded the image!');
            callback(null, data);
        }
    });
};

In the above code, I am storing the image in the registered folder (prefix) in the S3 bucket.
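The decoding step can be tried on its own in Node.js. This is a minimal sketch, assuming the client sends either a data URI or a bare base64 string; the function name decodeBase64Image is hypothetical.

```javascript
// Hypothetical helper: strips an optional "data:image/...;base64," prefix
// and decodes the remaining base64 text into a Buffer.
function decodeBase64Image(encodedImage) {
  const base64 = encodedImage.replace(/^data:image\/\w+;base64,/, '');
  return Buffer.from(base64, 'base64');
}

// "aGVsbG8=" is a stand-in payload, not real image data.
const decoded = decodeBase64Image('data:image/jpeg;base64,aGVsbG8=');
console.log(decoded.length, 'bytes decoded');
```

The resulting Buffer is what goes into the Body field of the putObject parameters.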

Just uploading faces to the S3 bucket is not enough. We also need to create a collection in an AWS region and add the registered faces from the S3 bucket to it, because the verification step later has to check whether a face is registered or not. For that, we shall be using Amazon Rekognition: the IndexFaces operation adds a face to a collection, and the SearchFacesByImage operation searches the collection for faces that match a given image. Go through Searching Faces in a Collection to know more.
Add the code below to the register-face lambda function.

var params ={
        CollectionId: "<collection-id>", 
        DetectionAttributes: [], 
        ExternalImageId: parsedData.name, 
        Image: {
            S3Object: {
                Bucket: "<bucket-name>", 
                Name: filePath
            }
        }
    }
    setTimeout(function () {
        rekognition.indexFaces(params, function(err, data) {
            if (err){
                console.log(err, err.stack); // an error occurred
                callback(err)
            }
            else{
                console.log(data); // successful response
                callback(null,data);
            }
        });
    }, 3000);

So, the final lambda function looks as below:

const AWS = require('aws-sdk')
var rekognition = new AWS.Rekognition()
var s3Bucket = new AWS.S3( { params: {Bucket: "<bucket-name>"} } );
var fs = require('fs');

exports.handler = (event, context, callback) => {
    console.log(event);
    console.log(typeof event);
    console.log(JSON.parse(event));
    let parsedData = JSON.parse(event)
    let encodedImage = parsedData.Image;
    var filePath = "registered/" + parsedData.name;
    console.log(filePath)
    // Buffer.from() replaces the deprecated `new Buffer()` constructor
    let buf = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""),'base64')
    var data = {
        Key: filePath, 
        Body: buf,
        ContentEncoding: 'base64',
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) { 
            console.log('Error uploading data: ', err);
            callback(err, null);
        } else {
            console.log('successfully uploaded the image!');
            // callback(null, data);
        }
    });
    var params ={
        CollectionId: "<collection-id>", 
        DetectionAttributes: [], 
        ExternalImageId: parsedData.name, 
        Image: {
            S3Object: {
                Bucket: "<bucket-name>", // must be the same bucket the image was uploaded to
                Name: filePath
            }
        }
    }
    setTimeout(function () {
        rekognition.indexFaces(params, function(err, data) {
            if (err){
                console.log(err, err.stack); // an error occurred
                callback(err)
            }
            else{
                console.log(data);           // successful response
                callback(null,data);
            }
        });
    }, 3000);
};

This lambda function first stores the image (face) in the S3 bucket and then adds the same face to the collection from the S3 bucket.

Let’s get back to the client side and install the aws-amplify library in our project root directory from the command line with the commands below:
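The fixed setTimeout delay is a guess at how long the upload takes. One alternative is to start indexFaces only once putObject has finished. The sketch below is not the function deployed above: registerFace and its clients argument are hypothetical names, and it assumes the AWS SDK for JavaScript v2, where request objects expose a .promise() method.

```javascript
// Hypothetical helper: uploads the image and then indexes it, strictly in that order.
// `clients` is assumed to look like { s3: new AWS.S3(), rekognition: new AWS.Rekognition() }.
async function registerFace(clients, { bucket, collectionId, name, image }) {
  const key = 'registered/' + name;

  // Wait for the upload to complete before Rekognition reads the object.
  await clients.s3.putObject({
    Bucket: bucket,
    Key: key,
    Body: image,
    ContentType: 'image/jpeg',
  }).promise();

  // Add the uploaded face to the collection and return Rekognition's response.
  return clients.rekognition.indexFaces({
    CollectionId: collectionId,
    DetectionAttributes: [],
    ExternalImageId: name,
    Image: { S3Object: { Bucket: bucket, Name: key } },
  }).promise();
}
```

Inside the handler this could be wired up as `registerFace(clients, args).then(data => callback(null, data)).catch(callback)`, so that an upload failure reports the error once and skips the indexing call.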

npm install --save aws-amplify
npm install --save aws-amplify-react-native
(or)
yarn add aws-amplify
yarn add aws-amplify-react-native

Now add the below code in Register.js file:

import React from 'react';
...
...
import Amplify, {API} from "aws-amplify";
Amplify.configure({
   API: {
       endpoints: [
           {
               name: "<API-name>",
               endpoint: "<your endpoint url>"
           }
       ]
   }
});

class Registration extends React.Component {
    constructor(props){
      ...
      ...
    }
   submitButtonHandler = () => {
       if (this.state.username == '' || this.state.username == undefined || this.state.username == null) {
           alert("Please Enter the Username");
       } else if(this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
           alert("Please Capture the Image");
       } else {
           const apiName = "<API-name>";
           const path = "<your path>";
           const init = {
               headers : {
                   'Accept': 'application/json',
                   "X-Amz-Target": "RekognitionService.IndexFaces",
                   "Content-Type": "application/x-amz-json-1.1"
               },
               body : JSON.stringify({
                   Image: this.state.base64String,
                   name: this.state.username
               })
           }
          
           API.post(apiName, path, init).then(response => {
               alert(JSON.stringify(response))
           });
       }
   }
   
   render() {
       if(this.state.image!=="") {
           // alert(this.state.image)
       }
       return (
           <View style={styles.MainContainer}>
               <ScrollView>
...
...

                   <TouchableHighlight style={[styles.buttonContainer, styles.submitButton]} onPress={this.submitButtonHandler}>
                       <Text style={styles.buttonText}>Submit</Text>
                   </TouchableHighlight>
...
...   
            </ScrollView>
           </View>
       );
   }
}

In the above code, we configured the API Gateway endpoint using Amplify and created a method called submitButtonHandler(), which sends a POST request that invokes the lambda function to register the face when the user clicks on the Submit button. To wire this up, we added the onPress property to the Submit button, which calls submitButtonHandler().
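The username and captured image checks in submitButtonHandler() can be expressed as one small function that returns the message to display. This is a sketch only; validateRegistration is a hypothetical name, and it relies on empty strings, undefined and null all being falsy in JavaScript.

```javascript
// Hypothetical helper: returns the alert text for an incomplete form,
// or null when both fields are filled in.
function validateRegistration(state) {
  if (!state.username) {
    return 'Please Enter the Username';
  }
  if (!state.capturedImage) {
    return 'Please Capture the Image';
  }
  return null;
}

// Example calls with made-up state objects.
console.log(validateRegistration({ username: '', capturedImage: '' }));
console.log(validateRegistration({ username: 'venu', capturedImage: 'file:///tmp/face.jpg' }));
```

The handler would then alert the returned message when it is not null and send the POST request otherwise.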

Here is the complete code for Register.js file:

import React, {Component} from 'react';
import { StyleSheet, View, Text, TextInput, Image, ScrollView, TouchableHighlight } from 'react-native';
import ImagePicker from "react-native-image-picker";
import Amplify, {API} from "aws-amplify";
Amplify.configure({
    API: {
        endpoints: [
            {
                name: "<api-name>",
                endpoint: "<your endpoint url>"
            }
        ]
    }
});

class Registration extends Component {
  
    constructor(props){
        super(props);
        this.state =  {
            username : '',
            capturedImage : ''
        };
        // this.submitButtonHandler = this.submitButtonHandler.bind(this);
    }

    captureImageButtonHandler = () => {
        ImagePicker.showImagePicker({title: "Pick an Image", maxWidth: 800, maxHeight: 600}, (response) => {
            console.log('Response = ', response);
            // alert(response)
            if (response.didCancel) {
                console.log('User cancelled image picker');
            } else if (response.error) {
                console.log('ImagePicker Error: ', response.error);
            } else if (response.customButton) {
                console.log('User tapped custom button: ', response.customButton);
            } else {
                // You can also display the image using data:
                const source = { uri: 'data:image/jpeg;base64,' + response.data };
            
                this.setState({capturedImage: response.uri, base64String: source.uri });
            }
        });
    }

    submitButtonHandler = () => {
        if (this.state.username == '' || this.state.username == undefined || this.state.username == null) {
            alert("Please Enter the Username");
        } else if(this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
            alert("Please Capture the Image");
        } else {
            const apiName = "<api-name>";
            const path = "<your path>";
            const init = {
                headers : {
                    'Accept': 'application/json',
                    "X-Amz-Target": "RekognitionService.IndexFaces",
                    "Content-Type": "application/x-amz-json-1.1"
                },
                body : JSON.stringify({ 
                    Image: this.state.base64String,
                    name: this.state.username
                })
            }
            
            API.post(apiName, path, init).then(response => {
                alert(JSON.stringify(response));
            });
        }
    }

    render() {
        if(this.state.image!=="") {
            // alert(this.state.image)
        }
        return (
            <View style={styles.MainContainer}>
                <ScrollView>
                    <Text style= {{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Register Face</Text>
                
                    <TextInput
                        placeholder="Enter Username"
                        onChangeText={UserName => this.setState({username: UserName})}
                        underlineColorAndroid='transparent'
                        style={styles.TextInputStyleClass}
                    />


                    {this.state.capturedImage !== "" && <View style={styles.imageholder} >
                        <Image source={{uri : this.state.capturedImage}} style={styles.previewImage} />
                    </View>}

                    <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
                        <Text style={styles.buttonText}>Capture Image</Text>
                    </TouchableHighlight>

                    <TouchableHighlight style={[styles.buttonContainer, styles.submitButton]} onPress={this.submitButtonHandler}>
                        <Text style={styles.buttonText}>Submit</Text>
                    </TouchableHighlight>
                </ScrollView>
            </View>
        );
    }
}

const styles = StyleSheet.create({
    TextInputStyleClass: {
      textAlign: 'center',
      marginBottom: 7,
      height: 40,
      borderWidth: 1,
      margin: 10,
      borderColor: '#D0D0D0',
      borderRadius: 5 ,
    },
    inputContainer: {
      borderBottomColor: '#F5FCFF',
      backgroundColor: '#FFFFFF',
      borderRadius:30,
      borderBottomWidth: 1,
      width:300,
      height:45,
      marginBottom:20,
      flexDirection: 'row',
      alignItems:'center'
    },
    buttonContainer: {
      height:45,
      flexDirection: 'row',
      alignItems: 'center',
      justifyContent: 'center',
      marginBottom:20,
      width:"80%",
      borderRadius:30,
      marginTop: 20,
      marginLeft: 5,
    },
    captureButton: {
      backgroundColor: "#337ab7",
      width: 350,
    },
    buttonText: {
      color: 'white',
      fontWeight: 'bold',
    },
    horizontal: {
      flexDirection: 'row',
      justifyContent: 'space-around',
      padding: 10
    },
    submitButton: {
      backgroundColor: "#C0C0C0",
      width: 350,
      marginTop: 5,
    },
    imageholder: {
      borderWidth: 1,
      borderColor: "grey",
      backgroundColor: "#eee",
      width: "50%",
      height: 150,
      marginTop: 10,
      marginLeft: 90,
      flexDirection: 'row',
      alignItems:'center'
    },
    previewImage: {
      width: "100%",
      height: "100%",
    }
});

export default Registration;

Now reload your application and register an image (face).

After registering successfully, you will receive an alert message as below:

Now go to your S3 bucket and check that the image is stored as below:

Also check your collection by running the command below from your command line:

aws rekognition list-faces --collection-id "<your collection id>"

The output is JSON data listing the faces that are registered, which confirms that the registration process works. Now we need to add the verification (face search) process to our application. I created another lambda function (searchFace) for face verification. Here is the code for the face verification lambda function.

const AWS = require('aws-sdk')
var rekognition = new AWS.Rekognition()
var s3Bucket = new AWS.S3( { params: {Bucket: "<bucket-name>"} } );
var fs = require('fs');

exports.handler = (event, context, callback) => {
    let parsedData = JSON.parse(event)
    let encodedImage = parsedData.Image;
    var filePath = parsedData.name + ".jpg";
    console.log(filePath)
    // Buffer.from() replaces the deprecated `new Buffer()` constructor
    let decodedImage = Buffer.from(encodedImage.replace(/^data:image\/\w+;base64,/, ""),'base64')
    var data = {
        Key: filePath, 
        Body: decodedImage,
        ContentEncoding: 'base64',
        ContentType: 'image/jpeg'
    };
    s3Bucket.putObject(data, function(err, data){
        if (err) { 
            console.log('Error uploading data: ', err);
            callback(err);
        } else {
            console.log('successfully uploaded the image!');
            // callback(null, data);
        }
    });
    var params2 ={
        CollectionId: "<collection-id>", 
        FaceMatchThreshold: 85, 
        Image: {
            S3Object: {
                Bucket: "<bucket-name>", 
                Name: filePath
            }
        }, 
        MaxFaces: 5
    }
    setTimeout(function () {
        rekognition.searchFacesByImage(params2, function(err, data) {
            if (err){
                console.log(err, err.stack); // an error occurred
                callback(err)
            }
            else{
                console.log(data);           // successful response
                callback(null,data);
            }
        });
    }, 2000);
};

In the above lambda function, we are using SearchFacesByImage, which detects the largest face in the supplied image and searches the collection for faces that match it. The response is a JSON object. Now create a new file called Verification.js in your project root directory and copy the code below into it:

import React, {Component} from 'react';
import { StyleSheet, View, Text, TextInput, Image, ScrollView, TouchableHighlight } from 'react-native';
import ImagePicker from "react-native-image-picker";
import Amplify, {API} from "aws-amplify";
Amplify.configure({
   API: {
       endpoints: [
           {
               name: "<API-name>",
               endpoint: "<your endpoint url>"
           }
       ]
   }
});

class Verification extends Component {
    constructor(props){
       super(props);
       this.state =  {
           username: '',
           capturedImage : ''
       };
   }

   captureImageButtonHandler = () => {
       ImagePicker.showImagePicker({title: "Pick an Image", maxWidth: 800, maxHeight: 600}, (response) => {
           console.log('Response = ', response);
           // alert(response)
           if (response.didCancel) {
               console.log('User cancelled image picker');
           } else if (response.error) {
               console.log('ImagePicker Error: ', response.error);
           } else if (response.customButton) {
               console.log('User tapped custom button: ', response.customButton);
           } else {
               // You can also display the image using data:
               const source = { uri: 'data:image/jpeg;base64,' + response.data };
          
               this.setState({capturedImage: response.uri, base64String: source.uri });
           }
       });
   }

   verification = () => {
       if(this.state.capturedImage == '' || this.state.capturedImage == undefined || this.state.capturedImage == null) {
           alert("Please Capture the Image");
       } else {
           const apiName = "<api-name>";
           const path = "<your path>";
          
           const init = {
               headers : {
                   'Accept': 'application/json',
                   "X-Amz-Target": "RekognitionService.SearchFacesByImage",
                   "Content-Type": "application/x-amz-json-1.1"
               },
               body : JSON.stringify({
                   Image: this.state.base64String,
                   name: this.state.username
               })
           }
          
           API.post(apiName, path, init).then(response => {
               if(response.FaceMatches.length > 0) {
                   alert(response.FaceMatches[0].Face.ExternalImageId)
               } else {
                   alert("No matches found.")
               }
           });
       }
   }

  
  
    render() {
       if(this.state.image!=="") {
           // alert(this.state.image)
       }
       return (
           <View style={styles.MainContainer}>
               <ScrollView>
                   <Text style= {{ fontSize: 20, color: "#000", textAlign: 'center', marginBottom: 15, marginTop: 10 }}>Verify Face</Text>
              
                   {this.state.capturedImage !== "" && <View style={styles.imageholder} >
                       <Image source={{uri : this.state.capturedImage}} style={styles.previewImage} />
                   </View>}

                   <TouchableHighlight style={[styles.buttonContainer, styles.captureButton]} onPress={this.captureImageButtonHandler}>
                       <Text style={styles.buttonText}>Capture Image</Text>
                   </TouchableHighlight>

                   <TouchableHighlight style={[styles.buttonContainer, styles.verifyButton]} onPress={this.verification}>
                       <Text style={styles.buttonText}>Verify</Text>
                   </TouchableHighlight>
               </ScrollView>
           </View>
       );
   }
}

const styles = StyleSheet.create({
   container: {
     flex: 1,
     backgroundColor: 'white',
     alignItems: 'center',
     justifyContent: 'center',
   },
   buttonContainer: {
     height:45,
     flexDirection: 'row',
     alignItems: 'center',
     justifyContent: 'center',
     marginBottom:20,
     width:"80%",
     borderRadius:30,
     marginTop: 20,
     marginLeft: 5,
   },
   captureButton: {
     backgroundColor: "#337ab7",
     width: 350,
   },
   buttonText: {
     color: 'white',
     fontWeight: 'bold',
   },
   verifyButton: {
     backgroundColor: "#C0C0C0",
     width: 350,
     marginTop: 5,
   },
   imageholder: {
     borderWidth: 1,
     borderColor: "grey",
     backgroundColor: "#eee",
     width: "50%",
     height: 150,
     marginTop: 10,
     marginLeft: 90,
     flexDirection: 'row',
     alignItems:'center'
   },
   previewImage: {
     width: "100%",
     height: "100%",
   }
});

export default Verification;

In the above code there are two buttons, one (Capture Image) for capturing the face that needs to be verified and another (Verify) for checking whether the captured face is registered or not. When the user clicks on the Verify button, the verification() method is called, which makes a POST request that invokes the searchFace lambda function via API Gateway.

We now have two screens, one for Registration and another for Verification. Let’s add navigation between the two screens using react-navigation. The first step is to install react-navigation in your project:
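The match handling inside verification() can be isolated into a helper that picks the name to display. This is a sketch under assumptions: bestMatchName is a hypothetical name, and the response is assumed to have the shape used above, a FaceMatches array whose entries carry a Similarity score and a Face object with an ExternalImageId.

```javascript
// Hypothetical helper: returns the ExternalImageId of the most similar match,
// or null when the response contains no matches.
function bestMatchName(response) {
  const matches = (response && response.FaceMatches) || [];
  if (matches.length === 0) {
    return null;
  }
  // Keep whichever entry has the highest Similarity score.
  const best = matches.reduce((a, b) => (b.Similarity > a.Similarity ? b : a));
  return best.Face.ExternalImageId;
}

// Hand-written sample response (made-up values, not real Rekognition output).
const sample = {
  FaceMatches: [
    { Similarity: 91.2, Face: { ExternalImageId: 'alice' } },
    { Similarity: 97.5, Face: { ExternalImageId: 'bob' } },
  ],
};
console.log(bestMatchName(sample));
console.log(bestMatchName({ FaceMatches: [] }));
```

A null result corresponds to the "No matches found." alert in verification().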

npm install --save react-navigation

The second step is to install react-native-gesture-handler:

yarn add react-native-gesture-handler
# or with npm
# npm install --save react-native-gesture-handler

Now we need to link react-native-gesture-handler to our React Native project:

react-native link react-native-gesture-handler

After that go back to your App.js file and replace it with the below code:

import React, {Component} from 'react';
import {View, Text, TouchableHighlight, StyleSheet} from 'react-native';
import Registration from './Register'; // the Registration component lives in Register.js
import {createStackNavigator, createAppContainer} from 'react-navigation';
import Verification from './Verification';

class HomeScreen extends React.Component {
   render() {
       return (
           <View style={{ flex: 1, alignItems: "center" }}>
               <Text style= {{ fontSize: 30, color: "#000", marginBottom: 50, marginTop: 100 }}>Register Face ID</Text>
               <TouchableHighlight style={[styles.buttonContainer, styles.button]} onPress={() => this.props.navigation.navigate('Registration')}>
                   <Text style={styles.buttonText}>Registration</Text>
               </TouchableHighlight>
               <TouchableHighlight style={[styles.buttonContainer, styles.button]} onPress={() => this.props.navigation.navigate('Verification')}>
                   <Text style={styles.buttonText}>Verification</Text>
               </TouchableHighlight>
           </View>
       );
   }
}

const MainNavigator = createStackNavigator(
   {
       Home: {screen: HomeScreen},
       Registration: {screen: Registration},
       Verification: {screen: Verification}
   },
   {
       initialRouteName: 'Home',
   }
);

const AppContainer = createAppContainer(MainNavigator);

export default class App extends Component {
   render() {
       return <AppContainer />;
   }
}

const styles = StyleSheet.create({
   buttonContainer: {
       height:45,
       flexDirection: 'row',
       alignItems: 'center',
       justifyContent: 'center',
       marginBottom:20,
       width:"80%",
       borderRadius:30,
       marginTop: 20,
       marginLeft: 5,
   },
   button: {
       backgroundColor: "#337ab7",
       width: 350,
       marginTop: 5,
   },
   buttonText: {
       color: 'white',
       fontWeight: 'bold',
   },
})

Now reload your app and you should see your home screen as below:

When the user clicks on the Registration button, the app navigates to the Registration screen as below:

When the user clicks on the Verification button, the app navigates to the Verification screen as below:

Now let’s check the verification process.

Step 1: Navigate to the Verification screen.

Step 2: Capture an image of the registered face.

Step 3: Click on the Verify button.

If everything is fine, you will receive an alert message with the face name as below:

If no registered face matches the captured face, the user receives the alert message “No matches found”.

Thanks for the read, I hope it was useful.

This story is authored by Venu Vaka. Venu is a software engineer and machine learning enthusiast.

Creating an Automated Data Engineering Pipeline for Batch Data in Machine Learning

A common requirement in the Machine Learning life cycle is access to the latest training data, so as to prevent model deterioration. Data scientists often find it cumbersome to manually export data from data sources such as relational databases, NoSQL data stores or even distributed data. This necessitates automating the data engineering pipeline in Machine Learning. In this post, we will describe how to set up this pipeline for batch data. The workflow is orchestrated via Airflow and can be set up to run at regular intervals (hourly, daily, weekly, etc.) depending on the specific business requirements.

Quick note – In case you are interested in building a real time data engineering pipeline for ML, please look at this post.

In this use case, we are going to export MongoDB data into Google BigQuery via Cloud Storage. The updated data in BigQuery is then made available in Jupyter Notebook as a Pandas Dataframe for downstream model building and analytics. As the pipeline automates the data ingestion and preprocessing, the data scientists always have access to the latest batch data in their Jupyter Notebooks hosted on Google AI Platform. 

We have a MongoDB service running on one instance, and Airflow and mongoexport running on Docker on another instance. Mongoexport is a utility that produces a JSON or CSV export of data stored in MongoDB. The data in MongoDB is extracted and transformed using mongoexport and loaded into Cloud Storage. Airflow is used to schedule and orchestrate these exports. Once the data is available in Cloud Storage, it can be queried in BigQuery. We then pull this data from BigQuery into a Jupyter Notebook. The following is a step-by-step sequence to set up this data pipeline.

You can create an instance in GCP by going to Compute Engine. Click on create instance.

Install.sh:

sudo apt-get update
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
sudo usermod -aG docker $USER
sudo apt-get install -y python-pip
export AIRFLOW_HOME=~/airflow
sudo pip install apache-airflow
sudo pip install apache-airflow[postgres,s3]
airflow initdb
airflow webserver -p 8080 -D
airflow scheduler -D
sudo docker pull mongo
sudo docker run --name mongo_client -d mongo

Please run the install.sh file using the ./install.sh command (please make sure the file is executable). It installs Docker and Airflow, pulls the Mongo image and runs it in a container named mongo_client.

After installation, open the Airflow web UI at http://<public-ip-instance>:8080 (you may need to open port 8080 in the network, just for your public IP).


Please make sure the Google service account attached to the running instance has permissions to access BigQuery and Cloud Storage. After installation, add the Airflow job Python file (mongo-export.py) inside the airflow/dags folder.

Before running the Python file, please make sure that you create the dataset and the table in BigQuery. Also set the appropriate values for the MongoDB source database, MongoDB source table, Cloud Storage destination bucket and BigQuery destination dataset in the Airflow job Python file (mongo-export.py). The BigQuery destination table name is the same as the source table name in MongoDB.

Mongo-export.py:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json

# Following are default arguments which could be overridden
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['airflow@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

bucket_name = '<Your_Bucket>'
db_name = '<Database_Name>'
dataset = '<Dataset_Name>'
table_name = '<Table_Name>'


time_stamp = datetime.now()
cur_date = time_stamp.strftime("%Y-%m-%d")

# It will flatten the nested json
def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

# It converts every value of an already flattened record to a string
def convert_string(y):
    string_type = {}

    def convert(x, name=''):
        if type(x) is dict:
            for a in x:
                convert(str(x[a]), name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                convert(a, name + str(i) + '_')
                i += 1
        else:
            string_type[name[:-1]] = x

    convert(y)
    return string_type


def json_flat():
    lines = [line.rstrip('\n') for line in open('/home/dev/'+ table_name + '-unformat.json')]
    flat_list = []
    for line in lines:
        line = line.replace("\"$", "\"")
        line = json.loads(line)
        try:
            flat_list.append(json.dumps(convert_string(flatten_json(line))))
        except Exception as e:
            print(e)
    flatted_json = '\n'.join(i for i in flat_list)

    with open('/home/dev/' + table_name + '.json', 'a') as file:
        file.write(flatted_json)
    return flatted_json 

dag = DAG('mongoexport-daily-gcs-bq', default_args=default_args, params = {'cur_date': cur_date, 'db_name': db_name, 'table_name': table_name, 'dataset': dataset, 'bucket_name': bucket_name})
# exports the collection to a JSON file inside the docker container
t1 = BashOperator(
    task_id='mongoexport_to_container',
    bash_command='sudo docker exec -i mongo_client sh -c "mongoexport --host=<instance_public_ip> --db {{params.db_name}} --collection {{params.table_name}} --out {{params.table_name}}-unformat.json"',
    dag=dag)

# copies exported file into instance

t2 = BashOperator(
    task_id='cp_from_container_instance',
    bash_command='sudo docker cp mongo_client:/{{params.table_name}}-unformat.json /home/dev/',
    dag=dag)

t3 = PythonOperator(
    task_id='flattening_json',
    python_callable=json_flat,
    dag=dag)
# copies the flattened data to a dated raw folder in cloud storage
t4 = BashOperator(
    task_id='cp_from_instance_gcs',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/raw/{{params.table_name}}/date={{params.cur_date}}/',
    dag=dag)
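# copies the flattened data to the curated folder in cloud storage, which the bigquery load reads from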
t5 = BashOperator(
    task_id='cp_from_instance_gcs_daily_data',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/curated/{{params.table_name}}/',
    dag=dag)

# removes the existing bigquery table
t6 = BashOperator(
    task_id='remove_bq_table',
    bash_command='bq rm -f {{params.dataset}}.{{params.table_name}}',
    dag=dag)
# creates a table in bigquery
t7 = BashOperator(
    task_id='create_bq_table',
    bash_command='bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON {{params.dataset}}.{{params.table_name}} gs://{{params.bucket_name}}/curated/{{params.table_name}}/{{params.table_name}}.json',
    dag=dag)
# removes data from container
t8 = BashOperator(
    task_id='remove_file_from_container',
    bash_command='sudo docker exec -i mongo_client sh -c "rm -rf {{params.table_name}}*.json"',
    dag=dag)
# removes data from instance
t9 = BashOperator(
    task_id='remove_file_from_instance',
    bash_command='rm -rf /home/dev/{{params.table_name}}*.json',
    dag=dag)

t1 >> t2
t2 >> t3
t3 >> [t4, t5]
[t4, t5] >> t6
t6 >> t7
t7 >> [t8, t9]

Then run the python file using python <file-path>.py  

(example: python airflow/dags/mongo-export.py).

After running the Python file, the DAG name appears in the Airflow web UI, and you can trigger the DAG manually. Please make sure the toggle button is in the ON state.
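To get a feel for what the flattening task (t3) does to each exported record, here is a minimal standalone sketch of the same idea as flatten_json in mongo-export.py. The sample document is hypothetical and only shaped like a line of mongoexport output; it is not taken from a real collection.

```python
import json


def flatten_json(doc):
    """Flatten nested dicts and lists into one level, joining keys with '_'."""
    out = {}

    def flatten(value, prefix=""):
        if isinstance(value, dict):
            for key, child in value.items():
                flatten(child, prefix + key + "_")
        elif isinstance(value, list):
            for index, child in enumerate(value):
                flatten(child, prefix + str(index) + "_")
        else:
            # Drop the trailing '_' that was added for the last key.
            out[prefix[:-1]] = value

    flatten(doc)
    return out


# Hypothetical record, shaped like one line of mongoexport output.
sample_line = (
    '{"_id": {"$oid": "abc123"}, "name": "Ann", '
    '"tags": ["a", "b"], "address": {"city": "Pune"}}'
)

# Same clean-up as in json_flat: strip the leading '$' from keys such as "$oid".
cleaned = sample_line.replace('"$', '"')
flat = flatten_json(json.loads(cleaned))
print(flat)
```

Nested keys become column-like names such as _id_oid and address_city, and list items get their position in the name (tags_0, tags_1), which is what lets BigQuery auto-detect a flat schema.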

Once the job completes, the data is stored in the bucket and is also available in the destination table in BigQuery. Click on Query table to perform SQL operations; you can see your results in the preview tab at the bottom.
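The DAG writes each export to two places in the bucket: a dated copy under raw/ and the latest copy under curated/, which is the one loaded into BigQuery. The sketch below only rebuilds those two object paths from the templates used in tasks t4 and t5; the function name gcs_destinations and the bucket and table names are our own placeholders.

```python
from datetime import date


def gcs_destinations(bucket_name, table_name, run_date):
    """Return the (raw, curated) object URIs used by tasks t4 and t5."""
    file_name = f"{table_name}.json"
    # Dated copy: one folder per run date, so earlier exports are kept.
    raw = (
        f"gs://{bucket_name}/raw/{table_name}/"
        f"date={run_date:%Y-%m-%d}/{file_name}"
    )
    # Latest copy: overwritten on every run and loaded into BigQuery by t7.
    curated = f"gs://{bucket_name}/curated/{table_name}/{file_name}"
    return raw, curated


# Placeholder names, for illustration only.
raw_uri, curated_uri = gcs_destinations("my-bucket", "accounts", date(2020, 1, 15))
print(raw_uri)
print(curated_uri)
```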

Now, you could access the data in Jupyter Notebook from BigQuery. Search for notebook in GCP console. 

Run the below commands in Jupyter Notebook.

from google.cloud import bigquery
client = bigquery.Client()
sql = """
SELECT * FROM 
`<project-name>.<dataset-name>.<table-name>`
"""
df = client.query(sql).to_dataframe()
df.head(10)

This loads the BigQuery data into a Pandas dataframe, which can then be used for model creation as required. Later, when the data pipeline runs as per schedule, re-running this SQL query in the Jupyter notebook returns the refreshed data.
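If you run this query from several notebooks, it may help to wrap it in a small helper. The following is a minimal sketch, not part of the original pipeline: build_preview_sql and load_dataframe are hypothetical names, and the optional limit is only there to keep the dataframe small while exploring.

```python
def build_preview_sql(project, dataset, table, limit=None):
    """Build the SELECT statement for a fully qualified BigQuery table."""
    sql = f"SELECT * FROM `{project}.{dataset}.{table}`"
    if limit is not None:
        # int() guards against anything other than a number ending up in the SQL.
        sql += f" LIMIT {int(limit)}"
    return sql


def load_dataframe(project, dataset, table, limit=None):
    """Run the query and return a Pandas dataframe (needs google-cloud-bigquery)."""
    # Imported here so that build_preview_sql works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client()
    return client.query(build_preview_sql(project, dataset, table, limit)).to_dataframe()


# Placeholder identifiers, for illustration only.
print(build_preview_sql("my-project", "my_dataset", "accounts", limit=10))
```

In a notebook you would then call load_dataframe("<project-name>", "<dataset-name>", "<table-name>") with your own values.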

Hope this helps you to automate your batch Data Engineering pipeline for Machine Learning. 

This story is co-authored by Santosh and Subbareddy. Santosh is an AWS Cloud Engineer and Subbareddy is a Big Data Engineer.

Real Time Data Engineering Pipeline for Machine Learning

Our focus in this post is to leverage Google Cloud Platform’s Big Data Services to build an end to end Data Engineering pipeline for streaming processes.

So what is Data Engineering?
Data Engineering deals with the delivery, storage and processing of data. The main goal is to provide a reliable infrastructure for data, which includes operations such as collecting, moving, storing and preparing data.

Most companies store their data in different formats across databases and as text files. This is where data engineers come into the picture: they build pipelines that transform this data into formats that data scientists can use.

Need for Data Engineering in Machine Learning:
Data engineers are responsible for:

  • Developing machine learning models.
  • Improving existing machine learning models.
  • Researching and implementing best practices to enhance existing machine learning infrastructure.
  • Developing, constructing, testing and maintaining architectures, such as databases and large-scale processing systems.
  • Analyzing large and complex data sets to derive valuable insights.

This is the reference architecture used to build the end to end data pipeline:

Google Cloud Platform Data Engineering Pipeline for Streaming Processes

The Google Cloud Services used in above streaming process are:

  1. Cloud Firestore: Lets us store data in the cloud so that we can sync it across devices and share it among multiple users. It is a NoSQL document database that lets us store, query and sync data.
  2. Cloud Function: A lightweight compute solution for developers to create single-purpose, stand-alone functions that respond to cloud events without the need to manage a server or runtime environment.
  3. Cloud Pub/Sub: A fully-managed real-time messaging service that allows you to send and receive messages across independent applications.
  4. Cloud Dataflow: A cloud-based data processing service for both batch and real-time data streaming applications. It enables developers to set up data processing pipelines for integrating, preparing and analyzing large data sets.
  5. Cloud Storage: A data storage service in which data is maintained, managed, backed up remotely and made available to users over a network.
  6. BigQuery: It was designed for analyzing data on the order of billions of rows, using a SQL-like syntax. It runs on the Google Cloud Storage infrastructure and could be accessed with a REST-oriented application programming interface (API).
  7. Jupyter notebook: An open source web application that you could use to create and share documents that contain live code, equations, visualizations, and text.

Create data engineering pipeline via Firestore Streaming

Step1: Add a new record in a collection (think of it as a table), say pubsub-event in firestore.

Step2: It triggers the cloud function named pubsub_event

Document Path: pubsub-event/{eventId}  listens for changes to all pubsub-event documents.

Below is the Cloud Function, written in Node.js, which triggers whenever a new document is created in our source Firestore collection and publishes the data to Pub/Sub.

const PubSub = require('@google-cloud/pubsub');
const pubsubClient = new PubSub();
const functions = require('firebase-functions');

exports.helloFirestore = functions.firestore
  .document("pubsub-event/{eventId}")
  .onCreate((snap, context) => {
    const event = snap.data();
    const payload_data = {};
    for (let key of Object.keys(event)) {
      payload_data[key] = event[key];
    }
    console.log(JSON.stringify(payload_data))
    // The name of the existing topic to publish to
    const topicName = 'pubsub-gcs';
    const dataBuffer = Buffer.from(JSON.stringify(payload_data));
    // Publishes the message to the topic
    return pubsubClient
      .topic(topicName)
      .publisher()
      .publish(dataBuffer)
      .then(messageId => {
        console.log(`Message ${messageId} published.`);
        return messageId;
      })
      .catch(err => {
        console.error('ERROR:', err);
      });

  });

Below are the dependencies of the Cloud Function.

{
  "name": "functions",
  "description": "Cloud Functions for Firebase",
  "scripts": {
    "serve": "firebase serve --only functions",
    "shell": "firebase functions:shell",
    "start": "npm run shell",
    "deploy": "firebase deploy --only functions",
    "logs": "firebase functions:log"
  },
  "engines": {
    "node": "8"
  },
  "dependencies": {
    "@google-cloud/pubsub": "^0.18.0",
    "firebase-admin": "~7.0.0",
    "firebase-functions": "^2.3.1"
  },
  "devDependencies": {
    "firebase-functions-test": "^0.1.6"
  },
  "private": true
}

Step3: Cloud Function pubsub_event publishes data to Pub/Sub topic projects/ProjectName/topics/pubsub-gcs
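Pub/Sub carries message data as bytes, which is why the Cloud Function serializes the Firestore document to JSON and wraps it in a Buffer before publishing. The sketch below shows the same round trip in Python, without calling Pub/Sub at all; encode_event and decode_event are hypothetical helper names and the sample record is made up.

```python
import json


def encode_event(event):
    """Serialize a document to UTF-8 JSON bytes, as the function does before publishing."""
    return json.dumps(event).encode("utf-8")


def decode_event(data):
    """Turn the message bytes back into a dict on the consuming side."""
    return json.loads(data.decode("utf-8"))


# Made-up Firestore document, for illustration only.
event = {"account": "A-100", "amount": 25.5, "active": True}
payload = encode_event(event)
print(type(payload).__name__, decode_event(payload))
```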

Step4: As shown above, create an export job : ps-to-text-pubsub-gcs (implemented via Dataflow). This job reads data every 5 minutes (configurable to other values as well) from Pub/Sub topic pubsub-gcs and dumps this into the destination bucket on GCS.

Click on run the job.
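To picture what "every 5 minutes" means for the files landing in GCS, here is a small sketch that maps a message timestamp to a 5-minute window. It assumes fixed windows aligned to the Unix epoch; that alignment is our assumption about the export job rather than something configured above, and window_bounds is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def window_bounds(ts, minutes=5):
    """Return the (start, end) of the fixed window that contains ts.

    ts must be timezone-aware; windows are assumed to be aligned to the Unix epoch.
    """
    size = timedelta(minutes=minutes)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Subtract the time elapsed since the last window boundary.
    start = ts - (ts - epoch) % size
    return start, start + size


published_at = datetime(2020, 1, 15, 10, 7, 30, tzinfo=timezone.utc)
start, end = window_bounds(published_at)
print(start.isoformat(), end.isoformat())
```

Messages published at 10:07:30 and 10:09:59 fall into the same window and would be written out together, while a message at 10:10:00 starts the next one.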

Step6: Now we have data in Cloud Storage. We shall use BigQuery to perform all the data manipulation operations. But first we need to create a dataset in BigQuery so that the data in GCS can be queried from BigQuery.

Go to BigQuery and create a dataset, so that we can create a table to access that data.

Once the dataset is created, click on it and you shall see an option to CREATE TABLE.

Click on CREATE TABLE to bring in the data from Cloud Storage. While setting up the required inputs as indicated below, please make sure that you select “Table type” as External Table. This ensures that BigQuery is able to automatically pick up new data as it comes into GCS.

To create the table in BigQuery from Cloud Storage, click on the browse button and configure the file path.

Select the files that have pubsub-event-* as prefix. This prefix is very important, as it makes sure that all subsequent data dumps into the GCS destination folder are also picked up automatically by BigQuery. Select JSON as the file format, check the auto-detect schema box, and then click create table.
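The wildcard in the file path is what keeps the external table in step with new exports. As a rough illustration, the sketch below uses Python's fnmatch to show which object names a pubsub-event-* pattern would pick up. The object names are invented, and fnmatch only approximates how BigQuery resolves the wildcard.

```python
from fnmatch import fnmatchcase


def matching_objects(object_names, pattern="pubsub-event-*"):
    """Return the object names that the wildcard pattern would select."""
    return [name for name in object_names if fnmatchcase(name, pattern)]


# Invented object names, for illustration only.
objects = [
    "pubsub-event-2020-01-15T10:05",
    "pubsub-event-2020-01-15T10:10",
    "other-export.json",
]
selected = matching_objects(objects)
print(selected)
```

Any later dump whose name starts with pubsub-event- matches the same pattern, so it shows up in query results without recreating the table.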


Quick Tip: For reading nested json files in BigQuery, please go through this resource. Now the data which is present in CloudStorage is also available in BigQuery and you could run sql commands to manipulate the data.

Click on the table you have created (accounts is my table name), then click on Query table to run SQL operations. You can see your results in the preview tab at the bottom.

Step7: Now, we are on to the last step to access this BigQuery data in Jupyter Notebooks and use that as the source data to train and build our ML models.

Search for notebook in GCP console. 

You shall see something like this 

Click on OPEN JUPYTERLAB then it will redirect you to notebook.

from google.cloud import bigquery

client = bigquery.Client()

sql = """
SELECT * FROM 
`<project-name>.<dataset-name>.<table-name>`
"""

df = client.query(sql).to_dataframe()
df.head(10)

So in this way, we have built a data pipeline that continuously dumps data from Firestore into GCS every 5 minutes, which is then readily available in Jupyter Notebook via BigQuery for any downstream analytics and ML model building.

Look forward to your comments.

This story is co-authored by Santosh Kumar and PV Subbareddy. Santosh is a Software Engineer specializing in Cloud Services and DevOps. Subbareddy is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark Ecosystem.

Create a Language Translation Mobile App using React Native and Google APIs

In this blog, we are going to learn how to create a simple React Native based Language Translation Android app with Speech to Text and Text to Speech capabilities powered by Google APIs.

Installing dependencies:

Go to React Native Docs, select React Native CLI Quickstart and select your Development OS and Target OS -> Android, as we are going to build an Android application.

Follow the docs for installing dependencies and create a new React Native Application. Use the command line interface to generate a new React Native project called “Translator“:

react-native init Translator

You should see a folder named Translator created. Now open Translator folder with your favourite code editor and create a file called Translator.js. We need an input box for text that needs to be translated and another output section to display the translated text. We also need a select box that lists different languages to choose from for translation. Let’s create a json file, call it languages.json.

Go to languages.json file and copy the code below:

{
   "auto": "Auto Detect",
   "af": "Afrikaans",
   "sq": "Albanian",
   "am": "Amharic",
   "ar": "Arabic",
   "hy": "Armenian",
   "az": "Azerbaijani",
   "eu": "Basque",
   "be": "Belarusian",
   "bn": "Bengali",
   "bs": "Bosnian",
   "bg": "Bulgarian",
   "ca": "Catalan",
   "ceb": "Cebuano",
   "ny": "Chichewa",
   "zh-cn": "Chinese Simplified",
   "zh-tw": "Chinese Traditional",
   "co": "Corsican",
   "hr": "Croatian",
   "cs": "Czech",
   "da": "Danish",
   "nl": "Dutch",
   "en": "English",
   "eo": "Esperanto",
   "et": "Estonian",
   "tl": "Filipino",
   "fi": "Finnish",
   "fr": "French",
   "fy": "Frisian",
   "gl": "Galician",
   "ka": "Georgian",
   "de": "German",
   "el": "Greek",
   "gu": "Gujarati",
   "ht": "Haitian Creole",
   "ha": "Hausa",
   "haw": "Hawaiian",
   "iw": "Hebrew",
   "hi": "Hindi",
   "hmn": "Hmong",
   "hu": "Hungarian",
   "is": "Icelandic",
   "ig": "Igbo",
   "id": "Indonesian",
   "ga": "Irish",
   "it": "Italian",
   "ja": "Japanese",
   "jw": "Javanese",
   "kn": "Kannada",
   "kk": "Kazakh",
   "km": "Khmer",
   "ko": "Korean",
   "ku": "Kurdish (Kurmanji)",
   "ky": "Kyrgyz",
   "lo": "Lao",
   "la": "Latin",
   "lv": "Latvian",
   "lt": "Lithuanian",
   "lb": "Luxembourgish",
   "mk": "Macedonian",
   "mg": "Malagasy",
   "ms": "Malay",
   "ml": "Malayalam",
   "mt": "Maltese",
   "mi": "Maori",
   "mr": "Marathi",
   "mn": "Mongolian",
   "my": "Myanmar (Burmese)",
   "ne": "Nepali",
   "no": "Norwegian",
   "ps": "Pashto",
   "fa": "Persian",
   "pl": "Polish",
   "pt": "Portuguese",
   "ma": "Punjabi",
   "ro": "Romanian",
   "ru": "Russian",
   "sm": "Samoan",
   "gd": "Scots Gaelic",
   "sr": "Serbian",
   "st": "Sesotho",
   "sn": "Shona",
   "sd": "Sindhi",
   "si": "Sinhala",
   "sk": "Slovak",
   "sl": "Slovenian",
   "so": "Somali",
   "es": "Spanish",
   "su": "Sundanese",
   "sw": "Swahili",
   "sv": "Swedish",
   "tg": "Tajik",
   "ta": "Tamil",
   "te": "Telugu",
   "th": "Thai",
   "tr": "Turkish",
   "uk": "Ukrainian",
   "ur": "Urdu",
   "uz": "Uzbek",
   "vi": "Vietnamese",
   "cy": "Welsh",
   "xh": "Xhosa",
   "yi": "Yiddish",
   "yo": "Yoruba",
   "zu": "Zulu"
}

Translator.js (modify file), copy the code below:

import React, { Component } from 'react';
import { View, TextInput, StyleSheet, TouchableOpacity, TouchableHighlight, Text, Picker, Image } from 'react-native';
import Languages from './languages.json';

export default class Translator extends Component {

   constructor(props) {
       super(props);
       this.state = {
           languageFrom: "",
           languageTo: "",
           languageCode: 'en',
           inputText: "",
           outputText: "",
           submit: false,
       };
   }

   render() {
       return (
           <View style = {styles.container}>
               <View style={styles.input}>
                   <TextInput
                       style={{flex:1, height: 80}}
                       placeholder="Enter Text"
                       underlineColorAndroid="transparent"
                       onChangeText = {inputText => this.setState({inputText})}
                       value={this.state.inputText}
                   />
               </View>

               <Picker
               selectedValue={this.state.languageTo}
               onValueChange={ lang => this.setState({languageTo: lang, languageCode: lang})}
               >
                   {Object.keys(Languages).map(key => (
                       <Picker.Item key={key} label={Languages[key]} value={key} />
                   ))}
               </Picker>

               <View style = {styles.output}>
                  {/* output text displays here.. */}
               </View>
               <TouchableOpacity
                   style = {styles.submitButton}
                   onPress = {this.handleTranslate}
               >
                   <Text style = {styles.submitButtonText}> Submit </Text>
               </TouchableOpacity>
           </View>
       )
   }
}

const styles = StyleSheet.create({
   container: {
       paddingTop: 53
   },
   input: {
       flexDirection: 'row',
       justifyContent: 'center',
       alignItems: 'center',
       backgroundColor: '#fff',
       borderWidth: .5,
       borderColor: '#000',
       // height: 40,
       borderRadius: 5 ,
       margin: 10
   },
   output: {
       flexDirection: 'row',
       justifyContent: 'center',
       alignItems: 'center',
       backgroundColor: '#fff',
       borderWidth: .5,
       borderColor: '#000',
       borderRadius: 5 ,
       margin: 10,
       height: 80,
   },
   submitButton: {
       backgroundColor: '#7a42f4',
       padding: 10,
       margin: 15,
       borderRadius: 5 ,
       height: 40,
   },
   submitButtonText:{
       color: 'white'
   },
})

Now import your Translator.js into the App.js file.
Replace the contents of your App.js file with the code below:

import React, {Component} from 'react';
import {View} from 'react-native';
import Translator from './Translator';

export default class App extends Component {
   render() {
       return (
       <View>
           <Translator />
       </View>
       );
   }
}

Preparing the Android device

You will need an Android device to run your React Native Android app. This can be either a physical Android device, or more commonly, you can use an Android Virtual Device (AVD) which allows you to emulate an Android device on your computer (using Android Studio).

Either way, you will need to prepare the device to run Android apps for development.

Using a physical device

If you have a physical Android device, you can use it for development in place of an AVD by connecting it to your computer using a USB cable and following the instructions here.

If you are using virtual device follow this link.

Now go to command line and run react-native run-android inside your React Native app directory:

cd Translator
react-native run-android

If everything is set up correctly, you should see your new app running in your physical device or Android emulator shortly as below.

That’s great. We got the basic UI for our Translator app. Now we need to translate the input text into the selected language on submit. In React Native we have a library called react-native-power-translator for translating the text.

Let’s install the react-native-power-translator library. Go to the project root directory in command line and run the below command:

npm i react-native-power-translator --save

Usage:

import { PowerTranslator, ProviderTypes, TranslatorConfiguration, TranslatorFactory } from 'react-native-power-translator';

//Example
TranslatorConfiguration.setConfig('Provider_Type', 'Your_API_Key','Target_Language', 'Source_Language');

//Fill with your own details
TranslatorConfiguration.setConfig(ProviderTypes.Google, 'xxxx','fr');
  • PowerTranslator: a simple component to translate your texts.
  • ProviderTypes: type of cloud provider you want to use. There are two providers you can specify. ProviderTypes.Google for Google translate and ProviderTypes.Microsoft for Microsoft translator text cloud service.
  • TranslatorFactory: It returns a suitable translator instance, based on your configuration.
  • TranslatorConfiguration: It is a singleton class that keeps the translator configuration.
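It can help to see roughly what a translation request looks like over the wire. The sketch below builds a request URL for the Google Cloud Translation v2 REST endpoint using the same three pieces of configuration (API key, target language, optional source language). How react-native-power-translator issues the request internally is not shown in this post, so treat the mapping as an assumption; build_translate_url is a hypothetical helper, and no request is actually sent.

```python
from urllib.parse import urlencode

# Google Cloud Translation API (v2, "Basic") REST endpoint.
TRANSLATE_ENDPOINT = "https://translation.googleapis.com/language/translate/v2"


def build_translate_url(api_key, text, target, source=None):
    """Build the request URL for translating text into the target language."""
    params = {"key": api_key, "q": text, "target": target}
    if source:
        # When source is omitted, the API detects the source language itself.
        params["source"] = source
    return TRANSLATE_ENDPOINT + "?" + urlencode(params)


# 'XXXX' stands in for your own API key, as in the setConfig example.
url = build_translate_url("XXXX", "Hello world", "fr")
print(url)
```

The target value is one of the keys from languages.json, which is why the Picker stores the language code rather than the display name.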

Now add the following code in your Translator.js file:

import React, { Component } from 'react';
...
...
import { PowerTranslator, ProviderTypes, TranslatorConfiguration, TranslatorFactory } from 'react-native-power-translator';

export default class Translator extends Component {
...
...
render() {
       TranslatorConfiguration.setConfig(ProviderTypes.Google, 'XXXX', this.state.languageCode);
       return (
             ...
             ...
             ...
             <View style = {styles.output}>
                  {/* output text displays here.. */}
              {this.state.submit && <PowerTranslator  text={this.state.inputText} />}
              </View>

             ...
...
       )
}
}

In the above code I’m using the Google provider. You can use either the Google or the Microsoft provider.

Save all the files and run your app from the command line again, and you can see a working app that translates text from one language to another as below.

In the below image you can see the text that was converted from English to French.

On Android devices you can download keyboards for different languages, so that you can translate from your local language to other languages.

For speech to text we have a library called react-native-android-voice. Let’s install this library in our project.
Go to command line and navigate to project root directory and run the below command:

npm install --save react-native-android-voice

After installing successfully please follow the steps in this link for linking the library to your android project.

Once you have completed linking the library to your Android project, let’s start implementing it in our Translator.js file.

Let’s add a mic icon in our input box. When the user taps on the mic icon, the speech feature will be enabled. For the icon we use a library called react-native-vector-icons. For installation follow the steps in this link.

In this project I’m using Ionicons icons, you can change it in iconFontNames in your android/app/build.gradle file as:

project.ext.vectoricons = [
   iconFontNames: [ 'Ionicons.ttf' ] // Name of the font files you want to copy
]

Now add the following code in Translator.js file.

import React, { Component } from 'react';
...
...
import { ToastAndroid } from 'react-native'; //Add this, it is used in _buttonClick
import Icon from "react-native-vector-icons/Ionicons";
import SpeechAndroid from 'react-native-android-voice';

export default class Translator extends Component {
constructor(props) {
      super(props);
      this.state = {
          languageFrom: "",
          ....
          ....
          micOn: false, //Add this
      };
      this._buttonClick = this._buttonClick.bind(this); //Add this
  }

...
async _buttonClick(){
       await this.setState({micOn: true})
       try{
           var spokenText = await SpeechAndroid.startSpeech("", SpeechAndroid.ENGLISH);
           await this.setState({inputText: spokenText});
           await ToastAndroid.show(spokenText , ToastAndroid.LONG);
       }catch(error){
           switch(error){
               case SpeechAndroid.E_VOICE_CANCELLED:
                   ToastAndroid.show("Voice Recognizer cancelled" , ToastAndroid.LONG);
                   break;
               case SpeechAndroid.E_NO_MATCH:
                   ToastAndroid.show("No match for what you said" , ToastAndroid.LONG);
                   break;
               case SpeechAndroid.E_SERVER_ERROR:
                   ToastAndroid.show("Google Server Error" , ToastAndroid.LONG);
                   break;
           }
       }
       this.setState({micOn: false})
   }

render() {
       TranslatorConfiguration.setConfig(ProviderTypes.Google,'XXXX', this.state.languageCode);
       return (
             <View style = {styles.container}>
              <View style={styles.input}>
                  <TextInput
                      ...
                      ...
                      ...
                  />
                  <TouchableOpacity onPress={this._buttonClick}>
                       {this.state.micOn ? <Icon size={30} name="md-mic" style={styles.micStyle}/> : <Icon size={30} name="md-mic-off" style={styles.micStyle}/>}
                   </TouchableOpacity>
              </View>
...
...
</View>
    )
}
}

const styles = StyleSheet.create({
  container: {
      paddingTop: 53
  },
...
...
...
  micStyle: {
      padding: 10,
      margin: 5,
      alignItems: 'center'
  }
})

After adding the code correctly, save all the changes and run your app. Now you can see a mic icon in the text input box, which enables the speech to text feature.

In the above code we are calling a function called _buttonClick() which contains the speech to text logic. This will automatically start recognizing and adjusting for the English language. You can also use other languages for speech; you can check here for more information.

Now we have successfully added speech to text to our Translator app. Let’s add a text to speech feature which will turn the translated text into speech. For that we have a library called react-native-tts which converts text to speech.

Install react-native-tts in our project. Go to the command line and navigate to project root directory and run the following command:

npm install --save react-native-tts
react-native link react-native-tts

First command will install the library.
Second command will link the library to your android project.

Now add the following code in your Translator.js file

import React, { Component } from 'react';
...
...
import Icon from "react-native-vector-icons/Ionicons";
import SpeechAndroid from 'react-native-android-voice';
import Tts from 'react-native-tts'; //Add this

export default class Translator extends Component {
constructor(props) {
      super(props);
      this.state = {
          languageFrom: "",
          ...
          ...
          micOn: false, //Add this
      };
      this._buttonClick = this._buttonClick.bind(this); //Add this
  }


handleTranslate = () => {
       this.setState({submit: true})
       const translator = TranslatorFactory.createTranslator();
       translator.translate(this.state.inputText).then(translated => {
           // alert(translated)
           Tts.getInitStatus().then(() => {
               Tts.speak(translated);
           });
           Tts.stop();
       });
   }
...

render() {
         ...
    )
}
}

In the above code we have added the text to speech logic in the handleTranslate function, which is called when the submit button is clicked.

Now our final Translator.js file will look like below:

import React, { Component } from 'react';
import { PowerTranslator, ProviderTypes, TranslatorConfiguration, TranslatorFactory } from 'react-native-power-translator';
import { View, TextInput, StyleSheet, TouchableOpacity, TouchableHighlight, Text, Picker, Image, ToastAndroid } from 'react-native';
import Icon from "react-native-vector-icons/Ionicons";
import Tts from 'react-native-tts';
import Languages from './languages.json';
import SpeechAndroid from 'react-native-android-voice';

export default class Translator extends Component {

   constructor(props) {
       super(props);
       this.state = {
           languageFrom: "",
           languageTo: "",
           languageCode: 'en',
           inputText: "",
           outputText: "",
           submit: false,
           micOn: false,
       };
       this._buttonClick = this._buttonClick.bind(this);
   }
   handleTranslate = () => {
       this.setState({submit: true})
       const translator = TranslatorFactory.createTranslator();
       translator.translate(this.state.inputText).then(translated => {
           Tts.getInitStatus().then(() => {
               Tts.speak(translated);
           });
           Tts.stop();
       });
   }
   async _buttonClick(){
       await this.setState({micOn: true})
       try{
           var spokenText = await SpeechAndroid.startSpeech("", SpeechAndroid.DEFAULT);
           await this.setState({inputText: spokenText});
           await ToastAndroid.show(spokenText , ToastAndroid.LONG);
       }catch(error){
           switch(error){
               case SpeechAndroid.E_VOICE_CANCELLED:
                   ToastAndroid.show("Voice Recognizer cancelled" , ToastAndroid.LONG);
                   break;
               case SpeechAndroid.E_NO_MATCH:
                   ToastAndroid.show("No match for what you said" , ToastAndroid.LONG);
                   break;
               case SpeechAndroid.E_SERVER_ERROR:
                   ToastAndroid.show("Google Server Error" , ToastAndroid.LONG);
                   break;
           }
       }
       this.setState({micOn: false})
   }

   render() {
       TranslatorConfiguration.setConfig(ProviderTypes.Google, 'XXXXXXXXX', this.state.languageCode);
       return (
           <View style = {styles.container}>
               <View style={styles.input}>
                   <TextInput
                       style={{flex:1, height: 80}}
                       placeholder="Enter Text"
                       underlineColorAndroid="transparent"
                       onChangeText = {inputText => this.setState({inputText})}
                       value={this.state.inputText}
                   />
                   <TouchableOpacity onPress={this._buttonClick}>
                       {this.state.micOn ? <Icon size={30} name="md-mic" style={styles.ImageStyle}/> : <Icon size={30} name="md-mic-off" style={styles.ImageStyle}/>}
                   </TouchableOpacity>
               </View>

               <Picker
               selectedValue={this.state.languageTo}
               onValueChange={ lang => this.setState({languageTo: lang, languageCode: lang})}
               >
                   {Object.keys(Languages).map(key => (
                       <Picker.Item key={key} label={Languages[key]} value={key} />
                   ))}
               </Picker>

               <View style = {styles.output}>
                   {this.state.submit && <PowerTranslator text={this.state.inputText} />}
                   {/* onTranslationEnd={this.textToSpeech} */}
               </View>
               <TouchableOpacity
                   style = {styles.submitButton}
                   onPress = {this.handleTranslate}
               >
                   <Text style = {styles.submitButtonText}> Submit </Text>
               </TouchableOpacity>
           </View>
       )
   }
}

const styles = StyleSheet.create({
   container: {
       paddingTop: 53
   },
   input: {
       flexDirection: 'row',
       justifyContent: 'center',
       alignItems: 'center',
       backgroundColor: '#fff',
       borderWidth: .5,
       borderColor: '#000',
       // height: 40,
       borderRadius: 5 ,
       margin: 10
   },
   output: {
       flexDirection: 'row',
       justifyContent: 'center',
       alignItems: 'center',
       backgroundColor: '#fff',
       borderWidth: .5,
       borderColor: '#000',
       borderRadius: 5 ,
       margin: 10,
       height: 80,
   },
   ImageStyle: {
       padding: 10,
       margin: 5,
       alignItems: 'center'
   },
   submitButton: {
       backgroundColor: '#7a42f4',
       padding: 10,
       margin: 15,
       borderRadius: 5 ,
       height: 40,
   },
   submitButtonText:{
       color: 'white'
   },
})

Make sure you have replaced ‘XXXXXXXXX’ with your Google/Microsoft API key in the TranslatorConfiguration.setConfig call in the render method.

That’s it. Our Translator application now has language translation, speech-to-text and text-to-speech features. Reload or run your app and you can see a fully functional app.

When the user taps on the mic icon, an Android speech recognizer popup is displayed as below.

If the user doesn’t speak, or Google doesn’t recognize the speech, it shows up as below:

Once Google recognizes the speech, select the language you want to translate to and click the Submit button to receive the translated text as speech.

That’s it folks!

This story is authored by Venu Vaka. Venu is a software engineer and machine learning enthusiast.

Processing Kinesis Data Streams with Spark Streaming


Solution Overview: In this blog, we are going to build a real-time anomaly detection solution using Spark Streaming. Kinesis Data Streams acts as the input streaming source, and the anomalous records are written to DynamoDB.

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events.

Data Streams

The unit of data stored by Kinesis Data Streams is a data record. A data stream represents a group of data records.

For deep dive into Kinesis Data Streams, please go through these official docs.

Kinesis Data Streams Producers

A producer puts data records into Amazon Kinesis Data Streams. For example, a web server sending log data to a Kinesis Data Stream is a producer.

For more details about Kinesis Data Streams Producers, please go through these official docs.

Kinesis Data Streams Consumers

A consumer, known as an Amazon Kinesis Data Streams application, is an application that you build to read and process data records from Kinesis Data Streams.

For more details about Kinesis Data Streams Consumers, please go through these official docs.


Creating a Kinesis Data Stream

Step1. Go to Amazon Kinesis console -> click on Create Data Stream

Step2. Give the Kinesis stream a name and set the number of shards according to the volume of the incoming data. In this case, the stream name is kinesis-stream and the number of shards is 1.

Shards in Kinesis Data Streams

A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.

For more about shards, please go through these official docs.

Step3. Click on Create Kinesis Stream

Kinesis Data Streams can be connected with Kinesis Data Firehose to write the streams into S3.
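To make the idea of shards as fixed units of capacity more concrete, here is a minimal Python sketch of how a record could be routed to a shard. Kinesis hashes the partition key with MD5 into a 128-bit integer and picks the shard whose hash key range contains it. The helper names (even_shard_ranges, shard_for_key) are hypothetical, and the even split of the key space is an assumption made for illustration.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # hash keys are 128-bit unsigned integers


def even_shard_ranges(num_shards):
    """Split the hash key space into num_shards contiguous ranges (assumed even split)."""
    size = (MAX_HASH_KEY + 1) // num_shards
    ranges = []
    for i in range(num_shards):
        start = i * size
        # The last shard absorbs any remainder so the whole space is covered
        end = MAX_HASH_KEY if i == num_shards - 1 else (i + 1) * size - 1
        ranges.append((start, end))
    return ranges


def shard_for_key(partition_key, ranges):
    """Return the index of the shard whose range contains the key's MD5 hash."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hashed = int.from_bytes(digest, "big")
    for index, (start, end) in enumerate(ranges):
        if start <= hashed <= end:
            return index
    raise ValueError("hash key outside all shard ranges")
```

With a single shard, as in this post, every partition key lands on shard 0. With more shards, records sharing a partition key always go to the same shard.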


Configure Kinesis Data Streams with Kinesis Data Producers

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose.

While following this link, choose to Create a Cognito User with Cloud Formation.

After selecting the above option, we will navigate to the Cloud Formation console:

Click on Next and provide Username and Password for Cognito User for Kinesis Data Generator.

Click on Next and Create Stack.

CloudFormation Stack is created.

Click on Outputs tab and open the link

After opening the link, enter the username and password of the Cognito user.

After Sign In is completed, select the Region and Stream, and configure the number of records per second. Choose a record template as per your requirement.

In this case, the template data format is

{{name.firstName}},{{random.number({"min":10, "max":550})}},{{random.arrayElement(["OK","FAIL","WARN"])}}

The template data looks like the following

You can send different types of dummy data to Kinesis Data Streams.
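If you want to test the downstream parsing without the Kinesis Data Generator, records of the same shape can be produced locally. The following is only a rough Python sketch: the NAMES list is a hypothetical stand-in for the generator's name.firstName, and the min/max bounds are assumed to be inclusive.

```python
import random

STATUSES = ["OK", "FAIL", "WARN"]
NAMES = ["Asha", "Ravi", "Maya", "Kiran"]  # hypothetical stand-ins for name.firstName


def make_record(rng):
    """Build one CSV line shaped like the template: name,number,status."""
    name = rng.choice(NAMES)
    value = rng.randint(10, 550)  # assumed inclusive bounds, mirroring min/max
    status = rng.choice(STATUSES)
    return f"{name},{value},{status}"


def make_records(count, seed=None):
    """Generate count records; pass a seed for reproducible test data."""
    rng = random.Random(seed)
    return [make_record(rng) for _ in range(count)]
```

Each generated line has exactly three comma-separated fields, which is what the Spark Streaming job later in this post expects.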

Kinesis Data Streams with Kinesis Data Producers are ready. Now we shall build a Spark Streaming application which consumes data streams from Kinesis Data Streams and dumps the output streams into DynamoDB.


Create DynamoDB Tables To Store Data Frame

Go to Amazon DynamoDB console -> Choose Create Table and name the table, in this case, data_dump

In the same way, create another table named data_anomaly. Make sure the Kinesis Data Streams and DynamoDB tables are in the same region.

Spark Streaming with Kinesis Data Streams

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.

For a deep dive into Spark Streaming, please go through the docs.

In this case, the Scala programming language is used. The Scala version is 2.11.12. Please install Scala, sbt and Spark.

Create a folder structure like the following

Kinesis-spark-streams-dynamo
| -- src/main/scala/packagename/object
| -- build.sbt
| -- project/assembly.sbt

In this case, the structure looks like the following

After creating the folder structure, please replace the contents of the build.sbt file with the following code. It adds the required dependencies such as Spark, Spark Streaming, the Spark Kinesis connector and the DynamoDB libraries.

name := "kinesis-spark-streams-dynamo"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "0.4.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
libraryDependencies += "com.google.guava" % "guava" % "14.0.1"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.466"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.3"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

Please replace assembly.sbt file with the following code. This will add the assembly plugin which can be used for creating the jar.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

Please replace the contents of the Scala source file under src/main/scala with the following code.

package com.wisdatum.kinesisspark

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.spark._
import org.apache.spark.streaming._
import com.amazonaws.services.kinesis.AmazonKinesis
import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import org.apache.log4j.{Level, Logger}
import com.audienceproject.spark.dynamodb.implicits._

object KinesisSparkStreamsDynamo {

  // Resolve the AWS region name from the Kinesis endpoint URL
  def getRegionNameByEndpoint(endpoint: String): String = {
    val uri = new java.net.URI(endpoint)
    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
      .asScala
      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
      .map(_.getName)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }

  def main(args: Array[String]): Unit = {

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("KinesisSparkExample").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    println("Launching")
    val Array(appName, streamName, endpointUrl, dynamoDbTableName) = args
    println(streamName)
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()

    require(credentials != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
    println("numShards are " + numShards)

    // Create one input DStream per shard
    val numStreams = numShards

    // Same value as the StreamingContext batch interval above
    val batchInterval = Seconds(1)

    val kinesisCheckpointInterval = batchInterval

    val regionName = getRegionNameByEndpoint(endpointUrl)

    val anomalyDynamoTable = "data_anomaly"

    println("regionName is " + regionName)

    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPositionInStream(InitialPositionInStream.LATEST)
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()
    }

    val unionStreams = ssc.union(kinesisStreams)

    // Each record is a CSV line: id,temperature,status
    val inputStreamData = unionStreams.map { byteArray =>
      val Array(sensorId, temp, status) = new String(byteArray).split(",")
      StreamData(sensorId, temp.toInt, status)
    }

    val inputStream: DStream[StreamData] = inputStreamData

    inputStream.window(Seconds(20)).foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val inputStreamDataDF = rdd.toDF()
      inputStreamDataDF.createOrReplaceTempView("hot_sensors")

      // Dump every record of the window into the first table
      val dataDumpDF = spark.sql("SELECT * FROM hot_sensors ORDER BY currentTemp DESC")
      dataDumpDF.show(2)
      dataDumpDF.write.dynamodb(dynamoDbTableName)

      // Records with a temperature above 100 are treated as anomalies
      val anomalyDf = spark.sql("SELECT * FROM hot_sensors WHERE currentTemp > 100 ORDER BY currentTemp DESC")
      anomalyDf.write.dynamodb(anomalyDynamoTable)
    }

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    ssc.start()
    ssc.awaitTermination()
  }
}

case class StreamData(id: String, currentTemp: Int, status: String)

appName: The application name that is used to checkpoint the Kinesis sequence numbers in a DynamoDB table.

  1. The application name must be unique for a given account and region.
  2. If the table exists but has incorrect checkpoint information (for a different stream, or old expired sequence numbers), then there may be temporary errors.

kinesisCheckpointInterval:
The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same value as the batch interval of the streaming application.

endpointUrl:
Valid Kinesis endpoint URLs can be found here.

For more details about building KinesisInputDStream, please go through the documentation.

Configure AWS credentials using environment variables or the aws configure command.

Make sure all the resources are under the same account and region. The CloudFormation stack that was created is in us-west-2; even if all the other resources are in another region, this does not affect the process.


Building Executable Jar

  • Open Terminal -> Go to project root directory, in this case 
    kinesis-spark-streams-dynamo
  • Run sbt assembly

The jar is packaged at target/scala-2.11/XXXX.jar under the project root directory. The name of the jar is derived from the name provided in the build.sbt file.

Run the Jar using spark-submit

  • Open Terminal -> Go to the Spark installation directory
  • Run the following command
./bin/spark-submit ~/Desktop/kinesis-spark-streams-dynamo/target/scala-2.11/kinesis-spark-streams-dynamo-assembly-0.1.jar appName streamName endpointUrl dynamoDbTable

To know more about how to submit applications using spark-submit, please review this.

The four trailing arguments (appName, streamName, endpointUrl and dynamoDbTable) are passed to the application in that order. Replace them with your own values as needed.

Read Kinesis Data Streams in Spark Streams

  1. Go to Amazon Kinesis Data Generator-> Sign In using Cognito user
  2. Click on Send Data, it starts sending data to Kinesis Data Streams

The data is sent to the Kinesis Data Stream (kinesis-stream in this case) and looks like this.

Monitoring Kinesis Data Streams

Go to Amazon Kinesis Console -> Choose Data streams -> Select created Data Stream -> click on Monitoring

The terminal looks like the following when it starts receiving the data from Kinesis Data Streams

The data_dump table holds all the data coming from Kinesis Data Streams. The data in the data_dump table looks like the following

The data_anomaly table holds the records where currentTemp is greater than 100; here, the anomaly is a temperature greater than 100. The data in the data_anomaly table looks like the following
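The split between the two tables can be reproduced outside Spark to check the expected results on a few sample lines. The following Python sketch mirrors the parsing and the currentTemp > 100 filter of the Scala job; the function names parse_record and split_window are hypothetical, and this is an illustration rather than part of the pipeline.

```python
from typing import NamedTuple


class StreamData(NamedTuple):
    id: str
    currentTemp: int
    status: str


ANOMALY_THRESHOLD = 100  # same cut-off as the SQL filter in the Scala job


def parse_record(line):
    """Parse one 'id,temperature,status' line into a StreamData row."""
    sensor_id, temp, status = line.strip().split(",")
    return StreamData(sensor_id, int(temp), status)


def split_window(lines):
    """Return (all rows, anomalous rows), both sorted by temperature descending."""
    rows = sorted(
        (parse_record(line) for line in lines),
        key=lambda row: row.currentTemp,
        reverse=True,
    )
    anomalies = [row for row in rows if row.currentTemp > ANOMALY_THRESHOLD]
    return rows, anomalies


rows, anomalies = split_window(["Asha,120,OK", "Ravi,45,WARN", "Maya,300,FAIL"])
```

Here rows would go to data_dump and anomalies to data_anomaly. A record with a temperature of exactly 100 is not an anomaly, because the comparison is strict.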

I hope this article was helpful in setting up Kinesis Data Streams that are consumed and processed using Spark Streaming and stored in DynamoDB.

This story is authored by P V Subbareddy. He is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark Ecosystem.

A Beginner’s Guide to Airflow

Airflow is used to create code pipelines in which we can schedule and monitor our workflows. A workflow is a collection of tasks to be executed, like a flowchart.

It is like an orchestra conductor that controls all the different data processing tools/tasks under one roof.

Why Airflow?

  • Open source.
  • Can handle upstream/downstream in an elegant way.
  • Jobs can pass parameters to other jobs downstream.
  • Ease of deployment of workflow.
  • Easy to reprocess historical jobs by date or rerun for specific intervals.
  • Integration with a lot of infrastructure.
  • Job testing through airflow itself.
  • Accessibility of log files and other meta-data through the web GUI.
  • Data sensors to trigger Directed Acyclic Graph (DAG) when data arrives.

Airflow Architecture:

Metadata: The metadata database stores the state of tasks and workflows.
Scheduler: It reads the user's DAGs along with the state of tasks in the metadata database, and decides what needs to be executed.
Executor: It is a message queuing process that decides which worker will execute each task.

Setting up airflow:

Use the following commands to install airflow

$ sudo pip install apache-airflow

You can install extra features like

$ sudo pip install apache-airflow[postgres,s3]

Airflow requires its database to be initialized before you run tasks

$ airflow initdb

After installation the folder structure will be something like this.

If you don’t see a ‘dags’ folder, you can create one yourself and name it ‘dags’. You shall put all your python task files in that folder.

To start the web server

$ airflow webserver -p 8080

Go to http://localhost:8080 to see the airflow GUI. You should see something like this:

Airflow relies on 4 core concepts:

  1. DAG.
  2. Tasks.
  3. Scheduler.
  4. XCom.

Directed Acyclic Graph (DAG):

It is a collection of all the tasks you want to run, organized in a way that shows their relationships and dependencies.

A DAG is defined in a python file, which is used to organise tasks and set their execution context. DAGs do not perform any actual computation. The tasks of a DAG are built from 2 types of things:

  1. Operators
    An operator determines what actually gets done. It triggers a certain action like running a bash command, executing a python function, etc.
    There are different kinds of operators like:
    Bash operator: Executes a bash command.
    Python operator: Calls an arbitrary python function.
    Hive operator: Executes hql code or a hive script in a specific hive database.
    BigQuery operator: Executes a Google BigQuery SQL query in a specific BigQuery database.
  2. Sensors: A certain type of operator that keeps running until a certain criterion is met.

Tasks:
Tasks are the elements that actually “do the work” we want performed. It is our job to write the configuration and organise the tasks in a specific order to create a data pipeline.

Once an operator is instantiated, it is referred to as a “task”. An operator describes a single task in a workflow.

Scheduler:
It monitors all tasks and all dags, and triggers the task instances whose dependencies have been met.

Next, we create a new python file in the dags folder. Importing packages in our python file:

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

Next, setting the default arguments:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2019, 6, 17, 8, 40, 00),
    'retries': 1,
}

depends_on_past: Sometimes a task t1 depends on its own previous execution. Setting this to True makes a task instance run only if its previous run succeeded.

start_date: The starting date of execution.

Defining our tasks, dag and functions:

def greet():
    print("Hello how are you ?")
    return 'greeted'

def response():
    return "im fine what about you?"

with DAG('YourDagName', default_args=default_args, schedule_interval='@daily',) as dag:
    opr_hello = BashOperator(task_id='say_Hi',bash_command='echo "Hi!!"')
    opr_greet = PythonOperator(task_id='greet', python_callable=greet)
    opr_sleep = BashOperator(task_id='sleep_me', bash_command='sleep 2')
    opr_respond = PythonOperator(task_id='respond', python_callable=response)

Here I am using 2 simple functions, and passing my default arguments and the schedule interval to the dag.

After that we created 4 tasks, of which 2 are python and the other 2 are bash. To execute a python function we need to use the python operator, and likewise the bash operator for a bash command. The python operator takes at least 2 arguments, task_id and python_callable; the bash operator takes task_id and bash_command.

Next, we define the task dependencies, i.e. the order in which the tasks should be executed.

Task dependencies are set using

  • The set_upstream and set_downstream methods.
  • The bitshift operators << and >>.

t1.set_downstream(t2)
t1>>t2

The above 2 lines mean the same thing: t2 depends on t1 and runs only after t1 has run successfully.

opr_hello >> opr_greet >> opr_sleep >> opr_respond

If you wrap up the whole code in one file and put that file in the dags folder, you can see your dag in the Airflow GUI. If you click on your DAG you will see the tree view and many other options to view your DAG in different forms.
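To see why a chain such as opr_hello >> opr_greet >> opr_sleep >> opr_respond works, it helps to look at how the bitshift operators can be built in plain Python. The following is a toy sketch and not Airflow's actual code: the Task class and the run_order helper are hypothetical, and they only track dependencies.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        # t1 >> t2: t2 runs after t1; returning t2 lets the chain continue
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # t1 << t2: t1 runs after t2
        self.set_upstream(other)
        return other


def run_order(tasks):
    """Return task ids in an order that respects upstream dependencies."""
    done, order = set(), []
    pending = {task.task_id: task for task in tasks}
    while pending:
        ready = sorted(tid for tid, task in pending.items() if task.upstream <= done)
        if not ready:
            raise ValueError("cycle detected: not a DAG")
        for tid in ready:
            order.append(tid)
            done.add(tid)
            del pending[tid]
    return order


hello, greet, sleep, respond = (Task(n) for n in ["say_Hi", "greet", "sleep_me", "respond"])
hello >> greet >> sleep >> respond
```

Calling run_order([respond, sleep, greet, hello]) returns the ids in dependency order, whatever order the tasks are passed in. This is the same idea the scheduler applies when it triggers only the task instances whose dependencies have been met.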

DagRun: A DagRun is an instance of a dag that runs at a certain time.
TaskInstance: A TaskInstance is a run of a task that belongs to a DagRun.
Each DagRun and TaskInstance is associated with an entry in airflow’s metadata database that logs its state (e.g. queued, running, failed, skipped).
XCom: XCom, which stands for cross-communication, allows airflow tasks to send and receive data/status. XCom data is stored in the airflow database in the form of key-value pairs.
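The key-value idea behind XCom can be illustrated without a running Airflow instance. The sketch below is a toy model: the ToyXCom class is hypothetical and keeps its data in memory, whereas Airflow stores XCom entries in its metadata database. It reuses the greet and response functions from the example dag and assumes, as Airflow does for python callables, that a return value is stored under the key return_value.

```python
class ToyXCom:
    """In-memory stand-in for the XCom table: key-value pairs scoped per task."""

    def __init__(self):
        self._rows = {}

    def push(self, task_id, key, value):
        self._rows[(task_id, key)] = value

    def pull(self, task_id, key="return_value"):
        # Returns None when the task has not pushed anything under this key
        return self._rows.get((task_id, key))


def greet(xcom):
    value = "greeted"
    # Mimics storing the return value of a python callable
    xcom.push("greet", "return_value", value)
    return value


def respond(xcom):
    # A downstream task reads what the upstream task pushed
    upstream = xcom.pull("greet")
    return f"{upstream}: im fine what about you?"
```

In a real dag the pushing and pulling go through the task instance that Airflow passes to the task, but the data flow is the same: the upstream task writes a value under a key, and the downstream task reads it back.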

That’s all guys. Thanks for the read!

This story is co-authored by Santosh Kumar and PV Subbareddy. Santosh is a Software Engineer specializing in Cloud Services and DevOps. Subbareddy is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark Ecosystem.

Building a Simple React Application using Redux

In this article I will be building a simple react app that uses redux to manage its state. If you are a complete beginner, I recommend first reading my A Beginners Guide to Understanding Redux article and then following this one. For this article, I have built a simple react app that renders a dummy signup form and a table which displays a list of employee details (dummy data). We shall integrate it with redux, but first let us do the initial setup so that you can follow along.

Initial Setup

Please clone this specific branch ‘without-redux’ from my GitHub repository.

git clone -b without-redux --single-branch https://github.com/koushik-bitzop/newempdetails.git

Run the above command in your terminal; this should create a folder and clone all the required code files from my repository.
To get started, install the dependencies:

sudo npm install

Start the application server:

sudo npm start

You should see the app running on your localhost, like this:

The code files that you have cloned are the same as the ones listed below.

App.js

import React, {Component} from 'react';
import Header from './components/Header';
import Routes from './components/Routes';
import { BrowserRouter as Router} from 'react-router-dom';

class App extends Component {
 state = {  }
 render() {
   return (
   <Router>
       <Header/>
       <div className="row">
           <Routes />
       </div>
   </Router>
   );
 }
}

export default App;

Routes.js

import React, {Component} from 'react';
import {Switch} from 'react-router';
import {Route} from 'react-router-dom';
import FormdataTable from './FormdataTable';
import SignupForm from './SignupForm';

class Routes extends Component {
   render() {
       return ( 
           <Switch>
               <Route path={'/'} exact component={SignupForm } ></Route>
               <Route path={'/viewlist'} exact component={FormdataTable}></Route>
           </Switch>
       );
   }
}

export default Routes;

Header.js

import React,{Component} from 'react';
import {Link, NavLink} from 'react-router-dom';

class Header extends Component{
   render(){
       return(
           <nav className="navbar navbar-expand-lg navbar-light bg-light">
                  
               <Link to="/" className="navbar-brand" href="#"><h1>Wisdatum</h1></Link>

               <div className="collapse navbar-collapse" id="navbarNav">
                   <ul className="navbar-nav">
                       <li className="nav-item active">
                           <NavLink to="/" className="nav-link" href="#">Add new </NavLink>
                       </li>
                       <li className="nav-item">
                           <NavLink to="/viewlist" className="nav-link" href="#">View list</NavLink>
                       </li>
                   </ul>
               </div>

           </nav>
       );
   }
}

export default Header;

SignupForm.js

import React, {Component} from 'react';
import {Form, FormGroup, FormControl, FormLabel, Button} from 'react-bootstrap';

class SignupForm extends Component {
   constructor(props) {
   super(props);
   this.state = {
       firstname: '',
       lastname: '',
       email: '',
       mobile: '',
       city: ''
   };

   // This binding is necessary to make `this` work in the callback
   this.handleSubmit = this.handleSubmit.bind(this);
   }
handleSubmit(){
   let formdata = {
       firstname: this.state.firstname,
       lastname: this.state.lastname,
       email: this.state.email,
       mobile: this.state.mobile,
       city: this.state.city
   };
   console.log("submitted",formdata);
   this.setState({
       firstname: '',
       lastname: '',
       email: '',
       mobile: '',
       city: ''
   });
}
   render() {
       return (
           <div className="col-md-4 offset-md-4">
               <Form>
                   <h2 style={{"textAlign":"center", "marginTop":"20px"}}>Enter Employee Details</h2>
                   <hr/>
                   <FormGroup>
                       <FormLabel>Firstname</FormLabel>
                       <FormControl
                           type="text"
                           name="firstname"
                           placeholder="Firstname"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.firstname}
                       />
                   </FormGroup>
              
                   <FormGroup>
                       <FormLabel>Lastname</FormLabel>
                       <FormControl
                           type="text"
                           name="lastname"
                           placeholder="Lastname"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.lastname}
                       />
                   </FormGroup>

                   <FormGroup>
                       <FormLabel>Email</FormLabel>
                       <FormControl
                           type="text"
                           name="email"
                           placeholder="Email"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.email}
                       />
                   </FormGroup>

                   <FormGroup>
                       <FormLabel>Mobile</FormLabel>
                       <FormControl
                           type="text"
                           name="mobile"
                           placeholder="Mobile"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.mobile}
                       />
                   </FormGroup>

                   <FormGroup>
                       <FormLabel>City</FormLabel>
                       <FormControl
                           type="text"
                           name="city"
                           placeholder="City/Village"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.city}
                   />
                   </FormGroup>
                   <Button onClick={this.handleSubmit}>Submit</Button>
               </Form>
           </div>
        );
   }
}
export default SignupForm;

FormdataTable.js

import React,{Component} from 'react';
import empdata from '../data/employeedata.json';

class FormdataTable extends Component {
   state = {  }
   render() {
       console.log(empdata);
       return (
           <div className="col-md-10 offset-md-1">
               <h2 style={{"textAlign":"center", "marginTop":"20px", "marginBottom":"20px"}}>New Employee Details</h2>
               <table className="table table-hover">
                   <thead>
                       <tr>
                           <th scope="col">#</th>
                           <th scope="col">First Name</th>
                           <th scope="col">Last Name</th>
                           <th scope="col">Email</th>
                           <th scope="col">Mobile</th>
                           <th scope="col">City</th>
                           <th scope="col">Options</th>
                       </tr>
                   </thead>
                   <tbody>
                       {
                       empdata.map((emp, index) => {
                       return(
                           <tr key={index}>
                                   <td>{index+1}</td>
                                   <td>{emp.firstname}</td>
                                   <td>{emp.lastname}</td>
                                   <td>{emp.email}</td>
                                   <td>{emp.mobile}</td>
                                   <td>{emp.city}</td>
                                   <td>
                                       <button
                                           type="button"
                                           onClick={()=>this.handleEdit(index)}
                                           className="btn btn-sm btn-primary">Edit
                                       </button>
                                       {" | "}
                                       <button
                                           type="button"
                                           onClick={()=>this.handleDelete(index)}
                                           className="btn btn-sm btn-danger">Delete
                                       </button>
                                   </td>
                           </tr>
                       )})
                       }
                   </tbody>
               </table>
           </div>
       );
   }
}
export default FormdataTable;

Before we begin integrating with redux, please review and understand the code above. I have used bootstrap for styling and react-router for creating the navigation/routes in the application.

Integrating with redux:

The above diagram depicts the redux workflow in a nutshell. The UI triggers actions like form-submit, edit and delete in the table. Each action is represented by an action object, and these action objects are dispatched to the reducer function, which updates the store. The store contains the state. All the components get their data from the store and are updated with the new state. This new state triggers a re-render that defines the new UI, and the cycle repeats.

Redux is external to react, so we need the redux and react-redux libraries to connect redux with react.

npm install redux react-redux --save

We shall go step by step.

Step1: Create a root reducer function

The reducer function always talks to the store: the store is created from it, and every update to the store goes through it.

allreducer.js (create in src folder)

import empdata_json from './data/employeedata.json';

function empdata(state = empdata_json, action){
    switch(action.type){
        default:
            return state;
    }
}

export default empdata;

The above reducer function accepts two arguments: the current state and an action object. The initial value of the state is set to the dummy data imported from the employeedata.json file. Whenever this function is invoked, the block in the switch that corresponds to the action type is executed. Though the logic of the blocks may differ, they all do pretty much the same thing: take the payload and return a new state to update the store.
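The reducer contract itself does not depend on JavaScript, so it can be sketched in a few lines of Python to make the "take the payload and return a new state" idea concrete. This is an illustration only: the action types ADD_EMPLOYEE and DELETE_EMPLOYEE and the Store class are hypothetical, and the real redux store does more than this (subscriptions, middleware).

```python
def empdata(state, action):
    """Pure reducer: never mutates state, always returns the next state."""
    if action["type"] == "ADD_EMPLOYEE":
        # Build a new list instead of appending to the existing one
        return state + [action["payload"]]
    if action["type"] == "DELETE_EMPLOYEE":
        index = action["payload"]
        return state[:index] + state[index + 1:]
    return state  # unknown action types leave the state untouched


class Store:
    """Minimal store: holds the state and routes every action through the reducer."""

    def __init__(self, reducer, initial_state):
        self._reducer = reducer
        self._state = initial_state

    def get_state(self):
        return self._state

    def dispatch(self, action):
        self._state = self._reducer(self._state, action)
        return action
```

Note that the default branch returns the very same state object, just like the default case in the switch above, while the other branches return a new list and leave the old one unmodified.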

Step2: Create the store, make it available to whole tree

We have to import the above reducer function and use the createStore method available in redux to create the store.

To make this store available to the whole app component tree, we wrap the app component tree in the Provider component available in react-redux and pass it the store as a prop.

The <Provider /> makes the Redux store available to any nested components that have been wrapped in the connect() function. We will learn about connect() in the next step. Since any React component in a React Redux app can be connected, most applications will render a <Provider> at the top level, with the entire app’s component tree inside of it.

Normally, you can’t use a connected component unless it is nested inside of a <Provider>.

index.js (modify this file)

import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';
import * as serviceWorker from './serviceWorker';
import empdatareducer from './allreducer';
import {createStore} from 'redux';
import {Provider} from 'react-redux';

const store = createStore(empdatareducer);
ReactDOM.render(<Provider store={store}><App /></Provider>, document.getElementById('root'));

// If you want your app to work offline and load faster, you can change
// unregister() to register() below. Note this comes with some pitfalls.
// Learn more about service workers: https://bit.ly/CRA-PWA
serviceWorker.unregister();

Step3: Connecting with redux

Now that the store is available to the whole component tree, components have to be connected to Redux and mapped to the store to get data (as props). The react-redux library comes with a function called connect, which is how you feed data from the Redux store into your React components and dispatch actions to the reducer function. The connect function is commonly passed one or two arguments:

First, a mapStateToProps function that takes pieces of state out of Redux and assigns them to props that your React component will use.

And often a second argument: a mapDispatchToProps function, which binds action creator functions (functions that return an action object) to dispatch, so you can just write props.actionName() and don’t have to write dispatch(actionObject) for every event or action. The action object is dispatched as soon as the action creator returns it.
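Conceptually, connect assembles the props of the wrapped component from three sources: the props passed by the parent, the result of mapStateToProps, and the result of mapDispatchToProps. The sketch below imitates that merge in plain JavaScript. computeProps and fakeDispatch are hypothetical helpers written for this illustration, not part of react-redux.

```javascript
// Conceptual sketch only: roughly how the final props are put together.
function computeProps(ownProps, state, dispatch, mapStateToProps, mapDispatchToProps) {
  const stateProps = mapStateToProps ? mapStateToProps(state) : {};
  // When no mapDispatchToProps is given, the component receives dispatch itself.
  const dispatchProps = mapDispatchToProps ? mapDispatchToProps(dispatch) : { dispatch };
  return { ...ownProps, ...stateProps, ...dispatchProps };
}

// A fake dispatch that just records what was dispatched.
const dispatched = [];
const fakeDispatch = (action) => { dispatched.push(action); return action; };

const mapState = (state) => ({ empdata: state });
const mapDispatch = (dispatch) => ({
  addNewEmp: (payload) => dispatch({ type: 'ADD_NEW_EMP', payload }),
});

const props = computeProps(
  { title: 'Employees' },      // props from the parent component
  [{ firstname: 'Asha' }],     // current store state (made-up data)
  fakeDispatch,
  mapState,
  mapDispatch
);

props.addNewEmp({ firstname: 'Ravi' });
console.log(props.empdata.length, dispatched[0].type); // 1 ADD_NEW_EMP
```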

Remember, you can’t dispatch an action to a specific reducer function. Whenever there is a dispatch, the action is received by all reducers, and only those blocks whose action type matches are executed.
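Our app has a single reducer, but the point is easier to see with two. The sketch below uses a hand-rolled root reducer, similar in spirit to what Redux's combineReducers produces. The filterText reducer and the SET_FILTER action type are hypothetical and exist only for this illustration.

```javascript
const calls = []; // records which reducers were invoked

function employees(state = [], action) {
  calls.push('employees');
  switch (action.type) {
    case 'ADD_NEW_EMP':
      return [...state, action.payload];
    default:
      return state;
  }
}

function filterText(state = '', action) {
  calls.push('filterText');
  switch (action.type) {
    case 'SET_FILTER':
      return action.payload;
    default:
      return state;
  }
}

// Every slice reducer is called with every action.
function rootReducer(state = {}, action) {
  return {
    employees: employees(state.employees, action),
    filterText: filterText(state.filterText, action),
  };
}

const nextState = rootReducer(undefined, {
  type: 'ADD_NEW_EMP',
  payload: { firstname: 'Asha' },
});

console.log(calls);     // [ 'employees', 'filterText' ]
console.log(nextState); // { employees: [ { firstname: 'Asha' } ], filterText: '' }
```

Both reducers ran, but only the one with a matching case changed its slice of the state.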

As our FormdataTable.js component requires the store data, we map the store state to its props.

FormdataTable.js (modify this file)

import React,{Component} from 'react';
import {connect } from 'react-redux';

class FormdataTable extends Component {
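  state = {  }
  // Note: handleEdit and handleDelete are referenced by the buttons below but are
  // not defined in this listing; delete is left as an exercise later in the article.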
  render() {
      //console.log(this.props.empdata);
      return (
          <div className="col-md-10 offset-md-1">
              <h2 style={{"textAlign":"center", "marginTop":"20px", "marginBottom":"20px"}}>New Employee Details</h2>
              <table className="table table-hover">
                  <thead>
                      <tr>
                          <th scope="col">#</th>
                          <th scope="col">First Name</th>
                          <th scope="col">Last Name</th>
                          <th scope="col">Email</th>
                          <th scope="col">Mobile</th>
                          <th scope="col">City</th>
                          <th scope="col">Options</th>
                      </tr>
                  </thead>
                  <tbody>
                      {
                      this.props.empdata.map((emp, index) => {
                      return(
                          <tr key={index}>
                                  <td>{index+1}</td>
                                  <td>{emp.firstname}</td>
                                  <td>{emp.lastname}</td>
                                  <td>{emp.email}</td>
                                  <td>{emp.mobile}</td>
                                  <td>{emp.city}</td>
                                  <td>
                                      <button
                                          type="button"
                                          onClick={()=>this.handleEdit(index)}
                                          className="btn btn-sm btn-primary">Edit
                                      </button>
                                      {" | "}
                                      <button
                                          type="button"
                                          onClick={()=>this.handleDelete(index)}
                                          className="btn btn-sm btn-danger">Delete
                                      </button>
                                  </td>
                          </tr>
                      )})
                      }
                  </tbody>
              </table>
          </div>
      );
  }
}
function mapStateToProps(state){
     return {
       empdata : state
   };
}
export default connect(mapStateToProps,null)(FormdataTable);

Our signup form data has to be added to the store. This is an action and should be dispatched. We will first write an action creator function and then bind it to dispatch using bindActionCreators, available in redux. Only then can we dispatch it to the reducer to update the store.

allactions.js (create in src folder)

export function addNewEmp(payload){
   const action = {
       type: "ADD_NEW_EMP",
       payload
   }
   return action;
}

In the above case, the payload is our form data. Let us bind this action creator to dispatch and connect to Redux so that we can dispatch our action object containing the payload (the form data).
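Before the full component, here is a small sketch of what binding an action creator to dispatch amounts to. bindCreators is a hypothetical helper that mirrors the idea behind bindActionCreators; the real function lives in redux. The dispatch used here is a stub that only records actions.

```javascript
// Same action creator as in allactions.js.
function addNewEmp(payload) {
  return { type: 'ADD_NEW_EMP', payload };
}

// Hypothetical helper: wrap each creator so that calling it also dispatches.
function bindCreators(creators, dispatch) {
  const bound = {};
  for (const name of Object.keys(creators)) {
    bound[name] = (...args) => dispatch(creators[name](...args));
  }
  return bound;
}

// Stub dispatch that records every action it receives.
const actionLog = [];
const stubDispatch = (action) => { actionLog.push(action); return action; };

const boundProps = bindCreators({ addNewEmp }, stubDispatch);

// One call creates the action object and dispatches it.
boundProps.addNewEmp({ firstname: 'Asha', city: 'Pune' });

console.log(actionLog.length, actionLog[0].type); // 1 ADD_NEW_EMP
```

This is why the component can simply call this.props.addNewEmp(formdata) without touching dispatch directly.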

SignupForm.js (modify this file)

import React,{Component} from 'react';
import {Form, FormGroup, FormControl, FormLabel, Button} from 'react-bootstrap';

import {connect} from 'react-redux';
import {bindActionCreators} from 'redux';
import {addNewEmp} from '../allactions';

class SignupForm extends Component {
  constructor(props) {
      super(props);
      this.state = {
          firstname: '',
          lastname: '',
          email: '',
          mobile: '',
          city: ''
      };

      // This binding is necessary to make `this` work in the callback
      this.handleSubmit = this.handleSubmit.bind(this);
  }
  handleSubmit(){
      let formdata = {
          firstname: this.state.firstname,
          lastname: this.state.lastname,
          email: this.state.email,
          mobile: this.state.mobile,
          city: this.state.city
      };
      this.props.addNewEmp(formdata);
      console.log("submitted",formdata);
      this.setState({
          firstname: '',
          lastname: '',
          email: '',
          mobile: '',
          city: ''
      });
  }
  render() {
      return (
          <div className="col-md-4 offset-md-4">
              <Form>
                  <h2 style={{"textAlign":"center", "marginTop":"20px"}}>Enter Employee Details</h2>
                  <hr/>
                  <FormGroup>
                      <FormLabel>Firstname</FormLabel>
                      <FormControl
                          type="text"
                          name="firstname"
                          placeholder="Firstname"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.firstname}
                      />
                  </FormGroup>
            
                  <FormGroup>
                      <FormLabel>Lastname</FormLabel>
                      <FormControl
                          type="text"
                          name="lastname"
                          placeholder="Lastname"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.lastname}
                      />
                  </FormGroup>

                  <FormGroup>
                      <FormLabel>Email</FormLabel>
                      <FormControl
                          type="text"
                          name="email"
                          placeholder="Email"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.email}
                      />
                  </FormGroup>

                  <FormGroup>
                      <FormLabel>Mobile</FormLabel>
                      <FormControl
                          type="text"
                          name="mobile"
                          placeholder="Mobile"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.mobile}
                      />
                  </FormGroup>

                  <FormGroup>
                      <FormLabel>City</FormLabel>
                      <FormControl
                          type="text"
                          name="city"
                          placeholder="City/Village"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.city}
                      />
                  </FormGroup>
                  <Button onClick={this.handleSubmit}>Submit</Button>
              </Form>
          </div>
       );
  }
}

function mapDispatchToProps(dispatch){
   return bindActionCreators({addNewEmp},dispatch)
}
export default connect (null, mapDispatchToProps)(SignupForm)

That’s it: we have successfully managed state with Redux. Now let’s test it.

Now, please try to implement the delete functionality by yourself, as shown below.

Before deletion:

After deletion:

If you could do it, you are doing great. If not, please feel free to refer to my code files below.

https://github.com/koushik-bitzop/newempdetails/commit/cd9712a89c4de7625cecbd6e494199f0ad5dd20f?diff=split
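If you would like a hint before opening the commit, here is one possible approach. It is a sketch and not necessarily what the linked code does: the deleteEmp action creator and the DELETE_EMP action type are assumed names, and the rows are made-up data.

```javascript
// Hypothetical action creator: the payload is the row index to remove.
function deleteEmp(index) {
  return { type: 'DELETE_EMP', payload: index };
}

function empdata(state = [], action) {
  switch (action.type) {
    case 'DELETE_EMP':
      // filter returns a new array without the row at the given index.
      return state.filter((_, i) => i !== action.payload);
    default:
      return state;
  }
}

const rows = [{ firstname: 'A' }, { firstname: 'B' }, { firstname: 'C' }];
const remaining = empdata(rows, deleteEmp(1));

console.log(remaining.map((emp) => emp.firstname)); // [ 'A', 'C' ]
console.log(rows.length);                           // 3 (the original array is untouched)
```

In the table component, a handleDelete(index) method would then call the bound action creator with the index of the clicked row.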

I hope this article was helpful to you in learning redux. Thanks for the read!

This story is authored by Koushik. He is a software engineer and a keen data science and machine learning enthusiast.

Case Study – Apache Log Analysis using Logstash-Elasticsearch-Kibana (ELK) Stack

In the previous blog, we loaded Apache log data into Elasticsearch with Logstash. Now our goal is to read this data into Kibana to help us run some analytics use cases. Quick note: the entire log file will not only be read into Elasticsearch but will also be displayed on the standard output. It takes about 3-4 minutes to display the entire log file (remove “ignore_older => 0” from the config file to read older logs). To cross-check that the data has been loaded and indices have been created in Elasticsearch, type http://localhost:9200/_cat/indices in the browser (replace “localhost” with the name of the server Elasticsearch is running on). This will show all the indexes created; Logstash creates indexes whose names start with logstash-. Once you find the Logstash indexes, it’s time to get them into Kibana.
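If you prefer to script this check, the sketch below picks the Logstash indexes out of the plain-text response. It assumes the response was requested with the h=index column selector (that is, /_cat/indices?h=index), so that each line holds just an index name. findLogstashIndices is a hypothetical helper, and the sample text is made up.

```javascript
// Keep only the index names that start with "logstash-".
// Expects one index name per line, as returned by /_cat/indices?h=index.
function findLogstashIndices(catText) {
  return catText
    .split('\n')
    .map((line) => line.trim())
    .filter((name) => name.startsWith('logstash-'));
}

// Made-up sample response, for illustration only.
const sampleCatOutput = '.kibana\nlogstash-2015.05.18\nlogstash-2015.05.19\n';

console.log(findLogstashIndices(sampleCatOutput));
// [ 'logstash-2015.05.18', 'logstash-2015.05.19' ]
```

An empty result would suggest that Logstash has not written anything to Elasticsearch yet.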

Kibana accesses Elasticsearch indices using “index patterns”. We specify the pattern of the index names we are searching for, and create an index pattern for Kibana to fetch the data from Elasticsearch. If the difference between an index name and an index pattern is not immediately clear, it will become clearer once we create index patterns in Kibana.

Log into Kibana from the browser using http://localhost:5601/ (replace “localhost” with the IP/name of the server Kibana is running on). The Kibana home page will open up; if it doesn’t, please check that Elasticsearch and Kibana are up and running on the server. In case you need to troubleshoot, please check the earlier post on troubleshooting Kibana.

From the Kibana home page (left side menu), click on “Management” -> “Index Patterns” -> “+ Create Index Pattern” button. The following page opens up.

In the Index Patterns field, type “logstash-*” and Kibana will display all the indexes in Elasticsearch whose name matches the given pattern. Click on “next” and choose “@timestamp” so we can filter our data by time.
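To see the difference between an index name and an index pattern, it helps to think of the pattern as a wildcard filter over names. The sketch below is a simplified stand-in for that matching (the real resolution is done by Elasticsearch, not by this code). matchesIndexPattern is a hypothetical helper, and the index names are examples.

```javascript
// Simplified wildcard matching: "*" stands for any run of characters.
function matchesIndexPattern(pattern, indexName) {
  // Escape regex metacharacters (except "*"), then turn each "*" into ".*".
  const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
  const regex = new RegExp('^' + escaped.replace(/\*/g, '.*') + '$');
  return regex.test(indexName);
}

const indexNames = ['.kibana', 'logstash-2015.05.18', 'logstash-2015.05.19'];

console.log(indexNames.filter((name) => matchesIndexPattern('logstash-*', name)));
// [ 'logstash-2015.05.18', 'logstash-2015.05.19' ]
```

One pattern can therefore cover many daily indexes, which is why Kibana asks for a pattern rather than a single index name.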

Click on the “Create index pattern” button and an index pattern will be created with all the fields being displayed.

With the index pattern created, we are ready to use the apache_log data in Kibana. Click on “Discover” in the left side menu and choose logstash* from the drop-down, and all the data from the log will be displayed here. If you are using the same log as mine, initially you will not be able to see any data. That’s because the time filter in the right corner of the page defaults to “last 15 minutes”. Since this log is an old one, click on the time, choose “Quick”, and then select the “last 5 years” option, and bingo! The log data shows up on the screen.

If the above setting is not clear, please check the screenshot below

In case you need a refresher on Kibana visualizations, check this out. You can use Discover to get a pie chart of the different requests coming in. So let’s say you want to analyze the various request keywords for your web server traffic. This visualization shows the various requests (aggregated by “Terms” on the field “request.keyword”) that hit the Apache server.
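Behind a chart like this sits an Elasticsearch terms aggregation. As a rough sketch, the request body for such a query could be built as follows. termsAggBody and the aggregation label by_term are names chosen for this illustration, and the bucket count of 10 is an assumed default.

```javascript
// Build a search body that returns only term buckets for one field.
function termsAggBody(field, size = 10) {
  return {
    size: 0, // skip the matching documents, return aggregation buckets only
    aggs: {
      by_term: {
        terms: { field, size }, // top `size` values of `field` by document count
      },
    },
  };
}

// Body for the most frequent request paths.
const requestBody = termsAggBody('request.keyword');
console.log(JSON.stringify(requestBody, null, 2));
```

A body like this would be sent to the _search endpoint of the logstash-* indexes; each bucket in the response carries a key (the request) and a doc_count.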

How does it help? Well, for websites with huge volume of traffic, this helps understand the pattern of resource consumption. Common questions that we can answer:

  • Is the new blog post garnering all the attention?
  • Is it the new pair of shoes that are being seen so frequently?
  • Are people interested in self help books or easy comedy?

Another use case may be to analyze the HTTP response codes of the web server. We are pulling up the same pie chart for the different response codes the server has generated.

What do we infer from this visual? Well, is the web server able to provide a proper response as expected? Are we returning too many ‘page not found’ errors? Why do we have too many ‘authentication failed’ errors? Are a majority of users really forgetting their passwords, or is something malicious going on?

In addition, we can also create dashboard-level metrics for error codes like so.
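A metric of this kind boils down to simple arithmetic over the response code counts. The sketch below computes the share of error responses from buckets shaped like the ones a terms aggregation returns (key and doc_count). errorShare is a hypothetical helper, the counts are made up, and treating every code from 400 upwards as an error is an assumption.

```javascript
// Fraction of responses whose status code is 400 or above.
function errorShare(buckets) {
  let total = 0;
  let errors = 0;
  for (const { key, doc_count } of buckets) {
    total += doc_count;
    if (Number(key) >= 400) {
      errors += doc_count;
    }
  }
  return total === 0 ? 0 : errors / total;
}

// Made-up bucket counts, for illustration only.
const sampleBuckets = [
  { key: '200', doc_count: 90 },
  { key: '404', doc_count: 8 },
  { key: '500', doc_count: 2 },
];

console.log(errorShare(sampleBuckets)); // 0.1
```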

For time-series analysis, we need to click on Visualize->Time series->Visual Builder. Here, the screen is divided into two horizontal panes. In the bottom pane, choose the “Panel Options” tab and type the index pattern as “logstash*”, and the time series will show up as a graph like so.

It shows the access rate for the given time period. Since most of the log data is around the same time, let’s change the date (from Last 5 years) to around May 18, 2015 (we can change the date as below)

and the output changes as shown below. Here, the log data has been generated for every 5 times for the particular day selected.
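A time-series graph like this is built by grouping events into fixed time buckets and counting them. The sketch below does that grouping in plain JavaScript. bucketCounts is a hypothetical helper, the timestamps are made up, and the 5-minute interval is chosen only for illustration.

```javascript
// Count events per fixed-width time bucket.
function bucketCounts(isoTimestamps, intervalMinutes) {
  const intervalMs = intervalMinutes * 60 * 1000;
  const counts = new Map();
  for (const ts of isoTimestamps) {
    const ms = Date.parse(ts);
    // Round down to the start of the bucket this event falls into.
    const bucketStart = Math.floor(ms / intervalMs) * intervalMs;
    const key = new Date(bucketStart).toISOString();
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  return counts;
}

// Made-up access times, for illustration only.
const accessTimes = [
  '2015-05-18T10:01:00Z',
  '2015-05-18T10:03:30Z',
  '2015-05-18T10:07:00Z',
];

const perBucket = bucketCounts(accessTimes, 5);
console.log(perBucket.get('2015-05-18T10:00:00.000Z')); // 2
console.log(perBucket.get('2015-05-18T10:05:00.000Z')); // 1
```

A smaller interval gives a more detailed graph, while a larger one smooths out short spikes.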

Let’s say this is an access log for an online shopping website and a lot of users have accessed it on May 18, 2015. Why? Probably because the company has come up with certain discounts or launched a new product. If this data is considered in real time, we can visualize the number of people accessing the server currently. If it’s the festive season and we are expecting a lot of traffic, we can also foresee when the servers will be stretched based on the historical pattern and act accordingly. It also helps in marketing and sales: a lot of people are currently logged in, so should I add an additional 5% discount to amp up my sales immediately?

If the system is designed for a banking institution, we can ask questions such as: Why are so many users trying to access the system at the same time? Are they really bona fide users, or is some malware trying to break into the server? By installing a few plugins, we can also visualize which geographic area the requests are originating from, so we will even get to know if requests are being made from a certain place. These visualizations are really powerful and user friendly, and one doesn’t need a lot of technical expertise to use Kibana.

That’s about it on this one. I hope the blog posts in this series on the ELK stack have been useful for interested folks to sharpen their data analytics and visualization chops.

P.S. : some quick troubleshooting tips on Kibana index patterns:

What if the “create index pattern” page displays the loading wheel indefinitely on clicking “create index pattern”?

Since Kibana opens in a web page, we can use the browser’s developer tools to see what’s wrong on our page. Right click on the page->Inspect->choose the Console tab. This shouldn’t show any errors; there can be log messages, but no errors. I had a forbidden error in red. On trying to refresh any index pattern, this error came up on screen: Config: Error 403 Forbidden: blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];

This implies that the indexes are all read-only and hence no changes are possible. This happens when the server Elasticsearch is running on runs low on disk space: Elasticsearch (not Kibana itself) marks the indexes read-only, and in our case it did not return them to the normal state on its own after the out-of-space situation. We ran out of disk space and had to add more space. We then had to clear the read-only block manually like so

curl -XPUT -H 'Content-Type: application/json' http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'

(localhost to be replaced by your server IP/name). On completing successfully, it displays {"acknowledged":true}. You can refresh Kibana from the web page and try to create the index patterns now.