Machine Learning Operations (MLOps) Pipeline using Google Cloud Composer

In an earlier post, we had described the need for automating the Data Engineering pipeline for Machine Learning based systems. Today, we will expand the scope to setup a fully automated MLOps pipeline using Google Cloud Composer.

Cloud Composer

Cloud Composer is official defined as a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

So let’s get on with the required steps to create this MLOps infrastructure on Google Cloud Platform

Creating a Cloud Composer Environment

Step1: Please enable the Cloud Composer API.

Step2: Go to create environment page in GCP console. Composer is available in Big Data section.

Step3: Click on create to start creating a Composer environment

Step4: Please select the Service account which has the required permissions to access GCS, Big Query, ML Engine and  Composer environment. The required roles for accessing Composer environment is Composer Administrator and Composer Worker. 
For more details about access control in Composer environment please see this.

Step5: Please use Python Version 3 and latest Image version.

Step6: Click on create. It will take about 15-20 minutes to create the environment. Once it completes, the environment page shall look like the following.

Click on Airflow to see Airflow WebUI. The Airflow WebUI looks as follows

DAGs folder is where our dag file is stored. DAG folder is nothing but a folder inside a GCS bucket which is created by the environment. To know more about the concept of DAG and general introduction to Airflow, please refer to this post.

You could see Composer related logs in Logging.

Step7: Please add the following PyPI packages in Composer environment.

Click on created environment and navigate to PYPI packages and click on edit to add packages

The required packages are:

# to read data from MongoDB
# to read data from firestore

Create a ML model

Step1: Please create a folder structure like the following on your instance.

└── trainer

Step2: Please place the following code in file, which shall upload the model to GCS bucket as shown below. This model would be used to create model versions as explained a bit later.

