Serverless Architecture for Lightening Fast Distributed File Transfer on AWS Data Lake

Today, we are very excited to share our insights on setting up a serverless architecture for setting up a lightening fast way* to copy large number of objects across multiple folders or partitions in an AWS data lake on S3. Typically in a data lake, data is kept across various zones depending on data lifecycle. For example, as the data arrives from source, it can be kept in the raw zone and then post processing moved to a processed zone, so that the lake is ready for the next influx of data. The rate of object transfer is a crucial factor, as it affects the overall efficiency of the data processing lifecycle in the data lake.

*In our tests, we copied more than 300K objects ranging from 1KB to 10GB in size from the raw zone into the processed zone. Compared to the best known tool for hyper fast file transfer on AWS called s3s3mirror, we were able to finish this transfer of about 24GB of data in about 50% less time. More details have been provided at the end of the post.

We created a lambda invoke architecture that copies files/objects concurrently. The below picture accurately depicts it.


OMS (Orchestrator-Master-Slave) Lambda Architecture

For example, If we have an S3 bucket with the following folder structure with the actual objects further contained within this hierarchy of folders, sub-folders and partitions.

S3 file structure

Let us look at how we can use OMS Architecture (Orchestrator-Master-Slave) to achieve hyper-fast distributed/concurrent file transfer. The above architecture can be divided into two halves, Orchestrator-Master, Master-Slave.

Orchestrator-Master

The Orchestrator simply invokes a Master Lambda for each folder. Each Master then iterates the objects in that folder (including all sub-folders and partitions) and invokes a Slave Lambda for each object to copy it to the destination.

Orchestrator-Master Lambda invoke

Let us look at the Orchestrator Lambda code.
Source-to-Destination-File-Transfer-Orchestrator:

import os
import boto3
import json
from datetime import datetime

client_lambda = boto3.client('lambda')
master_lambda = "Source-to-Destination-File-Transfer-Master"

folder_names = ["folder1", "folder2", "folder3", "folder4", "folder5", "folder6", "folder7", "folder8", "folder9"]

def lambda_handler(event, context):
    
    t = datetime.now()
    print("start-time",t)
    
    try:            
        for folder_name in folder_names:
            
            payload_data = {
              'folder_name': folder_name
            }                
        
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = master_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )
            print(payload)
            
    except Exception as e:
        print(e)
        raise e

Master-Slave

Master-Slave Lambda invoke

Let us look at the Master Lambda code.
Source-to-Destination-File-Transfer-Master:

import os
import boto3
import json
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')
client_lambda = boto3.client('lambda')

source_bucket_name = 'source bucket name'
source_bucket = s3.Bucket(source_bucket_name)

slave_lambda = "Source-to-Destination-File-Transfer-Slave"

def lambda_handler(event, context):

    try:
        source_prefix = "" #add if any
        source_prefix = source_prefix + "/" + event['table_name'] + "/"

        for obj in source_bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            payload_data = {
               'file_path': path
            }
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = slave_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )

    except Exception as e:
        print(e)
        raise e

Slave

Let us look at the Slave Lambda code.
Source-to-Destination-File-Transfer-Slave:

import os
import boto3
import json
import re
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')

source_prefix = "" #add if any
source_bucket_name = "source bucket name"
source_bucket = s3.Bucket(source_bucket_name )

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name )

def lambda_handler(event, context):
    try:
        destination_prefix = "" #add if any
        
        source_obj = { 'Bucket': source_bucket_name, 'Key': event['file_path']}
        file_path = event['file_path']
        
        #copying file
        new_key = file_path.replace(source_prefix, destination_prefix)
        new_obj = source_bucket.Object(new_key)
        new_obj.copy(source_obj)
        
    except Exception as e:
        raise e

You must ensure that these Lambda functions have been configured to meet the maximum execution time and memory limit constraints as per your case. We tested by setting the upper limit of execution time as 5 minutes and 1GB of available memory.

Calculating the Rate of File Transfer

To calculate the rate of file transfer we are printing start time at the beginning of Orchestrator Lambda execution. Once the file transfer is complete, we use another lambda to extract the last modified date attribute of the last copied object.

Extract-Last-Modified:

import json
import boto3
from datetime import datetime
from dateutil import tz

s3 = boto3.resource('s3')

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name)
destination_prefix = "" #add if any

def lambda_handler(event, context):
    
    #initializing with some old date
    last_modified_date = datetime(1940, 7, 4).replace(tzinfo = tz.tzlocal()) 

    for obj in my_bucket.objects.filter(Prefix = destination_prefix):
        
        obj_date = obj.last_modified.replace(tzinfo = tz.tzlocal())
        
        if last_modified_date < obj_date:
            last_modified_date = obj_date
    
    print("end-time: ", last_modified_date)

Now we have both start-time from Orchestrator Lambda and end-time from Extract-last-modified Lambda, their difference is the time taken for file transfer.

Before writing this post, we copied 24.1GB of objects using the above architecture, results are shown in the following screenshots:

duration	=	end-time - start-time
		=	10:04:49 - 10:03:28
		=	00:01:21 (hh-mm-ss)

