Processing Kinesis Data Streams with Spark Streaming


Solution Overview : In this blog, we are going to build a real time anomaly detection solution using Spark Streaming. Kinesis Data Streams would act as the input streaming source and the anomalous records would be written as Data Streams in DynamoDB.

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events.

Data Streams

The unit of data stored by Kinesis Data Streams is a data record. A data stream represents a group of data records.

For deep dive into Kinesis Data Streams, please go through these official docs.

Kinesis Data Streams Producers

A producer puts data records into Amazon Kinesis Data Streams. For example, a web server sending log data to a Kinesis Data Stream is a producer.

For more details about Kinesis Data Streams Producers, please go through these official docs.

Kinesis Data Streams Consumers

A consumer, known as an Amazon Kinesis Data Streams application, is an application that you build to read and process data records from Kinesis Data Streams.

For more details about Kinesis Data Streams Consumers, please go through these official docs.


Creating a Kinesis Data Stream

Step1. Go to Amazon Kinesis console -> click on Create Data Stream

Step2. Give Kinesis Stream Name and Number of shards as per volume of the incoming data. In this case, Kinesis stream name as kinesis-stream and number of shards are 1.

Shards in Kinesis Data Streams

A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.

For more about shards, please go through these official docs.

Step3. Click on Create Kinesis Stream

Kinesis Data Streams can be connected with Kinesis Data Firehoseto write the streamsinto S3.


Configure Kinesis Data Streams with Kinesis Data Producers

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose.

While following this link, choose to Create a Cognito User with Cloud Formation.

After selecting the above option, we will navigate to the Cloud Formation console:

Click on Next and provide Username and Password for Cognito User for Kinesis Data Generator.

Click on Next and Create Stack.

CloudFormation Stack is created.

Click on Outputs tab and open the link

After opening the link, enter the usernameand password of Cognito user.

After Sign In is completed, select the RegionStream and configure the number of records per second. Choose record template as your requirement.

In this case, the template data format is

{{name.firstName}},{{random.number({“min”:10, “max”:550})}},{{random.arrayElement([“OK”,”FAIL”,”WARN”] )}}

The template data looks like the following

You can send different types of dummy data to Kinesis Data Streams.

Kinesis Data Streams with Kinesis Data Producers are ready. Now we shall build a Spark Streaming application which consumes data streams from Kinesis Data Streams and dumps the output streams into DynamoDB.


Create DynamoDB Tables To Store Data Frame

Go to Amazon DynamoDB console -> Choose Create Table and name the table, in this case, data_dump

In the same way, create another table named anomaly_data. Make sure Kinesis Data streams and DynamoDb tables are in the same region.

Spark Streaming with Kinesis Data Streams

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.

For deep dive into Spark Streams, please go through docs.

In this case, the Scala programming language is used. Scala version is 2.11.12. Please install scala, sbt and spark.

Create a folder structure like the following

Kinesis-spark-streams-dynamo
| -- src/main/scala/packagename/object
| -- build.sbt
| -- project/assembly.sbt

In this case, the structure looks like the following

After creating the folder structure,

Please replace build.sbt file with the following code. The following code will add the required dependencies like spark, spark kinesis assembly, spark streaming and many more.

name := "kinesis-spark-streams-dynamo"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "0.4.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
libraryDependencies += "com.google.guava" % "guava" % "14.0.1"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.466"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}

Please replace assembly.sbt file with the following code. This will add the assembly plugin which can be used for creating the jar.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

Please replace kinesis-spark-streams-dynamo file with the following code.

package com.wisdatum.kinesisspark

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.spark._
import org.apache.spark.streaming._
import com.amazonaws.services.kinesis.AmazonKinesis
import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import org.apache.log4j.{Level, Logger}
import com.audienceproject.spark.dynamodb.implicits._