from import bigquery
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
from import storage
import datetime
import json
import pickle
client = bigquery.Client()
sql = '''

df = client.query(sql).to_dataframe()
df = df[['is_stressed', 'is_engaged', 'status']]

df['is_stressed'] = df['is_stressed'].fillna('n')
df['is_engaged'] = df['is_engaged'].fillna('n')
df['stressed'] = np.where(df['is_stressed']=='y', 1, 0)
df['engaged'] = np.where(df['is_engaged']=='y', 1, 0)
df['status'] = np.where(df['status']=='complete', 1, 0)

feature_cols = ['stressed', 'engaged']
X = df[feature_cols]
y = df.status
logreg = LogisticRegression(),y_train)
pkl_filename = "model.pkl"  
with open(pkl_filename, 'wb') as file:  
    pickle.dump(logreg, file)
BUCKET_NAME=BUCKET_NAME# Upload the model to GCS
bucket = storage.Client().bucket(BUCKET_NAME)
file_path ='machine_learning/models/%Y%m%d_%H%M%S')
blob = bucket.blob('{}/{}'.format(

file_location = 'gs://{BUCKET_NAME}/{file_path}'.format(BUCKET_NAME=BUCKET_NAME, file_path=file_path)
file_config = json.dumps({'file_location': file_location})

bucket = storage.Client().bucket(COMPOSER_BUCKET)
blob = bucket.blob('data/file_config.json')

Step3: Create an empty file inside the trainer directory.

Step4: Please place the following code in file. The file contains required packages to execute code.

import setuptools



Step5: Packaging the code using the following command. It creates a gz file inside ml_model directory.

python3 sdist

Step6: The package name is the name that is specified in file. The package name becomes ml_model-1.0.tar.gz
Copy the package to gs://{your-GCS-bucket}/machine_learning/. This becomes the base directory for your machine learning activities described in this post.

Creating a DAG

In this use case, we have created a DAG file which exports some table data from a MongoDB instance into a GCS bucket and then creates a BigQuery table off of that exported data. It trains a model and creates version for that model. The DAG file supports full data extraction and daily data extraction explained in the code below using a variable tot_data. This variable is extracted from Airflow configurations set by the user. This process is also described later in this post.

Please place the following code in the DAG file.

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator
import pprint
import json
import re

from pymongo import MongoClient
from import storage
from import blob
from import storage
import os

from airflow import models
from mlengine_operator import MLEngineTrainingOperator, MLEngineVersionOperator

ts =
today = str( + 'T00:00:00.000Z'
yester_day = str( - timedelta(days = 1)) + 'T00:00:00.000Z'

str_ts = ts.strftime('%Y_%m_%d_%H_%m_%S')

config = Variable.get("mongo_conf", deserialize_json=True)
host = config['host']
db_name = config['db_name']
table_name = config['table_name']
file_prefix = config['file_prefix']
bucket_name = config['bucket_name']
# file_path = file_prefix + '/' + table_name + '.json'
file_path = '{file_prefix}/{table_name}/{table_name}_{str_ts}.json'.format(file_prefix=file_prefix, str_ts=str_ts, table_name=table_name)
file_location = 'gs://' + bucket_name + '/' + file_prefix + '/' + table_name + '/' + table_name + '_*.json'
config['file_location'] = file_location
bq_dataset = config['bq_dataset']
tot_data = config['tot_data'].lower()

BUCKET_NAME = config['ml_configuration']['BUCKET_NAME']
BASE_DIR = config['ml_configuration']['BASE_DIR']
PACKAGE_NAME = config['ml_configuration']['PACKAGE_NAME']
TRAINER_BIN = os.path.join(BASE_DIR, 'packages', PACKAGE_NAME)
TRAINER_MODULE = config['ml_configuration']['TRAINER_MODULE']
RUNTIME_VERSION = config['ml_configuration']['RUNTIME_VERSION']
PROJECT_ID = config['ml_configuration']['PROJECT_ID']
MODEL_NAME = config['ml_configuration']['MODEL_NAME']

MODEL_FILE_BUCKET = config['ml_configuration']['MODEL_FILE_BUCKET']
model_file_loc = config['ml_configuration']['MODEL_FILE_LOCATION']

bucket = storage.Client().bucket(MODEL_FILE_BUCKET)
blob = bucket.get_blob(model_file_loc)
file_config = json.loads(blob.download_as_string().decode("utf-8"))
export_uri = file_config['file_location']

def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
            out[name[:-1]] = x

    return out

def mongoexport():
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(file_path)

        client = MongoClient(host)
        db = client[db_name]
        tasks = db[table_name]
        # if tot_data is set to 'yes' in airflow configurations, full data 
        # is processed.  
        if tot_data == 'no':
          query = {"edit_datetime": { "$gte": yester_day, "$lt": today}}
          data = tasks.find(query)
          data = tasks.find()
        emp_list = []
        for record in data:
                emp_list.append(json.dumps(record, default=str))
        flat_list =[]
        for data in emp_list:
        data = '\n'.join(json.dumps({re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v) for k, v in record.items()}) for record in flat_list)

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:

    # priority_weight has type int in Airflow DB, uses the maximum.
    pymongo_export_op = PythonOperator(

    update_bq_table_op = BashOperator(
        bq rm -f {bq_dataset}.{table_name}
        bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --ignore_unknown_values=True {bq_dataset}.{table_name} {file_location}
        '''.format(bq_dataset=bq_dataset, table_name=table_name, file_location=file_location)

    date_nospecial = '{{ execution_date.strftime("%Y%m%d") }}'
    date_min_nospecial = '{{ execution_date.strftime("%Y%m%d_%H%m") }}'
    uuid = '{{ macros.uuid.uuid4().hex[:8] }}'

    training_op = MLEngineTrainingOperator(
      job_id='{}_{}_{}'.format(table_name, date_nospecial, uuid),

    create_version_op = MLEngineVersionOperator(
          'name': 'version_{}_{}'.format(date_min_nospecial, uuid),
          'deploymentUri': export_uri,
          'runtimeVersion': RUNTIME_VERSION,
          'pythonVersion': '3.5',
          'framework': 'SCIKIT_LEARN',

    pymongo_export_op >> update_bq_table_op >> training_op >> create_version_op

Once file is created, please upload the file to DAGs folder. And also please add the following plugin dependency file named mlengine_operator in DAGs folder.
Place the following code in file.

import re

from apiclient import errors

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log

def _normalize_mlengine_job_id(job_id):

    # Add a prefix when a job_id starts with a digit or a template
    match ='\d|\{{2}', job_id)
    if match and match.start() is 0:
        job = 'z_{}'.format(job_id)
        job = job_id

    # Clean up 'bad' characters except templates
    tracker = 0
    cleansed_job_id = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_',
        cleansed_job_id += job[m.start():m.end()]
        tracker = m.end()

    # Clean up last substring or the full string if no templates
    cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])

    return cleansed_job_id