To check the efficiency of our OMS Architecture, we compared the results of OMS with s3s3mirror, a utility for mirroring content from one S3 bucket to another or to/from the local filesystem. Below screenshot has the file transfer stats of s3s3 for the same set of files:

As we see the difference was 1 minutes and 8 seconds for total data transfer of about 24GB, it can be much higher for large data sets if we add more optimizations. I have only shared a generalized view of the OMS Architecture, it can be further fine-tuned to specific needs and get a highly optimized performance. For instance, if you have partitions in each folder and the OMS Architecture could yield much better results if you invoke Master Lambda for each partition inside the folder instead of invoking the master just at the folder level.

Thanks for the read. Looking forward to your thoughts.

This story is co-authored by Koushik and Subbareddy. Koushik is a software engineer and a keen data science and machine learning enthusiast. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

Machine Learning Operations (MLOps) Pipeline using Google Cloud Composer

In an earlier post, we had described the need for automating the Data Engineering pipeline for Machine Learning based systems. Today, we will expand the scope to setup a fully automated MLOps pipeline using Google Cloud Composer.

Cloud Composer

Cloud Composer is official defined as a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

So let’s get on with the required steps to create this MLOps infrastructure on Google Cloud Platform

Creating a Cloud Composer Environment

Step1: Please enable the Cloud Composer API.

Step2: Go to create environment page in GCP console. Composer is available in Big Data section.

Step3: Click on create to start creating a Composer environment

Step4: Please select the Service account which has the required permissions to access GCS, Big Query, ML Engine and  Composer environment. The required roles for accessing Composer environment is Composer Administrator and Composer Worker. 
For more details about access control in Composer environment please see this.

Step5: Please use Python Version 3 and latest Image version.

Step6: Click on create. It will take about 15-20 minutes to create the environment. Once it completes, the environment page shall look like the following.

Click on Airflow to see Airflow WebUI. The Airflow WebUI looks as follows

DAGs folder is where our dag file is stored. DAG folder is nothing but a folder inside a GCS bucket which is created by the environment. To know more about the concept of DAG and general introduction to Airflow, please refer to this post.

You could see Composer related logs in Logging.

Step7: Please add the following PyPI packages in Composer environment.

Click on created environment and navigate to PYPI packages and click on edit to add packages

The required packages are:

# to read data from MongoDB
pymongo==3.8.0
oauth2client==4.1.3
# to read data from firestore
google-cloud-firestore==1.3.0
firebase-admin==2.17.0
google-api-core==1.13.0

Create a ML model

Step1: Please create a folder structure like the following on your instance.

ml_model
├── setup.py
└── trainer
    ├── __init__.py
    └── train.py

Step2: Please place the following code in train.py file, which shall upload the model to GCS bucket as shown below. This model would be used to create model versions as explained a bit later.

from google.cloud import bigquery
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
from google.cloud import storage
import datetime
import json
import pickle
client = bigquery.Client()
sql = '''
SELECT *
FROM `<PROJECT_ID>.<DATASET>.<TABLENAME>`
'''

df = client.query(sql).to_dataframe()
df = df[['is_stressed', 'is_engaged', 'status']]

df['is_stressed'] = df['is_stressed'].fillna('n')
df['is_engaged'] = df['is_engaged'].fillna('n')
df['stressed'] = np.where(df['is_stressed']=='y', 1, 0)
df['engaged'] = np.where(df['is_engaged']=='y', 1, 0)
df['status'] = np.where(df['status']=='complete', 1, 0)

feature_cols = ['stressed', 'engaged']
X = df[feature_cols]
y = df.status
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.25,random_state=0)
logreg = LogisticRegression()
logreg.fit(X_train,y_train)
pkl_filename = "model.pkl"  
with open(pkl_filename, 'wb') as file:  
    pickle.dump(logreg, file)
BUCKET_NAME=BUCKET_NAME# Upload the model to GCS
bucket = storage.Client().bucket(BUCKET_NAME)
file_path = datetime.datetime.now().strftime('machine_learning/models/%Y%m%d_%H%M%S')
blob = bucket.blob('{}/{}'.format(
    file_path,
    pkl_filename))
blob.upload_from_filename(pkl_filename)

file_location = 'gs://{BUCKET_NAME}/{file_path}'.format(BUCKET_NAME=BUCKET_NAME, file_path=file_path)
file_config = json.dumps({'file_location': file_location})

bucket = storage.Client().bucket(COMPOSER_BUCKET)
blob = bucket.blob('data/file_config.json')
blob.upload_from_string(file_config)

Step3: Create an empty init.py file inside the trainer directory.

Step4: Please place the following code in setup.py file. The setup.py file contains required packages to execute code.

import setuptools

REQUIRED_PACKAGES = [
    'pandas-gbq==0.3.0',
    'cloudml-hypertune',
    'google-cloud-bigquery==1.14.0',
    'urllib3'
]

setuptools.setup(
    name='ml_model',
    version='1.0',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description='',
)

Step5: Packaging the code using the following command. It creates a gz file inside ml_model directory.

python3 setup.py sdist