object KinesisSparkStreamsDynamo {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
.asScala
.find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
.map(_.getName)
.getOrElse(
throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
}

def main(args: Array[String]) {

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

val conf = new SparkConf().setAppName("KinesisSparkExample").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
println("Launching")
val Array(appName, streamName, endpointUrl, dynamoDbTableName) = args
println(streamName)
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()

require(credentials != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
println("numShards are " + numShards)

val numStreams = numShards

val batchInterval = Milliseconds(100)

val kinesisCheckpointInterval = batchInterval

val regionName = getRegionNameByEndpoint(endpointUrl)

val anomalyDynamoTable = "data_anomaly"

println("regionName is " + regionName)

val kinesisStreams = (0 until numStreams).map { i =>
KinesisInputDStream.builder
.streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
}

val unionStreams = ssc.union(kinesisStreams)

val inputStreamData = unionStreams.map { byteArray =>
val Array(sensorId, temp, status) = new String(byteArray).split(",")
StreamData(sensorId, temp.toInt, status)
}

val inputStream: DStream[StreamData] = inputStreamData

inputStream.window(Seconds(20)).foreachRDD { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._

val inputStreamDataDF = rdd.toDF()
inputStreamDataDF.createOrReplaceTempView("hot_sensors")

val dataDumpDF = spark.sql("SELECT * FROM hot_sensors ORDER BY currentTemp DESC")
dataDumpDF.show(2)
dataDumpDF.write.dynamodb(dynamoDbTableName)

val anomalyDf = spark.sql("SELECT * FROM hot_sensors WHERE currentTemp > 100 ORDER BY currentTemp DESC")
anomalyDf.write.dynamodb(anomalyDynamoTable)
}

// To make sure data is not deleted by the time we query it interactively
ssc.remember(Minutes(1))

ssc.start()
ssc.awaitTermination()
}
}
case class StreamData(id: String, currentTemp: Int, status: String)

appName: The application name that will be used to checkpoint the Kinesis sequence numbers in the DynamoDB table.

  1. The application name must be unique for a given account and region.
  2. If the table exists but has incorrect checkpoint information (for a different stream, or old expired sequenced numbers), then there may be temporary errors.

kinesisCheckpointInterval
The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

endpointURL:
Valid Kinesis endpoints URL can be found here.

For more details about building KinesisInputDStream, please go through the documentation.

Configure AWS Credentials using environment variables or using aws configure command.

Make sure all the resources are under the same account and region. Region of CloudFormation Stack that was created is in us-west-2 even though all the resources are in another region, this would not affect the process.


Building Executable Jar

  • Open Terminal -> Go to project root directory, in this case 
    kinesis-spark-streams-dynamo
  • Run sbt assembly

The jar has been packaged into project root directory/target/scala-2.11/XXXX.jar. Name of the jar is the name that provided in build.sbt file.

Run the Jar using spark-submit

  • Open Terminal -> Go to Spark bin directory
  • Run the following command, and it looks like
./bin/spark-submit ~/Desktop/kinesis-spark-streams-dynamo/target/scala-2.11/kinesis-spark-streams-dynamo-assembly-0.1.jar appName streamName endpointUrl dynamoDbTable

To know more about how to submit applications using spark-submit, please review this.

Arguments that are passed are highlighted in the above highlighted blue box. Place the arguments as needed.

Read Kinesis Data Streams in Spark Streams

  1. Go to Amazon Kinesis Data Generator-> Sign In using Cognito user
  2. Click on Send Data, it starts sending data to Kinesis Data Streams

Data would be sent to Kinesis Data Stream, in this case, kinesis-stream, it looks like this.

Monitoring Kinesis Data Streams

Go to Amazon Kinesis Console -> Choose Data streams -> Select created Data Stream -> click on Monitoring

The terminal looks like the following when it starts receiving the data from Kinesis Data Streams

The data_dump table has the whole data that is coming from Kinesis Data Streams. And the data in the data_dump table looks like

The data_anomaly table has data where currentTemp is greater than 100. Here the anomaly is temperature greater than 100. And the data in the data_anomaly table looks like

I hope this article was helpful in setting up Kinesis Data Streams that are consumed and processed using Spark Streaming and stored in DynamoDB.

This story is authored by P V Subbareddy. He is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.

A Beginner’s Guide to Airflow

Airflow is used to create code pipeline where we can schedule and monitor our workflows. A workflow can be a collection of tasks to be executed like a flowchart.

It is like an orchestra conductor that controls all different data processing tools/tasks under one roof.

Why Airflow?