class MLEngineBatchPredictionOperator(BaseOperator):
    template_fields = [

    def __init__(self,
        super(MLEngineBatchPredictionOperator, self).__init__(*args, **kwargs)

        self._project_id = project_id
        self._job_id = job_id
        self._region = region
        self._data_format = data_format
        self._input_paths = input_paths
        self._output_path = output_path
        self._model_name = model_name
        self._version_name = version_name
        self._uri = uri
        self._max_worker_count = max_worker_count
        self._runtime_version = runtime_version
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine prediction '

        if self._uri:
            if self._model_name or self._version_name:
                raise AirflowException('Ambiguous model origin: Both uri and '
                                       'model/version name are provided.')

        if self._version_name and not self._model_name:
            raise AirflowException(
                'Missing model: Batch prediction expects '
                'a model name when a version name is provided.')

        if not (self._uri or self._model_name):
            raise AirflowException(
                'Missing model origin: Batch prediction expects a model, '
                'a model & version combination, or a URI to a savedModel.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        prediction_request = {
            'jobId': job_id,
            'predictionInput': {
                'dataFormat': self._data_format,
                'inputPaths': self._input_paths,
                'outputPath': self._output_path,
                'region': self._region

        if self._uri:
            prediction_request['predictionInput']['uri'] = self._uri
        elif self._model_name:
            origin_name = 'projects/{}/models/{}'.format(
                self._project_id, self._model_name)
            if not self._version_name:
                    'modelName'] = origin_name
                prediction_request['predictionInput']['versionName'] = \
                    origin_name + '/versions/{}'.format(self._version_name)

        if self._max_worker_count:
                'maxWorkerCount'] = self._max_worker_count

        if self._runtime_version:
                'runtimeVersion'] = self._runtime_version

        hook = MLEngineHook(self._gcp_conn_id, self._delegate_to)

        # Helper method to check if the existing job's prediction input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('predictionInput', None) == \

            finished_prediction_job = hook.create_job(
                self._project_id, prediction_request, check_existing_job)
        except errors.HttpError:

        if finished_prediction_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine batch prediction job failed: {}'.format(
            raise RuntimeError(finished_prediction_job['errorMessage'])

        return finished_prediction_job['predictionOutput']

class MLEngineModelOperator(BaseOperator):
    template_fields = [

    def __init__(self,
        super(MLEngineModelOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model = model
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)
        if self._operation == 'create':
            return hook.create_model(self._project_id, self._model)
        elif self._operation == 'get':
            return hook.get_model(self._project_id, self._model['name'])
            raise ValueError('Unknown operation: {}'.format(self._operation))

class MLEngineVersionOperator(BaseOperator):
    template_fields = [

    def __init__(self,

        super(MLEngineVersionOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model_name = model_name
        self._version_name = version_name
        self._version = version or {}
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        if 'name' not in self._version:
            self._version['name'] = self._version_name

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        if self._operation == 'create':
            assert self._version is not None
            return hook.create_version(self._project_id, self._model_name,
        elif self._operation == 'set_default':
            return hook.set_default_version(self._project_id, self._model_name,
        elif self._operation == 'list':
            return hook.list_versions(self._project_id, self._model_name)
        elif self._operation == 'delete':
            return hook.delete_version(self._project_id, self._model_name,
            raise ValueError('Unknown operation: {}'.format(self._operation))

class MLEngineTrainingOperator(BaseOperator):
    template_fields = [

    def __init__(self,
        super(MLEngineTrainingOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._job_id = job_id
        self._package_uris = package_uris
        self._training_python_module = training_python_module
        self._training_args = training_args
        self._region = region
        self._scale_tier = scale_tier
        self._runtime_version = runtime_version
        self._python_version = python_version
        self._job_dir = job_dir
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to
        self._mode = mode

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine training '
        if not package_uris:
            raise AirflowException(
                'At least one python package is required for MLEngine '
                'Training job.')
        if not training_python_module:
            raise AirflowException(
                'Python module name to run after installing required '
                'packages is required.')
        if not self._region:
            raise AirflowException('Google Compute Engine region is required.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        training_request = {
            'jobId': job_id,
            'trainingInput': {
                'scaleTier': self._scale_tier,
                'packageUris': self._package_uris,
                'pythonModule': self._training_python_module,
                'region': self._region,
                'args': self._training_args,

        if self._runtime_version:
            training_request['trainingInput']['runtimeVersion'] = self._runtime_version

        if self._python_version:
            training_request['trainingInput']['pythonVersion'] = self._python_version

        if self._job_dir:
            training_request['trainingInput']['jobDir'] = self._job_dir

        if self._mode == 'DRY_RUN':
  'In dry_run mode.')
  'MLEngine Training job request is: {}'.format(

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        # Helper method to check if the existing job's training input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('trainingInput', None) == \

            finished_training_job = hook.create_job(
                self._project_id, training_request, check_existing_job)
        except errors.HttpError:

        if finished_training_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine training job failed: {}'.format(
            raise RuntimeError(finished_training_job['errorMessage'])

Import variables from composer_conf.json file into Airflow Variables.
Go to Airflow WebUI → Admin → Variables → Browse to file path or configure variables manually.
Please place the following in composer_conf

  "mongo_conf": {
    "host": "mongodb://<instance-internal-ip>:27017",
    "db_name": "DBNAME",
    "table_name": "TABLENAME",
    "file_prefix": "Folder In GCS Bucket",
    "bq_dataset": "BigQuery Dataset",
    "bucket_name": "GCS Bucket",
    "tot_data": "yes",
    "ml_configuration": {
      "BUCKET_NAME": "GCS Bucket",
      "BASE_DIR": "gs://GCS Bucket/machine_learning/",
      "TRAINER_MODULE": "trainer.train",
      "RUNTIME_VERSION": "1.13",
      "PROJECT_ID": "GCP Project",
      "MODEL_FILE_BUCKET": "BUCKET CREATED BY Composer Environment",

Please store any configuration files or credentials file that are used by Composer in the data folder in the bucket created by Composer environment.

After configuring variables accordingly, you can see the DAG named ml_pipeline in the Airflow WebUI.

Please trigger the DAG file from Airflow WebUI. Once the DAG ran successfully. It looks like the following:

Thanks for the read and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

A Beginner’s Guide to Airflow

Airflow is used to create code pipeline where we can schedule and monitor our workflows. A workflow can be a collection of tasks to be executed like a flowchart.

It is like an orchestra conductor that controls all different data processing tools/tasks under one roof.

Why Airflow?

  • Open source.
  • Can handle upstream/downstream in an elegant way.
  • Jobs can pass parameters to other jobs downstream.
  • Ease of deployment of workflow.
  • Easy to reprocess historical jobs by date or rerun for specific intervals.
  • Integration with a lot of infrastructure.
  • Job testing through airflow itself.
  • Accessibility of log files and other meta-data through the web GUI.
  • Data sensors to trigger Directed Acyclic Graph (DAG) when data arrives.

Airflow Architecture:

Metadata: It stores the state of tasks and workflow.
Scheduler: It is a user DAG, with the state of tasks in metadata database. It decides what needs to execute.
Executor: It is a message queuing process. Decides which worker will execute each task.

Setting up airflow:

Use the following commands to install airflow

$ sudo pip install apache-airflow

You can install extra features like

$ sudo pip install apache-airflow[postgres,s3]

Airflow requires database to be initiated before you run tasks

$ airflow initdb

After installation the folder structure will be something like this.

If you don’t see ‘dags’ folder then you can create by yourself and name it ‘dags’. You shall dump all your python task files in that folder.

To start the web server

$ Airflow webserver -p 8080

Goto https://localhost:8080 to see airflow GUI. You should see something like this:

Airflow relies on 4 cores:

  1. DAG.
  2. Tasks.
  3. Scheduler.
  4. X-com.

Directed Acyclic Graph (DAG):

It is a collection of all tasks which you want to run in an organized way that shows their relationships and dependencies.

Basically a DAG is just a python file, which is used to organise tasks and set their execution content. DAGs do not perform any actual computation. Basically a DAG task contains 2 type of things:

  1. Operators
    It determines what actually gets done. It triggers a certain action like running a bash command, executing a python function, etc.
    There are different kinds of operators like:
    Bash operator: Executes a bash command.
    Python operator: Calls an arbitrary python function.
    Hive operator: Executes hql code or hive script in specific hive database.
    Bigquery operator: Executes Google BigQuery SQL query in a specific BigQuery database.
  2. Sensors : A certain type of operator that will keep running until a certain criteria is met.

Tasks :
Tasks are the elements that actually “do work” which we want to be performed. It is our job to write the configuration and organise tasks in specific order to create data pipeline.

Once the operator is instantiated, it is referred to as a “task”. An operator describes single task in a workflow

Scheduler :
It monitors all tasks and all dags. And trigger the task instance whose dependencies have met.
We create new python file in dags folder
Importing packages in our python file:

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

Next setting default arguments

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2019, 6, 17, 8, 40, 00),
    'retries': 1,

Depends_on_past: we will be in a situation where  a task t1 is dependent on its previous execution. In that case we will use this.

Start_date: starting date of execution
Defining our tasks, dags, functions

def greet():
    print("Hello how are you ?")
    return 'greeted'

def response():
    return "im fine what about you?"

with DAG('YourDagName', default_args=default_args, schedule_interval='@daily',) as dag:
    opr_hello = BashOperator(task_id='say_Hi',bash_command='echo "Hi!!"')
    opr_greet = PythonOperator(task_id='greet', python_callable=greet)
    opr_sleep = BashOperator(task_id='sleep_me', bash_command='sleep 2')
    opr_respond = PythonOperator(task_id='respond', python_callable=response)

In this, I am using 2 simple functions and with dag passing my default arguments and also schedule time of execution.

After that we created 4 tasks in which 2 are python and other 2 are bash. If we want to execute a python function we need to use python operator and also the same with the bash operator. Python operator will take at least 2 arguments which is task id and python callable and for bash, task id and bash command.

Defining task dependencies or order in which the tasks should be executed in.

Task dependencies are set using

  • The set_upstream and set_downstream operator.
  • The bitshift operator <<  and >>.


The above 2 lines mean the same. This means t2 will depend on t1 to run successfully.

opr_hello >> opr_greet >> opr_sleep >> opr_respond

If you wrap up the whole code in one file and put that file in dags folder then you can see your dag in Airflow GUI. If you click on your DAG you will see tree view and many other options to view your data in different forms.

Dagrun are dags that runs at certain time.
TaskInstances are task belongs to dagrun.
Each dagrun and task instance is associated with an entry in airflow’s metadata database that logs their state(eg: queued, running, failed, skipped, etc).
X-com: XCom which means cross communication allow airflow tasks to send and receive data/status. XCom data are stored in the airflow database in the form of key-value pairs.

That’s all guys. Thanks for the read!

This story is co-authored by Santosh Kumar and PV Subbareddy. Santosh is a Software Engineer specializing on Cloud Services and DevOps. Subbareddy is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.