Serverless Web Application Architecture using React with Amplify: Part 2

In Part 1, we learned about and built the AWS-service components (API Gateway, Lambda, DynamoDB) of the serverless architecture shown below. We shall build the rest of the architecture in this article. To sum it up, we will build Jotter (a note-taking app) in React and use Amplify to talk to these AWS services. Finally, we shall deploy the React app on S3 cloud storage.

Serverless Web Application Architecture using React with Amplify

After deployment:

Serverless React Web Application Hosted on S3

Building the rest of the architecture

  1. Setting up a Cognito User Pool and Identity Pool.
  2. Building the React application and configuring Amplify.
  3. Creating Sign-in and Sign-up using Cognito with Amplify.
  4. Integrating with API Gateway.
  5. Securing the API Gateway with Cognito and testing it.
  6. Deploying on S3 cloud storage.

Setting up a Cognito User Pool and Identity pool

Go to AWS console > Cognito > Create user pool.
Give the pool a name, and click on Review defaults.

This will show all the default settings for a Cognito user pool. Let's go through each section on the left nav bar, modify it accordingly so that it looks like below, and create the Cognito user pool.

Creating an App client: while creating the app client, do not enable the Generate client secret option.

After all modifications, go to Review and create the pool. Now note down the Pool Id, Pool ARN, and App client Id.
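If you prefer the command line, a rough CLI equivalent is sketched below (the pool and client names are illustrative; note that create-user-pool-client does not generate a client secret unless you explicitly pass --generate-secret):

aws cognito-idp create-user-pool --pool-name jotter-users

aws cognito-idp create-user-pool-client \
    --user-pool-id YOUR_USER_POOL_ID \
    --client-name jotter-app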

More details on setting up an authentication service using Cognito can be found here.

Let us build an Identity Pool, which we will use to access AWS services (API Gateway in our case). But before that, let us understand the difference between a user pool and an identity pool: broadly, a user pool handles authentication (sign-up, sign-in, and issuing tokens for signed-in users), while an identity pool handles authorization, exchanging those tokens for temporary AWS credentials. The image below depicts how the two services differ and how they complement each other.

Now we need to specify which AWS resources are accessible to users with temporary credentials obtained from the Cognito Identity Pool. In our case, we want to invoke API Gateway. The policy below does exactly that.

{
 "Version": "2012-10-17",
 "Statement": [
    {
     "Effect": "Allow",
     "Action": [
       "mobileanalytics:PutEvents",
       "cognito-sync:*",
       "cognito-identity:*"
      ],
     "Resource": [
        "*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "execute-api:Invoke"
      ],
      "Resource": [
        "arn:aws:executeapi:YOUR_API_GATEWAY_REGION:*:YOUR_API_GATEWAY_ID/*/*/*"
      ]
    }
  ]
}

Add your API Gateway region and gateway id in the resource ARN.

Copy-paste the above policy and click Allow. Then click on Edit identity pool and note down the Identity pool ID.

Building the React application and configuring Amplify

Create a new React project:

create-react-app jotter

Now go to the project directory and run the app.

cd jotter
npm start

You should see your app running on localhost port 3000. To start editing, open the project directory in your favourite code editor.

Open up public/index.html and edit the title tag to:

<title>Jotter - A note taking app</title>

Installing react-bootstrap:
We shall be using Bootstrap for styling.

npm install react-bootstrap --save

We also need the Bootstrap CSS from a CDN for the above library to work. Open up the public/index.html file and add the CDN link below, above the title tag.

<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T"
crossorigin="anonymous" />

Handle Routes with React Router:

Since we are building a single-page app, we are going to use React Router to handle routes/navigation.
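React Router isn't included with create-react-app, so install it first (the imports that follow assume react-router-dom v4/v5):

npm install react-router-dom --save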

Go to src/index.js, import { BrowserRouter as Router } from "react-router-dom", and replace

ReactDOM.render(<App />, document.getElementById('root'));

with

ReactDOM.render(<Router><App /></Router>, document.getElementById('root'));

Let us create a routes component in src/components.
Routes.js:

import React from "react";
import CreateJot from './CreateJot';
import AllJotsList from './AllJotsList';
import Login from './Login';
import Signup from './Signup';
import NotFound from './NotFound';
import Viewjot from './Viewjot';
import { Route, Switch } from "react-router-dom";

export default () => 
<Switch>
   <Route exact path="/" component={Login}/>
   <Route exact path="/newjot" component={CreateJot} />
   <Route exact path="/alljots" component={AllJotsList} />
   <Route exact path="/login" component={Login} />
   <Route exact path="/signup" component={Signup} />
   <Route exact path="/view" component={Viewjot}/>
   <Route component={NotFound} />
</Switch>;

If you observed, we used a Switch component here, which works like an if-else ladder: the first match is rendered, and if no path matches, the last NotFound component is rendered. For now, all of the above components simply return an <h1> tag with the name of the component. Create all the components like the following.

Signup.js

import React from 'react';

export default class Signup extends React.Component {
  constructor(props) {
    super(props);
    this.state = {}
  }
  render(){
    return (
       <h1>Signup</h1>
    );
  }
}

Let us set up navigation and test our routes.
App.js

import React, {Fragment} from 'react';
import { Link, NavLink } from "react-router-dom";
import Routes from './components/Routes';
import { withRouter } from 'react-router';

class App extends React.Component {
    constructor(props) {
        super(props);
        this.state = {
            isAuthenticated: false
        }
    }
    render(){
        return (
            <Fragment>
                <div className="navbar navbar-expand-lg navbar-light bg-light">
                    <Link to="/" className="navbar-brand" href="#"><h1>Jotter.io</h1></Link>
                    <div className="collapse navbar-collapse" id="navbarNav">
                        <ul className="navbar-nav">
                            {this.state.isAuthenticated ? 
                                <Fragment>
                                    <li className="nav-item">
                                        <NavLink onClick={this.handleLogout}>Logout</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/newjot" className="nav-link">New Jot</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/alljots" className="nav-link">All Jots</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/viewjot" className="nav-link">View Jot</NavLink>
                                    </li>
                                </Fragment> : 
                                <Fragment>
                                    <li className="nav-item">
                                        <NavLink to="/login" className="nav-link">Login</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/signup" className="nav-link">Signup</NavLink>
                                    </li>
                                </Fragment>
                            } 
                        </ul>
                    </div>                   
                </div>
                <Routes/>
            </Fragment>
        );
    }
}

export default withRouter(App);

What we have done is initialize the isAuthenticated state to false, and conditionally render the navigation bar to show only those routes that are accessible when the user isn't logged in, and vice versa.

Let us test our routes now.

Now our routes are working fine. Though we conditionally rendered the navigation bar, all routes are still accessible via URL.

We shall work on securing our routes once we configure Amplify. Let us build the Login and Signup components, and then configure Amplify to secure our routes.
Login.js

import React, { Component } from "react";
import { FormGroup, FormControl, FormLabel, Button } from "react-bootstrap";

export default class Login extends Component {
    constructor(props) {
        super(props);
        this.state = {
            email: "",
            password: ""
        };
    }

    validateForm() {
        return this.state.email.length > 0 && this.state.password.length>0;
    }

    handleChange = event => {
        this.setState({
                [event.target.id]: event.target.value
            });
    }

    handleSubmit = event => {
        event.preventDefault();
        console.log("Submitted",this.state.email,this.state.password);
    }

    render() {
        return (
            <div className="Home">
                <div className="col-md-4"> 
                    <form >
                        <FormGroup controlId="email">
                            <FormLabel>Email</FormLabel>
                            <FormControl
                                autoFocus
                                type="email"
                                value={this.state.email}
                                onChange={this.handleChange}
                            />
                        </FormGroup>
                        <FormGroup controlId="password" >
                            <FormLabel>Password</FormLabel>
                            <FormControl
                                value={this.state.password}
                                onChange={this.handleChange}
                                type="password"
                            />
                        </FormGroup>
                        <Button onClick={this.handleSubmit}>
                        Login
                        </Button>
                            
                    </form>
                </div>
            </div>
        );
    }
}

A simple login page that logs the user credentials on submit.

Signup.js

import React, { Component } from "react";
import { FormText, FormGroup, FormControl, FormLabel, Button } from "react-bootstrap";

export default class Signup extends Component {
   constructor(props) {
   super(props);
   this.state = {
       email: "",
       password: "",
       confirmPassword: "",
       confirmationCode: "",
       newUser: null
       };
   }
   validateForm() {
       return (
           this.state.email.length > 0 && this.state.password.length > 0 && this.state.password === this.state.confirmPassword);
   }
      
   validateConfirmationForm() {
       return this.state.confirmationCode.length > 0;
   }
      
   handleChange = event => {
       this.setState({
           [event.target.id]: event.target.value
       });
   }

   handleSubmit = async event => {
       event.preventDefault();
   }
  
   handleConfirmationSubmit = async event => {
       event.preventDefault();
   }

   render() {
       return (
           <div className="Signup">
               {this.state.newUser === null ? this.renderForm() : this.renderConfirmationForm()}
           </div>
       );
   }

   renderConfirmationForm() {
       return (
           <div className="Home">
               <div className="col-md-4">
                   <form onSubmit={this.handleConfirmationSubmit}>
                       <FormGroup controlId="confirmationCode" >
                           <FormLabel>Confirmation Code</FormLabel>
                           <FormControl
                               autoFocus
                               type="tel"
                               value={this.state.confirmationCode}
                               onChange={this.handleChange}
                           />
                           <FormText>Please check your email for the code.</FormText>
                       </FormGroup>
                       <Button type="submit">
                           Verify
                       </Button>   
                   </form>
               </div>
           </div>
       );
   }

   renderForm() {
       return (
           <div className="Home">
               <div className="col-md-4">
                   <form onSubmit={this.handleSubmit}>
                       <FormGroup controlId="email" >
                           <FormLabel>Email</FormLabel>
                           <FormControl   
                               autoFocus
                               type="email"
                               value={this.state.email}
                               onChange={this.handleChange}
                           />
                       </FormGroup>
                       <FormGroup controlId="password" >
                           <FormLabel>Password</FormLabel>
                           <FormControl
                               value={this.state.password}
                               onChange={this.handleChange}
                               type="password"
                           />
                       </FormGroup>
                       <FormGroup controlId="confirmPassword" >
                           <FormLabel>Confirm Password</FormLabel>
                           <FormControl
                               value={this.state.confirmPassword}
                               onChange={this.handleChange}
                               type="password"
                           />
                       </FormGroup>
                       <Button type="submit">
                           Signup
                       </Button>   
                   </form>
               </div>
           </div>
       );
   }
}

Configuring Amplify:

Install aws-amplify.

npm install --save aws-amplify

Create a config file with all the required details to connect to the AWS services we created earlier in Part 1.

config.js

export default {
   apiGateway: {
       REGION: "xx-xxxx-x",
       URL: " https://xxxxxxxxxx.execute-api.xx-xxxx-x.amazonaws.com/dev/"
   },
   cognito: {
       REGION: "xx-xxxx-x",
       USER_POOL_ID: "xx-xxxx-x_yyyyyxxxx",
       APP_CLIENT_ID: "xxxyyyxxyxyxyxxyxyxxxyxxyx",
       IDENTITY_POOL_ID: "xx-xxxx-x:aaaabbbb-aaaa-cccc-dddd-aaaabbbbcccc"
   }
};

Import the config.js file and aws-amplify.
Open up index.js and add the configuration like this.

import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';
import * as serviceWorker from './serviceWorker';
import { BrowserRouter as Router} from "react-router-dom";
import config from "./config";
import Amplify from "aws-amplify";

Amplify.configure({
   Auth: {
       mandatorySignIn: true,
       region: config.cognito.REGION,
       userPoolId: config.cognito.USER_POOL_ID,
       identityPoolId: config.cognito.IDENTITY_POOL_ID,
       userPoolWebClientId: config.cognito.APP_CLIENT_ID
   },
   API: {
       endpoints: [
           {
               name: "jotter",
               endpoint: config.apiGateway.URL,
               region: config.apiGateway.REGION
           },
       ]
   }
});

ReactDOM.render(<Router><App /></Router>, document.getElementById('root'));

serviceWorker.unregister();

Now that we have successfully configured Amplify, we can connect to and use AWS services with ease. If you would like to use any other AWS services, you know where to configure them.

App.js

import React, {Fragment} from 'react';
import { Link, NavLink } from "react-router-dom";
import Routes from './components/Routes';
import { withRouter } from 'react-router';
import { Auth } from "aws-amplify";

class App extends React.Component {
    constructor(props) {
        super(props);
        this.state = {
            isAuthenticated: false
        }
    }

    userHasAuthenticated = (value) => {
        this.setState({ isAuthenticated: value });
    }

    handleLogout = async event => {
        await Auth.signOut();
        this.userHasAuthenticated(false);
        this.props.history.push("/login");
    }

    async componentDidMount() {
        try {
          await Auth.currentSession();
          this.userHasAuthenticated(true);
          this.props.history.push("/alljots");
        }
        catch(e) {
          if (e !== 'No current user') {
            alert(e);
          }
        }
    }

    render(){
        return (
            <Fragment>
                <div className="navbar navbar-expand-lg navbar-light bg-light">
                    <Link to="/" className="navbar-brand" href="#"><h1>Jotter.io</h1></Link>
                    <div className="collapse navbar-collapse" id="navbarNav">
                        <ul className="navbar-nav">
                            {this.state.isAuthenticated ? 
                                <Fragment>
                                    <li className="nav-item">
                                        <NavLink onClick={this.handleLogout}>Logout</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/newjot" className="nav-link">New Jot</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/alljots" className="nav-link">All Jots</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/viewjot" className="nav-link">View Jot</NavLink>
                                    </li>
                                </Fragment> : 
                                <Fragment>
                                    <li className="nav-item">
                                        <NavLink to="/login" className="nav-link">Login</NavLink>
                                    </li>
                                    <li className="nav-item">
                                        <NavLink to="/signup" className="nav-link">Signup</NavLink>
                                    </li>
                                </Fragment>
                            } 
                        </ul>
                    </div>                   
                </div>
                <Routes userHasAuthenticated={this.userHasAuthenticated} isAuthenticated = {this.state.isAuthenticated}/>
            </Fragment>
        );
    }
}

export default withRouter(App);

We did 3 things here:

  1. After the component mounts, we check whether a session exists; if it does, the isAuthenticated state is set to true.
  2. We handle the logout click by clearing the session using the Auth.signOut method.
  3. We pass the isAuthenticated state and the userHasAuthenticated function as props to all the routes. By doing so, the Login and Signup components can update the isAuthenticated state, and all the other components receive this state as props.

Routes.js

import React from "react";
import CreateJot from './CreateJot';
import AllJotsList from './AllJotsList';
import Login from './Login';
import Signup from './Signup';
import NotFound from './NotFound';
import Viewjot from './Viewjot';
import { Route, Switch } from "react-router-dom";

export default ( {childProps }) =>
<Switch>
   <Route path="/" exact component={Login} props={childProps}/>
   <Route exact path="/newjot" component={CreateJot} props={childProps}/>
   <Route exact path="/alljots" component={AllJotsList} props={childProps}/>
   <Route path="/login" exact component={Login} props={childProps}/>
   <Route path="/signup" exact component={Signup} props={childProps} />
   <Route path="/view" exact component={Viewjot}></Route>
   <Route component={NotFound} />
</Switch>;

Signup.js

import React, { Component } from "react";
import { FormText, FormGroup, FormControl, FormLabel, Button} from "react-bootstrap";
import { Auth } from "aws-amplify";

export default class Signup extends Component {
    constructor(props) {
    super(props);
    this.state = {
        email: "",
        password: "",
        confirmPassword: "",
        confirmationCode: "",
        newUser: null
        };
    }
    validateForm() {
        return (
            this.state.email.length > 0 && this.state.password.length > 0 && this.state.password === this.state.confirmPassword);
    }
        
    validateConfirmationForm() {
        return this.state.confirmationCode.length > 0;
    }
        
    handleChange = event => {
        this.setState({
            [event.target.id]: event.target.value
        });
    }

    handleSubmit = async event => {
        event.preventDefault();
        try {
            const newUser = await Auth.signUp({
                username: this.state.email,
                password: this.state.password
            });
            this.setState({newUser});
        } catch (e) {
            alert(e.message);
        }
    }
    
    handleConfirmationSubmit = async event => {
        event.preventDefault();
        try {
            await Auth.confirmSignUp(this.state.email, this.state.confirmationCode);
            await Auth.signIn(this.state.email, this.state.password);
            this.props.userHasAuthenticated(true);
            this.props.history.push("/");
        } catch (e) {
            alert(e.message);
        }
    }

    render() {
        return (
            <div className="Signup">
                {this.state.newUser === null ? this.renderForm() : this.renderConfirmationForm()}
            </div>
        );
    }

    renderConfirmationForm() {
        return (
            <div className="Home">
                <div className="col-md-4">
                    <form onSubmit={this.handleConfirmationSubmit}>
                        <FormGroup controlId="confirmationCode" >
                            <FormLabel>Confirmation Code</FormLabel>
                            <FormControl 
                                autoFocus
                                type="tel"
                                value={this.state.confirmationCode}
                                onChange={this.handleChange}
                            />
                            <FormText>Please check your email for the code.</FormText>
                        </FormGroup>
                        <Button type="submit">
                            Verify
                        </Button>
                    </form>
                </div>
            </div>
        );
    }

    renderForm() {
        return (
            <div className="Home">
                <div className="col-md-4">
                    <form onSubmit={this.handleSubmit}>
                        <FormGroup controlId="email" >
                            <FormLabel>Email</FormLabel>
                            <FormControl    
                                autoFocus
                                type="email"
                                value={this.state.email}
                                onChange={this.handleChange}
                            />
                        </FormGroup>
                        <FormGroup controlId="password" >
                            <FormLabel>Password</FormLabel>
                            <FormControl
                                value={this.state.password}
                                onChange={this.handleChange}
                                type="password"
                            />
                        </FormGroup>
                        <FormGroup controlId="confirmPassword" >
                            <FormLabel>Confirm Password</FormLabel>
                            <FormControl
                                value={this.state.confirmPassword}
                                onChange={this.handleChange}
                                type="password"
                            />
                        </FormGroup>
                        <Button type="submit">
                            Signup
                        </Button>
                    </form>
                </div>
            </div>
        );
    }
}

Let us test it.

Once a user is successfully created, you should see the user appear in the User Pool.

You will be redirected to the Login page. We haven't secured our routes yet, so you should be seeing this.

Let's secure those routes now.
Routes.js

import React from "react";
import CreateJot from './CreateJot';
import AllJotsList from './AllJotsList';
import Login from './Login';
import Signup from './Signup';
import NotFound from './NotFound';
import Viewjot from './Viewjot';
import { Route, Switch } from "react-router-dom";
import AuthorizedRoute from "./AuthorizedRoute";
import UnAuthorizedRoute from "./UnAuthorizedRoute";

export default (cprops) =>
<Switch>

  <UnAuthorizedRoute path="/login" exact component={Login} cprops={cprops}/>
  <UnAuthorizedRoute path="/signup" exact component={Signup} cprops={cprops}/>
 
  <AuthorizedRoute exact path="/newjot" component={CreateJot} cprops={cprops}/>
  <AuthorizedRoute exact path="/alljots" component={AllJotsList} cprops={cprops}/>
  <AuthorizedRoute exact path="/viewjot" component={Viewjot} cprops={cprops}/>
  <AuthorizedRoute exact path="/" component={AllJotsList} cprops={cprops}/>
 
  <Route component={NotFound} />
</Switch>;

AuthorizedRoute.js

import React from 'react'
import { Redirect, Route } from 'react-router-dom'

const AuthorizedRoute = ({ component: Component, cprops, ...rest }) => {
   // Add your own authentication on the below line.
 return (
   <Route {...rest} render={props =>
       cprops.isAuthenticated ? <Component {...props} {...cprops} /> : <Redirect to="/login" />}
   />
 )
}

export default AuthorizedRoute

What we did here: if the user is logged in, we render the component; otherwise we redirect to the login page.

UnAuthorizedRoute.js

import React from 'react'
import { Redirect, Route } from 'react-router-dom'

const UnAuthorizedRoute = ({ component: Component, cprops, ...rest }) => {
   // Add your own authentication on the below line.
 return (
   <Route {...rest} render={props =>
       !cprops.isAuthenticated ? <Component {...props} {...cprops} /> : <Redirect to="/" />}
   />
 )
}

export default UnAuthorizedRoute

What we did here: if the user is not logged in, we render the component; otherwise we redirect to the home page. Let us now add the login functionality to our Login page using the Auth.signIn method.
Login.js

import React, { Component } from "react";
import { FormGroup, FormControl, FormLabel, Button } from "react-bootstrap";
import { Auth } from "aws-amplify";

export default class Login extends Component {
   constructor(props) {
       super(props);
       this.state = {
           email: "",
           password: ""
       };
   }

   validateForm() {
       return this.state.email.length > 0 && this.state.password.length>0;
   }

   handleChange = event => {
       this.setState({
               [event.target.id]: event.target.value
           });
   }

   handleSubmit = async event => {
       event.preventDefault();
       try {
           await Auth.signIn(this.state.email, this.state.password);
           this.props.userHasAuthenticated(true);
           this.props.history.push("/alljots");
       } catch (e) {
           alert(e.message);
       }
      
   }

   render() {
       return (
           <div className="Home">
               <div className="col-md-4">
                   <form onSubmit={this.handleSubmit}>
                       <FormGroup controlId="email">
                           <FormLabel>Email</FormLabel>
                           <FormControl
                               autoFocus
                               type="email"
                               value={this.state.email}
                               onChange={this.handleChange}
                           />
                       </FormGroup>
                       <FormGroup controlId="password" >
                           <FormLabel>Password</FormLabel>
                           <FormControl
                               value={this.state.password}
                               onChange={this.handleChange}
                               type="password"
                           />
                       </FormGroup>
                       <Button type="submit">
                           Login
                       </Button>
                   </form>
               </div>
           </div>
       );
   }
}

Securing API with Cognito:

Earlier in Part 1, we created an API called jotter. Now go to the Authorizers section and create a new authorizer.

Once created, let us go to each resource and add authorization.

Do this for all the resources in the API.

API Gateway integration:

AllJotsList.js

import React from 'react';
import {Button} from 'react-bootstrap';
import {API, Auth} from 'aws-amplify';
import { Link } from "react-router-dom";

export default class AllJotsList extends React.Component{
   constructor(props) {
       super(props);
       this.state = {
           jotlist: []
       }
   };
   handleDelete = async (index) => {
       let djot = this.state.jotlist[index];
       let sessionObject = await Auth.currentSession();
       let idToken = sessionObject.idToken.jwtToken;
       let userid = sessionObject.idToken.payload.sub;
       try {
           const status = await this.DeleteNote(djot.jotid,userid,idToken);
           const jotlist = await this.jots(userid,idToken);
           this.setState({ jotlist });
       } catch (e) {
           alert(e);
       }
   }

   DeleteNote(jotid,userid,idToken) {
       let path = "jot?jotid="+jotid+"&userid="+userid;
       let myInit = {
           headers: { Authorization: idToken }
       }
       return API.del("jotter", path,myInit);
   }

   handleEdit = async (index) => {
       let jotlist = this.state.jotlist;
       let sessionObject = await Auth.currentSession();
       let idToken = sessionObject.idToken.jwtToken;
       let userid = sessionObject.idToken.payload.sub;
       let path = "/view?jotid="+jotlist[index].jotid+"&userid="+userid;
       this.props.history.push(path);
   }

   async componentDidMount() {
       if (!this.props.isAuthenticated) {
           return;
       }
       try {
           let sessionObject = await Auth.currentSession();
           let idToken = sessionObject.idToken.jwtToken;
           let userid = sessionObject.idToken.payload.sub;
           const jotlist = await this.jots(userid,idToken);
           this.setState({ jotlist });
       } catch (e) {
           alert(e);
       }
   }

   jots(userid,idToken) {
       let path = "/alljots?userid="+userid;
       let myInit = {
           headers: { Authorization: idToken }
       }
       return API.get("jotter", path, myInit);
   }

   render(){
       if (this.state.jotlist.length === 0)
           return(<h2>Please <Link to="/newjot">add jots</Link> to view.</h2>)
       else
           return(
               <div className="col-md-6">
               <table style={{"marginTop":"2%"}} className="table table-hover">
                   <thead>
                       <tr>
                           <th scope="col">#</th>
                           {/* <th scope="col">Last modified</th> */}
                           <th scope="col">Title</th>
                           <th scope="col">Options</th>
                       </tr>
                   </thead>
                   <tbody>
                           {
                           this.state.jotlist.map((jot, index) => {
                           return(
                               <tr key={index}>
                                       <td>{index+1}</td>
                                       {/* <td>{jot.lastmodified}</td> */}
                                       <td>{jot.title}</td>
                                       <td>
                                               <Button variant="outline-info" size="sm"
                                                   type="button"
                                                   onClick={()=>this.handleEdit(index)}
                                                   >Edit
                                               </Button>
                                           {" | "}
                                           <Button variant="outline-danger" size="sm"
                                               type="button"
                                               onClick={()=>this.handleDelete(index)}
                                               >Delete
                                           </Button>
                                       </td>
                               </tr>
                           )})
                            }
                       </tbody>

               </table>
               </div>   
           );
   }
}

What we are doing here:

  1. We make an async call to our alljots API to get the list of jots for a given user, using the API.get method.
  2. Since we have secured our API Gateway with Cognito, we need to send the id token in the Authorization header. Once we log in, Amplify automatically sets up the session; we simply call the Auth.currentSession() method to get the session object, which contains all the data, including the tokens we need.
  3. Once we receive a successful response, if the list is empty we display the message "Please add jots to view." Otherwise we iterate over the list and display it as a table.
  4. Each record in the table has Edit and Delete options. Edit redirects to the Viewjot component, while Delete is handled here itself using the API.del method. After a successful response, steps 1-3 are repeated.

In the process of testing, I deleted and created new jots. Now you should see a list of all jots for the logged-in user.

For Viewjot and CreateJot we are using CKEditor.
Viewjot.js

import React, {Fragment} from 'react';
import {Button} from 'react-bootstrap';
import CKEditor from '@ckeditor/ckeditor5-react';
import BalloonEditor from '@ckeditor/ckeditor5-build-balloon';
import { API, Auth } from "aws-amplify";
import queryString from "query-string";


export default class ViewJot extends React.Component {
   constructor(props) {
       super(props);
       this.state = {
           titledata: "",
           bodydata: "",
           userid: "",
           jotid: "",
           edit: false
       }
   };
   onTitleChange = (event, editor) => {
       const data = editor.getData();
       this.setState({
           titledata: data
       });
   }
   onBodyChange = (event, editor) => {
       const data = editor.getData();
       this.setState({
           bodydata: data
       });
   }
   validateJot = () =>  {
       return this.state.titledata.length > 0 && this.state.bodydata.length > 0;
   }
   editJot = () => {
       this.setState({edit:true})
   }
   saveJot = async event => {
       event.preventDefault();
       let note = {
           title: this.state.titledata,
           body: this.state.bodydata,
           userid: this.state.userid,
           jotid: this.state.jotid
       };
       try {
           let sessionObject = await Auth.currentSession();
           let idToken = sessionObject.idToken.jwtToken;
           await this.saveNote(note,idToken);
           this.props.history.push("/alljots");
       }catch (e) {
           alert(e);
       }
   }
   saveNote(note, idToken) {
       let myInit = {
           headers: { Authorization: idToken},
           body: note
       }
       return API.post("jotter", "/newjot", myInit)
   }

   async componentDidMount(){
       try{
           let sessionObject = await Auth.currentSession();
           let idToken = sessionObject.idToken.jwtToken;
           let jot = await this.getNote(idToken);
           this.setState({
               titledata: jot.title,
               bodydata: jot.body,
               userid: jot.userid,
               jotid: jot.jotid
           });
       }catch(e){
           alert(e);
       }
   }

   getNote(idToken) {
       let params = queryString.parse(this.props.location.search);
       let path = "jot?jotid="+params.jotid+"&userid="+params.userid;
       let myInit = {
           headers: { Authorization: idToken }
       }
       return API.get("jotter", path,myInit);
   }
   render() {
       return (
           <Fragment>
               {!this.state.edit &&
               <div style={{marginTop: "1%", marginLeft: "5%"}}>
                   <Button
                       onClick={this.editJot}
                   >
                       Edit
                   </Button>
               </div>}
               {this.state.edit &&
               <div style={{marginTop: "1%", marginLeft: "5%"}}>
                   <Button
                       onClick={this.saveJot}
                   >
                       Save
                   </Button>
               </div>}
               <div style={{marginTop: "1%", marginLeft: "5%", width: "60%",border: "1px solid #cccc", marginBottom: "1%"}}>
                   <CKEditor
                       data={this.state.titledata}
                       editor={BalloonEditor}
                       disabled={!this.state.edit}
                       onChange={(event, editor) => {
                           this.onTitleChange(event, editor);
                   }}/>
               </div>
               <div style={{marginLeft: "5%", width: "60%",border: "1px solid #cccc"}}>
                   <CKEditor
                       data={this.state.bodydata}
                       editor={BalloonEditor}
                       disabled={!this.state.edit}
                       onChange={(event, editor) => {
                           this.onBodyChange(event, editor);
                   }}/>
               </div>          
           </Fragment>
       );
   }
}

What we are doing here:

  1. As we redirect from the all-jots table on Edit, we send the jot id and user id as query string parameters and use them to fetch the jot from the API using the API.get method.
  2. We have two modes, built using state. In view mode, CKEditor is disabled so the jot can only be viewed.
  3. In edit mode, the jot data can be edited, and a POST request updates it using the API.post method.
  4. CreateJot works the same way, but with edit mode only.

CreateJot.js

import React, {Fragment} from 'react';
import {Button} from 'react-bootstrap';
import CKEditor from '@ckeditor/ckeditor5-react';
import BalloonEditor from '@ckeditor/ckeditor5-build-balloon';
import { API,Auth } from "aws-amplify";
const titlecontent = "Title..";
const bodycontent = "Start writing notes..";

export default class CreateJot extends React.Component {
   constructor(props) {
       super(props);
       this.state = {
           titledata: titlecontent,
           bodydata: bodycontent,
       }
   };
   onTitleChange = (event, editor) => {
       const data = editor.getData();
       this.setState({
           titledata: data
       });
   }
   onBodyChange = (event, editor) => {
       const data = editor.getData();
       this.setState({
           bodydata: data
       });
   }
   validateJot = () =>  {
       return this.state.titledata.length > 0 && this.state.bodydata.length > 0;
   }
   createNew = async event => {
       event.preventDefault();
       let sessionObject = await Auth.currentSession();
       let idToken = sessionObject.idToken.jwtToken;
       let userid = sessionObject.idToken.payload.sub;
       let note = {
           title: this.state.titledata,
           body: this.state.bodydata,
           userid: userid,
           jotid: this.createJotid()
       };
       try {
           await this.createNote(note,idToken);
           this.props.history.push("/");
       }catch (e) {
           alert(e);
       }
   }
   createNote(note,idToken) {
       let myInit = {
           headers: { Authorization: idToken },
           body: note
       }
       return API.post("jotter", "/newjot", myInit);
   }
   createJotid(){
       let dt = new Date().getTime();
       let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
           let r = (dt + Math.random()*16)%16 | 0;
           dt = Math.floor(dt/16);
           return (c=='x' ? r :(r&0x3|0x8)).toString(16);
       });
       return uuid;
   }
   render() {
       return (
           <Fragment>
               <div style={{marginTop: "1%", marginLeft: "5%"}}>
                   <Button
                       onClick={this.createNew}
                   >
                       Create
                   </Button>
               </div>
               <div style={{marginTop: "1%", marginLeft: "5%", width: "60%",border: "1px solid #cccc", marginBottom: "1%"}}>
                   <CKEditor
                       data={titlecontent}
                       editor={BalloonEditor}
                       onChange={(event, editor) => {
                           this.onTitleChange(event, editor);
                   }}/>
               </div>
               <div style={{marginLeft: "5%", width: "60%",border: "1px solid #cccc"}}>
                   <CKEditor
                       data={bodycontent}
                       editor={BalloonEditor}
                       onChange={(event, editor) => {
                           this.onBodyChange(event, editor);
                   }}/>
               </div>          
           </Fragment>
       );
   }
}

Deployment on S3 cloud storage

Manual Deployment:
Create a public S3 bucket. Then build the React application using the command:

npm run build

This creates a folder named ‘build’ in the project directory with all the static files. Upload these files into the S3 bucket you created earlier.

After the upload is complete, go to the S3 bucket's properties and enable Static website hosting. This gives you a URL endpoint to access the static files from a browser. Now your web app is hosted on S3 and can be visited via that endpoint. You could also map a custom domain of yours to this endpoint; that way, when users visit the custom domain, they are internally routed to the endpoint.
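If you prefer the CLI over the console, the upload and static-hosting steps can be sketched roughly as follows (the bucket name is a placeholder):

aws s3 sync build/ s3://YOUR_BUCKET_NAME --delete

aws s3 website s3://YOUR_BUCKET_NAME --index-document index.html --error-document index.html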

Open Chrome or Firefox, and visit the endpoint. You should see your app running. Now your application is open to the world.

Auto Deployment:
What about code updates? Should I keep uploading static files to S3 manually again and again? No, you don't have to. You could either build a sophisticated CI/CD pipeline (for example, Jenkins with webhooks that trigger on Git events) or automate the manual build-and-deploy-to-S3 process using npm scripts with the AWS CLI, as sketched below.
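As a minimal sketch of the latter approach, you could add a deploy script to package.json (the bucket name is a placeholder) and run npm run deploy after each change:

"scripts": {
    "build": "react-scripts build",
    "deploy": "npm run build && aws s3 sync build/ s3://YOUR_BUCKET_NAME --delete"
}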

Thanks for the read, I hope it was helpful.

This story is authored by Koushik Busim. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Serverless Web Application Architecture using React with Amplify: Part 1

In this series we shall be building Jotter (a note-taking app). Jotter is a serverless web application built with React and Amplify using AWS cloud services. The illustration below depicts the serverless architecture we are going to build.

Serverless Web Application Architecture using React with Amplify

After deployment:

Serverless React Web Application Hosted on S3

Understanding Serverless:

Just put your code on the cloud and run it, without worrying about infrastructure.

Servers are still there in serverless, but they are managed by the service provider. In our case, the service provider is AWS. Let us learn a little about AWS and its services. Feel free to skip ahead if you are already acquainted with AWS.

Understanding AWS:

Amazon Web Services is a cloud services platform that offers various cloud services. You could build reliable, scalable applications without worrying about managing infrastructure. Amazon Web Services offers a broad set of global cloud-based products including compute, storage, databases, analytics, networking, mobile, developer tools, management tools, IoT, security and enterprise applications. These services help organizations move faster, lower IT costs, and scale.

In a nutshell, AWS is like playing Lego or Minecraft. In Lego, you use blocks to create different structures, right? We neither create nor maintain those blocks. All we do is join and disjoin blocks logically to construct structures. Similarly, in AWS each service is like a block, and we compose these services to develop our serverless application. You could also build a server-based web application using an AWS EC2 instance configured with AWS Route 53, but that's not the scope of this series.

Advantages:

  1. Granular control over each block. Testing becomes easy: if a block is malfunctioning, we only have to replace or fix that one block.
  2. Reusability: certain individual blocks or groups of blocks can be reused for other structures.
  3. We only pay for the usage.
  4. Scalability, high availability, and fault tolerance.

That’s broadly about AWS as a platform. We shall first understand these services and then build the serverless web application architecture we viewed earlier.

Services used in the architecture:

  1. DynamoDB (NoSQL Database)
  2. Lambda (Server side computing)
  3. API Gateway (HTTP calls interface)
  4. Cognito (Authentication)
  5. S3 (Cloud storage)
  6. Amplify and React (Client side)

Understanding DynamoDB:

DynamoDB is a distributed NoSQL, schemaless, key-value storage system. It is extremely scalable: unlike a traditional database, where the amount of data stored is limited by the physical capacity of the system, DynamoDB has no such limits because it scales horizontally. You pay only for the resources you provision.

Though it is schemaless, it is still represented as a table. Each table is a collection of items, and the value (attributes) of each item can be a scalar, a JSON document, or a set. Item size should be less than 400KB (binary, UTF-8). Each item in the table is uniquely identified by a primary key, which must be defined when creating the table. The primary key can be just the partition key, or a combination of partition key and sort key; the latter is also called a composite primary key.

Note: The primary key cannot be modified once created. The partition key value is used internally as input to a hash function that determines where the item is stored; the sort key orders items within a partition.

More on DynamoDB indexes and parallel scan here.

Understanding Lambda:

Lambda is a serverless compute service. It lets you run code without provisioning or managing servers, and you pay only for the compute time you consume. It supports Node.js, Python, Java, Go, etc. Lambda can be triggered by a variety of AWS services. Learn more about Lambda here.

Every Lambda function handler can be passed three objects as arguments.

  1. The first argument is the event object, which contains information from the invoker. The invoker passes this information as a JSON-formatted string when it invokes Lambda. When an AWS service invokes your function, the event object structure varies by service.
  2. The second argument is the context object, which contains information about the invocation, function, and execution environment. For example, a function can read the name of the log stream from the context object and return it to the invoker, as sketched below.
  3. The third argument, callback, is a function that you can call in non-async functions to send a response to the invoker. The callback function takes two arguments: an Error and a response. The response object must be compatible with JSON.stringify, and the Error should be null for a successful response.
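Here is a minimal sketch of a Node.js handler showing all three arguments (the response shape is illustrative):

exports.handler = (event, context, callback) => {
    // event: the JSON input from the invoker; context: invocation metadata.
    console.log("Log stream:", context.logStreamName);
    // Pass an Error as the first argument to signal failure;
    // null means success, and the second argument is the response.
    callback(null, { logStream: context.logStreamName, input: event });
};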

In our app, Lambda is used as a mediator between incoming HTTP requests and DynamoDB. Lambda writes, reads, and processes data to/from DynamoDB accordingly.

Understanding API Gateway:

Amazon API Gateway is a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale. 

With a few clicks in the AWS Management Console, you can create REST and WebSocket APIs that act as a front door for applications to access data, business logic, or functionality from your backend services, such as workloads running on EC2, code running on Lambda, any web application, or real-time communication applications.

API Gateway handles all the tasks involved in accepting and processing up to hundreds of thousands of concurrent API calls, including traffic management, authorization and access control, monitoring, and API version management.

In our app, we use API Gateway to invoke different Lambda functions for different API calls.

Understanding Cognito:

Amazon Cognito User Pool makes it easy for developers to add sign-up and sign-in functionality to web and mobile applications. It serves as your own identity provider to maintain a user directory. It supports user registration and sign-in, as well as provisioning identity tokens for signed-in users.

Our Jotter app needs to handle user accounts and authentication in a secure and reliable way. We are going to use Cognito User Pool for it.

Understanding Amplify:

AWS Amplify is a framework provided by AWS to develop applications with AWS cloud services. Amplify makes the process of stitching cloud services into our application hassle-free. Amplify provides different libraries for different apps (iOS, Android, Web, React Native). The Amplify JavaScript library is available as an npm package (aws-amplify). The aws-amplify client library uses a config file to connect to AWS services. The services Amplify covers include Database, API, Lambda/serverless, Authentication, Hosting, Storage, and Analytics.

Note: One could also use AWS Amplify CLI to provision AWS services. The aws-amplify client library and Amplify CLI are two different things.

The Amplify CLI internally uses CloudFormation to provision/create resources, while the aws-amplify client library is used to connect to AWS services. Using the Amplify CLI can be inconvenient, as you are not creating services directly from the AWS console but through a CloudFormation stack internally; if successful, it returns a config file with the metadata of the provisioned services. A simpler approach is to create the required services from the AWS console, update the config file manually, and use it with the aws-amplify client library.

In our Jotter app, we will use aws-amplify client javascript library to interact with AWS services.

Building the serverless architecture

API Gateway configured with Lambda to read and write data from DynamoDB.

Let us first build this setup and then add the remaining.

Working with DynamoDB:

Create table:

Go to AWS console > DynamoDB > Tables, choose Create table. As we learned earlier each table is a collection of items and each item is identified with a primary key.

So, give the table a name and set the schema for the primary key. Each jot is uniquely identified by the combination of userid (partition key) and jotid (sort key).

Click on Create.

This will create an empty table with 2 columns, namely userid and jotid.
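For reference, a rough CLI equivalent of the console steps above (the on-demand billing mode is an assumption; the key schema matches the table we just created):

aws dynamodb create-table \
    --table-name jotter \
    --attribute-definitions AttributeName=userid,AttributeType=S AttributeName=jotid,AttributeType=S \
    --key-schema AttributeName=userid,KeyType=HASH AttributeName=jotid,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST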

Adding sample data:

Each jot item is a JSON object with the following structure:

{  
   "userid":"<32 digit uuid code>",
   "jotid":"<32 digit uuid code>",
   "title":"title content",
   "body":"body content"
}

Initially you will be shown only the 2 keys userid and jotid, as they are part of the primary key. We shall add the remaining 2 attributes (title, body). I have generated 5 random uuid codes for populating the table. We shall add 4 items, all belonging to the same user, which means they will have the same userid but different jotids. Click on Create item.

Similarly, add the remaining 3 items to the table.

We have successfully created a table and added 4 items to it. Now let us create Lambda functions to process data from DynamoDB.

Working with Lambda:

As we have learned earlier, Lambda lets us run code without provisioning or managing servers. We shall use Lambda for our server side computing or business logic in simple terms.

Creating a Lambda Function:
Let us first understand the server-side operations that the Jotter app needs.

  • Read a list of all items of a user from DynamoDB table.
  • Write a new item to DynamoDB table by a user.
  • Read the full content of an item of a user.
  • Delete an item of a user.

We shall create 4 Lambda functions, one for each of these jobs/operations.

Get all items Lambda:

Every Lambda has to be configured with an IAM role. This IAM role defines access control to other AWS services. We shall create an IAM role for Lambda to access DynamoDB.

Go to IAM > Roles. Choose create role. After role is created, we could attach policies or rules in simple terms. We could either attach default policies provided by AWS or create custom policies tailored to our specific needs.

Custom policy:
Choose the service, and then the operations. Create the policy.

Create IAM policy for DynamoDB write access

This way you can add custom policies to the role or use default AWS policies. I am attaching the AmazonDynamoDBFullAccess policy, as I will be using this role for all my Lambda functions. A better approach would be to create different roles with custom policies for specific operations. Please do explore all the options, and do not grant more permissions than needed.

Once the role is created, go back to the Lambda console and create the Lambda function with that role.

  • Choose Author from scratch.
  • Give the function a name (get-all-jots).
  • Choose the runtime environment (Node.js 10.x).
  • For permissions, choose an existing role and select the role created earlier.
  • Create function.

After creation, replace the index.js code with the code below. In the Lambda code, I used the JavaScript aws-sdk to connect to DynamoDB.

get-all-jots Lambda:

var AWS = require('aws-sdk');
var ddb = new AWS.DynamoDB({region: 'your table region', apiVersion: '2012-08-10'});

exports.handler = (event, context, callback) => {
    console.log(event.userid);
    var params = {
        ExpressionAttributeNames: {
            "#T": "title",
            "#JI": "jotid"
        },
        ExpressionAttributeValues: {
            ":id": {
                S: "" + event.userid
            }
        },
        FilterExpression: "userid = :id",
        ProjectionExpression: "#JI,#T",
        TableName: "jotter"
    };
    ddb.scan(params, function(err, data) {
        if (err) {
            console.log(err, err.stack); // an error occurred
            callback(err);
        }
        else {
            let jots = [];
            data.Items.map((item) => {
                let jot = {
                    title: item.title.S,
                    jotid: item.jotid.S
                };
                jots.push(jot);
            });
            console.log(jots);
            callback(null, jots);
        }
    });
};

Testing the get-all-jots Lambda function:
Choose Configure test events beside the Test button. Here we configure the event object that is passed as a parameter to the Lambda function.
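Since the get-all-jots code above only reads event.userid, the test event just needs that one field:

{  
   "userid":"<32 digit uuid code>"
}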

Save it, and test the Lambda function. You should see the DynamoDB data in the response.

If you don't see the results, enjoy debugging the logs in CloudWatch. You can navigate to the logs from the Monitoring tab in the Lambda console.

Similarly, create the remaining 3 Lambda functions and test them by configuring event objects.

create-new-jot Lambda:

var AWS = require('aws-sdk');
var ddb = new AWS.DynamoDB({region: 'your table region', apiVersion: '2012-08-10'});

exports.handler = (event, context, callback) => {
    var params = {
        TableName: 'jotter',
        Item: {
            'userid': {S: '' + event.userid},
            'title': {S: '' + event.title},
            'body': {S: '' + event.body},
            'jotid': {S: '' + event.jotid}
        }
    };

    ddb.putItem(params, function(err, data) {
        if (err) {
            console.log("Error", err);
            callback(err);
        }
        else {
            console.log("Success", data);
            callback(null, data);
        }
    });
};

Event object for test configuration: (create-new-jot)

{  
   "userid":"<32 digit uuid code>",
   "jotid":"<32 digit uuid code>",
   "title":"title content",
   "body":"body content"
}

get-jot Lambda:

var AWS = require('aws-sdk');
var ddb = new AWS.DynamoDB({region: 'your table region', apiVersion: '2012-08-10'});

exports.handler = (event, context,callback) => {
 let jotid = event.jotid;
 let userid = event.userid;
 
 var params = {
  Key: {
   "jotid": {S: ""+jotid},
   "userid": {S: ""+userid}
  }, 
  TableName: "jotter"
 };
 ddb.getItem(params, function(err, data) {
   if (err){
    console.log(err);
    callback(err)   
   }
   else{
    console.log(data);
     let jot = { 
      "userid": data.Item.userid.S,
      "title": data.Item.title.S,
      "body": data.Item.body.S,
      "jotid": data.Item.jotid.S
    }
    console.log(data);
    callback(null,jot)
   }
 });
};

Event object for test configuration: (get-jot)

{  
   "userid":"<32 digit uuid code>",
   "jotid":"<32 digit uuid code>"
}

delete-jot Lambda:

var AWS = require('aws-sdk');
var ddb = new AWS.DynamoDB({region: 'your table region', apiVersion: '2012-08-10'});

exports.handler = (event, context, callback) => {
    let jotid = event.jotid;
    let userid = event.userid;

    var params = {
        Key: {
            "jotid": {S: "" + jotid},
            "userid": {S: "" + userid}
        },
        TableName: "jotter"
    };

    ddb.deleteItem(params, function(err, data) {
        if (err) {
            console.log(err, err.stack);
            callback(err);
        }
        else {
            console.log(data);
            callback(null, data);
        }
    });
};

Event object for test configuration: (delete-jot)

{  
   "userid":"<32 digit uuid code>",
   "jotid":"<32 digit uuid code>"
}

Test all the above Lambda functions and check that the data returned through the callback is correct. For create-new-jot, verify that the item shows up in the DynamoDB table. If not, enjoy debugging the logs. We shall now see how to invoke these functions from API Gateway for each API/HTTP-method call.

Working with API Gateway:

As we learned earlier, API Gateway makes it easy for developers to create, publish, manage, and secure APIs at any scale. It acts as a front door to backend services.

Before we start building let us understand our requirements and design our API accordingly.

  • Get a list of all jots of a user.
  • Post a new jot by a user.
  • Get the full content of a jot of a user.
  • Delete a jot of a user.

Accordingly our API request URIs & http methods could be something like this.

  • GET <URL endpoint>/alljots?userid=<id>
  • POST <URL endpoint>/newjot with jot content in body
  • GET <URL endpoint>/jot?userid=<id>&jotid=<id>
  • DELETE <URL endpoint>/jot?userid=<id>&jotid=<id>

Go to console > API Gateway > APIs > Create API

After creating the API, we could add any number of resources (paths) to it. Let us add the first resource.

Enable API Gateway CORS, as it allows requests from different origins. Let us add the GET method to it.

After adding the method, select the integration type to be Lambda Function and fill in the Lambda function details, like region and name. When this URI is called, this Lambda is invoked.

After saving, select GET; this opens Method Execution.

Understanding Method Execution:

  1. Client requests are passed to Method Request.
  2. Method Request acts like a gatekeeper: it authenticates the request and rejects unauthenticated requests with a 401 status code. It then passes the request on to Integration Request.
  3. Integration Request controls what goes into the action (Lambda in our case). Data mapping & shaping can be done here. 
  4. Call the action with request data as params. 
  5. When action calls back with response, Integration Response handles it.
  6. Integration Response controls what comes out of action. It can also be used to map data to a required structure.
  7. Method Response is used to define Integration Response. 
  8. Method Response responds to the Client.

Adding a Data mapping template to Integration Request:

What a mapping template does is map data from the request body or params into an object that is passed as the event object to the action (Lambda function). We do this only when we have to extract data and get it into the specific structure our action needs.

Select Integration Request, open Mapping Templates.
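A minimal template for the alljots GET method, assuming the userid arrives as a query string parameter and the template is defined for the application/json content type, could look like this:

{
  "userid": "$input.params('userid')"
}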

After adding Data mapping template to Integration Request, let us test the API.

Testing the API:

Click on TEST in the Method Execution section. Enter the Query Strings params as:

userid="uuid"

Click on Test. You should see all the jots of that userid in the Response Body.

Deploy API to a stage:

You could create different deployment stages, like dev, prod, testing, etc. For anything we create or modify to take effect, the API has to be deployed to a stage. Deploying gives us a URL endpoint that we can use to make API calls. For every change made, we have to redeploy; otherwise the changes stay local to API Gateway and don’t reflect on the URL endpoint. Each stage has a different URL endpoint with different resources.
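For example, for a stage named dev, the invoke URL takes the standard API Gateway form (api-id and region are specific to your API):

https://<api-id>.execute-api.<region>.amazonaws.com/dev/alljots?userid=<id>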

Now we have successfully created an API with alljots path/resource.

Testing the API using Postman:

Launch Postman, enter the URL, choose the HTTP method, and add appropriate headers and a body if needed. Send the request.

Tada! The API is working. Similarly, add the remaining 3 URIs in API Gateway and test them using Postman.

The data mapping template for the Delete jot and Get jot URI resources is the same, but we pass 2 arguments (userid, jotid) in the query string.

Note: The reason we passed data in query strings for the GET & DELETE methods is that these methods do not carry a request body; the only means of passing data is path parameters or query strings. 

Data mapping template for Delete and Get jot URI resources:

Example for Get (similarly for Delete):
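A minimal template for these two methods, assuming both values arrive as query string parameters, could look like this:

{
  "userid": "$input.params('userid')",
  "jotid": "$input.params('jotid')"
}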

For the newjot POST request, data has to be passed in the body in this format:

{  
   "userid":"<32 digit uuid code>",
   "jotid":"<32 digit uuid code>",
   "title":"title content",
   "body":"body content"
}

Example for Post:
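A sketch of the POST template, assuming the fields are read from the request body ($input.json returns the value already JSON-quoted, so no surrounding quotes are needed):

{
  "userid": $input.json('$.userid'),
  "jotid": $input.json('$.jotid'),
  "title": $input.json('$.title'),
  "body": $input.json('$.body')
}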

Note: And again, if things didn’t work properly, there can be many possible causes, as we are dealing with 3 services here; my answer is to start debugging the CloudWatch logs. 

We have successfully built and tested the AWS services part of our serverless architecture. We will look at how to integrate it with a React application, create Sign-in and Sign-up using Cognito, secure our APIs in API Gateway with Cognito, and finally deploy the app on S3 cloud storage in Part 2.

This story is authored by Koushik Busim. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Machine Learning Operations (MLOps) Pipeline using Google Cloud Composer

In an earlier post, we had described the need for automating the Data Engineering pipeline for Machine Learning based systems. Today, we will expand the scope to setup a fully automated MLOps pipeline using Google Cloud Composer.

Cloud Composer

Cloud Composer is officially defined as a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

So let’s get on with the required steps to create this MLOps infrastructure on Google Cloud Platform.

Creating a Cloud Composer Environment

Step1: Please enable the Cloud Composer API.

Step2: Go to the create environment page in the GCP console. Composer is available in the Big Data section.

Step3: Click on create to start creating a Composer environment

Step4: Please select a Service account which has the required permissions to access GCS, BigQuery, ML Engine and the Composer environment. The required roles for accessing the Composer environment are Composer Administrator and Composer Worker. 
For more details about access control in the Composer environment, please see this.

Step5: Please use Python Version 3 and latest Image version.

Step6: Click on create. It will take about 15-20 minutes to create the environment. Once it completes, the environment page shall look like the following.

Click on Airflow to see Airflow WebUI. The Airflow WebUI looks as follows

The DAGs folder is where our DAG file is stored; it is nothing but a folder inside a GCS bucket created by the environment. To know more about the concept of a DAG and for a general introduction to Airflow, please refer to this post.

You could see Composer related logs in Logging.

Step7: Please add the following PyPI packages to the Composer environment.

Click on the created environment, navigate to PYPI PACKAGES, and click on edit to add packages.

The required packages are:

# to read data from MongoDB
pymongo==3.8.0
oauth2client==4.1.3
# to read data from firestore
google-cloud-firestore==1.3.0
firebase-admin==2.17.0
google-api-core==1.13.0

Create a ML model

Step1: Please create a folder structure like the following on your instance.

ml_model
├── setup.py
└── trainer
    ├── __init__.py
    └── train.py

Step2: Please place the following code in the train.py file, which shall upload the model to a GCS bucket as shown below. This model would be used to create model versions, as explained a bit later.

from google.cloud import bigquery
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
from google.cloud import storage
import datetime
import json
import pickle
client = bigquery.Client()
sql = '''
SELECT *
FROM `<PROJECT_ID>.<DATASET>.<TABLENAME>`
'''

df = client.query(sql).to_dataframe()
df = df[['is_stressed', 'is_engaged', 'status']]

df['is_stressed'] = df['is_stressed'].fillna('n')
df['is_engaged'] = df['is_engaged'].fillna('n')
df['stressed'] = np.where(df['is_stressed']=='y', 1, 0)
df['engaged'] = np.where(df['is_engaged']=='y', 1, 0)
df['status'] = np.where(df['status']=='complete', 1, 0)

feature_cols = ['stressed', 'engaged']
X = df[feature_cols]
y = df.status
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.25,random_state=0)
logreg = LogisticRegression()
logreg.fit(X_train,y_train)
pkl_filename = "model.pkl"  
with open(pkl_filename, 'wb') as file:  
    pickle.dump(logreg, file)
# Upload the model to GCS
BUCKET_NAME = '<YOUR_BUCKET_NAME>'  # replace with your model bucket
bucket = storage.Client().bucket(BUCKET_NAME)
file_path = datetime.datetime.now().strftime('machine_learning/models/%Y%m%d_%H%M%S')
blob = bucket.blob('{}/{}'.format(
    file_path,
    pkl_filename))
blob.upload_from_filename(pkl_filename)

file_location = 'gs://{BUCKET_NAME}/{file_path}'.format(BUCKET_NAME=BUCKET_NAME, file_path=file_path)
file_config = json.dumps({'file_location': file_location})

COMPOSER_BUCKET = '<COMPOSER_ENVIRONMENT_BUCKET>'  # replace with the bucket created by the Composer environment
bucket = storage.Client().bucket(COMPOSER_BUCKET)
blob = bucket.blob('data/file_config.json')
blob.upload_from_string(file_config)

Step3: Create an empty __init__.py file inside the trainer directory.

Step4: Please place the following code in the setup.py file. The setup.py file lists the packages required to execute the training code.

import setuptools

REQUIRED_PACKAGES = [
    'pandas-gbq==0.3.0',
    'cloudml-hypertune',
    'google-cloud-bigquery==1.14.0',
    'urllib3'
]

setuptools.setup(
    name='ml_model',
    version='1.0',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description='',
)

Step5: Package the code using the following command. It creates a .tar.gz file in the dist directory inside the ml_model directory.

python3 setup.py sdist

Step6: The package name comes from the name and version specified in the setup.py file; here it becomes ml_model-1.0.tar.gz.
Copy the package to gs://{your-GCS-bucket}/machine_learning/. This becomes the base directory for your machine learning activities described in this post.
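Assuming the default sdist output location, the copy could be done with gsutil; note that the DAG below looks for the package under a packages/ subdirectory of this base directory:

gsutil cp ml_model/dist/ml_model-1.0.tar.gz gs://{your-GCS-bucket}/machine_learning/packages/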

Creating a DAG

In this use case, we have created a DAG file which exports some table data from a MongoDB instance into a GCS bucket and then creates a BigQuery table off of that exported data. It then trains a model and creates a version for that model. The DAG file supports both full data extraction and daily data extraction, controlled by a variable named tot_data in the code below. This variable is read from the Airflow configuration (Variables) set by the user, a process described later in this post.

Please place the following code in the DAG file.

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator
import pprint
import json
import re

from pymongo import MongoClient
from google.cloud import storage
from google.cloud.storage import blob
import os

from airflow import models
from mlengine_operator import MLEngineTrainingOperator, MLEngineVersionOperator

ts = datetime.now()
today = str(ts.date()) + 'T00:00:00.000Z'
yester_day = str(ts.date() - timedelta(days = 1)) + 'T00:00:00.000Z'

str_ts = ts.strftime('%Y_%m_%d_%H_%M_%S')

config = Variable.get("mongo_conf", deserialize_json=True)
host = config['host']
db_name = config['db_name']
table_name = config['table_name']
file_prefix = config['file_prefix']
bucket_name = config['bucket_name']
# file_path = file_prefix + '/' + table_name + '.json'
file_path = '{file_prefix}/{table_name}/{table_name}_{str_ts}.json'.format(file_prefix=file_prefix, str_ts=str_ts, table_name=table_name)
file_location = 'gs://' + bucket_name + '/' + file_prefix + '/' + table_name + '/' + table_name + '_*.json'
config['file_location'] = file_location
bq_dataset = config['bq_dataset']
tot_data = config['tot_data'].lower()

BUCKET_NAME = config['ml_configuration']['BUCKET_NAME']
BASE_DIR = config['ml_configuration']['BASE_DIR']
PACKAGE_NAME = config['ml_configuration']['PACKAGE_NAME']
TRAINER_BIN = os.path.join(BASE_DIR, 'packages', PACKAGE_NAME)
TRAINER_MODULE = config['ml_configuration']['TRAINER_MODULE']
RUNTIME_VERSION = config['ml_configuration']['RUNTIME_VERSION']
PROJECT_ID = config['ml_configuration']['PROJECT_ID']
MODEL_NAME = config['ml_configuration']['MODEL_NAME']

MODEL_FILE_BUCKET = config['ml_configuration']['MODEL_FILE_BUCKET']
model_file_loc = config['ml_configuration']['MODEL_FILE_LOCATION']

bucket = storage.Client().bucket(MODEL_FILE_BUCKET)
blob = bucket.get_blob(model_file_loc)
file_config = json.loads(blob.download_as_string().decode("utf-8"))
export_uri = file_config['file_location']

def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out
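
# Example (illustrative): flatten_json({'a': {'b': 1}, 'c': [2, 3]})
# returns {'a_b': 1, 'c_0': 2, 'c_1': 3}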

def mongoexport():
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(file_path)

        client = MongoClient(host)
        db = client[db_name]
        tasks = db[table_name]
        pprint.pprint(tasks.count_documents({}))
        # if tot_data is set to 'yes' in airflow configurations, full data 
        # is processed.  
        if tot_data == 'no':
          query = {"edit_datetime": { "$gte": yester_day, "$lt": today}}
          print(query)
          data = tasks.find(query)
        else:
          data = tasks.find()
        emp_list = []
        for record in data:
                emp_list.append(json.dumps(record, default=str))
        flat_list =[]
        for data in emp_list:
                flat_list.append((flatten_json(json.loads(data))))
        data = '\n'.join(json.dumps({re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v) for k, v in record.items()}) for record in flat_list)
        blob.upload_from_string(data)

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:

    # priority_weight has type int in Airflow DB, uses the maximum.
    pymongo_export_op = PythonOperator(
        task_id='pymongo_export',
        python_callable=mongoexport,
        )

    update_bq_table_op = BashOperator(
        task_id='update_bq_table',
        bash_command='''
        bq rm -f {bq_dataset}.{table_name}
        bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --ignore_unknown_values=True {bq_dataset}.{table_name} {file_location}
        '''.format(bq_dataset=bq_dataset, table_name=table_name, file_location=file_location)
        )

    date_nospecial = '{{ execution_date.strftime("%Y%m%d") }}'
    date_min_nospecial = '{{ execution_date.strftime("%Y%m%d_%H%M") }}'
    uuid = '{{ macros.uuid.uuid4().hex[:8] }}'

    training_op = MLEngineTrainingOperator(
      task_id='submit_job_for_training',
      project_id=PROJECT_ID,
      job_id='{}_{}_{}'.format(table_name, date_nospecial, uuid),
      package_uris=[os.path.join(TRAINER_BIN)],
      training_python_module=TRAINER_MODULE,
      training_args=[
          '--base-dir={}'.format(BASE_DIR),
          '--event-date={}'.format(date_nospecial),
      ],
      region='us-central1',
      runtime_version=RUNTIME_VERSION,
      python_version='3.5')

    create_version_op = MLEngineVersionOperator(
      task_id='create_version',
      project_id=PROJECT_ID,
      model_name=MODEL_NAME,
      version={
          'name': 'version_{}_{}'.format(date_min_nospecial, uuid),
          'deploymentUri': export_uri,
          'runtimeVersion': RUNTIME_VERSION,
          'pythonVersion': '3.5',
          'framework': 'SCIKIT_LEARN',
      },
      operation='create')

    pymongo_export_op >> update_bq_table_op >> training_op >> create_version_op

Once the file is created, please upload it to the DAGs folder. Also, please add the following plugin dependency file, named mlengine_operator.py, to the DAGs folder.
Place the following code in the mlengine_operator.py file.

import re

from apiclient import errors

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def _normalize_mlengine_job_id(job_id):

    # Add a prefix when a job_id starts with a digit or a template
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() == 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    # Clean up 'bad' characters except templates
    tracker = 0
    cleansed_job_id = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_',
                                  job[tracker:m.start()])
        cleansed_job_id += job[m.start():m.end()]
        tracker = m.end()

    # Clean up last substring or the full string if no templates
    cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])

    return cleansed_job_id
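
# Example (illustrative): _normalize_mlengine_job_id('2019-07-01 job')
# returns 'z_2019_07_01_job': a 'z_' prefix is added because the id starts
# with a digit, and runs of special characters are replaced with '_'.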


class MLEngineBatchPredictionOperator(BaseOperator):
   
    template_fields = [
        '_project_id',
        '_job_id',
        '_region',
        '_input_paths',
        '_output_path',
        '_model_name',
        '_version_name',
        '_uri',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 region,
                 data_format,
                 input_paths,
                 output_path,
                 model_name=None,
                 version_name=None,
                 uri=None,
                 max_worker_count=None,
                 runtime_version=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineBatchPredictionOperator, self).__init__(*args, **kwargs)

        self._project_id = project_id
        self._job_id = job_id
        self._region = region
        self._data_format = data_format
        self._input_paths = input_paths
        self._output_path = output_path
        self._model_name = model_name
        self._version_name = version_name
        self._uri = uri
        self._max_worker_count = max_worker_count
        self._runtime_version = runtime_version
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine prediction '
                'job.')

        if self._uri:
            if self._model_name or self._version_name:
                raise AirflowException('Ambiguous model origin: Both uri and '
                                       'model/version name are provided.')

        if self._version_name and not self._model_name:
            raise AirflowException(
                'Missing model: Batch prediction expects '
                'a model name when a version name is provided.')

        if not (self._uri or self._model_name):
            raise AirflowException(
                'Missing model origin: Batch prediction expects a model, '
                'a model & version combination, or a URI to a savedModel.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        prediction_request = {
            'jobId': job_id,
            'predictionInput': {
                'dataFormat': self._data_format,
                'inputPaths': self._input_paths,
                'outputPath': self._output_path,
                'region': self._region
            }
        }

        if self._uri:
            prediction_request['predictionInput']['uri'] = self._uri
        elif self._model_name:
            origin_name = 'projects/{}/models/{}'.format(
                self._project_id, self._model_name)
            if not self._version_name:
                prediction_request['predictionInput'][
                    'modelName'] = origin_name
            else:
                prediction_request['predictionInput']['versionName'] = \
                    origin_name + '/versions/{}'.format(self._version_name)

        if self._max_worker_count:
            prediction_request['predictionInput'][
                'maxWorkerCount'] = self._max_worker_count

        if self._runtime_version:
            prediction_request['predictionInput'][
                'runtimeVersion'] = self._runtime_version

        hook = MLEngineHook(self._gcp_conn_id, self._delegate_to)

        # Helper method to check if the existing job's prediction input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('predictionInput', None) == \
                prediction_request['predictionInput']

        try:
            finished_prediction_job = hook.create_job(
                self._project_id, prediction_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_prediction_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine batch prediction job failed: {}'.format(
                str(finished_prediction_job)))
            raise RuntimeError(finished_prediction_job['errorMessage'])

        return finished_prediction_job['predictionOutput']


class MLEngineModelOperator(BaseOperator):
    template_fields = [
        '_model',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineModelOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model = model
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)
        if self._operation == 'create':
            return hook.create_model(self._project_id, self._model)
        elif self._operation == 'get':
            return hook.get_model(self._project_id, self._model['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineVersionOperator(BaseOperator):
    
    template_fields = [
        '_model_name',
        '_version_name',
        '_version',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model_name,
                 version_name=None,
                 version=None,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):

        super(MLEngineVersionOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model_name = model_name
        self._version_name = version_name
        self._version = version or {}
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        if 'name' not in self._version:
            self._version['name'] = self._version_name

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        if self._operation == 'create':
            assert self._version is not None
            return hook.create_version(self._project_id, self._model_name,
                                       self._version)
        elif self._operation == 'set_default':
            return hook.set_default_version(self._project_id, self._model_name,
                                            self._version['name'])
        elif self._operation == 'list':
            return hook.list_versions(self._project_id, self._model_name)
        elif self._operation == 'delete':
            return hook.delete_version(self._project_id, self._model_name,
                                       self._version['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineTrainingOperator(BaseOperator):
    
    template_fields = [
        '_project_id',
        '_job_id',
        '_package_uris',
        '_training_python_module',
        '_training_args',
        '_region',
        '_scale_tier',
        '_runtime_version',
        '_python_version',
        '_job_dir'
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 runtime_version=None,
                 python_version=None,
                 job_dir=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):
        super(MLEngineTrainingOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._job_id = job_id
        self._package_uris = package_uris
        self._training_python_module = training_python_module
        self._training_args = training_args
        self._region = region
        self._scale_tier = scale_tier
        self._runtime_version = runtime_version
        self._python_version = python_version
        self._job_dir = job_dir
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to
        self._mode = mode

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine training '
                'job.')
        if not package_uris:
            raise AirflowException(
                'At least one python package is required for MLEngine '
                'Training job.')
        if not training_python_module:
            raise AirflowException(
                'Python module name to run after installing required '
                'packages is required.')
        if not self._region:
            raise AirflowException('Google Compute Engine region is required.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        training_request = {
            'jobId': job_id,
            'trainingInput': {
                'scaleTier': self._scale_tier,
                'packageUris': self._package_uris,
                'pythonModule': self._training_python_module,
                'region': self._region,
                'args': self._training_args,
            }
        }

        if self._runtime_version:
            training_request['trainingInput']['runtimeVersion'] = self._runtime_version

        if self._python_version:
            training_request['trainingInput']['pythonVersion'] = self._python_version

        if self._job_dir:
            training_request['trainingInput']['jobDir'] = self._job_dir

        if self._mode == 'DRY_RUN':
            self.log.info('In dry_run mode.')
            self.log.info('MLEngine Training job request is: {}'.format(
                training_request))
            return

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        # Helper method to check if the existing job's training input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('trainingInput', None) == \
                training_request['trainingInput']

        try:
            finished_training_job = hook.create_job(
                self._project_id, training_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_training_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine training job failed: {}'.format(
                str(finished_training_job)))
            raise RuntimeError(finished_training_job['errorMessage'])

Import variables from the composer_conf.json file into Airflow Variables.
Go to Airflow WebUI → Admin → Variables → browse to the file path, or configure the variables manually.
Please place the following in composer_conf.json:

{
  "mongo_conf": {
    "host": "mongodb://<instance-internal-ip>:27017",
    "db_name": "DBNAME",
    "table_name": "TABLENAME",
    "file_prefix": "Folder In GCS Bucket",
    "bq_dataset": "BigQuery Dataset",
    "bucket_name": "GCS Bucket",
    "tot_data": "yes",
    "ml_configuration": {
      "BUCKET_NAME": "GCS Bucket",
      "BASE_DIR": "gs://GCS Bucket/machine_learning/",
      "PACKAGE_NAME": "PACKAGE NAME FROM setup.py FILE in ML",
      "TRAINER_MODULE": "trainer.train",
      "RUNTIME_VERSION": "1.13",
      "PROJECT_ID": "GCP Project",
      "MODEL_FILE_BUCKET": "BUCKET CREATED BY Composer Environment",
      "MODEL_FILE_LOCATION": "data/MODEL LOCATION FILE",
      "MODEL_NAME": "MODEL_NAME"
    }
  }
}

Please store any configuration or credentials files used by Composer in the data folder of the bucket created by the Composer environment.

After configuring the variables accordingly, you can see the DAG named ml_pipeline in the Airflow WebUI.

Please trigger the DAG from the Airflow WebUI. Once the DAG has run successfully, it looks like the following:

Thanks for the read and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

Creating an Automated Data Engineering Pipeline for Batch Data in Machine Learning

A common use case in Machine Learning life cycle is to have access to the latest training data so as to prevent model deterioration. A lot of times data scientists find it cumbersome to manually export data from data sources such as relational databases or NoSQL data stores or even distributed data. This necessitates automating the data engineering pipeline in Machine Learning. In this post, we will describe how to set up this pipeline for batch data. This workflow is orchestrated via Airflow and can be set up to run at regular intervals: such as hourly, daily, weekly, etc depending on the specific business requirements.

Quick note – In case you are interested in building a real time data engineering pipeline for ML, please look at this post.

In this use case, we are going to export MongoDB data into Google BigQuery via Cloud Storage. The updated data in BigQuery is then made available in Jupyter Notebook as a Pandas Dataframe for downstream model building and analytics. As the pipeline automates the data ingestion and preprocessing, the data scientists always have access to the latest batch data in their Jupyter Notebooks hosted on Google AI Platform. 

We have a MongoDB service running in an instance, and we have Airflow and mongoexport running on Docker on another instance. Mongoexport is a utility that produces a JSON or CSV export of data stored in MongoDB. The data in MongoDB shall be extracted and transformed using mongoexport and loaded into Cloud Storage. Airflow is used to schedule and orchestrate these exports. Once the data is available in Cloud Storage, it can be queried in BigQuery. We then get this data from BigQuery into a Jupyter Notebook. The following is a step by step sequence to set up this data pipeline.
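For reference, a typical standalone mongoexport invocation (the DAG below runs essentially this inside the mongo_client container) looks like:

mongoexport --host=<instance-public-ip> --db <Database_Name> --collection <Table_Name> --out <Table_Name>-unformat.json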

You can create an instance in GCP by going to Compute Engine. Click on create instance.

Install.sh:

sudo apt-get update
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
sudo usermod -aG docker $USER
sudo apt-get install -y python-pip
export AIRFLOW_HOME=~/airflow
sudo pip install apache-airflow
sudo pip install apache-airflow[postgres,s3]
airflow initdb
airflow webserver -p 8080 -D
airflow scheduler -D
sudo docker pull mongo
sudo docker run --name mongo_client -d mongo

Please run the install.sh file using the ./install.sh command (please make sure the file is executable). It installs Docker and Airflow, pulls the Mongo image, and runs it in a container named mongo_client.

After installation, the Airflow WebUI is available at http://<public-ip-instance>:8080 (you may need to open port 8080 in the network just for your public IP).


Please make sure the Google service account on the running instance has permissions for accessing BigQuery and Cloud Storage. After installation, add the Airflow job Python file (mongo-export.py) inside the airflow/dags folder.

Before running the Python file, please make sure that you create the dataset and the table in BigQuery. Also change the values for the MongoDB source database, MongoDB source table, Cloud Storage destination bucket and BigQuery destination dataset in the Airflow job Python file (mongo-export.py) as appropriate. The BigQuery destination table name is the same as the source table name in MongoDB. 
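If the dataset does not exist yet, one way to create it is with the bq CLI (project and dataset names here are placeholders):

bq mk --dataset <project-id>:<Dataset_Name>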

Mongo-export.py:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json
from pandas.io.json import json_normalize

# Following are default arguments which could be overridden
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['airflow@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

bucket_name = '<Your_Bucket>'
db_name = '<Database_Name>'
dataset = '<Dataset_Name>'
table_name = '<Table_Name>'


time_stamp = datetime.now()
cur_date = time_stamp.strftime("%Y-%m-%d")

# It will flatten the nested json
def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

def convert_string(y):
    string_type = {}

    def convert(x, name=''):
        if type(x) is dict:
            for a in x:
                convert(str(x[a]), name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                # recurse on list elements, mirroring the dict branch
                convert(str(a), name + str(i) + '_')
                i += 1
        else:
            string_type[name[:-1]] = x

    convert(y)
    return string_type


def json_flat():
    lines = [line.rstrip('\n') for line in open('/home/dev/'+ table_name + '-unformat.json')]
    flat_list = []
    for line in lines:
        line = line.replace("\"$", "\"")
        line = json.loads(line)
        try:
            flat_list.append(json.dumps(convert_string(flatten_json(line))))
        except Exception as e:
            print(e)
    flatted_json = '\n'.join(i for i in flat_list)

    with open('/home/dev/' + table_name + '.json', 'a') as file:
        file.write(flatted_json)
    return flatted_json 

dag = DAG('mongoexport-daily-gcs-bq', default_args=default_args, params = {'cur_date': cur_date, 'db_name': db_name, 'table_name': table_name, 'dataset': dataset, 'bucket_name': bucket_name})
# exports the table data into the docker container
t1 = BashOperator(
    task_id='mongoexport_to_container',
    bash_command='sudo docker exec -i mongo_client sh -c "mongoexport --host=<instance_public_ip> --db {{params.db_name}} --collection {{params.table_name}} --out {{params.table_name}}-unformat.json"',
    dag=dag)

# copies exported file into instance

t2 = BashOperator(
    task_id='cp_from_container_instance',
    bash_command='sudo docker cp mongo_client:/{{params.table_name}}-unformat.json /home/dev/',
    dag=dag)

t3 = PythonOperator(
    task_id='flattening_json',
    python_callable=json_flat,
    dag=dag)
# copies the flattened data to the raw zone in cloud storage (partitioned by date)
t4 = BashOperator(
    task_id='cp_from_instance_gcs',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/raw/{{params.table_name}}/date={{params.cur_date}}/',
    dag=dag)
# copies the flattened data to the curated zone in cloud storage (latest copy)
t5 = BashOperator(
    task_id='cp_from_instance_gcs_daily_data',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/curated/{{params.table_name}}/',
    dag=dag)

# removes the existing bigquery table
t6 = BashOperator(
    task_id='remove_bq_table',
    bash_command='bq rm -f {{params.dataset}}.{{params.table_name}}',
    dag=dag)
# creates a table in bigquery
t7 = BashOperator(
    task_id='create_bq_table',
    bash_command='bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON {{params.dataset}}.{{params.table_name}} gs://{{params.bucket_name}}/curated/{{params.table_name}}/{{params.table_name}}.json',
    dag=dag)
# removes data from container
t8 = BashOperator(
    task_id='remove_file_from_container',
    bash_command='sudo docker exec -i mongo_client sh -c "rm -rf {{params.table_name}}*.json"',
    dag=dag)
# removes data from instance
t9 = BashOperator(
    task_id='remove_file_from_instance',
    bash_command='rm -rf /home/dev/{{params.table_name}}*.json',
    dag=dag)

t1 >> t2
t2 >> t3
t3 >> [t4, t5]
[t4, t5] >> t6
t6 >> t7
t7 >> [t8, t9]

Then run the Python file using python <file-path>.py (example: python airflow/dags/mongo-export.py).

After running the Python file, the DAG name shows up in the Airflow WebUI, and you could trigger the DAG manually. Please make sure the toggle button is in the ON status.

Once the job completes, the data is stored in the bucket and also available in the destination table in BigQuery. You could see the table created in BigQuery. Click on "Query table" to perform SQL operations, and you could see your results in the preview tab at the bottom.

Now, you could access the data in Jupyter Notebook from BigQuery. Search for notebook in GCP console. 

Run the below commands in Jupyter Notebook.

from google.cloud import bigquery
client = bigquery.Client()
sql = """
SELECT * FROM 
`<project-name>.<dataset-name>.<table-name>`
"""
df = client.query(sql).to_dataframe()
df.head(10)

This loads the BigQuery data into Pandas dataframe and can be used for model creation as required. Later when the data pipeline is run as per schedule, the refreshed data would automatically be available in this Jupyter notebook via this SQL query.

Hope this helps you to automate your batch Data Engineering pipeline for Machine Learning. 

This story is co-authored by Santosh and Subbareddy. Santosh is an AWS Cloud Engineer and Subbareddy is a Big Data Engineer.