Step6: The package name is the name that is specified in setup.py file. The package name becomes ml_model-1.0.tar.gz
Copy the package to gs://{your-GCS-bucket}/machine_learning/. This becomes the base directory for your machine learning activities described in this post.

Creating a DAG

In this use case, we have created a DAG file which exports some table data from a MongoDB instance into a GCS bucket and then creates a BigQuery table off of that exported data. It trains a model and creates version for that model. The DAG file supports full data extraction and daily data extraction explained in the code below using a variable tot_data. This variable is extracted from Airflow configurations set by the user. This process is also described later in this post.

Please place the following code in the DAG file.

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator
import pprint
import json
import re

from pymongo import MongoClient
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import storage
import os

from airflow import models
from mlengine_operator import MLEngineTrainingOperator, MLEngineVersionOperator

ts = datetime.now()
today = str(ts.date()) + 'T00:00:00.000Z'
yester_day = str(ts.date() - timedelta(days = 1)) + 'T00:00:00.000Z'

str_ts = ts.strftime('%Y_%m_%d_%H_%m_%S')

config = Variable.get("mongo_conf", deserialize_json=True)
host = config['host']
db_name = config['db_name']
table_name = config['table_name']
file_prefix = config['file_prefix']
bucket_name = config['bucket_name']
# file_path = file_prefix + '/' + table_name + '.json'
file_path = '{file_prefix}/{table_name}/{table_name}_{str_ts}.json'.format(file_prefix=file_prefix, str_ts=str_ts, table_name=table_name)
file_location = 'gs://' + bucket_name + '/' + file_prefix + '/' + table_name + '/' + table_name + '_*.json'
config['file_location'] = file_location
bq_dataset = config['bq_dataset']
tot_data = config['tot_data'].lower()

BUCKET_NAME = config['ml_configuration']['BUCKET_NAME']
BASE_DIR = config['ml_configuration']['BASE_DIR']
PACKAGE_NAME = config['ml_configuration']['PACKAGE_NAME']
TRAINER_BIN = os.path.join(BASE_DIR, 'packages', PACKAGE_NAME)
TRAINER_MODULE = config['ml_configuration']['TRAINER_MODULE']
RUNTIME_VERSION = config['ml_configuration']['RUNTIME_VERSION']
PROJECT_ID = config['ml_configuration']['PROJECT_ID']
MODEL_NAME = config['ml_configuration']['MODEL_NAME']

MODEL_FILE_BUCKET = config['ml_configuration']['MODEL_FILE_BUCKET']
model_file_loc = config['ml_configuration']['MODEL_FILE_LOCATION']

bucket = storage.Client().bucket(MODEL_FILE_BUCKET)
blob = bucket.get_blob(model_file_loc)
file_config = json.loads(blob.download_as_string().decode("utf-8"))
export_uri = file_config['file_location']

def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

def mongoexport():
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(file_path)

        client = MongoClient(host)
        db = client[db_name]
        tasks = db[table_name]
        pprint.pprint(tasks.count_documents({}))
        # if tot_data is set to 'yes' in airflow configurations, full data 
        # is processed.  
        if tot_data == 'no':
          query = {"edit_datetime": { "$gte": yester_day, "$lt": today}}
          print(query)
          data = tasks.find(query)
        else:
          data = tasks.find()
        emp_list = []
        for record in data:
                emp_list.append(json.dumps(record, default=str))
        flat_list =[]
        for data in emp_list:
                flat_list.append((flatten_json(json.loads(data))))
        data = '\n'.join(json.dumps({re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v) for k, v in record.items()}) for record in flat_list)
        blob.upload_from_string(data)

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:

    # priority_weight has type int in Airflow DB, uses the maximum.
    pymongo_export_op = PythonOperator(
        task_id='pymongo_export',
        python_callable=mongoexport,
        )

    update_bq_table_op = BashOperator(
        task_id='update_bq_table',
        bash_command='''
        bq rm -f {bq_dataset}.{table_name}
        bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --ignore_unknown_values=True {bq_dataset}.{table_name} {file_location}
        '''.format(bq_dataset=bq_dataset, table_name=table_name, file_location=file_location)
        )

    date_nospecial = '{{ execution_date.strftime("%Y%m%d") }}'
    date_min_nospecial = '{{ execution_date.strftime("%Y%m%d_%H%m") }}'
    uuid = '{{ macros.uuid.uuid4().hex[:8] }}'

    training_op = MLEngineTrainingOperator(
      task_id='submit_job_for_training',
      project_id=PROJECT_ID,
      job_id='{}_{}_{}'.format(table_name, date_nospecial, uuid),
      package_uris=[os.path.join(TRAINER_BIN)],
      training_python_module=TRAINER_MODULE,
      training_args=[
          '--base-dir={}'.format(BASE_DIR),
          '--event-date={}'.format(date_nospecial),
      ],
      region='us-central1',
      runtime_version=RUNTIME_VERSION,
      python_version='3.5')

    create_version_op = MLEngineVersionOperator(
      task_id='create_version',
      project_id=PROJECT_ID,
      model_name=MODEL_NAME,
      version={
          'name': 'version_{}_{}'.format(date_min_nospecial, uuid),
          'deploymentUri': export_uri,
          'runtimeVersion': RUNTIME_VERSION,
          'pythonVersion': '3.5',
          'framework': 'SCIKIT_LEARN',
      },
      operation='create')

    pymongo_export_op >> update_bq_table_op >> training_op >> create_version_op