  • Open source.
  • Can handle upstream/downstream in an elegant way.
  • Jobs can pass parameters to other jobs downstream.
  • Ease of deployment of workflow.
  • Easy to reprocess historical jobs by date or rerun for specific intervals.
  • Integration with a lot of infrastructure.
  • Job testing through airflow itself.
  • Accessibility of log files and other meta-data through the web GUI.
  • Data sensors to trigger Directed Acyclic Graph (DAG) when data arrives.

Airflow Architecture:

Metadata: It stores the state of tasks and workflow.
Scheduler: It is a user DAG, with the state of tasks in metadata database. It decides what needs to execute.
Executor: It is a message queuing process. Decides which worker will execute each task.

Setting up airflow:

Use the following commands to install airflow

$ sudo pip install apache-airflow

You can install extra features like

$ sudo pip install apache-airflow[postgres,s3]

Airflow requires database to be initiated before you run tasks

$ airflow initdb

After installation the folder structure will be something like this.

If you don’t see ‘dags’ folder then you can create by yourself and name it ‘dags’. You shall dump all your python task files in that folder.

To start the web server

$ Airflow webserver -p 8080

Goto https://localhost:8080 to see airflow GUI. You should see something like this:

Airflow relies on 4 cores:

  1. DAG.
  2. Tasks.
  3. Scheduler.
  4. X-com.

Directed Acyclic Graph (DAG):

It is a collection of all tasks which you want to run in an organized way that shows their relationships and dependencies.

Basically a DAG is just a python file, which is used to organise tasks and set their execution content. DAGs do not perform any actual computation. Basically a DAG task contains 2 type of things:

  1. Operators
    It determines what actually gets done. It triggers a certain action like running a bash command, executing a python function, etc.
    There are different kinds of operators like:
    Bash operator: Executes a bash command.
    Python operator: Calls an arbitrary python function.
    Hive operator: Executes hql code or hive script in specific hive database.
    Bigquery operator: Executes Google BigQuery SQL query in a specific BigQuery database.
  2. Sensors : A certain type of operator that will keep running until a certain criteria is met.

Tasks :
Tasks are the elements that actually “do work” which we want to be performed. It is our job to write the configuration and organise tasks in specific order to create data pipeline.

Once the operator is instantiated, it is referred to as a “task”. An operator describes single task in a workflow

Scheduler :
It monitors all tasks and all dags. And trigger the task instance whose dependencies have met.
We create new python file in dags folder
Importing packages in our python file:

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

Next setting default arguments

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2019, 6, 17, 8, 40, 00),
    'retries': 1,
}

Depends_on_past: we will be in a situation where  a task t1 is dependent on its previous execution. In that case we will use this.

Start_date: starting date of execution
Defining our tasks, dags, functions

def greet():
    print("Hello how are you ?")
    return 'greeted'

def response():
    return "im fine what about you?"

with DAG('YourDagName', default_args=default_args, schedule_interval='@daily',) as dag:
    opr_hello = BashOperator(task_id='say_Hi',bash_command='echo "Hi!!"')
    opr_greet = PythonOperator(task_id='greet', python_callable=greet)
    opr_sleep = BashOperator(task_id='sleep_me', bash_command='sleep 2')
    opr_respond = PythonOperator(task_id='respond', python_callable=response)

In this, I am using 2 simple functions and with dag passing my default arguments and also schedule time of execution.

After that we created 4 tasks in which 2 are python and other 2 are bash. If we want to execute a python function we need to use python operator and also the same with the bash operator. Python operator will take at least 2 arguments which is task id and python callable and for bash, task id and bash command.

Defining task dependencies or order in which the tasks should be executed in.