Once file is created, please upload the file to DAGs folder. And also please add the following plugin dependency file named mlengine_operator in DAGs folder.
Place the following code in mlengine_operator.py file.

import re

from apiclient import errors

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def _normalize_mlengine_job_id(job_id):

    # Add a prefix when a job_id starts with a digit or a template
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() is 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    # Clean up 'bad' characters except templates
    tracker = 0
    cleansed_job_id = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_',
                                  job[tracker:m.start()])
        cleansed_job_id += job[m.start():m.end()]
        tracker = m.end()

    # Clean up last substring or the full string if no templates
    cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])

    return cleansed_job_id


class MLEngineBatchPredictionOperator(BaseOperator):
   
    template_fields = [
        '_project_id',
        '_job_id',
        '_region',
        '_input_paths',
        '_output_path',
        '_model_name',
        '_version_name',
        '_uri',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 region,
                 data_format,
                 input_paths,
                 output_path,
                 model_name=None,
                 version_name=None,
                 uri=None,
                 max_worker_count=None,
                 runtime_version=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineBatchPredictionOperator, self).__init__(*args, **kwargs)

        self._project_id = project_id
        self._job_id = job_id
        self._region = region
        self._data_format = data_format
        self._input_paths = input_paths
        self._output_path = output_path
        self._model_name = model_name
        self._version_name = version_name
        self._uri = uri
        self._max_worker_count = max_worker_count
        self._runtime_version = runtime_version
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine prediction '
                'job.')

        if self._uri:
            if self._model_name or self._version_name:
                raise AirflowException('Ambiguous model origin: Both uri and '
                                       'model/version name are provided.')

        if self._version_name and not self._model_name:
            raise AirflowException(
                'Missing model: Batch prediction expects '
                'a model name when a version name is provided.')

        if not (self._uri or self._model_name):
            raise AirflowException(
                'Missing model origin: Batch prediction expects a model, '
                'a model & version combination, or a URI to a savedModel.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        prediction_request = {
            'jobId': job_id,
            'predictionInput': {
                'dataFormat': self._data_format,
                'inputPaths': self._input_paths,
                'outputPath': self._output_path,
                'region': self._region
            }
        }

        if self._uri:
            prediction_request['predictionInput']['uri'] = self._uri
        elif self._model_name:
            origin_name = 'projects/{}/models/{}'.format(
                self._project_id, self._model_name)
            if not self._version_name:
                prediction_request['predictionInput'][
                    'modelName'] = origin_name
            else:
                prediction_request['predictionInput']['versionName'] = \
                    origin_name + '/versions/{}'.format(self._version_name)

        if self._max_worker_count:
            prediction_request['predictionInput'][
                'maxWorkerCount'] = self._max_worker_count

        if self._runtime_version:
            prediction_request['predictionInput'][
                'runtimeVersion'] = self._runtime_version

        hook = MLEngineHook(self._gcp_conn_id, self._delegate_to)

        # Helper method to check if the existing job's prediction input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('predictionInput', None) == \
                prediction_request['predictionInput']

        try:
            finished_prediction_job = hook.create_job(
                self._project_id, prediction_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_prediction_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine batch prediction job failed: {}'.format(
                str(finished_prediction_job)))
            raise RuntimeError(finished_prediction_job['errorMessage'])

        return finished_prediction_job['predictionOutput']