Task dependencies are set using

  • The set_upstream and set_downstream operator.
  • The bitshift operator <<  and >>.

t1.set_downstream(t2)
t1>>t2

The above 2 lines mean the same. This means t2 will depend on t1 to run successfully.

opr_hello >> opr_greet >> opr_sleep >> opr_respond

If you wrap up the whole code in one file and put that file in dags folder then you can see your dag in Airflow GUI. If you click on your DAG you will see tree view and many other options to view your data in different forms.

Dagrun are dags that runs at certain time.
TaskInstances are task belongs to dagrun.
Each dagrun and task instance is associated with an entry in airflow’s metadata database that logs their state(eg: queued, running, failed, skipped, etc).
X-com: XCom which means cross communication allow airflow tasks to send and receive data/status. XCom data are stored in the airflow database in the form of key-value pairs.

That’s all guys. Thanks for the read!

This story is co-authored by Santosh Kumar and PV Subbareddy. Santosh is a Software Engineer specializing on Cloud Services and DevOps. Subbareddy is a Big Data Engineer specializing on AWS Big Data Services and Apache Spark Ecosystem.

Building a Simple React Application using Redux

In this article I will be building a simple react app that uses redux to manage its state. If you are a complete beginner, I recommend first reading my A Beginners Guide to Understanding Redux article then following this article. For this article, I have built a simple react app that renders a dummy signup form and a table which displays the list of employee details (dummy data). We shall integrate it with redux but first do the initial setup to follow along.

Initial Setup

Please clone this specific branch ‘without-redux’ from my GitHub repository.

git clone -b without-redux --single-branch https://github.com/koushik-bitzop/newempdetails.git

Run the above command on your terminal, this should create a folder and clone all the required code files from my repository.
To get started install the dependencies:

sudo npm install

Start the application server:

sudo npm start

You should see the app running on your localhost, like this:

The code files that you have cloned and the below are the same.

App.js

import React, {Component} from 'react';
import Header from './components/Header';
import Routes from './components/Routes';
import { BrowserRouter as Router} from 'react-router-dom';

class App extends Component {
 state = {  }
 render() {
   return (
   <Router>
       <Header/>
       <div className="row">
           <Routes />
       </div>
   </Router>
   );
 }
}

export default App;

Routes.js

import React, {Component} from 'react';
import {Switch} from 'react-router';
import {Route} from 'react-router-dom';
import FormdataTable from './FormdataTable';
import SignupForm from './SignupForm';

class Routes extends Component {
   render() {
       return ( 
           <Switch>
               <Route path={'/'} exact component={SignupForm } ></Route>
               <Route path={'/viewlist'} exact component={FormdataTable}></Route>
           </Switch>
       );
   }
}

export default Routes;

Header.js

import React,{Component} from 'react';
import {Link, NavLink} from 'react-router-dom';

class Header extends Component{
   render(){
       return(
           <nav className="navbar navbar-expand-lg navbar-light bg-light">
                  
               <Link to="/" className="navbar-brand" href="#"><h1>Wisdatum</h1></Link>

               <div className="collapse navbar-collapse" id="navbarNav">
                   <ul className="navbar-nav">
                       <li className="nav-item active">
                           <NavLink to="/" className="nav-link" href="#">Add new </NavLink>
                       </li>
                       <li className="nav-item">
                           <NavLink to="/viewlist" className="nav-link" href="#">View list</NavLink>
                       </li>
                   </ul>
               </div>

           </nav>
       );
   }
}

export default Header;

SignupForm.js

import React, {Component} from 'react';
import {Form, FormGroup, FormControl, FormLabel, Button} from 'react-bootstrap';

class SignupForm extends Component {
   constructor(props) {
   super(props);
   this.state = {
       firstname: '',
       lastname: '',
       email: '',
       mobile: '',
       city: ''
   };

   // This binding is necessary to make `this` work in the callback
   this.handleSubmit = this.handleSubmit.bind(this);
   }
handleSubmit(){
   let formdata = {
       firstname: this.state.firstname,
       lastname: this.state.lastname,
       email: this.state.email,
       mobile: this.state.mobile,
       city: this.state.city
   };
   console.log("submitted",formdata);
   this.setState({
       firstname: '',
       lastname: '',
       email: '',
       mobile: '',
       city: ''
   });
}
   render() {
       return (
           <div className="col-md-4 offset-md-4">
               <Form>
                   <h2 style={{"textAlign":"center", "marginTop":"20px"}}>Enter Employee Details</h2>
                   <hr/>
                   <FormGroup>
                       <FormLabel>Firstname</FormLabel>
                       <FormControl
                           type="text"
                           name="firstname"
                           placeholder="Firstname"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.firstname}
                       />
                   </FormGroup>
              
                   <FormGroup>
                       <FormLabel>Lastname</FormLabel>
                       <FormControl
                           type="text"
                           name="lastname"
                           placeholder="Lastname"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.lastname}
                       />
                   </FormGroup>

                   <FormGroup>
                       <FormLabel>Email</FormLabel>
                       <FormControl
                           type="text"
                           name="email"
                           placeholder="Email"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.email}
                       />
                   </FormGroup>

                   <FormGroup>
                       <FormLabel>Mobile</FormLabel>
                       <FormControl
                           type="text"
                           name="mobile"
                           placeholder="Mobile"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.mobile}
                       />
                   </FormGroup>

                   <FormGroup>
                       <FormLabel>City</FormLabel>
                       <FormControl
                           type="text"
                           name="city"
                           placeholder="City/Village"
                           onChange={e => {
                               this.setState({[e.target.name]:e.target.value});
                           }}
                           value = {this.state.city}
                   />
                   </FormGroup>
                   <Button onClick={this.handleSubmit}>Submit</Button>
               </Form>
           </div>
        );
   }
}
export default SignupForm;

FormdataTable.js

import React,{Component} from 'react';
import empdata from '../data/employeedata.json';

class FormdataTable extends Component {
   state = {  }
   render() {
       console.log(empdata);
       return (
           <div className="col-md-10 offset-md-1">
               <h2 style={{"textAlign":"center", "marginTop":"20px", "marginBottom":"20px"}}>New Employee Details</h2>
               <table className="table table-hover">
                   <thead>
                       <tr>
                           <th scope="col">#</th>
                           <th scope="col">Fist Name</th>
                           <th scope="col">Last Name</th>
                           <th scope="col">Email</th>
                           <th scope="col">Mobile</th>
                           <th scope="col">City</th>
                           <th scope="col">Options</th>
                       </tr>
                   </thead>
                   <tbody>
                       {
                       empdata.map((emp, index) => {
                       return(
                           <tr key={index}>
                                   <td>{index+1}</td>
                                   <td>{emp.firstname}</td>
                                   <td>{emp.lastname}</td>
                                   <td>{emp.email}</td>
                                   <td>{emp.mobile}</td>
                                   <td>{emp.city}</td>
                                   <td>
                                       <button
                                           type="button"
                                           onClick={()=>this.handleEdit(index)}
                                           className="btn btn-sm btn-primary">Edit
                                       </button>
                                       {" | "}
                                       <button
                                           type="button"
                                           onClick={()=>this.handleDelete(index)}
                                           className="btn btn-sm btn-danger">Delete
                                       </button>
                                   </td>
                           </tr>
                       )})
                       }
                   </tbody>
               </table>
           </div>
       );
   }
}
export default FormdataTable;

Before we begin integrating with redux, please review and understand the code above. I have used bootstrap for styling and react-router for creating the navigation/routes in the application.

Integrating with redux:

The above diagram depicts redux workflow in a nutshell, UI triggers actions like form-submit, edit & delete in table. Each action represents an action object, these action objects are then dispatched to Reducer function which updates the Store. Store contains the State. All the components get their data from the store and will be updated with new state. This new state will re-render and defines the new UI changes, repeats the cycle.

Redux is external to react we shall need redux & react-redux library to connect to redux with react.