class MLEngineModelOperator(BaseOperator):
    template_fields = [
        '_model',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineModelOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model = model
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)
        if self._operation == 'create':
            return hook.create_model(self._project_id, self._model)
        elif self._operation == 'get':
            return hook.get_model(self._project_id, self._model['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineVersionOperator(BaseOperator):
    
    template_fields = [
        '_model_name',
        '_version_name',
        '_version',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model_name,
                 version_name=None,
                 version=None,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):

        super(MLEngineVersionOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model_name = model_name
        self._version_name = version_name
        self._version = version or {}
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        if 'name' not in self._version:
            self._version['name'] = self._version_name

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        if self._operation == 'create':
            assert self._version is not None
            return hook.create_version(self._project_id, self._model_name,
                                       self._version)
        elif self._operation == 'set_default':
            return hook.set_default_version(self._project_id, self._model_name,
                                            self._version['name'])
        elif self._operation == 'list':
            return hook.list_versions(self._project_id, self._model_name)
        elif self._operation == 'delete':
            return hook.delete_version(self._project_id, self._model_name,
                                       self._version['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineTrainingOperator(BaseOperator):
    
    template_fields = [
        '_project_id',
        '_job_id',
        '_package_uris',
        '_training_python_module',
        '_training_args',
        '_region',
        '_scale_tier',
        '_runtime_version',
        '_python_version',
        '_job_dir'
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 runtime_version=None,
                 python_version=None,
                 job_dir=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):
        super(MLEngineTrainingOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._job_id = job_id
        self._package_uris = package_uris
        self._training_python_module = training_python_module
        self._training_args = training_args
        self._region = region
        self._scale_tier = scale_tier
        self._runtime_version = runtime_version
        self._python_version = python_version
        self._job_dir = job_dir
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to
        self._mode = mode

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine training '
                'job.')
        if not package_uris:
            raise AirflowException(
                'At least one python package is required for MLEngine '
                'Training job.')
        if not training_python_module:
            raise AirflowException(
                'Python module name to run after installing required '
                'packages is required.')
        if not self._region:
            raise AirflowException('Google Compute Engine region is required.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        training_request = {
            'jobId': job_id,
            'trainingInput': {
                'scaleTier': self._scale_tier,
                'packageUris': self._package_uris,
                'pythonModule': self._training_python_module,
                'region': self._region,
                'args': self._training_args,
            }
        }

        if self._runtime_version:
            training_request['trainingInput']['runtimeVersion'] = self._runtime_version

        if self._python_version:
            training_request['trainingInput']['pythonVersion'] = self._python_version

        if self._job_dir:
            training_request['trainingInput']['jobDir'] = self._job_dir

        if self._mode == 'DRY_RUN':
            self.log.info('In dry_run mode.')
            self.log.info('MLEngine Training job request is: {}'.format(
                training_request))
            return

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        # Helper method to check if the existing job's training input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('trainingInput', None) == \
                training_request['trainingInput']

        try:
            finished_training_job = hook.create_job(
                self._project_id, training_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_training_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine training job failed: {}'.format(
                str(finished_training_job)))
            raise RuntimeError(finished_training_job['errorMessage'])

Import variables from composer_conf.json file into Airflow Variables.
Go to Airflow WebUI → Admin → Variables → Browse to file path or configure variables manually.
Please place the following in composer_conf

{
  "mongo_conf": {
    "host": "mongodb://<instance-internal-ip>:27017",
    "db_name": "DBNAME",
    "table_name": "TABLENAME",
    "file_prefix": "Folder In GCS Bucket",
    "bq_dataset": "BigQuery Dataset",
    "bucket_name": "GCS Bucket",
    "tot_data": "yes",
    "ml_configuration": {
      "BUCKET_NAME": "GCS Bucket",
      "BASE_DIR": "gs://GCS Bucket/machine_learning/",
      "PACKAGE_NAME": "PACKAGE NAME FROM setup.py FILE in ML",
      "TRAINER_MODULE": "trainer.train",
      "RUNTIME_VERSION": "1.13",
      "PROJECT_ID": "GCP Project",
      "MODEL_FILE_BUCKET": "BUCKET CREATED BY Composer Environment",
      "MODEL_FILE_LOCATION": "data/MODEL LOCATION FILE",
      "MODEL_NAME": "MODEL_NAME"
    }
  }

Please store any configuration files or credentials file that are used by Composer in the data folder in the bucket created by Composer environment.

After configuring variables accordingly, you can see the DAG named ml_pipeline in the Airflow WebUI.

Please trigger the DAG file from Airflow WebUI. Once the DAG ran successfully. It looks like the following:

Thanks for the read and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

Email Deliverability Analytics using SendGrid and AWS Big Data Services

Email Deliverability Analytics using SendGrid and AWS Big Data Services

In this post, we will run though a case study to setup an email deliverability analytics pipeline using SendGrid and AWS Big Data Services such as S3, Glue and Athena. To start off, when we send mails from SendGrid to recipients. we get responses (multiple response types are possible such as processed, delivered, blocked, deferred etc) from Email Service Providers such as gmail, yahoo etc. We could use this response data to improve our Email Deliverability by analyzing this email response data. This is achieved by logging these responses (via API Gateway and Lambda function) into Amazon S3 and then analyzing them using Athena. The chain of events is put in place by using a web hook that triggers a post request to AWS API Gateway on an event notification (response) from SendGrid. The API Gateway is further configured to trigger a Lambda Function which writes the email response data into S3. We then use Glue crawler to update the metadata in data catalogue, thereby making it available for Athena to perform SQL based analysis.

Without further ado, let’s set the ball rolling. Go to SendGrid and select Settings>Mail_Settings. Click on Event Notifications

We are gonna enable it by giving an Endpoint and select the Events for which you want to get a response. 

The above endpoint points to the AWS API Gateway (shown below) which is a POST request and it triggers the Lambda function as you can see.

Now our Lambda function stores the event payload data in S3 Bucket
Lambda code:

const AWS = require('aws-sdk')
    var s3Bucket = new AWS.S3( { params: {Bucket: "Your-Bucket"} } );
    
    exports.handler = (event, context, callback) => {
        console.log(event); // the response data
        let x = "";
        event.map((item)=>{
            x = x + JSON.stringify(item) + "\n"
        }) 
        let uuid = create_UUID();
        var filePath = "receivelogs/"+uuid;
        console.log(filePath);
        var data = {
            Key: filePath, 
            Body: x
        };
        s3Bucket.putObject(data, function(err, data){
            if (err) { 
                console.log('Error uploading data: ', data);
                callback(err, null);
            } else {
                console.log('Successfully uploaded the response');
                callback(null, data);
            }
        });
};
// this function will generate Unique User ID. Used as FileName
function create_UUID(){
   var dt = new Date().getTime();
   var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
       var r = (dt + Math.random()*16)%16 | 0;
       dt = Math.floor(dt/16);
       return (c=='x' ? r :(r&0x3|0x8)).toString(16);
   });
   return uuid;
}

When you send mail, the response is triggered from SendGrid via POST request to API Gateway and then the response gets stored in S3 via Lambda function.

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. We use a crawler to populate the AWS Glue Data Catalog with tables. Below is the step-by-step process to setup the Glue crawler to read an S3 based data source and make it available as a database table for AWS Athena based analytics.

In the step above, you may need to create a new IAM role that provides access to the underlying S3 data.

So in the steps above, we have concluded the setup for the crawler to fetch the underlying data on S3.

When you run this crawler on the S3 based data source, it updates the metadata of objects in that path in Glue data catalogue. Now, Athena can query ( SQL operations) those objects in S3 using metadata available in data catalogue. A lot of business executives aren’t comfortable with SQL queries, perhaps an add-on to this data pipeline could be using AWS Quicksight for a more BI driven analysis.

Thanks for the read!

This story is authored by Santosh Kumar. He is an AWS Cloud Engineer.

Creating an Automated Data Engineering Pipeline for Batch Data in Machine Learning

A common use case in Machine Learning life cycle is to have access to the latest training data so as to prevent model deterioration. A lot of times data scientists find it cumbersome to manually export data from data sources such as relational databases or NoSQL data stores or even distributed data. This necessitates automating the data engineering pipeline in Machine Learning. In this post, we will describe how to set up this pipeline for batch data. This workflow is orchestrated via Airflow and can be set up to run at regular intervals: such as hourly, daily, weekly, etc depending on the specific business requirements.

Quick note – In case you are interested in building a real time data engineering pipeline for ML, please look at this post.

In this use case, we are going to export MongoDB data into Google BigQuery via Cloud Storage. The updated data in BigQuery is then made available in Jupyter Notebook as a Pandas Dataframe for downstream model building and analytics. As the pipeline automates the data ingestion and preprocessing, the data scientists always have access to the latest batch data in their Jupyter Notebooks hosted on Google AI Platform. 

We have a MongoDB service running in an instance and we have Airflow and mongoexport running on docker on another instance. Mongoexport is a utility that produces a JSON or CSV export of data stored in MongoDB. Now the data in MongoDB shall be extracted and transformed using mongoexport and loaded into CloudStorage. Airflow is used to schedule and orchestrate these exports. Once the data is available in CloudStorage it could be queried in BigQuery. We then get this data from BigQuery to Jupyter Notebook. Following is a step by step sequence of steps to set up this data pipeline.

You can create an instance in GCP by going to Compute Engine. Click on create instance.

Install.sh:

sudo apt-get update
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
sudo usermod -aG docker $USER
sudo apt-get install -y python-pip
export AIRFLOW_HOME=~/airflow
sudo pip install apache-airflow
sudo pip install apache-airflow[postgres,s3]
airflow initdb
airflow webserver -p 8080 -D
airflow scheduler -D
sudo docker pull mongo
sudo docker run --name mongo_client -d mongo

Please run the install.sh file using ./install.sh command (please make sure file is executable), which would install Docker, Airflow, pulls Mongo image and runs the mongo image in a container named mongo_client.

After installation, for Airflow webUIhttp://<public-ip-instance>:8080 (You may need to open port 8080 in the network just for your public IP)


Please make sure the Google service account in the running instance must have permissions for accessing Big Query and Cloud Storage. After installation, add the Airflow job Python file (mongo-export.py) inside the airflow/dags folder.

Before running the Python file, please make sure that you create Dataset and create the table in BigQuery. Also change the appropriate values for the MongoDB source database, MongoDB source table, Cloud Storage destination bucket and BigQuery destination dataset in the Airflow job Python file (mongo-export.py). Big Query destination table name is the same as the source table in Mongo DB. 

Mongo-export.py:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json
from pandas.io.json import json_normalize

# Following are default arguments which could be overridden
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['airflow@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

bucket_name = '<Your_Bucket>'
db_name = '<Database_Name>'
dataset = '<Dataset_Name>'
table_name = '<Table_Name>'


time_stamp = datetime.now()
cur_date = time_stamp.strftime("%Y-%m-%d")

# It will flatten the nested json
def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

def convert_string(y):
    string_type = {}

    def convert(x, name=''):
        if type(x) is dict:
            for a in x:
                convert(str(x[a]), name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            string_type[name[:-1]] = x

    convert(y)
    return string_type


def json_flat():
    lines = [line.rstrip('\n') for line in open('/home/dev/'+ table_name + '-unformat.json')]
    flat_list = []
    for line in lines:
        line = line.replace("\"$", "\"")
        line = json.loads(line)
        try:
            flat_list.append(json.dumps(convert_string(flatten_json(line))))
        except Exception as e:
            print(e)
    flatted_json = '\n'.join(i for i in flat_list)

    with open('/home/dev/' + table_name + '.json', 'a') as file:
        file.write(flatted_json)
    return flatted_json 

dag = DAG('mongoexport-daily-gcs-bq', default_args=default_args, params = {'cur_date': cur_date, 'db_name': db_name, 'table_name': table_name, 'dataset': dataset, 'bucket_name': bucket_name})
#exports provide a table data into docker container 
t1 = BashOperator(
    task_id='mongoexport_to_container',
    bash_command='sudo docker exec -i mongo_client sh -c "mongoexport --host=<instance_public_ip> --db {{params.db_name}} --collection {{params.table_name}} --out {{params.table_name}}-unformat.json"',
    dag=dag)

# copies exported file into instance

t2 = BashOperator(
    task_id='cp_from_container_instance',
    bash_command='sudo docker cp mongo_client:/{{params.table_name}}-unformat.json /home/dev/',
    dag=dag)

t3 = PythonOperator(
    task_id='flattening_json',
    python_callable=json_flat,
    dag=dag)
# copies the flatten data from cloud storage
t4 = BashOperator(
    task_id='cp_from_instance_gcs',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/raw/{{params.table_name}}/date={{params.cur_date}}/',
    dag=dag)
# 
t5 = BashOperator(
    task_id='cp_from_instance_gcs_daily_data',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/curated/{{params.table_name}}/',
    dag=dag)

# removes the existing bigquery table
t6 = BashOperator(
    task_id='remove_bq_table',
    bash_command='bq rm -f {{params.dataset}}.{{params.table_name}}',
    dag=dag)
# creates a table in bigquery
t7 = BashOperator(
    task_id='create_bq_table',
    bash_command='bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON {{params.dataset}}.{{params.table_name}} gs://{{params.bucket_name}}/curated/{{params.table_name}}/{{params.table_name}}.json',
    dag=dag)
# removes data from container
t8 = BashOperator(
    task_id='remove_file_from_container',
    bash_command='sudo docker exec -i mongo_client sh -c "rm -rf {{params.table_name}}*.json"',
    dag=dag)
# removes data from instance
t9 = BashOperator(
    task_id='remove_file_from_instance',
    bash_command='rm -rf /home/dev/{{params.table_name}}*.json',
    dag=dag)

t1 >> t2
t2 >> t3
t3 >> [t4, t5]
[t4, t5] >> t6
t6 >> t7
t7 >> [t8, t9]

Then run the python file using python <file-path>.py  

(example: python airflow/dags/mongo-export.py).

After running the python file, the dag name shows in Airflow webUI. And you could trigger the dag manually. Please make sure toggle button is in ON status

Once the job completes, the data is stored in the bucket and also available in the destination table in BigQuery. You could see the table is created in BigQuery. Click on querytable to perform SQL operations and you could see your results in the preview tab at the bottom.

Now, you could access the data in Jupyter Notebook from BigQuery. Search for notebook in GCP console. 

Run the below commands in Jupyter Notebook.

from google.cloud import bigquery
client = bigquery.Client()
sql = """
SELECT * FROM 
`<project-name>.<dataset-name>.<table-name>`
"""
df = client.query(sql).to_dataframe()
df.head(10)

This loads the BigQuery data into Pandas dataframe and can be used for model creation as required. Later when the data pipeline is run as per schedule, the refreshed data would automatically be available in this Jupyter notebook via this SQL query.

Hope this helps you to automate your batch Data Engineering pipeline for Machine Learning. 

This story is co-authored by Santosh and Subbareddy. Santosh is an AWS Cloud Engineer and Subbareddy is a Big Data Engineer.

Real Time Data Engineering Pipeline for Machine Learning

Our focus in this post is to leverage Google Cloud Platform’s Big Data Services to build an end to end Data Engineering pipeline for streaming processes.

So what is Data Engineering?
Data Engineering is associated with data specifically around data delivery, storage and processing. The main goal is to provide a reliable infrastructure for data which includes operations such as collect, move, store and prepare data.

Most companies store their data in different formats across databases and as text files. This is where data engineers come in to picture, they build pipelines that transform this data into formats that data scientists could use.

Need for Data Engineering in Machine Learning:
Data engineers are responsible for:

  • Develop machine learning models.
  • Improve existing machine learning models.
  • Research and implement best practices to enhance existing machine learning infrastructure.
  • Developing, constructing, testing and maintaining architectures, such as databases and large-scale processing systems.
  • Analyzing large and complex data sets to derive valuable insights.

This is the reference architecture used to build the end to end pipe data pipeline :

Google Cloud Platform Data Engineering Pipeline for Streaming Processes

The Google Cloud Services used in above streaming process are:

  1. Cloud Firestore: Lets us store data in cloud so that we could sync it across all other devices and also share among multiple users. It is a NoSQL query document data which lets us store, query and sync.
  2. Cloud Function: A lightweight compute solution for developers to create single-purpose, stand-alone functions that respond to cloud events without the need to manage a server or runtime environment.
  3. Cloud Pub/Sub: A fully-managed real-time messaging service that allows you to send and receive messages across independent applications.
  4. Cloud Dataflow: A cloud-based data processing service for both batch and real-time data streaming applications. It enables developers to set up data processing pipelines for integrating, preparing and analyzing large data sets.
  5. Cloud Storage: A data storage service in which data is maintained, managed, backed up remotely and made available to users over a network.
  6. BigQuery: It was designed for analyzing data on the order of billions of rows, using a SQL-like syntax. It runs on the Google Cloud Storage infrastructure and could be accessed with a REST-oriented application programming interface (API).
  7. Jupyter notebook: An open source web application that you could use to create and share documents that contain live code, equations, visualizations, and text.

Create data engineering pipeline via Firestore Streaming

Step1: Add a new record in a collection (think of it as a table), say pubsub-event in firestore.

Step2: It triggers the cloud function named pubsub_event

Document Path: pubsub-event/{eventId}  listens for changes to all pubsub-event documents.

Below is the Cloud Function written in node js which triggers whenever there is a change in our source Firestore collection and publishes the data to Pub/Sub

const PubSub = require('@google-cloud/pubsub');
const pubsubClient = new PubSub();
const functions = require('firebase-functions');

exports.helloFirestore = functions.firestore
  .document("pubsub-event/{eventId}")
  .onCreate((snap, context) => {
    const event = snap.data();
    const payload_data = {};
    for (let key of Object.keys(event)) {
    	payload_data[key] = event[key];
    }
    console.log(JSON.stringify(payload_data))
    // The name for the new topic
    const topicName = 'pubsub-gcs';
    const dataBuffer = Buffer.from(JSON.stringify(payload_data));
    // Creates the new topic
    return pubsubClient
      .topic(topicName)
      .publisher()
      .publish(dataBuffer)
      .then(messageId => {
        console.log(`Message ${messageId} published.`);
        return messageId;
      })
      .catch(err => {
        console.error('ERROR:', err);
      });

  });

Below is the dependencies of the Cloud Function.

{
  "name": "functions",
  "description": "Cloud Functions for Firebase",
  "scripts": {
    "serve": "firebase serve --only functions",
    "shell": "firebase functions:shell",
    "start": "npm run shell",
    "deploy": "firebase deploy --only functions",
    "logs": "firebase functions:log"
  },
  "engines": {
    "node": "8"
  },
  "dependencies": {
    "@google-cloud/pubsub": "^0.18.0",
    "firebase-admin": "~7.0.0",
    "firebase-functions": "^2.3.1"
  },
  "devDependencies": {
    "firebase-functions-test": "^0.1.6"
  },
  "private": true
}

Step3: Cloud Function pubsub_event publishes data to Pub/Sub topic projects/ProjectName/topics/pubsub-gcs

Step4: As shown above, create an export job : ps-to-text-pubsub-gcs (implemented via Dataflow). This job reads data every 5 minutes (configurable to other values as well) from Pub/Sub topic pubsub-gcs and dumps this into the destination bucket on GCS.

Click on run the job.

 Step6: Now, we have data in CloudStorage. We shall use BigQuery to perform all the data manipulation operations. But first we need to create dataset in BigQuery to query data from GCS into Bigquery.

Go to BigQuery and create dataset. So that we create our table to access that data.

The dataset shall be created. By clicking on the dataset you shall see an option to CREATE TABLE.

Click on CREATE TABLE then we shall get the data from CloudStorage. While setting up the required inputs as indicated below, please make sure that you select “Table type” as External Table. This ensures that BigQuery is able to automatically load new data as it comes into GCS.

To create table in BigQuery from CloudStorage. Click on the browse button and configure file path.

Files that are having pubsub-event-* as prefix. This prefix is very important as it makes sure that all subsequent data dumps into GCS destination folder are also picked automatically by BigQuery. Select the file format to be JSON. Check the auto-detect schema box. Then click create table.


Quick Tip: For reading nested json files in BigQuery, please go through this resource. Now the data which is present in CloudStorage is also available in BigQuery and you could run sql commands to manipulate the data.

Click on table you have created, accounts is my table name and click on query table to make SQL operations and you could see your results in the preview tab at the bottom.

Step7: Now, we are on to the last step to access this BigQuery data in Jupyter Notebooks and use that as the source data to train and build our ML models.

Search for notebook in GCP console. 

You shall see something like this 

Click on OPEN JUPYTERLAB then it will redirect you to notebook.

from google.cloud import bigquery

client = bigquery.Client()

sql = """
SELECT * FROM 
`<project-name>.<dataset-name>.<table-name>`
"""

df = client.query(sql).to_dataframe()
df.head(10)

So in this way, we have built a data pipeline that continuously dumps data from Firestore into GCS every 5 minutes, which is then readily available in Jupyter Notebook via BigQuery for any downstream analytics and ML model building.

Look forward to your comments.

This story is co-authored by Santosh Kumar and PV Subbareddy. Santosh is a Software Engineer specializing on Cloud Services and DevOps. Subbareddy is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.