npm install redux react-redux --save

We shall go step by step.

Step1: Create a root reducer function

Reducer function always talks to the store, updates the store and creates the store.

allreducer.js (create in src folder)

import empdata_json from '../data/employeedata.json';

function empdata(state = empdata_json, action){
    switch(action.type){
        default:
            return state;
    }
}

export default empdata;

The above reducer function accepts two arguments current state and an action object. Initial values of this state is set to dummy data imported from employeedata.json file. Whenever this function is invoked, based on the action type, the corresponding block is executed in the switch. Though the logic of the blocks may differ, they all pretty much do the same. Take the payload and return new state to update the store.

Step2: Create the store, make it available to whole tree

We have to import the above reducer function and use createStore method available in redux to create the store.

To make this store available to the whole app component tree we wrap the whole nested app component tree in Provider component available in react-redux store as props.

The <Provider /> makes the Redux store available to any nested components that have been wrapped in the connect() function. We will learn about connect() in the next step. Since any React component in a React Redux app can be connected, most applications will render a <Provider> at the top level, with the entire app’s component tree inside of it.

Normally, you can’t use a connected component unless it is nested inside of a <Provider>.

index.js (modify this file)

import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';
import * as serviceWorker from './serviceWorker';
import empdatareducer from '../src/allreducer';
import {createStore} from 'redux';
import {Provider} from 'react-redux';

const store = createStore(empdatareducer);
ReactDOM.render(<Provider store={store}><App /></Provider>, document.getElementById('root'));

// If you want your app to work offline and load faster, you can change
// unregister() to register() below. Note this comes with some pitfalls.
// Learn more about service workers: https://bit.ly/CRA-PWA
serviceWorker.unregister();

Step3: Connecting with redux

Now the store is available to whole component tree, components have to be connected to redux and be mapped with the store to get data(as props). The react-redux library comes with a function called connect, which is how you can feed data from Redux’s store into your React components and also dispatch actions to reducer function. The connect function is commonly passed 1 or 2 arguments:

First, a mapStateToProps function that takes out pieces of state out of Redux and assigns them to props that your React component will use.

And often a second argument: a mapDispatchToProps function which binds action creator functions(return action object) with dispatch, So you can just write props.actionName() and you don’t have to write dispatch: actionObject, for every event or action. The dispatch is invoked as soon as action creator returns an action object.

Remember, you can’t dispatch an action to a specific reducer function, Whenever there is a dispatch, it is received by all reducers, only those blocks where action type is matched are executed.

As our FormdataTable.js component requires the store data, we map its props with store.

FormdataTable.js (modify this file)

import React,{Component} from 'react';
import {connect } from 'react-redux';

class FormdataTable extends Component {
  state = {  }
  render() {
      //console.log(this.props.empdata);
      return (
          <div className="col-md-10 offset-md-1">
              <h2 style={{"textAlign":"center", "marginTop":"20px", "marginBottom":"20px"}}>New Employee Details</h2>
              <table className="table table-hover">
                  <thead>
                      <tr>
                          <th scope="col">#</th>
                          <th scope="col">Fist Name</th>
                          <th scope="col">Last Name</th>
                          <th scope="col">Email</th>
                          <th scope="col">Mobile</th>
                          <th scope="col">City</th>
                          <th scope="col">Options</th>
                      </tr>
                  </thead>
                  <tbody>
                      {
                      this.props.empdata.map((emp, index) => {
                      return(
                          <tr key={index}>
                                  <td>{index+1}</td>
                                  <td>{emp.firstname}</td>
                                  <td>{emp.lastname}</td>
                                  <td>{emp.email}</td>
                                  <td>{emp.mobile}</td>
                                  <td>{emp.city}</td>
                                  <td>
                                      <button
                                          type="button"
                                          onClick={()=>this.handleEdit(index)}
                                          className="btn btn-sm btn-primary">Edit
                                      </button>
                                      {" | "}
                                      <button
                                          type="button"
                                          onClick={()=>this.handleDelete(index)}
                                          className="btn btn-sm btn-danger">Delete
                                      </button>
                                  </td>
                          </tr>
                      )})
                      }
                  </tbody>
              </table>
          </div>
      );
  }
}
function mapStateToProps(state){
     return {
       empdata : state
   };
}
export default connect(mapStateToProps,null)(FormdataTable);

Our signup formdata is to be updated with the store, this is an action and should be dispatched. We will first write an action creator function and then bind it with dispatch using bindActionCreators available in redux. Only then we could dispatch it to the reducer to update with the store.

allactions.js (create in src folder)

export function addNewEmp(payload){
   const action = {
       type: "ADD_NEW_EMP",
       payload
   }
   return action;
}

In the above case payload is our formdata. Let us bind this action creator with dispatch and connect to redux to dispatch our action object containing payload(formdata).

SignupForm.js (modify this file)

import React,{Component} from 'react';
import {Form, FormGroup, FormControl, FormLabel, Button} from 'react-bootstrap';

import {connect} from 'react-redux';
import {bindActionCreators} from 'redux';
import {addNewEmp} from '../allactions';

class SignupForm extends Component {
  constructor(props) {
  super(props);
  this.state = {
      firstname: '',
      lastname: '',
      email: '',
      mobile: '',
      city: ''
  };

  // This binding is necessary to make `this` work in the callback
  this.handleSubmit = this.handleSubmit.bind(this);
  }
handleSubmit(){
  let formdata = {
      firstname: this.state.firstname,
      lastname: this.state.lastname,
      email: this.state.email,
      mobile: this.state.mobile,
      city: this.state.city
  };
  this.props.addNewEmp(formdata);
  console.log("submitted",formdata);
  this.setState({
      firstname: '',
      lastname: '',
      email: '',
      mobile: '',
      city: ''
  });
}
  render() {
      return (
          <div className="col-md-4 offset-md-4">
              <Form>
                  <h2 style={{"textAlign":"center", "marginTop":"20px"}}>Enter Employee Details</h2>
                  <hr/>
                  <FormGroup>
                      <FormLabel>Firstname</FormLabel>
                      <FormControl
                          type="text"
                          name="firstname"
                          placeholder="Firstname"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.firstname}
                      />
                  </FormGroup>
            
                  <FormGroup>
                      <FormLabel>Lastname</FormLabel>
                      <FormControl
                          type="text"
                          name="lastname"
                          placeholder="Lastname"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.lastname}
                      />
                  </FormGroup>

                  <FormGroup>
                      <FormLabel>Email</FormLabel>
                      <FormControl
                          type="text"
                          name="email"
                          placeholder="Email"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.email}
                      />
                  </FormGroup>

                  <FormGroup>
                      <FormLabel>Mobile</FormLabel>
                      <FormControl
                          type="text"
                          name="mobile"
                          placeholder="Mobile"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.mobile}
                      />
                  </FormGroup>

                  <FormGroup>
                      <FormLabel>City</FormLabel>
                      <FormControl
                          type="text"
                          name="city"
                          placeholder="City/Village"
                          onChange={e => {
                              this.setState({[e.target.name]:e.target.value});
                          }}
                          value = {this.state.city}
                  />
                  </FormGroup>
                  <Button onClick={this.handleSubmit}>Submit</Button>
              </Form>
          </div>
       );
  }
}

function mapDispatchToProps(dispatch){
   return bindActionCreators({addNewEmp},dispatch)
}
export default connect (null, mapDispatchToProps)(SignupForm)

That simple, we have successfully managed state with redux, now let’s test it.

Now, please try to implement the delete functionality by yourself like shown below

Before deletion:

After deletion:

If you could do it you are doing great. If not please feel free to refer my code files below.

https://github.com/koushik-bitzop/newempdetails/commit/cd9712a89c4de7625cecbd6e494199f0ad5dd20f?diff=split

I hope this article was helpful to you in learning redux. Thanks for the read!

This story is authored by Koushik Busim. Koushik is a software engineer and a keen data science and machine learning enthusiast.